现在的需求是存在一个类内的方法,原来是串行的,现在需要改并行。现在我需要将一个对象 obj ,或者对象的方法 obj.run 传入子线程,然后回调执行。

但是我尝试了几种方式,似乎是没办法将复杂的对象进行传递?以至于常规的回调函数的方式没办法在 node 的并行化中实现。

1. worker_thread

```
new Worker(moduleThreadFile,
workerData:{'obj':obj}) //ERROR
```

会报告 Cannot set property code of which has only a getter.

2. workerpool

```
pool=workerpool.pool()
pool.exec(obj,[])
```

实际上传入的 obj 在子线程中是 undefined
举报· 381 次点击
登录 注册 站外分享
20 条回复  
okakuyang 小成 2024-7-19 15:08:23
维护一个 map ,用 id 来区分哪个任务完成了,触发相应回调。
jifengg 小成 2024-7-19 15:59:05
没用过 workerpool ,但是,以我的理解
pool.exec(obj,[])
exec 第一个参数应该是一个 function ?,第二个参数你写了空数组的[],应该是传给这个 function 的参数列表?
yaodong0126 小成 2024-7-19 16:20:20
传不了,看文档,写的清楚的不能更清楚了,用什么工具之前多看文档,多看文档
shadowyue 初学 2024-7-19 16:22:56
看文档,worker 能传递的数据格式有要求的
photon006 小成 2024-7-19 16:29:32
bluebird.map()比较方便实现并行,还能设置并发量 concurrency

http://bluebirdjs.com/docs/api/promise.map.html
luckyscript 小成 2024-7-19 16:33:23
workerData <any> Any JavaScript value that is cloned and made available as require('node:worker_threads').workerData. The cloning occurs as described in the HTML structured clone algorithm, and an error is thrown if the object cannot be cloned (e.g. because it contains functions).

---

一定要通过子线程的方案来实现吗?把需要执行的方法改造成异步的形式是不是也可以。
duowb 小成 2024-7-19 16:48:00
这个呢
https://github.com/GoogleChromeLabs/comlink
https://juejin.cn/post/6968503345031938085
xiwh 小成 2024-7-19 17:06:53
题主的需求大概率实现不了,Node 的 Worker 是并行用多进程实现的,那哪些对象是没法序列化的?文件句柄,线程句柄,tcp/udp 连接句柄,而这些资源在进程间都是隔离的,即便是强行序列化传过去也用不了,当然 Linux 似乎有方式实现进程间资源共享,最好的方式还是支持基于内存通信的多线程的语言 go java c++等
sinalvee 小成 2024-7-19 17:23:22
没实践过,不知道有没有别的坑,思路就是把函数转为字符串,worker 中再把字符串转回来

```js
const { Worker, isMainThread, parentPort, workerData } = require('node:worker_threads');

if (isMainThread) {
  const obj = {
    name: 'Foo',
  
    greet(other) {
      return `Hello ${this.name} and ${other}`;
    }
  }
  
  const objStr = JSON.stringify(obj, (key, value) => {
    if (typeof value === 'function') {
      return value.toString();
    }
    return value;
  });

  const worker = new Worker(__filename, {
    workerData: objStr,
  });
  worker.on('message', (value) => {
    console.log('Receive data from worker =>', value);
  });
  worker.on('error', console.error);
  worker.on('exit', (code) => {
    if (code !== 0)
      console.error(new Error(`Worker stopped with exit code ${code}`));
  });
} else {
  const objStr = workerData;
  const objParsed = JSON.parse(objStr);

  const run = (obj, funcName, ...args) => {
    if (obj.hasOwnProperty(funcName)) {
      const funcStr = obj[funcName];
      // 提取函数体,忽略函数参数定义
      const funcBody = funcStr.substring(obj.greet.indexOf('{') + 1, obj.greet.lastIndexOf('}'));
      // 使用剩余参数语法来定义一个新的函数,允许接收任意数量的参数
      const funcArgs = funcStr.substring(funcStr.indexOf('(') + 1, funcStr.indexOf(')')).split(',').map(arg => arg.trim()).filter(arg => arg);
      const func = new Function(...funcArgs, funcBody);
  
      return func.call(obj, ...args);
    }
  }

  const result = run(objParsed, 'greet', 'Bar');

  parentPort.postMessage(result);
}
```
123下一页
返回顶部