Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert background cluster networking calls to async - Gossip, Replication #914

Merged
merged 9 commits into from
Jan 16, 2025
16 changes: 8 additions & 8 deletions libs/cluster/Server/Failover/ReplicaFailoverSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal sealed partial class FailoverSession : IDisposable
/// <param name="nodeId">Node-id used to search the connection array</param>
/// <returns></returns>
/// <exception cref="GarnetException"></exception>
private GarnetClient GetOrAddConnection(string nodeId)
private async Task<GarnetClient> GetOrAddConnectionAsync(string nodeId)
{
_ = clusterProvider.clusterManager.clusterConnectionStore.GetConnection(nodeId, out var gsn);

Expand Down Expand Up @@ -65,7 +65,7 @@ private GarnetClient GetOrAddConnection(string nodeId)
throw new GarnetException($"Connection not established to node {nodeId}");
}

gsn.Initialize();
await gsn.InitializeAsync();

return gsn.Client;
}
Expand All @@ -75,7 +75,7 @@ private GarnetClient GetOrAddConnection(string nodeId)
/// </summary>
/// <param name="nodeId">Id of node to create connection for</param>
/// <returns></returns>
private GarnetClient CreateConnection(string nodeId)
private async Task<GarnetClient> CreateConnectionAsync(string nodeId)
{
var (address, port) = oldConfig.GetEndpointFromNodeId(nodeId);
var client = new GarnetClient(
Expand All @@ -90,7 +90,7 @@ private GarnetClient CreateConnection(string nodeId)
try
{
if (!client.IsConnected)
client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token).GetAwaiter().GetResult();
await client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token);

return client;
}
Expand All @@ -108,9 +108,9 @@ private GarnetClient CreateConnection(string nodeId)
/// </summary>
/// <param name="nodeId"></param>
/// <returns></returns>
private GarnetClient GetConnection(string nodeId)
private Task<GarnetClient> GetConnectionAsync(string nodeId)
{
return useGossipConnections ? GetOrAddConnection(nodeId) : CreateConnection(nodeId);
return useGossipConnections ? GetOrAddConnectionAsync(nodeId) : CreateConnectionAsync(nodeId);
}

