/**
 * Common configuration and contract for the HTTP client services in this file.
 *
 * <p>Holds the timeout and connection-pool settings that subclasses read when their
 * {@link #start()} method builds the underlying client. Settings can be changed either through
 * the Lombok-generated setters or through the fluent {@code withXxx} methods; they must be set
 * before {@link #start()} is called to have any effect on the built client.
 */
public abstract class BaseHttpClientService {

    /** Socket read/write timeout; subclasses pass it to {@code RequestConfig.setSocketTimeout}. */
    @Setter
    protected int socketTimeout = 1000;

    /**
     * Timeout for leasing a connection from the connection manager; subclasses pass it to
     * {@code RequestConfig.setConnectionRequestTimeout}.
     */
    @Setter
    protected int connectionRequestTimeout = 1000;

    /** Connect timeout; subclasses pass it to {@code RequestConfig.setConnectTimeout}. */
    @Setter
    protected int connectTimeout = 1000;

    /**
     * Maximum number of connections in the pool. Despite the name, subclasses pass this value to
     * the connection manager's {@code setMaxTotal}.
     */
    @Setter
    protected int maxRoute = 2048;

    /** Maximum number of connections per route (subclasses pass it to {@code setDefaultMaxPerRoute}). */
    @Setter
    protected int perRoute = 1024;

    /**
     * Executes the request.
     *
     * @param request the request to execute
     * @return a future holding the response or the failure
     */
    public abstract Future<HttpResponse> execute(final HttpUriRequest request);

    /**
     * Executes the request and notifies the callback about the outcome.
     *
     * @param request  the request to execute
     * @param callback callback to notify on completion or failure
     * @return a future holding the response or the failure
     */
    public abstract Future<HttpResponse> execute(final HttpUriRequest request, FutureCallback<HttpResponse> callback);

    /**
     * Closes expired connections and connections that have been idle longer than the given time.
     *
     * @param idleTime idle threshold
     * @param timeUnit unit of {@code idleTime}
     */
    public abstract void idleConnectionMonitor(long idleTime, TimeUnit timeUnit);

    /** Builds the underlying client from the current settings. */
    public abstract void start();

    /** Closes the underlying client. */
    public abstract void shutdown();

    /**
     * Sets the maximum number of pooled connections.
     *
     * @param maxRoute maximum total connections
     * @return this service, for chaining
     */
    public BaseHttpClientService withMaxRoute(int maxRoute) {
        this.maxRoute = maxRoute;
        return this;
    }

    /**
     * Sets the maximum number of connections per route.
     *
     * @param perRoute maximum connections per route
     * @return this service, for chaining
     */
    public BaseHttpClientService withPerRoute(int perRoute) {
        this.perRoute = perRoute;
        return this;
    }

    /**
     * Misspelled alias of {@link #withPerRoute(int)}, kept so existing callers keep compiling.
     *
     * @param perRoute maximum connections per route
     * @return this service, for chaining
     * @deprecated use {@link #withPerRoute(int)}
     */
    @Deprecated
    public BaseHttpClientService withPreRoute(int perRoute) {
        return withPerRoute(perRoute);
    }

    /**
     * Sets the socket read/write timeout.
     *
     * @param socketTimeout socket timeout
     * @return this service, for chaining
     */
    public BaseHttpClientService withSocketTimeout(int socketTimeout) {
        this.socketTimeout = socketTimeout;
        return this;
    }

    /**
     * Sets the timeout for leasing a connection from the connection manager.
     *
     * @param connectionRequestTimeout connection request timeout
     * @return this service, for chaining
     */
    public BaseHttpClientService withConnectionRequestTimeout(int connectionRequestTimeout) {
        this.connectionRequestTimeout = connectionRequestTimeout;
        return this;
    }

