Skip to content

Commit

Permalink
fix: allow expectationManager to skip ahead in state processing
Browse files Browse the repository at this point in the history
This is to help with a situation where a check is really slow, to avoid expectationManager spending too much time evaluating a single state
  • Loading branch information
nytamin committed Feb 18, 2025
1 parent 310188b commit e215d1f
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ export class EvaluationRunner {
ExpectedPackageStatusAPI.WorkStatusState.WORKING,
]

/** Count of expectations that are in READY or WORKING */
const readyCount = tracked.reduce(
(memo, trackedExp) =>
trackedExp.state === ExpectedPackageStatusAPI.WorkStatusState.READY ||
trackedExp.state === ExpectedPackageStatusAPI.WorkStatusState.WORKING
? memo + 1
: memo,
0
)

// Step 1: Evaluate the Expectations which are in the states that can be handled in parallel:
for (const handleState of handleStatesParallel) {
const timer = startTimer()
Expand All @@ -322,6 +332,13 @@ export class EvaluationRunner {
if (trackedWithState.length) {
// We're using a PromisePool so that we don't send out an unlimited number of parallel requests to the workers.

const startTime = Date.now()
/** How long to wait before skipping ahead to process the next state */
const allowSkipTime =
this.tracker.constants.ALLOW_SKIPPING_QUEUE_TIME *
// If there are expectations in READY, we should skip ahead to process them sooner:
(readyCount > 0 ? 0.25 : 0.5)

await PromisePool.for(trackedWithState)
.withConcurrency(this.tracker.constants.PARALLEL_CONCURRENCY)
.handleError(async (error, trackedExp) => {
Expand All @@ -332,7 +349,18 @@ export class EvaluationRunner {
trackedExp.session.hadError = true
}
})
.process(async (trackedExp) => {
.process(async (trackedExp, _index, pool) => {
// If enough time has passed since we started processing this state,
// we should move on to the next state (by cancelling handling the rest of the expectations in this PromisePool).
// This is so that we can continue to process other states that might be more important.

const timeSinceStart = Date.now() - startTime
if (timeSinceStart > allowSkipTime) {
this.logger.debug(`Skipping ahead (after ${timeSinceStart}ms, limit: ${allowSkipTime}ms)`)
pool.stop()
return
}

await evaluateExpectationState(this, trackedExp)
postProcessSession(trackedExp)
})
Expand All @@ -356,7 +384,7 @@ export class EvaluationRunner {
const timer = startTimer()
// Step 2: Evaluate the expectations, now one by one:
for (const trackedExp of tracked) {
// Only handle the states that
// Only handle the states that are in this state
if (handleStatesSerial.includes(trackedExp.state)) {
// Evaluate the Expectation:
await evaluateExpectationState(this, trackedExp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export function sortTrackedExpectations(
if (aLastErrorTime > bLastErrorTime) return 1
if (aLastErrorTime < bLastErrorTime) return -1

// Lowest lastOperationTime first
// Lowest lastEvaluationTime first
if (a.lastEvaluationTime > b.lastEvaluationTime) return 1
if (a.lastEvaluationTime < b.lastEvaluationTime) return -1

Expand Down
25 changes: 14 additions & 11 deletions tests/internal-tests/src/__mocks__/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ export function __setCallbackInterceptor(interceptor: (type: string, cb: () => v
fs.__setCallbackInterceptor = __setCallbackInterceptor
export function __restoreCallbackInterceptor(): void {
fs.__cb = (_type: string, cb: () => void) => {
cb()
return cb()
}
}
fs.__restoreCallbackInterceptor = __restoreCallbackInterceptor
Expand Down Expand Up @@ -504,16 +504,19 @@ fs.readFile = readFile
export function open(path: string, _options: string, callback: (error: any, result?: any) => void): void {
path = fixPath(path)
if (DEBUG_LOG) console.log('fs.open', path)

fsMockEmitter.emit('open', path)
try {
const file = getMock(path)
fdId++
openFileDescriptors[fdId + ''] = file
return fs.__cb('open', () => {
try {
const file = getMock(path)
fdId++
openFileDescriptors[fdId + ''] = file

return callback(undefined, fdId)
} catch (err) {
callback(err)
}
return callback(undefined, fdId)
} catch (err) {
callback(err)
}
})
}
fs.open = open
export function close(fd: number, callback: (error: any, result?: any) => void): void {
Expand All @@ -534,7 +537,7 @@ export function copyFile(source: string, destination: string, callback: (error:
if (DEBUG_LOG) console.log('fs.copyFile', source, destination)

fsMockEmitter.emit('copyFile', source, destination)
fs.__cb('copyFile', () => {
return fs.__cb('copyFile', () => {
try {
const mockFile = getMock(source)
if (DEBUG_LOG) console.log('source', source)
Expand All @@ -558,7 +561,7 @@ export function rename(source: string, destination: string, callback: (error: an
destination = fixPath(destination)
if (DEBUG_LOG) console.log('fs.rename', source, destination)
fsMockEmitter.emit('rename', source, destination)
fs.__cb('rename', () => {
return fs.__cb('rename', () => {
try {
const mockFile = getMock(source)
setMock(destination, mockFile, false)
Expand Down
118 changes: 115 additions & 3 deletions tests/internal-tests/src/__tests__/issues.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ describeForAllPlatforms(
})
afterEach(() => {
fs.__mockSetAccessDelay(0) // Reset any access delay

fs.__emitter().removeAllListeners()
fs.__restoreCallbackInterceptor()
})
},
(_platform: string) => {
Expand Down Expand Up @@ -310,7 +313,9 @@ describeForAllPlatforms(

// Wait until the work have been aborted, and restarted:
await waitUntil(() => {
expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual(expect.stringMatching(/new|waiting/))
expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual(
expect.stringMatching(/new|waiting/)
)
}, env.WORK_TIMEOUT_TIME + env.WAIT_JOB_TIME_SAFE)

// Add another worker:
Expand Down Expand Up @@ -400,6 +405,113 @@ describeForAllPlatforms(
// clean up:
deferredCallbacks.forEach((cb) => cb())
})
test('Access times out, queue should continue', async () => {
expect(env.workerAgents).toHaveLength(1)

const listenToOpen = jest.fn(() => {
fs.__setCallbackInterceptor((type, cb) => {
if (type === 'open') {
// Delay the access
setTimeout(() => {
cb()
}, 1000)
} else {
return cb()
}
})
})
fs.__emitter().on('open', listenToOpen)

const expectationList: Array<Expectation.Any> = []
fs.__mockSetDirectory('/targets/target0')
for (let i = 0; i < 100; i++) {
fs.__mockSetFile(`/sources/source0/file${i}Source.mp4`, 1234)

const expectationId = protectString<ExpectationId>(`copy${i}`)
expectationList.push(
literal<Expectation.FileCopy>({
id: expectationId,
priority: 0,
managerId: MANAGER0,
fromPackages: [{ id: PACKAGE0, expectedContentVersionHash: 'abcd1234' }],
type: Expectation.Type.FILE_COPY,
statusReport: {
label: `Copy file "${expectationId}"`,
description: `Copy "${expectationId}" because test`,
displayRank: 0,
sendReport: true,
},
startRequirement: {
sources: [getLocalSource(SOURCE0, `file${i}Source.mp4`)],
},
endRequirement: {
targets: [getLocalTarget(TARGET0, `file${i}Target.mp4`)],
content: {
filePath: `file${i}Target.mp4`,
},
version: { type: Expectation.Version.Type.FILE_ON_DISK },
},
workOptions: {
requiredForPlayout: true,
},
})
)
}

const expectations: Record<ExpectationId, Expectation.Any> = {}
expectationList.slice(0, 9999).forEach((exp) => {
expectations[exp.id] = exp
})
env.expectationManager.updateExpectations(expectations)

expect(env.PARALLEL_CONCURRENCY).toBe(10)
expect(env.ALLOW_SKIPPING_QUEUE_TIME).toBe(3000)

// What we're expecting to happen:
// * We've got 100 expectations.
// * The Expectations take 1 second to fs.open (which is done during WAITING)
// * The Expectations are evaluated in batches of 10
// * After 3s, expectationManager should decide to skip ahead from evaluating the WAITING expectations
// So a short while later, we should see that some of the expectations have been fulfilled

const getStatusCount = () => {
const statusCount: Record<string, number> = {
new: 0,
waiting: 0,
ready: 0,
working: 0,
fulfilled: 0,
}
Object.values(env.expectationStatuses).forEach((v) => {
if (v.statusInfo.status) {
statusCount[v.statusInfo.status] = (statusCount[v.statusInfo.status] || 0) + 1
}
})
return statusCount
}
{
await waitTime(1000) // 1000
// At this time, no expectations should have moved past the WAITING state yet
const statuses = getStatusCount()
expect(statuses['waiting']).toBe(100)
}
{
await waitTime(1000) // 2000
// At this time, some expectations should have moved past the WAITING state
const statuses = getStatusCount()
expect(statuses['waiting']).toBeGreaterThan(50)
expect(statuses['ready']).toBeGreaterThanOrEqual(10)
expect(statuses['working']).toBe(0)
expect(statuses['fulfilled']).toBe(0)
}
{
await waitTime(1000) // 3000
// By this time, expectationManager should have skipped ahead and processed more states

const statuses = getStatusCount()
expect(statuses['fulfilled']).toBeGreaterThanOrEqual(1)
}
}, 5000)
test.skip('One of the workers reply very slowly', async () => {
// The expectation should be picked up by one of the faster workers

Expand Down Expand Up @@ -547,8 +659,8 @@ function addCopyFileExpectation(
fromPackages: [{ id: PACKAGE0, expectedContentVersionHash: 'abcd1234' }],
type: Expectation.Type.FILE_COPY,
statusReport: {
label: `Copy file0`,
description: `Copy file0 because test`,
label: `Copy file "${expectationId}"`,
description: `Copy "${expectationId}" because test`,
displayRank: 0,
sendReport: true,
},
Expand Down
12 changes: 11 additions & 1 deletion tests/internal-tests/src/__tests__/lib/setupEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ export async function prepareTestEnvironment(debugLogging: boolean): Promise<Tes
const WORK_TIMEOUT_TIME = 900 // ms
const ERROR_WAIT_TIME = 500

const ALLOW_SKIPPING_QUEUE_TIME = 3000 // ms
const PARALLEL_CONCURRENCY = 10

const SCALE_UP_TIME = 100

const config: { process: ProcessConfig } = {
Expand Down Expand Up @@ -268,7 +271,8 @@ export async function prepareTestEnvironment(debugLogging: boolean): Promise<Tes
packageId: ExpectedPackageId,
packageStatus: Omit<ExpectedPackageStatusAPI.PackageContainerPackageStatus, 'statusChanged'> | null
) => {
if (debugLogging) console.log('reportPackageContainerPackageStatus', containerId, packageId, packageStatus)
if (debugLogging)
console.log('reportPackageContainerPackageStatus', containerId, packageId, packageStatus)

let container = containerStatuses[containerId]
if (!container) {
Expand Down Expand Up @@ -308,6 +312,8 @@ export async function prepareTestEnvironment(debugLogging: boolean): Promise<Tes
FULFILLED_MONITOR_TIME: WAIT_SCAN_TIME - WAIT_JOB_TIME - 300,
WORK_TIMEOUT_TIME: WORK_TIMEOUT_TIME - 300,
ERROR_WAIT_TIME: ERROR_WAIT_TIME - 300,
ALLOW_SKIPPING_QUEUE_TIME,
PARALLEL_CONCURRENCY,

SCALE_UP_TIME: SCALE_UP_TIME,
},
Expand All @@ -321,6 +327,8 @@ export async function prepareTestEnvironment(debugLogging: boolean): Promise<Tes
WAIT_SCAN_TIME,
WORK_TIMEOUT_TIME,
ERROR_WAIT_TIME,
ALLOW_SKIPPING_QUEUE_TIME,
PARALLEL_CONCURRENCY,
SCALE_UP_TIME,
expectationManager: em.expectationManager,
workerAgents: em.workerAgents,
Expand Down Expand Up @@ -360,6 +368,8 @@ export interface TestEnvironment {
WAIT_SCAN_TIME: number
WORK_TIMEOUT_TIME: number
ERROR_WAIT_TIME: number
ALLOW_SKIPPING_QUEUE_TIME: number
PARALLEL_CONCURRENCY: number
SCALE_UP_TIME: number
expectationManager: ExpectationManager
workerAgents: Worker.WorkerAgent[]
Expand Down

0 comments on commit e215d1f

Please sign in to comment.