// Notes from troubleshooting a Kafka message backlog (记一次 Kafka 消息积压的排查)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
/**
 * Static helpers for issuing simple HTTP GET/POST requests through a single shared
 * Apache HttpClient fluent {@code Executor}.
 *
 * <p>When the class is loaded, one {@code PoolingHttpClientConnectionManager} is created
 * (at most {@value #MAX_PER_ROUTE} connections per route and {@value #MAX_TOTAL} in total),
 * wrapped in an {@code HttpClient}, and that client is wrapped in the {@code Executor}
 * used by every request method of this class.
 *
 * @version 1.0
 */
public class HttpFluentUtil {
    // Declared before the static initializer so it is already assigned when the
    // initializer (and the helper it calls) needs to log.
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpFluentUtil.class);

    /** Default upper bound of pooled connections per route. */
    private static final int MAX_PER_ROUTE = 100;
    /** Upper bound of pooled connections across all routes. */
    private static final int MAX_TOTAL = 200;

    static final PoolingHttpClientConnectionManager CONNMGR;
    static final HttpClient CLIENT;
    static final Executor executor;

    static {
        final LayeredConnectionSocketFactory ssl = createSslSocketFactory();
        final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                // Last resort when neither attempt in createSslSocketFactory() succeeded.
                .register("https", ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory())
                .build();
        CONNMGR = new PoolingHttpClientConnectionManager(sfr);
        CONNMGR.setDefaultMaxPerRoute(MAX_PER_ROUTE);
        CONNMGR.setMaxTotal(MAX_TOTAL);
        CLIENT = HttpClientBuilder.create().setConnectionManager(CONNMGR).build();
        executor = Executor.newInstance(CLIENT);
    }

    /**
     * Picks the socket factory to register for the "https" scheme.
     *
     * <p>Tries the system socket factory first; if that fails with an
     * {@code SSLInitializationException}, builds one from a TLS {@link SSLContext}
     * initialised with all-null (default) arguments. Failures are logged rather than
     * silently discarded.
     *
     * @return the socket factory, or {@code null} if both attempts failed
     */
    private static LayeredConnectionSocketFactory createSslSocketFactory() {
        try {
            return SSLConnectionSocketFactory.getSystemSocketFactory();
        } catch (final SSLInitializationException ex) {
            LOGGER.warn("System SSL socket factory unavailable, trying a default TLS context", ex);
        }
        try {
            final SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
            sslcontext.init(null, null, null);
            return new SSLConnectionSocketFactory(sslcontext);
        } catch (final SecurityException ex) {
            LOGGER.warn("Could not create a default TLS context", ex);
        } catch (final KeyManagementException ex) {
            LOGGER.warn("Could not create a default TLS context", ex);
        } catch (final NoSuchAlgorithmException ex) {
            LOGGER.warn("Could not create a default TLS context", ex);
        }
        return null;
    }

    /**
     * Executes a GET request on the shared executor and returns the response content
     * as a string.
     *
     * @param uri            target URI
     * @param connectTimeout value passed to {@code Request.connectTimeout}
     *                       (presumably milliseconds; confirm against the HttpClient docs)
     * @param socketTimeout  value passed to {@code Request.socketTimeout}
     *                       (presumably milliseconds; confirm against the HttpClient docs)
     * @return the response content converted with {@code Content.asString()}
     * @throws IOException if executing the request or reading the content fails
     */
    public static String Get(String uri, int connectTimeout, int socketTimeout) throws IOException {
        return executor
                .execute(Request.Get(uri).connectTimeout(connectTimeout).socketTimeout(socketTimeout))
                .returnContent()
                .asString();
    }

    /**
     * Executes a body-less POST request on the shared executor and returns the response
     * content as a string.
     *
     * @param uri            target URI
     * @param connectTimeout value passed to {@code Request.connectTimeout}
     *                       (presumably milliseconds; confirm against the HttpClient docs)
     * @param socketTimeout  value passed to {@code Request.socketTimeout}
     *                       (presumably milliseconds; confirm against the HttpClient docs)
     * @return the response content converted with {@code Content.asString()}
     * @throws IOException if executing the request or reading the content fails
     */
    public static String Post(String uri, int connectTimeout, int socketTimeout)
            throws IOException {
        // connectTimeout was previously accepted but never applied; apply it like Get does.
        return executor
                .execute(Request.Post(uri).connectTimeout(connectTimeout).socketTimeout(socketTimeout))
                .returnContent()
                .asString();
    }
}