    /**
     * Sets the connect timeout.
     *
     * @param connectTimeout connect timeout
     * @return this service, for chaining
     */
    public BaseHttpClientService withConnectTimeout(int connectTimeout) {
        this.connectTimeout = connectTimeout;
        return this;
    }
}
/**
 * Blocking {@link BaseHttpClientService} backed by a pooled Apache {@link CloseableHttpClient}.
 *
 * <p>Requests are executed synchronously on the calling thread; the {@link Future} returned by
 * the {@code execute} methods is already completed when the method returns. {@link #start()}
 * must be called before any request is executed.
 *
 * <p>SECURITY NOTE(review): the "https" scheme is registered with an {@link SSLContext} whose
 * trust manager accepts every certificate chain without checking it. Confirm this is intended
 * before using this client against untrusted networks.
 */
public class HttpClientService extends BaseHttpClientService {

    /** TLS context used for the "https" scheme; {@code null} if static initialization failed. */
    private static final SSLContext ctx;

    /** Cause of a failed TLS context initialization, or {@code null} when it succeeded. */
    private static final Exception ctxInitFailure;

    static {
        SSLContext context = null;
        Exception failure = null;
        try {
            SSLContext candidate = SSLContext.getInstance("TLS");
            candidate.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
            candidate.getClientSessionContext().setSessionTimeout(15);
            candidate.getClientSessionContext().setSessionCacheSize(1000);
            // Result is unused; presumably a sanity check that the context is usable.
            candidate.getSocketFactory();
            context = candidate;
        } catch (Exception e) {
            // Keep the class loadable, but remember the cause so getDefaultRegistry() can
            // report it instead of failing later with an unrelated error.
            failure = e;
        }
        ctx = context;
        ctxInitFailure = failure;
    }

    @Getter
    private CloseableHttpClient httpClient;

    @Getter
    private HttpClientConnectionManager httpClientConnectionManager;

    /**
     * Executes the request synchronously and wraps the outcome in an already-completed future.
     *
     * <p>NOTE(review): the response is not closed here; callers presumably must consume or close
     * it to release the pooled connection.
     *
     * @param request the request to execute
     * @return a completed future holding the response, or a failed future holding the exception
     */
    @Override
    public Future<HttpResponse> execute(HttpUriRequest request) {
        try {
            return Futures.<HttpResponse>immediateFuture(httpClient.execute(request));
        } catch (Exception e) {
            return Futures.<HttpResponse>immediateFailedFuture(e);
        }
    }

    /**
     * Executes the request synchronously, notifies the callback on the calling thread and wraps
     * the outcome in an already-completed future.
     *
     * @param request  the request to execute
     * @param callback notified via {@code completed} or {@code failed}; may be {@code null}
     * @return a completed future holding the response, or a failed future holding the exception
     */
    @Override
    public Future<HttpResponse> execute(HttpUriRequest request, FutureCallback<HttpResponse> callback) {
        try {
            CloseableHttpResponse httpResponse = httpClient.execute(request);
            ListenableFuture<HttpResponse> future = Futures.immediateFuture(httpResponse);
            if (callback != null) {
                future.addListener(() -> callback.completed(httpResponse), MoreExecutors.directExecutor());
            }
            return future;
        } catch (Exception e) {
            ListenableFuture<HttpResponse> future = Futures.immediateFailedFuture(e);
            if (callback != null) {
                future.addListener(() -> callback.failed(e), MoreExecutors.directExecutor());
            }
            return future;
        }
    }

