跃迁引擎

空気を読んだ雨降らないでよ

iOS Research & Development


HarmonyOS - 鸿蒙线程池及异步任务

鸿蒙中的并发

并发是指在同一时间段内,能够处理多个任务的能力。为了提升应用的响应速度与帧率,以及防止耗时任务对主线程的干扰,HarmonyOS 系统提供了异步并发和多线程并发两种处理策略。

  • 异步并发是指异步代码在执行到一定程度后会被暂停,以便在未来某个时间点继续执行,这种情况下,同一时间只有一段代码在执行。
  • 多线程并发允许在同一时间段内同时执行多段代码。在主线程继续响应用户操作和更新 UI 的同时,后台也能执行耗时操作,从而避免应用出现卡顿。

一.异步并发

问题1:Promise / async 中执行的异步任务运行在哪个线程,为什么不会阻塞主线程

Promise和async/await提供异步并发能力,是标准的JS异步语法。异步代码会被挂起并在之后继续执行,同一时间只有一段代码执行,适用于单次I/O任务的场景开发,例如一次网络请求、一次文件读写等操作。

异步语法是一种编程语言的特性,允许程序在执行某些操作时不必等待其完成,而是可以继续执行其他操作。

  1. Promise

Promise是一种用于处理异步操作的对象,可以将异步操作转换为类似于同步操作的风格,以方便代码编写和维护。Promise提供了一个状态机制来管理异步操作的不同阶段,并提供了一些方法来注册回调函数以处理异步操作的成功或失败的结果。

Promise有三种状态:

  1. pending(进行中)
  2. fulfilled(已完成)
  3. rejected(已拒绝)

Promise对象创建后处于pending状态,并在异步操作完成后转换为fulfilled或rejected状态。

最基本的用法是通过构造函数实例化一个Promise对象,同时传入一个带有两个参数的函数,通常称为executor函数。executor函数接收两个参数:resolve和reject,分别表示异步操作成功和失败时的回调函数,在函数的闭包中可以书写耗时代码

1
2
3
4
5
6
7
8
9
10
11
let p: Promise<string> = new Promise((resolve, reject) => {
// 执行异步耗时操作
YCAppStoreManager.mInstance.get(key).then((value) => {
// 返回数据
resolve(value as string)
}).catch((e: Error) => {
// 返回异常
reject(e)
})
}
})

Promise对象创建后,可以使用then方法和catch方法指定fulfilled状态和rejected状态的回调函数。then方法可接受两个参数,一个处理fulfilled(已完成)状态的函数,另一个处理rejected(已拒绝)状态的函数。只传一个参数则表示当Promise对象状态变为fulfilled(已完成)时,then方法会自动调用这个回调函数,并将Promise对象的结果作为参数传递给它。使用catch方法注册一个回调函数,用于处理“失败”的结果,即捕获Promise的状态改变为rejected状态或操作失败抛出的异常

1
2
3
4
5
 p.then((result: string) => {

}).catch((error: BusinessError) => {

})

也可以都用then来处理

1
2
3
4
5
p.then((result: string) => {

}, (error: BusinessError) => {

})
  1. async/await

async/await是一种用于处理异步操作的Promise语法糖,使得编写异步代码变得更加简单和易读。通过使用async关键字声明一个函数为异步函数,并使用await关键字等待Promise的解析(完成或拒绝),以同步的方式编写异步操作的代码。

async函数是一个返回Promise对象的函数,用于表示一个异步操作。在async函数内部,可以使用await关键字等待一个Promise对象的解析,并返回其解析值。如果一个async函数抛出异常,那么该函数返回的Promise对象将被拒绝,并且异常信息会被传递给Promise对象的onRejected()方法。

1
2
3
4
5
6
7
8
9
10
11
12
async function myAsyncFunction() {
try {
const result: string = await new Promise((resolve: Function) => {
resolve('Hello, world!');
})
console.info(result); // 输出: Hello, world!
} catch (e) {
console.error(`Get exception: ${e}`);
}
}

myAsyncFunction();

问题1回答:

