二十七、工作者线程(一)

时间:2022-03-09 01:24:04

工作者线程允许把主线程的工作转嫁给独立的实体,而不会改变现有的单线程模型。
二十七、工作者线程(一)

1.工作者线程简介

使用工作者线程,浏览器可以在原始页面环境之外再分配一个完全独立的二级子环境。这个子环境不能与依赖单线程交互的 API(如 DOM)互操作,但可以与父环境并行执行代码。

1.1工作者线程和线程

  • 工作者线程以实际线程实现。
  • 工作者线程可以并行执行。
  • 工作者线程可以共享某些内存,其依据便是SharedArrayBuffer,线程使用实现并发控制,javascript使用Atomics API实现并发控制。
  • 工作者线程不会共享全部的内存。传统线程有能力读写共享内存空间;除了SharedArrayBuffer外,从工作者线程进出的数据都需要复制或者转移。
  • 工作者线程不一定都在同一个进程里。如Chrome的Blink引擎对共享工作者线程服务工作者线程使用独立的进程。
  • 创建工作者线程开销更大。工作者线程具有自己独立的事件循环、全局对象、事件处理程序和其他js环境必须的特性。

1.2工作者线程的类型

专用工作者线程共享工作者线程服务工作者线程

1.2.1专用工作者线程 webWorker

可以让脚本单独创建一个(只能被创建页面所使用的)javascript线程。

1.2.2 共享工作者线程

任何与创建共享工作者线程的脚本同源的脚本,都可以向共享工作者线程发送消息或从中接收消息

1.2.3 服务工作者线程

它的主要用途是拦截、重定向和修改页面发出的请求,充当网络请求的仲裁者的角色

1.3 WorkerGlobalScope

在工作者线程内部的全局对象就是WorkerGlobalScope的实例(类似于在网页撒上window对象作为全局对象),通过self关键字暴露出来。

属性 说明
navigator 与工作者线程关联的WorkerNavigator
self 工作者线程的全局对象,WorkerGlobalScope实例
location 与工作者线程关联的WorkerLocation
performance 返回只包含特定属性和方法的Performance对象
console 返回与工作者线程相关的Console对象,对api无限制
caches 返回与工作者线程相关的CacheStorage对象,对api无限制
indexedDB 返回IDBFactory对象
isSecureContext 返回boolean值,表示工作者上下文是否安全
origin 返回WorkerGlobalScope的源

此外self上还有一些方法,与window上对象的方法操作一样。

  • atob()
  • btoa()
  • clearInterval()
  • clearTimeout()
  • createImageBitmap()
  • fetch()
  • setInterval()
  • setTimeout()
WorkerGlobalScope子类

并不是所有的地方都实现了WorkerGlobalScope,每种类型的工作者线程都使用自己特定的全局对象(继承自WorkerGlobalScope)

  • 专用工作者线程使用DedicatedWorkerGlobalScope;
  • 共享工作者线程使用SharedWorkerGlobalScope;
  • 服务工作者线程使用ServiceWorkerGlobalScope;

2.专用工作者线程

专用工作者线程是最简单的 Web 工作者线程,网页中的脚本可以创建专用工作者线程来执行在页面线程之外的其他任务。这样的线程可以与父页面交换信息、发送网络请求、执行文件输入/输出、进行密集计算、处理大量数据,以及实现其他不适合在页面执行线程里做的任务(否则会导致页面响应迟钝)。

2.1专用工作者线程的基本概念

2.1.1创建专用工作者线程
/*** emptyWorker.js ***/
// 空的js文件

/*** main.js *****/
console.log(location.href);
const worker = new Worker(location.href + 'emptyWorker.js');
console.log(worker);
//亦可以使用相对路径创建worker,但需要两个文件在同一个目录下
const worker1 = new Worker('./emptyWorker.js');
console.log(worker1);

在这个案例中,emptyWorker.js的加载是在后台进行的,工作者线程的初始化完全独立于main.js,工作者线程本身存在于一个独立的js环境中,在main.js中必须以worker对象作为代理实现与工作者线程的通信。

2.1.2 工作者线程安全限制
  • 工作者线程的脚本文件只能从与父页面同源的地方加载;
  • 不能使用非同源的脚本创建工作者线程,但不影响执行非同源的脚本,importScripts();
