美文网首页
用Python操作nanomsg(二)——PipeLine

用Python操作nanomsg(二)——PipeLine

作者: 钢琴师2005 | 来源:发表于2020-02-07 22:54 被阅读0次

PipeLine基本用法

运行效果

用Python操作nanomsg(一)——准备之后,本文着重介绍nanomsg的PipeLine通信模式。

在PipeLine模式中,Socket Type为NN_PUSH的可以发送,Socket Type为NN_PULL的可以接收:

建立PUSH节点发送数据(Establish PUSH node to send data)

import nnpy
node0 = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
node0.bind('tcp://*:4000')
# 发送数据
node0.send(bytes('你好', encoding='utf-8'))

建立PULL节点接收数据(Establish PULL node to receive data)

import nnpy
node1 = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
node1.connect('tcp://127.0.0.1:4000')
# 接收数据
recv_data = node1.recv()
data = recv_data.decode('utf-8')

注意的是发送和接收的数据都是二进制,发送前和接收后分别要进行编码和解码,这里使用utf-8作为统一格式。
没错就是这么简单,直接开始项目练手。

PipeLine练习项目

单向管道局域网聊天程序(PipeLine LAN-chat program)

这个LAN-chat program是一个命令行程序,通过子命令在PUSH或PULL模式下运行。

编写cli入口

首先明确我们的子命令和其参数,先简单一些,实现发送文字消息功能需要明确的内容为如下三部分:

子命令(sub-commands)

mode sub-command command description
PUSH bind chat.py bind [protocol] [addr] run by server
PULL connect chat.py connect [--keep-alive] [protocol] [addr] run by client

主要由两个子命令bindconnect组成,分别启动push server和pull client两个不同角色。

子命令参数(arguments of sub-commands)

argument default value description
protocol tcp inproc/tcp/udp/ws
addr *:4000 for PUSH and 127.0.01:4000 for PULL
--keep-alive False (未完善)

参数默认值定义在config.py中,默认protocol=tcpaddr=*:4000(PULL模式为127.0.0.1:4000)。

标志(flags)

flag variable name default value send by react by action
FLAG_CLIENT_OFFLINE client-offline-now PUSH PULL PULL节点结束进程
FLAG_SERVER_EXIT server-exit-now PUSH PUSH PUSH节点结束进程

以上flag作用于其被作为消息发送时可触发对应的动作。

关于包含子命令可参考前期文章:给Python脚本带上子命令(sub-commands),基于此可得到cli入口文件如下:

# _*_coding:utf-8 _*_
# @Time    : 2020/2/5 18:41
# @Author  : Shek 
# @FileName: OneWayPipe_CLI.py
# @Software: PyCharm
import argparse
from module.func import *
import config

parser = argparse.ArgumentParser(description=config.PROGRAM_DESCRIPTION)
subparsers = parser.add_subparsers()

# command 'bind'
cmd_bind = subparsers.add_parser('bind', help=config.H_BIND)
cmd_bind.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_BIND_PROTOCOL)
cmd_bind.add_argument('addr', action='store', nargs='?', default=config.BIND_ADDR, help=config.H_BIND_ADDR)
cmd_bind.set_defaults(func=sub_cmd_bind)

# command 'connect'
cmd_connect = subparsers.add_parser('connect', help=config.H_CONNECT)
cmd_connect.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_CONNECT_PROTOCOL)
cmd_connect.add_argument('addr', action='store', nargs='?', default=config.CONNECT_ADDR, help=config.H_CONNECT_ADDR)
cmd_connect.add_argument('--keep-alive', action='store_true', help=config.H_CONNECT_KEEP_ALIVE)
cmd_connect.set_defaults(func=sub_cmd_connect)

args = parser.parse_args()  # 处理输入的参数
if not hasattr(args, 'func'):
    # 无参数时跳转到-h
    #否则会提示 namespace object has not attribute 'func',故这里用hasattr()判断
    args = parser.parse_args(['-h'])
args.func(args)  # 跳转到对应的函数

sub_cmd_bind:服务端消息处理函数

1.创建日志写入对象,这是我整合的一个自用类,日志可同时写入到本地文件和输出到终端,用来替代printf():

# 1 initialize a logger
log = logger.Logger(config.LOG_NAME_PUSH)

2.创建nnpy.Socket对象,装入nnpy.PUSH到其参数protocol中,绑定本地地址tcp://*:4000:

# 2 create object
push_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
# 3 establish a sever to push message
log.info('binding to {}://{} ...'.format(arguments.protocol, arguments.addr))
result = push_server.bind('{}://{}'.format(arguments.protocol, arguments.addr))
# bind status
log.info('success') if result else log.info('failed') and exit(0)

3.建立消息发送循环:

