chap8 高性能异步编程框架和中间件
-
Netty
-
Netty框架将网络编程逻辑与业务逻辑处理分离开来,其内部会自动处理好网络与异步处理逻辑,让我们专心写自己的业务处理逻辑。同时,Netty的异步非阻塞能力与CompletableFuture结合可以让我们轻松实现网络请求的异步调用
-
Netty之所以能提供高性能网络通信,其中一个原因是它使用Reactor线程模型。在Netty中,每个EventLoopGroup本身都是一个线程池,其中包含了自定义个数的NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop里面持有自己的NIO Selector选择器。在Netty中,客户端持有一个EventLoopGroup用来处理网络IO操作;在服务器端持有两个EventLoopGroup,其中boss组是专门用来接收客户端发来的TCP链接请求的,worker组是专门用来处理完成三次握手的链接套接字的网络IO请求的
-
在Netty中,NioEventLoop是EventLoop的一个实现,每个NioEventLoop中会管理自己的一个selector选择器和监控选择器就绪事件的线程;每个Channel在整个生命周期中固定关联到某一个NioEventLoop;但是,每个NioEventLoop中可以关联多个Channel
-
ChannelPipeline:Netty中的ChannelPipeline类似于Tomcat容器中的Filter链,属于设计模式中的责任链模式,其中链上的每个节点就是一个ChannelHandler。在Netty中,每个Channel有属于自己的ChannelPipeline,管线中的处理器会对从Channel中读取或者要写入Channel中的数据进行依次处理
-
每个NioEventLoopGroup里面包含了多个Nio EventLoop,每个NioEventLoop中包含了一个NIO Selector、一个队列、一个线程;其中线程用来做轮询注册到Selector上的Channel的读写事件和对投递到队列里面的事件进行处理
-
每个NioEventLoop中会管理好多客户端发来的连接,并通过循环轮询处理每个连接的读写事件
-
Netty之所以说是异步非阻塞网络框架,是因为通过NioSocketChannel的write系列方法向连接里面写入数据时是非阻塞的,是可以马上返回的(即使调用写入的线程是我们的业务线程)
- 如果调用线程是IO线程,则会在IO线程上执行写入
- 如果发现调用线程不是IO线程,则会把写入请求封装为WriteTask并投递到与其对应的NioEventLoop中的队列里面,然后等其对应的NioEventLoop中的线程轮询连接套接字的读写事件时捎带从队列里面取出来并执行
- 每个NioSocketChannel对应的读写事件都是在与其对应的NioEvent Loop管理的单线程内执行的,不存在并发,所以无须加锁处理
-
使用Netty框架进行网络通信时,当我们发起请求后请求会马上返回,而不会阻塞我们的业务调用线程;如果我们想要获取请求的响应结果,也不需要业务调用线程使用阻塞的方式来等待,而是当响应结果出来时使用IO线程异步通知业务,由此可知,在整个请求-响应过程中,业务线程不会由于阻塞等待而不能干其他事情
-
完成TCP三次握手的套接字应该注册到worker线程池中的哪一个NioEventLoop的Selector上
- 关于NioEventLoop的分配,采用轮询取模的方式来进行分配
-
如果NioEventLoop中的线程负责监听注册到Selector上的所有连接的读写事件和处理队列里面的消息,那么会不会导致由于处理队列里面任务耗时太长导致来不及处理连接的读写事件
-
Netty默认是采用时间均分策略来避免某一方处于饥饿状态
-
处理所有注册到当前NioEventLoop的Selector上的所有连接套接字的读写事件
-
统计其耗时,默认情况下ioRatio为50
-
使用相同的时间来运行队列里面的任务,也就是处理套接字读写事件与运行队列里面任务是使用时间片轮转方式轮询执行
-
landon TODO runAllTasks中如何保证运行队列任务的时间和io一样?任务执行的时间不固定啊
// 从队列里拿出一个任务 Runnable task = pollTask(); ... // 设定deadline final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0 ... // 无限循环 for (;;) { // 执行任务,任务计数 safeExecute(task); runTasks ++; // landon-这里代码是核心,注释写的也比较清楚,当任务计数到了64个的时候才执行一次检测,如果执行时间超时,则直接break,所以这里如果有某个任务执行时间特别长,则是很有可能超过deadline的 // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } // landon-这里就是如果没有到64个任务,队列就没有,直接break.所以从代码上看,如果有很多队列的任务都很耗时,但是又没有超过64个,则肯定会导致指定执行队列的时间过长的,但是从设计上看,应该任务都会很快执行的 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } }
-
-
多个套接字注册到同一个NioEventLoop的Selector上,使用单线程轮询处理每个套接字上的事件,如果某一个套接字网络请求比较频繁,轮询线程是不是会一直处理该套接字的请求,而使其他套接字请求得不到及时处理
- 默认情况下maxMessagePerRead为16,所以对应NioEventLoop管理的每个NioSocketChannel中的数据,在一次事件循环内最多连续读取16次数据,并不会一直读取,这就有效避免了其他NioSocketChannel的请求事件得不到及时处理的情况
-
基于Netty与CompletableFuture实现RPC异步调用
-
thenCombine
-
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,
other
并不会等待先前的CompletableFuture
执行完毕后再执行 -
其实从功能上来讲,它们的功能更类似
thenAcceptBoth
,只不过thenAcceptBoth
是纯消费,它的函数参数没有返回值,而thenCombine
的函数参数fn
有返回值 -
landon,所以合并之后的任务执行默认线程应该是不确定的
-
-
@Sharable注解是让服务端所有接收的链接对应的channel复用同一个NettyServerHandler实例,这里可以使用@Sharable方式是因为NettyServer Handler内的处理是无状态的,不会存在线程安全问题
-
rpcSyncCall
- 创建一个CompletableFuture future
- 创建一个消息,有消息id,异步发送
- 保存future上下文,<id,future>
- future.get同步等待结果
- 在handler收到消息的时候,从上下文拿到future,调用future.complete
-
rpcAsyncCall
- 则不是调用future.get等待结果,而是直接返回
- 然后future.whenComplete指定回调
-
可以把异步调用改造为Reactive编程风格,只需要把返回的CompletableFuture转换为Flowable即可
-
使用defer,当订阅的时候才执行rpc操作
-
future.whenComplete时,发射结果,这里使用了ReplayProcessor创建含有一个元素的流
landon-这里用到了Processor
-
发起rpc调用后马上返回了一个Flowable流对象,但这时真正的rpc调用还没有发出去,等代码3订阅了流对象时才真正发起rpc调用
-
由于CompletableFuture是可以设置回调函数的,所以把其转换为Reactive风格编程很容易
-
-
-
-
Dubbo
-
概述
- Provider为服务提供者集群,服务提供者负责暴露提供的服务,并将服务注册到服务注册中心
- Consumer为服务消费者集群,服务消费者通过RPC远程调用服务提供者提供的服务
- Registry负责服务注册与发现
- Monitor为统计服务的调用次数和调用时间的监控中心
- 服务提供者在启动时会将自己提供的服务注册到服务注册中心
- 服务消费者在启动时会去服务注册中心订阅自己所需服务的地址列表,由服务注册中心向它异步返回其所需服务接口的提供者的地址列表,再由服务消费者根据路由规则和设置的负载均衡算法选择一个服务提供者IP进行调用
- 监控平台主要用来统计服务的调用次数和调用耗时,服务消费者和提供者,在内存中累计调用次数和调用耗时,并定时每分钟发送一次统计数据到监控中心,监控中心则使用数据绘制图表来显示。监控平台不是分布式系统必须有的,但是这些数据有助于系统运维和调优。服务提供者和消费者可以直接配置监控平台的地址,也可以通过服务注册中心来获取
-
dubbo的异步调用
- Dubbo框架中的异步调用是发生在服务消费端的,异步调用实现基于NIO的非阻塞能力实现并行调用,服务消费端不需要启动多线程即可完成并行调用多个远程服务,相比多线程其开销较小
- 当服务消费端发起RPC调用时使用的是用户线程(步骤1),请求会被转换为IO线程(步骤2),具体向远程服务提供方发起远程调用
- 步骤2的IO线程使用NIO发起远程调用,用户线程通过步骤3创建了一个Future对象,然后通过步骤4将其设置到RpcContext中
- 然后用户线程则可以在某个时间从RpcContext中获取设置的Future对象(步骤5),并且通过步骤6设置回调函数,这样用户线程就返回了
- 当服务提供方返回结果(步骤7)后,调用方线程模型中的线程池线程会把结果通过步骤8写入Future,然后就会回调注册的回调函数
- 调用线程异步调用发起后会马上返回一个Future,并在Future上设置一个回调函数,然后调用线程就可以忙自己的事情去了,不需要同步等待服务提供方返回结果。当服务提供方返回结果时,调用方的IO线程会把响应结果传递给Dubbo框架内部线程池中的线程,后者则会回调注册的回调函数,由此可见,在整个过程中,发起异步调用的用户线程是不会被阻塞的
- 首先考虑在一个线程(记为线程A)中通过RPC请求获取服务B和服务C的数据,然后基于两者的结果做一些事情。在同步RPC调用情况下,线程A在调用服务B后需要等待服务B返回结果,才可以对服务C发起调用,等服务C返回结果后才可以结合服务B和服务C的结果做一件事
- 线程A同步获取服务B的结果后,再同步调用服务C获取结果,可见在同步调用的情况下,线程A必须按顺序对多个服务请求进行调用,因而调用线程必须等待,这显然会浪费资源。在Dubbo中,使用异步调用可以避免这个问题。两次异步远程过程调用,并行的
-
dubbo的异步执行
-
Dubbo框架的异步执行是发生在服务提供端的,在Provider端非异步执行时,其对调用方发来的请求的处理是在Dubbo内部线程模型的线程池中的线程来执行的,在Dubbo中服务提供方提供的所有服务接口都使用这一个线程池来执行,所以当一个服务执行比较耗时时,可能会占用线程池中的很多线程,这可能就会影响到其他服务的处理
-
Provider端异步执行则将服务的处理逻辑从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池中线程被过度占用,有助于避免不同服务间的互相影响
-
Provider端异步执行对节省资源和提升RPC响应性能是没有效果的,这是因为如果服务处理比较耗时,虽然不是使用Dubbo框架的内部线程,但还是需要业务自己的线程来处理,另外副作用还有会新增一次线程上下文切换(从Dubbo内部线程池线程切换到业务线程)
-
Landon:这里指provider的线程模型,即从io线程到dubbo的内部线程再到业务线程
和游戏服务器模型基本一致
注意:这个切换到业务线程执行对节省资源和提升RPC响应性能是没有效果的,而客户端异步调用这边是有作用的,因为不会阻塞调用线程
-
Dubbo中提供了两种异步处理方法
- 使用AsyncContext实现异步执行
- 用RpcContext.startAsync()开启服务异步执行,然后返回一个asyncContext
- 把服务处理任务提交到业务线程池后sayHello方法就直接返回了null
- 同时也释放了Dubbo内部线程池中的线程
- 具体业务处理逻辑则在自定义业务线程池内执行,任务内首先执行代码2.2切换任务的上下文,这是因为RpcContext.getContext()是ThreadLocal变量,不能跨线程,这里切换上下文就是为了把保存的上下文内容设置到当前线程内
- 最后把任务执行结果写入异步上下文
- 基于CompletableFuture签名的接口实现异步执行
- 基于定义CompletableFuture签名的接口实现异步执行需要接口方法返回值为CompletableFuture
- 方法内部使用CompletableFuture.supplyAsync让本来应由Dubbo内部线程池中线程处理的服务,转为由业务自定义线程池中线程来处理,所以Dubbo内部线程池线程会得到及时释放
- 调用sayHello方法的线程是Dubbo线程模型线程池中的线程,而业务在bizThreadpool中的线程处理,所以代码2.1保存了RpcContext对象(ThreadLocal变量),以便在业务处理线程中使用
- 使用AsyncContext实现异步执行
-
-
Dubbo demo
-
mac安装zookeeper
% brew info zookeeper zookeeper: stable 3.5.7 (bottled), HEAD Centralized server for distributed coordination of services https://zookeeper.apache.org/ Not installed From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/zookeeper.rb ==> Dependencies Build: ant ✘, autoconf ✘, automake ✘, libtool ✘, pkg-config ✘ ==> Options --HEAD Install HEAD version ==> Caveats To have launchd start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zkServer start // 注: 2020.03.26 看zk官网zk的版本是3.6.0 % brew install zookeeper /usr/local/Cellar/zookeeper/3.5.7 // 配置文件目录 /usr/local/etc/zookeeper // 启动zk % zkServer start /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Starting zookeeper ... STARTED clientPort=2181 % zkServer status /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Client port found: 2181. Client address: localhost. Error contacting service. It is probably not running. // provider启动连zk报错,Exception in thread "main" java.lang.IllegalStateException: zookeeper not connected // 查看日志 /usr/local/etc/zookeeper 有log4j.properties log4j.appender.zklog.File = /usr/local/var/log/zookeeper/zookeeper.log 2020-03-26 14:21:11 NIOServerCnxnFactory [ERROR] Thread Thread[main,5,main] died java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer; at org.apache.jute.BinaryOutputArchive.stringToByteBuffer(BinaryOutputArchive.java:77) 2020-03-26 14:17:27 NIOServerCnxnFactory [ERROR] Thread Thread[NIOWorkerThread-1,5,main] died java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer; % zkCli /usr/bin/java Connecting to localhost:2181 Welcome to ZooKeeper! % brew upgrade zookeeper // 临时解决办法 1. 手动去下载zookeeper 3.5.7 2. 拷贝zookeeper-3.5.7.jar和zookeeper-jute-3.5.7.jar到/usr/local/Cellar/zookeeper/3.5.7/libexec 3. 删除原有zookeeper-3.5.6-SNAPSHOT.jar - 猜测是是通过brew下载的zookeeper-3.5.6-SNAPSHOT.jar // 成功 % zkServer status /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone // 顺便装上maven % brew info maven maven: stable 3.6.3 Java-based project management https://maven.apache.org/ Conflicts with: mvnvm (because also installs a 'mvn' executable) Not installed From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/maven.rb ==> Dependencies Required: openjdk ✘ % brew install maven // 配合文件 /usr/local/Cellar/maven/3.6.3_1/libexec/conf
-
provider application两种方式
- startWithBootstrap和startWithExport,推荐前者
-
consumer application两种方式
- runWithBootstrap和runWithRefer,推荐前者
- ReferenceConfig对象内部封装了所有通讯细节,对象较重,请缓存复用
- ReferenceConfigCache.getCache().get(reference)
- 传统的dubbo服务面向接口编程,如果需要调用其他服务则需要引入该服务对应的接口
- 比如跨语言接口,所以dubbo支持api泛化调用,即invoke传入接口名字,method和parameter等即可
- ReferenceConfig<GenericService>
-
异步调用
-
最简单的就是服务接口返回CompletableFuture,接口实现则是使用CompletableFuture.supplyAsync,传入自定义线程池
- 第二种接口实现是使用Serverlet 3.0的异步接口
AsyncContext
- 注意:Dubbo提供了一个类似Serverlet 3.0的异步接口
AsyncContext
,在没有CompletableFuture签名接口的情况下,也可以实现Provider端的异步执行 - 这个是指没有CompletableFuture签名接口
- 注意:Dubbo提供了一个类似Serverlet 3.0的异步接口
- 这种是服务端异步执行。Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。异步执行无益于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。
- 第二种接口实现是使用Serverlet 3.0的异步接口
-
对于consumer
- 接口直接返回结果,实现也直接返回
- 但是consumer这边,reference.setAsync(true)
- 这样consumer这边调用接口就直接返回了
- 然后通过RpcContext.getContext().getCompletableFuture()获得future并设置complete回调
- 另外一种是不设置reference.setAsync(true)
- 而是直接context.asyncCall,传入一个callable执行同步方法,并返回CompletableFuture
- 这两种相当于接口的实现是同步的,但是在调用端执行了异步,调用即返回然后回调
-
总结
-
consumer可以异步执行,此时接口可以是同步的,可以直接返回,也可以用CompletableFuture.completedFuture包装
-
provider可以异步执行,接口必须要是CompletableFuture
-
两者解决的问题不同。如果provider端的服务比较耗时,建议切到自定义业务线程池。而consumer端则通常都是异步的,不影响调用端
- 即两端都可以异步,无论接口定义是什么样子的
-
如果你只有这样的同步服务定义,而又不喜欢RpcContext的异步使用方式。那还有一种方式,就是利用Java 8提供的default接口实现,重载一个带有带有CompletableFuture签名的方法
-
这个指的consumer端
-
https://github.com/apache/dubbo-async-processor#compiler-hacker-processer
-
在测试过程中无法找到AsyncSignal这个类,参数用了另外一个字段,测试倒是通过。所以猜测AsyncSignal只是示例,相当于一个标识符,‘specially designed to distinguish async method’
public interface GreetingsService { String sayHi(String name); // AsyncSignal is totally optional, you can use any parameter type as long as java allows your to do that. default CompletableFuture<String> sayHi(String name, AsyncSignal signal) { return CompletableFuture.completedFuture(sayHi(name)); } }
-
另外一种实现是Compiler hacker processer,第一种是AsyncSignal
-
就是新写一个方法,重命名,如sayHiAsync,返回CompletableFuture
public interface GreetingsService { String sayHi(String name); // Any name is ok default CompletableFuture<String> sayHiAsync(String name) { return CompletableFuture.completedFuture(sayHi(name)); } }
-
缺点是之前的方法就不再起作用
-
The essential part is to overwrite a new async method. Another way would be to generate a new method with a different name, for example,
sayHiAsync
, then we can get rid ofAsyncSignal
. But there's an obvious flaw of this approach, that is, all method level configurators and routers defined tosayHi
will not take effect anymore.
-
-
-
-
-
Disruptor
-
Disruptor是一个高性能的线程间消息传递库
-
要理解Disruptor是什么,最好的方法是将它与目前你已经很好地理解且与之非常相似的东西进行比较,例如与Java的BlockingQueue进行对比。与队列一样,Disruptor的目的也是在同一进程内的线程之间传递数据(例如消息或事件)
- Disruptor中的同一个消息会向所有消费者发送,即多播能力(Multicast Event)
- 为事件预先分配内存(Event Preallocation),避免运行时因频繁地进行垃圾回收与内存分配而增加开销
- 可选择无锁(Optionally Lock-free),使用两阶段协议,让多个线程可同时修改不同元素
- 缓存行填充,避免伪共享(prevent false sharing)
-
Disruptor使用Sequence作为识别特定组件所在位置的方法。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence
-
Sequencer是Disruptor的真正核心。该接口的2个实现(单生产者和多生产者)实现了所有并发算法,用于在生产者和消费者之间快速、正确地传递数据
-
Wait Strategy:等待策略,确定消费者如何等待生产者将事件放入Disruptor
-
每个消费者持有自己的当前消费序号,由于是环形buffer,因而生产者写入事件时要看序号最小的消费者序号,以避免覆盖还没有被消费的事件
-
Disruptor具有多播能力(Multicast),这是Java中队列和Disruptor之间最大的行为差异。当有多个消费者在同一个Disruptor上监听事件时,所有事件都会发布给所有消费者,而Java队列中的每个事件只会发送给某一个消费者。Disruptor的行为旨在用于需要对同一数据进行独立的多个并行操作的情况
-
Disruptor的目标之一是在低延迟环境中使用。在低延迟系统中,必须减少或移除运行时内存分配;在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿。为了支持这一点,用户可以预先为Disruptor中的事件分配其所需的存储空间(也就是声明Ring Buffer的大小)。在构造Ring Buffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的
-
低延迟期望推动的另一个关键实现细节是使用无锁算法来实现Disruptor,所有内存可见性和正确性保证都是使用内存屏障(体现为volatile关键字)或CAS操作实现的。在Disruptor的实现中只有一种情况需要实际锁定—当使用BlockingWaitStrategy策略时,这仅仅是为了使用条件变量,以便在等待新事件到达时parked消费线程。许多低延迟系统将使用忙等待(busy-wait)来避免使用条件可能引起的抖动,但是大量在系统繁忙等待的操作可能导致性能显著下降,尤其是在CPU资源严重受限的情况下
-
在JDK的BlockingQueue中添加或取出元素时是需要加独占锁的,通过锁来保证多线程对底层共享的数据结构进行并发读写的线程安全性,使用锁会导致同时只有一个线程可以向队列添加或删除元素。Disruptor则使用两阶段协议,让多个线程可同时修改不同元素,需要注意,消费元素时只能读取到已经提交的元素。在Disruptor中某个线程要访问Ring Buffer中某个序列号下对应的元素时,要先通过CAS操作获取对应元素的所有权(第一阶段),然后通过序列号获取对应的元素对象并对其中的属性进行修改,最后再发布元素(第二阶段),只有发布后的元素才可以被消费者读取。当多个线程写入元素时,它们都会先执行CAS操作,获取到Ring buffer中的某一个元素的所有权,然后可以并发对自己的元素进行修改。注意,只有序列号小的元素发布后,后面的元素才可以发布。可知相比使用锁,使用CAS大大减少了开销,提高了并发度
-
其实在单生产者的情况下Disruptor甚至可以省去CAS的开销,因为在这种情况下,只有一个线程来请求修改Ring Buffer中的数据,生产者的序列号使用普通的long型变量即可。在创建Disruptor时是可以指定是单生产者还是多生产者的,如果你的业务就是单生产者模型,那么创建Disruptor时指定生产者模式为ProducerType.SINGLE效果会更好
-
Disruptor中的Ring Buffer底层是一个地址连续的数组,数组内相邻的元素很容易会被放入同一个Cache行里,从而导致伪共享的出现。Disruptor则通过缓存行填充,让数组中的每个元素独占一个缓存行从而解决了伪共享问题的出现。另外为了避免Ring Buffer中序列号(定位元素的游标)与其他元素共享缓存行,对其也进行了缓存行填充,以提高访问序列号时缓存的命中率
-
关键实现原理
landon
- 目前看线程,每1个consumer handler都是固定线程的,依次排队去消费产生的事件。
- 有多个消费者,每个消费者有自己的sequence。因为是环形buffer,当缓冲期满了,但还有没有消费的元素,那么此时生产者只能等。其实这个和阻塞队列满时是一样的。那么什么时候等?只要判断当前很多消费者的最小sequence,这里指在环里的位置,那么生产的当前在环里的位置一一定小于这个位置。这个通常指生产者速度较快,过了一圈后,消费者还没有消费完之前的元素
- 测试是buffer为4,依次放0,1,2,10,11,12
- 0号元素的消费者handler耗时,当生产11(index 0)时,则阻塞等待0号消耗完毕
- 关于ringbuffer
- 我们实现的ring buffer和大家常用的队列之间的区别是,我们不删除buffer中的数据,也就是说这些数据一直存放在buffer中,直到新的数据覆盖他们
- 为什么要和ArrayBlockingQueue对比呢?这是因为两个底层的数据结构类似,都是通过一个环形数组实现
-
https://colobu.com/2014/12/22/why-is-disruptor-faster-than-ArrayBlockingQueue/
- ArrayBlockingQueue通过ReentrantLock以及它的两个condition来控制并发
- 压入元素时:如果数组已满,则等待notFull,如果消费者取出了元素,则会调用
notFull.signal();
: - 这时put方法会被唤醒
- 取出元素时:如果数组为空,则调用
notEmpty.await();
等待, enqueue会调用notEmpty.signal();
唤醒它: - 这种wait-notify(signal)也就是教科书上标准的处理队列的方式
- 压入元素时:如果数组已满,则等待notFull,如果消费者取出了元素,则会调用
- RingBuffer使用了padding方式来提供CPU cache的命中率
- 如果producer生产的快,追上消费者的时候
可以通过gatingSequences让生产者等待消费者消费。
这个时候是通过LockSupport.parkNanos(1L);
不停的循环,直到有消费者消费掉一个或者多个事件 - 如果消费者消费的快,追上生产者的时候
这个时候由于消费者将自己最后能处理的sequence写回到光标后sequence.set(availableSequence);
, 如果生产者还没有写入一个事件, 那么它就会调用waitStrategy.waitFor
等待。 如果生产者publish一个事件,它会更改光标的值:cursor.set(sequence);
,然后通知等待的消费者继续处理waitStrategy.signalAllWhenBlocking();
- 在使用BlockingWaitStrategy情况下,其实这和ArrayBlockingQueue类似,因为ArrayBlockingQueue也是通过Lock的方式等待。 性能测试结果显示Disruptor在这种策略下性能比ArrayBlockingQueue要略好一点,但是达不到10倍的显著提升,大概两倍左右。 这大概就是生产者使用不断的LockSupport.parkNanos方式带来的提升吧
- 但是如果换为YieldingWaitStrategy, CPU使用率差别不大,但是却带来了10倍的性能提升。 这是因为消费者不需sleep, 通过spin-yield方式降低延迟率,提高了吞吐率
- 多生产者时在请求下一个sequence时有竞争的情况,所以通过
cursor.compareAndSet(current, next)
的spin来实现,直到成功的设置next才返回
- ArrayBlockingQueue通过ReentrantLock以及它的两个condition来控制并发
- 伪共享
- 数据X、Y、Z被加载到同一Cache Line中,线程A在Core1修改X,线程B在Core2上修改Y。根据MESI大法,假设是Core1是第一个发起操作的CPU核,Core1上的L1 Cache Line由S(共享)状态变成M(修改,脏数据)状态,然后告知其他的CPU核,图例则是Core2,引用同一地址的Cache Line已经无效了;当Core2发起写操作时,首先导致Core1将X写回主存,Cache Line状态由M变为I(无效),而后才是Core2从主存重新读取该地址内容,Cache Line状态由I变成E(独占),最后进行修改Y操作, Cache Line从E变成M。可见多个线程操作在同一Cache Line上的不同数据,相互竞争同一Cache Line,导致线程彼此牵制影响,变成了串行程序,降低了并发性。此时我们则需要将共享在多线程间的数据进行隔离,使他们不在同一个Cache Line上,从而提升多线程的性能
-
-
akka
-
Akka是一个工具包,用于在JVM上构建高并发、分布式、弹性、基于消息驱动的应用程序
- Akka基于Actor模型和Streams,让我们可以构建可伸缩的,并且可以高效使用服务器资源,使用多个服务器进行扩展的系统
- 在单台计算机上可以处理高达每秒5000万条消息。内存占用少;每GB堆可以创建约250万个actor(参与者)
-
锁
- 使用锁会严重影响并发度,使用锁在现在CPU架构中是一个比较昂贵的操作,因为当线程获取锁失败后会把线程从用户态切换到内核态把线程挂起,稍后唤醒后又需要从内核态切换到用户态继续运行
- 获取锁失败的调用线程会被阻塞挂起,因此它不能做任何有意义的事情。即使在桌面应用程序中这也是不可取的,我们想要的是即使后台有一个比较耗时的工作在运行,也要保证系统对用户的一部分请求有响应。在后端应用中,阻塞是完全浪费资源的。另外可能有人认为,虽然当前线程阻塞了,但是我们可以通过启动新线程来弥补这一点,需要注意,线程也是一种昂贵的资源,操作系统对线程个数是有限制的
- 锁的存在带来了新的威胁,即死锁问题
- 如果不使用足够多的锁,则不能保证多线程下对象中数据不受到破坏
- 如果在对象中每个数据访问时都加了锁,则会导致系统性能下降,并且很容易导致死锁
- 锁只能在单JVM内(本地锁)很好地工作。当涉及跨多台机协调时,只能使用分布式锁。但是分布式锁的效率比本地锁低几个数量级,这是因为分布式锁协议需要跨多台机在网络上进行多次往返通信,其造成的最大影响就是延迟
-
共享内存
- 在现在计算机硬件架构中,计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加了一级或多级高速缓冲存储器(Cache),每个Cache由多个Cache行组成,这些Cache一般是集成到CPU内部的,所以也叫CPU Cache。当我们写入变量时,实际是写入当前CPU的Cache中,而不是直接写入主内存中,并且当前CPU核对自己Cache写入的变量对其他CPU核是不可见的,这就是Java内存模型中共享变量的内存不可见问题
-
堆栈
- 主调用线程需要在异步任务执行完毕或者出异常时被通知,但是没有调用堆栈可以传递异常。异步任务执行失败的通知只能通过辅助方式完成,比如Future方式,将错误码写到主调用线程所在的地方。如果没有此通知,则主调用线程将永远不会收到失败通知,并且任务将丢失
- 当真的发生错误时,这种情况会变得更糟,当异步工作线程遇到错误时会导致最终陷入无法恢复的境地。异步线程当前正在执行的实际任务并没有存放起来。实际上,由于到达顶部的异常使所有调用栈退出,任务状态已经完全丢失了
- 为了在当前系统上实现任何有意义的并发性和提高性能,线程必须以高效的方式在彼此之间委派任务,而不会阻塞。使用这种类型的任务委派并发(甚至在网络/分布式计算中更是如此),基于调用堆栈的错误处理会导致崩溃。因此需要引入新的显式错误信令机制,让失败成为域模型的一部分
- 具有工作委派的并发系统需要处理服务故障,并需要具有从故障中恢复的原则与方法。此类服务的客户端需要注意,任务/消息可能会在重新启动期间丢失。即使没有发生损失,由于先前排队的任务(较长的队列)或者垃圾回收导致的延迟等,将会导致响应可能会被任意延迟。面对这些情况,并发系统应以超时的形式处理响应截止日期
-
Actor模型解决了传统编程模型的问题
- 在Actor模型中每个Actor都有自己的地址,Actor之间通过地址相互通过消息通信。Actor的目的是处理消息,这些消息是从其他Actor发送给当前Actor的。连接发送方和接收方Actor的是Actor的邮箱
- Akka中对失败的处理使用了“让它崩溃”的理念,这部分关键代码被监控者监控着(每个Actor实际就是一个监控者),监控者的唯一职责是知道失败后该干什么
- 另外Actor模型并不在意接收消息的是当前JVM内的Actor还是远端机器上的Actor,这允许我们基于许多计算机上构建系统
- 使用消息传递避免锁和阻塞
- Actor模型中组件之间的相互通信不再使用方法调用,而是通过发消息的方式进行通信,使用发消息的方式,不会导致发消息的调用线程的执行权转移到消息接收者。每个Actor可以连续发消息,由于是异步的,不会被阻塞。因此在同等时间内其可以完成更多工作
- 对于对象,当调用其方法返回时,它会释放调用其线程的控制权;Actor的行为与对象类似,当接收者Actor接收到消息后,会对消息进行反应,并在处理完消息后返回,所以Actor的执行符合我们认知中的执行逻辑
- 传递消息和调用方法之间的重要区别是,消息没有返回值。通过发送消息,Actor会将工作委托给另一个Actor。正如我们在调用堆栈误解中看到的那样,如果期望返回值,则发送方Actor调用线程将需要阻塞或调用线程会执行其他Actor的工作。相反,接收方会在回复消息中传递结果
- Actor对消息做出反应,就像对象对在其上的调用方法一样。区别在于,接收消息的Actor是独立于消息发送方Actor执行的,是一次接一个地响应传入的消息,而不是多个线程并发执行,因此不会破坏Actor内部状态和不变量。当每个Actor都按顺序处理发送给它的消息时,不同的Actor会并发工作,因此Actor系统可以同时处理硬件支持的尽可能多的消息
- 由于每个Actor同时最多只能处理一条消息,因而可以保持Actor的不变性,而无须使用锁等进行同步
- 当Actor收到消息时,会发生以下情况
- Actor将消息添加到队列的末尾
- 如果Actor没有被安排执行,则将其标记为准备执行
- Actor系统框架内的调度程序将接收该Actor并开始执行它
- Actor从队列的前面选择消息
- Actor修改内部状态,将消息发送给其他Actor
- Actor处于无调度、空闲状态
- 为了实现上述行为,Actor需要具有下面特性
- 一个邮箱(用于存放发送者发来的消息)
- 行为(Actor的状态、内部变量等)
- 消息(代表信号的数据片段,类似于方法调用及其参数)
- 执行环境(一种使具有消息的Actor响应并调用其消息处理代码的机制)
- 地址(每个Actor有自己的地址)
- 其中,Actor的行为描述了其如何响应消息(例如发送更多消息和/或更改状态)。执行环境则编排了一个线程池,以透明地驱动所有这些动作
- 通过将执行与信号分离(方法调用方式会转换任务的执行权,消息传递则不会)来保留封装性
- 不需要锁。只能通过消息修改Actor的内部状态,而消息是顺序处理的,以试图消除保持不变性时的竞争问题
- 任何地方都没有使用锁,发送者也不会被阻塞。可以在十几个线程上有效地调度数百万个Actor,从而充分发挥现代CPU的潜力
- Actor的状态是本地的而不是共享的,更改和数据通过消息进行传递,这与现代系统中内存的实际工作方式相对应
- 使用Actor优雅地处理错误
- 当目标Actor上运行被代理的任务发生错误时,比如任务内参数校验错误或者执行抛出了NPE异常等。在这种情况下,目标Actor封装的服务是完整的,只是任务执行本身发生了错误。目标Actor应该向消息发送方回复一条消息,提示错误情况
- 当服务本身遇到内部错误时,Akka强制将所有Actor组织成树状层次结构,即创建另一个Actor的Actor成为该新Actor的父节点。这与操作系统将进程组织到树结构中的方式非常相似。就像进程一样,当一个Actor失败时,它的父Actor会收到通知,并且可以对失败做出反应。同样,如果父Actor停止了,则其所有子Actor也将递归停止。这项服务被称为监督(supervisor),它是Akka的核心
-
https://developer.lightbend.com/guides/akka-quickstart-java/
-
demo
-
remote示例要引入akka-remote_2.13
-
https://doc.akka.io/docs/akka/current/remoting.html
- Classic Remoting (Deprecated)
- Artery Remoting instead
-
注意事项
-
默认的配置文件
- 这是以前的remote方式,而新版默认是用的Artery,默认端口是25520
- 即使按照下面的配置文件指定端口也不会,启动的时候看日志会看到是artery方式
- 需要关闭Artery:remote.artery.enabled = false
- 当关闭此artery后,启动会报错'Classic remoting is enabled but Netty is not on the classpath'
- 需要:Classic remoting depends on Netty. This needs to be explicitly added as a dependency so that users not using classic remoting do not have to have Netty on the classpath:
akka { actor { // 远端的actor provider = "akka.remote.RemoteActorRefProvider" } remote { // tcp传输 enabled-transports = ["akka.remote.netty.tcp"] // ip和监听端口 netty.tcp { hostname = "127.0.0.1" port = 2552 } } }
-
-
按照新的配置方式
- artery方式,则启动的actor顺利按照指定的2552端口监听
akka { actor { // 远端的actor provider = "akka.remote.RemoteActorRefProvider" allow-java-serialization = on } remote { artery { transport = tcp canonical.hostname = "127.0.0.1" canonical.port = 2552 } } }
-
另外远程要传递消息,做序列化,所以要打开allow-java-serialization,否则序列化报错
-
另外ActorSelection指定远程的path的actor名字是远程通过actorOf创建的actor名字
-
-
Akka中每个Actor都有自己的地址,可以是本地的,也可以是远程的,对于远程的Actor,只需要将其地址配置好,就可以像本地Actor一样使用了
-
-
rocketmq
-
RocketMQ主要由4部分组成,分别为NameServer集群、Broker集群、Producer集群和Consumer集群。每部分都可以进行水平扩展,而不会出现单点问题
- NameServer集群:名称服务集群,提供轻量级的服务发现与路由服务,每个名称服务器记录了全部Broker的路由信息,并且提供相应的读写服务,支持快速存储扩展
- Broker集群:Broker集群,Broker通过提供轻量级的主题和队列机制来维护消息存储。它支持推和拉两种模型,包含容错机制(2个副本或3个副本),并提供了强大的平滑峰值,提供积累数以亿计的消息并保证其在原始时间顺序的被消费能力。此外,Broker也提供灾难恢复、丰富的度量统计和警报机制
- Producer集群:生产者集群,提供分布式部署,分布式的生产者发送消息到Broker集群,具体选择哪一个Broker机器是通过一定的负载均衡策略来决定的,发送消息中支持故障快速恢复,并且具有较低的延迟
- Consumer集群:消费者集群,消费者在推和拉模型中支持分布式部署。它还支持集群消费和消息广播。它提供实时消息订阅机制,可以满足大多数消费者的需求
-
Broker在启动时会去连接NameServer,然后将topic信息注册到NameServer, NameServer维护了所有topic的信息和对应的Broker路由信息。Broker与NameServer之间是有心跳检查的,NameServer发现Broker挂了后,会从注册信息里面删除该Broker,这类似Zookeeper实现的服务注册;Producer则需要配置NameServer的地址,然后定时从NameServer获取对应topic的路由信息(这个topic的消息应该路由到那个Broker)。同时Producer与NameServer、Proudcer与Broker有心跳检查;同样,Consumer需要配置NameServer的地址,然后定时从NameServer获取对应topic的路由信息(应该从那个Broker的消息队列获取消息)。同时Consumer与NameServer、Consumer与Broker有心跳检查。
-
demo
-
rocketmq-all-4.7.0 编译
% pwd /Users/landon30/2020/demo/rocketmq-logging 4.7.0 % mvn -Prelease-all -DskipTests clean install -U // 编译报错 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.5.1:compile (default-compile) on project rocketmq-logging: Compilation failure: Compilation failure: [ERROR] 不再支持源选项 6。请使用 7 或更高版本。 [ERROR] 不再支持目标选项 6。请使用 7 或更高版本。 修改 <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> 同理其他模块 % pwd /Users/landon30/2020/demo/rocketmq-rocketmq-all-4.7.0/distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 // 需要设置JAVA_HOME环境变量 ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !! /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home vim ~/.bash_profile,编辑环境变量 // 启动nameserver % nohup sh bin/mqnamesrv & [1] 17557 appending output to nohup.out % tail -f ~/logs/rocketmqlogs/namesrv.log 2020-04-09 16:28:21 INFO main - tls.client.keyPassword = null 2020-04-09 16:28:21 INFO main - tls.client.certPath = null 2020-04-09 16:28:21 INFO main - tls.client.authServer = false 2020-04-09 16:28:21 INFO main - tls.client.trustCertPath = null 2020-04-09 16:28:21 INFO main - Using OpenSSL provider 2020-04-09 16:28:21 INFO main - SSLContext created for server 2020-04-09 16:28:22 INFO main - Try to start service thread:FileWatchService started:false lastThread:null 2020-04-09 16:28:22 INFO NettyEventExecutor - NettyEventExecutor service started 2020-04-09 16:28:22 INFO FileWatchService - FileWatchService service started 2020-04-09 16:28:22 INFO main - The Name Server boot success. serializeType=JSON 17589 NamesrvStartup // 启动broker 异常 // 为什么总报 java.lang.NoSuchMethodError // 应该是编译问题 比如编译是用jdk9+编译的,但是却运行在小于9的环境中 // 所以应该是netty编译环境的问题 > nohup sh bin/mqbroker -n localhost:9876 & 2020-04-09 18:02:42 WARN brokerOutApi_thread_1 - registerBroker Exception, localhost:9876 org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <ocalhost/127.0.0.1:9876> failed at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:440) ~[rocketmq-remoting-4.7.0.jar:4.7.0] at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:373) ~[rocketmq-remoting-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI.registerBroker(BrokerOuterAPI.java:194) ~[rocketmq-broker-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI.access$000(BrokerOuterAPI.java:61) ~[rocketmq-broker-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI$1.run(BrokerOuterAPI.java:150) ~[rocketmq-broker-4.7.0.jar:4.7.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_241] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_241] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241] Caused by: io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer; at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:125) ~[netty-all-4.0.42.Final.jar:4.0.42.Final] https://github.com/eclipse/jetty.project/issues/3244 参考 2020-04-09 18:16:06 INFO main - The broker[landon30deMacBook-Pro.local, 10.2.182.240:10911] boot success. serializeType=JSON and name server is localhost:9876 // 参考 https://github.com/netty/netty/issues?q=java.lang.NoSuchMethodError // 临时解决方案是将pom.xml中netty的版本从4.0.42切换到了4.0.43,解决
-
https://rocketmq.apache.org/docs/quick-start/
> unzip rocketmq-all-4.7.0-source-release.zip > cd rocketmq-all-4.7.0/ > mvn -Prelease-all -DskipTests clean install -U > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 Start Name Server > nohup sh bin/mqnamesrv & > tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success... Start Broker > nohup sh bin/mqbroker -n localhost:9876 & > tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...
-
-
consumer
- 同一个消费集群的每台机器中的实例名称要一样。然后设置了NameServer的地址为127.0.0.1:9876
- 从第一个消息的偏移量开始消费,指定订阅主题和主题下的tag
- 设置回调的消息处理
- 启动消费实例,连接NameServer获取Broker的地址,并与Broker进行连接
-
producer sync
- 同一个生产者集群实例中的实例名称要一致。然后设置了NameServer的地址为127.0.0.1:9876
- 启动生产者实例,然后实例就会去连接NameServer并获取Broker的地址,然后生产者实例就会与Broker建立连接
- 创建Message消息实体,其中第一个参数为主题名称,这里为TopicTest;第二个参数为Tag类型,这里为TagA;第三个参数为消息体内容,是个二进制数据。代码4.2调用生产者实例的send方法同步发送消息,需要注意,这里同步意味着当消息同步通过底层网络通信投递到TCP发送buffer后才会返回,整个过程中需要阻塞调用线程
- 调用线程调用RocketmqClient的send方法发送消息后,其内部会首先创建一个ResponseFuture对象,并切换到IO线程把请求发送到Broker,接着调用线程会调用ResponseFuture的wait方法阻塞调用线程,等IO线程把请求写入TCP发送Buffer后,IO线程会设置ResponseFuture对象说请求已经完成,然后调用线程就会从wait方法返回。需要注意的是,RocketMQ返回成功是指已经把请求发送到了其TCP发送Buffer中,这时候请求还没到Broker
-
producer.async
- 注意demo中,主线程不能结束,否则抛出异常RemotingConnectException,所以加了一个sleep
- 在发送消息的同时设置了一个CallBack,调用该方法后,该方法会马上返回,然后等真的把消息投递到Broker后,底层IO线程会回调设置的Callback来通知,消息已经发送成功或者消息发送失败的原因
- RocketMQ客户端内部把请求提交到线程池后就返回了。消息发送任务会被在线程池内异步执行,异步发送任务内首先会创建一个ResponseFuture对象,然后切换到IO线程来具体发送请求,等IO线程将请求发送到TCP发送Buffer后,IO线程会设置ResponseFuture对象的值,然后ResponseFuture中保存的CallBack的执行切换到线程池来执行。可知使用异步发送消息方式调用线程不会被阻塞
-
chap9 Go语言的异步编程能力
-
Go语言概述
- 传统的编程模型,比如经常使用Java、C++、Python编程时,多线程之间需要通过共享内存(比如在堆上创建的共享变量)来通信。这时为保证线程安全,多线程共享的数据结构需要使用锁来保护,多线程访问共享数据结构时需要竞争获取锁,只有获取到锁的线程才可以存取共享数据
- Go中不仅在语言层面提供了这种低级并发同步原语—锁,比如互斥锁、读写锁、条件变量等,而且Go的并发原语—goroutine和channel提供了一种优雅而独特的结构化开发并发软件的方式。Go鼓励使用channel在goroutine之间传递对共享数据的引用,而不是明确地使用锁来保护对共享数据的访问。这种方法确保在给定时间只有一个goroutine可以访问共享数据。这个理念被总结为:不要通过共享内存来通信,而要通过通信来共享内存
- Go中并发模型采用了通道,体现为CSP的一个变种。之所以选择CSP,一方面是因为Google的开发工程师对它的熟悉程度,另一方面因为CSP具有一种无须对其模型做任何深入的改变就能轻易添加到过程性编程模型中的特性
- 在其他语言,比如Java中线程模型的实现是一个操作系统内核线程对应着一个使用new Thread创建的线程,而由于操作系统线程个数是有限制的,所以限制了创建线程的个数。另外,当线程执行阻塞操作时,线程要从用户态切换到内核态执行,这个开销是比较大的;而在Go中线程模型则是一个操作系统线程对应多个goroutine,用户可以创建的goroutine个数只受内存大小限制,而且上下文切换发生在用户态,切换速度比较快,并且开销比较小,所以Go中一台机器可以创建百万个goroutine
- 在Java中编写并发程序时需要在操作系统线程层面进行考虑,但是在Go中,不需要考虑操作系统线程,而是需要站在goroutine和通道上进行思考,当然有时候也会在共享内存上进行思考
- 在Go中只需要在要异步执行的方法前面加上go关键字,就可以让方法与主goroutine并发运行。另外结合goroutine和channel,可以方便地实现异步非阻塞回压功能
-
Go语言的线程模型
-
一对一模型
- 这种线程模型下用户线程与内核线程是一一对应的,当从程序入口点(比如main函数)启动后,操作系统就创建了一个进程。这个main函数所在的线程就是主线程。在main函数内当我们使用高级语言创建一个用户线程的时候,其实对应创建了一个内核线程
- 这种线程模型的优点是,在多处理器上多个线程可以真正实现并行运行,并且当一个线程由于网络IO等原因被阻塞时,其他线程不受影响
- 缺点是由于一般操作系统会限制内核线程的个数,所以用户线程的个数会受到限制。另外由于用户线程与系统线程一一对应,当用户线程比如执行IO操作(执行系统调用)时,需要从用户态的用户程序执行切换到内核态执行内核操作,然后等执行完毕后又会从内核态切换到用户态执行用户程序,而这个切换操作开销是比较大的
- 另外这里提下,Java的线程模型就是使用的这种一对一的模型,所以Java中多线程对共享变量使用锁同步时会导致获取锁失败的线程进行上下文切换,而JUC包提供的无锁CAS操作则不会产生上下文切换
-
多对一模型
- 多对一模型是指多个用户线程对应一个内核线程,同时同一个用户线程只能对应一个内核线程,这时候对应同一个内核线程的多个用户线程的上下文切换是由用户态的运行时线程库来做的,而不是由操作系统调度系统来做的
- 这种模型的好处是由于上下文切换在用户态,因而切换速度很快,开销很小;另外,可创建的用户线程的数量可以很多,只受内存大小限制
- 这种模型由于多个用户线程对应一个内核线程,当该内核线程对应的一个用户线程被阻塞挂起时,该内核线程对应的其他用户线程也不能运行,因为这时候内核线程已经被阻塞挂起了。另外这种模型并不能很好地利用多核CPU进行并发运行
-
多对多模型
- 多对多模型则结合一对一和多对一模型的特点,让大量的用户线程对应少数几个内核线程
- 这时候每个内核线程对应多个用户线程,每个用户线程又可以对应多个内核线程,当一个用户线程阻塞后,其对应的当前内核线程会被阻塞,但是被阻塞的内核线程对应的其他用户线程可以切换到其他内核线程上继续运行,所以多对多模型是可以充分利用多核CPU提升运行效能的
- 另外多对多模型也对用户线程个数没有限制,理论上只要内存够用可以无限创建
-
Go线程模型属于多对多线程模型
- 每个内核线程对应多个用户线程,每个用户线程又可以对应多个内核线程,当一个用户线程阻塞后,其对应的当前内核线程会被阻塞,但是被阻塞的内核线程对应的其他用户线程可以切换到其他内核线程上继续运行,所以多对多模型是可以充分利用多核CPU提升运行效能的
- Go中使用Go语句创建的goroutine可以认为是轻量级的用户线程。Go线程模型包含3个概念:内核线程(M-Machine)、goroutine(G-Goroutine)和逻辑处理器(P-Processor)。在Go中每个逻辑处理器(P)会绑定到某一个内核线程上,每个逻辑处理器(P)内有一个本地队列,用来存放Go运行时分配的goroutine。在上面介绍的多对多线程模型中是操作系统调度线程在物理CPU上运行,在Go中则是Go的运行时调度goroutine在逻辑处理器(P)上运行
- 在Go中存在两级调度,一级是操作系统的调度系统,该调度系统调度逻辑处理器占用CPU时间片运行;一级是Go的运行时调度系统,该调度系统调度某个goroutine在逻辑处理上运行
- 使用Go语句创建一个goroutine后,创建的goroutine会被放入Go运行时调度器的全局运行队列中,然后Go运行时调度器会把全局队列中的goroutine分配给不同的逻辑处理器(P),分配的goroutine会被放到逻辑处理器(P)的本地队列中,当本地队列中某个goroutine就绪后,待分配到时间片后就可以在逻辑处理器上运行了
- 为了避免某些goroutine出现饥饿现象,被分配到某一个逻辑处理器(P)上的多个goroutine是分时在该逻辑处理器上运行的,而不是独占运行直到结束
- goroutine内部实现与在多个操作系统线程(OS线程)之间复用的协程(coroutine)一样。如果一个goroutine阻塞OS线程,例如等待输入,则该OS线程对应的逻辑处理器(P)中的其他goroutine将迁移到其他OS线程,以便它们继续运行
- 假设goroutine1在执行文件读取操作,则goroutine1会导致内核线程1阻塞,这时候Go运行时调度器会把goroutine1所在的逻辑处理器1迁移到其他内核线程上(这里是内核线程2上),这时候逻辑处理器1上的goroutine2和goroutine3就不会受goroutine1的影响了。等goroutine1文件读取操作完成后,goroutine1又会被Go运行时调度系统重新放入逻辑处理器1的本地队列
-
在Go中,使用go关键字跟上一个函数,就创建了一个goroutine,每个goroutine可以认为是一个轻量级的线程,其占用更少的堆栈空间
-
可以把通道理解为一个并发安全的队列,生产者goroutine可以向通道里放入元素,消费者goroutine可以从通道里获取元素。
- 从队列大小来看,通道可以分为有缓冲通道和无缓冲通道,无缓冲通道里最多有一个元素,有缓冲通道里面可以有很多元素
- 另外,通道还是有方向的
- 通道是可以关闭的
-
Go中以消息进行通信的方式允许程序员安全地协调多个并发任务,并且容易理解语义和控制流,这通常比其他语言(如Java)中的回调函数(callback)或共享内存方式更优雅简单
-
Go的并发原语使构建流式数据管道变得很容易,从而使IO操作和多核CPU更加有效
- 管道是由一系列节点组成,这些节点使用通道连接起来。其中每个节点是一组运行相同功能的goroutine,在每个阶段goroutine从上游通道获取元素,然后对该数据执行某些操作,然后把操作后的结果再写入下游的通道。除了第一个和最后一个节点,每个节点可以有任意多个输入和输出通道,第一个节点有时候被称为数据源或者生产者,最后一个节点被称为数据终点或者消费者
- 如果你对流式编程有经验的话,可能会发现管道和反应式库比如RxJava中的流式编程很相似
- 借助Go中的并发原语goroutine与通道,可以非常方便地构建异步非阻塞、具有回压功能的程序
-
网友评论