写在开始之前
这篇文章的由来是作者以前在看CocoaAsyncSocket一时兴起写的一个即时通讯小demo的介绍,内容包含心跳检查,粘包断包处理,多用户并发调度,用户间消息传送等。最近由于在搞一个sockes5的项目。重新整理了一下CocoaAsyncSocket方面的东西,觉得这个demo还是很意思,故写出来和大家分享一下。项目中断包处理部分借鉴了涂耀辉的《即时通讯下数据粘包、断包处理实例(基于CocoaAsyncSocket)》一文。感兴趣的同学也可以看看,还是挺简单的。这个项目只作为作者自己学习使用,不做商业用途,有很多不足和不当之处,欢迎大家探讨交流。好吧话不多说,切入正题。
一.关于CocoaAsyncSocket
这一部分是对CocoaAsyncSocket的一些简述和方法的一些介绍,已经了解的同学可以直接跳过此部分。
1.关于CocoaAsyncSocket
CocoaAsyncSocket是谷歌的开发者,基于BSD-Socket写的一个IM框架,它给Mac和iOS提供了易于使用的、强大的异步套接字库,向上封装出简单易用OC接口。省去了我们面向Socket以及数据流Stream等繁琐复杂的编程。
2.结构
CocoaAsyncSocket中主要包含两个类:
(1).GCDAsyncSocket.
用GCD搭建的基于TCP/IP协议的socket网络库
GCDAsyncSocket is a TCP/IP socket networking library built atop Grand Central Dispatch. -- 引自CocoaAsyncSocket.
(2).GCDAsyncUdpSocket.
用GCD搭建的基于UDP/IP协议的socket网络库.
GCDAsyncUdpSocket is a UDP/IP socket networking library built atop Grand Central Dispatch..-- 引自CocoaAsyncSocket.
关于GCDAsyncUdpSocket暂时不表,有机会再讲。以下主要用到GCDAsyncSocket
3.GCDAsyncSocket下的几个主要方法的介绍
(1)主动方法
//连接服务器host:服务器地址,port:端口;
- (BOOL)connectToHost:(NSString*)host onPort:(uint16_t)port error:(NSError **)errPtr
//发送消息 timeout:等待时间设置为-1为一直等待 tag:读取标示;
- (void)writeData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
//读消息timeout:等待时间设置为-1为一直等待 tag:读取标示。与发送消息的方法对应,要求每发一条消息,就要调用一次读消息,要不然读取不到。GCDAsyncUdpSocket架构要求;
- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag;
//监听本地端口。port:要监听的端口。error:返回的错误;
//- (BOOL)acceptOnPort:(uint16_t)port error:(NSError **)errPtr
(2)代理回调方法
//socket成功连接到服务器调用
-(void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(uint16_t)port;
//接受到新的socket连接调用
- (void)socket:(GCDAsyncSocket *)sock didAcceptNewSocket:(GCDAsyncSocket *)newSocket;
//读取数据,有数据就会调用
- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag;
//直到读到这个长度的数据,才会触发代理
- (void)readDataToLength:(NSUInteger)length withTimeout:(NSTimeInterval)timeout tag:(long)tag;
//直到读到data这个边界,才会触发代理
- (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag;
//有socket断开连接调用
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err;
好了。我们要用到的方法大概就是这么几个。方法的解释已经注释标明,下面开始撸代码,看下具体实现。
🏷️看到这可以休息下,好久没打这么多字,有点发昏,接下来就是代码部分的介绍了😊
二.代码介绍
********代码在这里😊
首先说下demo的大体思路:
看过别人写过的一些版本,大多是开两个工程,分别模拟服务器和客户端。个人觉得这样比较麻烦,跑起来还得改ip,开两个设备没太大必要。
我的思路是:做三个单例分别模拟服务端,客户端A,客户端B。然后开三个队列分别处理服务端,客户端A,客户端B的事务。服务器负责接受转发消息,处理用户心跳和进程调度。(🏷️当然可以写一个客户端的公有类,然后实例化更多的客户端,给每个客户端分配队列和clinetID,这都是OK的。我们这里为了简明和方便断点,直接分开写了两个客户端单利的实现,但代码都是一致的。感兴趣的同学可以按这个思路封装一个客户端类创建多个客户端玩玩😊)。
好了言归正传,开始贴代码吧。
1.主体部分
屏幕快照 .png前四个文件为CocoaAsyncSocket的源文件了。还是比较简洁,源码的话,感兴趣的同学也可以看看,后面如果有时间看看能不能做一篇源码分析。ViewController文件作界面管理,对应三个UITextView,分别作ClientA,Sever,ClientB的一些消息展示。Sever,ClientA,ClientB,看名字就知道分别对应服务器,客户端A,客户端B,分别作对应的事务处理。
2.客户端:
.h文件
#import <Foundation/Foundation.h>
typedef void(^clientAMSG)(NSString *msg) ;
@interface ClientA : NSObject
@property(nonatomic,copy)clientAMSG clientAmsg;
+(id)sharClineA;
/*连接服务器**/
-(BOOL)connect;
/*给B发消息**/
-(void)sendMSGToB;
-(void)ClientAGetMSG:(clientAMSG)clientAmsg;
@end
h文件里没什么好说的,大家看注释就好了
.m文件
我先把代码贴出来,贴这里显得有点长,不要被忽悠了,贴在这里也并不是要各位同学在这里看的。代码完全可以先get下来,在xcode里面看,只有200来行,其实结合注释还是比较清晰。
#import "ClientA.h"
#import "GCDAsyncSocket.h"
#define HOST @"127.0.0.1"
#define PORT 8088
static dispatch_queue_t CGD_manager_creation_queue() {
static dispatch_queue_t _CGD_manager_creation_queue;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
_CGD_manager_creation_queue = dispatch_queue_create("gcd.mine.queue.ClinetAkey", DISPATCH_QUEUE_CONCURRENT);
});
return _CGD_manager_creation_queue;
}
@interface ClientA ()<GCDAsyncSocketDelegate>
{
NSDictionary *currentPacketHead;
}
@property (nonatomic, strong)NSThread *connectThread;
@property (nonatomic,strong)NSTimer * connectTimer;//心跳定时器
@property (nonatomic,strong)GCDAsyncSocket * clinetSocket;//客户端Socket
@property (nonatomic,assign)BOOL isAgain;//控制断线重连
@end
@implementation ClientA
+(id)sharClineA{
static ClientA * clinet;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
clinet=[[ClientA alloc]init];
});
return clinet;
}
-(void)ClientAGetMSG:(clientAMSG)clientAmsg{
self.clientAmsg=clientAmsg;
}
/*连接服务器**/
-(BOOL)connect{
self.clinetSocket = [[GCDAsyncSocket alloc]initWithDelegate:self delegateQueue:CGD_manager_creation_queue()];
NSError * error;
[self.clinetSocket connectToHost:HOST onPort:PORT error:&error];
if (!error) {
return YES;
}else{
return NO;
}
}
/*给B发消息**/
-(void)sendMSGToB{
NSData *data = [@"Hello" dataUsingEncoding:NSUTF8StringEncoding];
NSData *data1 = [@"I" dataUsingEncoding:NSUTF8StringEncoding];
NSData *data2 = [@"am" dataUsingEncoding:NSUTF8StringEncoding];
NSData *data3 = [@"A," dataUsingEncoding:NSUTF8StringEncoding];
NSData *data4 = [@"nice to meet you!" dataUsingEncoding:NSUTF8StringEncoding];
[self sendData:data :@"txt" toClinet:@"CinentB"];
[self sendData:data1 :@"txt" toClinet:@"CinentB"];
[self sendData:data2 :@"txt" toClinet:@"CinentB"];
[self sendData:data3 :@"txt" toClinet:@"CinentB"];
[self sendData:data4 :@"txt" toClinet:@"CinentB"];
NSString *filePath = [[NSBundle mainBundle]pathForResource:@"7" ofType:@"jpeg"];
NSData *data5 = [NSData dataWithContentsOfFile:filePath];
[self sendData:data5 :@"img" toClinet:@"CinentB"];
}
/*封装报文**/
- (void)sendData:(NSData *)data :(NSString *)type toClinet:(NSString *)target;
{
NSUInteger size = data.length;
NSMutableDictionary *headDic = [NSMutableDictionary dictionary];
[headDic setObject:type forKey:@"type"];
[headDic setObject:@"CinentA" forKey:@"CinentID"];
[headDic setObject:target forKey:@"targetID"];
[headDic setObject:[NSString stringWithFormat:@"%ld",size] forKey:@"size"];
NSString *jsonStr = [self dictionaryToJson:headDic];
NSData *lengthData = [jsonStr dataUsingEncoding:NSUTF8StringEncoding];
NSMutableData *mData = [NSMutableData dataWithData:lengthData];
//分界
[mData appendData:[GCDAsyncSocket CRLFData]];
[mData appendData:data];
//第二个参数,请求超时时间
[self.clinetSocket writeData:mData withTimeout:-1 tag:0];
}
//字典转为Json字符串
- (NSString *)dictionaryToJson:(NSDictionary *)dic
{
NSError *error = nil;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:dic options:NSJSONWritingPrettyPrinted error:&error];
return [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
}
#pragma mark 加入心跳
- (NSThread*)connectThread{
if (!_connectThread) {
_connectThread = [[NSThread alloc]initWithTarget:self selector:@selector(threadStart) object:nil];
}
return _connectThread;
}
- (void)threadStart{
@autoreleasepool {
[NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(heartBeat) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop]run];
}
}
#pragma mark 发送心跳包
- (void)heartBeat{
NSData *data = [@"A心跳" dataUsingEncoding:NSUTF8StringEncoding];
[self sendData:data :@"heartA" toClinet:@""];
// [self.clinetSocket writeData:[@"A心跳" dataUsingEncoding:NSUTF8StringEncoding ] withTimeout:-1 tag:0];
}
#pragma mark GCDAsyncSocketDelegate
//读取到数据调用
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag{
//先读取到当前数据包头部信息
if (!currentPacketHead) {
currentPacketHead = [NSJSONSerialization
JSONObjectWithData:data
options:NSJSONReadingMutableContainers
error:nil];
if (!currentPacketHead) {
NSLog(@"error:当前数据包的头为空");
//断开这个socket连接或者丢弃这个包的数据进行下一个包的读取
//....
return;
}
NSUInteger packetLength = [currentPacketHead[@"size"] integerValue];
//读到数据包的大小
[sock readDataToLength:packetLength withTimeout:-1 tag:0];
return;
}
//正式的包处理
NSUInteger packetLength = [currentPacketHead[@"size"] integerValue];
//说明数据有问题
if (packetLength <= 0 || data.length != packetLength) {
NSLog(@"error:当前数据包数据大小不正确");
return;
}
NSString *type = currentPacketHead[@"type"];
NSString * sourceClient=currentPacketHead[@"sourceClient"];
if ([type isEqualToString:@"img"]) {
NSLog(@"客户端A成功收到图片--来自于%@",sourceClient);
if (self.clientAmsg) {
self.clientAmsg([NSString stringWithFormat:@"客户端A成功收到图片--来自于%@",sourceClient]);
}
}else{
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
NSLog(@"客户端A收到消息:%@--来自于%@",msg,sourceClient);
self.clientAmsg([NSString stringWithFormat:@"客户端A收到消息:%@--来自于%@",msg,sourceClient]);
}
currentPacketHead = nil;
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
}
//连接到服务器调用
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(uint16_t)port {
[self heartBeat];
NSLog(@"%@",[NSString stringWithFormat:@"%@:连接成功",self.class]);
if (self.clientAmsg) {
self.clientAmsg([NSString stringWithFormat:@"%@:连接成功",self.class]);
}
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
//开启线程发送心跳
if (!self.isAgain) {
[self.connectThread start];
}
}
//断开连接调用
-(void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err{
NSLog(@"%@",[NSString stringWithFormat:@"%@:断开连接(Error:%@)",self.class,err]);
if (self.clientAmsg) {
self.clientAmsg([NSString stringWithFormat:@"%@:断开连接(Error:%@)",self.class,err]);
}
if (err) {
//重连
self.isAgain=YES;
[self.clinetSocket connectToHost:HOST onPort:PORT error:nil];
}else{
self.clinetSocket.delegate=nil;
self.clinetSocket=nil;
//断开
}
}
@end
这里肯定要讲一讲,不然肯定有同学要打我了。
讲一讲思路:
单例方法就没必要说了。
1.程序入口为-(BOOL)connect方法,
-(BOOL)connect{
self.clinetSocket = [[GCDAsyncSocket alloc]initWithDelegate:self delegateQueue:CGD_manager_creation_queue()];
NSError * error;
[self.clinetSocket connectToHost:HOST onPort:PORT error:&error];
if (!error) {
return YES;
}else{
return NO;
}
}
创建一个GCDAsyncSocket,指定代理,代理队列指定为我们自己通过CGD_manager_creation_queue()方法创建的队列。
static dispatch_queue_t CGD_manager_creation_queue() {
static dispatch_queue_t _CGD_manager_creation_queue;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
_CGD_manager_creation_queue = dispatch_queue_create("gcd.mine.queue.ClinetAkey", DISPATCH_QUEUE_CONCURRENT);
});
return _CGD_manager_creation_queue;
}
然后调用 - (BOOL)connectToHost:(NSString*)host onPort:(uint16_t)port error:(NSError **)errPtr方法,指明服务器地址端口连接到服务器。
2.连接到服务器会触发这个代理方法
//连接到服务器调用
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(NSString *)host port:(uint16_t)port {
[self heartBeat];
NSLog(@"%@",[NSString stringWithFormat:@"%@:连接成功",self.class]);
self.clientAmsg([NSString stringWithFormat:@"%@:连接成功",self.class]);
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
//开启线程发送心跳
if (!self.isAgain) {
[self.connectThread start];
}
}
在这个方法里加入心跳,连接上第一时间发送一个心跳包,目的是为了更新服务端里的socket的ClientID识别用户这个后面解释。
创建心跳和发送心跳包的方法:
#pragma mark 加入心跳
- (NSThread*)connectThread{
if (!_connectThread) {
_connectThread = [[NSThread alloc]initWithTarget:self selector:@selector(threadStart) object:nil];
}
return _connectThread;
}
- (void)threadStart{
@autoreleasepool {
[NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(heartBeat) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop]run];
}
}
#pragma mark 发送心跳包
- (void)heartBeat{
NSData *data = [@"A心跳" dataUsingEncoding:NSUTF8StringEncoding];
[self sendData:data :@"heartA" toClinet:@""];
// [self.clinetSocket writeData:[@"A心跳" dataUsingEncoding:NSUTF8StringEncoding ] withTimeout:-1 tag:0];
}
写数据是用
[self.clinetSocket writeData:[@"A心跳" dataUsingEncoding:NSUTF8StringEncoding ] withTimeout:-1 tag:0]方法
这里的-1为等待时间,如果写为-1意思为无限等待,tag指定会话标示。
---注意
(**) [sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0] 这一句;
这一句的意思是接收到[GCDAsyncSocket CRLFData] 这个边界,触发代理,至于[GCDAsyncSocket CRLFData]是什么下面会介绍。
上文说过,没发送一次消息,就对应写一个read消息,这样才能触发- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag方法来接收回调。
3.重点来了:这一块牵扯到数据包的处理。我们看下发送消息和发送心跳包的方法:
/*给B发消息**/
-(void)sendMSGToB{
NSData *data = [@"Hello" dataUsingEncoding:NSUTF8StringEncoding];
NSData *data1 = [@"I" dataUsingEncoding:NSUTF8StringEncoding];
NSData *data2 = [@"am" dataUsingEncoding:NSUTF8StringEncoding];
NSData *data3 = [@"A," dataUsingEncoding:NSUTF8StringEncoding];
NSData *data4 = [@"nice to meet you!" dataUsingEncoding:NSUTF8StringEncoding];
[self sendData:data :@"txt" toClinet:@"CinentB"];
[self sendData:data1 :@"txt" toClinet:@"CinentB"];
[self sendData:data2 :@"txt" toClinet:@"CinentB"];
[self sendData:data3 :@"txt" toClinet:@"CinentB"];
[self sendData:data4 :@"txt" toClinet:@"CinentB"];
NSString *filePath = [[NSBundle mainBundle]pathForResource:@"7" ofType:@"jpeg"];
NSData *data5 = [NSData dataWithContentsOfFile:filePath];
[self sendData:data5 :@"img" toClinet:@"CinentB"];
}
/*封装报文**/
- (void)sendData:(NSData *)data :(NSString *)type toClinet:(NSString *)target;
{
NSUInteger size = data.length;
NSMutableDictionary *headDic = [NSMutableDictionary dictionary];
[headDic setObject:type forKey:@"type"];
[headDic setObject:@"CinentA" forKey:@"CinentID"];
[headDic setObject:target forKey:@"targetID"];
[headDic setObject:[NSString stringWithFormat:@"%ld",size] forKey:@"size"];
NSString *jsonStr = [self dictionaryToJson:headDic];
NSData *lengthData = [jsonStr dataUsingEncoding:NSUTF8StringEncoding];
NSMutableData *mData = [NSMutableData dataWithData:lengthData];
//分界
[mData appendData:[GCDAsyncSocket CRLFData]];
[mData appendData:data];
//第二个参数,请求超时时间
[self.clinetSocket writeData:mData withTimeout:-1 tag:0];
}
//字典转为Json字符串
- (NSString *)dictionaryToJson:(NSDictionary *)dic
{
NSError *error = nil;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:dic options:NSJSONWritingPrettyPrinted error:&error];
return [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
}
这里就是做封包处理了:
在这里主要是发了几条消息和一个图片给ClientB。
我们定义了一个headDic,这个是我们数据包的头部,里面装了这个数据包的大小和类型信息,自身客户端ID,和目标客户端ID(当然,你可以装更多的其他标识信息。)然后我们把它转成了json,最后转成data。
然后我们把这个head拼在最前面,接着拼了一个:
[GCDAsyncSocket CRLFData]
这个是什么呢?其实它就是一个\r\n。我们用它来做头部的边界。(又或者我们可以规定一个固定的头部长度,来作为边界)。
最后我们把真正的数据包给拼接上。
这一块借鉴了涂耀辉的《即时通讯下数据粘包、断包处理实例(基于CocoaAsyncSocket)》一文。也看过别的几种处理方式,但是感觉这种处理方式比较容易理解也好用一些。
4.最后我们还做了一个客户端的断开重连的例子(只是举个例子,大家还是不要这样干)。
//断开连接调用
-(void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err{
NSLog(@"%@",[NSString stringWithFormat:@"%@:断开连接(Error:%@)",self.class,err]);
self.clientAmsg([NSString stringWithFormat:@"%@:断开连接(Error:%@)",self.class,err]);
if (err) {
//重连
self.isAgain=YES;
[self.clinetSocket connectToHost:HOST onPort:PORT error:nil];
}else{
self.clinetSocket.delegate=nil;
self.clinetSocket=nil;
//断开
}
}
客户端基本就是这些东西了,至于对应的解包处理我想放到服务端来讲,原理都是一样的。
🏷️休息下,不知道有几个同学能看到这里,如果你看到这里说明你是一个很有耐心的程序员了,不知道各位能看到这里的同学有没有一些混乱,个人语音能力有限,实在抱歉。还是看代码清晰,其实客户端这一部分就做三件事:1.连接服务器,维持心跳。2.封包,通过服务器给指定客户端发送消息。3接收服务器消息。下面我们将开始服务端的部分😊
3.服务端:
老规矩,贴代码
.h
#import <Foundation/Foundation.h>
typedef void(^clientAMSG)(NSString *msg) ;
@interface ClientA : NSObject
@property(nonatomic,copy)clientAMSG clientAmsg;
+(id)sharClineA;
/*连接服务器**/
-(BOOL)connect;
/*给B发消息**/
-(void)sendMSGToB;
-(void)ClientAGetMSG:(clientAMSG)clientAmsg;
@end
.h文件没什么好说的。
.m
代码比较简单,先贴出来,后面做解释,拿到项目的各位可以直接不看这一块在xcode里打开。
#import "Sever.h"
#import "GCDAsyncSocket.h"
static dispatch_queue_t CGD_manager_SEVER_queue() {
static dispatch_queue_t _CGD_manager_SEVER_queue;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
_CGD_manager_SEVER_queue = dispatch_queue_create("gcd.mine.queue.SeverAkey", DISPATCH_QUEUE_CONCURRENT);
});
return _CGD_manager_SEVER_queue;
}
//储存在本地的客户端类型
@interface Client : NSObject
@property(nonatomic, strong)GCDAsyncSocket *scocket;//客户端scocket
@property(nonatomic, strong)NSDate *timeOfSocket; //更新通讯时间
@property(nonatomic,strong) NSDictionary *currentPacketHead;//客户端报文字典
@property(nonatomic,copy)NSString * clientID;//客户端ID
@end
@implementation Client
@end
@interface Sever () <GCDAsyncSocketDelegate>
@property(nonatomic, strong)GCDAsyncSocket *serve;
@property(nonatomic, strong)NSMutableArray *clientsArray;// 储存客户端
@property(nonatomic, strong)NSThread *checkThread;// 检测心跳
@end
@implementation Sever
+(instancetype)sharSever{
static Sever * sever;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
sever=[[Sever alloc]init];
});
return sever;
}
-(instancetype)init{
if (self = [super init]) {
self.serve = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:CGD_manager_SEVER_queue()];
self.checkThread = [[NSThread alloc]initWithTarget:self selector:@selector(checkClient) object:nil];
[self.checkThread start];
}
return self;
}
-(NSMutableArray *)clientsArray{
if (!_clientsArray) {
_clientsArray = [NSMutableArray array];
}
return _clientsArray;
}
-(void)SeverGetMSG:(SeverMSG)severAmsg{
self.severAmsg =severAmsg;
}
//监控端口
-(void)openSerVice{
NSError *error;
BOOL sucess = [self.serve acceptOnPort:8088 error:&error];
if (sucess) {
NSLog(@"%@",[NSString stringWithFormat:@"%@---监听端口成功,等待客户端请求连接...",self.class]);
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---监听端口成功,等待客户端请求连接...",self.class]);
}
}else {
NSLog(@"%@",[NSString stringWithFormat:@"%@---端口开启失败...",self.class]);
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---端口开启失败...",self.class]);
}
}
}
#pragma mark GCDAsyncSocketDelegate
- (void)socket:(GCDAsyncSocket *)serveSock didAcceptNewSocket:(GCDAsyncSocket *)newSocket{
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---%@ IP: %@: %zd 客户端请求连接...",self.class,newSocket,newSocket.connectedHost,newSocket.connectedPort]);
}
NSLog(@"%@---%@ IP: %@: %zd 客户端请求连接...",self.class,newSocket,newSocket.connectedHost,newSocket.connectedPort);
// 1.将客户端socket保存起来
Client *client = [[Client alloc]init];
client.scocket = newSocket;
client.timeOfSocket = [NSDate date];
[self.clientsArray addObject:client];
[newSocket readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
}
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag {
Client * client=[self getClientBysocket:sock];
if (!client) {
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
return;
}
//先读取到当前数据包头部信息
if (!client.currentPacketHead) {
client.currentPacketHead = [NSJSONSerialization
JSONObjectWithData:data
options:NSJSONReadingMutableContainers
error:nil];
if (!client.currentPacketHead) {
NSLog(@"error:当前数据包的头为空");
if (self.severAmsg) {
self.severAmsg(@"error:当前数据包的头为空");
}
//断开这个socket连接或者丢弃这个包的数据进行下一个包的读取
//....
return;
}
NSUInteger packetLength = [client.currentPacketHead[@"size"] integerValue];
//读到数据包的大小
[sock readDataToLength:packetLength withTimeout:-1 tag:0];
return;
}
//正式的包处理
NSUInteger packetLength = [client.currentPacketHead[@"size"] integerValue];
//说明数据有问题
if (packetLength <= 0 || data.length != packetLength) {
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
NSLog(@"error:当前数据包数据大小不正确(%@)",msg);
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"error:当前数据包数据大小不正确(%@)",msg]);
}
return;
}
//分配ID
NSString *clientID=client.currentPacketHead[@"CinentID"];
client.clientID=clientID;
NSString *targetID=client.currentPacketHead[@"targetID"];
NSString *type = client.currentPacketHead[@"type"];
/*
*服务端可以不解析内容,直接转发出去,这里只是想看看打印消息
**/
if ([type isEqualToString:@"img"]) {
NSLog(@"收到图片");
if (self.severAmsg) {
self.severAmsg(@"收到图片");
}
}else{
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"收到消息:%@",msg]);
}
NSLog(@"收到消息:%@",msg);
}
for (Client *socket in self.clientsArray) {
//这里找不到目标客户端,可以把数据保存起来,等待目标客户端上线,再转发出去,这里就不做了,感兴趣的同学自己可以试一试
if ([socket.clientID isEqualToString:targetID]) {
[self writeDataWithSocket:socket.scocket data:data type:type sourceClient:clientID];
}
}
client.currentPacketHead = nil;
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
}
-(Client *)getClientBysocket:(GCDAsyncSocket *)sock{
for (Client *socket in self.clientsArray) {
if ([sock isEqual:socket.scocket]) {
///更新最新时间
socket.timeOfSocket = [NSDate date];
return socket;
}
}
return nil;
}
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err{
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---有用户下线...",self.class]);
}
NSLog(@"%@",[NSString stringWithFormat:@"%@---有用户下线...",self.class]);
NSMutableArray *arrayNew = [NSMutableArray array];
for (Client *socket in self.clientsArray ) {
if ([socket.scocket isEqual:sock]) {
continue;
}
[arrayNew addObject:socket ];
}
self.clientsArray = arrayNew;
}
-(void)exitWithSocket:(GCDAsyncSocket *)clientSocket{
// [self writeDataWithSocket:clientSocket str:@"成功退出\n"];
// [self.arrayClient removeObject:clientSocket];
//
// NSLog(@"当前在线用户个数:%ld",self.arrayClient.count);
}
- (void)socket:(GCDAsyncSocket *)sock didWriteDataWithTag:(long)tag{
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---数据发送成功.....",self.class]);
}
NSLog(@"%@",[NSString stringWithFormat:@"%@---数据发送成功.....",self.class]);
}
- (void)writeDataWithSocket:(GCDAsyncSocket*)clientSocket data:(NSData *)data type:(NSString *)type sourceClient:(NSString *)sourceClient {
NSUInteger size = data.length;
NSMutableDictionary *headDic = [NSMutableDictionary dictionary];
[headDic setObject:type forKey:@"type"];
[headDic setObject:sourceClient forKey:@"sourceClient"];
[headDic setObject:[NSString stringWithFormat:@"%ld",size] forKey:@"size"];
NSString *jsonStr = [self dictionaryToJson:headDic];
NSData *lengthData = [jsonStr dataUsingEncoding:NSUTF8StringEncoding];
NSMutableData *mData = [NSMutableData dataWithData:lengthData];
//分界
[mData appendData:[GCDAsyncSocket CRLFData]];
[mData appendData:data];
//第二个参数,请求超时时间
[clientSocket writeData:mData withTimeout:-1 tag:0];
}
//字典转为Json字符串
- (NSString *)dictionaryToJson:(NSDictionary *)dic
{
NSError *error = nil;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:dic options:NSJSONWritingPrettyPrinted error:&error];
return [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
}
#pragma checkTimeThread
//开启线程 启动runloop 循环检测客户端socket最新time
- (void)checkClient{
@autoreleasepool {
[NSTimer scheduledTimerWithTimeInterval:30 target:self selector:@selector(repeatCheckClinet) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop]run];
}
}
//移除 超过心跳的 client
- (void)repeatCheckClinet{
if (self.clientsArray.count == 0) {
return;
}
NSDate *date = [NSDate date];
NSMutableArray *arrayNew = [NSMutableArray array];
for (Client *socket in self.clientsArray ) {
if ([date timeIntervalSinceDate:socket.timeOfSocket]>20||!socket) {
if (socket) {
[socket.scocket disconnect];
}
continue;
}
[arrayNew addObject:socket];
}
self.clientsArray = arrayNew;
}
@end
看过客户端的流程,再来看服务端就简单很多。说下思路:
1.跟客户端一样,先做单例,构造队列初始化服务端Socket,设置代理。
+(instancetype)sharSever{
static Sever * sever;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
sever=[[Sever alloc]init];
});
return sever;
}
-(instancetype)init{
if (self = [super init]) {
self.serve = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:CGD_manager_SEVER_queue()];
self.checkThread = [[NSThread alloc]initWithTarget:self selector:@selector(checkClient) object:nil];
[self.checkThread start];
}
return self;
}
-(NSMutableArray *)clientsArray{
if (!_clientsArray) {
_clientsArray = [NSMutableArray array];
}
return _clientsArray;
}
2.监听本地端口
//监控端口
-(void)openSerVice{
NSError *error;
BOOL sucess = [self.serve acceptOnPort:8088 error:&error];
if (sucess) {
NSLog(@"%@",[NSString stringWithFormat:@"%@---监听端口成功,等待客户端请求连接...",self.class]);
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---监听端口成功,等待客户端请求连接...",self.class]);
}
}else {
NSLog(@"%@",[NSString stringWithFormat:@"%@---端口开启失败...",self.class]);
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---端口开启失败...",self.class]);
}
}
}
3.接收到新的socket连接到本地端口,会触发代理调用
- (void)socket:(GCDAsyncSocket *)serveSock didAcceptNewSocket:(GCDAsyncSocket *)newSocket{
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"%@---%@ IP: %@: %zd 客户端请求连接...",self.class,newSocket,newSocket.connectedHost,newSocket.connectedPort]);
}
NSLog(@"%@---%@ IP: %@: %zd 客户端请求连接...",self.class,newSocket,newSocket.connectedHost,newSocket.connectedPort);
// 1.将客户端socket保存起来
Client *client = [[Client alloc]init];
client.scocket = newSocket;
client.timeOfSocket = [NSDate date];
[self.clientsArray addObject:client];
[newSocket readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
}
这里还是蛮重要的,解释下:在接到新的socket连接后,我们创建一个Client 类型的对象,将当前socket交给这个对象,再将这个Client用一个可变数组self.clientsArray保存起来。(这个数组用来保存所有连接到服务器的socket对应创建的Client对象,后面会利用它来处理心跳,转发,和用户调度)。然后让当socket前读取有[GCDAsyncSocket CRLFData]边界的报文。
可以看下Client对象的声明:
//储存在本地的客户端类型
@interface Client : NSObject
@property(nonatomic, strong)GCDAsyncSocket *scocket;//客户端scocket
@property(nonatomic, strong)NSDate *timeOfSocket; //更新通讯时间
@property(nonatomic,strong) NSDictionary *currentPacketHead;//客户端报文字典
@property(nonatomic,copy)NSString * clientID;//客户端ID
@end
@implementation Client
@end
继承自NSObject类,里面包含4个属性:
scocket属性: 对应每个客户端连接过来的scocket;
timeOfSocket属性: 对应每个客户端最后和服务器交互时间;
currentPacketHead属性: 用来储存用户数据包报头;
clientID属性: 对应每个客户端分配到的ID;
4.接收数据
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag {
Client * client=[self getClientBysocket:sock];
if (!client) {
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
return;
}
//先读取到当前数据包头部信息
if (!client.currentPacketHead) {
client.currentPacketHead = [NSJSONSerialization
JSONObjectWithData:data
options:NSJSONReadingMutableContainers
error:nil];
if (!client.currentPacketHead) {
NSLog(@"error:当前数据包的头为空");
if (self.severAmsg) {
self.severAmsg(@"error:当前数据包的头为空");
}
//断开这个socket连接或者丢弃这个包的数据进行下一个包的读取
//....
return;
}
NSUInteger packetLength = [client.currentPacketHead[@"size"] integerValue];
//读到数据包的大小
[sock readDataToLength:packetLength withTimeout:-1 tag:0];
return;
}
//正式的包处理
NSUInteger packetLength = [client.currentPacketHead[@"size"] integerValue];
//说明数据有问题
if (packetLength <= 0 || data.length != packetLength) {
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
NSLog(@"error:当前数据包数据大小不正确(%@)",msg);
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"error:当前数据包数据大小不正确(%@)",msg]);
}
return;
}
//分配ID
NSString *clientID=client.currentPacketHead[@"CinentID"];
client.clientID=clientID;
NSString *targetID=client.currentPacketHead[@"targetID"];
NSString *type = client.currentPacketHead[@"type"];
/*
*服务端可以不解析内容,直接转发出去,这里只是想看看打印消息
**/
if ([type isEqualToString:@"img"]) {
NSLog(@"收到图片");
if (self.severAmsg) {
self.severAmsg(@"收到图片");
}
}else{
NSString *msg = [[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding];
if (self.severAmsg) {
self.severAmsg([NSString stringWithFormat:@"收到消息:%@",msg]);
}
NSLog(@"收到消息:%@",msg);
}
for (Client *socket in self.clientsArray) {
//这里找不到目标客户端,可以把数据保存起来,等待目标客户端上线,再转发出去,这里就不做了,感兴趣的同学自己可以试一试
if ([socket.clientID isEqualToString:targetID]) {
[self writeDataWithSocket:socket.scocket data:data type:type sourceClient:clientID];
}
}
client.currentPacketHead = nil;
[sock readDataToData:[GCDAsyncSocket CRLFData] withTimeout:-1 tag:0];
}
这一块算是服务端核心的部分了,数据包的拆解,分发,用户调度,心跳刷新都在这里处理。
大体思路:监听到有[GCDAsyncSocket CRLFData] 边界的数据包,调用didReadData方法,在这个方法里。先根据当前sock找到self.clientsArray(存储所有client和相关信息)里找到对应的Client,在找的过程中,将找到的对应的Client的timeOfSocket(可以理解为时间戳)刷新。
判断对应Client的currentPacketHead是否为nil。如果为空,将收到的data转化为字典赋值给currentPacketHead。此时currentPacketHead的内容应该为
{
@"size":@"****",//携带内容的大小
@"CinentID":@"****",//源客户端id
@"type":@"****",//携带数据格式
@"targetID":@"****"//目的客户端id
....当然我们还可以封装一些别的信息,我们这里就设计这几个我们需要的
}
然后,通知当前socket来接收size对应长度的数据包。
理想状态下(这里会有并发过程,这里先提一下,后面解释)该sockt会去读到size长度的内容包,检测下内容包的合理性,如果合理。我们就取到正确的内容了,然后,根据根据收到的内容给此Client分配clientID,根据报文目的客户端id,在self.clientsArray中找到对应Client所对应的socket,再将内容封装转发。客户端的解析过程,和这里大体相似,不表。
上面说到用户并发,因为所有的客户端都会同时发送心跳包或用户消息,都会调用didReadData方法,比如说用户A对应的socket读取到报文头部,要去读报文内容的时候,用户B对应的socket也同时调用didReadData方法,那么会照成我们接收到数据处理混乱。所以,我们封装一个Client来对应处理每个客户端的socket事务,通过定位标记,让他们并发工作,各自维持自己处理数据的逻辑,互不干扰。
5.心跳检测
//开启线程 启动runloop 循环检测客户端socket最新time
- (void)checkClient{
@autoreleasepool {
[NSTimer scheduledTimerWithTimeInterval:30 target:self selector:@selector(repeatCheckClinet) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop]run];
}
}
//移除 超过心跳的 client
- (void)repeatCheckClinet{
if (self.clientsArray.count == 0) {
return;
}
NSDate *date = [NSDate date];
NSMutableArray *arrayNew = [NSMutableArray array];
for (Client *socket in self.clientsArray ) {
if ([date timeIntervalSinceDate:socket.timeOfSocket]>20||!socket) {
if (socket) {
[socket.scocket disconnect];
}
continue;
}
[arrayNew addObject:socket];
}
self.clientsArray = arrayNew;
}
这一块很简单,做法是,没收到客户端发过来的报文,就更新下,客户端最后交互时间(timeOfSocket),然后,每隔一段时间检测self.clientsArray每个Client对应的timeOfSocket,和目前时间对比,如果超出预先设定的失活时间,就断开此Client对应的scocket,杀死客户端。
写在最后
🏷️到这里,就全部讲完了,文章篇幅比较长,但大多是代码部分,对CocoaAsyncSocket有了解的同学,可以直接看代码,比较简单,可能很多同学看到这种又臭又长的文章,会选择直接略过。嘴拙,总想用更多的文字来解释,还是怕自己表达的不够清晰,水平有限,文章和代码中多有漏洞,欢迎指出,内心忐忑,只愿不要误人子弟就好。同时也希望能抛砖引玉,给有需要的同学一些思路和启发。😊
网友评论