]> zoso.dev Git - libnemo.git/commitdiff
Add converge functionality to Pool so that work can be divided or collaborated.
authorChris Duncan <chris@zoso.dev>
Tue, 3 Dec 2024 01:08:40 +0000 (17:08 -0800)
committerChris Duncan <chris@zoso.dev>
Tue, 3 Dec 2024 01:08:40 +0000 (17:08 -0800)
src/lib/pool.ts

index 9d9d47323a10831fbc37a2acfaf65fbeebe898ae..dad96c6537269299bff1c86a31f8b6a1f3966e15 100644 (file)
@@ -18,6 +18,7 @@ type Thread = {
 * Processes an array of tasks using Web Workers.
 */
 export class Pool {
+       #approach: 'converge' | 'divide' = 'divide'
        #cores: number = Math.max(1, navigator.hardwareConcurrency ?? 1 - 1)
        #queue: object[] = []
        #resolve: Function = (value: unknown) => { }
@@ -61,23 +62,40 @@ export class Pool {
        #report (thread: Thread, result: any[]) {
                this.#results.push(...result)
                thread.isBusy = false
-               if (this.#queue.length > 0) {
-                       this.#assign(thread, [this.#queue.shift()])
-               } else if (this.isDone) {
+               if (this.#approach === 'converge') {
+                       this.#stop()
+               }
+               if (this.isDone) {
                        this.#resolve(this.#results)
                }
        }
 
-       async work (data: object[]): Promise<any> {
+       #stop () {
+               for (const thread of this.#threads) {
+                       const msg = ['stop']
+                       const buf = new TextEncoder().encode(JSON.stringify(msg)).buffer
+                       thread.worker.postMessage(buf, [buf])
+                       thread.isBusy = false
+               }
+       }
+
+       async work (approach: 'converge' | 'divide', data: object[],): Promise<any> {
+               if (approach !== 'converge' && approach !== 'divide')
+                       throw new TypeError('Invalid work approach')
                if (!Array.isArray(data)) data = [data]
                return new Promise(resolve => {
+                       this.#approach = approach
                        this.#queue = data
                        this.#resolve = resolve
-                       const chunk = 1 + (this.#queue.length / this.#cores)
+                       const chunk = 1 + (this.#queue.length / this.#threads.length)
                        for (const thread of this.#threads) {
-                               const next = this.#queue.slice(0, chunk)
-                               this.#queue = this.#queue.slice(chunk)
-                               this.#assign(thread, next)
+                               if (approach === 'converge') {
+                                       this.#assign(thread, this.#queue)
+                               } else {
+                                       const next = this.#queue.slice(0, chunk)
+                                       this.#queue = this.#queue.slice(chunk)
+                                       this.#assign(thread, next)
+                               }
                        }
                })
        }