Nodejs 多进程多线程和线程通信以及应用和原理

时间:2024-03-03 20:14:04

Nodejs以事件驱动、非阻塞式I/O的模型,擅长IO密集型操作。

早期版本提供了child_process和cluster(v0.6.0)两个模块来提供多进程的支持。
v10版本实验性地引入了worker_threads,Nodejs从此具备了多线程的支持,并终于在v12.11.0正式稳定。

下面讲解Nodejs多进程、多线程的应用,后续再补充进程、线程的实现方式。

child_process 进程

// Run `ls -lh /usr` in a child process and stream its output as it arrives.
const { spawn } = require('child_process');

// The command and its arguments are passed separately: ls -lh /usr
const ls = spawn('ls', ['-lh', '/usr']);

// stdout/stderr are streams; 'data' fires once per chunk, not once per line.
ls.stdout.on('data', (data) => {
    console.log(`stdout: ${data}`);
});
ls.stderr.on('data', (data) => {
    console.error(`stderr: ${data}`);
});

// 'error' fires when the process could not be spawned (e.g. `ls` is not on PATH).
// Without a listener the event would be thrown as an uncaught exception.
ls.on('error', (err) => {
    console.error(`failed to start child process: ${err.message}`);
});

// 'close' fires after the process has ended and its stdio streams are closed.
ls.on('close', (code) => {
    console.log(`child process exited with code ${code}`);
});
  • .exec()、.execFile()、.fork()底层都是通过.spawn()实现的
  • 上面都是异步方法,也都有相应的同步方法(spawnSync、execSync、execFileSync),例如需要等待shell命令执行完成后再继续时,可以使用同步的方式

下面是fork的使用,以及进程通信

// main.js
// Parent side of the fork() example: start child.js and exchange messages with it.
const child_process = require('child_process');

// fork() spawns a new Node.js process running the given module and opens an
// IPC channel between parent and child.
const child = child_process.fork('./child.js', {
    silent: false, // false: the child's stdio is inherited from the parent
});

// Messages the child sends with process.send() arrive here.
child.on('message', (m) => {
    console.log(`message from child: ${JSON.stringify(m)}`);
});

// Send the parent's message; it is delivered to the child's
// process.on('message') handler.
child.send({ from: 'parent' });
// child.js
// Child side of the fork() example: print messages from the parent and
// announce itself once.
process.on('message', (m) => {
    console.log(`message from parent: ${JSON.stringify(m)}`);
});

// process.send only exists when this process was started with an IPC channel
// (e.g. via child_process.fork); skip the send when run directly.
if (process.send) {
    process.send({ from: 'child' });
}

结果

> node main.js
message from child: {"from":"child"}
message from parent: {"from":"parent"}

cluster 进程

// cluster example: the primary forks one worker per CPU and greets each one;
// every worker prints the greeting, replies, and then exits.
const cluster = require('cluster');
const http = require('http'); // not used by this snippet
const numCPUs = require('os').cpus().length;

// cluster.isMaster is deprecated in favour of cluster.isPrimary (Node.js v16+);
// fall back to the old name so the example still runs on older versions.
const isPrimary = cluster.isPrimary !== undefined ? cluster.isPrimary : cluster.isMaster;

if (isPrimary) {
    console.log('[master] master start...');
    for (let i = 0; i < numCPUs; i++) {
        const worker = cluster.fork();
        worker.send('hello I am master');
    }
    cluster.on('exit', (worker, code, signal) => {
        console.log(`worker ${worker.process.pid} died`);
        // To keep the pool at full size, call cluster.fork() here so that a
        // replacement is started whenever a worker exits.
    });
} else {
    process.on('message', (p) => {
        console.log(`[worker]: ${p}`);
        // Exit manually, but only from the send callback: calling
        // process.exit() right after process.send() can terminate the worker
        // before the reply has been written to the IPC channel.
        process.send('hi i am worker', (err) => {
            if (err) {
                console.error(`[worker]: failed to reply: ${err.message}`);
            }
            process.exit(err ? 1 : 0);
        });
    });
}

worker_threads 线程

// worker_threads example: the same file runs as the main thread and as the
// worker. The two threads bounce a counter back and forth until it reaches 5.
const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');

/**
 * Main-thread side: start a worker from this file with workerData 0 and
 * answer every number it sends with that number plus one.
 */
function mainThread() {
  const worker = new Worker(__filename, { workerData: 0 });
  worker.on('exit', (code) => {
    console.log(`main: worker stopped with exit code ${code}`);
  });
  worker.on('message', (msg) => {
    console.log(`main: receive ${msg}`);
    worker.postMessage(msg + 1);
  });
}

/**
 * Worker side: echo every received number back to the main thread and stop
 * once the number 5 arrives. Starts the exchange by posting workerData.
 */
function workerThread() {
  console.log(`worker: threadId ${threadId} start with ${__filename}`);
  console.log(`worker: workerDate ${workerData}`);
  parentPort.on('message', (msg) => {
    console.log(`worker: receive ${msg}`);
    // Inside a worker, process.exit() stops this thread, not the whole process.
    if (msg === 5) { process.exit(); }
    parentPort.postMessage(msg);
  });
  // Kick off the ping-pong with the initial value passed in by the main thread.
  parentPort.postMessage(workerData);
}

if (isMainThread) {
  mainThread();
} else {
  workerThread();
}

MessageChannel 通信方式

// MessageChannel example: the main thread creates a channel and hands one end
// to each of two workers, so the workers can talk to each other directly.
const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');

if (isMainThread) {
  const worker1 = new Worker(__filename);
  const worker2 = new Worker(__filename);
  const subChannel = new MessageChannel();
  // A MessagePort must be named in the transfer list (second argument) to be
  // sent; after the call the port belongs to the receiving worker.
  worker1.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  worker2.postMessage({ hereIsYourPort: subChannel.port2 }, [subChannel.port2]);
} else {
  // The first (and only) message from the main thread carries this worker's port.
  parentPort.once('message', (value) => {
    const port = value.hereIsYourPort;
    // Whatever is posted on this port is received on the other worker's port.
    port.postMessage('hello');
    port.on('message', (msg) => {
      console.log(`thread ${threadId}: receive ${msg}`);
    });
  });
}

参考
Nodejs进阶:如何玩转子进程(child_process)