前言
-
随着项目的业务越来越重,客户端与服务端对实时性要求较高的数据交互,不能再依赖HTTP轮询或者可能会造成数据丢失的APNS推送。例如,订单状态的改变、新的运营消息、同时存在两类客户端(比如饿了么平台的送餐员与用户)需要进行频繁数据交互。基于上述原因,项目引入socket通信。通道建立好之后,就将之前的轮询机制、依赖APNS推送的部分、还有新需求,移植到sockct通道。
-
关于socket的搭建以及服务端与客户端的协议制定,有时间我单开一篇与大家讨论~~
-
客户端与服务端的通道建立好之后所面临的问题。由于业务比较繁重,通道过来的(socket传递过来的数据,以下简称消息)消息,又有好多类型,每种类型又回包含不同的Value,如何让业务层不同的页面、组件、服务等准确的接收到需要的消息,是本篇的重点。
讨论
- 首先,消息中心如何派发消息队列里的消息到最终的接收者。
-
依赖广播
广播可以让每一个监听广播的接收者都能接收到消息。但是存在一个互斥的问题,就是,如果想要特定的接收者,接收特定的消息,就得增加特定的通知类型,类型越多,维护成本越大,且需要消息服务增加大量的判断。如果不想增加太多的类型,比如关于订单的所有消息,都叫做xxxxxxxOrderNotification,这样一来,类型少了,但是到了业务层,但凡是监听了订单类型消息的接收者,都得加入“大同小异”的判断,也是相当麻烦的操作。 -
KVO
如果使用KVO,这就需要消息中心总是根据业务不同,频繁的变动消息的Model且KVO本身就不是太好用的一种机制。 -
代理
如何使用代理?怎样在delegate回调处减少大量的判断、保持更好的扩展性、业务层代码更简洁?
-
依赖广播
- 如何处理多个接收者对于某条消息的相应优先级。
构建过程
image.png从消息接收者的角度出发考虑问题
比如一个订单服务中页面,根据当前订单ID,只想接收关于当前订单ID的消息,比如消息结构为:
{
"msg_body" : {
"timestamp" : 172831819,
"type" : "order_status_change",
"title" : "",
"sub_title" : "",
"content" : "",
"fmt_date" : "07-22 18:22",
"extra" : {
"order_no": "TAINIUBILE1234"
}
},
"msg_id" : "00544"
}
如何在订单服务中页面,根据order_no订阅该条消息?KVC
首先创建消息Model和消息服务
@interface YGMessage : YGSBaseModel
@property (nonatomic, assign, getter=isProcessed) BOOL processed; //已被处理过
@property (nonatomic, strong) NSDictionary *msg_body;
@property (nonatomic, copy) NSString *msg_id;
@end
@interface YGMessageCenter : NSObject
+ (instancetype)sharedInstance;
@end
关于YGMessage的属性processed的设计,本人的思路是,有可能同一条消息,被多个订阅者所接收,由于消息服务无法判断业务层的逻辑,比如,订单详情页面处理过的订单更新的消息,上层接收者可以不做处理等逻辑,可以通过将消息的processed改为YES,去跳过重复处理,当然可以更好的方式去防止重复处理,
整个的消息体,可以在消息中心解析为一个字典,通过[dict valueForKey:@"msg_body.extra.order_no"]
就可以拿到order_no,而且根据value=TAINIUBILE1234派发到订单服务中页面,由此就可以构建一个消息订阅条件
typedef struct YGMessageCondition {
NSString * _Nullable keyPath; //
_Nullable id value;
} YGMessageCondition;
//提供一个构造函数
static inline YGMessageCondition YGMessageConditionMake(NSString * _Nullable keyPath, _Nullable id value) {
YGMessageCondition condition;
condition.keyPath = keyPath;
condition.value = value;
return condition;
}
消息服务派发消息时通过代理
生成代理协议
@protocol YGMessageCenterDelegate <NSObject>
@optional;
/**
派发消息
@param message 消息
@param intercept 是否阻断消息继续发给其他订阅者
*/
- (void)messageCenterDidReceiveMessage:(YGMessage *)message intercept:(BOOL *)intercept;
@end
每次订阅消息的时候,塞一个Condition给消息服务,YGMessageCenter添加方法:
/**
添加代理
@param delegate 代理
所有接收到的消息都会传给该delegate(如果消息未被栈顶delegate拦截)
*/
- (void)addDelegate:(id<YGMessageCenterDelegate>)delegate;
/**
根据YGMessageCondition添加代理
@param delegate 代理
@param condition keyPath-value
如果condition.keyPath = nil,则所有接收到的消息都会传给该delegate(如果消息未被栈顶delegate拦截)
如果condition.value = nil,则所有接收到的msg.keyPath下有值的消息,都会传给该delegate(如果消息未被栈顶delegate拦截)
*/
- (void)addDelegate:(id<YGMessageCenterDelegate>)delegate condition:(YGMessageCondition)condition;
既然有订阅方法,必然就需要移除订阅的方法
/**
移除代理
@param delegate 代理
*/
- (void)removeDelegate:(id<YGMessageCenterDelegate>)delegate;
/**
根据YGMessageCondition移除代理
@param delegate 代理
@param condition keyPath-value
如果condition.keyPath = nil,则移除该代理,否则,移除指定的keyPath监听
如果condition.value = nil,则移除keyPath的所有监听
*/
- (void)removeDelegate:(id<YGMessageCenterDelegate>)delegate condition:(YGMessageCondition)condition;
再看
addDelegate:
与addDelegate:condition:
方法的内部实现
首先思考一个问题,添加的消息的代理肯定是多个,所以服务内部必然有一个集合去保留这些代理,但是如果直接将代理对象加入到某个集合当中,很容易会因为业务层童鞋的操作不当,造成强引用问题。基于这个原因我们创建一个中间件,来弱引用传入的代理,并保存订阅条件等其他一些操作,此处命名为YGMessageMonitor消息监听者。
YGMessageMonitor该类设计的属性及方法,下面会根据构建过程逐步解释
@interface YGMessageMonitor : NSObject
@property (nonatomic, weak) id<YGMessageCenterDelegate> delegate; //代理
@property (nonatomic, strong) NSMutableDictionary<NSString *, NSMutableArray *> *keyPathValueMaps;
@property (nonatomic, assign) BOOL alwaysRespond; //总是响应所有消息
@property (nonatomic, assign, readonly) BOOL isInvalid; //是否无效,业务类型为0或者代理为空
- (void)addCondition:(YGMessageCondition)condition;
- (void)removeCondition:(YGMessageCondition)condition;
- (BOOL)respondsToMessage:(YGMessage *)obj;
@end
addDelegate:
方法的实现,当某个代理对象,不携带任何condition加入到消息服务中时
- 判空(无须解释)
- 获取弱引用该delegate对象的中间件
YGMessageCenter添加
@property (nonatomic, strong) NSMutableArray *monitorArray;
- (YGMessageMonitor *)monitorForDelegate:(id<YGMessageCenterDelegate>)delegate {
NSMutableArray *invalidArray = [NSMutableArray array];
YGMessageMonitor *monitor = nil;
for (YGMessageMonitor *obj in self.monitorArray) {
if (obj.delegate == delegate) {
monitor = obj;
}
if (obj.delegate == nil) {
[invalidArray addObject:obj];
}
}
[self.monitorArray removeObjectsInArray:invalidArray];
return monitor;
}
- 若获取到的monitor对象为空时,创建新的monitor,并若引用delegate
- 设置该monitor的
alwaysRespond
属性为YES
,该属性的定义为,无条件响应任何消息,增加该属性的目的是减少判断,提高效率
- (void)addDelegate:(id<YGMessageCenterDelegate>)delegate {
if (delegate == nil) { return ; }
//创建一个消息监听者,无条件的delegate,设置其监听者alwaysRespond为yes
YGMessageMonitor *monitor = [self monitorForDelegate:delegate];
if (monitor == nil) {
monitor = [[YGMessageMonitor alloc] init];
monitor.delegate = delegate;
[self.monitorArray addObject:monitor];
}else {
//如果存在监听条件,则移除所有的监听条件,释放内存
[monitor.keyPathValueMaps removeAllObjects];
}
monitor.alwaysRespond = YES;
}
addDelegate:condition:
方法的实现,当某个代理对象携带condition加入到消息服务中时
- 判空(无须解释)
- 判断
condition.keyPath
是否为空,若为空调用addDelegate:
方法 - 获取弱引用该delegate对象的中间件
- 若获取到的monitor对象为空时,创建新的monitor,并若引用delegate
- 判断moitor的alwaysRespond是否为YES,
if (monitor.alwaysRespond) { return ;}
- 若alwaysRespond为NO,则调用
[monitor addCondition:condition];
- (void)addDelegate:(id<YGMessageCenterDelegate>)delegate condition:(YGMessageCondition)condition {
if (delegate == nil) { return ; }
//若condition.keypath为空,则设置该delegate为无条件
if (condition.keyPath.length == 0) {
[self addDelegate:delegate];
return ;
}
YGMessageMonitor *monitor = [self monitorForDelegate:delegate];
if (monitor == nil) {
monitor = [[YGMessageMonitor alloc] init];
monitor.delegate = delegate;
[self.monitorArray addObject:monitor];
}
//若该代理已被设置为无条件监听者,则不再处理监听条件
if (monitor.alwaysRespond) { return ;}
[monitor addCondition:condition];
}
接着看YGMessageMonitor addCondition:
方法如何实现,考虑以下几种情况
- 若传入的condition.value为nil时,也就表示该消息订阅者不必匹配keyPath*下具体的值,只要能匹配到存在值,就接收消息派发
- 若传入的condition.value不为 nil 时,则表示该消息订阅者匹配keyPath下具体的值
- 若同一个消息订阅者,多次传入condition且拥有不同的keyPath,或者相同的keyPath匹配不同的value
由于消息订阅者与YGMessageMonitor是一对一的关系,所以需要给YGMessageMonitor增加属性
@property (nonatomic, strong) NSMutableDictionary<NSString *, NSMutableArray *> *keyPathValueMaps;
在调用addCondition:
方法时首先需要获取到condition.keyPath对应的value数组
- (NSMutableArray *)valueArrayForKeyPath:(NSString *)keyPath {
NSMutableArray *array = self.keyPathValueMaps[keyPath];
if (array == nil) {
array = [NSMutableArray array];
self.keyPathValueMaps[keyPath] = array;
}
return array;
}
其次判断condition.value是否为nil,若为nil则需要有一个特殊的value来标记订阅者接收该keyPath下所有的存在值的消息
定义常量static NSString * const YGAlwaysRespondValue = @"alwaysRespondValue";
,赋值并保存value
- (void)addCondition:(YGMessageCondition)condition {
//查找监听条件keypath下对照的value数组
NSMutableArray *array = [self valueArrayForKeyPath:condition.keyPath];
//若value为空,则监听该keypath下存在值的所有消息
if (condition.value == nil) {
condition.value = YGAlwaysRespondValue;
}
[array addObject:condition.value];
}
另外增加removeCondition:
方法
- (void)removeCondition:(YGMessageCondition)condition {
NSMutableArray *array = self.keyPathValueMaps[condition.keyPath];
if (array == nil) { return ;}
if (condition.value == nil) {
condition.value = YGAlwaysRespondValue;
}
[array removeObject:condition.value];
if (array.count == 0) {
[self.keyPathValueMaps removeObjectForKey:condition.keyPath];
}
}
至此消息订阅已基本实现,下面看消息派发,当接收到socket传递过来的数据时
{
"msg_body" : {
"timestamp" : 172831819,
"type" : "order_status_change",
"title" : "",
"sub_title" : "",
"content" : "",
"fmt_date" : "07-22 18:22",
"extra" : {
"order_no": "TAINIUBILE1234"
}
},
"msg_id" : "00544"
}
首先解析成YGMessageModel,在解析model之前,可以做排重工作,这部分在源码中给出,再次不做讲解,然后获取monitorArray中所有响应该消息的订阅者,调用messageCenterDidReceiveMessage:
进行派发
- 先看如何判断monitor是否响应该消息
给YGMessageMonitor增加方法respondsToMessage:
,该方法只是简单的实现了部分判断条件,可根据不同业务情况做扩展,不多做讲解
- (BOOL)respondsToMessage:(YGMessage *)message {
if (self.alwaysRespond) { return YES; }
NSArray *keyPathArray = self.keyPathValueMaps.allKeys;
for (NSString *keyPath in keyPathArray) {
id value = [message valueForKeyPath:keyPath];
if (value == nil) {
return NO;
}
NSArray *valueArray = self.keyPathValueMaps[keyPath];
for (id conditionValue in valueArray) {
if (conditionValue == YGAlwaysRespondValue) {
return YES;
}
if ([value isKindOfClass:[NSString class]] && [conditionValue isKindOfClass:[NSString class]]) {
if ([value isEqualToString:conditionValue]) {
return YES;
}
}
if ([value isKindOfClass:[NSNumber class]] && [conditionValue isKindOfClass:[NSNumber class]]) {
if ([value isEqualToNumber:conditionValue]) {
return YES;
}
}
}
}
return NO;
}
- 在判断YGMessageMonitor是否响应某条消息的时候,同时判断该监听者是否已经为无效监听着,YGMessageMonitor重写isInvalid的
getter
方法
- (BOOL)isInvalid {
if (self.delegate == nil) {
return YES;
}
if (self.alwaysRespond) {
return NO;
}
return self.keyPathValueMaps.allKeys.count == 0;
}
- 最后派发完消息,移除monitorArray所有无效的monitor
- (void)postMessage:(YGMessage *)message {
for (YGMessageMonitor *monitor in self.monitorArray.reverseObjectEnumerator) {
if (monitor.isInvalid) {
[self.invalidMonitorArray addObject:monitor];
continue ;
}
BOOL responds = [monitor respondsToMessage:message];
if (!responds) { continue ; }
if ([monitor.delegate respondsToSelector:@selector(messageCenterDidReceiveMessage:intercept:)]) {
BOOL intercept = NO;
[monitor.delegate messageCenterDidReceiveMessage:message intercept:&intercept];
if (intercept) { break ; }
}
}
[self.monitorArray removeObjectsInArray:self.invalidMonitorArray];
[self.invalidMonitorArray removeAllObjects];
}
至此,整个的消息订阅与派发功能基本实现,一些完善的逻辑,根据自身业务做不同的处理,在此就不做赘述。若有不明之处,可以评论沟通。不完善之处,还请慷慨指出。
网友评论