java调用Rsync并发迁移数据并执行校验
java代码如下
RsyncFile.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
/**
* @ClassName RsyncFile
* @Descriptiom TODO rsync多线程同步迁移数据
* @Author KING
* @Date 2019/11/25 09:17
* @Version 1.2.2
* rsync -vzrtopg --progress --delete //镜像同步
**/
@NoArgsConstructor
public class RsyncFile implements Runnable{
private static final int availProcessors = Runtime.getRuntime().availableProcessors();
//构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为*的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务)
private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1 ,
availProcessors,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
//保存扫描得到的文件列表
private static ArrayList<String> fileNameList = new ArrayList<String>();
private String shellname;
private String filename;
private String userip;
private CountDownLatch countDownLatch;
private static int deep = 0 ;
public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {
this .shellname = ShellName;
this .filename = filename;
this .userip = UserIP;
this .countDownLatch = countDownLatch;
}
public static void main(String[] args) {
try {
new RsyncFile().Do(args[ 0 ],args[ 1 ],Integer.parseInt(args[ 2 ]));
} catch (ArrayIndexOutOfBoundsException e){
System.out.println(e);
System.out.println( "Error , args send fault" );
System.out.println( "please send localAddress remote username @ remote IP or hostname and catalogue" );
System.out.println( "like this [ /home/test/ root@node1:/test/ 1 ]" );
} catch (NumberFormatException e1){
System.out.println(e1);
System.out.println( "please input Right Directory depth, this number must be int" );
System.out.println( "like this [ /home/test/ root@node1:/test/ 1 ]" );
}
}
@SneakyThrows
private void Do(String content,String UserIP, int setdeep){
System.out.println( "开始执行" );
System.out.println( "开始时间:" + new Date());
Long a = System.nanoTime();
File file = new File(content);
System.out.println( "开始扫描本地指定目录" );
GetAllFile(file,setdeep); //按深度扫描非空文件夹和文件
System.out.println( "扫描本地目录完成" );
//给脚本赋予权限
String [] cmd={ "/bin/sh" , "-c" , "chmod 755 ./do*" };
Runtime.getRuntime().exec(cmd);
//创建远端目录操作
System.out.println( "开始创建远端目录结构" );
//一次计数锁用于保证目录创建完成
CountDownLatch doDirLock = new CountDownLatch( 1 );
ThreadPool.execute( new RsyncFile( "./doDirc.sh" ,content,UserIP,doDirLock));
doDirLock.await();
System.out.println( "创建远端目录结构完成" );
//开始同步工作
System.out.println( "开始执行同步工作" );
System.out.println( "同步的文件夹或文件总数: " + fileNameList.size());
System.out.println( "正在同步。。。。。" );
//fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确)
CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());
System.out.println(fileNameList.size());
for (String fileName:fileNameList) {
//除去文件名中与UserIP重复的文件路径
String RemoteDir = UserIP.concat(fileName.replace(content, "" ));
System.out.println( "要同步的本地目录或文件: " + fileName);
System.out.println( "要同步的远端目录或文件: " + RemoteDir);
ThreadPool.execute( new RsyncFile( "./doRync.sh" ,fileName, RemoteDir,rsyncLock));
}
rsyncLock.await();
System.out.println( "执行同步工作完成" );
//开始文件校验工作
System.out.println( "执行文件校验及重传" );
//一次计数锁用于保证校验完成
CountDownLatch chechSumLock = new CountDownLatch( 1 );
ThreadPool.execute( new RsyncFile( "./doChecksum.sh" ,content,UserIP,chechSumLock));
chechSumLock.await();
System.out.println( "文件校验及重传完成" );
ThreadPool.shutdown();
Long b = System.nanoTime();
Long aLong = (b - a)/1000000L;
System.out.println( "处理时间" + aLong + "ms" );
System.out.println( "结束时间:" + new Date());
}
/**
* 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互
*/
@Override
public void run() {
try {
String command=shellname.concat( " " ).concat(filename).concat( " " ).concat(userip);
File wd = new File( "/bin" );
Process process = null ;
process = Runtime.getRuntime().exec( "/bin/bash" , null , wd);
if (process != null ) {
InputStream is = process.getInputStream();
BufferedReader reader = new BufferedReader( new InputStreamReader(is));
PrintWriter out = new PrintWriter( new BufferedWriter( new OutputStreamWriter(process.getOutputStream())),
true );
//切换到当前class文件所在目录
out.println( "cd " + System.getProperty( "user.dir" ));
out.println(command);
out.println( "exit" );
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null ) {
sb.append(line + System.lineSeparator());
}
process.waitFor();
reader.close();
out.close();
process.destroy();
System.out.println( "result:" + sb.toString());
} else {
System.out.println( "找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接" );
}
} catch (Exception e) {
System.err.println(e.getMessage());
} finally {
//倒记数锁释放一次
countDownLatch.countDown();
}
}
/**遍历指定的目录并能指定深度
* @param file 指定要遍历的目录
* @param setDeep 设定遍历深度
*/
@SneakyThrows
private static void GetAllFile(File file, int setDeep) {
if (file != null ){
if (file.isDirectory() && deep<setDeep){
deep++;
File f[] = file.listFiles();
if (f != null ) {
int length = f.length;
for ( int i = 0 ; i < length; i++)
GetAllFile(f[i],setDeep);
deep--;
}
} else {
if (file.isDirectory()){
//如果为目录末尾添加 / 保证rsync正常处理
fileNameList.add(file.getAbsolutePath().concat( "/" ));
} else {
fileNameList.add(file.getAbsolutePath());
}
}
}
}
}
|
doDir.sh
1
2
|
rsync -av --include= '*/' --exclude= '*' $1 $2 | tee -a /tmp/rsync .log 2>&1
echo "创建目录结构操作"
|
doRsync.sh
1
|
rsync -avzi --stats --progress $1 $2 | tee -a /tmp/rsync .log 2>&1
|
doChecksum.sh
1
|
rsync -acvzi --stats --progress $1 $2 | tee -a /tmp/checksum .log 2>&1
|
附录
rsync输出日志说明如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
YXcstpoguax path/to/file
|||||||||||
||||||||||╰- x: The extended attribute information changed
|||||||||╰-- a: The ACL information changed
||||||||╰--- u: The u slot is reserved for future use
|||||||╰---- g: Group is different
||||||╰----- o: Owner is different
|||||╰------ p: Permission are different
||||╰------- t: Modification time is different
|||╰-------- s: Size is different
||╰--------- c: Different checksum (for regular files), or
|| changed value (for symlinks, devices, and special files)
|╰---------- the file type:
| f: for a file,
| d: for a directory,
| L: for a symlink,
| D: for a device,
| S: for a special file (e.g. named sockets and fifos)
╰----------- the type of update being done::
<: file is being transferred to the remote host (sent)
>: file is being transferred to the local host (received)
c: local change/creation for the item, such as:
- the creation of a directory
- the changing of a symlink,
- etc.
h: the item is a hard link to another item (requires
--hard-links).
.: the item is not being updated (though it might have
attributes that are being modified)
*: means that the rest of the itemized-output area contains
a message (e.g. "deleting")
|
到此这篇关于Java之Rsync并发迁移数据并校验详解的文章就介绍到这了,更多相关Java之Rsync并发内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/w4187402/article/details/103274618