Skip to content

Commit

Permalink
Fix duplicate consumption of events when connection is broken
Browse files Browse the repository at this point in the history
  • Loading branch information
realLiangshiwei committed Feb 7, 2025
1 parent 7b293c3 commit de10dcc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public virtual void Dispose()

protected virtual Task EnsureInitializedAsync()
{
if (ChannelAccessor != null)
if (ChannelAccessor != null && ChannelAccessor.Channel.IsOpen)
{
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public override void ConfigureServices(ServiceConfigurationContext context)
foreach (var connectionFactory in options.Connections.Values)
{
connectionFactory.DispatchConsumersAsync = true;
connectionFactory.AutomaticRecoveryEnabled = false;
}
});
}
Expand Down
38 changes: 25 additions & 13 deletions framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,19 @@ public ConnectionPool(IOptions<AbpRabbitMqOptions> options)
public virtual IConnection Get(string? connectionName = null)
{
connectionName ??= RabbitMqConnections.DefaultConnectionName;

var connectionFactory = Options.Connections.GetOrDefault(connectionName);
try
{
var lazyConnection = Connections.GetOrAdd(
connectionName, () => new Lazy<IConnection>(() =>
{
var connection = Options.Connections.GetOrDefault(connectionName);
var hostnames = connection.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames);

})
);

return lazyConnection.Value;
var connection = GetConnection(connectionName, connectionFactory);

if (connection.IsOpen)
{
return connection;
}

connection.Dispose();
Connections.TryRemove(connectionName, out _);
return GetConnection(connectionName, connectionFactory);
}
catch (Exception)
{
Expand All @@ -47,6 +45,20 @@ public virtual IConnection Get(string? connectionName = null)
}
}

protected virtual IConnection GetConnection(string connectionName, ConnectionFactory connectionFactory)
{
return Connections.GetOrAdd(
connectionName, () => new Lazy<IConnection>(() =>
{
var hostnames = connectionFactory.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1
? connectionFactory.CreateConnection()
: connectionFactory.CreateConnection(hostnames);
})
).Value;
}

public void Dispose()
{
if (_isDisposed)
Expand Down

0 comments on commit de10dcc

Please sign in to comment.