仓颉之并发编程 2024-09-03 周二

  • 并发编程是现代编程语言中不可或缺的特性,仓颉编程语言提供抢占式的线程模型作为并发编程机制。

  • 在谈及编程语言和线程时,线程其实可以细化为两种不同概念,语言线程和 native 线程。

  • 语言线程是编程语言中并发模型的基本执行单位,语言线程的目的是屏蔽底层实现细节。

  • native 线程指语言实现中所使用到的线程(一般是操作系统线程),他们作为语言线程的具体实现载体。不同编程语言会以不同的方式实现语言线程。

  • 在大多数情况下,开发者只需要面向仓颉线程进行并发编程而不需要考虑这些细节。但在进行跨语言编程时,开发者需要谨慎调用可能发生阻塞的 foreign 函数,例如 IO 相关的操作系统调用等。

foreign socket_read(sock: Int64): CPointer<Int8>

let fut = spawn {
    let sock: Int64 = ...
    let ptr = socket_read(sock)


当开发者希望并发执行某一段代码时,只需创建一个仓颉线程即可。要创建一个新的仓颉线程,可以使用关键字 spawn 并传递一个无形参的 lambda 表达式,该 lambda 表达式即为在新线程中执行的代码。

import std.sync.*
import std.time.*

main(): Int64 {
    spawn { =>
        println("New thread before sleeping")
        sleep(100 * Duration.millisecond) // sleep for 100ms.
        println("New thread after sleeping")

    println("Main thread")

    return 0

/* 由于没有同步机制,新线程会在主线程结束时一起停止,输出内容不确定,有可能是这样的:
New thread before sleeping
Main thread


使用 Future<T> 等待线程结束并获取返回值

  • spawn 表达式的返回类型是 Future<T>,其中 T 是类型变元,其类型与 lambda 表达式的返回类型一致。当我们调用 Future<T> 的 get() 成员函数时,它将等待它的线程执行完成。
public class Future<T> {
    // Blocking the current thread, waiting for the result of the thread corresponding to the current Future object.
    // If an exception occurs in the corresponding thread, the method will throw the exception.
    public func get(): T

    // Blocking the current thread, waiting for the result of the thread corresponding to the current Future object.
    // If the corresponding thread has not completed execution within ns nanoseconds, the method will return a Option<T>.None.
    // If `ns` <= 0, its behavior is the same as `get()`.
    public func get(ns: Int64): Option<T>

    // Non-blocking method that immediately returns Option<T>.None if thread has not finished execution.
    // Returns the computed result otherwise.
    // If an exception occurs in the corresponding thread, the method will throw the exception.
    public func tryGet(): Option<T>
import std.sync.*
import std.time.*

main(): Int64 {
    let fut: Future<Unit> = spawn { =>
        println("New thread before sleeping")
        sleep(100 * Duration.millisecond) // sleep for 100ms.
        println("New thread after sleeping")

    println("Main thread")

    fut.get() // wait for the thread to finish.
    return 0

/* 主线程在完成打印后会因为调用 get() 而等待新创建的线程执行结束。但主线程和新线程的打印顺序具有不确定性。输出内容可能如下:
New thread before sleeping
Main thread
New thread after sleeping
  • get() 的调用位置会影响线程是否能同时运行。
import std.sync.*
import std.time.*

main(): Int64 {
    let fut: Future<Unit> = spawn { =>
        println("New thread before sleeping")
        sleep(100 * Duration.millisecond) // sleep for 100ms.
        println("New thread after sleeping")

    fut.get() // wait for the thread to finish.

    println("Main thread")
    return 0

/* 主线程将等待新创建的线程执行完成,然后再执行打印,因此程序的输出将变得确定,如下所示:
New thread before sleeping
New thread after sleeping
Main thread
  • Future<T> 除了可以用于阻塞等待线程执行结束以外,还可以获取线程执行的结果。
  1. get(): T:阻塞等待线程执行结束,并返回执行结果,如果该线程已经结束,则直接返回执行结果。
import std.sync.*
import std.time.*

main(): Int64 {
    let fut: Future<Int64> = spawn {
        sleep(Duration.second) // sleep for 1s.
        return 1

    try {
        // wait for the thread to finish, and get the result.
        let res: Int64 = fut.get()
        println("result = ${res}")
    } catch (_) {
    return 0

result = 1
  1. get(ns: Int64): Option<T>:阻塞等待该 Future<T> 所代表的线程执行结束,并返回执行结果,当到达超时时间 ns 时,如果该线程还没有执行结束,将会返回 Option<T>.None。如果 ns <= 0,其行为与 get() 相同。
import std.sync.*
import std.time.*

main(): Int64 {
    let fut = spawn {
        sleep(Duration.second) // sleep for 1s.
        return 1

    // wait for the thread to finish, but only for 1ms.
    let res: Option<Int64> = fut.get(1000 * 1000)
    match (res) {
        case Some(val) => println("result = ${val}")
        case None => println("oops")
    return 0



每个 Future<T> 对象都有一个对应的仓颉线程,以 Thread 对象为表示。Thread 类主要被用于访问线程的属性信息,例如线程标识等。需要注意的是,Thread 无法直接被实例化构造对象,仅能从 Future<T> 的 thread 成员属性获取对应的 Thread 对象,或是通过 Thread 的静态成员属性 currentThread 得到当前正在执行线程对应的 Thread 对象。

class Thread {
    ... ...
    // Get the currently running thread
    static prop currentThread: Thread

    // Get the unique identifier (represented as an integer) of the thread object
    prop id: Int64

    // Check whether the thread has any cancellation request
    prop hasPendingCancellation: Bool
main(): Unit {
    let fut = spawn {
        println("Current thread id: ${Thread.currentThread.id}")
    println("New thread id: ${fut.thread.id}")

/* 由于主线程和新线程获取的是同一个 Thread 对象,所以他们能够打印出相同的线程标识。
New thread id: 1
Current thread id: 1


  • 可以通过 Future<T> 的 cancel() 方法向对应的线程发送终止请求,该方法不会停止线程执行。开发者需要使用 Thread 的 hasPendingCancellation 属性来检查线程是否存在终止请求。

  • 一般而言,如果线程存在终止请求,那么开发者可以实施相应的线程终止逻辑。因此,如何终止线程都交由开发者自行处理,如果开发者忽略终止请求,那么线程继续执行直到正常结束。

import std.sync.SyncCounter

main(): Unit {
    let syncCounter = SyncCounter(1)
    let fut = spawn {
        // Check cancellation request
        if (Thread.currentThread.hasPendingCancellation) {
    fut.cancel()    // Send cancellation request
    fut.get() // Join thread



  • 在并发编程中,如果缺少同步机制来保护多个线程共享的变量,很容易会出现数据竞争问题(data race)。

  • 仓颉编程语言提供三种常见的同步机制来确保数据的线程安全:原子操作,互斥锁以及条件变量。

原子操作 Atomic

  • 仓颉提供整数类型、Bool 类型和引用类型的原子操作。
class AtomicInt8 {
    public func load(): Int8
    public func store(val: Int8): Unit
    public func swap(val: Int8): Int8
    public func compareAndSwap(old: Int8, new: Int8): Bool
    public func fetchAdd(val: Int8): Int8
    public func fetchSub(val: Int8): Int8
    public func fetchAnd(val: Int8): Int8
    public func fetchOr(val: Int8): Int8
    public func fetchXor(val: Int8): Int8
  • 整数类型的原子操作支持基本的读写、交换以及算术运算操作:
  • 下方示例演示了如何在多线程程序中,使用原子操作实现计数:
import std.sync.*
import std.time.*
import std.collection.*

let count = AtomicInt64(0)

main(): Int64 {
    let list = ArrayList<Future<Int64>>()

    // create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1ms.

    // Wait for all threads finished.
    for (f in list) {

    let val = count.load()
    println("count = ${val}")
    return 0

count = 1000
  • 以下是使用整数类型原子操作的一些其他正确示例:
var obj: AtomicInt32 = AtomicInt32(1)
var x = obj.load() // x: 1, the type is Int32
x = obj.swap(2) // x: 1
x = obj.load() // x: 2
var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool.
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails.
x = obj.fetchAdd(1) // x: 3
x = obj.load() // x: 4
  • Bool 类型和引用类型的原子操作只提供读写和交换操作:
  • 原子引用类型是 AtomicReference,以下是使用 Bool 类型、引用类型原子操作的一些正确示例
import std.sync.*

class A {}

main() {
    var obj = AtomicBool(true)
    var x1 = obj.load() // x1: true, the type is Bool
    var t1 = A()
    var obj2 = AtomicReference(t1)
    var x2 = obj2.load() // x2 and t1 are the same object
    var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true
    var t2 = A()
    var y2 = obj2.compareAndSwap(t2, A()) // x and t1 are not the same object, CAS fails, y2: false
    y2 = obj2.compareAndSwap(t1, A()) // CAS successes, y2: true


可重入互斥锁 ReentrantMutex


  • 使用可重入互斥锁时,必须牢记两条规则:
  1. 在访问共享数据之前,必须尝试获取锁;
  2. 处理完共享数据后,必须进行解锁,以便其他线程可以获得锁。
  • ReentrantMutex 提供的主要成员函数如下:
public open class ReentrantMutex {
    // Create a ReentrantMutex.
    public init()

    // Locks the mutex, blocks if the mutex is not available.
    public func lock(): Unit

    // Unlocks the mutex. If there are other threads blocking on this
    // lock, then wake up one of them.
    public func unlock(): Unit

    // Tries to lock the mutex, returns false if the mutex is not
    // available, otherwise returns true.
    public func tryLock(): Bool
  • 下方示例演示了如何使用 ReentrantMutex 来保护对全局共享变量 count 的访问,对 count 的操作即属于临界区:
import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = ReentrantMutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()

    // creat 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1ms.

    // Wait for all threads finished.
    for (f in list) {

    println("count = ${count}")
    return 0

count = 1000
  • 下方示例演示了如何使用 tryLock:
import std.sync.*

main(): Int64 {
    let mtx: ReentrantMutex = ReentrantMutex()
    var future: Future<Unit> = spawn {
        while (true) {}
    let res: Option<Unit> = future.get(10*1000*1000)
    match (res) {
        case Some(v) => ()
        case None =>
            if (mtx.tryLock()) {
                return 1
            return 0
    return 2
  • 错误示例 1:线程操作临界区后没有解锁,导致其他线程无法获得锁而阻塞。
import std.sync.*

var sum: Int64 = 0
let mutex = ReentrantMutex()

main() {
    let foo = spawn { =>
        sum = sum + 1
    let bar = spawn { =>
        sum = sum + 1
    bar.get() // Because the thread is not unlocked, other threads waiting to obtain the current mutex will be blocked.
  • 错误示例 2:在本线程没有持有锁的情况下调用 unlock 将会抛出异常。
import std.sync.*

var sum: Int64 = 0
let mutex = ReentrantMutex()

main() {
    let foo = spawn { =>
        sum = sum + 1
        mutex.unlock() // Error, Unlock without obtaining the lock and throw an exception: IllegalSynchronizationStateException.
  • 错误示例 3:tryLock() 并不保证获取到锁,可能会造成不在锁的保护下操作临界区和在没有持有锁的情况下调用 unlock 抛出异常等行为。
var sum: Int64 = 0
let mutex = ReentrantMutex()

main() {
    for (i in 0..100) {
        spawn { =>
            mutex.tryLock() // Error, `tryLock()` just trying to acquire a lock, there is no guarantee that the lock will be acquired, and this can lead to abnormal behavior.
            sum = sum + 1
  • 虽然 ReentrantMutex 是一个可重入锁,但是调用 unlock() 的次数必须和调用 lock() 的次数相同,才能成功释放该锁。

  • 下方示例代码演示了 ReentrantMutex 可重入的特性:
    无论是主线程还是新创建的线程,如果在 foo() 中已经获得了锁,那么继续调用 bar() 的话,在 bar() 函数中由于是对同一个 ReentrantMutex 进行加锁,因此也是能立即获得该锁的,不会出现死锁。

import std.sync.*
import std.time.*

var count: Int64 = 0
let mtx = ReentrantMutex()

func foo() {
    count += 10

func bar() {
    count += 100

main(): Int64 {
    let fut = spawn {
        sleep(Duration.millisecond) // sleep for 1ms.



    println("count = ${count}")
    return 0

count = 220


Monitor 是一个内置的数据结构,它绑定了互斥锁和单个与之相关的条件变量(也就是等待队列)。Monitor 可以使线程阻塞并等待来自另一个线程的信号以恢复执行。这是一种利用共享变量进行线程同步的机制,主要提供如下方法:

public class Monitor <: ReentrantMutex {
    // Create a monitor.
    public init()

    // Wait for a signal, blocking the current thread.
    public func wait(timeout!: Duration = Duration.Max): Bool

    // Wake up one thread of those waiting on the monitor, if any.
    public func notify(): Unit

    // Wake up all threads waiting on the monitor, if any.
    public func notifyAll(): Unit
  • 调用 Monitor 对象的 wait、notify 或 notifyAll 方法前,需要确保当前线程已经持有对应的 Monitor 锁。wait 方法包含如下动作:
  1. 添加当前线程到该 Monitor 对应的等待队列中;
  2. 阻塞当前线程,同时完全释放该 Monitor 锁,并记录锁的重入次数;
  3. 等待某个其它线程使用同一个 Monitor 实例的 notify 或 notifyAll 方法向该线程发出信号;
  4. 当前线程被唤醒后,会自动尝试重新获取 Monitor 锁,且持有锁的重入状态与第 2 步记录的重入次数相同;但是如果尝试获取 Monitor 锁失败,则当前线程会阻塞在该 Monitor 锁上。
  • 以下是使用 Monitor 的一个正确示例:
import std.sync.*
import std.time.*

var mon = Monitor()
var flag: Bool = true

main(): Int64 {
    let fut = spawn {
        while (flag) {
            println("New thread: before wait")
            println("New thread: after wait")

    // Sleep for 10ms, to make sure the new thread can be executed.
    sleep(10 * Duration.millisecond)

    println("Main thread: set flag")
    flag = false

    println("Main thread: notify")

    // wait for the new thread finished.
    return 0

New thread: before wait
Main thread: set flag
Main thread: notify
New thread: after wait
  • Monitor 对象执行 wait 时,必须在锁的保护下进行,否则 wait 中释放锁的操作会抛出异常。

  • 以下是使用条件变量的一些错误示例:

import std.sync.*

var m1 = Monitor()
var m2 = ReentrantMutex()
var flag: Bool = true
var count: Int64 = 0

func foo1() {
    spawn {
        while (flag) {
            m1.wait() // Error:The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception.
        count = count + 1
    flag = false

func foo2() {
    spawn {
        while (flag) {
            m1.wait() // Error:The `wait` of a conditional variable must be called with a lock held.
        count = count + 1
    flag = false

main() {
    return 0


MultiConditionMonitor 是一个内置的数据结构,它绑定了互斥锁和一组与之相关的动态创建的条件变量。该类应仅当在 Monitor 类不足以满足复杂的线程间同步的场景下使用。主要提供如下方法:

public class MultiConditionMonitor <: ReentrantMutex {
   // Constructor.

   // Returns a new ConditionID associated with this monitor. May be used to implement
   // "single mutex -- multiple wait queues" concurrent primitives.
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   func newCondition(): ConditionID

   // Blocks until either a paired `notify` is invoked or `timeout` nanoseconds pass.
   // Returns `true` if the specified condition was signalled by another thread or `false` on timeout.
   // Spurious wakeups are allowed.
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
   func wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool

   // Wakes up a single thread waiting on the specified condition, if any (no particular admission policy implied).
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
   func notify(id: ConditionID): Unit

   // Wakes up all threads waiting on the specified condition, if any (no particular admission policy implied).
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
   func notifyAll(id: ConditionID): Unit
  1. newCondition(): ConditionID:创建一个新的条件变量并与当前对象关联,返回一个特定的 ConditionID 标识符
  2. wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool:等待信号,阻塞当前线程
  3. notify(id: ConditionID): Unit:唤醒一个在 Monitor 上等待的线程(如果有)
  4. notifyAll(id: ConditionID): Unit:唤醒所有在 Monitor 上等待的线程(如果有)
  • 初始化时,MultiConditionMonitor 没有与之相关的 ConditionID 实例。每次调用 newCondition 都会将创建一个新的条件变量并与当前对象关联,并返回如下类型作为唯一标识符
public struct ConditionID {
   private init() { ... } // constructor is intentionally private to prevent
                          // creation of such structs outside of MultiConditionMonitor
  • 请注意使用者不可以将一个 MultiConditionMonitor 实例返回的 ConditionID 传给其它实例,或者手动创建 ConditionID(例如使用 unsafe)。由于 ConditionID 所包含的数据(例如内部数组的索引,内部队列的直接地址,或任何其他类型数据等)和创建它的 MultiConditionMonitor 相关,所以将“外部” conditonID 传入 MultiConditionMonitor 中会导致 IllegalSynchronizationStateException。

  • 以下是使用 MultiConditionMonitor 去实现一个长度固定的有界 FIFO 队列,当队列为空,get() 会被阻塞;当队列满了时,put() 会被阻塞。

import std.sync.*

class BoundedQueue {
    // Create a MultiConditionMonitor, two Conditions.
    let m: MultiConditionMonitor = MultiConditionMonitor()
    var notFull: ConditionID
    var notEmpty: ConditionID

    var count: Int64 // Object count in buffer.
    var head: Int64  // Write index.
    var tail: Int64  // Read index.

    // Queue's length is 100.
    let items: Array<Object> = Array<Object>(100, {i => Object()})

    init() {
        count = 0
        head = 0
        tail = 0

        synchronized(m) {
          notFull  = m.newCondition()
          notEmpty = m.newCondition()

    // Insert an object, if the queue is full, block the current thread.
    public func put(x: Object) {
        // Acquire the mutex.
        synchronized(m) {
          while (count == 100) {
            // If the queue is full, wait for the "queue notFull" event.
          items[head] = x
          if (head == 100) {
            head = 0

          // An object has been inserted and the current queue is no longer
          // empty, so wake up the thread previously blocked on get()
          // because the queue was empty.
        } // Release the mutex.

    // Pop an object, if the queue is empty, block the current thread.
    public func get(): Object {
        // Acquire the mutex.
        synchronized(m) {
          while (count == 0) {
            // If the queue is empty, wait for the "queue notEmpty" event.
          let x: Object = items[tail]
          if (tail == 100) {
            tail = 0

          // An object has been popped and the current queue is no longer
          // full, so wake up the thread previously blocked on put()
          // because the queue was full.

          return x
        } // Release the mutex.


synchronized 关键字

互斥锁 ReentrantMutex 提供了一种便利灵活的加锁的方式,同时因为它的灵活性,也可能引起忘了解锁,或者在持有互斥锁的情况下抛出异常不能自动释放持有的锁的问题。因此,仓颉编程语言提供一个 synchronized 关键字,搭配ReentrantMutex一起使用,可以在其后跟随的作用域内自动进行加锁解锁操作,用来解决类似的问题。

  • 下方示例代码演示了如何使用 synchronized 关键字来保护共享数据:
import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = ReentrantMutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()

    // creat 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1ms.
            // Use synchronized(mtx), instead of mtx.lock() and mtx.unlock().
            synchronized(mtx) {

    // Wait for all threads finished.
    for (f in list) {

    println("count = ${count}")
    return 0

count = 1000
  • 通过在 synchronized 后面加上一个 ReentrantMutex 实例,对其后面修饰的代码块进行保护,可以使得任意时刻最多只有一个线程可以执行被保护的代码:
  1. 一个线程在进入 synchronized 修饰的代码块之前,会自动获取 ReentrantMutex 实例对应的锁,如果无法获取锁,则当前线程被阻塞;

  2. 一个线程在退出 synchronized 修饰的代码块之前,会自动释放该 ReentrantMutex 实例的锁。

  • 对于控制转移表达式(如 break、continue、return、throw),在导致程序的执行跳出 synchronized 代码块时,也符合上面第 2 条的说明,也就说也会自动释放 synchronized 表达式对应的锁。
import std.sync.*
import std.collection.*

var count: Int64 = 0
var mtx: ReentrantMutex = ReentrantMutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    for (i in 0..10) {
        let fut = spawn {
            while (true) {
                synchronized(mtx) {
                    count = count + 1
                    println("in thread") // 实际上 in thread 这行不会被打印,因为 break 语句实际上会让程序执行跳出 while 循环(当然,在跳出 while 循环之前,是先跳出 synchronized 代码块)。

    // Wait for all threads finished.
    for (f in list) {

    synchronized(mtx) {
        println("in main, count = ${count}")
    return 0

in main, count = 10

spawn(Future, get());相当于Swift的Task,await/async; 感觉用起来更自然一点

线程局部变量 ThreadLocal

使用 core 包中的 ThreadLocal 可以创建并使用线程局部变量,每一个线程都有它独立的一个存储空间来保存这些线程局部变量,因此,在每个线程可以安全地访问他们各自的线程局部变量,而不受其他线程的影响。

public class ThreadLocal<T> {
     * 构造一个携带空值的仓颉线程局部变量
    public init()

     * 获得仓颉线程局部变量的值,如果值不存在,则返回 Option<T>.None
     * 返回值 Option<T> - 仓颉线程局部变量的值
    public func get(): Option<T>

     * 通过 value 设置仓颉线程局部变量的值
     * 如果传入 Option<T>.None,该局部变量的值将被删除,在线程后续操作中将无法获取
     * 参数 value - 需要设置的局部变量的值
    public func set(value: Option<T>): Unit
  • 下方示例代码演示了如何通过 ThreadLocal类来创建并使用各自线程的局部变量:
main(): Int64 {
    let tl = ThreadLocal<Int64>()
    let fut1 = spawn {
        println("tl in spawn1 = ${tl.get().getOrThrow()}")
    let fut2 = spawn {
        println("tl in spawn2 = ${tl.get().getOrThrow()}")

tl in spawn1 = 123
tl in spawn2 = 456
tl in spawn2 = 456
tl in spawn1 = 123

线程睡眠指定时长 sleep

sleep 函数会阻塞当前运行的线程,该线程会主动睡眠一段时间,之后再恢复执行,其参数类型为 Duration 类型。函数原型为:

func sleep(dur: Duration): Unit // Sleep for at least `dur`.
import std.sync.*
import std.time.*

main(): Int64 {
    sleep(Duration.second)  // sleep for 1s.
    return 0




