我们从下面这段单元测试代码入手,分析JedisPool的实现:
JedisPool
/**
 * Borrows a client from the pool, closes it, borrows a client again, and finally
 * destroys the pool and asserts that the pool reports itself as closed.
 */
@Test
public void checkJedisIsReusedWhenReturned() {
    final JedisPool pool = new JedisPool(new JedisPoolConfig(), hnp.getHost(), hnp.getPort());

    // First borrow: authenticate, write the key, then close the client.
    final Jedis first = pool.getResource();
    first.auth("foobared");
    first.set("foo", "0");
    first.close();

    // Second borrow: authenticate again and increment the same key.
    final Jedis second = pool.getResource();
    second.auth("foobared");
    second.incr("foo");
    second.close();

    pool.destroy();
    assertTrue(pool.isClosed());
}
接着我们看这个Pool是什么,又是怎么实现的(下面以JedisPool(String host)这个构造方法为例):
JedisPool pool = new JedisPool(new JedisPoolConfig(), hnp.getHost(), hnp.getPort());
public class JedisPool extends JedisPoolAbstract {
.......
/**
 * Creates a pool from either a Redis URI or a plain host name.
 *
 * <p>When {@code JedisURIHelper.isValid} accepts the parsed URI, the host, port, password
 * and database index are read from it. Otherwise {@code host} is used verbatim together
 * with the protocol's default port and database, and no password.
 *
 * @param host a Redis URI or a bare host name
 */
public JedisPool(final String host) {
    final URI uri = URI.create(host);

    // The factory is the first argument of GenericObjectPool; it is what actually
    // creates, validates and destroys the pooled Jedis instances.
    final JedisFactory factory;
    if (JedisURIHelper.isValid(uri)) {
        factory = new JedisFactory(uri.getHost(), uri.getPort(), Protocol.DEFAULT_TIMEOUT,
                Protocol.DEFAULT_TIMEOUT, JedisURIHelper.getPassword(uri),
                JedisURIHelper.getDBIndex(uri), null);
    } else {
        factory = new JedisFactory(host, Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT,
                Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null);
    }

    // JedisPool is backed by org.apache.commons.pool2.impl.GenericObjectPool<Jedis>.
    // The internalPool field is declared in the parent class Pool
    // (JedisPool extends JedisPoolAbstract extends Pool).
    // The second constructor argument is a default pool configuration.
    this.internalPool = new GenericObjectPool<Jedis>(factory, new GenericObjectPoolConfig());
}
.......
/**
 * Obtains a Jedis instance via the parent class and records this pool on it as its
 * data source before handing it to the caller.
 *
 * @return the borrowed Jedis instance
 */
@Override
public Jedis getResource() {
    // The actual borrowing is delegated to the parent class (Pool.getResource()).
    final Jedis borrowed = super.getResource();
    borrowed.setDataSource(this);
    return borrowed;
}
JedisFactory.java
首先它实现了PooledObjectFactory<Jedis>接口:
package org.apache.commons.pool2;
/**
 * Lifecycle callbacks for objects managed by a pool.
 *
 * @param <T> type of the object wrapped by each {@code PooledObject}
 */
public interface PooledObjectFactory<T> {

    /** Creates a new object for the pool, wrapped in a {@code PooledObject}. */
    PooledObject<T> makeObject() throws Exception;

    /** Destroys a pooled object. */
    void destroyObject(PooledObject<T> paramPooledObject) throws Exception;

    /**
     * Validates a pooled object.
     *
     * @return {@code true} if the object is considered valid
     */
    boolean validateObject(PooledObject<T> paramPooledObject);

    /** Activates (re-initializes) a pooled object. */
    void activateObject(PooledObject<T> paramPooledObject) throws Exception;

    /** Passivates a pooled object. */
    void passivateObject(PooledObject<T> paramPooledObject) throws Exception;
}
我们来看JedisFactory的实现:
/**
 * Creates a connected Jedis client for the currently configured host and port and wraps
 * it in a {@code DefaultPooledObject}.
 *
 * <p>After connecting, the client is authenticated when a password is configured,
 * switched to the configured database when that is not 0, and given the configured
 * client name when one is set.
 *
 * @return the new client wrapped for the pool
 * @throws Exception if setup fails; on a {@code JedisException} the client is closed
 *     before the exception is rethrown
 */
@Override
public PooledObject<Jedis> makeObject() throws Exception {
    final HostAndPort target = this.hostAndPort.get();
    final Jedis client =
            new Jedis(target.getHost(), target.getPort(), connectionTimeout, soTimeout);

    try {
        client.connect();

        if (this.password != null) {
            client.auth(this.password);
        }
        if (database != 0) {
            client.select(database);
        }
        if (clientName != null) {
            client.clientSetname(clientName);
        }
    } catch (JedisException je) {
        // Do not leave a half-initialized connection open.
        client.close();
        throw je;
    }

    return new DefaultPooledObject<Jedis>(client);
}
/**
 * Shuts down the connection of a pooled client on a best-effort basis.
 *
 * <p>Nothing is done when the client is not connected. Otherwise {@code quit()} is
 * attempted first and {@code disconnect()} afterwards; failures of either call are
 * swallowed.
 *
 * @param pooledJedis wrapper holding the client to destroy
 */
@Override
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis client = pooledJedis.getObject();
    if (!client.isConnected()) {
        return;
    }

