A Walkthrough of Jedis Timeout Settings

Date: 2021-07-19 22:22:26

JedisConnectionException: Unexpected end of stream #932

Repeatable exception and for the life of me, I cannot find something I'm doing wrong.

redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:198)
at redis.clients.util.RedisInputStream.read(RedisInputStream.java:180)
at redis.clients.jedis.Protocol.processBulkReply(Protocol.java:158)
at redis.clients.jedis.Protocol.process(Protocol.java:132)
at redis.clients.jedis.Protocol.processMultiBulkReply(Protocol.java:183)
at redis.clients.jedis.Protocol.process(Protocol.java:134)
at redis.clients.jedis.Protocol.read(Protocol.java:192)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:282)
at redis.clients.jedis.Connection.getRawObjectMultiBulkReply(Connection.java:227)
at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:108)
at redis.clients.jedis.JedisPubSub.proceedWithPatterns(JedisPubSub.java:95)
at redis.clients.jedis.Jedis.psubscribe(Jedis.java:2513)
at BenchRedisConsumer$BenchRunner.run(BenchRedisConsumer.java:208)
at java.lang.Thread.run(Thread.java:745)

Running Redis version 2.8.19 on Linux 3.16.0-33-generic #44-Ubuntu SMP Thu Mar 12 12:19:35 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
Java version: java version "1.7.0_76"
Java(TM) SE Runtime Environment (build 1.7.0_76-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.76-b04, mixed mode)

Run the Redis consumer and then the producer from this project: https://github.com/Climax777/message-queue-bench

The cause turned out to be client-output-buffer-limit: redis-server closed the connections that exceeded it, and the client saw those closures as the exceptions above.
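
To make the failure mode concrete, here is a minimal, JDK-only sketch (no Redis or Jedis involved) of what a client sees when the server closes the connection underneath it. The class name EndOfStreamDemo and the loopback "server" are illustrative assumptions, not part of the original issue. The blocking read simply reports end of stream; judging from the top frame of the stack trace above, RedisInputStream.ensureFill is where Jedis reports that condition as "Unexpected end of stream".

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; a loopback ServerSocket stands in for redis-server.
class EndOfStreamDemo {

    /**
     * Opens a loopback connection, lets the "server" side close it right away,
     * and returns what the client's blocking read() reports.
     */
    static int readAfterServerClose() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            // Stand-in for the server dropping a client: accept, then close.
            server.accept().close();

            InputStream in = client.getInputStream();
            return in.read(); // -1 signals end of stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int result = readAfterServerClose();
        System.out.println(result == -1
                ? "end of stream: the peer closed the connection"
                : "read byte " + result);
    }
}
```

No client-side timeout is involved here: the read returns as soon as the peer closes, so raising the Jedis timeout would not be expected to make this particular exception go away.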

https://github.com/xetorthio/jedis/issues/932

http://*.com/questions/2309561/how-to-fix-java-net-socketexception-broken-pipe

redis.clients.jedis.JedisPoolConfig

package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class JedisPoolConfig extends GenericObjectPoolConfig {
  public JedisPoolConfig() {
    // defaults to make your life with connection pool easier :)
    setTestWhileIdle(true);
    setMinEvictableIdleTimeMillis(60000);
    setTimeBetweenEvictionRunsMillis(30000);
    setNumTestsPerEvictionRun(-1);
  }
}

org.apache.commons.pool2.impl.GenericKeyedObjectPool

maxWaitMillis: how long a caller waits to obtain a PooledObject<T> from the pool (that is, from the LinkedBlockingDeque<PooledObject<T>>).

It is set from the pool config:

    public GenericKeyedObjectPool(KeyedPooledObjectFactory<K,T> factory,
            GenericKeyedObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;
        this.fairness = config.getFairness();

        setConfig(config);

        startEvictor(getTimeBetweenEvictionRunsMillis());
    }

    public void setConfig(GenericKeyedObjectPoolConfig conf) {
        setLifo(conf.getLifo());
        setMaxIdlePerKey(conf.getMaxIdlePerKey());
        setMaxTotalPerKey(conf.getMaxTotalPerKey());
        setMaxTotal(conf.getMaxTotal());
        setMinIdlePerKey(conf.getMinIdlePerKey());
        setMaxWaitMillis(conf.getMaxWaitMillis());
        setBlockWhenExhausted(conf.getBlockWhenExhausted());
        setTestOnCreate(conf.getTestOnCreate());
        setTestOnBorrow(conf.getTestOnBorrow());
        setTestOnReturn(conf.getTestOnReturn());
        setTestWhileIdle(conf.getTestWhileIdle());
        setNumTestsPerEvictionRun(conf.getNumTestsPerEvictionRun());
        setMinEvictableIdleTimeMillis(conf.getMinEvictableIdleTimeMillis());
        setSoftMinEvictableIdleTimeMillis(
                conf.getSoftMinEvictableIdleTimeMillis());
        setTimeBetweenEvictionRunsMillis(
                conf.getTimeBetweenEvictionRunsMillis());
        setEvictionPolicyClassName(conf.getEvictionPolicyClassName());
    }
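
The wait that maxWaitMillis controls is, at bottom, a timed poll on a blocking deque. The sketch below illustrates only that mechanism, under stated assumptions: it uses the JDK's java.util.concurrent.LinkedBlockingDeque as a stand-in for the deque that commons-pool2 ships, and the class name MaxWaitDemo, the borrow helper, and the "conn-1" element are made up for the example.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the JDK deque stands in for the pool's idle-object deque.
class MaxWaitDemo {

    /** Waits up to maxWaitMillis for an idle element; returns null on timeout. */
    static String borrow(LinkedBlockingDeque<String> idle, long maxWaitMillis) {
        try {
            return idle.pollFirst(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<String> idle = new LinkedBlockingDeque<>();
        idle.addFirst("conn-1");

        // An idle element is available, so this returns without waiting.
        System.out.println(borrow(idle, 100));

        // The deque is now empty: the call waits for the full timeout, then gives up.
        long start = System.nanoTime();
        String second = borrow(idle, 100);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(second + " after about " + waitedMs + " ms");
    }
}
```

A null result from the timed poll is the case that the real pool reports as "Timeout waiting for idle object".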

org.apache.commons.pool2.impl.GenericObjectPool

    /**
* Borrow an object from the pool using the specific waiting time which only
* applies if {@link #getBlockWhenExhausted()} is true.
* <p>
* If there is one or more idle instance available in the pool, then an
* idle instance will be selected based on the value of {@link #getLifo()},
* activated and returned. If activation fails, or {@link #getTestOnBorrow()
* testOnBorrow} is set to <code>true</code> and validation fails, the
* instance is destroyed and the next available instance is examined. This
* continues until either a valid instance is returned or there are no more
* idle instances available.
* <p>
* If there are no idle instances available in the pool, behavior depends on
* the {@link #getMaxTotal() maxTotal}, (if applicable)
* {@link #getBlockWhenExhausted()} and the value passed in to the
* <code>borrowMaxWaitMillis</code> parameter. If the number of instances
* checked out from the pool is less than <code>maxTotal,</code> a new
* instance is created, activated and (if applicable) validated and returned
* to the caller. If validation fails, a <code>NoSuchElementException</code>
* is thrown.
* <p>
* If the pool is exhausted (no available idle instances and no capacity to
* create new ones), this method will either block (if
* {@link #getBlockWhenExhausted()} is true) or throw a
* <code>NoSuchElementException</code> (if
* {@link #getBlockWhenExhausted()} is false). The length of time that this
* method will block when {@link #getBlockWhenExhausted()} is true is
* determined by the value passed in to the <code>borrowMaxWaitMillis</code>
* parameter.
* <p>
* When the pool is exhausted, multiple calling threads may be
* simultaneously blocked waiting for instances to become available. A
* "fairness" algorithm has been implemented to ensure that threads receive
* available instances in request arrival order.
*
* @param borrowMaxWaitMillis The time to wait in milliseconds for an object
* to become available
*
* @return object instance from the pool
*
* @throws NoSuchElementException if an instance cannot be returned
*
* @throws Exception if an object instance cannot be returned due to an
* error
*/
    public T borrowObject(long borrowMaxWaitMillis) throws Exception {
        assertOpen();

        AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            if (blockWhenExhausted) {
                p = idleObjects.pollFirst();
                if (p == null) {
                    p = create();
                    if (p != null) {
                        create = true;
                    }
                }
                if (p == null) {
                    if (borrowMaxWaitMillis < 0) {
                        p = idleObjects.takeFirst();
                    } else {
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object"
                    );
                }
                if (!p.allocate()) {
                    p = null;
                }
            } else {
                p = idleObjects.pollFirst();
                if (p == null) {
                    p = create();
                    if (p != null) {
                        create = true;
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
                if (!p.allocate()) {
                    p = null;
                }
            }

            if (p != null) {
                try {
                    factory.activateObject(p);
                } catch (Exception e) {
                    try {
                        destroy(p);
                    } catch (Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        validate = factory.validateObject(p);
                    } catch (Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    if (!validate) {
                        try {
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }
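
Stripped of activation, validation, and abandoned-object handling, the borrow logic above reduces to three steps: take an idle object, otherwise create one if capacity allows, otherwise block or fail. The following is a heavily simplified sketch of just that decision, not the commons-pool2 implementation. MiniPool, its String "objects", and its counter-based create() are assumptions made for illustration; only the two exception messages are taken from the source above.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, heavily simplified pool; not the commons-pool2 implementation. */
class MiniPool {
    private final LinkedBlockingDeque<String> idleObjects = new LinkedBlockingDeque<>();
    private final AtomicInteger created = new AtomicInteger();
    private final int maxTotal;
    private final boolean blockWhenExhausted;

    MiniPool(int maxTotal, boolean blockWhenExhausted) {
        this.maxTotal = maxTotal;
        this.blockWhenExhausted = blockWhenExhausted;
    }

    /** Returns a new object, or null once maxTotal objects already exist. */
    private String create() {
        int n = created.incrementAndGet();
        if (n > maxTotal) {
            created.decrementAndGet();
            return null;
        }
        return "object-" + n;
    }

    String borrowObject(long borrowMaxWaitMillis) {
        String p = idleObjects.pollFirst();   // 1. prefer an idle object
        if (p == null) {
            p = create();                     // 2. otherwise create one if capacity allows
        }
        if (p != null) {
            return p;
        }
        if (!blockWhenExhausted) {
            throw new NoSuchElementException("Pool exhausted");
        }
        try {
            // 3. exhausted: a negative wait blocks indefinitely, otherwise wait at most the given time
            p = borrowMaxWaitMillis < 0
                    ? idleObjects.takeFirst()
                    : idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        if (p == null) {
            throw new NoSuchElementException("Timeout waiting for idle object");
        }
        return p;
    }

    void returnObject(String p) {
        idleObjects.addFirst(p);
    }

    public static void main(String[] args) {
        MiniPool pool = new MiniPool(1, true);
        String only = pool.borrowObject(100);
        try {
            pool.borrowObject(100);           // nothing idle, no capacity: waits, then fails
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage());
        }
        pool.returnObject(only);
        System.out.println(pool.borrowObject(100));
    }
}
```

With maxTotal set to 1, the second borrow can only succeed if the first object is returned within the wait, which is the situation a maxWaitMillis setting is meant to bound.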

socketTimeOut

redis.clients.jedis.JedisPool

  public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port,
      int timeout, final String password) {
    this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null);
  }

  public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port,
      int timeout, final String password, final int database, final String clientName) {
    super(poolConfig, new JedisFactory(host, port, timeout, password, database, clientName));
  }

redis.clients.jedis.JedisFactory

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    final HostAndPort hostAndPort = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), this.timeout);

    jedis.connect();
    if (null != this.password) {
      jedis.auth(this.password);
    }
    if (database != 0) {
      jedis.select(database);
    }
    if (clientName != null) {
      jedis.clientSetname(clientName);
    }

    return new DefaultPooledObject<Jedis>(jedis);
  }

redis.clients.jedis.Jedis

  public Jedis(final String host, final int port, final int timeout) {
    super(host, port, timeout);
  }

redis.clients.jedis.BinaryJedis

  public BinaryJedis(final String host, final int port, final int timeout) {
    client = new Client(host, port);
    client.setConnectionTimeout(timeout);
    client.setSoTimeout(timeout);
  }

redis.clients.jedis.Connection

  public void connect() {
    if (!isConnected()) {
      try {
        socket = new Socket();
        // ->@wjw_add
        socket.setReuseAddress(true);
        socket.setKeepAlive(true); // Will monitor the TCP connection is
        // valid
        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
        // ensure timely delivery of data
        socket.setSoLinger(true, 0); // Control calls close () method,
        // the underlying socket is closed
        // immediately
        // <-@wjw_add

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);

        if (ssl) {
          if (null == sslSocketFactory) {
            sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
          }
          socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);
          if (null != sslParameters) {
            ((SSLSocket) socket).setSSLParameters(sslParameters);
          }
          if ((null != hostnameVerifier) &&
              (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {
            String message = String.format(
                "The connection to '%s' failed ssl/tls hostname verification.", host);
            throw new JedisConnectionException(message);
          }
        }

        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }

java.net.Socket

public void connect(@NotNull java.net.SocketAddress endpoint,
                    int timeout)
            throws java.io.IOException

Connects this socket to the server with a specified timeout value. A timeout of zero is interpreted as an infinite timeout. The connection will then block until established or an error occurs.

Parameters:
endpoint - the SocketAddress
timeout - the timeout value to be used in milliseconds.

Throws:
java.io.IOException - if an error occurs during the connection
java.net.SocketTimeoutException - if timeout expires before connecting
java.nio.channels.IllegalBlockingModeException - if this socket has an associated channel, and the channel is in non-blocking mode
IllegalArgumentException - if endpoint is null or is a SocketAddress subclass not supported by this socket

Since:
1.4

java.net.Socket

public void setSoTimeout(int timeout)
            throws java.net.SocketException

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a read() call on the InputStream associated with this Socket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the Socket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

Parameters:
timeout - the specified timeout, in milliseconds.

Throws:
java.net.SocketException - if there is an error in the underlying protocol, such as a TCP error.

Since:
JDK 1.1
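
The two timeouts can be seen in isolation with plain JDK sockets. The sketch below follows the same order as Connection.connect() above (connect with a timeout, then set SO_TIMEOUT) against a loopback server that accepts the connection but never writes. SocketTimeoutDemo, readTimesOut, and the chosen timeout values are illustrative assumptions; no Jedis code is involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class; a silent loopback server stands in for a slow Redis reply.
class SocketTimeoutDemo {

    /**
     * Connects to a silent loopback server with the given timeouts and tries to
     * read one byte. Returns true if the read gave up with SocketTimeoutException
     * and the client socket was still open afterwards.
     */
    static boolean readTimesOut(int connectionTimeoutMillis, int soTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket()) {
            // Bound the connection attempt first, then every later blocking read.
            socket.connect(
                    new InetSocketAddress(server.getInetAddress(), server.getLocalPort()),
                    connectionTimeoutMillis);
            socket.setSoTimeout(soTimeoutMillis);

            try (Socket peer = server.accept()) {  // the peer accepts but never writes
                socket.getInputStream().read();
                return false;                      // unexpected: data or EOF arrived
            } catch (SocketTimeoutException e) {
                return !socket.isClosed();         // timed out; the socket is still valid
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out, socket still open: " + readTimesOut(1000, 200));
    }
}
```

Since BinaryJedis passes the same timeout value to both setConnectionTimeout and setSoTimeout, the single timeout argument of the JedisPool constructors shown earlier ends up bounding both the connection attempt and each blocking read.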