2.1.3使用Worker对象

要管理好使用worker()创建的每个Worker对象。在终止工作者线程之前,它不会被垃圾回收机制回收,也不能通过编程方式恢复之前Worker对象的引用。
Worker对象支持的事件处理程序:

事件 说明
onerror 工作者线程中发生ErrorEvent类型的错误事件时会调用
  • 会在工作者线程抛出错误时发生
  • 也可以通过worder.addEventListener(‘error’, hanlder)方式绑定
onmessage 工作者线程中发生MessageEvent类型的消息事件时发生
  • 该事件会在工作者线程向父级上下文发送消息时发生
  • worker.addEventListener(‘message’, handler)
onmessageerror 工作者线程中发生MessageEvent类型的错误时发生
  • 工作者线程收到无法序列化的消息时也会发生
  • worker.addEventListener(‘messageerror’, handler)

Worker对象支持的方法
postMessage():通过异步消息事件想工作者线程发送信息
terminate():立即终止工作者线程,没有为工作者线程提供情理的机会。

2.1.4DedicatedWorkerGlobalScope
/**** globalScopeWorker.js ****/
console.log('inside worker:', self);

/*** main.js**/
const worker = new Worker('./globalScopeWorker.js');
console.log('created worker:', worker);
// created worker: Worker {}
// inside worker: DedicatedWorkerGlobalScope {} 

DedicatedWorkerGlobalScope新增属性和方法:

属性或者方法 说明
name 可以提供给Worker构造函数的一个字符串标识符
postMessage() 用于工作者线程内部向父级上下文发送消息
close() 对应terminate(), 立即终止工作者线程
importScripts() 用于向工作者内部导入任意数量的脚本

2.2 专用工作者线程与隐式MessagePorts

专用工作者线程的 Worker 对象DedicatedWorkerGlobalScope MessagePorts有一些相同接口处理程序和方法:onmessage、onmessageerror、close()和 postMessage()。这不是偶然的,因为专用工作者线程隐式使用了 MessagePorts 在两个上下文之间通信

2.3专用工作者线程的生命周期

一般来说,专用工作者线程可以非正式区分为处于下列三个状态:初始化(initializing)活动(active)终止(terminated)。这几个状态对其他上下文是不可见的。
虽然父级上下文可以立即使用worker对象,但与之相关联的工作者线程可能还没有创建,因为存在请求脚本的网络延迟初始化延迟

/*
	初始化时,虽然工作者线程脚本尚未执行,但可以先把要发送给工作者线程的消息加入队列。这些消息会等待工作者线程的状态变为活动,再把消息添加到它的消息队列。
*/
/****initializingWorker.js**/
self.addEventListener('message', ({data}) => console.log(data));
/****main.js**/
const worker = new Worker('./initializingWorker.js');
// Worker 可能仍处于初始化状态
// 但 postMessage()数据可以正常处理
worker.postMessage('foo');
worker.postMessage('bar');
worker.postMessage('baz');
// foo
// bar
// baz 

/*
	close()在这里会通知工作者线程取消事件循环中的所有任务,并阻止继续添加新任务。
	工作者线程不需要执行同步停止
*/
/*** closeWorker.js ***/
self.postMessage('foo');
self.close();
self.postMessage('bar');
setTimeout(() => self.postMessage('baz'), 0);
/*** main.js***/
const worker = new Worker('./closeWorker.js');
worker.onmessage = ({data}) => console.log(data);
// foo
// bar 

/*
一旦调用了 terminate(),工作者线程的消息队列就会被清理并锁住,
*/
/***terminateWorker.js***/
self.onmessage = ({data}) => console.log(data);
/**main.js**/
const worker = new Worker('./terminateWorker.js');
// 给 1000 毫秒让工作者线程初始化
setTimeout(() => {
 worker.postMessage('foo');
 worker.terminate();
 worker.postMessage('bar');
 setTimeout(() => worker.postMessage('baz'), 0);
}, 1000);
// foo 

2.4配置Worker选项

Worker()的第二个参数作为配置项,是可选的。

配置项 说明
name 工作者线程中self.name可以访问到的字符串标识
type 加载脚本的方式,可选值有两个
classic 常规脚本
module 模块脚本
credentials type为module时,指定如何获取与传输凭证数据相关的工作者线程模块脚本。值可以是omit,same-origin, include.与fetch()的配置相同