    try {
        try {
            client.quit();
        } catch (Exception ignored) {
            // Best effort: still disconnect even if quit() failed.
        }
        client.disconnect();
    } catch (Exception ignored) {
        // Best effort: the client is being discarded anyway.
    }
}
/**
 * Checks whether a pooled client is still usable.
 *
 * <p>The client is valid only if it targets the currently configured host and port,
 * reports itself as connected, and answers {@code ping()} with {@code "PONG"}.
 *
 * @param pooledJedis wrapper holding the client to check
 * @return {@code true} if all checks pass; {@code false} otherwise, including when any
 *     check throws
 */
@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis client = pooledJedis.getObject();
    try {
        final HostAndPort expected = this.hostAndPort.get();
        final String actualHost = client.getClient().getHost();
        final int actualPort = client.getClient().getPort();

        if (!expected.getHost().equals(actualHost)) {
            return false;
        }
        if (expected.getPort() != actualPort) {
            return false;
        }
        if (!client.isConnected()) {
            return false;
        }
        return client.ping().equals("PONG");
    } catch (final Exception e) {
        return false;
    }
}
}
/**
 * Makes sure a pooled client is on the configured database, issuing a {@code select}
 * only when the client currently reports a different one.
 *
 * @param pooledJedis wrapper holding the client to activate
 */
@Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis client = pooledJedis.getObject();
    if (client.getDB() != database) {
        client.select(database);
    }
}
/**
 * Passivation callback required by {@code PooledObjectFactory}; this implementation has
 * an empty body.
 *
 * @param pooledJedis wrapper holding the client (unused)
 */
@Override
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    // TODO maybe should select db 0? Not sure right now.
}
以上代码很清晰,不做解释。
好,池对象ok,我们看看jedisPool的父类Pool.java如何获取Jedis:
/**
 * Borrows an object from the underlying {@code GenericObjectPool}.
 *
 * @return the borrowed object
 * @throws JedisConnectionException if borrowing fails for any reason; the original
 *     exception is attached as the cause
 */
public T getResource() {
    final T resource;
    try {
        // internalPool is declared as: protected GenericObjectPool<T> internalPool;
        resource = internalPool.borrowObject();
    } catch (Exception e) {
        throw new JedisConnectionException("Could not get a resource from the pool", e);
    }
    return resource;
}
是通过调用GenericObjectPool.borrowObject()获取的;无参的borrowObject()会以getMaxWaitMillis()作为参数,调用下面这个borrowObject(long borrowMaxWaitMillis):
GenericObjectPool.java
public class GenericObjectPool<T> extends BaseGenericObjectPool<T>
implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
// Fields of GenericObjectPool shown in this excerpt (other members are elided).
private volatile String factoryType;
private volatile int maxIdle;
private volatile int minIdle;
// Factory whose activateObject/validateObject callbacks are invoked while borrowing.
private final PooledObjectFactory<T> factory;
private final Map<T, PooledObject<T>> allObjects;// holds all wrapper objects (for JedisPool: DefaultPooledObject instances wrapping Jedis)
private final AtomicLong createCount;
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;// holds the idle wrapper objects (for JedisPool: DefaultPooledObject instances wrapping Jedis)
private static final String ONAME_BASE = "org.apache.commons.pool2:type=GenericObjectPool,name=";
// Read at the start of borrowObject to decide whether abandoned objects are removed first.
private volatile AbandonedConfig abandonedConfig;
........
/**
 * Borrows an object from the pool.
 *
 * <p>Each attempt first polls the idle deque, then tries to create a new object, and,
 * if the pool is configured to block when exhausted, finally waits on the idle deque.
 * The candidate is then allocated, activated and, when {@code testOnBorrow} is set (or
 * {@code testOnCreate} for a freshly created object), validated. A reused object that
 * fails activation or validation is destroyed and the loop tries again; for a freshly
 * created object the failure is reported to the caller instead.
 *
 * @param borrowMaxWaitMillis how long to wait for an idle object when blocking is
 *     enabled; a negative value waits via {@code takeFirst()} without a timeout argument
 * @return the object held by the borrowed wrapper
 * @throws NoSuchElementException if no object becomes available ("Pool exhausted" or
 *     "Timeout waiting for idle object"), or if a freshly created object cannot be
 *     activated or validated
 * @throws Exception if an underlying pool operation fails
 */
