巧用GenericObjectPool创建自定义对象池

时间:2022-01-08 00:51:12

作者:京东物流 高圆庆

1 前言

通常一个对象创建、销毁非常耗时的时候,我们不会频繁的创建和销毁它,而是考虑复用。复用对象的一种做法就是对象池,将创建好的对象放入池中维护起来,下次再用的时候直接拿池中已经创建好的对象继续用,这就是池化的思想。在java中,有很多池管理的概念,典型的如线程池,数据库连接池,socket连接池。本文章讲介绍apache提供的通用对象池框架GenericObjectPool,以及基于GenericObjectPool实现的sftp连接池在国际物流调度履约系统中的应用。

2 GenericObjectPool剖析

Apache Commons Pool是一个对象池的框架,他提供了一整套用于实现对象池化的API。它提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool和GenericObjectPool,其中GenericObjectPool是我们最常用的对象池,内部实现也最复杂。GenericObjectPool的UML图如下所示:

巧用GenericObjectPool创建自定义对象池

2.1 核心接口ObjectPool

从图中可以看出,GenericObjectPool实现了ObjectPool接口,而ObjectPool就是对象池的核心接口,它定义了一个对象池应该实现的行为。

  • addObject方法:往池中添加一个对象
  • borrowObject方法:从池中借走到一个对象
  • returnObject方法:把对象归还给对象池
  • invalidateObject:验证对象的有效性
  • getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
  • getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
  • clear:清理对象池。注意是清理不是清空,该方法要求的是,清理所有空闲对象,释放相关资源。
  • close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。

2.2 对象工厂BasePooledObjectFactory

对象的创建需要通过对象工厂来创建,对象工厂需要实现BasePooledObjectFactory接口。ObjectPool接口中往池中添加一个对象,就需要使用对象工厂来创建一个对象。该接口说明如下:

  1. ​public interface PooledObjectFactory<T> {​
  2. ​/**​
  3. ​* 创建一个可由池提供服务的实例,并将其封装在由池管理的PooledObject中。​
  4. ​*/​
  5. ​PooledObject<T> makeObject() throws Exception;​
  6. ​/**​
  7. ​* 销毁池不再需要的实例​
  8. ​*/​
  9. ​void destroyObject(PooledObject<T> p) throws Exception;​
  10. ​/**​
  11. ​* 确保实例可以安全地由池返回​
  12. ​*/​
  13. ​boolean validateObject(PooledObject<T> p);​
  14. ​/**​
  15. ​* 重新初始化池返回的实例​
  16. ​*/​
  17. ​void activateObject(PooledObject<T> p) throws Exception;​
  18. ​/**​
  19. ​* 取消初始化要返回到空闲对象池的实例​
  20. ​*/​
  21. ​void passivateObject(PooledObject<T> p) throws Exception;​
  22. ​}​

2.3 配置类GenericObjectPoolConfig

GenericObjectPoolConfig是封装GenericObject池配置的简单“结构”,此类不是线程安全的;它仅用于提供创建池时使用的属性。大多数情况,可以使用GenericObjectPoolConfig提供的默认参数就可以满足日常的需求,GenericObjectPoolConfig是一个抽象类,实际应用中需要新建配置类,然后继承它。

2.4 工作原理流程

  1. 构造方法
    当我们执行构造方法时,主要工作就是创建了一个存储对象的LinkedList类型容器,也就是概念意义上的“池”
  2. 从对象池中获取对象
    获取池中的对象是通过borrowObject()命令,源码比较复杂,简单而言就是去LinkedList中获取一个对象,如果不存在的话,要调用构造方法中第一个参数Factory工厂类的makeObject()方法去创建一个对象再获取,获取到对象后要调用validateObject方法判断该对象是否是可用的,如果是可用的才拿去使用。LinkedList容器减一
  3. 归还对象到线程池
    简单而言就是先调用validateObject方法判断该对象是否是可用的,如果可用则归还到池中,LinkedList容器加一,如果是不可以的则则调用destroyObject方法进行销毁

上面三步就是最简单的流程,由于取和还的流程步骤都在borrowObject和returnObject方法中固定的,所以我们只要重写Factory工厂类的makeObject()和validateObject以及destroyObject方法即可实现最简单的池的管理控制,通过构造方法传入该Factory工厂类对象则可以创建最简单的对象池管理类。这算是比较好的解耦设计模式,借和还的流程如下图所示:

巧用GenericObjectPool创建自定义对象池

3 开源框架如何使用GenericObjectPool

redis的java客户端jedis就是基于Apache Commons Pool对象池的框架来实现的。

3.1 对象工厂类JedisFactory

