Concurrency overview
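HarmonyOS provides two concurrency strategies: asynchronous concurrency (Promise and async/await) and multithreaded concurrency (TaskPool and Worker). Strictly speaking, asynchronous concurrency is not true concurrency, because the code still runs on a single thread, so real multithreading is only available through Worker and TaskPool. TaskPool has the advantage that all of its threads are scheduled by the system, but that also means the system decides how many threads are used. If you want to specify the number of threads that execute your tasks, you have to wrap Worker yourself and control the thread count there. This article shows how to wrap Worker to run a batch of tasks on multiple threads.
For more on concurrency, see the official HarmonyOS documentation.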
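As a small illustration of why Promise and async/await are not multithreading, the plain TypeScript sketch below runs two async functions "at the same time". The names step and demo are made up for this example. The two functions only take turns at each await on one thread, which is why their log entries alternate in a fixed order.

```typescript
// Each call logs its progress, then yields to the event loop at every await.
async function step(log: string[], name: string, rounds: number): Promise<void> {
  for (let i = 0; i < rounds; i++) {
    log.push(`${name}${i}`);
    // Suspend here; any other pending async function may continue now.
    await Promise.resolve();
  }
}

// Starts two step() calls together and returns the order in which they logged.
async function demo(): Promise<string[]> {
  const log: string[] = [];
  await Promise.all([step(log, "A", 2), step(log, "B", 2)]);
  return log;
}

demo().then((log) => console.log(log.join(","))); // prints "A0,B0,A1,B1"
```

Because only one piece of code runs at any moment, a long CPU-bound loop inside either function would block the other one. Moving such work to a Worker thread avoids that.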
Create Worker.ets in the workers directory and define it as follows:
import { MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { WorkTask } from '../threads/WorkTask';
import { BusinessError } from '@kit.BasicServicesKit';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// Runs each task received from the host thread and posts the outcome back.
workerPort.onmessage = (event: MessageEvents) => {
  const task: WorkTask = event.data.task
  const context: Context = event.data.context
  task.run(context)
    // Success: send back the task together with its result.
    .then((result) => workerPort.postMessage({ task: task, result: result }))
    // Failure: send back the task together with the error so the host can retry.
    .catch((error: BusinessError) => workerPort.postMessage({ task: task, error: error }))
}
WorkTask
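import { collections, lang } from '@kit.ArkTS';

// Base class for a unit of work that can be handed to a Worker.
@Sendable
export class WorkTask {
  // Number of retries the manager has already attempted for this task.
  retryCount: number = 0
  name: string = WorkTask.name
  // Sendable parameters available to the task on the worker thread.
  params: collections.Map<string, lang.ISendable> = new collections.Map()

  constructor(params: collections.Map<string, lang.ISendable>) {
    this.params = params
  }

  // Default implementation succeeds immediately; a concrete task would replace it.
  run(context: Context): Promise<boolean> {
    return Promise.resolve(true)
  }
}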
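Because the task has to be passed to the Worker for execution, WorkTask must be marked with the @Sendable decorator or implement the lang.ISendable interface.
ArkTS provides Sendable object types, which support being passed by reference during concurrent communication.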
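To make the difference between copying and passing by reference concrete, here is an analogy in plain TypeScript. It does not use any ArkTS API: a JSON round trip merely stands in for "serialize an object and rebuild it on another thread", and DemoTask and copyBySerialization are hypothetical names invented for this sketch.

```typescript
// Hypothetical task class, used only for this illustration.
class DemoTask {
  retryCount = 0;
  run(): boolean {
    return true;
  }
}

// Stand-in for sending an object by serialization: the receiver gets a rebuilt copy.
function copyBySerialization<T>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}

const original = new DemoTask();
const copy = copyBySerialization(original); // independent copy of the data only
const sameRef = original;                   // second reference to the same object

copy.retryCount = 5;    // changes the copy, not the original
sameRef.retryCount = 1; // changes the original, because it is the same object

// The rebuilt copy carries the fields but not the class methods.
const copyHasRun = typeof copy.run === "function";

console.log(original.retryCount, copy.retryCount, copyHasRun); // prints "1 5 false"
```

In this analogy the copied object loses its run method and drifts apart from the original, while the reference keeps both. This is only meant to show the general idea behind passing a task object by reference.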
WorkerThreadManager
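import { worker } from '@kit.ArkTS';
// WorkTask and LogUtil must also be imported; their paths depend on the project layout.

export class WorkerThreadManager {
  // Tasks waiting to be executed.
  private tasks: Array<WorkTask> = new Array<WorkTask>()
  // Upper bound on the number of Workers running at the same time.
  private maxConcurrentTasks: number = 5
  private runningTasks: number = 0
  private failedTasks: number = 0

  // Pool with a fixed number of Workers.
  static newFixedThreadPool(count: number): WorkerThreadManager {
    const thread = new WorkerThreadManager()
    thread.setMaxConcurrentTasks(count)
    return thread
  }

  // Pool with a single Worker; tasks run one after another.
  static newSingleThreadPool(): WorkerThreadManager {
    const thread = new WorkerThreadManager()
    thread.setMaxConcurrentTasks(1)
    return thread
  }

  // Pool without an explicit limit: one Worker is started per queued task
  // (the system still caps the number of Workers per process).
  static newCachedThreadPool(): WorkerThreadManager {
    const thread = new WorkerThreadManager()
    thread.setMaxConcurrentTasks(Number.MAX_VALUE)
    return thread
  }

  // Queues a task; returns this so calls can be chained.
  public addTask(task: WorkTask) {
    this.tasks.push(task)
    return this
  }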
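  // Starts up to maxConcurrentTasks Workers and settles once the batch has run.
  public executeTask(context: Context): Promise<boolean> {
    return new Promise((resolve, reject) => {
      // Nothing queued and nothing running: resolve immediately instead of never settling.
      if (this.tasks.length === 0 && this.runningTasks === 0) {
        resolve(this.failedTasks == 0)
        return
      }
      // Bound the loop by the queue length so a huge limit (Number.MAX_VALUE) cannot spin.
      while (this.runningTasks < this.maxConcurrentTasks && this.tasks.length > 0) {
        const task = this.tasks.shift()
        if (task) {
          this.runningTasks++
          const workerThread = new worker.ThreadWorker('entry/ets/workers/Worker.ets')
          this.runTask(task, workerThread, context, resolve, reject)
        }
      }
    })
  }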
  // Sends one task to the given Worker and handles its reply.
  private async runTask(task: WorkTask,
    workerThread: worker.ThreadWorker, context: Context,
    resolve: (result: boolean) => void, reject: (reason: Object) => void) {
    workerThread.postMessage({ task: task, context: context })
    workerThread.onmessage = async (event) => {
      try {
        if (event.data.result) {
          // Task succeeded: reuse this Worker for the next queued task, or shut it down.
          const queueSize = this.tasks.length
          if (queueSize <= 0) {
            this.runningTasks--
            workerThread.terminate()
          } else {
            const nextTask = this.tasks.shift()
            if (nextTask) {
              this.runTask(nextTask, workerThread, context, resolve, reject)
            }
          }
        } else {
          // Task failed: retry it up to 3 times on the same Worker, then give up.
          const errorTask: WorkTask = event.data.task
          if (errorTask && errorTask.retryCount < 3) {
            errorTask.retryCount++
            this.runTask(errorTask, workerThread, context, resolve, reject)
          } else {
            this.executeTaskFailed(workerThread, reject, new Error("task failed after 3 retries"));
          }
        }
      } catch (error) {
        this.executeTaskFailed(workerThread, reject, error as Error);
      } finally {
        // When no Worker is left running, the batch is finished.
        const runningTaskNum = this.runningTasks
        LogUtil.info("runningTaskNum:" + runningTaskNum)
        if (runningTaskNum <= 0) {
          resolve(this.failedTasks == 0)
        }
      }
    }
  }
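  // Records a failed task, stops its Worker, and rejects the batch promise.
  private executeTaskFailed(workerThread: worker.ThreadWorker, reject: (reason: Object) => void, error: Error) {
    this.failedTasks++
    this.runningTasks--
    try {
      workerThread.terminate()
    } finally {
      // JSON.stringify on an Error yields "{}", so report the message instead.
      reject("task execute failed: " + error.message)
    }
  }

  // Sets the maximum number of Workers that may run at the same time.
  setMaxConcurrentTasks(maxConcurrentTasks: number) {
    this.maxConcurrentTasks = maxConcurrentTasks
    return this
  }
}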
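We call the postMessage method of worker.ThreadWorker to pass the task and the context to the Worker, and the Worker runs the task in its onmessage callback. executeTask returns a Promise that settles once the whole batch of queued tasks has been executed.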
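The scheduling idea can be tried without a device. The sketch below is a simplified stand-in in plain TypeScript, not the ArkTS implementation: there are no real Worker threads, Task, runBatch, lane and MAX_RETRIES are names assumed for this example, and each "lane" plays the role of one Worker that keeps pulling tasks from a shared queue. Unlike WorkerThreadManager, which rejects as soon as one task exhausts its retries, this sketch lets the remaining tasks finish and then resolves to false.

```typescript
// A task is any function returning Promise<boolean>; true means success.
type Task = () => Promise<boolean>;

// Same retry limit as in WorkerThreadManager: up to 3 retries after the first attempt.
const MAX_RETRIES = 3;

// Runs all tasks with at most `limit` in flight at once.
// Resolves to true if every task eventually succeeded, otherwise false.
async function runBatch(tasks: Task[], limit: number): Promise<boolean> {
  const queue = tasks.slice();
  let failed = 0;

  // One lane stands in for one Worker: it takes the next queued task
  // whenever its current task has finished, until the queue is empty.
  async function lane(): Promise<void> {
    for (let task = queue.shift(); task !== undefined; task = queue.shift()) {
      let ok = false;
      for (let attempt = 0; attempt <= MAX_RETRIES && !ok; attempt++) {
        try {
          ok = await task();
        } catch {
          ok = false; // a thrown error counts as a failed attempt
        }
      }
      if (!ok) {
        failed++;
      }
    }
  }

  // Never start more lanes than there are tasks, even for a huge limit.
  const laneCount = Math.min(limit, queue.length);
  const lanes: Promise<void>[] = [];
  for (let i = 0; i < laneCount; i++) {
    lanes.push(lane());
  }
  await Promise.all(lanes);
  return failed === 0;
}
```

In terms of the classes above, `limit` corresponds to maxConcurrentTasks, and reusing a lane for the next task corresponds to calling runTask again with the same workerThread.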