public T borrowObject(long borrowMaxWaitMillis) throws Exception {
    assertOpen();

    final AbandonedConfig ac = this.abandonedConfig;
    if (ac != null && ac.getRemoveAbandonedOnBorrow() && getNumIdle() < 2
            && getNumActive() > getMaxTotal() - 3) {
        removeAbandoned(ac);
    }

    PooledObject<T> p = null;
    final boolean blockWhenExhausted = getBlockWhenExhausted();
    final long borrowStart = System.currentTimeMillis();

    while (p == null) {
        boolean created = false;

        // Poll the idle deque for a PooledObject (the wrapper around the pooled instance).
        p = this.idleObjects.pollFirst();
        if (p == null) {
            // Nothing idle: try to create a new wrapper.
            p = create();
            created = p != null;
        }
        if (p == null && blockWhenExhausted) {
            // Could not create either: wait for another borrower to return an object.
            if (borrowMaxWaitMillis < 0L) {
                p = this.idleObjects.takeFirst();
            } else {
                p = this.idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS);
            }
        }
        if (p == null) {
            throw new NoSuchElementException(
                    blockWhenExhausted ? "Timeout waiting for idle object" : "Pool exhausted");
        }
        if (!p.allocate()) {
            // The wrapper could not be allocated to this borrower; try again.
            p = null;
            continue;
        }

        try {
            this.factory.activateObject(p);
        } catch (Exception e) {
            try {
                destroy(p);
            } catch (Exception ignored) {
                // Best effort: the object is being discarded anyway.
            }
            p = null;
            if (created) {
                NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");
                nsee.initCause(e);
                throw nsee;
            }
        }

        // Validate only a live object, and only when the pool is configured to do so.
        // (The excerpt previously ran this unconditionally, even with p == null.)
        if (p != null && (getTestOnBorrow() || (created && getTestOnCreate()))) {
            boolean validate = false;
            Throwable validationThrowable = null;
            try {
                validate = this.factory.validateObject(p);
            } catch (Throwable t) {
                PoolUtils.checkRethrow(t);
                validationThrowable = t;
            }
            if (!validate) {
                try {
                    destroy(p);
                    this.destroyedByBorrowValidationCount.incrementAndGet();
                } catch (Exception ignored) {
                    // Best effort: the object is being discarded anyway.
                }
                p = null;
                if (created) {
                    NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");
                    nsee.initCause(validationThrowable);
                    throw nsee;
                }
            }
        }
    }

    updateStatsBorrow(p, System.currentTimeMillis() - borrowStart);
    // Finally hand out the wrapped object (the Jedis instance in the JedisPool case).
    // According to the walkthrough, calling close() on it after use puts its wrapper
    // back into the idleObjects deque (LinkedBlockingDeque<PooledObject<T>>).
    return p.getObject();
}
一个UnitTest加深理解:
/**
 * Demonstrates reuse of a returned client: two clients are borrowed, the first one is
 * closed and a client is borrowed again, and the printed identities are compared by eye.
 * Both borrowed clients are closed before the pool is destroyed.
 */
@Test
public void checkJedisIsReusedWhenReturned() {
    JedisPool pool = new JedisPool(new JedisPoolConfig(), hnp.getHost(), hnp.getPort());
    // The pool creates a wrapper object and hands out its Jedis via getObject().
    Jedis jedis = pool.getResource();
    // jedis.auth("foobared");
    Jedis jedis2 = pool.getResource();
    jedis.set("foo", "0");
    jedis.set("foo2", "0");
    System.out.println(jedis);// sample output: redis.clients.jedis.Jedis@19e3cd51
    System.out.println(jedis2);// sample output: redis.clients.jedis.Jedis@3abc8e1e
    // The pool now manages two Jedis objects.
    jedis.close();
    jedis = pool.getResource();
    // Sample output: redis.clients.jedis.Jedis@19e3cd51, i.e. the instance that was
    // just closed is handed out again.
    System.out.println(jedis);
    // jedis.auth("foobared");
    jedis.incr("foo");
    jedis.close();
    // Close the second borrowed client as well, so that no borrowed client is still
    // outstanding when the pool is destroyed.
    jedis2.close();
    pool.destroy();
    assertTrue(pool.isClosed());
}