对象工厂类只需实现activateObject、destroyObject、makeObject、validateObject方法即可,源码如下:

  1. ​class JedisFactory implements PooledObjectFactory<Jedis> {​
  2. ​private final String host;​
  3. ​private final int port;​
  4. ​private final int timeout;​
  5. ​private final int newTimeout;​
  6. ​private final String password;​
  7. ​private final int database;​
  8. ​private final String clientName;​
  9. ​public JedisFactory(String host, int port, int timeout, String password, int database) {​
  10. ​this(host, port, timeout, password, database, (String)null);​
  11. ​}​
  12. ​public JedisFactory(String host, int port, int timeout, String password, int database, String clientName) {​
  13. ​this(host, port, timeout, timeout, password, database, clientName);​
  14. ​}​
  15. ​public JedisFactory(String host, int port, int timeout, int newTimeout, String password, int database, String clientName) {​
  16. ​this.host = host;​
  17. ​this.port = port;​
  18. ​this.timeout = timeout;​
  19. ​this.newTimeout = newTimeout;​
  20. ​this.password = password;​
  21. ​this.database = database;​
  22. ​this.clientName = clientName;​
  23. ​}​
  24. ​public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {​
  25. ​BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();​
  26. ​if (jedis.getDB() != (long)this.database) {​
  27. ​jedis.select(this.database);​
  28. ​}​
  29. ​}​
  30. ​public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {​
  31. ​BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();​
  32. ​if (jedis.isConnected()) {​
  33. ​try {​
  34. ​try {​
  35. ​jedis.quit();​
  36. ​} catch (Exception var4) {​
  37. ​}​
  38. ​jedis.disconnect();​
  39. ​} catch (Exception var5) {​
  40. ​}​
  41. ​}​
  42. ​}​
  43. ​public PooledObject<Jedis> makeObject() throws Exception {​
  44. ​Jedis jedis = new Jedis(this.host, this.port, this.timeout, this.newTimeout);​
  45. ​jedis.connect();​
  46. ​if (null != this.password) {​
  47. ​jedis.auth(this.password);​
  48. ​}​
  49. ​if (this.database != 0) {​
  50. ​jedis.select(this.database);​
  51. ​}​
  52. ​if (this.clientName != null) {​
  53. ​jedis.clientSetname(this.clientName);​
  54. ​}​
  55. ​return new DefaultPooledObject(jedis);​
  56. ​}​
  57. ​public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {​
  58. ​}​
  59. ​public boolean validateObject(PooledObject<Jedis> pooledJedis) {​
  60. ​BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();​
  61. ​try {​
  62. ​return jedis.isConnected() && jedis.ping().equals("PONG");​
  63. ​} catch (Exception var4) {​
  64. ​return false;​
  65. ​}​
  66. ​}​
  67. ​}​

3.2 配置类JedisPoolConfig

  1. ​public class JedisPoolConfig extends GenericObjectPoolConfig {​
  2. ​public JedisPoolConfig() {​
  3. ​this.setTestWhileIdle(true);​
  4. ​this.setMinEvictableIdleTimeMillis(60000L);​
  5. ​this.setTimeBetweenEvictionRunsMillis(30000L);​
  6. ​this.setNumTestsPerEvictionRun(-1);​
  7. ​}​
  8. ​}​

4 国际物流履约系统中的应用

在国际物流履约系统中,我们和客户交互文件经常使用sftp服务器,因为创建sftp服务器的连接比较耗时,所以基于Apache Commons Pool对象池的框架来实现的我们自己的sftp链接池。

4.1 sftp对象池

SftpPool比较简单,直接继承GenericObjectPool。

  1. ​public class SftpPool extends GenericObjectPool<Sftp> {​
  2. ​public SftpPool(SftpFactory factory, SftpPoolConfig config, SftpAbandonedConfig abandonedConfig) {​
  3. ​super(factory, config, abandonedConfig);​
  4. ​}​
  5. ​}​

4.2 对象工厂SftpFactory

