]> zoso.dev Git - libnemo.git/commitdiff
Add cores count to pool for job splitting.
authorChris Duncan <chris@zoso.dev>
Sat, 30 Nov 2024 20:47:32 +0000 (12:47 -0800)
committerChris Duncan <chris@zoso.dev>
Sat, 30 Nov 2024 20:47:32 +0000 (12:47 -0800)
src/lib/pool.ts

index e54702f7c3f900ce3e9a9978ee38a984f287331e..afab9020345f8127f9e1c974b636003e2e13e2ac 100644 (file)
@@ -18,6 +18,7 @@ type Thread = {
 * Processes an array of tasks using Web Workers.
 */
 export class Pool {
+       #cores: number = navigator.hardwareConcurrency - 1
        #queue: object[] = []
        #resolve: Function = (value: unknown) => { }
        #results: object[] = []
@@ -34,31 +35,35 @@ export class Pool {
 
        constructor (fn: string) {
                const url = URL.createObjectURL(new Blob([fn], { type: 'text/javascript' }))
-               for (let i = navigator.hardwareConcurrency - 1; i > 0; i--) {
+               for (let i = this.#cores; i > 0; i--) {
                        const thread = {
                                isBusy: false,
                                //@ts-expect-error
                                worker: new Worker(url, { type: 'module', eval: true })
                        }
                        thread.worker.addEventListener('message', (message) => {
-                               this.#report(thread, message.data ?? message)
+                               let result = JSON.parse(new TextDecoder().decode(message.data ?? message))
+                               if (!Array.isArray(result)) result = [result]
+                               this.#report(thread, result)
                        })
                        this.#threads.push(thread)
                }
        }
 
        #assign (thread: Thread) {
-               const next = this.#queue.shift()
-               if (next != null) {
+               const chunk = 1 + (this.#queue.length / this.#cores)
+               const next = this.#queue.slice(0, chunk)
+               this.#queue = this.#queue.slice(chunk)
+               if (next.length > 0) {
                        thread.isBusy = true
-                       thread.worker.postMessage(next)
+                       const buf = new TextEncoder().encode(JSON.stringify(next)).buffer
+                       console.log(`posting to ${thread} (${performance.now()})`)
+                       thread.worker.postMessage(buf, [buf])
                }
        }
 
-       #report (thread: Thread, result: any) {
-               this.#results.push(result)
-               if (this.#results.length % 1000 === 0)
-                       console.log(`results: ${this.#results.length} (${performance.now()})`)
+       #report (thread: Thread, result: any[]) {
+               this.#results.push(...result)
                thread.isBusy = false
                if (this.#queue.length > 0) {
                        this.#assign(thread)