From 5f5b2792c038d4439a0d7636157fc6e53b5a3357 Mon Sep 17 00:00:00 2001 From: Chris Duncan Date: Sat, 30 Nov 2024 12:47:32 -0800 Subject: [PATCH] Add cores count to pool for job splitting. --- src/lib/pool.ts | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/lib/pool.ts b/src/lib/pool.ts index e54702f..afab902 100644 --- a/src/lib/pool.ts +++ b/src/lib/pool.ts @@ -18,6 +18,7 @@ type Thread = { * Processes an array of tasks using Web Workers. */ export class Pool { + #cores: number = navigator.hardwareConcurrency - 1 #queue: object[] = [] #resolve: Function = (value: unknown) => { } #results: object[] = [] @@ -34,31 +35,35 @@ export class Pool { constructor (fn: string) { const url = URL.createObjectURL(new Blob([fn], { type: 'text/javascript' })) - for (let i = navigator.hardwareConcurrency - 1; i > 0; i--) { + for (let i = this.#cores; i > 0; i--) { const thread = { isBusy: false, //@ts-expect-error worker: new Worker(url, { type: 'module', eval: true }) } thread.worker.addEventListener('message', (message) => { - this.#report(thread, message.data ?? message) + let result = JSON.parse(new TextDecoder().decode(message.data ?? message)) + if (!Array.isArray(result)) result = [result] + this.#report(thread, result) }) this.#threads.push(thread) } } #assign (thread: Thread) { - const next = this.#queue.shift() - if (next != null) { + const chunk = 1 + (this.#queue.length / this.#cores) + const next = this.#queue.slice(0, chunk) + this.#queue = this.#queue.slice(chunk) + if (next.length > 0) { thread.isBusy = true - thread.worker.postMessage(next) + const buf = new TextEncoder().encode(JSON.stringify(next)).buffer + console.log(`posting to ${thread} (${performance.now()})`) + thread.worker.postMessage(buf, [buf]) } } - #report (thread: Thread, result: any) { - this.#results.push(result) - if (this.#results.length % 1000 === 0) - console.log(`results: ${this.#results.length} (${performance.now()})`) + #report (thread: Thread, result: any[]) { + this.#results.push(...result) thread.isBusy = false if (this.#queue.length > 0) { this.#assign(thread) -- 2.34.1