    /**
     * Builds the pooled connection manager and the HTTP client from the current settings.
     *
     * @throws IllegalStateException if the TLS context could not be initialized
     */
    @Override
    public void start() {
        final PoolingHttpClientConnectionManager connectionManager =
                new DefaultPoolingHttpClientConnectionManager(60, TimeUnit.SECONDS);
        // Maximum total connections in the pool.
        connectionManager.setMaxTotal(maxRoute);
        // Maximum connections per route.
        connectionManager.setDefaultMaxPerRoute(perRoute);
        connectionManager.setDefaultSocketConfig(SocketConfig.custom().setTcpNoDelay(true).build());
        // A negative value disables re-validation of pooled connections after inactivity.
        connectionManager.setValidateAfterInactivity(-1);

        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(socketTimeout) // read/write timeout
                .setConnectTimeout(connectTimeout) // connect timeout
                .setConnectionRequestTimeout(connectionRequestTimeout) // timeout for leasing a connection
                .setStaleConnectionCheckEnabled(false) // stale connection check disabled
                .setCookieSpec(CookieSpecs.IGNORE_COOKIES) // cookies are ignored; handle them manually
                .build();

        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setCharset(Consts.UTF_8)
                .setBufferSize(HttpIOConfigConstant.SOCKET_BUFFER_SIZE)
                .build();
        // The builder ignores its own default connection config when an explicit connection
        // manager is supplied, so the config has to be set on the manager itself.
        connectionManager.setDefaultConnectionConfig(connectionConfig);

        final ConnectionKeepAliveStrategy keepAliveStrategy = keepAliveStrategy();
        httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setConnectionManagerShared(false)
                .evictIdleConnections(60, TimeUnit.SECONDS)
                .evictExpiredConnections()
                .setConnectionTimeToLive(60, TimeUnit.SECONDS)
                .setRetryHandler(new DefaultHttpRequestRetryHandler(0, false))
                .setKeepAliveStrategy(keepAliveStrategy)
                .setDefaultRequestConfig(requestConfig)
                .setDefaultConnectionConfig(connectionConfig)
                .build();
        httpClientConnectionManager = connectionManager;
    }

    /**
     * Closes expired connections and connections idle longer than the given time.
     * Does nothing if {@link #start()} has not been called yet.
     *
     * @param idleTime idle threshold
     * @param timeUnit unit of {@code idleTime}
     */
    @Override
    public void idleConnectionMonitor(long idleTime, TimeUnit timeUnit) {
        if (httpClientConnectionManager == null) {
            return;
        }
        httpClientConnectionManager.closeExpiredConnections();
        httpClientConnectionManager.closeIdleConnections(idleTime, timeUnit);
    }

    /**
     * Closes the HTTP client. Does nothing if {@link #start()} has not been called yet;
     * an {@link IOException} during close is reported on standard output.
     */
    @Override
    public void shutdown() {
        if (httpClient == null) {
            return;
        }
        try {
            httpClient.close();
        } catch (IOException e) {
            System.out.println("关闭httpClient异常:" + e.getMessage());
        }
    }

    /**
     * Returns a keep-alive strategy that falls back to 60000 when the default strategy
     * reports -1 for the response.
     */
    private static ConnectionKeepAliveStrategy keepAliveStrategy() {
        return new DefaultConnectionKeepAliveStrategy() {
            @Override
            public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
                long keepAlive = super.getKeepAliveDuration(response, context);
                if (keepAlive == -1) {
                    keepAlive = 60000;
                }
                return keepAlive;
            }
        };
    }

    /**
     * Trust manager that accepts every client and server certificate chain.
     *
     * <p>SECURITY NOTE(review): no certificate validation is performed.
     */
    private static class DefaultTrustManager implements X509TrustManager {
        private DefaultTrustManager() {}

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            // The X509TrustManager contract requires a non-null (possibly empty) array.
            return new X509Certificate[0];
        }

        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
    }

    /** Pooling connection manager preconfigured with this class's scheme registry. */
    private static class DefaultPoolingHttpClientConnectionManager extends PoolingHttpClientConnectionManager {
        DefaultPoolingHttpClientConnectionManager(long timeToLive, TimeUnit timeUnit) {
            super(getDefaultRegistry(), null, null, null, timeToLive, timeUnit);
        }
    }

    /**
     * Builds the scheme registry: plain sockets for "http", {@link #ctx} for "https".
     *
     * @throws IllegalStateException if the TLS context could not be initialized
     */
    private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
        if (ctx == null) {
            throw new IllegalStateException("SSLContext initialization failed", ctxInitFailure);
        }
        return RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", new SSLConnectionSocketFactory(ctx))
                .build();
    }

    /**
     * Creates a route planner that sends requests through the given proxy.
     *
     * @param proxyHost proxy host name
     * @param proxyPort proxy port
     * @return a route planner for the proxy
     */
    public static DefaultProxyRoutePlanner getProxyRoutePlanner(String proxyHost, int proxyPort) {
        HttpHost proxy = new HttpHost(proxyHost, proxyPort);
        return new DefaultProxyRoutePlanner(proxy);
    }
}
/**
 * Non-blocking {@link BaseHttpClientService} backed by an Apache {@link CloseableHttpAsyncClient}
 * with a pooled NIO connection manager.
 *
 * <p>{@link #start()} must be called before any request is executed.
 */