这是基于Apache Commons Pool框架实现自定义对象池的核心类,代码如下:

  1. ​public class SftpFactory extends BasePooledObjectFactory<Sftp> {​
  2. ​private static final String CHANNEL_TYPE = "sftp";​
  3. ​private static Properties sshConfig = new Properties();​
  4. ​private String host;​
  5. ​private int port;​
  6. ​private String username;​
  7. ​private String password;​
  8. ​static {​
  9. ​sshConfig.put("StrictHostKeyChecking", "no");​
  10. ​}​
  11. ​@Override​
  12. ​public Sftp create() {​
  13. ​try {​
  14. ​JSch jsch = new JSch();​
  15. ​Session sshSession = jsch.getSession(username, host, port);​
  16. ​sshSession.setPassword(password);​
  17. ​sshSession.setConfig(sshConfig);​
  18. ​sshSession.connect();​
  19. ​ChannelSftp channel = (ChannelSftp) sshSession.openChannel(CHANNEL_TYPE);​
  20. ​channel.connect();​
  21. ​log.info("sftpFactory创建sftp");​
  22. ​return new Sftp(channel);​
  23. ​} catch (JSchException e) {​
  24. ​log.error("连接sftp失败:", e);​
  25. ​throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);​
  26. ​}​
  27. ​}​
  28. ​/**​
  29. ​* @param sftp 被包装的对象​
  30. ​* @return 对象包装器​
  31. ​*/​
  32. ​@Override​
  33. ​public PooledObject<Sftp> wrap(Sftp sftp) {​
  34. ​return new DefaultPooledObject<>(sftp);​
  35. ​}​
  36. ​/**​
  37. ​* 销毁对象​
  38. ​* @param p 对象包装器​
  39. ​*/​
  40. ​@Override​
  41. ​public void destroyObject(PooledObject<Sftp> p) {​
  42. ​log.info("开始销毁channelSftp");​
  43. ​if (p!=null) {​
  44. ​Sftp sftp = p.getObject();​
  45. ​if (sftp!=null) {​
  46. ​ChannelSftp channelSftp = sftp.getChannelSftp();​
  47. ​if (channelSftp!=null) {​
  48. ​channelSftp.disconnect();​
  49. ​log.info("销毁channelSftp成功");​
  50. ​}​
  51. ​}​
  52. ​}​
  53. ​}​
  54. ​/**​
  55. ​* 检查连接是否可用​
  56. ​*​
  57. ​* @param p 对象包装器​
  58. ​* @return {@code true} 可用,{@code false} 不可用​
  59. ​*/​
  60. ​@Override​
  61. ​public boolean validateObject(PooledObject<Sftp> p) {​
  62. ​if (p!=null) {​
  63. ​Sftp sftp = p.getObject();​
  64. ​if (sftp!=null) {​
  65. ​try {​
  66. ​sftp.getChannelSftp().cd("./");​
  67. ​log.info("验证连接是否可用,结果为true");​
  68. ​return true;​
  69. ​} catch (SftpException e) {​
  70. ​log.info("验证连接是否可用,结果为false",e);​
  71. ​return false;​
  72. ​}​
  73. ​}​
  74. ​}​
  75. ​log.info("验证连接是否可用,结果为false");​
  76. ​return false;​
  77. ​}​
  78. ​public static class Builder {​
  79. ​private String host;​
  80. ​private int port;​
  81. ​private String username;​
  82. ​private String password;​
  83. ​public SftpFactory build() {​
  84. ​return new SftpFactory(host, port, username, password);​
  85. ​}​
  86. ​public Builder host(String host) {​
  87. ​this.host = host;​
  88. ​return this;​
  89. ​}​
  90. ​public Builder port(int port) {​
  91. ​this.port = port;​
  92. ​return this;​
  93. ​}​
  94. ​public Builder username(String username) {​
  95. ​this.username = username;​
  96. ​return this;​
  97. ​}​
  98. ​public Builder password(String password) {​
  99. ​this.password = password;​
  100. ​return this;​
  101. ​}​
  102. ​}​
  103. ​}​

4.3 配置类SftpPoolConfig

