Spring Boot整合FTPClient线程池的实现示例

时间:2022-11-30 10:50:57

最近在写一个ftp上传工具,用到了apache的ftpclient,但是每个线程频繁的创建和销毁ftpclient对象对服务器的压力很大,因此,此处最好使用一个ftpclient连接池。仔细翻了一下apache的api,发现它并没有一个ftpclientpool的实现,所以,不得不自己写一个ftpclientpool。下面就大体介绍一下开发连接池的整个过程,供大家参考。

我们可以利用apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的objectpool和poolableobjectfactory两个接口即可。

线程池的意义

为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

pom引入依赖

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!-- ftpclient依赖包-->
   <dependency>
     <groupid>commons-net</groupid>
     <artifactid>commons-net</artifactid>
     <version>3.5</version>
   </dependency>
 
   <!-- 线程池-->
   <dependency>
     <groupid>commons-pool</groupid>
     <artifactid>commons-pool</artifactid>
     <version>1.6</version>
   </dependency>
 
   <dependency>
     <groupid>org.apache.commons</groupid>
     <artifactid>commons-pool2</artifactid>
     <version>2.0</version>
   </dependency>

创建ftp配置信息

在resources目录下创建ftp.properties配置文件,目录结构如下:

Spring Boot整合FTPClient线程池的实现示例

添加如下的配置信息:

?
1
2
3
4
5
6
7
8
9
10
########### ftp用户名称 ###########
ftp.username=hrabbit
########### ftp用户密码 ###########
ftp.password=123456
########### ftp主机ip ###########
ftp.host=127.0.0.1
########### ftp主机端口号 ###########
ftp.port=21
########### 保存根路径 ###########
ftp.baseurl=/

创建ftpproperties.java配置文件

加载配置内容到spring中,配置信息基本延用我的就可以。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * ftp的配置信息
 * @auther: hrabbit
 * @date: 2018-12-03 2:06 pm
 * @description:
 */
@data
@component
@propertysource("classpath:ftp.properties")
@configurationproperties(prefix = "ftp")
public class ftpproperties {
 
  private string username;
 
  private string password;
 
  private string host;
 
  private integer port;
 
  private string baseurl;
 
  private integer passivemode = ftp.binary_file_type;
 
  private string encoding="utf-8";
 
  private int clienttimeout=120000;
 
  private int buffersize;
 
  private int transferfiletype=ftp.binary_file_type;
 
  private boolean renameuploaded;
 
  private int retrytime;
}

创建ftpclientpool线程池

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
 * 自定义实现ftp连接池
 * @auther: hrabbit
 * @date: 2018-12-03 3:40 pm
 * @description:
 */
@slf4j
@suppresswarnings("all")
public class ftpclientpool implements objectpool<ftpclient> {
 
  private static final int default_pool_size = 10;
 
  public blockingqueue<ftpclient> blockingqueue;
 
  private ftpclientfactory factory;
 
  public ftpclientpool(ftpclientfactory factory) throws exception {
    this(default_pool_size, factory);
  }
 
  public ftpclientpool(int poolsize, ftpclientfactory factory) throws exception {
    this.factory = factory;
    this.blockingqueue = new arrayblockingqueue<ftpclient>(poolsize);
    initpool(poolsize);
  }
 
  /**
   * 初始化连接池
   * @param maxpoolsize
   *         最大连接数
   * @throws exception
   */
  private void initpool(int maxpoolsize) throws exception {
    int count = 0;
    while(count < maxpoolsize) {
      this.addobject();
      count++;
    }
  }
 
  /**
   * 从连接池中获取对象
   */
  @override
  public ftpclient borrowobject() throws exception {
    ftpclient client = blockingqueue.take();
    if(client == null) {
      client = factory.makeobject();
    } else if(!factory.validateobject(client)) {
      invalidateobject(client);
      client = factory.makeobject();
    }
    return client;
  }
 
  /**
   * 返还一个对象(链接)
   */
  @override
  public void returnobject(ftpclient client) throws exception {
    if ((client != null) && !blockingqueue.offer(client,2,timeunit.minutes)) {
      try {
        factory.destroyobject(client);
      } catch (exception e) {
        throw e;
      }
    }
  }
 
  /**
   * 移除无效的对象(ftp客户端)
   */
  @override
  public void invalidateobject(ftpclient client) throws exception {
    blockingqueue.remove(client);
  }
 
  /**
   * 增加一个新的链接,超时失效
   */
  @override
  public void addobject() throws exception {
    blockingqueue.offer(factory.makeobject(), 2, timeunit.minutes);
  }
 
  /**
   * 重新连接
   */
  public ftpclient reconnect() throws exception {
    return factory.makeobject();
  }
 
  /**
   * 获取空闲链接数(这里暂不实现)
   */
  @override
  public int getnumidle() {
    return blockingqueue.size();
  }
 
  /**
   * 获取正在被使用的链接数
   */
  @override
  public int getnumactive() {
    return default_pool_size - getnumidle();
  }
 
  @override
  public void clear() throws exception {
 
  }
 
