Multi-tasks With Process & Thread

现代操作系统都是「多任务」的,也就是操作系统可以「并发」处理多个任务,比如可以在浏览页面的时候同时播放音乐,但是一般来说我们的 PC 一般都只有一个物理 CPU,那么是如何做到只有一个 CPU 的情况下并发处理多个任务的呢,我们简单探究一下。

前置知识

我们先简单熟悉一下需要理解的 CPU 硬件相关的术语

1.Sockets(physical CPU): 物理 CPU,指我们主板上实际插入的 CPU,一般来说 PC 只有一个,服务器可能会有多个
2.Cores: CPU 物理核心,CPU 商品上宣传的一共几核指代的就是这个
3.Logical Processors: 逻辑处理器,如果采用超线程(多线程)技术的话,会比物理核心数多

总的来说: Logical Processors = Sockets _ Cores _ SMT(HT) Multiple
逻辑处理器数量也就代表了操作系统认为能「并行」执行的任务的最高数量

并发 VS 并行

我们对「并发」和「并行」先下个定义,「并发」指的是系统允许多个任务同时存在,「并行」则指的是系统支持多个任务同时执行,「并发」和「并行」的关键区别在于是否能同时执行。在只有单一逻辑处理器的情况下,我们的操作系统只能「并发」执行任务,比如早期的单核 CPU 电脑,但是我们仍然可以边听歌边浏览网页,这是因为 CPU 速度足够快,可以在系统的使用过程中快速切换任务,这样我们就感觉到多个任务同时存在在单一逻辑处理器的情况下,虽然我们可以「并发」执行任务,实际上我们同时也只能执行一个任务,对于 IO 密集类型的任务,我们用到 CPU 的时间不多,决定任务快慢的往往是硬盘以及网络等硬件,「并发」执行也未尝不可,但是对于计算密集型的任务,我们需要占用更多的 CPU 时间,如果「并发」执行,则往往会造成任务的卡顿(响应时间过长),因此我们需要「并行」的执行该任务,而逻辑处理器的数量代表了能「并行」执行任务的最高数量,这也是为什么现在的处理器大多是多核处理器的原因所在。

进程 VS 线程

我们使用的一个个程序可以称为「进程」(process),而 process 下可以开辟多个「线程」(thread),这里引用一下 Microsoft 官方对于进程和线程的解释About Processes and Threads:

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

A thread is the entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread’s set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread’s process. Threads can also have their own security context, which can be used for impersonating clients.

在操作系统层面,process 相互独立,拥有一块独立的虚拟地址空间(内存中),而同一 process 下的 thread 共享该虚拟地址空间,这也是 process 和 thread 最典型,最根本的区别

多进程 VS 多线程

假如我们现在要开发一款浏览器,浏览器的基础功能包括 HTTP 请求,GUI 渲染等功能,如果我们采用单线程来开发,那么势必会遇到一个问题: 当需要网络请求的时候,我们的浏览器就会卡住,所有的用户操作如输入等都没有响应,等网络请求完成,我们才可以进行后续操作,非常影响用户体验,这也是为什么像浏览器这样的程序大多都是多线程的原因,我们需要任务同时进行。但是我们前面讲到的多进程也可以多任务同时进行,那么问题就来了,当我们需要实现多任务的时候,多进程和多线程该如何选择呢?

多进程

前面我们提到过,进程之间是相互独立的,每个进程有独立的虚址空间,那么当一个进程因为某些原因崩掉了,其他的进程也不会受到影响(主进程挂掉除外,但是主进程一般只负责调度,挂掉的几率较小),所以当我们需要较高的稳定性时,可以考虑多进程。但是创建进程的开销是比较大的,因此要考虑资源问题

多线程

多线程可以共享虚址空间,而且创建一个线程的开销较小,这样我们就可以减少资源的占用。但是正是因为线程之间可以共享虚址空间,当一个线程挂掉了,整个进程会随之挂掉,所以多线程的稳定性相比多进程较差

Node.js 中的多线程与多进程

child_process & cluster

Node.js 提供了多种方法来创建多进程,例如 child_process 提供的child_process.spawn()child_process.fork(),那么什么是 spawn:

Spawn in computing refers to a function that loads and executes a new child process. The current process may wait for the child to terminate or may continue to execute concurrent computing.