在 ArkTS 层级上看,ts语言作为一个专注于构建用户界面的框架,其设计原则主要是为了简化UI开发流程。当执行Promise链或异步函数时,实际的异步操作(如网络请求、文件读写)会被委托给环境处理,这个环境就是当前的操作系统,这些异步的耗时操作并不会阻塞UI线程。而TS继承自JS的特性让Promise/async没有多线程的概念,而是基于事件的回调。一旦这些操作完成,它们的结果会被放入事件队列,等待主线程空闲时通过事件循环机制处理,进而执行相应的回调函数或Promise的resolve/reject。因此,即使异步操作在技术上可能涉及其他线程,它们与主线程的交互仍然是非阻塞的,确保了UI渲染线程不会被长时间的任务所阻塞,从而保持应用的响应性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
test() {
let p: Promise<string> = new Promise((resolve, reject) => {
console.log(`Promise执行时当前线程: ${process.tid},当前进程:${process.pid}`)
setTimeout(() => {
resolve("成功")
console.log(`当前线程: ${process.tid},当前进程:${process.pid}`)
}, 1000)
})

p.then((result: string) => {
console.log(`返回结果:${result},当前线程:" + ${process.tid},当前进程:${process.pid}`)
}).catch((error: Error) => {
console.log("出现异常了:" + error)
})
}

上面打印的线程进程id都是同样的值

二. 多线程并发

并发模型是用来实现不同应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。

Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和使用。

当前ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现。

更多基于Actor模型进行多线程并发编程的例子可参考Actor并发模型对比内存共享并发模型

问题2:Actor模型是什么,与我们目前使用的thread有什么不同

Actor模型就是每个线程拥有独立的内存空间,线程直接通过消息传递来同步数据;而涉及到消息传递就需要序列化,所以鸿蒙中的多线程最需要注意的点就是参数要保证可序列化的问题。

Java中的的thread是内存共享并发模型,每个线程都在同一块内存上操作,为了不影响数据,需要通过锁来保证同一时间只有一个线程去操作数据

Actor并发模型对比内存共享并发模型

1. TaskPool

任务池(TaskPool)作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且无需关心线程实例的生命周期。

TaskPool支持开发者在主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。可以理解成一个很智能的线程池。

使用示例

在子线程中频繁写入文件为例:

1.定义并发函数,内部密集调用I/O能力,首先定义个a.ets文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// a.ets
import fs from '@ohos.file.fs';


// 定义并发函数,内部密集调用I/O能力
// 写入文件的实现
export async function write(data: string|null, filePath: string): Promise<void> {
if(data === null){
let url = "https://fp.yangcong345.com/middle//zhufeng/1.0.0/data.json"
// let savePath = getFileSaveName(url)
let savePath = '/data/storage/el2/base/haps/phone/files/preLoad/8d777f385d3dfec8815d20f7496026dc.json'
data = fs.readTextSync(savePath)
}
console.log(`taskPool write 执行时当前线程: ${process.tid},当前进程:${process.pid}`)
let file: fs.File = await fs.open(filePath, fs.OpenMode.READ_WRITE | fs.OpenMode.CREATE);
await fs.write(file.fd, data);
fs.close(file);
}
  1. 构建 @Concurrent 修饰的异步函数,taskPool必须接收@Concurrent修饰的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { write } from './a'
import { BusinessError } from '@ohos.base';


@Concurrent
async function concurrentTest(fileList: string[]): Promise<boolean> {
console.log(`taskTool concurrentTest函数执行时 当前线程: ${process.tid},当前进程:${process.pid}`)
// 循环写文件操作
for (let i: number = 0; i < fileList.length * 500; i++) {
write(`Hello World!`, fileList[i % fileList.length]).then(() => {
console.info(`taskTool 写入文件成功, FileList: ${fileList[i% fileList.length]},当前线程: ${process.tid},当前进程:${process.pid}`);
}).catch((err: BusinessError) => {
console.error(`taskTool 写入文件失败, Code is ${err.code}, message is ${err.message},当前线程: ${process.tid},当前进程:${process.pid}`)
return false;
})
}
return true;
}
  1. 使用TaskPool执行包含密集I/O的并发函数:通过调用execute()方法执行任务,并在回调中进行调度结果处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// taskTool写入1000个文件
