From ca5b2f41f6a48675f31d5add5008fd4cadaefe37 Mon Sep 17 00:00:00 2001 From: Chris Duncan Date: Mon, 2 Dec 2024 17:08:40 -0800 Subject: [PATCH] Add converge functionality to Pool so that work can be divided or collaborated. --- src/lib/pool.ts | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/lib/pool.ts b/src/lib/pool.ts index 9d9d473..dad96c6 100644 --- a/src/lib/pool.ts +++ b/src/lib/pool.ts @@ -18,6 +18,7 @@ type Thread = { * Processes an array of tasks using Web Workers. */ export class Pool { + #approach: 'converge' | 'divide' = 'divide' #cores: number = Math.max(1, navigator.hardwareConcurrency ?? 1 - 1) #queue: object[] = [] #resolve: Function = (value: unknown) => { } @@ -61,23 +62,40 @@ export class Pool { #report (thread: Thread, result: any[]) { this.#results.push(...result) thread.isBusy = false - if (this.#queue.length > 0) { - this.#assign(thread, [this.#queue.shift()]) - } else if (this.isDone) { + if (this.#approach === 'converge') { + this.#stop() + } + if (this.isDone) { this.#resolve(this.#results) } } - async work (data: object[]): Promise { + #stop () { + for (const thread of this.#threads) { + const msg = ['stop'] + const buf = new TextEncoder().encode(JSON.stringify(msg)).buffer + thread.worker.postMessage(buf, [buf]) + thread.isBusy = false + } + } + + async work (approach: 'converge' | 'divide', data: object[],): Promise { + if (approach !== 'converge' && approach !== 'divide') + throw new TypeError('Invalid work approach') if (!Array.isArray(data)) data = [data] return new Promise(resolve => { + this.#approach = approach this.#queue = data this.#resolve = resolve - const chunk = 1 + (this.#queue.length / this.#cores) + const chunk = 1 + (this.#queue.length / this.#threads.length) for (const thread of this.#threads) { - const next = this.#queue.slice(0, chunk) - this.#queue = this.#queue.slice(chunk) - this.#assign(thread, next) + if (approach === 'converge') { + this.#assign(thread, this.#queue) + } else { + const next = this.#queue.slice(0, chunk) + this.#queue = this.#queue.slice(chunk) + this.#assign(thread, next) + } } }) } -- 2.34.1