所以child_process.spawn的作用是创建了一个子进程,然后在子进程执行一些命令,但是child_process.spawn()有一个缺点,就是不能进行进程间通信(IPC: Inter Process Communication),那么当需要进程间通信的时候,就需要使用child_process.fork()

涉及到现实中多进程的运用,我们往往不会只起一个子进程,当我们需要进程间共享一个端口时,这时候就可以使用 Node.js 提供的cluster,cluster创建子进程内部也是通过child_process.fork()实现的,支持 IPC

structured clone

当我们创建了一个子进程的时候,进程间的通信 Node.js 已经帮我们封装好了,使用worker.send(message)process.on('message', handle)就可以实现进程间的通信,以cluster为例:

1
2
3
4
5
6
7
8
if (cluster.isPrimary) {
const worker = cluster.fork();
worker.send('hi there');
} else if (cluster.isWorker) {
process.on('message', msg => {
process.send(msg);
});
}

但是注意我们发送的 message 会被structured clone一份,然后传递给其他进程,因此我们需要注意我们如果传递了一个 Object 过去,Object 中定义的 Function 及其 prototype 等内容都不会被 clone 过去。这里发散一下,如果我们需要深拷贝一个对象,而且该对象满足 Structured clone 的相关算法要求,那么我们可以考虑使用structuredClonecaniuse)或者直接创建一个 worker 来拷贝(当然不推荐)

worker_threads

上述我们讲到进程间的资源是独立的,当我们想共享数据的时候,我们需要 structured clone 对应的数据然后传递过去,这在共享数据量较小的时候还可以接受,但是当数据量较多时,克隆数据是一个比较大的开销,这是我们所不能接受的,因此我们需要多线程来共享内存(数据),Node.js 中也提供了相应的方法worker_threads

多线程在 ko 中的实践

ko 中提供了 concurrency 模式,该模式下使用多线程执行 eslint,prettier 或 stylelint,这里简单介绍一下如何实现:

获取需要 lint 的所有文件

这里使用的是fast-glob,主要代码如下所示factory/runner.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import fg, { Pattern } from 'fast-glob';

protected async getEntries(
patterns: Pattern[],
ignoreFiles: string[]
): Promise<string[]> {
return fg(patterns, {
dot: true,
ignore: this.getIgnorePatterns(...ignoreFiles),
});
}

private getIgnorePatterns(...ignoreFiles: string[]) {
return ['.gitignore', ...ignoreFiles]
.map(fileName => {
const filePath = join(this.cwd, fileName);
if (existsSync(filePath)) {
return readFileSync(filePath, 'utf-8')
.split('\n')
.filter(str => str && !str.startsWith('#'));
}
return [];
})
.reduce((acc, current) => {
current.forEach(p => {
if (!acc.includes(p)) {
acc.push(p);
}
});
return acc;
}, []);
}

返回的是需要 lint 的所有文件路径

lint 相关的 Parser

我们以 eslint 为例eslint/parser.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import { eslint } from 'ko-lint-config';
import LintParserFactory from '../factory/parser';
import { IParserOpts } from '../interfaces';

class ESLintParser extends LintParserFactory {
static readonly EXTENSIONS = ['ts', 'tsx', 'js', 'jsx'];
private eslintInstance: eslint.ESLint;
private opts: IParserOpts;
private config: Record<string, any>;

constructor(opts: IParserOpts) {
super();
this.opts = opts;
this.generateConfig();
this.initInstance();
}

private initInstance() {
const { write } = this.opts;
this.eslintInstance = new eslint.ESLint({
fix: write,
overrideConfig: this.config,
useEslintrc: false,
extensions: ESLintParser.EXTENSIONS,
});
}

public async format(file: string): Promise<string> {
const formatter = await this.eslintInstance.loadFormatter();
let resultText = '';
try {
const result = await this.eslintInstance.lintFiles(file);
if (result[0].errorCount) {
resultText = formatter.format(result) as string;
}
return resultText;
} catch (ex) {
console.log(ex);
process.exit(1);
}
}

public generateConfig() {
if (this.opts.configPath) {
this.config = this.getConfigFromFile(this.opts.configPath);
} else {
const localConfigPath = this.detectLocalRunnerConfig(this.opts.name);
if (localConfigPath) {
this.config = this.getConfigFromFile(localConfigPath);
}
}
}
}

export default ESLintParser;