/// <summary>
Expand All @@ -120,7 +120,7 @@ private GarnetClient GetConnection(string nodeId)
private async Task<bool> PauseWritesAndWaitForSync()
{
var primaryId = oldConfig.LocalNodePrimaryId;
var client = GetConnection(primaryId);
var client = await GetConnectionAsync(primaryId);
try
{
if (client == null)
Expand Down Expand Up @@ -210,7 +210,7 @@ private async Task BroadcastConfigAndRequestAttach(string replicaId, byte[] conf
{
var oldPrimaryId = oldConfig.LocalNodePrimaryId;
var newConfig = clusterProvider.clusterManager.CurrentConfig;
var client = oldPrimaryId.Equals(replicaId) ? primaryClient : GetConnection(replicaId);
var client = oldPrimaryId.Equals(replicaId) ? primaryClient : await GetConnectionAsync(replicaId);

try
{
Expand Down
29 changes: 8 additions & 21 deletions libs/cluster/Server/GarnetServerNode.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
Expand Down Expand Up @@ -29,11 +29,6 @@ internal sealed class GarnetServerNode
/// </summary>
ClusterConfig lastConfig = null;

/// <summary>
/// Gossip with meet command lock
/// </summary>
SingleWriterMultiReaderLock meetLock;

/// <summary>
/// Outstanding gossip task if any
/// </summary>
Expand Down Expand Up @@ -95,13 +90,13 @@ public GarnetServerNode(ClusterProvider clusterProvider, string address, int por
/// Initialize connection and cancellation tokens.
/// Initialization is performed only once
/// </summary>
public void Initialize()
public Task InitializeAsync()
{
// Ensure initialize executes only once
if (Interlocked.CompareExchange(ref initialized, 1, 0) != 0) return;
if (Interlocked.CompareExchange(ref initialized, 1, 0) != 0) return Task.CompletedTask;

cts = CancellationTokenSource.CreateLinkedTokenSource(clusterProvider.clusterManager.ctsGossip.Token, internalCts.Token);
gc.ReconnectAsync().WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token).GetAwaiter().GetResult();
return gc.ReconnectAsync().WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token);
}

public void Dispose()
Expand Down Expand Up @@ -200,19 +195,11 @@ private Task Gossip(byte[] configByteArray)
/// </summary>
/// <param name="configByteArray"></param>
/// <returns></returns>
public MemoryResult<byte> TryMeet(byte[] configByteArray)
public async Task<MemoryResult<byte>> TryMeetAsync(byte[] configByteArray)
{
try
{
_ = meetLock.TryWriteLock();
UpdateGossipSend();
var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).GetAwaiter().GetResult();
return resp;
}
finally
{
meetLock.WriteUnlock();
}
UpdateGossipSend();
var resp = await gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token);
return resp;
}

/// <summary>
Expand Down
16 changes: 8 additions & 8 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public bool TryMerge(ClusterConfig senderConfig, bool acquireLock = true)
/// <param name="address"></param>
/// <param name="port"></param>
public void RunMeetTask(string address, int port)
=> Task.Run(() => TryMeet(address, port));
=> Task.Run(async () => await TryMeetAsync(address, port));

/// <summary>
/// This task will immediately communicate with the new node and try to merge the retrieved configuration into its own.
Expand All @@ -143,7 +143,7 @@ public void RunMeetTask(string address, int port)
/// <param name="address">Address of node to issue meet to</param>
/// <param name="port"> Port of node to issue meet to</param>
/// <param name="acquireLock">Whether to acquire lock for merging. Default true</param>
public void TryMeet(string address, int port, bool acquireLock = true)
public async Task TryMeetAsync(string address, int port, bool acquireLock = true)
{
GarnetServerNode gsn = null;
var conf = CurrentConfig;
Expand All @@ -165,10 +165,10 @@ public void TryMeet(string address, int port, bool acquireLock = true)

// Initialize GarnetServerNode
// Thread-Safe initialization executes only once
gsn.Initialize();
await gsn.InitializeAsync();

// Send full config in Gossip
resp = gsn.TryMeet(conf.ToByteArray());
resp = await gsn.TryMeetAsync(conf.ToByteArray());
if (resp.Length > 0)
{
var other = ClusterConfig.FromByteArray(resp.Span.ToArray());
Expand Down Expand Up @@ -203,15 +203,15 @@ public void TryMeet(string address, int port, bool acquireLock = true)
/// <summary>
/// Main gossip async task
/// </summary>
async void GossipMain()
async Task GossipMain()
{
// Main gossip loop
try
{
while (true)
{
if (ctsGossip.Token.IsCancellationRequested) return;
InitConnections();
await InitConnections();

// Choose between full broadcast or sample gossip to few nodes
if (GossipSamplePercent == 100)
Expand Down Expand Up @@ -240,7 +240,7 @@ async void GossipMain()
}

// Initialize connections for nodes that have either been disposed due to banlist (after expiry) or timeout
void InitConnections()
async Task InitConnections()
{
DisposeBannedWorkerConnections();

Expand All @@ -263,7 +263,7 @@ void InitConnections()
};
try
{
gsn.Initialize();
await gsn.InitializeAsync();
if (!clusterConnectionStore.AddConnection(gsn))
gsn.Dispose();
}
Expand Down
8 changes: 4 additions & 4 deletions libs/cluster/Server/Migration/MigrationDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Garnet.cluster
{
internal sealed unsafe partial class MigrateSession : IDisposable
internal sealed partial class MigrateSession : IDisposable
{
/// <summary>
/// Begin migration task
Expand Down Expand Up @@ -49,7 +49,7 @@ public bool TryStartMigrationTask(out ReadOnlySpan<byte> errorMessage)
/// <summary>
/// Migrate slots session background task
/// </summary>
private void BeginAsyncMigrationTask()
private async Task BeginAsyncMigrationTask()
{
var configResumed = true;
try
Expand Down Expand Up @@ -91,7 +91,7 @@ private void BeginAsyncMigrationTask()
// Lock config merge to avoid a background epoch bump
clusterProvider.clusterManager.SuspendConfigMerge();
configResumed = false;
clusterProvider.clusterManager.TryMeet(_targetAddress, _targetPort, acquireLock: false);
await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false);

// Change ownership of slots to target node.
if (!TrySetSlotRanges(GetTargetNodeId, MigrateState.NODE))
Expand All @@ -112,7 +112,7 @@ private void BeginAsyncMigrationTask()
}

// Gossip again to ensure that source and target agree on the slot exchange
clusterProvider.clusterManager.TryMeet(_targetAddress, _targetPort, acquireLock: false);
await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false);

// Ensure that config merge resumes
clusterProvider.clusterManager.ResumeConfigMerge();
Expand Down
Loading