Skip to content

Commit

Permalink
Merge pull request #10 from peppy/process-batching
Browse files Browse the repository at this point in the history
Add basic batch result processing support
  • Loading branch information
peppy authored May 17, 2022
2 parents dbcd117 + b2b21f8 commit 7950742
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 28 deletions.
241 changes: 241 additions & 0 deletions osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace osu.Server.QueueProcessor.Tests;

public class BatchProcessorTests
{
private readonly ITestOutputHelper output;
private readonly TestBatchProcessor processor;

public BatchProcessorTests(ITestOutputHelper output)
{
this.output = output;

processor = new TestBatchProcessor();
processor.ClearQueue();
}

/// <summary>
/// Checking that processing an empty queue works as expected.
/// </summary>
[Fact]
public void ProcessEmptyQueue()
{
processor.Run(new CancellationTokenSource(1000).Token);
}

[Fact]
public void SendThenReceive_Single()
{
var cts = new CancellationTokenSource(10000);

var obj = FakeData.New();

FakeData? receivedObject = null;

processor.PushToQueue(obj);

processor.Received += o =>
{
receivedObject = o;
cts.Cancel();
};

processor.Run(cts.Token);

Assert.Equal(obj, receivedObject);
}

[Fact]
public void SendThenReceive_Multiple()
{
const int send_count = 20;

var cts = new CancellationTokenSource(10000);

var objects = new HashSet<FakeData>();
for (int i = 0; i < send_count; i++)
objects.Add(FakeData.New());

var receivedObjects = new HashSet<FakeData>();

foreach (var obj in objects)
processor.PushToQueue(obj);

processor.Received += o =>
{
lock (receivedObjects)
receivedObjects.Add(o);

if (receivedObjects.Count == send_count)
cts.Cancel();
};

processor.Run(cts.Token);

Assert.Equal(objects, receivedObjects);
}

/// <summary>
/// If the processor is cancelled mid-operation, every item should either be processed or still in the queue.
/// </summary>
[Fact]
public void EnsureCancellingDoesNotLoseItems()
{
var inFlightObjects = new List<FakeData>();

int processed = 0;
int sent = 0;

processor.Received += o =>
{
lock (inFlightObjects)
{
inFlightObjects.Remove(o);
Interlocked.Increment(ref processed);
}
};

// start and stop processing multiple times, checking items are in a good state each time.
for (int i = 0; i < 5; i++)
{
var cts = new CancellationTokenSource();

var sendTask = Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
var obj = FakeData.New();

lock (inFlightObjects)
{
processor.PushToQueue(obj);
inFlightObjects.Add(obj);
}

Interlocked.Increment(ref sent);
}
}, CancellationToken.None);

var receiveTask = Task.Run(() => processor.Run((cts = new CancellationTokenSource()).Token), CancellationToken.None);

Thread.Sleep(1000);

cts.Cancel();

sendTask.Wait(10000);
receiveTask.Wait(10000);

output.WriteLine($"Sent: {sent} In-flight: {inFlightObjects.Count} Processed: {processed}");

Assert.Equal(inFlightObjects.Count, processor.GetQueueSize());
}

var finalCts = new CancellationTokenSource(10000);

processor.Received += _ =>
{
if (inFlightObjects.Count == 0)
// early cancel once the list is emptied.
finalCts.Cancel();
};

// process all remaining items
processor.Run(finalCts.Token);

Assert.Empty(inFlightObjects);
Assert.Equal(0, processor.GetQueueSize());

output.WriteLine($"Sent: {sent} In-flight: {inFlightObjects.Count} Processed: {processed}");
}

[Fact]
public void SendThenErrorDoesRetry()
{
var cts = new CancellationTokenSource(10000);

var obj = FakeData.New();

FakeData? receivedObject = null;

bool didThrowOnce = false;

processor.PushToQueue(obj);

processor.Received += o =>
{
if (o.TotalRetries == 0)
{
didThrowOnce = true;
throw new Exception();
}

receivedObject = o;
cts.Cancel();
};

processor.Run(cts.Token);

Assert.True(didThrowOnce);
Assert.Equal(obj, receivedObject);
}

[Fact]
public void SendThenErrorForeverDoesDrop()
{
var cts = new CancellationTokenSource(10000);

var obj = FakeData.New();

int attemptCount = 0;

processor.PushToQueue(obj);

processor.Received += o =>
{
attemptCount++;
if (attemptCount > 3)
cts.Cancel();

throw new Exception();
};

processor.Run(cts.Token);

Assert.Equal(4, attemptCount);
Assert.Equal(0, processor.GetQueueSize());
}

[Fact]
public void ExitOnErrorThresholdHit()
{
var cts = new CancellationTokenSource(10000);

int attemptCount = 0;

// 3 retries for each, so at least one should remain in queue.
processor.PushToQueue(FakeData.New());
processor.PushToQueue(FakeData.New());
processor.PushToQueue(FakeData.New());
processor.PushToQueue(FakeData.New());

processor.Received += o =>
{
o.Failed = true;
attemptCount++;
};

Assert.Throws<Exception>(() => processor.Run(cts.Token));

Assert.True(attemptCount >= 10, "attemptCount >= 10");
Assert.NotEqual(0, processor.GetQueueSize());
}
}
29 changes: 29 additions & 0 deletions osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Generic;

namespace osu.Server.QueueProcessor.Tests;

public class TestBatchProcessor : QueueProcessor<FakeData>
{
public TestBatchProcessor()
: base(new QueueConfiguration
{
InputQueueName = "test-batch",
BatchSize = 5,
})
{
}

protected override void ProcessResults(IEnumerable<FakeData> items)
{
foreach (var item in items)
{
Received?.Invoke(item);
}
}

public Action<FakeData>? Received;
}
7 changes: 6 additions & 1 deletion osu.Server.QueueProcessor/QueueConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,10 @@ public class QueueConfiguration
/// Every error will increment an internal count, while every success will decrement it.
/// </remarks>
public int ErrorThreshold { get; set; } = 10;

/// <summary>
/// Setting above 1 will allow processing in batches (see <see cref="QueueProcessor{T}.ProcessResults"/>).
/// </summary>
public int BatchSize { get; set; } = 1;
}
}
}
10 changes: 10 additions & 0 deletions osu.Server.QueueProcessor/QueueItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Runtime.Serialization;

namespace osu.Server.QueueProcessor
{
Expand All @@ -11,6 +12,15 @@ namespace osu.Server.QueueProcessor
[Serializable]
public abstract class QueueItem
{
/// <summary>
/// Set to <c>true</c> to mark this item is failed. This will cause it to be retried.
/// </summary>
[IgnoreDataMember]
public bool Failed { get; set; }

/// <summary>
/// The number of times processing this item has been retried. Handled internally by <see cref="QueueProcessor{T}"/>.
/// </summary>
public int TotalRetries { get; set; }

/// <summary>
Expand Down
Loading

0 comments on commit 7950742

Please sign in to comment.