testTaskTool() {
console.log(`taskpool 开始调用,当前为主线程,线程id: ${process.tid},当前进程:${process.pid}`)
let filesDir = YCContextProvider.getContext().filesDir + "/testTaskPool/"
if (!fs.accessSync(filesDir)) {
fs.mkdirSync(filesDir, true)
}
let filePath1: string = filesDir + "path1.txt"; // 应用文件路径
let filePath2: string = filesDir + "path2.txt";

// 使用TaskPool执行包含密集I/O的并发函数
// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力
let startTime = YCDateUtil.getCurrentTime()
console.log(`taskpool 开始调用,当前时间:${startTime}`)
taskpool.execute(concurrentTest, [filePath1, filePath2]).then(() => {
// 调度结果处理
console.log(`taskpool 调度结果处理,当前线程: ${process.tid},当前进程:${process.pid},耗时:${YCDateUtil.getCurrentTime() - startTime}`)
ToastUtil.showToast(`taskpool 调度完成,耗时:${YCDateUtil.getCurrentTime() - startTime}`)
})
ToastUtil.showToast(`测试taskTool写入弹窗toast,耗时:${YCDateUtil.getCurrentTime() - startTime}`)
}
使用TaskPool进行图像直方图处理
  1. 实现图像处理的业务逻辑。
  2. 数据分段,通过任务组发起关联任务调度。
  3. 创建TaskGroup并通过addTask()添加对应的任务,通过execute()执行任务组,并指定为高优先级,在当前任务组所有任务结束后,会将直方图处理结果同时返回。
  4. 结果数组汇总处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import taskpool from '@ohos.taskpool';

@Concurrent
function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
// 步骤1: 具体的图像处理操作及其他耗时操作
return dataSlice;
}

function histogramStatistic(pixelBuffer: ArrayBuffer): void {
// 步骤2: 分成三段并发调度
let number: number = pixelBuffer.byteLength / 3;
let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);

let group: taskpool.TaskGroup = new taskpool.TaskGroup();
group.addTask(imageProcessing, buffer1);
group.addTask(imageProcessing, buffer2);
group.addTask(imageProcessing, buffer3);

taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => {
// 步骤3: 结果数组汇总处理
})
}

@Entry
@Component
struct Index {
@State message: string = 'Hello World'

build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(() => {
let buffer: ArrayBuffer = new ArrayBuffer(24);
histogramStatistic(buffer);
})
}
.width('100%')
}
.height('100%')
}
}

b.TaskPool注意事项

  • 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。
  • 从API version 11开始,实现任务的函数需要使用类方法时,该类必须使用装饰器@Sendable标注,且仅支持在.ets文件中使用。
  • 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。
  • 实现任务的函数入参需满足序列化支持的类型,详情请参见TaskPool和Worker支持的序列化类型
  • ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。
  • 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用,具体请见多线程安全注意事项
  • 序列化传输的数据量大小限制为16MB。

2. Worker

Worker主要作用是为应用程序提供一个多线程的运行环境,可满足应用程序在执行过程中与主线程分离,在后台线程中运行一个脚本操作耗时操作,极大避免类似于计算密集型或高延迟的任务阻塞主线程的运行。具体接口信息及使用方法详情请见Worker

创建Worker的线程称为宿主线程(不一定是主线程,工作线程也支持创建Worker子线程),Worker自身的线程称为Worker子线程(或Actor线程、工作线程)。每个Worker子线程与宿主线程拥有独立的实例,包含基础设施、对象、代码段等,因此每个Worker启动存在一定的内存开销,需要限制Worker的子线程数量。Worker子线程和宿主线程之间的通信是基于消息传递的,Worker通过序列化机制与宿主线程之间相互通信,完成命令及数据交互。

Worker注意事项

  • 创建Worker时,有手动和自动两种创建方式,手动创建Worker线程目录及文件时,还需同步进行相关配置,详情请参考创建Worker的注意事项
  • 使用Worker能力时,构造函数中传入的Worker线程文件的路径在不同版本有不同的规则,详情请参见文件路径注意事项
  • Worker创建后需要手动管理生命周期,且最多同时运行的Worker子线程数量为8个,详情请参见生命周期注意事项
  • 由于不同线程中上下文对象是不同的,因此Worker线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用,具体请见多线程安全注意事项
  • 序列化传输的数据量大小限制为16MB。
  • 使用Worker模块时,需要在主线程中注册onerror接口,否则当worker线程出现异常时会发生jscrash问题。
  • 不支持跨HAP使用Worker线程文件。

创建Worker的注意事项

