作者:京东物流 高圆庆
1 前言
通常一个对象创建、销毁非常耗时的时候,我们不会频繁的创建和销毁它,而是考虑复用。复用对象的一种做法就是对象池,将创建好的对象放入池中维护起来,下次再用的时候直接拿池中已经创建好的对象继续用,这就是池化的思想。在java中,有很多池管理的概念,典型的如线程池,数据库连接池,socket连接池。本文章讲介绍apache提供的通用对象池框架GenericObjectPool,以及基于GenericObjectPool实现的sftp连接池在国际物流调度履约系统中的应用。
2 GenericObjectPool剖析
Apache Commons Pool是一个对象池的框架,他提供了一整套用于实现对象池化的API。它提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool和GenericObjectPool,其中GenericObjectPool是我们最常用的对象池,内部实现也最复杂。GenericObjectPool的UML图如下所示:
2.1 核心接口ObjectPool
从图中可以看出,GenericObjectPool实现了ObjectPool接口,而ObjectPool就是对象池的核心接口,它定义了一个对象池应该实现的行为。
- addObject方法:往池中添加一个对象
- borrowObject方法:从池中借走到一个对象
- returnObject方法:把对象归还给对象池
- invalidateObject:验证对象的有效性
- getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
- getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
- clear:清理对象池。注意是清理不是清空,该方法要求的是,清理所有空闲对象,释放相关资源。
- close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。
2.2 对象工厂BasePooledObjectFactory
对象的创建需要通过对象工厂来创建,对象工厂需要实现BasePooledObjectFactory接口。ObjectPool接口中往池中添加一个对象,就需要使用对象工厂来创建一个对象。该接口说明如下:
-
public interface PooledObjectFactory<T> {
-
/**
-
* 创建一个可由池提供服务的实例,并将其封装在由池管理的PooledObject中。
-
*/
-
PooledObject<T> makeObject() throws Exception;
-
/**
-
* 销毁池不再需要的实例
-
*/
-
void destroyObject(PooledObject<T> p) throws Exception;
-
/**
-
* 确保实例可以安全地由池返回
-
*/
-
boolean validateObject(PooledObject<T> p);
-
/**
-
* 重新初始化池返回的实例
-
*/
-
void activateObject(PooledObject<T> p) throws Exception;
-
/**
-
* 取消初始化要返回到空闲对象池的实例
-
*/
-
void passivateObject(PooledObject<T> p) throws Exception;
-
}
2.3 配置类GenericObjectPoolConfig
GenericObjectPoolConfig是封装GenericObject池配置的简单“结构”,此类不是线程安全的;它仅用于提供创建池时使用的属性。大多数情况,可以使用GenericObjectPoolConfig提供的默认参数就可以满足日常的需求,GenericObjectPoolConfig是一个抽象类,实际应用中需要新建配置类,然后继承它。
2.4 工作原理流程
- 构造方法
当我们执行构造方法时,主要工作就是创建了一个存储对象的LinkedList类型容器,也就是概念意义上的“池” - 从对象池中获取对象
获取池中的对象是通过borrowObject()命令,源码比较复杂,简单而言就是去LinkedList中获取一个对象,如果不存在的话,要调用构造方法中第一个参数Factory工厂类的makeObject()方法去创建一个对象再获取,获取到对象后要调用validateObject方法判断该对象是否是可用的,如果是可用的才拿去使用。LinkedList容器减一 - 归还对象到线程池
简单而言就是先调用validateObject方法判断该对象是否是可用的,如果可用则归还到池中,LinkedList容器加一,如果是不可以的则则调用destroyObject方法进行销毁
上面三步就是最简单的流程,由于取和还的流程步骤都在borrowObject和returnObject方法中固定的,所以我们只要重写Factory工厂类的makeObject()和validateObject以及destroyObject方法即可实现最简单的池的管理控制,通过构造方法传入该Factory工厂类对象则可以创建最简单的对象池管理类。这算是比较好的解耦设计模式,借和还的流程如下图所示:
3 开源框架如何使用GenericObjectPool
redis的java客户端jedis就是基于Apache Commons Pool对象池的框架来实现的。
3.1 对象工厂类JedisFactory
对象工厂类只需实现activateObject、destroyObject、makeObject、validateObject方法即可,源码如下:
-
class JedisFactory implements PooledObjectFactory<Jedis> {
-
private final String host;
-
private final int port;
-
private final int timeout;
-
private final int newTimeout;
-
private final String password;
-
private final int database;
-
private final String clientName;
-
public JedisFactory(String host, int port, int timeout, String password, int database) {
-
this(host, port, timeout, password, database, (String)null);
-
}
-
public JedisFactory(String host, int port, int timeout, String password, int database, String clientName) {
-
this(host, port, timeout, timeout, password, database, clientName);
-
}
-
public JedisFactory(String host, int port, int timeout, int newTimeout, String password, int database, String clientName) {
-
this.host = host;
-
this.port = port;
-
this.timeout = timeout;
-
this.newTimeout = newTimeout;
-
this.password = password;
-
this.database = database;
-
this.clientName = clientName;
-
}
-
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
-
BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
-
if (jedis.getDB() != (long)this.database) {
-
jedis.select(this.database);
-
}
-
}
-
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
-
BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
-
if (jedis.isConnected()) {
-
try {
-
try {
-
jedis.quit();
-
} catch (Exception var4) {
-
}
-
jedis.disconnect();
-
} catch (Exception var5) {
-
}
-
}
-
}
-
public PooledObject<Jedis> makeObject() throws Exception {
-
Jedis jedis = new Jedis(this.host, this.port, this.timeout, this.newTimeout);
-
jedis.connect();
-
if (null != this.password) {
-
jedis.auth(this.password);
-
}
-
if (this.database != 0) {
-
jedis.select(this.database);
-
}
-
if (this.clientName != null) {
-
jedis.clientSetname(this.clientName);
-
}
-
return new DefaultPooledObject(jedis);
-
}
-
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
-
}
-
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
-
BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
-
try {
-
return jedis.isConnected() && jedis.ping().equals("PONG");
-
} catch (Exception var4) {
-
return false;
-
}
-
}
-
}
3.2 配置类JedisPoolConfig
-
public class JedisPoolConfig extends GenericObjectPoolConfig {
-
public JedisPoolConfig() {
-
this.setTestWhileIdle(true);
-
this.setMinEvictableIdleTimeMillis(60000L);
-
this.setTimeBetweenEvictionRunsMillis(30000L);
-
this.setNumTestsPerEvictionRun(-1);
-
}
-
}
4 国际物流履约系统中的应用
在国际物流履约系统中,我们和客户交互文件经常使用sftp服务器,因为创建sftp服务器的连接比较耗时,所以基于Apache Commons Pool对象池的框架来实现的我们自己的sftp链接池。
4.1 sftp对象池
SftpPool比较简单,直接继承GenericObjectPool。
-
public class SftpPool extends GenericObjectPool<Sftp> {
-
public SftpPool(SftpFactory factory, SftpPoolConfig config, SftpAbandonedConfig abandonedConfig) {
-
super(factory, config, abandonedConfig);
-
}
-
}
4.2 对象工厂SftpFactory
这是基于Apache Commons Pool框架实现自定义对象池的核心类,代码如下:
-
public class SftpFactory extends BasePooledObjectFactory<Sftp> {
-
private static final String CHANNEL_TYPE = "sftp";
-
private static Properties sshConfig = new Properties();
-
private String host;
-
private int port;
-
private String username;
-
private String password;
-
static {
-
sshConfig.put("StrictHostKeyChecking", "no");
-
}
-
@Override
-
public Sftp create() {
-
try {
-
JSch jsch = new JSch();
-
Session sshSession = jsch.getSession(username, host, port);
-
sshSession.setPassword(password);
-
sshSession.setConfig(sshConfig);
-
sshSession.connect();
-
ChannelSftp channel = (ChannelSftp) sshSession.openChannel(CHANNEL_TYPE);
-
channel.connect();
-
log.info("sftpFactory创建sftp");
-
return new Sftp(channel);
-
} catch (JSchException e) {
-
log.error("连接sftp失败:", e);
-
throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
-
}
-
}
-
/**
-
* @param sftp 被包装的对象
-
* @return 对象包装器
-
*/
-
@Override
-
public PooledObject<Sftp> wrap(Sftp sftp) {
-
return new DefaultPooledObject<>(sftp);
-
}
-
/**
-
* 销毁对象
-
* @param p 对象包装器
-
*/
-
@Override
-
public void destroyObject(PooledObject<Sftp> p) {
-
log.info("开始销毁channelSftp");
-
if (p!=null) {
-
Sftp sftp = p.getObject();
-
if (sftp!=null) {
-
ChannelSftp channelSftp = sftp.getChannelSftp();
-
if (channelSftp!=null) {
-
channelSftp.disconnect();
-
log.info("销毁channelSftp成功");
-
}
-
}
-
}
-
}
-
/**
-
* 检查连接是否可用
-
*
-
* @param p 对象包装器
-
* @return {@code true} 可用,{@code false} 不可用
-
*/
-
@Override
-
public boolean validateObject(PooledObject<Sftp> p) {
-
if (p!=null) {
-
Sftp sftp = p.getObject();
-
if (sftp!=null) {
-
try {
-
sftp.getChannelSftp().cd("./");
-
log.info("验证连接是否可用,结果为true");
-
return true;
-
} catch (SftpException e) {
-
log.info("验证连接是否可用,结果为false",e);
-
return false;
-
}
-
}
-
}
-
log.info("验证连接是否可用,结果为false");
-
return false;
-
}
-
public static class Builder {
-
private String host;
-
private int port;
-
private String username;
-
private String password;
-
public SftpFactory build() {
-
return new SftpFactory(host, port, username, password);
-
}
-
public Builder host(String host) {
-
this.host = host;
-
return this;
-
}
-
public Builder port(int port) {
-
this.port = port;
-
return this;
-
}
-
public Builder username(String username) {
-
this.username = username;
-
return this;
-
}
-
public Builder password(String password) {
-
this.password = password;
-
return this;
-
}
-
}
-
}
4.3 配置类SftpPoolConfig
配置类继承了GenericObjectPoolConfig,可继承该类的默认属性,也可自定义配置参数。
-
public class SftpPoolConfig extends GenericObjectPoolConfig<Sftp> {
-
public static class Builder {
-
private int maxTotal;
-
private int maxIdle;
-
private int minIdle;
-
private boolean lifo;
-
private boolean fairness;
-
private long maxWaitMillis;
-
private long minEvictableIdleTimeMillis;
-
private long evictorShutdownTimeoutMillis;
-
private long softMinEvictableIdleTimeMillis;
-
private int numTestsPerEvictionRun;
-
private EvictionPolicy<Sftp> evictionPolicy; // 仅2.6.0版本commons-pool2需要设置
-
private String evictionPolicyClassName;
-
private boolean testOnCreate;
-
private boolean testOnBorrow;
-
private boolean testOnReturn;
-
private boolean testWhileIdle;
-
private long timeBetweenEvictionRunsMillis;
-
private boolean blockWhenExhausted;
-
private boolean jmxEnabled;
-
private String jmxNamePrefix;
-
private String jmxNameBase;
-
public SftpPoolConfig build() {
-
SftpPoolConfig config = new SftpPoolConfig();
-
config.setMaxTotal(maxTotal);
-
config.setMaxIdle(maxIdle);
-
config.setMinIdle(minIdle);
-
config.setLifo(lifo);
-
config.setFairness(fairness);
-
config.setMaxWaitMillis(maxWaitMillis);
-
config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
-
config.setEvictorShutdownTimeoutMillis(evictorShutdownTimeoutMillis);
-
config.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
-
config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
-
config.setEvictionPolicy(evictionPolicy);
-
config.setEvictionPolicyClassName(evictionPolicyClassName);
-
config.setTestOnCreate(testOnCreate);
-
config.setTestOnBorrow(testOnBorrow);
-
config.setTestOnReturn(testOnReturn);
-
config.setTestWhileIdle(testWhileIdle);
-
config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
-
config.setBlockWhenExhausted(blockWhenExhausted);
-
config.setJmxEnabled(jmxEnabled);
-
config.setJmxNamePrefix(jmxNamePrefix);
-
config.setJmxNameBase(jmxNameBase);
-
return config;
-
}
-
}
4.4 SftpClient配置类
读取配置文件,创建SftpFactory、SftpPoolConfig、SftpPool,代码如下:
-
@Configuration
-
@ConditionalOnClass(SftpPool.class)
-
@EnableConfigurationProperties(SftpClientProperties.class)
-
public class SftpClientAutoConfiguration {
-
@Bean
-
@ConditionalOnMissingBean
-
public ISftpClient sftpClient(SftpClientProperties sftpClientProperties) {
-
if (sftpClientProperties.isMultiple()) {
-
MultipleSftpClient multipleSftpClient = new MultipleSftpClient();
-
sftpClientProperties.getClients().forEach((name, properties) -> {
-
SftpFactory sftpFactory = createSftpFactory(properties);
-
SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(properties);
-
SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(properties);
-
SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);
-
ISftpClient sftpClient = new SftpClient(sftpPool);
-
multipleSftpClient.put(name, sftpClient);
-
});
-
return multipleSftpClient;
-
}
-
SftpFactory sftpFactory = createSftpFactory(sftpClientProperties);
-
SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(sftpClientProperties);
-
SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(sftpClientProperties);
-
SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);
-
return new SftpClient(sftpPool);
-
}
-
public SftpFactory createSftpFactory(SftpClientProperties properties) {
-
return new SftpFactory.Builder()
-
.host(properties.getHost())
-
.port(properties.getPort())
-
.username(properties.getUsername())
-
.password(properties.getPassword())
-
.build();
-
}
-
public SftpPoolConfig createSftpPoolConfig(SftpClientProperties properties) {
-
SftpClientProperties.Pool pool = properties.getPool();
-
return new SftpPoolConfig.Builder()
-
.maxTotal(pool.getMaxTotal())
-
.maxIdle(pool.getMaxIdle())
-
.minIdle(pool.getMinIdle())
-
.lifo(pool.isLifo())
-
.fairness(pool.isFairness())
-
.maxWaitMillis(pool.getMaxWaitMillis())
-
.minEvictableIdleTimeMillis(pool.getMinEvictableIdleTimeMillis())
-
.evictorShutdownTimeoutMillis(pool.getEvictorShutdownTimeoutMillis())
-
.softMinEvictableIdleTimeMillis(pool.getSoftMinEvictableIdleTimeMillis())
-
.numTestsPerEvictionRun(pool.getNumTestsPerEvictionRun())
-
.evictionPolicy(null)
-
.evictionPolicyClassName(DefaultEvictionPolicy.class.getName())
-
.testOnCreate(pool.isTestOnCreate())
-
.testOnBorrow(pool.isTestOnBorrow())
-
.testOnReturn(pool.isTestOnReturn())
-
.testWhileIdle(pool.isTestWhileIdle())
-
.timeBetweenEvictionRunsMillis(pool.getTimeBetweenEvictionRunsMillis())
-
.blockWhenExhausted(pool.isBlockWhenExhausted())
-
.jmxEnabled(pool.isJmxEnabled())
-
.jmxNamePrefix(pool.getJmxNamePrefix())
-
.jmxNameBase(pool.getJmxNameBase())
-
.build();
-
}
-
public SftpAbandonedConfig createSftpAbandonedConfig(SftpClientProperties properties) {
-
SftpClientProperties.Abandoned abandoned = properties.getAbandoned();
-
return new SftpAbandonedConfig.Builder()
-
.removeAbandonedOnBorrow(abandoned.isRemoveAbandonedOnBorrow())
-
.removeAbandonedOnMaintenance(abandoned.isRemoveAbandonedOnMaintenance())
-
.removeAbandonedTimeout(abandoned.getRemoveAbandonedTimeout())
-
.logAbandoned(abandoned.isLogAbandoned())
-
.requireFullStackTrace(abandoned.isRequireFullStackTrace())
-
.logWriter(new PrintWriter(System.out))
-
.useUsageTracking(abandoned.isUseUsageTracking())
-
.build();
-
}
-
}
4.5 对象SftpClient
SftpClient是实际工作的类,从SftpClient 中可获取到一个sftp链接,使用完成后,归还给sftpPool。SftpClient代码如下:
-
public class SftpClient implements ISftpClient {
-
private SftpPool sftpPool;
-
/**
-
* 从sftp连接池获取连接并执行操作
-
*
-
* @param handler sftp操作
-
*/
-
@Override
-
public void open(ISftpClient.Handler handler) {
-
Sftp sftp = null;
-
try {
-
sftp = sftpPool.borrowObject();
-
ISftpClient.Handler policyHandler = new DelegateHandler(handler);
-
policyHandler.doHandle(sftp);
-
} catch (Exception e) {
-
log.error("sftp异常:", e);
-
throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
-
} finally {
-
if (sftp != null) {
-
sftpPool.returnObject(sftp);
-
}
-
}
-
}
-
@AllArgsConstructor
-
static class DelegateHandler implements ISftpClient.Handler {
-
private ISftpClient.Handler target;
-
@Override
-
public void doHandle(Sftp sftp) {
-
try {
-
target.doHandle(sftp);
-
} catch (Exception e) {
-
log.error("sftp异常:", e);
-
throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
-
}
-
}
-
}
-
}
4.6 实战代码示例
通过sftp上传文件到XX服务器
-
//通过SFTP上传到XX
-
((MultipleSftpClient) sftpClient).choose("XX");
-
sftpClient.open(sftp -> {
-
boolean exist = sftp.isExist(inventoryPath);
-
if(!exist){
-
sftp.mkdirs(inventoryPath);
-
}
-
// 执行sftp操作
-
InputStream is = new FileInputStream(oneColumnCSVFile);
-
sftp.upload(inventoryPath, titleName, is);
-
log.info("inventory upload over");
-
});
5 总结
通过本文的介绍可以知道,Apache Commons Pool定义了一个对象池的行为,提供了可扩展的配置类和对象工厂,封装了对象创建、从池中获取对象、归还对象的核心流程。还介绍了开源框架Jedis是如何基于GenericObjectPool来实现的连接池。最后介绍了国际物流履约系统中是如何基于GenericObjectPool来管理Sftp连接的。
掌握了GenericObjectPool的核心原理,我们就可以通过实现几个关键的接口,创建一个对象池管理工具,在项目中避免了对象的频繁创建和销毁,从而显著提升程序的性能。