* Processes an array of tasks using Web Workers.
*/
export class Pool {
+ #approach: 'converge' | 'divide' = 'divide'
#cores: number = Math.max(1, navigator.hardwareConcurrency ?? 1 - 1)
#queue: object[] = []
#resolve: Function = (value: unknown) => { }
#report (thread: Thread, result: any[]) {
this.#results.push(...result)
thread.isBusy = false
- if (this.#queue.length > 0) {
- this.#assign(thread, [this.#queue.shift()])
- } else if (this.isDone) {
+ if (this.#approach === 'converge') {
+ this.#stop()
+ }
+ if (this.isDone) {
this.#resolve(this.#results)
}
}
- async work (data: object[]): Promise<any> {
+ #stop () {
+ for (const thread of this.#threads) {
+ const msg = ['stop']
+ const buf = new TextEncoder().encode(JSON.stringify(msg)).buffer
+ thread.worker.postMessage(buf, [buf])
+ thread.isBusy = false
+ }
+ }
+
+ async work (approach: 'converge' | 'divide', data: object[],): Promise<any> {
+ if (approach !== 'converge' && approach !== 'divide')
+ throw new TypeError('Invalid work approach')
if (!Array.isArray(data)) data = [data]
return new Promise(resolve => {
+ this.#approach = approach
this.#queue = data
this.#resolve = resolve
- const chunk = 1 + (this.#queue.length / this.#cores)
+ const chunk = 1 + (this.#queue.length / this.#threads.length)
for (const thread of this.#threads) {
- const next = this.#queue.slice(0, chunk)
- this.#queue = this.#queue.slice(chunk)
- this.#assign(thread, next)
+ if (approach === 'converge') {
+ this.#assign(thread, this.#queue)
+ } else {
+ const next = this.#queue.slice(0, chunk)
+ this.#queue = this.#queue.slice(chunk)
+ this.#assign(thread, next)
+ }
}
})
}