Worker线程文件需要放在”{moduleName}/src/main/ets/“目录层级之下,否则不会被打包到应用中。有手动和自动两种创建Worker线程目录及文件的方式。

  • 手动创建:开发者手动创建相关目录及文件,此时需要配置build-profile.json5的相关字段信息,Worker线程文件才能确保被打包到应用中。
    • Stage模型:
1
2
3
4
5
6
7
"buildOption": {
"sourceOption": {
"workers": [
"./src/main/ets/workers/worker.ets"
]
}
}
  • 自动创建:DevEco Studio支持一键生成Worker,在对应的{moduleName}目录下任意位置,点击鼠标右键 > New > Worker,即可自动生成Worker的模板文件及配置信息,无需再手动在build-profile.json5中进行相关配置。

文件路径注意事项

当使用Worker模块具体功能时,均需先构造Worker实例对象,其构造函数与API版本相关,且构造函数需要传入Worker线程文件的路径(scriptURL)。

1
2
3
4
5
6
7
// 导入模块
import worker from '@ohos.worker';

// API 9及之后版本使用:
const worker1: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
// API 8及之前版本使用:
const worker2: worker.Worker = new worker.Worker('entry/ets/workers/MyWorker.ets');
Stage模型下的文件路径规则

构造函数中的scriptURL要求如下:

  • scriptURL的组成包含 {moduleName}/ets 和相对路径 relativePath。
  • relativePath是Worker线程文件和”{moduleName}/src/main/ets/“目录的相对路径。

1) 加载Ability中Worker线程文件场景

加载Ability中的worker线程文件,加载路径规则:{moduleName}/ets/{relativePath}。

1
2
3
4
5
6
7
import worker from '@ohos.worker';

// worker线程文件所在路径:"entry/src/main/ets/workers/worker.ets"
const workerStage1: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/worker.ets');

// worker线程文件所在路径:"phone/src/main/ets/ThreadFile/workers/worker.ets"
const workerStage2: worker.ThreadWorker = new worker.ThreadWorker('phone/ets/ThreadFile/workers/worker.ets');

2) 加载Library-HSP中Worker线程文件场景

加载HSP中worker线程文件,加载路径规则:{moduleName}/ets/{relativePath}。

1
2
3
4
import worker from '@ohos.worker';

// worker线程文件所在路径: "hsp/src/main/ets/workers/worker.ets"
const workerStage3: worker.ThreadWorker = new worker.ThreadWorker('hsp/ets/workers/worker.ets');

3) 加载Library-HAR中Worker线程文件场景

加载HAR中worker线程文件存在以下两种情况:

  • @标识路径加载形式:所有种类的模块加载本地HAR中的Worker线程文件,加载路径规则:@{moduleName}/ets/{relativePath}。
  • 相对路径加载形式:本地HAR加载该包内的Worker线程文件,加载路径规则:创建Worker对象所在文件与Worker线程文件的相对路径。
1
2
3
4
5
6
7
8
9
10
import worker from '@ohos.worker';

// @标识路径加载形式:
// worker线程文件所在路径: "har/src/main/ets/workers/worker.ets"
const workerStage4: worker.ThreadWorker = new worker.ThreadWorker('@har/ets/workers/worker.ets');

// 相对路径加载形式:
// worker线程文件所在路径: "har/src/main/ets/workers/worker.ets"
// 创建Worker对象的文件所在路径:"har/src/main/ets/components/mainpage/MainPage.ets"
const workerStage5: worker.ThreadWorker = new worker.ThreadWorker('../../workers/worker.ets');

生命周期注意事项

  • Worker的创建和销毁耗费性能,建议开发者合理管理已创建的Worker并重复使用。Worker空闲时也会一直运行,因此当不需要Worker时,可以调用terminate()接口或parentPort.close()方法主动销毁Worker。若Worker处于已销毁或正在销毁等非运行状态时,调用其功能接口,会抛出相应的错误。
  • Worker存在数量限制,支持最多同时存在8个Worker。当Worker数量超出限制时,会抛出“Worker initialization failure, the number of workers exceeds the maximum.”错误。

使用示例

使用Worker进行文件的写入

本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。

  1. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”

可以看到在当前build-profile.json5 中已经帮我生成好了workers的注册信息

  1. 在主线程中通过调用ThreadWorker的constructor()方法创建Worker对象,当前线程为宿主线程