2.5在javascript行内创建工作者线程

专用工作者线程也可以通过 Blob 对象URL 在行内脚本创建。这样可以更快速地初始化工作者线程,因为没有网络延迟。

const workerScript = `
self.onmessage = ({data}) => console.log(data);
`;
const workerScriptBlob = new Blob([workerScript]);
const workerScriptBlobUrl = URL.createObjectURL(workerScriptBlob);
const worker = new Worker(workerScriptBlobUrl);
worker.postMessage('blob worker script');

工作者线程也可以利用函数序列化来初始化行内脚本。这是因为函数的 toString()方法返回函数代码的字符串,而函数可以在父上下文中定义但在子上下文中执行。

function fibonacci(n){
	return n < 1 ? 0 : ( n <= 2 ? 1 : fibonacci(n - 1) + fibonacci(n - 2));
}
cosnt workerScript = `
	self.postMessage((${fibonacci.toString()})(9));
`
const worker = new Worker(URL.createObjectURL(new Blob([workerScript])));
worker.onmessage = ({data}) => console.log(data);

像这样序列化函数有个前提,就是函数体内不能使用通过闭包获得的引用,也包括全局变量,比如 window,因为这些引用在工作者线程中执行时会出错

2.6在工作者线程中动态执行脚本

  • 工作者线程中的脚本并非铁板一块,而是可以使用 importScripts()方法通过编程方式加载和执行任意脚本。该方法可用于全局 Worker 对象。这个方法会加载脚本并按照加载顺序同步执行。
  • importScripts()方法可以接收任意数量的脚本作为参数。浏览器下载它们的顺序没有限制,但执行则会严格按照它们在参数列表的顺序进行。因
  • 脚本加载受到常规 CORS 的限制,但在工作者线程内部可以请求来自任何源的脚本。

2.7委托任务到子工作者线程

除了路径解析不同,创建子工作者线程与创建普通工作者线程是一样的。子工作者线程的脚本路径根据父工作者线程而不是相对于网页来解析。

2.8处理工作者线程错误

如果工作者线程脚本抛出了错误,该工作者线程沙盒可以阻止它打断父线程的执行,但是该错误不能try/catch捕获,只能有worker.onerror监听

2.9与专用工作者线程通信

与工作者线程通信都是通过异步消息完成的。

  • postMessage()传递序列化的消息
  • 使用MessageChannel
    MessageChannel 实例有两个端口,分别代表两个通信端点。要让父页面和工作线程通过MessageChannel 通信,需要把一个端口传到工作者线程中.
/**** worker.js******/
let messagePort = null;
function factorial(n){
	let result = 1;
	while(n){
		result *= n--;
	}
	return result;
}
self.onmessage = ({ports}) => {
	// 只设置一次端口
	if (!messagePort){
		// 初始化消息发送端口,给变量赋值并重置监听器
		messagePort = ports[0];
		self.onmessage = null;
		// 在全局对象上设置消息处理程序
		messagePort.onmessage = ({data}) => {
			messagePort.postMessage(`${data}! = ${factorial(data)}`);
		}
	}
}
/****main.js*********/
const channel = new MessageChannel();
const factorialWorker = new Worker('./worker.js');
//把MessagePort对象发送给工作者线程,工作者线程初始化信道
factorialWorker.postMessage(null, [channel.port1]);
channel.port2.onmessage = ({data}) => console.log(data);
channel.port2.postMessage(5);

MessageChannel 真正有用的地方是让两个工作者线程之间直接通信。这可以通过把端口传给另一个工作者线程实现。

  • 使用BroadCastChannel
/*************main.js*****************/
const channel = new BroadcastChannel('worker_channel');
const worker = new Worker('./worker.js');
channel.onmessage = ({data}) => console.log(`heard ${data} onpage`);
setTimeout(() => channel.postMessage('foo'), 1000);

/******** worker.js*****************/
const channel = new BroadcastChannel('worker_channel');
channel.onmessage = ({data}) => {
	console.log(`{heard ${data} in worker`);
	channel.postMessage('bar');
}

2.10工作者线程数据传输