  /**
   * 关闭连接池
   */
  @override
  public void close() {
    try {
      while(blockingqueue.iterator().hasnext()) {
        ftpclient client = blockingqueue.take();
        factory.destroyobject(client);
      }
    } catch(exception e) {
      log.error("close ftp client pool failed...{}", e);
    }
  }
 
  /**
   * 增加一个新的链接,超时失效
   */
  public void addobject(ftpclient ftpclient) throws exception {
    blockingqueue.put(ftpclient);
  }
}

创建一个ftpclientfactory工厂类

创建ftpclientfactory实现poolableobjectfactory的接口,ftpclient工厂类,通过ftpclient工厂提供ftpclient实例的创建和销毁

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
 * ftpclient 工厂
 * @auther: hrabbit
 * @date: 2018-12-03 3:41 pm
 * @description:
 */
@slf4j
@suppresswarnings("all")
public class ftpclientfactory implements poolableobjectfactory<ftpclient> {
 
  private ftpproperties ftpproperties;
 
  public ftpclientfactory(ftpproperties ftpproperties) {
    this.ftpproperties = ftpproperties;
  }
 
  @override
  public ftpclient makeobject() throws exception {
    ftpclient ftpclient = new ftpclient();
    ftpclient.setcontrolencoding(ftpproperties.getencoding());
    ftpclient.setconnecttimeout(ftpproperties.getclienttimeout());
    try {
      ftpclient.connect(ftpproperties.gethost(), ftpproperties.getport());
      int reply = ftpclient.getreplycode();
      if (!ftpreply.ispositivecompletion(reply)) {
        ftpclient.disconnect();
        log.warn("ftpserver refused connection");
        return null;
      }
      boolean result = ftpclient.login(ftpproperties.getusername(), ftpproperties.getpassword());
      ftpclient.setfiletype(ftpproperties.gettransferfiletype());
      if (!result) {
        log.warn("ftpclient login failed... username is {}", ftpproperties.getusername());
      }
    } catch (exception e) {
      log.error("create ftp connection failed...{}", e);
      throw e;
    }
 
    return ftpclient;
  }
 
  @override
  public void destroyobject(ftpclient ftpclient) throws exception {
    try {
      if(ftpclient != null && ftpclient.isconnected()) {
        ftpclient.logout();
      }
    } catch (exception e) {
      log.error("ftp client logout failed...{}", e);
      throw e;
    } finally {
      if(ftpclient != null) {
        ftpclient.disconnect();
      }
    }
 
  }
 
  @override
  public boolean validateobject(ftpclient ftpclient) {
    try {
      return ftpclient.sendnoop();
    } catch (exception e) {
      log.error("failed to validate client: {}");
    }
    return false;
  }
 
  @override
  public void activateobject(ftpclient obj) throws exception {
    //do nothing
 
  }
 
  @override
  public void passivateobject(ftpclient obj) throws exception {
    //do nothing
 
  }
}

创建ftputils.java的工具类

ftputils.java中封装了上传、下载等方法,在项目启动的时候,在@postconstruct注解的作用下通过执行init()的方法,创建ftpclientfactory工厂中,并初始化了ftpclientpool线程池,这样每次调用方法的时候,都直接从ftpclientpool中取出一个ftpclient对象

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
/**
 * @auther: hrabbit
 * @date: 2018-12-03 3:47 pm
 * @description:
 */
@slf4j
@component
public class ftputils {
 
  /**
   * ftp的连接池
   */
  @autowired
  public static ftpclientpool ftpclientpool;
  /**
   * ftpclient对象
   */
  public static ftpclient ftpclient;
 
 
  private static ftputils ftputils;
 
  @autowired
  private ftpproperties ftpproperties;
 
  /**
   * 初始化设置
   * @return
   */
  @postconstruct
  public boolean init() {
    ftpclientfactory factory = new ftpclientfactory(ftpproperties);
    ftputils = this;
    try {
      ftpclientpool = new ftpclientpool(factory);
    } catch (exception e) {
      e.printstacktrace();
      return false;
    }
    return true;
  }
 
 
  /**
   * 获取连接对象
   * @return
   * @throws exception
   */
  public static ftpclient getftpclient() throws exception {
    //初始化的时候从队列中取出一个连接
    if (ftpclient==null) {
      synchronized (ftpclientpool) {
        ftpclient = ftpclientpool.borrowobject();
      }
    }
    return ftpclient;
  }
 
 
  /**
   * 当前命令执行完成命令完成
   * @throws ioexception
   */
  public void complete() throws ioexception {
    ftpclient.completependingcommand();
  }
 
  /**
   * 当前线程任务处理完成,加入到队列的最后
   * @return
   */
  public void disconnect() throws exception {
    ftpclientpool.addobject(ftpclient);
  }
 
