记一次 Kafka 消息积压的排查

时间:2024-03-13 14:34:15
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

// NOTE(review): the Apache HttpClient types used below (Executor, Request, HttpClient,
// PoolingHttpClientConnectionManager, SSLConnectionSocketFactory, ...) are not imported in
// this listing; the corresponding org.apache.http imports must be present for this to compile.

/**
 * Static helper for issuing simple HTTP GET/POST requests through the Apache HttpClient
 * fluent API, backed by one shared pooled connection manager.
 *
 * <p>The pool allows at most {@code MAX_TOTAL} connections overall and
 * {@code MAX_PER_ROUTE} connections per route. All requests share the single
 * {@link #executor} built in the static initializer.
 *
 * @version 1.0
 */
public class HttpFluentUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(HttpFluentUtil.class);

    /** Maximum number of pooled connections per route. */
    private static final int MAX_PER_ROUTE = 100;

    /** Maximum number of pooled connections across all routes. */
    private static final int MAX_TOTAL = 200;

    final static PoolingHttpClientConnectionManager CONNMGR;
    final static HttpClient CLIENT;
    final static Executor executor;

    static {
        final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", createSslSocketFactory())
                .build();

        CONNMGR = new PoolingHttpClientConnectionManager(sfr);
        CONNMGR.setDefaultMaxPerRoute(MAX_PER_ROUTE);
        CONNMGR.setMaxTotal(MAX_TOTAL);

        CLIENT = HttpClientBuilder.create().setConnectionManager(CONNMGR).build();
        executor = Executor.newInstance(CLIENT);
    }

    /**
     * Chooses the socket factory registered for the "https" scheme.
     *
     * <p>Tries the system socket factory first; if that fails to initialise, builds one from a
     * default-initialised TLS {@link SSLContext}; if that also fails, falls back to
     * {@code SSLConnectionSocketFactory.getSocketFactory()}. Failures are logged rather than
     * silently dropped, but never propagated, so class initialisation stays best-effort.
     */
    private static LayeredConnectionSocketFactory createSslSocketFactory() {
        LayeredConnectionSocketFactory ssl = null;
        try {
            ssl = SSLConnectionSocketFactory.getSystemSocketFactory();
        } catch (final SSLInitializationException ex) {
            LOGGER.warn("System SSL socket factory unavailable, falling back to a default TLS context", ex);
            try {
                final SSLContext sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
                sslcontext.init(null, null, null);
                ssl = new SSLConnectionSocketFactory(sslcontext);
            } catch (final SecurityException ignored) {
                LOGGER.warn("Could not create TLS socket factory; using the default socket factory", ignored);
            } catch (final KeyManagementException ignored) {
                LOGGER.warn("Could not create TLS socket factory; using the default socket factory", ignored);
            } catch (final NoSuchAlgorithmException ignored) {
                LOGGER.warn("Could not create TLS socket factory; using the default socket factory", ignored);
            }
        }
        return ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory();
    }

    /**
     * Executes an HTTP GET and returns the response body as a string.
     *
     * @param uri            target URI
     * @param connectTimeout connect timeout passed to the fluent request (milliseconds per the
     *                       HttpClient fluent API)
     * @param socketTimeout  socket (read) timeout passed to the fluent request (milliseconds per
     *                       the HttpClient fluent API)
     * @return the response content converted to a string
     * @throws IOException if executing the request or reading the response fails
     */
    public static String Get(String uri, int connectTimeout, int socketTimeout) throws IOException {
        return executor.execute(
                        Request.Get(uri)
                                .connectTimeout(connectTimeout)
                                .socketTimeout(socketTimeout))
                .returnContent()
                .asString();
    }

    /**
     * Executes an HTTP POST without a request body and returns the response body as a string.
     *
     * @param uri            target URI
     * @param connectTimeout connect timeout passed to the fluent request (milliseconds per the
     *                       HttpClient fluent API)
     * @param socketTimeout  socket (read) timeout passed to the fluent request (milliseconds per
     *                       the HttpClient fluent API)
     * @return the response content converted to a string
     * @throws IOException if executing the request or reading the response fails
     */
    public static String Post(String uri, int connectTimeout, int socketTimeout) throws IOException {
        // The connect timeout was previously accepted but never applied; set it like Get does.
        return executor.execute(
                        Request.Post(uri)
                                .connectTimeout(connectTimeout)
                                .socketTimeout(socketTimeout))
                .returnContent()
                .asString();
    }
}