1
2
3
import worker from '@ohos.worker';

const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
  1. 在宿主线程中通过调用onmessage()方法接收Worker线程发送过来的消息,并通过调用postMessage()方法向Worker线程发送消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
testWorker() {
console.log(`testWorker 开始调用,当前为主线程,线程id: ${process.tid},当前进程:${process.pid}`)
const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('phone/ets/workers/MyWorker.ets');
let done = false;

// 接收Worker子线程的结果
workerInstance.onmessage = ((event: MessageEvents) => {
console.log(`testWorker onmessage回调,message:${event.data.value},线程id: ${process.tid},当前进程:${process.pid}`)
})

workerInstance.onerror = ((err: ErrorEvent) => {
console.log(`testWorker 接收Worker子线程的错误信息,err:${err},线程id: ${process.tid},当前进程:${process.pid}`)
})

// Worker线程销毁后,执行onexit回调方法
workerInstance.onexit = (): void => {
console.log(`testWorker 接收Worker子线程的 onexit 销毁回调,,线程id: ${process.tid},当前进程:${process.pid}`)
}

// 向Worker子线程发送训练消息
workerInstance.postMessage({ 'type': 1,'filesDir': YCContextProvider.getContext().filesDir + "/testWorkPool/" });
}
  1. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。
1
2
3
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';

let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
  1. 在Worker线程中通过调用onmessage()方法接收宿主线程发送的消息内容,并通过调用postMessage()方法向宿主线程发送消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

/**
* 定义当工作线程收到主线程发送的消息时要调用的事件处理程序。事件处理程序在工作线程中执行。
* @param e message data
*/
workerPort.onmessage = (e: MessageEvents) => {
console.log(`worker子线程收到 onmessage 回调,e:${e.data}, 当前线程: ${process.tid},当前进程:${process.pid}`)
if (e.data.type === 1) {
workerTest(e.data.filesDir)
}
}

/**
* 定义当工作线程收到无法反序列化的消息时要调用的事件处理程序。事件处理程序在工作线程中执行。
*
* @param e message data
*/
workerPort.onmessageerror = (e: MessageEvents) => {
console.log(`worker子线程收到 onmessageerror 回调,e:${e.data}, 当前线程: ${process.tid},当前进程:${process.pid}`)
}

/**
* 定义在工作线程执行期间发生异常时要调用的事件处理程序。事件处理程序在工作线程中执行。
*
* @param e error message
*/
workerPort.onerror = (e: ErrorEvent) => {
console.log(`worker子线程收到 onerror 回调,e:${e.message}, 当前线程: ${process.tid},当前进程:${process.pid}`)
}


function workerTest(filesDir:string) {
console.log(`worker子线程开始调用workerTest, 当前线程: ${process.tid},当前进程:${process.pid}`)
// YCContextProvider.getContext().filesDir 会报错 Cannot read property filesDir of undefined
// let filesDir = YCContextProvider.getContext().filesDir + "/testTaskPool/"
if (!fs.accessSync(filesDir)) {
fs.mkdirSync(filesDir, true)
}
let filePath1: string = filesDir + "path1.txt"; // 应用文件路径
let filePath2: string = filesDir + "path2.txt";
let fileList = [filePath1, filePath2]
// 循环写文件操作
for (let i: number = 0; i < fileList.length * 500; i++) {
write(`Hello World!`, fileList[i % fileList.length]).then(() => {
console.info(`worker子线程 写入文件成功, FileList: ${fileList[i% fileList.length]},当前线程: ${process.tid},当前进程:${process.pid}`);
}).catch((err: BusinessError) => {
console.error(`worker子线程 写入文件失败, Code is ${err.code}, message is ${err.message},当前线程: ${process.tid},当前进程:${process.pid}`)
return false;
})
}
console.log(`worker写入完成, 当前线程: ${process.tid},当前进程:${process.pid}`)
// 向主线程发送完成事件
workerPort.postMessage({ type: 'message', value: 'write 已完成' });
// 关闭worker线程
workerPort.close();
}
  1. 在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。

在宿主线程中通过调用onexit()方法定义Worker线程销毁后的处理逻辑。