所有的 parser 实现了 format()方法,作用是输入一个文件的路径,然后进行 lint,如果有相关的错误则返回错误结果

Thread Pool

创建一个线程的是有开销的,虽然相比创建进程而言消耗的较小,但是我们也并不能无休止创建线程。线程是需要调度的,如果我们创建了很多线程,那么系统花在线程调度的时间往往会更长,导致的结果是我们开了多个线程,但是执行程序的耗时反而更长了。为了更好的使用线程,我们引入线程池的概念WikiPedia:

In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program

还是 WikiPedia 的示例图:
thread pool

简单来说,线程池创建了一定数量的线程,每个线程从任务队列中获取任务并执行,然后继续执行下一个任务直到结束。ko 中也实现了相关的线程池threads/Pool.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import { join } from 'path';
import { Worker } from 'worker_threads';
import { IThreadOpts, IParserOpts } from '../interfaces';

class ThreadPool {
private readonly workers: Worker[] = [];
private readonly workerPList: Promise<boolean>[] = [];
private readonly opts: IThreadOpts;
private queue: string[];
private stdout: string[] = [];

constructor(opts: IThreadOpts) {
console.log('Using Multithreading...');
this.opts = opts;
this.queue = this.opts.entries;
this.format();
}

format() {
const { concurrentNumber, configPath, write, name } = this.opts;
if (this.workers.length < concurrentNumber) {
this.workerPList.push(
this.createWorker({
configPath,
write,
name,
})
);
this.format();
}
}

createWorker(opts: IParserOpts): Promise<boolean> {
const worker = new Worker(join(__dirname, './Worker.js'), {
workerData: {
opts,
},
});
return new Promise(resolve => {
worker.postMessage(this.queue.shift());
worker.on('message', (result: string) => {
this.stdout.push(result);
if (this.queue.length === 0) {
resolve(true);
} else {
const next = this.queue.shift();
worker.postMessage(next);
}
});
worker.on('error', err => {
console.log(err);
process.exit(1);
});
this.workers.push(worker);
});
}

async exec(): Promise<string[]> {
return Promise.all(this.workerPList).then(() => {
return this.stdout;
});
}
}

export default ThreadPool;

这里的workers维护了多个 worker,相当于线程池的概念,而任务队列对应的则是queue,也就是传入的需要 lint 的所有文件,当一个 worker 执行完一个文件的 lint 之后,从queue中拿一个新的文件继续执行新的 lint 任务,当queue为空时,我们结束任务并返回最终结果

需要注意的一点是关于concurrentNumber也就是我们启动的线程数量,这里我们默认是Logical Processors的数量

结果

那么我们来对比一下多线程和普通情况下的性能,以执行 eslint 为例:

硬件信息:

  • CPU: Apple M1
  • Memory: 8 GB LPDDR4

普通模式下的 log 为:

1
2
exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write
exec eslint with 704 files cost 31.71s

多线程模式下的 log 为:

1
2
3
exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write --concurrency
Using Multithreading...
exec eslint with 704 files cost 23.60s

可以看到性能有一定程度的提升,但是并没有我们想象中的性能提升多倍,这是为什么呢?我们简单分析一下:

  1. 线程启动及其调度消耗了一定的时间
  2. 线程内部涉及到了 IO 操作,而不是单纯的运算

但是可以肯定的是,随着需要 lint 的文件数量增多,两个模式下所用的时间差会增大

线程安全

在 ko 中, 我们针对 lint 进行了多线程的操作,性能上有了一定程度的提升,但是我们线程间总的来说是相互独立的,没有使用到共享内存的情况。那么当我们需要共享内存时,会遇到一个问题,我们启用了多个线程,线程之间针对共享内存可能存在竞争关系,也就是可能会同时操作共享内存中的数据,这个时候我们就不能保证数据的准确性,专业术语描述为不是线程安全的。遇到这种情况,我们一般会涉及到一个专业术语(Lock)

我们回到work_threads,看一下官方文档中是如何共享内存的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', message => console.log(message));

const uint8Array = new Uint8Array([1, 2, 3, 4]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [uint8Array.buffer]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [otherChannel.port1]);

注意一点,如果我们想共享内存,我们可以传递ArrayBuffer或者SharedArrayBuffer,那么这两种类型的数据有什么特殊性呢?

答案是ArrayBufferSharedArrayBuffer支持Atomics一起使用,可以实现 Lock 相关的概念

Comments