在支持传统多线程模型的语言中,可以使用锁、互斥量,以及volatile 变量。
在 JavaScript 中,有三种在上下文间转移信息的方式:结构化克隆算法(structured clonealgorithm)、可转移对象(transferable objects)和共享数组缓冲区(shared array buffers)

结构化克隆算法

该算法由浏览器后台实现,不可直接调用。在使用postMessage()传递对象时,浏览器会在目标上下文生成该对象的一个副本。
支持的类型有 除了Symbol外所有原始类型,Boolean对象String对象, BDate,RegExp, Blob, File, FileList, ArrayBuffer,ArrayBufferView, ImageData, Array, Object, Map, Set
注意事项

  • 复制之后,源上下文对该对象的修改,不会传播到目标上下文中的副本;
  • 克隆Error/Function/DOM节点会报错
  • 可以识别对象中包含的循环引用,不会无穷遍历
  • 并不总是创建完全一致的副本
  • 原型链不会被复制
  • RegExp.prototype.lastIndex属性不会被复制
  • 对象属性描述符、获取方法、设置方法不会被复制,必要时使用默认值
可转移对象

可以把所有权从一个上下文转移到另一个上下文
ArrayBuffer
MessagePort
ImageBitmap
OffscreenCanvas
postMessage()方法的第二个可选参数是数组,它指定应该将哪些对象转移到目标上下文。

SharedArrayBuffer

既不克隆、也不复制。两个不同的 JavaScript 上下文会分别维护对同一个内存块的引用。
为解决资源争抢问题,可以使用 Atomics 对象让一个工作者线程获得SharedArrayBuffer 实例的锁,在执行完全部读/写/读操作后,再允许另一个工作者线程执行操作。

2.11 线程池

因为启用工作者线程代价很大,所以某些情况下可以考虑始终保持固定数量的线程活动,需要时就把任务分派给它们。工作者线程在执行计算时,会被标记为忙碌状态。直到它通知线程池自己空闲了,才准备好接收新任务。

首先是定义一个 TaskWorker 类,它可以扩展 Worker 类。TaskWorker 类负责两件事:跟踪线程是否正忙于工作,并管理进出线程的信息与事件。另外,传入给这个工作者线程的任务会封装到一个期约中,然后正确地解决和拒绝。

class TaskWorker extends Worker{
	constructor(notifyAvailable, ...workerArgs){
		super(...workerArgs);
		// 初始状态为不可用状态
		this.available = false;
		this.resolve = null;
		this.rejct = null;
		// 线程池传递回调,以便工作者线程发出它需要新任务的信号
		this.notifyAvailable = notifyAvailable;
		// 线程脚本在完成初始化后,会发送一条ready的消息
		this.onmessage = () => this.setAvailable();
	}
	// 由线程池调用,用以分派新任务
	dispatch({resolve, reject, posteMassageArgs}){
		this.available = false;
		this.onmessage = ({data}) => {
			resolve(data);
			this.setAvailable();
		}
		this.onerror = (e) => {
			reject(e);
			this.setAvailble();
		}
		this.postMessage(postMessageArgs);
	}
	setAvailable(){
		this.available = true;
		this.resolve = null;
		this.reject = null;
		this.notifyAvailable();
	}
}

然后是定义使用 TaskWorker 类的 WorkerPool 类。它还必须维护尚未分派给工作者线程的任务队列。两个事件可以表明应该分派一个新任务:新任务被添加到队列中,或者工作者线程完成了一个任务,应该再发送另一个任务。