1
2
3
4
// Worker线程销毁后,执行onexit回调方法
workerInstance.onexit = (): void => {
console.log(`testWorker 接收Worker子线程的 onexit 销毁回调,,线程id: ${process.tid},当前进程:${process.pid}`)
}

销毁方式一:在宿主线程中通过调用terminate()方法销毁Worker线程,并终止Worker接收消息。

1
2
// 销毁Worker线程
workerInstance.terminate();

销毁方式二:在Worker线程中通过调用close()方法主动销毁Worker线程,并终止Worker接收消息。

1
2
// 销毁线程
workerPort.close();

3. TaskPool和Worker的对比

TaskPool(任务池)和Worker的作用是为应用程序提供一个多线程的运行环境,用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务阻塞主线程,从而最大化系统的利用率,降低整体资源消耗,并提高系统的整体性能。

TaskPool和Worker的实现特点对比

实现 TaskPool Worker
内存模型 线程间隔离,内存不共享。 线程间隔离,内存不共享。
参数传递机制 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。
参数传递 直接传递,无需封装,默认进行transfer。 消息对象唯一参数,需要自己封装。
方法调用 直接将方法传入调用。 在Worker线程中进行消息解析并调用对应方法。
返回值 异步调用后默认返回。 主动发送消息,需在onmessage解析赋值。
生命周期 TaskPool自行管理生命周期,无需关心任务负载高低。 开发者自行管理Worker的数量及生命周期。
任务池个数上限 自动管理,无需配置。 同个进程下,最多支持同时开启8个Worker线程。
任务执行时长上限 3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时)。 无限制。
设置任务的优先级 支持配置任务优先级。 不支持。
执行任务的取消 支持取消已经发起的任务。 不支持。
线程复用 支持。 不支持。
任务延时执行 支持。 不支持。
设置任务依赖关系 支持。 不支持。
串行队列 支持。 不支持。
任务组 支持。 不支持。

适用场景对比

TaskPool和Worker均支持多线程并发能力。由于TaskPool的工作线程会绑定系统的调度优先级,并且支持负载均衡(自动扩缩容),而Worker需要开发者自行创建,存在创建耗时以及不支持设置调度优先级,故在性能方面使用TaskPool会优于Worker,因此大多数场景推荐使用TaskPool。

TaskPool偏向独立任务维度,该任务在线程中执行,无需关注线程的生命周期,超长任务(大于3分钟)会被系统自动回收;而Worker偏向线程的维度,支持长时间占据线程执行,需要主动管理线程生命周期。

常见的一些开发场景及适用具体说明如下:

  • 运行时间超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时)的任务。例如后台进行1小时的预测算法训练等CPU密集型任务,需要使用Worker。
  • 有关联的一系列同步任务。例如在一些需要创建、使用句柄的场景中,句柄创建每次都是不同的,该句柄需永久保存,保证使用该句柄进行操作,需要使用Worker。
  • 需要设置优先级的任务。例如图库直方图绘制场景,后台计算的直方图数据会用于前台界面的显示,影响用户体验,需要高优先级处理,需要使用TaskPool。
  • 需要频繁取消的任务。例如图库大图浏览场景,为提升体验,会同时缓存当前图片左右侧各2张图片,往一侧滑动跳到下一张图片时,要取消另一侧的一个缓存任务,需要使用TaskPool。
  • 大量或者调度点较分散的任务。例如大型应用的多个模块包含多个耗时任务,不方便使用8个Worker去做负载管理,推荐采用TaskPool。

问题3:3分钟的限制到底是什么表现,后台下载超过3分钟会被杀死么

将上面io的例子改造一下,使用定时器来模拟网络请求耗时,每间隔2s使用一次io,如果3分钟后还在运行,说明只要我们代码的io操作不超过3分钟即可,网络请求已经延时操作并不包含在里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Concurrent
async function concurrentTest(fileList: string[]): Promise<boolean> {
console.log(`taskTool concurrentTest函数执行时 当前线程: ${process.tid},当前进程:${process.pid}`)
// 循环写文件操作
for (let i: number = 0; i < fileList.length * 500; i++) {
setTimeout(() => {
console.log(`taskTool setTimeout执行时 当前线程: ${process.tid},当前进程:${process.pid}`)
write(`Hello World!`, fileList[i % fileList.length]).then(() => {
console.info(`taskTool 写入文件成功, FileList: ${fileList[i% fileList.length]},当前线程: ${process.tid},当前进程:${process.pid}`);
}).catch((err: BusinessError) => {
console.error(`taskTool 写入文件失败, Code is ${err.code}, message is ${err.message},当前线程: ${process.tid},当前进程:${process.pid}`)
return false;
})
}, i * 2000);

}
return true;
}

