2023年12月3日日曜日

TypeScriptでジョブキュー

 多分使い古されたネタですが、なんとなく思いついた&作ってみたくなったので気にせずに投下します。

「一度に実行するジョブの数は最大でN個、それ以上はキューイングしておき、実行中のジョブが終了したら順次実行」というアレです。

ソースコード

シンプルに実装するとこんな感じです。依存ライブラリは特にありません。

// ジョブ関数(enqueue() の引数で使用)
type JobFunction<T> = () => Promise<T>;
// 処理待ち関数(内部的に使用)
type WaitingFunction = () => void;

// ジョブキュー
class JobQueue {
  // ジョブキューの最大サイズ
  private capacity: number;
  // 処理中のジョブ数
  private processingJobsNumber: number;
  // 処理待ち関数のキュー
  private waitingFunctions: WaitingFunction[];

  constructor(capacity: number) {
    this.capacity = capacity;
    this.processingJobsNumber = 0;
    this.waitingFunctions = [];
  }

  enqueue<T>(jobFunction: JobFunction<T>): Promise<T> {
    return new Promise((resolve, reject) => {
      this.waitingFunctions.push(() => {
        // 処理可能になったら呼ばれる
        jobFunction()
          .then(resolve, reject) // ジョブ関数の戻り値で解決する
          .finally(() => {
            this.processingJobsNumber--;
            this.stateUpdated();
          });
      });

      this.stateUpdated();
    });
  }

  private stateUpdated(): void {
    if(this.processingJobsNumber >= this.capacity) {
      // 処理可能なジョブの数に余裕がない
      return;
    }

    const waitingFunction = this.waitingFunctions.shift();
    if(waitingFunction === undefined) {
      // 処理待ちのジョブがない
      return;
    }

    // 処理待ち関数を実行
    this.processingJobsNumber++;
    waitingFunction();
  }
}

こうやって使います。enqueue()メソッドに渡すジョブ関数はPromiseを返し、(この例では使っていませんが)ジョブ関数が解決あるいは拒否した値はenqueue()でも取り出せます。

import {setTimeout} from "node:timers/promises";

const q = new JobQueue(5); // 実行可能ジョブ数=5
for(let i = 1; i <= 10; i++) {
  q.enqueue(async() => {
    console.log(`job ${i} start`);
    await setTimeout(1000 * i);
    console.log(`job ${i} end`);
  });
}

実行すると、

  • 0s: job1-5開始
  • 1s: job1終了 / job6開始
  • 2s: job2終了 / job7開始
  • 3s: job3終了 / job8開始
  • 4s: job4終了 / job9開始
  • 5s: job5終了 / job10開始
  • 6s: -
  • 7s: job6終了
  • 8s: -
  • 9s: job7終了
  • 10s: -
  • 11s: job8終了
  • 12s: -
  • 13s: job9終了
  • 14s: -
  • 15s: job10終了 / プログラム終了

という感じで、想定通り一度に5つのジョブしか実行されていないことが確認できます。

実行可能ジョブ数を1にすると、一度に1つのジョブしか実行されないクリティカルセクションのようなものになります。

0 件のコメント:

コメントを投稿