Skip to content

Commit

Permalink
Merge pull request #89 from mainmatter/abstract-task-to-core
Browse files Browse the repository at this point in the history
feat: abstract task logic to core
  • Loading branch information
paoloricciuti authored May 31, 2024
2 parents 228e0a9 + 8fcd937 commit fdaa699
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 160 deletions.
155 changes: 155 additions & 0 deletions src/lib/core.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import default_handler from './handlers/default';
import drop from './handlers/drop';
import enqueue from './handlers/enqueue';
import keep_latest from './handlers/keep_latest';
import restart from './handlers/restart';

export const handlers = {
default: default_handler,
drop,
enqueue,
keepLatest: keep_latest,
restart,
} as const;

export type HandlersMap = typeof handlers;

export type HandlerType = keyof HandlersMap;

export type TaskOptions = {
[K in HandlerType]: { kind: K } & (Parameters<HandlersMap[K]> extends []
? object
: Parameters<HandlersMap[K]>[0]);
}[HandlerType];

export type SvelteConcurrencyUtils = {
signal: AbortSignal;
link: <TArgs, TReturn>(task: Task<TArgs, TReturn>) => Task<TArgs, TReturn>;
};

export type Task<TArgs = unknown, TReturn = unknown> = ReturnType<
typeof createTask<TArgs, TReturn>
>;

type TaskAdapter<TReturn = unknown> = {
onDestroy: (fn: () => void) => void;
onInstanceCancel: () => void;
onInstanceStart: () => void;
onInstanceComplete: (new_value: TReturn) => void;
onError: (error: unknown | undefined) => void;
};

export function createTask<TArgs = unknown, TReturn = unknown>(
adapter: TaskAdapter<TReturn>,
gen_or_fun: (
args: TArgs,
utils: SvelteConcurrencyUtils,
) => Promise<TReturn> | AsyncGenerator<unknown, TReturn, unknown>,
options?: TaskOptions,
) {
const handler_factory = handlers[options?.kind ?? 'default'];
if (!handler_factory) {
throw new Error(`Unexpected kind '${options?.kind}'`);
}
const handler = handler_factory(options as never);

const abort_controllers = new Set<{ controller: AbortController; listener: () => void }>();

adapter.onDestroy(() => {
abort_controllers.forEach((abort_controller) => {
abort_controller.controller.abort();
abort_controller.controller.signal.removeEventListener('abort', abort_controller.listener);
});
});

return {
cancelAll() {
abort_controllers.forEach((abort_controller) => {
abort_controller.controller.abort();
});
},
perform(...args: undefined extends TArgs ? [] : [TArgs]) {
const child_tasks = new Set<ReturnType<Task<any, any>['perform']>>();

function cancel_linked_and_update_store() {
for (const child_task of child_tasks) {
child_task.cancel();
}
adapter.onInstanceCancel();
}

let resolve: (value: TReturn) => unknown;
let reject: (cause: unknown) => unknown;
const promise = new Promise<TReturn>((resolver, rejecter) => {
resolve = resolver;
reject = rejecter;
});
const abort_controller = new AbortController();
abort_controller.signal.addEventListener('abort', cancel_linked_and_update_store);
abort_controllers.add({
controller: abort_controller,
listener: cancel_linked_and_update_store,
});
function link<TLinkArgs, TLinkReturn>(
task: Task<TLinkArgs, TLinkReturn>,
): Task<TLinkArgs, TLinkReturn> {
const old_perform = task.perform;
return {
...task,
perform(...args) {
const instance = old_perform(...args);
child_tasks.add(instance);
return instance;
},
};
}
handler(
() => {
adapter.onInstanceStart();
queueMicrotask(async () => {
try {
const gen_or_value = await gen_or_fun(args[0]!, {
signal: abort_controller.signal,
link,
});
const is_generator =
gen_or_value &&
typeof gen_or_value === 'object' &&
Symbol.asyncIterator in gen_or_value;
if (is_generator) {
let next_val = await gen_or_value.next();
while (!next_val.done) {
if (abort_controller.signal.aborted) {
break;
}
next_val = await gen_or_value.next(next_val.value);
}
if (next_val.done) {
const last_result = next_val.value;
adapter.onInstanceComplete(last_result);
resolve(next_val.value);
}
} else if (!abort_controller.signal.aborted) {
adapter.onInstanceComplete(gen_or_value);
resolve(gen_or_value);
}
} catch (e) {
if (!abort_controller.signal.aborted) {
adapter.onError(e);
}
reject(e);
}
});
},
{ promise, abort_controller },
);

return Object.assign(promise, {
cancel() {
abort_controller.abort();
},
});
},
};
}
209 changes: 49 additions & 160 deletions src/lib/task.ts
Original file line number Diff line number Diff line change
@@ -1,182 +1,67 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { onDestroy } from 'svelte';
import { createTask, handlers } from './core';
import { writable } from 'svelte/store';
import default_handler from './handlers/default';
import drop from './handlers/drop';
import enqueue from './handlers/enqueue';
import keep_latest from './handlers/keep_latest';
import restart from './handlers/restart';
import type { SvelteConcurrencyUtils, Task, TaskOptions, HandlerType, HandlersMap } from './core';
export type { SvelteConcurrencyUtils, Task, TaskOptions };

const handlers = {
default: default_handler,
drop,
enqueue,
keepLatest: keep_latest,
restart,
} as const;

type HandlersMap = typeof handlers;

type HandlerType = keyof HandlersMap;

