背景:公司登录基础服务是C开发,业务是Java开发,需要Java调用C的服务验证登录、获取登录信息
实现方式:
- 交互通过发送Socket通讯,Google Protobuf 协议,发送心跳保持TCP连接
- 自定义ExecutorCompletionService,实现一个包含Map<Integer, BlockingQueue<Future<V>>>
https://github.com/shawntime/shawn-common-utils/tree/master/src/main/java/com/shawntime/common/socket
Google Protobuf 协议定义
- 定义.proto文件:头文件(head_outer_class.proto)、登录(login_logic.proto)、心跳(login_proxy.proto)
1)字段类型:bool,int32,float,double,string
2)支持消息嵌套
3)支持enum
4)索引号要按顺序指定
5)字段前缀:required:必须的;optional:可选的;repeated:可以重复的
package qiqi;
enum ENUM_HEAD_TYEP
{
enum_cs_head_type_cs = 0x1; //0x1表示CSHead
enum_cs_head_type_sc = 0x2; //0x2表示SCHead
enum_cs_head_type_ss = 0x3; //0x3表示SSHead
};
enum ENUM_CS_HEAD_COMMAND
{
enum_cs_head_cmd_medal = 0x1; //勋章系统
enum_cs_head_cmd_login = 0x2; //登录系统
enum_cs_head_cmd_guild = 0x3; //公会推荐
enum_cs_head_cmd_user_info = 0x4; //用户信息
};
enum ENUM_SC_HEAD_COMMAND
{
enum_sc_head_cmd_medal = 0x1; //勋章系统
enum_sc_head_cmd_login = 0x2; //登录系统
enum_sc_head_cmd_lottery = 0x3; //博彩系统
enum_sc_head_cmd_guild=0x4; // 公会推荐系统
};
enum ENUM_SS_HEAD_COMMAND
{
enum_ss_head_cmd_medal = 0x1; //勋章系统
enum_ss_head_cmd_TVwall = 0x2; //电视墙系统
enum_ss_head_cmd_login = 0x3; //登录系统
enum_ss_head_cmd_guild = 0x4; //公会推荐系统
enum_ss_head_cmd_user_info = 0x5; //用户信息
enum_ss_head_cmd_keep_alive = 0x1001; //心跳
};
message Head
{
optional uint32 uint32_head_type = 1; // Head类型, 见ENUM_HEAD_TYEP定义
optional LoginSig msg_login_sig = 2; // 登录态
optional uint32 uint32_paint_flag = 3; // 染色字段
optional CSHead msg_cs_head = 4; // CS协议头部
optional SCHead msg_sc_head = 5;
optional SSHead msg_ss_head = 6;
};
message LoginSig
{
optional uint32 uint32_type = 1; // 登录态type
optional bytes bytes_sig = 2; // key
}
message CSHead //Client request --> Server, Server response --> Client
{
optional uint64 uint64_uid = 1; // openid
optional uint32 uint32_command = 2; // 消息协议族编号,区分业务系统,见定义(ENUM_CS_HEAD_COMMAND)
optional uint32 uint32_seq = 3; // 对单个用户的seq
optional uint32 uint32_version = 4; // 客户端版本号
optional uint32 uint32_retry_times = 5; // 填正确的重试次数
optional uint32 uint32_client_type = 6; // 登录时要带上客户端类型,复用此字段即可
optional uint32 uint32_pubno = 7; // 等felix分配
optional uint32 uint32_localid = 8; // 地区ID
optional uint32 uint32_timezone = 9; // 时区
optional fixed32 uint32_client_ip = 10; // 客户端IP
optional uint32 uint32_client_port = 11; // 客户端Port
optional fixed32 uint32_conn_ip = 12; // ConnIP
optional uint32 uint32_conn_port = 13; // ConnPort
optional fixed32 uint32_interface_ip = 14; // 接口机器IP
optional uint32 uint32_interface_port = 15; // 接口机Port
optional fixed32 uint32_actual_ip = 16; // 客户端真实IP
optional uint32 uint32_flag = 17;
optional fixed32 uint32_timestamp = 18; // 发包的当前时间戳,用于延时时间上报统计,Server需要原封不动带回Conn
optional uint32 uint32_subcmd = 19; // 子命令号
optional uint32 uint32_result = 20; // 回包给客户端的返回值
// 成功:0x0-0x3f(优先用0x0),conn失败:0x40-0x7f,
// 系统失败:0x80-0xbf,业务失败:0xc0-0xff,其他取值未定义
optional uint64 uint64_session_id = 21; // 如果存在,后端Server需要原样返回!!
optional string str_client_ip = 22; //client 公网IP ,由代理填写
}
message SCHead //Push broadcast
{
optional uint64 uint64_from_uid = 1; // 来源用户,
optional uint64 uint64_to_uid = 2; // 目的用户,
optional uint32 uint32_type = 3; // 预留push类型, 0表示点对点push,1表示对uint64_from_uid所在房间广播
optional uint32 uint32_command = 4; // 消息协议族编号,区分业务系统,见定义(ENUM_SC_HEAD_COMMAND)
optional uint32 uint32_timestamp = 5;
optional uint32 uint32_seq = 6;
optional uint32 uint32_source_type = 7; //来源服务器类型(为兼容windows服务器)
optional uint64 uint64_from_roomid = 8; // 来源用户所在房间
optional uint64 uint64_to_roomid = 9; // 目的用户所在房间,
}
message SSHead //Server <--> Server
{
optional uint64 uint64_jobid = 1;
optional uint64 uint64_seq = 2;
optional uint32 uint32_command = 3; // 消息协议族编号,区分业务系统,见定义(ENUM_SS_HEAD_COMMAND)
optional uint32 uint32_server_id = 4; // 标记消息来源
}
package qiqi.login_logic;
enum ENUM_LOGIN_LOGIC_CMD
{
emum_login_logic_tencent_verify = 0x1;
};
message ReqBody{
optional uint32 uint32_cmd = 1; // ENUM_LOGIN_LOGIC_CMD
optional Reqlogic_Tencent_Verify msg_subcmd0x01_req_tencent_verify = 2;
};
message RspBody{
optional uint32 uint32_cmd = 1; // ENUM_LOGIN_LOGIC_CMD
optional int32 int32_result = 2; // 0 success
optional bytes bytes_error = 3;
optional Rsplogic_Tencent_Verify msg_subcmd0x1_rsp_tencent_verify = 4;
};
message Reqlogic_Tencent_Verify{
optional string str_openid = 1;
optional string str_openkey = 2;
optional uint32 uint32_logmode = 3; //登陆模式 0正常登陆 1 KEY 2 续期
optional string str_pf = 4;
optional string str_userip = 5;
optional string bytes_PcID = 6;
optional uint32 uint32_version = 7;
optional uint32 uint32_StartSource = 8;
optional int64 int64_sourcetype = 9;
};
message Rsplogic_Tencent_Verify{
optional string str_openid = 1;
optional bytes bytes_userkey = 2;
optional int64 int64_qiqi_id = 3;
optional string str_username = 4;
optional int32 int32_user_state = 5; // 0:old user , 1:new user
};
package qiqi.login_porxy;
enum ENUM_LOGIN_PROXY_CMD
{
emum_login_proxy_keep_alive = 0x1001;
};
message ReqBody{
optional uint32 uint32_cmd = 1; // ENUM_LOGIN_LOGIC_CMD
optional Req_keep_alive msg_subcmd0x1001_req_keep_alive = 0x1001;
};
message RspBody{
optional Rsp_keep_alive msg_subcmd0x1001_rsp_keep_alive = 0x1001;
};
message Req_keep_alive{
optional uint32 uint32_ts = 1;
}
message Rsp_keep_alive{
optional uint32 uint32_ts = 1;
}
- 用 protoc.exe 生成head_outer_class.proto和login_logic.proto的协议文件
D:\protoc-2.6.1-win32\protoc-2.6.1-win32>protoc.exe ./head_outer_class.proto --java_out=./
D:\protoc-2.6.1-win32\protoc-2.6.1-win32>protoc.exe ./login_logic.proto --java_out=./
D:\protoc-2.6.1-win32\protoc-2.6.1-win32>protoc.exe ./login_proxy.proto --java_out=./
- 实现代码
package com.shawntime.common.socket;
import java.io.OutputStream;
/**
* protobuf请求封装抽象类
* @author admin
*
*/
public abstract class RequestHandle {
private int seqId;
public final void send(OutputStream outputStream) throws Exception {
byte[] data = pack();
outputStream.write(data);
outputStream.flush();
}
/**
* 将输入的数据打包成TCP通讯包,返回值: 打包好的TCP数据
* @throws Exception
*/
private final byte[] pack() throws Exception {
byte[] headBuf = packHead();
byte[] bodyBuf = packBody();
//生成sendBuf
byte[] sendBuf = new byte[headBuf.length + bodyBuf.length + 10];
sendBuf[0] = (byte) '(';
writeInt(sendBuf, headBuf.length, 1);
writeInt(sendBuf, bodyBuf.length, 5);
System.arraycopy(headBuf, 0, sendBuf, 9, headBuf.length);
System.arraycopy(bodyBuf, 0, sendBuf, headBuf.length + 9, bodyBuf.length);
sendBuf[sendBuf.length - 1] = (byte) ')';
return sendBuf;
}
//按小端模式写入int
private final void writeInt(byte[] writeBuffer, int v, int pos) {
writeBuffer[pos] = (byte) (v >>> 24);
writeBuffer[pos + 1] = (byte) (v >>> 16);
writeBuffer[pos + 2] = (byte) (v >>> 8);
writeBuffer[pos + 3] = (byte) (v >>> 0);
}
protected abstract byte[] packBody();
/**
* 请求头内容
* @return
*/
protected abstract byte[] packHead();
public int getSeqId() {
return seqId;
}
public final void setSeqId(int seqId) {
this.seqId = seqId;
}
}
package com.shawntime.common.socket;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import com.shawntime.common.socket.pool.ProtoPack;
/**
* protobuf响应处理封装抽象类
*
* @author admin
*
*/
public abstract class ResponseHandle<T> {
private String ip;
private int port;
private int seqId;
private final byte[] readIn(InputStream inputStream) throws IOException {
int readBytes = 0;
byte[] buffer = new byte[1024];//1024可改成任何需要的值
int len = buffer.length;
while (readBytes < len) {
int read = inputStream.read(buffer, readBytes, len - readBytes);
//判断是不是读到了数据流的末尾 ,防止出现死循环。
if (read == -1 || read < (len - readBytes)) {
readBytes += read;
break;
}
if(read == (len - readBytes)) {
byte[] tmpBuffer = new byte[len * 2];
System.arraycopy(buffer, 0, tmpBuffer, 0, buffer.length);
buffer = tmpBuffer;
len = buffer.length;
}
readBytes += read;
}
byte[] endodedData = new byte[readBytes];
System.arraycopy(buffer, 0, endodedData, 0, readBytes);
return endodedData;
}
public final ProtoPack unpack(InputStream inputStream) throws IOException {
byte[] data = readIn(inputStream);
byte[] cache = new byte[1024 * 16];
int end = 0;
System.arraycopy(data, 0, cache, end, data.length);
end += data.length;
while (end > 0) {
if (end < 9) {
return null;
}
try {
int headLen = readInt(cache, 1);
int bodyLen = readInt(cache, 5);
if (end < 10 + headLen + bodyLen) {
return null;
}
byte[] headBuf = new byte[headLen];
byte[] bodyBuf = new byte[bodyLen];
System.arraycopy(cache, 9, headBuf, 0, headLen);
System.arraycopy(cache, 9 + headLen, bodyBuf, 0, bodyLen);
//数据前移
int frameLen = 10 + headLen + bodyLen;
int newEnd = end - frameLen;
if (newEnd > 0) {
System.arraycopy(cache, frameLen, cache, 0, newEnd);
}
end = newEnd;
seqId = encodeHeaderReqId(headBuf);
if(seqId > 0) {
ProtoPack protoPack = new ProtoPack();
protoPack.setHeader(headBuf);
protoPack.setBody(bodyBuf);
protoPack.setSeqId(seqId);
return protoPack;
}
}
catch (IOException e) {
e.printStackTrace();
return null;
}
}
return null;
}
//按小端模式读取一个int
private final int readInt(byte[] readBuffer, int pos) throws IOException {
if (readBuffer.length < pos + 4) {
throw new EOFException();
}
return (((int) (readBuffer[pos] & 255) << 24) + ((readBuffer[pos + 1] & 255) << 16) + ((readBuffer[pos + 2] & 255) << 8) + ((readBuffer[pos + 3] & 255) << 0));
}
public int encodeHeaderReqId(byte[] headBuf) {
// try {
// HeadOuterClass.Head header = HeadOuterClass.Head.parseFrom(headBuf);
// int seqId = header.getMsgCsHead().getUint32Seq();
// return seqId;
// } catch (InvalidProtocolBufferException e) {
// e.printStackTrace();
// }
//
return -1;
}
public abstract void encode(byte[] headBuf, byte[] bodyBuf);
public abstract T get();
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getSeqId() {
return seqId;
}
public void setSeqId(int seqId) {
this.seqId = seqId;
}
}
package com.shawntime.common.socket.pool;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.log4j.Logger;
/**
* Socket连接创建工厂
* @author shma
*
*/
public class SocketConnectionFactory extends BasePooledObjectFactory<Socket> {
private static final Logger logger = Logger.getLogger(SocketConnectionFactory.class);
private List<InetSocketAddress> socketAddress = null;
private final AtomicInteger atomicIntCount;
//自增,遍历标记位
private final AtomicLong atomicLongTail;
public SocketConnectionFactory(String hosts) {
socketAddress = new ArrayList<InetSocketAddress>();
String[] hostsAdd = hosts.split(";");
if(hostsAdd.length > 0) {
for(String tmpHost : hostsAdd) {
String[] dataStrings = tmpHost.split(":");
InetSocketAddress address = new InetSocketAddress(dataStrings[0], Integer.parseInt(dataStrings[1]));
socketAddress.add(address);
}
}
atomicIntCount = new AtomicInteger(0);
atomicLongTail = new AtomicLong(0);
}
private InetSocketAddress getSocketAddress() {
int index = (int) (atomicLongTail.getAndIncrement() % socketAddress.size());
logger.info("创建Socket>>>address:" + socketAddress.get(index).getHostName() + ", counter:" + atomicIntCount.incrementAndGet());
return socketAddress.get(index);
}
@Override
public void destroyObject(PooledObject<Socket> p) throws Exception {
Socket socket = p.getObject();
logger.info("销毁Socket>>>socket:" + socket + ", counter:" + atomicIntCount.decrementAndGet());
if(socket != null) {
socket.close();
}
}
@Override
public boolean validateObject(PooledObject<Socket> p) {
Socket socket = p.getObject();
if(socket != null) {
if(!socket.isConnected()) {
return false;
}
if(socket.isClosed()) {
return false;
}
// LoginProxyRequest request = new LoginProxyRequest();
// LoginProxyResponse response = new LoginProxyResponse();
// SocketSession socketSession = PassportCommunication.getSocketSession();
// Optional<Boolean> optional = socketSession.send(request, response, socket);
boolean state = false;
// if(optional.isPresent()) {
// state = optional.get();
// }
logger.info("验证socket心跳>>>socket:" + socket + ", state:" + state);
return state;
}
return false;
}
@Override
public Socket create() throws Exception {
Socket socket = new Socket();
socket.connect(getSocketAddress(), 30000);
socket.setSoTimeout(10000);
return socket;
}
@Override
public PooledObject<Socket> wrap(Socket obj) {
return new DefaultPooledObject<Socket>(obj);
}
}
package com.shawntime.common.socket.pool;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.shawntime.common.socket.RequestHandle;
import com.shawntime.common.socket.ResponseHandle;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.log4j.Logger;
import com.google.common.base.Optional;
/**
* 连接会话
* @author shma
*
*/
public class SocketSession {
private final GenericObjectPool<Socket> pool;
private static final Logger logger = Logger.getLogger(SocketSession.class);
private static final ExecutorCompletionService<ProtoPack> COMPLETION_SERVICE;
static {
ExecutorService executorService = Executors.newCachedThreadPool();
COMPLETION_SERVICE = new ExecutorCompletionService<ProtoPack>(executorService);
}
public SocketSession(GenericObjectPoolConfig config, String hosts) {
SocketConnectionFactory factory = new SocketConnectionFactory(hosts);
pool = new GenericObjectPool<Socket>(factory, config);
}
public Socket getConnection() throws Exception {
Socket socket = pool.borrowObject();
return socket;
}
/**
*
* 回收连接socket
*
* @author shma
* @param socket
* @since JDK 1.6
*/
public void releaseConnection(Socket socket){
try {
pool.returnObject(socket);
} catch(Throwable e) {
if(socket != null){
try{
socket.close();
}catch(Exception ex){
e.printStackTrace();
}
}
}
}
public <T> Optional<T> send(final RequestHandle request, final ResponseHandle<T> response) {
//获取socket
Socket socket = null;
try {
socket = getConnection();
return send(request, response, socket);
} catch (Exception e) {
logger.error("Get socket error, msg : " + e.getMessage());
e.printStackTrace();
} finally {
if(socket != null) {
releaseConnection(socket);
}
}
return Optional.fromNullable(null);
}
public <T> Optional<T> send(final RequestHandle request, final ResponseHandle<T> response, Socket socket) {
final InputStream inputStream;
final OutputStream outputStream;
int incrementId = getDid();
try {
request.setSeqId(incrementId);
//增加ip地址 端口信息
response.setIp(socket.getInetAddress().getHostAddress());
response.setPort(socket.getPort());
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
COMPLETION_SERVICE.submit(new Callable<ProtoPack>() {
@Override
public ProtoPack call() throws Exception {
request.send(outputStream);
ProtoPack protoPack = response.unpack(inputStream);
System.out.println("callback seqid : " + protoPack.getSeqId());
return protoPack;
}
}, incrementId);
Future<ProtoPack> future = COMPLETION_SERVICE.poll(3, TimeUnit.SECONDS, incrementId);
if(future != null) {
ProtoPack pack = future.get();
if(pack != null) {
response.encode(pack.getHeader(), pack.getBody());
return Optional.fromNullable(response.get());
}
}
} catch(RuntimeException e) {
throw e;
} catch (Exception e) {
logger.error("SocketSession error, msg : " + e.getMessage());
e.printStackTrace();
} finally {
COMPLETION_SERVICE.remove(incrementId);
}
return Optional.fromNullable(null);
}
private final AtomicInteger counter = new AtomicInteger(1);
private final Lock lock = new ReentrantLock();
private int getDid() {
if(counter.get() == Integer.MAX_VALUE) {
lock.lock();
try {
if(counter.get() == Integer.MAX_VALUE) {
counter.set(1);
}
} finally {
lock.unlock();
}
}
int did = counter.getAndIncrement();
return did;
}
}
package com.shawntime.common.socket.pool;
public class ProtoPack {
private int seqId;
private byte[] header;
private byte[] body;
public int getSeqId() {
return seqId;
}
public void setSeqId(int seqId) {
this.seqId = seqId;
}
public byte[] getHeader() {
return header;
}
public void setHeader(byte[] header) {
this.header = header;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
package com.shawntime.common.socket.pool;
import java.util.concurrent.ExecutionException;
/**
* Exception thrown when attempting to retrieve the result of a task
* that aborted by throwing an exception. This exception can be
* inspected using the {@link #getCause()} method.
*
* @see Future
* @since 1.5
* @author Doug Lea
*/
public class MyExecutionException extends ExecutionException {
}
package com.shawntime.common.socket.pool;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Maps;
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final Map<Integer, BlockingQueue<Future<V>>> completionQueueMap;
private class QueueingFuture extends FutureTask<V> {
QueueingFuture(RunnableFuture<V> task, int did) {
super(task, null);
this.task = task;
this.did = did;
}
protected void done() {
BlockingQueue<Future<V>> blockingQueue = ExecutorCompletionService.this.completionQueueMap.get(did);
blockingQueue.add(task);
}
private final Future<V> task;
private final int did;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and a
* {@link LinkedBlockingQueue} as a completion queue.
*
* @param executor the executor to use
* @throws NullPointerException if executor is {@code null}
*/
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueueMap = Maps.newConcurrentMap();
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* {@code Queue.add} operations for completed taskes cause
* them not to be retrievable.
* @throws NullPointerException if executor or completionQueue are {@code null}
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueueMap = Maps.newConcurrentMap();
}
public Future<V> submit(Callable<V> task, int did) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
BlockingQueue<Future<V>> blockingQueue = new LinkedBlockingQueue<Future<V>>(1);
completionQueueMap.put(did, blockingQueue);
executor.execute(new QueueingFuture(f, did));
return f;
}
public Future<V> submit(Runnable task, V result, int did) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
BlockingQueue<Future<V>> blockingQueue = new LinkedBlockingQueue<Future<V>>(1);
completionQueueMap.put(did, blockingQueue);
executor.execute(new QueueingFuture(f, did));
return f;
}
public Future<V> take(int did) throws InterruptedException {
return completionQueueMap.get(did).take();
}
public Future<V> poll(int did) {
return completionQueueMap.get(did).poll();
}
public Future<V> poll(long timeout, TimeUnit unit, int did)
throws InterruptedException {
return completionQueueMap.get(did).poll(timeout, unit);
}
public void remove(int did) {
completionQueueMap.remove(did);
}
}
package com.shawntime.common.socket.pool;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* A service that decouples the production of new asynchronous tasks
* from the consumption of the results of completed tasks. Producers
* <tt>submit</tt> tasks for execution. Consumers <tt>take</tt>
* completed tasks and process their results in the order they
* complete. A <tt>CompletionService</tt> can for example be used to
* manage asynchronous IO, in which tasks that perform reads are
* submitted in one part of a program or system, and then acted upon
* in a different part of the program when the reads complete,
* possibly in a different order than they were requested.
*
* <p>Typically, a <tt>CompletionService</tt> relies on a separate
* {@link Executor} to actually execute the tasks, in which case the
* <tt>CompletionService</tt> only manages an internal completion
* queue. The {@link ExecutorCompletionService} class provides an
* implementation of this approach.
*
* <p>Memory consistency effects: Actions in a thread prior to
* submitting a task to a {@code CompletionService}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions taken by that task, which in turn <i>happen-before</i>
* actions following a successful return from the corresponding {@code take()}.
*
*/
public interface CompletionService<V> {
/**
* Submits a value-returning task for execution and returns a Future
* representing the pending results of the task. Upon completion,
* this task may be taken or polled.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<V> submit(Callable<V> task, int did);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. Upon completion, this task may be
* taken or polled.
*
* @param task the task to submit
* @param result the result to return upon successful completion
* @return a Future representing pending completion of the task,
* and whose <tt>get()</tt> method will return the given
* result value upon completion
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<V> submit(Runnable task, V result, int did);
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
Future<V> take(int did) throws InterruptedException;
/**
* Retrieves and removes the Future representing the next
* completed task or <tt>null</tt> if none are present.
*
* @return the Future representing the next completed task, or
* <tt>null</tt> if none are present
*/
Future<V> poll(int did);
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if necessary up to the specified wait
* time if none are yet present.
*
* @param timeout how long to wait before giving up, in units of
* <tt>unit</tt>
* @param unit a <tt>TimeUnit</tt> determining how to interpret the
* <tt>timeout</tt> parameter
* @return the Future representing the next completed task or
* <tt>null</tt> if the specified waiting time elapses
* before one is present
* @throws InterruptedException if interrupted while waiting
*/
Future<V> poll(long timeout, TimeUnit unit, int did) throws InterruptedException;
}
package com.shawntime.common.socket.pool;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Provides default implementations of {@link ExecutorService}
* execution methods. This class implements the <tt>submit</tt>,
* <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
* {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
* to the {@link FutureTask} class provided in this package. For example,
* the implementation of <tt>submit(Runnable)</tt> creates an
* associated <tt>RunnableFuture</tt> that is executed and
* returned. Subclasses may override the <tt>newTaskFor</tt> methods
* to return <tt>RunnableFuture</tt> implementations other than
* <tt>FutureTask</tt>.
*
* <p> <b>Extension example</b>. Here is a sketch of a class
* that customizes {@link ThreadPoolExecutor} to use
* a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
* <pre> {@code
* public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
*
* static class CustomTask<V> implements RunnableFuture<V> {...}
*
* protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
* return new CustomTask<V>(c);
* }
* protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
* return new CustomTask<V>(r, v);
* }
* // ... add constructors, etc.
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
public abstract class AbstractExecutorService implements ExecutorService {
/**
* Returns a <tt>RunnableFuture</tt> for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @return a <tt>RunnableFuture</tt> which when run will run the
* underlying runnable and which, as a <tt>Future</tt>, will yield
* the given value as its result and provide for cancellation of
* the underlying task.
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* Returns a <tt>RunnableFuture</tt> for the given callable task.
*
* @param callable the callable task being wrapped
* @return a <tt>RunnableFuture</tt> which when run will call the
* underlying callable and which, as a <tt>Future</tt>, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task.
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**
* the main mechanics of invokeAny.
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos, int did)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next(), did));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll(did);
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next(), did));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS, did);
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
else
f = ecs.take(did);
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new MyExecutionException();
throw ee;
} finally {
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, int did)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0, did);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit, int did)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout), did);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
Iterator<Future<T>> it = futures.iterator();
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
}
package com.shawntime.common.socket.pool.communication;
import com.shawntime.common.config.PropertyConfigurer;
import com.shawntime.common.socket.pool.SocketSession;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
/**
* 普通模式
* 2015-10-19 16:46:19
* @author admin
* @version
* @since JDK 1.6
*/
public class PassportCommunication {
private static final SocketSession SOCKET_SESSION;
static {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(Integer.parseInt(PropertyConfigurer.getString("passport_maxIdle")));
config.setMaxWaitMillis(Integer.parseInt(PropertyConfigurer.getString("passport_maxWait")));
config.setMinEvictableIdleTimeMillis(Integer.parseInt(PropertyConfigurer.getString("passport_minEvictableIdleTimeMillis")));
config.setMinIdle(Integer.parseInt(PropertyConfigurer.getString("passport_minIdle")));
config.setTestOnBorrow(Boolean.valueOf(PropertyConfigurer.getString("passport_testOnBorrow")));
config.setTestOnCreate(Boolean.valueOf(PropertyConfigurer.getString("passport_testOnCreate")));
config.setTestOnReturn(Boolean.valueOf(PropertyConfigurer.getString("passport_testOnReturn")));
config.setTestWhileIdle(Boolean.valueOf(PropertyConfigurer.getString("passport_testWhileIdle")));
config.setTimeBetweenEvictionRunsMillis(Integer.parseInt(PropertyConfigurer.getString("passport_timeBetweenEvictionRunsMillis")));
config.setMaxTotal(Integer.parseInt(PropertyConfigurer.getString("passport_maxTotal")));
config.setNumTestsPerEvictionRun(Integer.parseInt(PropertyConfigurer.getString("passport_numTestsPerEvictionRun")));
config.setLifo(Boolean.valueOf(PropertyConfigurer.getString("passport_lifo")));
String normalHosts = PropertyConfigurer.getString("passport_server_info");
SOCKET_SESSION = new SocketSession(config, normalHosts);
}
public static SocketSession getSocketSession() {
return SOCKET_SESSION;
}
}
网友评论