配置类继承了GenericObjectPoolConfig,可继承该类的默认属性,也可自定义配置参数。

  1. ​public class SftpPoolConfig extends GenericObjectPoolConfig<Sftp> {​
  2. ​public static class Builder {​
  3. ​private int maxTotal;​
  4. ​private int maxIdle;​
  5. ​private int minIdle;​
  6. ​private boolean lifo;​
  7. ​private boolean fairness;​
  8. ​private long maxWaitMillis;​
  9. ​private long minEvictableIdleTimeMillis;​
  10. ​private long evictorShutdownTimeoutMillis;​
  11. ​private long softMinEvictableIdleTimeMillis;​
  12. ​private int numTestsPerEvictionRun;​
  13. ​private EvictionPolicy<Sftp> evictionPolicy; // 仅2.6.0版本commons-pool2需要设置​
  14. ​private String evictionPolicyClassName;​
  15. ​private boolean testOnCreate;​
  16. ​private boolean testOnBorrow;​
  17. ​private boolean testOnReturn;​
  18. ​private boolean testWhileIdle;​
  19. ​private long timeBetweenEvictionRunsMillis;​
  20. ​private boolean blockWhenExhausted;​
  21. ​private boolean jmxEnabled;​
  22. ​private String jmxNamePrefix;​
  23. ​private String jmxNameBase;​
  24. ​public SftpPoolConfig build() {​
  25. ​SftpPoolConfig config = new SftpPoolConfig();​
  26. ​config.setMaxTotal(maxTotal);​
  27. ​config.setMaxIdle(maxIdle);​
  28. ​config.setMinIdle(minIdle);​
  29. ​config.setLifo(lifo);​
  30. ​config.setFairness(fairness);​
  31. ​config.setMaxWaitMillis(maxWaitMillis);​
  32. ​config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);​
  33. ​config.setEvictorShutdownTimeoutMillis(evictorShutdownTimeoutMillis);​
  34. ​config.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);​
  35. ​config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);​
  36. ​config.setEvictionPolicy(evictionPolicy);​
  37. ​config.setEvictionPolicyClassName(evictionPolicyClassName);​
  38. ​config.setTestOnCreate(testOnCreate);​
  39. ​config.setTestOnBorrow(testOnBorrow);​
  40. ​config.setTestOnReturn(testOnReturn);​
  41. ​config.setTestWhileIdle(testWhileIdle);​
  42. ​config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);​
  43. ​config.setBlockWhenExhausted(blockWhenExhausted);​
  44. ​config.setJmxEnabled(jmxEnabled);​
  45. ​config.setJmxNamePrefix(jmxNamePrefix);​
  46. ​config.setJmxNameBase(jmxNameBase);​
  47. ​return config;​
  48. ​}​
  49. ​}​

4.4 SftpClient配置类

读取配置文件,创建SftpFactory、SftpPoolConfig、SftpPool,代码如下:

  1. ​@Configuration​
  2. ​@ConditionalOnClass(SftpPool.class)​
  3. ​@EnableConfigurationProperties(SftpClientProperties.class)​
  4. ​public class SftpClientAutoConfiguration {​
  5. ​@Bean​
  6. ​@ConditionalOnMissingBean​
  7. ​public ISftpClient sftpClient(SftpClientProperties sftpClientProperties) {​
  8. ​if (sftpClientProperties.isMultiple()) {​
  9. ​MultipleSftpClient multipleSftpClient = new MultipleSftpClient();​
  10. ​sftpClientProperties.getClients().forEach((name, properties) -> {​
  11. ​SftpFactory sftpFactory = createSftpFactory(properties);​
  12. ​SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(properties);​
  13. ​SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(properties);​
  14. ​SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);​
  15. ​ISftpClient sftpClient = new SftpClient(sftpPool);​
  16. ​multipleSftpClient.put(name, sftpClient);​
  17. ​});​
  18. ​return multipleSftpClient;​
  19. ​}​
  20. ​SftpFactory sftpFactory = createSftpFactory(sftpClientProperties);​
  21. ​SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(sftpClientProperties);​
  22. ​SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(sftpClientProperties);​
  23. ​SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);​
  24. ​return new SftpClient(sftpPool);​
  25. ​}​
  26. ​public SftpFactory createSftpFactory(SftpClientProperties properties) {​
  27. ​return new SftpFactory.Builder()​
  28. ​.host(properties.getHost())​
  29. ​.port(properties.getPort())​
  30. ​.username(properties.getUsername())​
  31. ​.password(properties.getPassword())​
  32. ​.build();​
  33. ​}​
  34. ​public SftpPoolConfig createSftpPoolConfig(SftpClientProperties properties) {​
  35. ​SftpClientProperties.Pool pool = properties.getPool();​
  36. ​return new SftpPoolConfig.Builder()​
  37. ​.maxTotal(pool.getMaxTotal())​
  38. ​.maxIdle(pool.getMaxIdle())​
  39. ​.minIdle(pool.getMinIdle())​
  40. ​.lifo(pool.isLifo())​
  41. ​.fairness(pool.isFairness())​
  42. ​.maxWaitMillis(pool.getMaxWaitMillis())​
  43. ​.minEvictableIdleTimeMillis(pool.getMinEvictableIdleTimeMillis())​
  44. ​.evictorShutdownTimeoutMillis(pool.getEvictorShutdownTimeoutMillis())​
  45. ​.softMinEvictableIdleTimeMillis(pool.getSoftMinEvictableIdleTimeMillis())​
  46. ​.numTestsPerEvictionRun(pool.getNumTestsPerEvictionRun())​
  47. ​.evictionPolicy(null)​
  48. ​.evictionPolicyClassName(DefaultEvictionPolicy.class.getName())​
  49. ​.testOnCreate(pool.isTestOnCreate())​
  50. ​.testOnBorrow(pool.isTestOnBorrow())​
  51. ​.testOnReturn(pool.isTestOnReturn())​
  52. ​.testWhileIdle(pool.isTestWhileIdle())​
  53. ​.timeBetweenEvictionRunsMillis(pool.getTimeBetweenEvictionRunsMillis())​
  54. ​.blockWhenExhausted(pool.isBlockWhenExhausted())​
  55. ​.jmxEnabled(pool.isJmxEnabled())​
  56. ​.jmxNamePrefix(pool.getJmxNamePrefix())​
  57. ​.jmxNameBase(pool.getJmxNameBase())​
  58. ​.build();​
  59. ​}​
  60. ​public SftpAbandonedConfig createSftpAbandonedConfig(SftpClientProperties properties) {​
  61. ​SftpClientProperties.Abandoned abandoned = properties.getAbandoned();​
  62. ​return new SftpAbandonedConfig.Builder()​
  63. ​.removeAbandonedOnBorrow(abandoned.isRemoveAbandonedOnBorrow())​
  64. ​.removeAbandonedOnMaintenance(abandoned.isRemoveAbandonedOnMaintenance())​
  65. ​.removeAbandonedTimeout(abandoned.getRemoveAbandonedTimeout())​
  66. ​.logAbandoned(abandoned.isLogAbandoned())​
  67. ​.requireFullStackTrace(abandoned.isRequireFullStackTrace())​
  68. ​.logWriter(new PrintWriter(System.out))​
  69. ​.useUsageTracking(abandoned.isUseUsageTracking())​
  70. ​.build();​
  71. ​}​
  72. ​}​