class WorkerPool{
	constructor(poolsize, ...workerArgs){
		this.taskQueue = [];
		this.workers = [];
		// 初始化线程池
		for(let i = 0; i < poolsize;i++){
			this.workers.push(
				new TaskWorker(() => this.dispatchIfAvailable(), ...workerArgs);
			)
		}
	}
	// 任务入队
	enqueue(...postMessageArgs){
		return new Promise((resolve, reject) => {
			this.taskQueue.push({resolve, reject, postMessageArgs});
			this.dispatchIfAvailable();
		}
	}
	// 把任务发送给下一个空闲的线程
	dispatchIfAvailable(){
		if (!this.taskQueue.length){
			return
		}
		for(const worker fo this.workers){
			if (worker.available){
				let a = this.taskQueue.shift();
				worker.dispatch(a);
				break;
			}
		}
	}
	// 终止所有工作者线程
	close(){
		for(const worker of this.workers){
			worker.terminate();
		}
	}
}

使用的demo,假设计算100万个浮点数之和

/************worker.js******************/
self.onmessage = ({data}) => {
	let sum = 0;
	let view = new Float32Array(data.arrayBuffer);
	for(let i = 0; i < data.startIndex, i < data.endIndex; i++){
		sum += view[i];
	}
	self.postMessage(sum);
};
// 发送消息给TaskWorker,告知自己准备好接受任务了
self.postMessage('ready');

/************main.js***************/
// 主线程可以通过线程池分派任务
//class TaskWorker{}
//class WorkerPool{}
const totalFloats = 1E8;
const numTasks = 20;
const floatsPerTask = totalFloats / numTasks;
const numWorkers = 4;
// 创建线程池
const pool = new WorkerPool(numWorkers, './worker.js');
// 填充浮点数组
let arrayBuffer = new SharedArrayBuffer(4 * totalFloats);
let view = new Float32Array(arrayBuffer);
for(let i = 0; i < totalFloats; i++){
	view[i] = Math.random();
}
let partialSumPromises = [];
for(let i =0; i < totalFloats; i+=floatsPerTask){
	partialSumPromises.push(
		pool.enqueue({
			startIndex: i,
			endIndex: i + floatsPerTask,
			arrayBuffer: arrayBuffer,
		})
	)
}
Promise.all(partialSumPromises).then(partialSums => partialSums.reduce((prev, cur) => prev + cur)).then(finalSum => console.log(finalSum));

3.共享工作者线程

3.1共享工作者线程简介

const worker = new SharedWorker(url); // 创建一个共享工作者线程

虽然 Worker()构造函数始终会创建新实例,而 SharedWorker()则只会在相同的标识不存在的情况下才创建新实例。如果的确存在与标识匹配的共享工作者线程,则只会与已有共享者线程建立新的连接
共享工作者线程标识源自解析后的脚本 URL工作者线程名称文档源

使用SharedWorker

SharedWorker 对象被用作与新创建的共享工作者线程通信的连接点。它可以用来通过MessagePort在共享工作者线程和父上下文间传递信息,也可以用来捕获共享线程中发出的错误事件。

  • onerror
  • port 专门用来跟共享线程通信的MessagePort
SharedWorkerGlobalScope

继承自WorkerGlobalScope,

属性或者方法 说明
name 标识符, self.name
importScripts() 导入脚本
close() 与worker.terminate()对象,用于立即停止工作者线程,没有给工作者线程提供终止前情理的机会,脚本会突然停止
onconnect 建立连接时的处理程序,
  • worker.port.onmessage/worker.port.start()
  • worker.addEventListener(‘connect’, handler)

3.2共享工作者线程的生命周期

  • 共享工作者线程只要还有一个上下文连接就会持续存在。
  • 没有办法以编程方式终止共享线程。SharedWorker 对象上没有terminate()方法。在共享线程端口上调用 close()时,只要还有一个端口连接到该线程就不会真的终止线程

3.3连接到共享工作者线程

每次调用 SharedWorker()构造函数,无论是否创建了工作者线程,都会在共享线程内部触发connect 事件
发生 connect 事件时,SharedWorker()构造函数会隐式创建 MessageChannel 实例,并把MessagePort 实例的所有权唯一地转移给该 SharedWorker 的实例。这个 MessagePort 实例会保存在 connect 事件对象 ports 数组中。一个连接事件只能代表一个连接,因此可以假定 ports 数组的长度等于 1。

/****sharedWorker.js***/
const connectedPorts = new Set();
self.onconnect = ({ports}) => {
 connectedPorts.add(ports[0]);
 console.log(`${connectedPorts.size} unique connected ports`);
};

/*****main.js**/
for (let i = 0; i < 5; ++i) {
 new SharedWorker('./sharedWorker.js');
}
// 1 unique connected ports
// 2 unique connected ports
// 3 unique connected ports
// 4 unique connected ports
// 5 unique connected ports 

每个新 SharedWorker 连接都会触发一个事件,但没有事件对应断开 SharedWorker 实例的连接。一个解决方案是在 beforeunload 事件即将销毁页面时,明确发送卸载消息,让共享线程有机会清除死端口