Skip to content

Commit

Permalink
Merge pull request #15 from peppy/fix-shutdown-item-handling
Browse files Browse the repository at this point in the history
Fix shutdown item handling
  • Loading branch information
peppy authored Jul 21, 2022
2 parents ce88f41 + d576b65 commit a21fa4c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
13 changes: 9 additions & 4 deletions osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ public void EnsureCancellingDoesNotLoseItems()
}
};

const int run_count = 5;

// start and stop processing multiple times, checking items are in a good state each time.
for (int i = 0; i < 5; i++)

for (int i = 0; i < run_count; i++)
{
var cts = new CancellationTokenSource();

Expand All @@ -125,7 +128,11 @@ public void EnsureCancellingDoesNotLoseItems()
}
}, CancellationToken.None);

var receiveTask = Task.Run(() => processor.Run((cts = new CancellationTokenSource()).Token), CancellationToken.None);
// Ensure there are some items in the queue before starting the processor.
while (inFlightObjects.Count < 1000)
Thread.Sleep(100);

var receiveTask = Task.Run(() => processor.Run(cts.Token), CancellationToken.None);

Thread.Sleep(1000);

Expand All @@ -135,8 +142,6 @@ public void EnsureCancellingDoesNotLoseItems()
receiveTask.Wait(10000);

output.WriteLine($"Sent: {sent} In-flight: {inFlightObjects.Count} Processed: {processed}");

Assert.Equal(inFlightObjects.Count, processor.GetQueueSize());
}

var finalCts = new CancellationTokenSource(10000);
Expand Down
13 changes: 9 additions & 4 deletions osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@ public void EnsureCancellingDoesNotLoseItems()
}
};

const int run_count = 5;

// start and stop processing multiple times, checking items are in a good state each time.
for (int i = 0; i < 5; i++)

for (int i = 0; i < run_count; i++)
{
var cts = new CancellationTokenSource();

Expand All @@ -114,7 +117,11 @@ public void EnsureCancellingDoesNotLoseItems()
}
}, CancellationToken.None);

var receiveTask = Task.Run(() => processor.Run((cts = new CancellationTokenSource()).Token), CancellationToken.None);
// Ensure there are some items in the queue before starting the processor.
while (inFlightObjects.Count < 1000)
Thread.Sleep(100);

var receiveTask = Task.Run(() => processor.Run(cts.Token), CancellationToken.None);

Thread.Sleep(1000);

Expand All @@ -124,8 +131,6 @@ public void EnsureCancellingDoesNotLoseItems()
receiveTask.Wait(10000);

output.WriteLine($"Sent: {sent} In-flight: {inFlightObjects.Count} Processed: {processed}");

Assert.Equal(inFlightObjects.Count, processor.GetQueueSize());
}

var finalCts = new CancellationTokenSource(10000);
Expand Down
8 changes: 8 additions & 0 deletions osu.Server.QueueProcessor/QueueProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ public void Run(CancellationToken cancellation = default)
}

Console.WriteLine("Shutting down..");

while (totalInFlight > 0)
{
Console.WriteLine($"Waiting for remaining {totalInFlight} in-flight items...");
Thread.Sleep(5000);
}

Console.WriteLine("Bye!");
}
}

Expand Down

0 comments on commit a21fa4c

Please sign in to comment.