Better-sqlite3 工作线程

来自泡泡学习笔记
跳到导航 跳到搜索

对于大多数应用,better-sqlite3在主线程中运行足够快,不会阻塞很长时间。但是,如果你需要执行非常慢的查询,你可以使用工作线程来保持一切顺利运行。下面是一个在后台执行查询的线程池示例。


worker.js

在我们的案例中,工作逻辑非常简单。它接受来自主线程的消息,执行每个消息的SQL(带有任何给定参数),并将查询结果发送回去。

const { parentPort } = require('worker_threads');
const db = require('better-sqlite3')('foobar.db');

parentPort.on('message', ({ sql, parameters }) => {
  const result = db.prepare(sql).all(...parameters);
  parentPort.postMessage(result);
});


master.js

主线程负责生成工作线程、重启崩溃的线程以及接受查询任务。

const { Worker } = require('worker_threads');
const os = require('os');

/*
  导出一个函数,用于排队待处理的工作。
 */

const queue = [];
exports.asyncQuery = (sql, ...parameters) => {
  return new Promise((resolve, reject) => {
    queue.push({
      resolve,
      reject,
      message: { sql, parameters },
    });
    drainQueue();
  });
};

/*
  指示工作线程排空队列。
 */

let workers = [];
function drainQueue() {
  for (const worker of workers) {
    worker.takeWork();
  }
}

/*
  生成尝试排空队列的工作线程。
 */

os.cpus().forEach(function spawn() {
  const worker = new Worker('./worker.js');

  let job = null; // 队列中的当前项目
  let error = null; // 导致工作线程崩溃的错误

  function takeWork() {
    if (!job && queue.length) {
      // 如果队列中有工作,将其发送给工作线程
      job = queue.shift();
      worker.postMessage(job.message);
    }
  }

  worker
    .on('online', () => {
      workers.push({ takeWork });
      takeWork();
    })
    .on('message', (result) => {
      job.resolve(result);
      job = null;
      takeWork(); // 检查是否有更多要做的事情
    })
    .on('error', (err) => {
      console.error(err);
      error = err;
    })
    .on('exit', (code) => {
      workers = workers.filter(w => w.takeWork !== takeWork);
      if (job) {
        job.reject(error || new Error('worker died'));
      }
      if (code !== 0) {
        console.error(`worker exited with code ${code}`);
        spawn(); // 工作线程死亡,所以生成一个新的工作线程
      }
    });
});