# 4 push loop
time.sleep(0.5)
while True:
    content = input('Send>')
    # send message/data/command
    send_result = push_server.send(bytes(content, encoding=config.DATA_ENCODING))
# 5 close server
push_server.close()

同Socket一样,网络中传输过程中统一走的是二进制,故发送和接收需要进行编码和解码:

str转二进制:bin = bytes(content, encoding='utf-8')

二进制转str:content = bin.decode('utf-8')

但是这个循环是有问题的:这个While跳不出来,所以我们需要设置一个标志信息,当发送这个标志信息的时候退出服务端的消息循环,这是前面设置flags的目的。硬件设计中flag通常设置为整数,如0x77,这里方便起见直接使用字符串即可。

4.加入对flag的判断:

content = input('Send({})>'.format(config.COUNT_SEND_SUCCESS))
# process input message/command
if content == config.FLAG_SERVER_EXIT:
    # exit command caught, break loop
    log.info(config.L_SERVER_EXIT)
    break
else:
    # send message/data/command
    ...

5.消息发送次数统计:

# send message/data/command
send_result = push_server.send(bytes(content, encoding=config.DATA_ENCODING))
if send_result:  # success
    config.COUNT_SEND_SUCCESS += 1
else:  # failed (warning: in push / pull mode will never reach here)
    config.COUNT_SEND_FAILED += 1
    log.warning('{}:{}'.format(config.L_SERVER_SEND_FAILED_PREFIX, content))

发送次数统计仅为美化,另外这里发送后成功的返回值是什么,所以这里的发送成功统计暂时没有意义,约等于发送次数统计。

6.完成:


sub_cmd_bind函数完整代码

sub_cmd_connect:编写客户端消息处理函数

1.同样先创建日志写入对象:

# 1 initialize a logger
log = logger.Logger(config.LOG_NAME_PULL)

2.创建nnpy.Socket对象,装入nnpy.PULL到其参数protocol中,连接地址tcp://127.0.0.1:4000(这里keep_alive参数暂时没有利用起来):

# 2 create object
pull_client = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
# not completed yet
if arguments.keep_alive:
    print(config.I_KEEP_ALIVE_ENABLED)
# 3 connect to a server for receiving message
log.info('connecting to {}://{}'.format(arguments.protocol, arguments.addr))
result = pull_client.connect('{}://{}'.format(arguments.protocol, arguments.addr))
# connect status
log.info(config.I_OP_SUCCESS) if result else log.info(config.I_OP_FAILED) and exit(0)

4.创建消息接受循环,根据服务端的经验,同样加入跳出标志,一个是客户端本机按Ctrl + C时利用触发的KeyboardInterrupt异常跳出循环,另一个是接收服务端发送的FLAG_CLIENT_OFFLINE标志信息自行下线:

# 4 receive in loop
time.sleep(0.5)
while True:
    try:
        recv_data = pull_client.recv()
        if recv_data:
            decoded_data = recv_data.decode(config.DATA_ENCODING)
            # 3 process received data
            if decoded_data == config.FLAG_CLIENT_OFFLINE:
                # receive a go-offline flag from server, break loop
                log.info(config.L_CLIENT_FLAG_OFFLINE_DETECTED)
                break
            # display message push by server
            print('{} {}'.format(current_datetime(), decoded_data))
            # logging to text file
            log.debug(decoded_data)
    except KeyboardInterrupt:
        # ctrl + c detected
        log.info(config.L_CLIENT_CTRL_C)
        break

# 5 close client
pull_client.close()
log.info(config.L_CLIENT_CLOSED)

5.完成:

sub_cmd_connect函数完整代码

测试运行

主要工作已完成,可能有眼细的同学看到上述代码中有很多config.开头的变量,这是为了方便日后维护,故把配置信息单独放在config.py中。


flag都定义在config.py中 运行效果:v0.0.5 测试版本1, 0ac4bf57 on 2020/2/6 at 22:55

总结

上述虽然实现了基本的通信功能,但是很明显PipeLine模式不能满足“局域网聊天”的这个需要,首先它不支持双向通信,也不支持异步收发。在后续的模式测试中我们再对其进行改进。


本系列其他文章:

内容 文章地址 说明
准备 用Python操作nanomsg(一)——准备 2020.2.7更新
PushPub 用Python操作nanomsg(三)——PubSub 2020.2.8更新
Pair 用Python操作nanomsg(四)——Pair 未开始
ReqRep 用Python操作nanomsg(五)——ReqRep 未开始
Survey 用Python操作nanomsg(六)——Survey 未开始
Bus 用Python操作nanomsg(七)——Bus 未开始

相关文章

网友评论

      本文标题:用Python操作nanomsg(二)——PipeLine

      本文链接:https://www.haomeiwen.com/subject/fnfbxhtx.html