From d10a42fded4a463ff9175b8c77fdae870ef6d4d4 Mon Sep 17 00:00:00 2001 From: Chris Duncan Date: Thu, 12 Dec 2024 12:02:41 -0800 Subject: [PATCH] Scrap isTransferable since really we're only concerned with ArrayBuffers. Update worker listener to handle buffers. --- src/lib/pool.ts | 50 +++++++++++++++++-------------------------------- 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/src/lib/pool.ts b/src/lib/pool.ts index a615b28..ca3277c 100644 --- a/src/lib/pool.ts +++ b/src/lib/pool.ts @@ -8,7 +8,7 @@ type Job = { name: string reject: (value: any) => void resolve: (value: any) => void - data: any[] + data: any results: any[] } @@ -33,8 +33,9 @@ export class Pool { job: null } thread.worker.addEventListener('message', message => { - const data: string = new TextDecoder().decode(message.data) - let result: any = JSON.parse(data || "[]") + let result = (message.data instanceof ArrayBuffer) + ? new Uint8Array(message.data) + : JSON.parse(new TextDecoder().decode(message.data) || "[]") if (!Array.isArray(result)) result = [result] this.#report(thread, result) }) @@ -43,16 +44,17 @@ export class Pool { } static #assign (thread: Thread, job: Job): void { - const chunk: number = 1 + (job.data.length / this.#cores) - const next = job.data.slice(0, chunk) - job.data = job.data.slice(chunk) - if (job.data.length === 0) this.#queue.shift() - if (next?.length > 0) { - if (this.#isTransferable(next)) { - thread.worker.postMessage({ name: job.name, next }, [next]) - } else if (this.#isTransferable(next[0])) { - thread.worker.postMessage({ name: job.name, next }, next) - } else { + if (job.data instanceof ArrayBuffer) { + if (job.data.byteLength > 0) { + thread.job = job + thread.worker.postMessage({ name: job.name, buffer: job.data }, [job.data]) + } + } else { + const chunk: number = 1 + (job.data.length / this.#cores) + const next = job.data.slice(0, chunk) + job.data = job.data.slice(chunk) + if (job.data.length === 0) this.#queue.shift() + if (next?.length > 0) { const buffer = new TextEncoder().encode(JSON.stringify(next)).buffer thread.job = job thread.worker.postMessage({ name: job.name, buffer }, [buffer]) @@ -67,24 +69,6 @@ export class Pool { return true } - static #isTransferable (obj: any): boolean { - return obj instanceof ArrayBuffer - //@ts-expect-error - || obj instanceof AudioData - || obj instanceof ImageBitmap - || obj instanceof MIDIAccess - || obj instanceof OffscreenCanvas - || obj instanceof ReadableStream - || obj instanceof RTCDataChannel - || obj instanceof TransformStream - || obj instanceof VideoFrame - //@ts-expect-error - || obj instanceof WebTransportReceiveStream - //@ts-expect-error - || obj instanceof WebTransportSendStream - || obj instanceof WritableStream - } - static #report (thread: Thread, results: any[]): void { if (thread.job == null) { throw new Error('Thread returned results but had nowhere to report it.') @@ -103,8 +87,8 @@ export class Pool { } } - static async work (name: string, data: object[]): Promise { - if (!Array.isArray(data)) data = [data] + static async work (name: string, data: any): Promise { + if (!(data instanceof ArrayBuffer || Array.isArray(data))) data = [data] return new Promise((resolve, reject) => { const job: Job = { id: performance.now(), -- 2.34.1