public class HttpAsyncClientService extends BaseHttpClientService {

    @Getter
    private CloseableHttpAsyncClient httpAsyncClient;

    @Getter
    private NHttpClientConnectionManager httpClientConnectionManager;

    /**
     * Submits the request to the async client without a callback.
     *
     * @param request the request to execute
     * @return the future returned by the async client
     */
    @Override
    public Future<HttpResponse> execute(HttpUriRequest request) {
        return httpAsyncClient.execute(request, null);
    }

    /**
     * Submits the request to the async client with the given callback.
     *
     * @param request  the request to execute
     * @param callback callback handed to the async client
     * @return the future returned by the async client
     */
    @Override
    public Future<HttpResponse> execute(HttpUriRequest request, FutureCallback<HttpResponse> callback) {
        return httpAsyncClient.execute(request, callback);
    }

    /**
     * Builds the I/O reactor, the pooled connection manager and the async client from the
     * current settings, then starts the client.
     *
     * @throws RuntimeException if the I/O reactor cannot be created
     */
    @Override
    public void start() {
        // I/O reactor: one I/O thread per available processor.
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                .setIoThreadCount(Runtime.getRuntime().availableProcessors())
                .setSoKeepAlive(true)
                .setTcpNoDelay(true)
                .setSoTimeout(socketTimeout)
                .build();

        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setCharset(Consts.UTF_8)
                .setBufferSize(HttpIOConfigConstant.SOCKET_BUFFER_SIZE)
                .build();

        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(socketTimeout) // read/write timeout
                .setConnectTimeout(connectTimeout) // connect timeout
                .setConnectionRequestTimeout(connectionRequestTimeout) // timeout for leasing a connection
                .setStaleConnectionCheckEnabled(false) // stale connection check disabled
                .setCookieSpec(CookieSpecs.IGNORE_COOKIES) // cookies are ignored; handle them manually
                .build();

        final DefaultConnectingIOReactor ioReactor;
        try {
            ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
        } catch (IOReactorException e) {
            throw new RuntimeException("创建DefaultConnectingIOReactor异常", e);
        }

        PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(ioReactor);
        connectionManager.setMaxTotal(maxRoute);
        connectionManager.setDefaultMaxPerRoute(perRoute);
        connectionManager.setDefaultConnectionConfig(connectionConfig);

        httpAsyncClient = HttpAsyncClients.custom()
                .setConnectionManager(connectionManager)
                .setConnectionManagerShared(false)
                .setDefaultRequestConfig(requestConfig)
                .setDefaultIOReactorConfig(ioReactorConfig)
                .setDefaultConnectionConfig(connectionConfig)
                .build();
        httpClientConnectionManager = connectionManager;
        httpAsyncClient.start();
    }

    /**
     * Closes expired connections and connections idle longer than the given time.
     * Does nothing if {@link #start()} has not been called yet.
     *
     * @param idleTime idle threshold
     * @param timeUnit unit of {@code idleTime}
     */
    @Override
    public void idleConnectionMonitor(long idleTime, TimeUnit timeUnit) {
        if (httpClientConnectionManager == null) {
            return;
        }
        httpClientConnectionManager.closeExpiredConnections();
        httpClientConnectionManager.closeIdleConnections(idleTime, timeUnit);
    }

    /**
     * Closes the async client. Does nothing if {@link #start()} has not been called yet;
     * an {@link IOException} during close is reported on standard output.
     */
    @Override
    public void shutdown() {
        if (httpAsyncClient == null) {
            return;
        }
        try {
            httpAsyncClient.close();
        } catch (IOException e) {
            System.out.println("关闭httpAsyncClient异常:" + e.getMessage());
        }
    }
}
// "Reader comments" - web-page heading left over from extraction; not part of the source code.