示例发现,3分钟后的确还在打印数据

问题4:什么是可共享对象,什么是Sendable对象

Sendable可以理解成为了向 @Concurrent 异步函数中传递数据方便而构建的一个数据bean,例如我要向 @Concurrent 修饰的函数传递10个参数,然后还要调用方法,@Concurrent 是没有办法传递闭包的,所以我必须放到一个对象中进行传递,下面看一个例子:

  1. 首先构建一个 被 @Sendable 修饰的类,这个类中可以定义属性和函数,但是需要注意的是不能写闭包,因为这个类里面的数据需要被克隆到不同的内存中,所以必须是可以被序列化的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Sendable
export class TestWorkPoolClass {
desc: string = "我是 sendable 类的属性 ";
taskNum: number = 5;
fileDir: string = YCContextProvider.getContext().filesDir + "/sendableTest/"

printName() {
console.log(`sendable printName方法调用,desc:${this.desc},number:${this.taskNum}, 当前线程: ${process.tid},当前进程:${process.pid}`)
if (!fs.accessSync(this.fileDir)) {
fs.mkdirSync(this.fileDir, true)
}
let filePath: string = this.fileDir + "test.txt";
write(`Hello World!`, filePath).then(() => {
console.info(`sendable workPool 子线程 写入文件成功, filePath: ${filePath},当前线程: ${process.tid},当前进程:${process.pid}`);
}).catch((err: BusinessError) => {
console.error(`sendable workPool 子线程 写入文件失败, Code is ${err.code}, message is ${err.message},当前线程: ${process.tid},当前进程:${process.pid}`)
return false;
})
}

get getTaskNum(): number {
return this.taskNum;
}
}
  1. 构建 @Concurrent 异步函数,注意点事不能直接将 TestWorkPoolClass 传递,而是要包成一个array传入,官方说这个Sendable要配合setCloneList
1
2
3
4
5
6
7
8
9
@Concurrent
async function concurrentTestSendable(arr: Array<TestWorkPoolClass>): Promise<boolean> {
console.log(`taskTool concurrentTest函数执行时 当前线程: ${process.tid},当前进程:${process.pid}`)
let testWorkPoolClass = arr[0]
testWorkPoolClass.printName()
testWorkPoolClass.taskNum = 20
testWorkPoolClass.printName()
return true;
}

3.使用 setCloneList 封装传递使用

1
2
3
4
5
6
7
8
9
10
let sendableInfo = new TestWorkPoolClass()
let array1 = new Array<TestWorkPoolClass>();
array1.push(sendableInfo);
let task1 = new taskpool.Task(concurrentTestSendable, array1);
// @Sendable装饰器需搭配 setCloneList 接口
task1.setCloneList(array1);
taskpool.execute(task1).then(() => {
// 调度结果处理
console.log(`concurrentTestSendable 调度结果处理,当前线程: ${process.tid},当前进程:${process.pid}`)
})

如果我没有传递的是没有被 @Sendable 修饰的类就会报错:

最近的文章

HarmonyOS - 应用程序包

1. 应用程序包概述1.1 应用与应用程序包用户应用程序泛指运行在设备的操作系统之上,为用户提供特定服务的程序,简称“应用”。一个应用所对应的软件包文件,称为“应用程序包”。 当前系统提供了应用程序包开发、安装、查询、更新、卸载的管理机制,便于开发者开发和管理应用。同时,系统还屏蔽了不同的芯片平 …

, , 开始阅读
更早的文章

HarmonyOS - 鸿蒙开发指南

1. 概述1.1 简介鸿蒙(即 HarmonyOS ,开发代号 Ark,正式名称为华为终端鸿蒙智能设备操作系统软件)是华为公司自 2012 年以来开发的一款可支持鸿蒙原生应用和兼容 AOSP 应用的分布式操作系统。该系统利用“分布式”技术将手机、电脑、平板、电视、汽车和智能穿戴等多款设备融合成一 …

, , 开始阅读
comments powered by Disqus