type TaskOptions = {
[K in HandlerType]: { kind: K } & (Parameters<HandlersMap[K]> extends []
? object
: Parameters<HandlersMap[K]>[0]);
}[HandlerType];

export type SvelteConcurrencyUtils = {
signal: AbortSignal;
link: <TArgs, TReturn>(task: Task<TArgs, TReturn>) => Task<TArgs, TReturn>;
};

export type Task<TArgs = unknown, TReturn = unknown> = ReturnType<typeof task<TArgs, TReturn>>;

function _task<TArgs = undefined, TReturn = unknown>(
export function _task<TArgs = unknown, TReturn = undefined>(
gen_or_fun: (
args: TArgs,
utils: SvelteConcurrencyUtils,
) => Promise<TReturn> | AsyncGenerator<unknown, TReturn, unknown>,
options?: TaskOptions,
) {
const handler_factory = handlers[options?.kind ?? 'default'];
if (!handler_factory) {
throw new Error(`Unexpected kind '${options?.kind}'`);
}
const handler = handler_factory(options as never);
const results: TReturn[] = [];

const { subscribe, ...result } = writable({
is_loading: false,
last_successful: undefined as undefined | TReturn,
isLoading: false,
lastSuccessful: undefined as undefined | TReturn,
error: undefined as undefined | unknown,
results,
performCount: 0,
});

const abort_controllers = new Set<{ controller: AbortController; listener: () => void }>();

onDestroy(() => {
abort_controllers.forEach((abort_controller) => {
abort_controller.controller.abort();
abort_controller.controller.signal.removeEventListener('abort', abort_controller.listener);
});
});

return {
subscribe,
cancelAll() {
abort_controllers.forEach((abort_controller) => {
abort_controller.controller.abort();
});
},
perform(...args: undefined extends TArgs ? [] : [TArgs]) {
const child_tasks = new Set<ReturnType<Task<any, any>['perform']>>();

function cancel_linked_and_update_store() {
for (const child_task of child_tasks) {
child_task.cancel();
}
const actual_task = createTask<TArgs, TReturn>(
{
onDestroy(fn) {
onDestroy(fn);
},
onError(error) {
result.update((old) => {
old.is_loading = false;
old.error = error;
old.isLoading = false;
return old;
});
}

let resolve: (value: TReturn) => unknown;
let reject: (cause: unknown) => unknown;
const promise = new Promise<TReturn>((resolver, rejecter) => {
resolve = resolver;
reject = rejecter;
});
const abort_controller = new AbortController();
abort_controller.signal.addEventListener('abort', cancel_linked_and_update_store);
abort_controllers.add({
controller: abort_controller,
listener: cancel_linked_and_update_store,
});
function link<TLinkArgs, TLinkReturn>(
task: Task<TLinkArgs, TLinkReturn>,
): Task<TLinkArgs, TLinkReturn> {
const old_perform = task.perform;
return {
...task,
perform(...args) {
const instance = old_perform(...args);
child_tasks.add(instance);
return instance;
},
};
}
handler(
() => {
result.update((old) => {
old.is_loading = true;
old.performCount++;
return old;
});
queueMicrotask(async () => {
try {
const gen_or_value = await gen_or_fun(args[0]!, {
signal: abort_controller.signal,
link,
});
const is_generator =
gen_or_value &&
typeof gen_or_value === 'object' &&
Symbol.asyncIterator in gen_or_value;
if (is_generator) {
let next_val = await gen_or_value.next();
while (!next_val.done) {
if (abort_controller.signal.aborted) {
break;
}
next_val = await gen_or_value.next(next_val.value);
}
if (next_val.done) {
const last_result = next_val.value;
results.push(last_result);
result.update((old) => {
old.error = undefined;
old.is_loading = false;
old.last_successful = last_result;
return old;
});
resolve(next_val.value);
}
} else if (!abort_controller.signal.aborted) {
results.push(gen_or_value);
result.update((old) => {
old.error = undefined;
old.is_loading = false;
old.last_successful = gen_or_value;
return old;
});
resolve(gen_or_value);
}
} catch (e) {
if (!abort_controller.signal.aborted) {
result.update((old) => {
old.error = e;
old.is_loading = false;
return old;
});
}
reject(e);
}
});
},
{ promise, abort_controller },
);

return Object.assign(promise, {
subscribe,
cancel() {
abort_controller.abort();
},
});
},
onInstanceCancel() {
result.update((old) => {
old.isLoading = false;
return old;
});
},
onInstanceStart() {
result.update((old) => {
old.isLoading = true;
old.performCount++;
return old;
});
},
onInstanceComplete(last_result) {
results.push(last_result);
result.update((old) => {
old.error = undefined;
old.isLoading = false;
old.lastSuccessful = last_result;
return old;
});
},
},
};
gen_or_fun,
options,
);
return Object.assign(actual_task, {
subscribe,
});
}

type HandlersShorthands = {
Expand All @@ -197,8 +82,12 @@ function is_key(handler: string): handler is HandlerType {

for (const handler in handlers) {
if (is_key(handler)) {
to_assign[handler] = (gen_or_fun, options) =>
_task(gen_or_fun, { kind: is_key(handler) ? handler : 'default', ...(options ?? {}) });
to_assign[handler] = (gen_or_fun, options) => {
if (!is_key(handler)) {
throw new Error('Impossible');
}
return _task(gen_or_fun, { ...(options ?? {}), kind: handler });
};
}
}

Expand Down
Loading

0 comments on commit fdaa699

Please sign in to comment.