美文网首页
打造自己的通信框架五——异步串行无锁化

打造自己的通信框架五——异步串行无锁化

作者: alonwang | 来源:发表于2020-08-26 19:44 被阅读0次

前言

上一节我们已经将消息分发给用户.下面要思考如何去执行这些消息

正文

多线程VS单线程

处理方案需要保证

  • 异步性,不能阻塞消息分发线程, 假如分发线程分发了多条消息,不能因为上一条消息还没处理完就阻塞它.

  • 有序性,消息处理需要保证有序,先到先处理.

  • 尽量少的内存冲突.如果出现内存冲突,就需要加锁保证正确性,必定影响执行效率,开发难度大大增加.

下面我们看下多线程和单线程两种方案的优劣

多线程,NO!

假如我们将同一用户的多条消息放到多个线程处理,可能出现以下问题

  • 大量的内存冲突. 它们可能会同时修改这个玩家的数据,导致内存冲突问题,
  • 消息乱序,由于线程调度不可预测,不同消息的执行复杂性不同,可能出现用户X的发送的消息A比消息B更早收到,但是消息B比消息A先处理的情况.即无法保证有序性

因此多线程方式不可取.

单线程,YES!

对单线程方案,每个用户都需要一个队列去存储它们的消息.由某个特定的线程去消耗这些消息.它能满足处理方案的所有要求:

  • 分发线程只需要将消息放到队列即可,不会阻塞
  • 消息是按收到的顺序依次执行的,不会乱序
  • 只有单个线程访问修改这些数据,很少有内存冲突

单线程方案很好,满足了上面的三个条件,但是还有一个关键点需要确定,某个特定的线程到底是啥?

“某个特定的线程”==线程池+空闲处理

我们不可能给每个用户一个独立的线程,如果在基于Netty的方案中使用这种方式,那是在侮辱Netty.

首先借鉴Netty的思想,我们使用线程池,将用户随机绑定到线程池的某个线程上.

其次,考虑这种系统的特点,一段时间内有一部分用户是不活跃的,他们不会发送或接收任何消息.因此 有消息时,将用户与线程绑定,无消息时解绑

单线程实现

具体的流程如下图

image.png

单线程实现分为三块

  • 添加消息 将消息添加到队列中,如有必要启动绑定线程流程 对应途中的红色部分
  • 绑定线程 将用户和线程绑定起来,并开始处理消息,对应绿色部分
  • 处理消息 依次消耗处理队列中的消息,并在消息处理完时解绑线程,对应黄色部分
  • 解绑线程 将用户和线程解绑. 对应蓝色部分

下面来看核心实现DefaultTaskExecutor,和上图的对应关系已在注释中标明

public class DefaultTaskExecutor<T extends TaskExecutor<?>> implements Runnable, TaskExecutor<T> {
    private static final ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2, new CustomizableThreadFactory("MessageTask-Worker"));
    private final ExecutorService executorService;
    private final Queue<Task<T>> tasks = PlatformDependent.newMpscQueue();
    private final AtomicInteger size = new AtomicInteger();
    private volatile Thread current;

    public DefaultTaskExecutor() {
        executorService = DEFAULT_EXECUTOR_SERVICE;
    }
        //添加信息
    @Override
    public void execute(Task<T> task) {
        tasks.add(task);
        int curSize = size.incrementAndGet();
        if (curSize == 1) {
            //绑定线程
            executorService.execute(this);
        }
    }
        //处理信息
    @Override
    public void run() {
        this.current = Thread.currentThread();
        while (true) {
            Task<T> task = tasks.poll();
            if (task == null) {
                break;
            }
            try {
                task.execute((T) this);
            } catch (Exception e) {
                log.error("task execute error", e);
            }
            int curSize = size.decrementAndGet();
            //解绑线程  
            if (curSize <= 0) {
                break;
            }
        }
    }

    public boolean inThread() {
        return Thread.currentThread() == current;
    }
}

后记

本文中使用的方式,和Netty中实现NioEventLoop的实现十分类似,不同点在于,NioEventLoop中的Channel创建后就会绑定到某个线程,不会再变化.而我们这里是会变化的. 如果能理解本文的设计思路,对理解Netty也有很大帮助.

相关文章

  • 打造自己的通信框架五——异步串行无锁化

    前言 上一节我们已经将消息分发给用户.下面要思考如何去执行这些消息 正文 多线程VS单线程 处理方案需要保证 异步...

  • Netty中真的没有使用锁吗?

    Netty号称是一个事件驱动&异步串行无锁化的网络通信框架. 在Netty的官方网站中声称, 它是一个异步的, 事...

  • 2017-12-12

    今天我们黄老师为我们讲解了UART 通用异步串行通信,UART--通用异步串行通信接口的总称,UART允许在串行链...

  • 同步和异步

    同步和异步 串行通信有两种传输方式: 1 同步通信 2 异步通信 异步通信是一种很常用的通信方式。异步通信在发送字...

  • 命题_第十四章

    1.什么是串行异步通信?有哪些作用? 答:在异步串行通信中,数据是一帧一帧传送的,通信采用帧格式,无需同步字符,存...

  • 命题_第十四章_填空题

    PC机的串行通信接口(COM1、COM2)采用异步通信。异步通信的一帧信息包括起始位、数据位、奇偶校验位和 ﹎﹍。...

  • 多线程

    1、同步、异步、串行、并行、全局队列、主队列2、Thread、NSOperation、GCD3、锁

  • 多线程

    GCD NSOperation NSThread 多线程与锁 一、GCD 同步、异步 和 串行、并发 dispat...

  • ES6的学习(下)

    promise:异步操作同步化 同步 -- 串行 简单、方便异步 -- 并行 性能高、体验好promise的是...

  • 9.21

    串口通信: 1. 串行通信 2.全双工、半双工 3.同步与异步 4.通信的速率 结合所讲的内容写串口程序,根据硬件...

网友评论

      本文标题:打造自己的通信框架五——异步串行无锁化

      本文链接:https://www.haomeiwen.com/subject/jlvfsktx.html