最近在做Druid大数据实时分析系统,以Http的方式向Druid集群写入数据,采用HttpClient向集群提交数据,由于数据量较大,采用多线程的方式开启了10个线程后,发现单个的HttpClient会出现连接超时,效率不高,所以想用到连接池的方式提高效率下面是一个简单的HttpClient连接池的实现:
package com.ssm.httpclient.utils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.HttpConnectionFactory;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.DefaultHttpResponseParserFactory;
import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.conn.SystemDefaultDnsResolver;
import org.apache.http.impl.io.DefaultHttpRequestWriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(HttpClientUtils.class);
static PoolingHttpClientConnectionManager manager = null;
static CloseableHttpClient httpClient = null;
public static synchronized CloseableHttpClient getHttpClient(){
if(httpClient == null){
//注册访问协议相关的Socket工厂
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", SSLConnectionSocketFactory.getSystemSocketFactory())
.build();
//HttpConnection 工厂:配置写请求/解析响应处理器
HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connectionFactory
= new ManagedHttpClientConnectionFactory(DefaultHttpRequestWriterFactory.INSTANCE,
DefaultHttpResponseParserFactory.INSTANCE);
//DNS 解析器
DnsResolver dnsResolver = SystemDefaultDnsResolver.INSTANCE;
//创建池化连接管理器
manager = new PoolingHttpClientConnectionManager(socketFactoryRegistry,connectionFactory,dnsResolver);
//默认为Socket配置
SocketConfig defaultSocketConfig = SocketConfig.custom().setTcpNoDelay(true).build();
manager.setDefaultSocketConfig(defaultSocketConfig);
manager.setMaxTotal(300); //设置整个连接池的最大连接数
//每个路由的默认最大连接,每个路由实际最大连接数由DefaultMaxPerRoute控制,而MaxTotal是整个池子的最大数
//设置过小无法支持大并发(ConnectionPoolTimeoutException) Timeout waiting for connection from pool
manager.setDefaultMaxPerRoute(200);//每个路由的最大连接数
//在从连接池获取连接时,连接不活跃多长时间后需要进行一次验证,默认为2s
manager.setValidateAfterInactivity(5*1000);
//默认请求配置
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(2*1000) //设置连接超时时间,2s
.setSocketTimeout(5*1000) //设置等待数据超时时间,5s
.setConnectionRequestTimeout(2000) //设置从连接池获取连接的等待超时时间
.build();
//创建HttpClient
httpClient = HttpClients.custom()
.setConnectionManager(manager)
.setConnectionManagerShared(false) //连接池不是共享模式
.evictIdleConnections(60, TimeUnit.SECONDS) //定期回收空闲连接
.evictExpiredConnections()// 定期回收过期连接
.setConnectionTimeToLive(60, TimeUnit.SECONDS) //连接存活时间,如果不设置,则根据长连接信息决定
.setDefaultRequestConfig(defaultRequestConfig) //设置默认请求配置
.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE) //连接重用策略,即是否能keepAlive
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE) //长连接配置,即获取长连接生产多长时间
.setRetryHandler(new DefaultHttpRequestRetryHandler(0, false)) //设置重试次数,默认是3次,当前是禁用掉(根据需要开启)
.build();
//JVM 停止或重启时,关闭连接池释放掉连接(跟数据库连接池类似)
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
try{
if(httpClient !=null){
httpClient.close();
}
}catch(IOException e){
LOG.error("error when close httpClient:{}",e);
}
}
});
}
return httpClient;
}
}
基于HttpClientUtils的测试
package com.ssm.httpclient.utils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class HttpClientExample {
public static void main(String[] args) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d").build();
ExecutorService executorService = Executors.newFixedThreadPool(10,threadFactory);
List<Future<?>> list = Lists.newArrayList();
Long startTime = System.currentTimeMillis();
Future<?> future;
for(int i=0;i<100;i++){
future = executorService.submit((new HttpGetTest()));
list.add(future);
}
for(Future<?> future2:list){
try {
future2.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("total cost times: "+(System.currentTimeMillis()-startTime));
executorService.shutdown();
}
}
class HttpGetTest implements Runnable {
public void run() {
HttpResponse response = null;
try {
HttpGet get = new HttpGet("https://baike.baidu.com/item/httpclient/5766483?fr=aladdin");
response = HttpClientUtils.getHttpClient().execute(get);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
EntityUtils.consume(response.getEntity());
} else {
String result = EntityUtils.toString(response.getEntity(),Charset.forName("utf-8"));
System.out.println(result);
}
} catch (Exception e) {
if (response != null) {
try {
EntityUtils.consume(response.getEntity());
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
total cost times: 3248
一百个线程请求花了3s 多,性能还是可以