4.5 对象SftpClient

SftpClient是实际工作的类,从SftpClient 中可获取到一个sftp链接,使用完成后,归还给sftpPool。SftpClient代码如下:

  1. ​public class SftpClient implements ISftpClient {​
  2. ​private SftpPool sftpPool;​
  3. ​/**​
  4. ​* 从sftp连接池获取连接并执行操作​
  5. ​*​
  6. ​* @param handler sftp操作​
  7. ​*/​
  8. ​@Override​
  9. ​public void open(ISftpClient.Handler handler) {​
  10. ​Sftp sftp = null;​
  11. ​try {​
  12. ​sftp = sftpPool.borrowObject();​
  13. ​ISftpClient.Handler policyHandler = new DelegateHandler(handler);​
  14. ​policyHandler.doHandle(sftp);​
  15. ​} catch (Exception e) {​
  16. ​log.error("sftp异常:", e);​
  17. ​throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);​
  18. ​} finally {​
  19. ​if (sftp != null) {​
  20. ​sftpPool.returnObject(sftp);​
  21. ​}​
  22. ​}​
  23. ​}​
  24. ​@AllArgsConstructor​
  25. ​static class DelegateHandler implements ISftpClient.Handler {​
  26. ​private ISftpClient.Handler target;​
  27. ​@Override​
  28. ​public void doHandle(Sftp sftp) {​
  29. ​try {​
  30. ​target.doHandle(sftp);​
  31. ​} catch (Exception e) {​
  32. ​log.error("sftp异常:", e);​
  33. ​throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);​
  34. ​}​
  35. ​}​
  36. ​}​
  37. ​}​

4.6 实战代码示例

通过sftp上传文件到XX服务器

  1. ​//通过SFTP上传到XX​
  2. ​((MultipleSftpClient) sftpClient).choose("XX");​
  3. ​sftpClient.open(sftp -> {​
  4. ​boolean exist = sftp.isExist(inventoryPath);​
  5. ​if(!exist){​
  6. ​sftp.mkdirs(inventoryPath);​
  7. ​}​
  8. ​// 执行sftp操作​
  9. ​InputStream is = new FileInputStream(oneColumnCSVFile);​
  10. ​sftp.upload(inventoryPath, titleName, is);​
  11. ​log.info("inventory upload over");​
  12. ​});​

5 总结

通过本文的介绍可以知道,Apache Commons Pool定义了一个对象池的行为,提供了可扩展的配置类和对象工厂,封装了对象创建、从池中获取对象、归还对象的核心流程。还介绍了开源框架Jedis是如何基于GenericObjectPool来实现的连接池。最后介绍了国际物流履约系统中是如何基于GenericObjectPool来管理Sftp连接的。
掌握了GenericObjectPool的核心原理,我们就可以通过实现几个关键的接口,创建一个对象池管理工具,在项目中避免了对象的频繁创建和销毁,从而显著提升程序的性能。