
按照发布—订阅模式的思路,我自己设计了一个用例:类似于微信公众号,用户订阅某个公众号,公众号发布消息后,订阅用户可以实时收到更新。简单版本如下:
# -*- coding: utf-8 -*-
class _PublisherManager(object):
    """Central broker that connects publishers with their subscribers.

    Both maps are plain dicts keyed by the publisher object itself, so
    publishers must be hashable.
    """

    def __init__(self):
        # publisher -> list of every message that publisher has sent
        self.publiser_objs_map = {}
        # publisher -> list of subscriber objects, in subscription order
        self.publiser_to_subscirber_map = {}

    def register_publiser(self, publiser_obj):
        """Register ``publiser_obj`` so it may publish and be subscribed to.

        Registering the same publisher again only prints a notice; its
        message history and subscriber list are left untouched.
        """
        if publiser_obj in self.publiser_objs_map:
            print('you have been registered before')
            return
        self.publiser_objs_map[publiser_obj] = []
        self.publiser_to_subscirber_map[publiser_obj] = []

    def publish_msg(self, publiser_obj, msg):
        """Record ``msg`` for ``publiser_obj`` and push it to its subscribers.

        An unregistered publisher gets a printed notice and nothing is
        stored or delivered.
        """
        if publiser_obj not in self.publiser_objs_map:
            print('please register and become a publiser before publish msg')
            return
        self.publiser_objs_map[publiser_obj].append(msg)
        subscriber_objs = self.publiser_to_subscirber_map[publiser_obj]
        self.notify_msg(subscriber_objs, msg)

    def subscribe_msg(self, subscriber_obj, publiser_obj):
        """Subscribe ``subscriber_obj`` to messages from ``publiser_obj``.

        Subscribing to an unregistered publisher, or subscribing twice to
        the same publisher, prints a notice and changes nothing.
        """
        if publiser_obj not in self.publiser_objs_map:
            # Fall back to the object itself so this error path cannot
            # raise AttributeError for objects without ``publiser_name``.
            label = getattr(publiser_obj, 'publiser_name', publiser_obj)
            print('this publisher:%s not exit' % (label,))
            return
        subscribers = self.publiser_to_subscirber_map[publiser_obj]
        if subscriber_obj in subscribers:
            # A duplicate entry would deliver every message more than once.
            print('you have subscribed this publisher before')
            return
        subscribers.append(subscriber_obj)

    def notify_msg(self, subscriber_objs, msg):
        """Call ``receive_new_msg(msg)`` on each subscriber, in order."""
        # Iterate over a snapshot so a subscriber that (un)subscribes from
        # inside its callback cannot disturb the loop in progress.
        for subscriber_obj in list(subscriber_objs):
            subscriber_obj.receive_new_msg(msg)


# Module-level singleton shared by every publisher and subscriber.
PublisherManager = _PublisherManager()
class PubliserBase(object):
    """Base publisher that talks to the shared ``PublisherManager``.

    Subclasses override ``check_register_condition`` to decide whether
    the publisher is allowed to register.
    """

    def __init__(self, name):
        self.publiser_name = name

    def check_register_condition(self):
        """Registration gate; the base version returns None (falsy)."""
        return None

    def register_publiser(self):
        """Register with the manager if the registration check passes."""
        if self.check_register_condition():
            PublisherManager.register_publiser(self)
        else:
            print('sorry, register wrong')

    def publish_msg(self, msg):
        """Hand ``msg`` to the manager for delivery to subscribers."""
        PublisherManager.publish_msg(self, msg)
class PubliserAndroid(PubliserBase):
    """Publisher on the Android platform.

    Registration is allowed only when ``system_version`` is at least
    ``MIN_SYSTEM_VERSION``.
    """

    # Lowest system version allowed to register. Kept as a class
    # attribute so a subclass can change the requirement without
    # rewriting the check.
    MIN_SYSTEM_VERSION = 4.0

    def __init__(self, name):
        super(PubliserAndroid, self).__init__(name)
        # Starts at 0, so registration fails until a version is set.
        self.system_version = 0

    def set_system_version(self, version_id):
        """Store the numeric system version used by the registration check."""
        self.system_version = version_id

    def check_register_condition(self):
        """Return True if the system version meets the minimum, else False.

        A rejected version is reported on stdout.
        """
        if self.system_version >= self.MIN_SYSTEM_VERSION:
            return True
        print('cur version:%f is too low' % self.system_version)
        return False
class PubliserIOS(PubliserBase):
    """Publisher on the iOS platform.

    Registration is allowed only when ``system_version`` is at least
    ``MIN_SYSTEM_VERSION``.
    """

    # Lowest system version allowed to register. Kept as a class
    # attribute so a subclass can change the requirement without
    # rewriting the check.
    MIN_SYSTEM_VERSION = 8.0

    def __init__(self, name):
        super(PubliserIOS, self).__init__(name)
        # Starts at 0, so registration fails until a version is set.
        self.system_version = 0

    def set_system_version(self, version_id):
        """Store the numeric system version used by the registration check."""
        self.system_version = version_id

    def check_register_condition(self):
        """Return True if the system version meets the minimum, else False.

        A rejected version is reported on stdout, matching the Android
        publisher, so the caller can see why registration failed.
        """
        if self.system_version >= self.MIN_SYSTEM_VERSION:
            return True
        print('cur version:%f is too low' % self.system_version)
        return False
class SubscriberBase(object):
    """Base subscriber; subclasses decide how a delivered message is shown."""

    def __init__(self, name):
        self.subscriber_name = name

    def receive_new_msg(self, msg):
        """Callback for a newly published message; the base ignores it."""
        return None

    def subscribe_msg(self, publiser):
        """Ask the shared manager to subscribe this object to ``publiser``."""
        PublisherManager.subscribe_msg(self, publiser)
class SubscriberAndroid(SubscriberBase):
    """Subscriber registered on the Android platform."""

    def receive_new_msg(self, msg):
        """Print the delivered message together with this user's name."""
        details = (self.subscriber_name, msg)
        print('android platform user:%s reveive a new msg:%s' % details)
class SubscriberIOS(SubscriberBase):
    """Subscriber registered on the iOS platform."""

    def receive_new_msg(self, msg):
        """Print the delivered message together with this user's name."""
        details = (self.subscriber_name, msg)
        print('ios platform user:%s reveive a new msg:%s' % details)
if __name__ == "__main__":
    # Demo: one Android publisher, two subscribers, one broadcast message.
    new_publiser = PubliserAndroid('PythonCommunity')
    new_publiser.set_system_version(6.0)  # above the Android minimum
    new_publiser.register_publiser()

    user1 = SubscriberAndroid('xiaohong')
    user2 = SubscriberIOS('xiaoming')
    # Subscribe in the same order the users were created.
    for follower in (user1, user2):
        follower.subscribe_msg(new_publiser)

    msg = {'tilte': 'hello python'}
    new_publiser.publish_msg(msg)
- 这里简单地用不同平台的系统版本号作为能否注册为发布者的条件。
网友评论