美文网首页
HarmonyOS封装多线程执行批量任务池

HarmonyOS封装多线程执行批量任务池

作者: 有没有口罩给我一个 | 来源:发表于2025-01-12 10:54 被阅读0次

并发概述

HarmonyOS提供了异步并发 (Promise和async/await)多线程并发(TaskPool和Worker)两种处理策略。实际上异步并发 (Promise和async/await)严格上讲不是真正意义上的并发,目前只有worker和taskpool的方式了,但是taskpool的线程数量由系统调度,taskpool优点是所有线程由系统调度,如果想要指定执行线程的数量只能由开放者自行封装worker实现线程数量的控制。更具体的去鸿蒙官网看文档。本文章要讲的是封装Worker实现多线程执行批量任务。
更多的并发相关的内容请看文档

在workers目录创建Woker.ets 并定义

import { MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { WorkTask } from '../threads/WorkTask';
import { BusinessError } from '@kit.BasicServicesKit';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

workerPort.onmessage = (event: MessageEvents) => {
  const task: WorkTask = event.data.task
  const context: Context = event.data.context
  task.run(context)
.then((result) => workerPort.postMessage({ task: task, result: result }))
.catch((error: BusinessError) => workerPort.postMessage({ task: task, error: error }))
  }

WorkTask

@Sendable
export class WorkTask {
  retryCount: number = 0
  name: string = WorkTask.name
  params: collections.Map<string, lang.ISendable> = new collections.Map()

  constructor(params: collections.Map<string, lang.ISendable>) {
    this.params = params
  }

  run(context: Context): Promise<boolean> {
    return Promise.resolve(true)
  }
}

因为需要讲任务传递给Worker执行,所以需要讲WorkerTask使用@Sendable注解或者实现lang.Sendale接口。

ArkTS提供了Sendable对象类型,在并发通信时支持通过引用传递来解决上述问题。

WorkerThreadManager

export class WorkerThreadManager {
  private tasks: Array<WorkTask> = new Array<WorkTask>()
  private maxConcurrentTasks: number = 5
  private runningTasks: number = 0
  private failedTasks: number = 0

  static newFixedThreadPool(count:number):WorkerThreadManager{
    const thread = new WorkerThreadManager()
    thread.setMaxConcurrentTasks(count)
    return thread
  }

  static newSingleThreadPool():WorkerThreadManager{
    const thread = new WorkerThreadManager()
    thread.setMaxConcurrentTasks(1)
    return thread
  }

  static newCachedThreadPool():WorkerThreadManager{
    const thread = new WorkerThreadManager()
    thread.setMaxConcurrentTasks(Number.MAX_VALUE)
    return thread
  }

  public addTask(task: WorkTask) {
    this.tasks.push(task)
    return this
  }

  public executeTask(context: Context): Promise<boolean> {
    return new Promise((resolve, reject) => {
      for (let i = this.runningTasks; i < this.maxConcurrentTasks; i++) {
        const task = this.tasks.shift()
        if (task) {
          this.runningTasks++
          const workerThread = new worker.ThreadWorker('entry/ets/workers/Worker.ets')
          this.runTask(task, workerThread, context, resolve, reject)
    }
  }
  })
}

  private async runTask(task: WorkTask,
    workerThread: worker.ThreadWorker, context: Context,
    resolve: (result: boolean) => void, reject: (reason: Object) => void) {
      workerThread.postMessage({ task: task, context: context })
      workerThread.onmessage = async (event) => {
        try {
          if (event.data.result) {
          const queueSize = this.tasks.length
          if (queueSize <= 0) {
              this.runningTasks--
              workerThread.terminate()
          } else {
              const nextTask = this.tasks.shift()
              if (nextTask) {
                this.runTask(nextTask, workerThread, context, resolve, reject)
            }
          }
        } else {
          const errorTask: WorkTask = event.data.task
      if (errorTask && errorTask.retryCount < 3) {
        errorTask.retryCount++
        this.runTask(errorTask, workerThread, context, resolve, reject)
      } else {
        this.executeTaskFailed(workerThread, reject, new Error("error and retry full"));
      }
    }
  } catch (error) {
    this.executeTaskFailed(workerThread, reject, error);
  } finally {
    const runningTaskNum = this.runningTasks
    LogUtil.info("runningTaskNum:" + runningTaskNum)
    if (runningTaskNum <= 0) {
      resolve(this.failedTasks == 0)
    }
  }
}
  }

  private executeTaskFailed(workerThread: worker.ThreadWorker, reject: (reason: Object) => void, error: Error) {
    this.failedTasks++
    this.runningTasks--
    try {
      workerThread.terminate()
    } finally {
      reject("task execute failed " + JSON.stringify(error))
    }
    }

  setMaxConcurrentTasks(maxConcurrentTasks: number) {
      this.maxConcurrentTasks = maxConcurrentTasks
      return this
  }
}

我们调用worker.ThreadWorker 的postMessage 方法讲task和context传递给Worker并在Worker的onmessage回调方法中执行任务,当封装的Task批量执行之后并返回Promise。

相关文章

网友评论

      本文标题:HarmonyOS封装多线程执行批量任务池

      本文链接:https://www.haomeiwen.com/subject/fdsmyjtx.html