  /**
   * description: 向ftp服务器上传文件
   *
   * @version1.0
   * @param remotefile
   *      上传到ftp服务器上的文件名
   * @param input
   *      本地文件流
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadfile(string remotefile, inputstream input) {
    boolean result = false;
    try {
      getftpclient();
      ftpclient.enterlocalpassivemode();
      result = ftpclient.storefile(remotefile, input);
      input.close();
      ftpclient.disconnect();
    } catch (exception e) {
      e.printstacktrace();
    }
    return result;
  }
 
  /**
   * description: 向ftp服务器上传文件
   *
   * @version1.0
   * @param remotefile
   *      上传到ftp服务器上的文件名
   * @param localfile
   *      本地文件
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadfile(string remotefile, string localfile){
    fileinputstream input = null;
    try {
      input = new fileinputstream(new file(localfile));
    } catch (filenotfoundexception e) {
      e.printstacktrace();
    }
    return uploadfile(remotefile, input);
  }
 
  /**
   * 拷贝文件
   * @param fromfile
   * @param tofile
   * @return
   * @throws exception
   */
  public boolean copyfile(string fromfile, string tofile) throws exception {
    inputstream in=getfileinputstream(fromfile);
    getftpclient();
    boolean flag = ftpclient.storefile(tofile, in);
    in.close();
    return flag;
  }
 
  /**
   * 获取文件输入流
   * @param filename
   * @return
   * @throws ioexception
   */
  public static inputstream getfileinputstream(string filename) throws exception {
    bytearrayoutputstream fos=new bytearrayoutputstream();
    getftpclient();
    ftpclient.retrievefile(filename, fos);
    bytearrayinputstream in=new bytearrayinputstream(fos.tobytearray());
    fos.close();
    return in;
  }
 
  /**
   * description: 从ftp服务器下载文件
   *
   * @version1.0
   * @return
   */
  public static boolean downfile(string remotefile, string localfile){
    boolean result = false;
    try {
      getftpclient();
      outputstream os = new fileoutputstream(localfile);
      ftpclient.retrievefile(remotefile, os);
      ftpclient.logout();
      ftpclient.disconnect();
      result = true;
    } catch (exception e) {
      e.printstacktrace();
    } finally {
      try {
      } catch (exception e) {
        e.printstacktrace();
      }
    }
    return result;
  }
 
  /**
   * 从ftp中获取文件流
   * @param filepath
   * @return
   * @throws exception
   */
  public static inputstream getinputstream(string filepath) throws exception {
    getftpclient();
    inputstream inputstream = ftpclient.retrievefilestream(filepath);
    return inputstream;
  }
 
  /**
   * ftp中文件重命名
   * @param fromfile
   * @param tofile
   * @return
   * @throws exception
   */
  public boolean rename(string fromfile,string tofile) throws exception {
    getftpclient();
    boolean result = ftpclient.rename(fromfile,tofile);
    return result;
  }
 
  /**
   * 获取ftp目录下的所有文件
   * @param dir
   * @return
   */
  public ftpfile[] getfiles(string dir) throws exception {
    getftpclient();
    ftpfile[] files = new ftpfile[0];
    try {
      files = ftpclient.listfiles(dir);
    }catch (throwable thr){
      thr.printstacktrace();
    }
    return files;
  }
 
  /**
   * 获取ftp目录下的某种类型的文件
   * @param dir
   * @param filter
   * @return
   */
  public ftpfile[] getfiles(string dir, ftpfilefilter filter) throws exception {
    getftpclient();
    ftpfile[] files = new ftpfile[0];
    try {
      files = ftpclient.listfiles(dir, filter);
    }catch (throwable thr){
      thr.printstacktrace();
    }
    return files;
  }
 
  /**
   * 创建文件夹
   * @param remotedir
   * @return 如果已经有这个文件夹返回false
   */
  public boolean makedirectory(string remotedir) throws exception {
    getftpclient();
    boolean result = false;
    try {
      result = ftpclient.makedirectory(remotedir);
    } catch (ioexception e) {
      e.printstacktrace();
    }
    return result;
  }
 
  public boolean mkdirs(string dir) throws exception {
    boolean result = false;
    if (null == dir) {
      return result;
    }
    getftpclient();
    ftpclient.changeworkingdirectory("/");
    stringtokenizer dirs = new stringtokenizer(dir, "/");
    string temp = null;
    while (dirs.hasmoreelements()) {
      temp = dirs.nextelement().tostring();
      //创建目录
      ftpclient.makedirectory(temp);
      //进入目录
      ftpclient.changeworkingdirectory(temp);
      result = true;
    }
    ftpclient.changeworkingdirectory("/");
    return result;
  }
}

创建ftpclienttest.java测试类

上传一张图片到ftp服务器,并将文件重新命名为hrabbit.jpg,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * ftpclient测试
 * @auther: hrabbit
 * @date: 2018-12-21 9:14 pm
 * @description:
 */
@runwith(springrunner.class)
@springboottest
public class ftpclienttest {
 
  /**
   * 测试上传
   */
  @test
  public void uploadfile(){
    boolean flag = ftputils.uploadfile("hrabbit.jpg", "/users/mrotaku/downloads/klklklkl_4x.jpg");
    assert.assertequals(true, flag);
  }
 
}

程序完美运行,这时候我们查看我们的ftp服务器,http://localhost:8866/hrabbit.jpg

Spring Boot整合FTPClient线程池的实现示例

码云地址:https://gitee.com/hrabbit/hrabbit-admin

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.jianshu.com/p/6270b2308c4e