From a0f744c585f03321e9a2ccd808217a56f60ecf0d Mon Sep 17 00:00:00 2001 From: Jacob Gillespie Date: Fri, 13 Sep 2024 14:14:52 +0100 Subject: [PATCH] Optimize reconciliation loop --- src/handlers/state.ts | 9 ++------- src/utils/aws.ts | 28 +++++++++++++++++----------- src/utils/fly/reconcile.ts | 28 +++++++++++++++++----------- src/utils/scheduler.ts | 24 ++++++++++++++++++++++++ 4 files changed, 60 insertions(+), 29 deletions(-) create mode 100644 src/utils/scheduler.ts diff --git a/src/handlers/state.ts b/src/handlers/state.ts index a77c9db..301b4a5 100644 --- a/src/handlers/state.ts +++ b/src/handlers/state.ts @@ -17,7 +17,7 @@ import {client} from '../utils/grpc' interface CloudProvider { getCurrentState(): Promise reportCurrentState(currentState: T): Promise - reconcile(response: GetDesiredStateResponse, state: T): Promise + reconcile(response: GetDesiredStateResponse, state: T): Promise } export const AwsProvider: CloudProvider = { @@ -42,12 +42,7 @@ export async function startStateStream(signal: AbortSignal, provider: CloudPr const response = await client.getDesiredState({clientId: clientID}, {signal}) if (isEmptyResponse(response)) continue - currentState = await provider.getCurrentState() - - const errors = await provider.reconcile(response, currentState) - for (const error of errors) { - await reportError(error) - } + await provider.reconcile(response, currentState) } catch (err: any) { if (err instanceof ConnectError && err.code === Code.FailedPrecondition) { // Connection lock was not acquired, sleep and retry diff --git a/src/utils/aws.ts b/src/utils/aws.ts index d5604bb..227ebb5 100644 --- a/src/utils/aws.ts +++ b/src/utils/aws.ts @@ -36,6 +36,7 @@ import { additionalSubnetIDs, } from './env' import {toPlainObject} from './plain' +import {scheduleTask} from './scheduler' const client = new EC2Client({}) @@ -50,17 +51,22 @@ export async function getCurrentState() { return toPlainObject(state) } -export async function reconcile(response: 
GetDesiredStateResponse, state: CurrentState): Promise { - const results = await Promise.allSettled([ - ...response.newVolumes.map((volume) => reconcileNewVolume(state.volumes, volume)), - ...response.volumeChanges.map((volume) => reconcileVolume(state.volumes, volume, state.instances)), - ...response.newMachines.map((instance) => reconcileNewMachine(state.instances, instance)), - ...response.machineChanges.map((instance) => reconcileMachine(state.instances, instance)), - ]) - - return results - .map((r) => (r.status === 'rejected' ? `${r.reason}` : undefined)) - .filter((r): r is string => r !== undefined) +export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise { + for (const volume of response.newVolumes) { + void scheduleTask(`volume/new/${volume.id}`, () => reconcileNewVolume(state.volumes, volume)) + } + + for (const volume of response.volumeChanges) { + void scheduleTask(`volume/change/${volume.id}`, () => reconcileVolume(state.volumes, volume, state.instances)) + } + + for (const instance of response.newMachines) { + void scheduleTask(`machine/new/${instance.id}`, () => reconcileNewMachine(state.instances, instance)) + } + + for (const instance of response.machineChanges) { + void scheduleTask(`machine/change/${instance.id}`, () => reconcileMachine(state.instances, instance)) + } } /** Filter to select only Depot-managed resources */ diff --git a/src/utils/fly/reconcile.ts b/src/utils/fly/reconcile.ts index 1b39048..e37f9e1 100644 --- a/src/utils/fly/reconcile.ts +++ b/src/utils/fly/reconcile.ts @@ -14,6 +14,7 @@ import {CLOUD_AGENT_CONNECTION_ID, FLY_REGION} from '../env' import {errorMessage} from '../errors' import {client} from '../grpc' import {toPlainObject} from '../plain' +import {scheduleTask} from '../scheduler' import { createBuildkitGPUVolume, createBuildkitVolume, @@ -51,17 +52,22 @@ export async function getCurrentState() { return toPlainObject(state) } -export async function reconcile(response: 
GetDesiredStateResponse, state: CurrentState): Promise { - const results = await Promise.allSettled([ - ...response.newVolumes.map((volume) => reconcileNewVolume(state.volumes, volume)), - ...response.volumeChanges.map((volume) => reconcileVolume(state, volume)), - ...response.newMachines.map((machine) => reconcileNewMachine(state.machines, machine, state.volumes)), - ...response.machineChanges.map((machine) => reconcileMachine(state.machines, machine)), - ]) - - return results - .map((r) => (r.status === 'rejected' ? `${r.reason}` : undefined)) - .filter((r): r is string => r !== undefined) +export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise { + for (const volume of response.newVolumes) { + void scheduleTask(`volume/new/${volume.id}`, () => reconcileNewVolume(state.volumes, volume)) + } + + for (const volume of response.volumeChanges) { + void scheduleTask(`volume/change/${volume.id}`, () => reconcileVolume(state, volume)) + } + + for (const machine of response.newMachines) { + void scheduleTask(`machine/new/${machine.id}`, () => reconcileNewMachine(state.machines, machine, state.volumes)) + } + + for (const machine of response.machineChanges) { + void scheduleTask(`machine/change/${machine.id}`, () => reconcileMachine(state.machines, machine)) + } } async function reconcileNewVolume(state: Volume[], volume: GetDesiredStateResponse_NewVolume) { diff --git a/src/utils/scheduler.ts b/src/utils/scheduler.ts new file mode 100644 index 0000000..d2e0aff --- /dev/null +++ b/src/utils/scheduler.ts @@ -0,0 +1,24 @@ +import {reportError} from './errors' + +const inProgressTasks = new Set() + +/** + * Schedule a task to run, ensuring that only one task with a given key is running at a time; a task whose key is already in progress is skipped. 
+ */ +export async function scheduleTask(key: string, task: () => Promise) { + if (inProgressTasks.has(key)) { + console.log(`Skipping ${key} because it is already in progress`) + return + } + + try { + inProgressTasks.add(key) + console.log(`Accepted ${key}, starting task`) + return await task() + } catch (err) { + await reportError(err) + } finally { + inProgressTasks.delete(key) + console.log(`Task ${key} completed`) + } +}