Browse Source

feat: changing the Dispose with potential deadlocks to the DisposeAsync

pull/1934/head
Taras Kovalenko 2 years ago
parent
commit
fe1b9218e0
8 changed files with 31 additions and 42 deletions
  1. +1
    -1
      src/BuildingBlocks/EventBus/EventBus/Extensions/GenericTypeExtensions.cs
  2. +10
    -16
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersistentConnection.cs
  3. +1
    -1
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
  4. +4
    -9
      src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs
  5. +10
    -10
      src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs
  6. +1
    -1
      src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs
  7. +1
    -1
      src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs
  8. +3
    -3
      src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs

+ 1
- 1
src/BuildingBlocks/EventBus/EventBus/Extensions/GenericTypeExtensions.cs View File

@ -4,7 +4,7 @@ public static class GenericTypeExtensions
{ {
public static string GetGenericTypeName(this Type type) public static string GetGenericTypeName(this Type type)
{ {
var typeName = string.Empty;
string typeName;
if (type.IsGenericType) if (type.IsGenericType)
{ {


+ 10
- 16
src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersistentConnection.cs View File

@ -6,10 +6,10 @@ public class DefaultRabbitMQPersistentConnection
private readonly IConnectionFactory _connectionFactory; private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
private readonly int _retryCount; private readonly int _retryCount;
IConnection _connection;
bool _disposed;
private IConnection _connection;
public bool Disposed;
object sync_root = new object();
readonly object _syncRoot = new();
public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
{ {
@ -18,13 +18,7 @@ public class DefaultRabbitMQPersistentConnection
_retryCount = retryCount; _retryCount = retryCount;
} }
public bool IsConnected
{
get
{
return _connection != null && _connection.IsOpen && !_disposed;
}
}
public bool IsConnected => _connection is { IsOpen: true } && !Disposed;
public IModel CreateModel() public IModel CreateModel()
{ {
@ -38,9 +32,9 @@ public class DefaultRabbitMQPersistentConnection
public void Dispose() public void Dispose()
{ {
if (_disposed) return;
if (Disposed) return;
_disposed = true;
Disposed = true;
try try
{ {
@ -59,7 +53,7 @@ public class DefaultRabbitMQPersistentConnection
{ {
_logger.LogInformation("RabbitMQ Client is trying to connect"); _logger.LogInformation("RabbitMQ Client is trying to connect");
lock (sync_root)
lock (_syncRoot)
{ {
var policy = RetryPolicy.Handle<SocketException>() var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>() .Or<BrokerUnreachableException>()
@ -96,7 +90,7 @@ public class DefaultRabbitMQPersistentConnection
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{ {
if (_disposed) return;
if (Disposed) return;
_logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
@ -105,7 +99,7 @@ public class DefaultRabbitMQPersistentConnection
void OnCallbackException(object sender, CallbackExceptionEventArgs e) void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{ {
if (_disposed) return;
if (Disposed) return;
_logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
@ -114,7 +108,7 @@ public class DefaultRabbitMQPersistentConnection
void OnConnectionShutdown(object sender, ShutdownEventArgs reason) void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{ {
if (_disposed) return;
if (Disposed) return;
_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");


+ 1
- 1
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -240,7 +240,7 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
if (_subsManager.HasSubscriptionsForEvent(eventName)) if (_subsManager.HasSubscriptionsForEvent(eventName))
{ {
using var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME);
await using var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME);
var subscriptions = _subsManager.GetHandlersForEvent(eventName); var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions) foreach (var subscription in subscriptions)
{ {


+ 4
- 9
src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs View File

@ -27,13 +27,8 @@ public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnecti
} }
} }
public ServiceBusAdministrationClient AdministrationClient
{
get
{
return _subscriptionClient;
}
}
public ServiceBusAdministrationClient AdministrationClient =>
_subscriptionClient;
public ServiceBusClient CreateModel() public ServiceBusClient CreateModel()
{ {
@ -45,11 +40,11 @@ public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnecti
return _topicClient; return _topicClient;
} }
public void Dispose()
public async ValueTask DisposeAsync()
{ {
if (_disposed) return; if (_disposed) return;
_disposed = true; _disposed = true;
_topicClient.DisposeAsync().GetAwaiter().GetResult();
await _topicClient.DisposeAsync();
} }
} }

+ 10
- 10
src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs View File

@ -1,6 +1,6 @@
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus;
public class EventBusServiceBus : IEventBus, IDisposable
public class EventBusServiceBus : IEventBus, IAsyncDisposable
{ {
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
private readonly ILogger<EventBusServiceBus> _logger; private readonly ILogger<EventBusServiceBus> _logger;
@ -8,8 +8,8 @@ public class EventBusServiceBus : IEventBus, IDisposable
private readonly ILifetimeScope _autofac; private readonly ILifetimeScope _autofac;
private readonly string _topicName = "eshop_event_bus"; private readonly string _topicName = "eshop_event_bus";
private readonly string _subscriptionName; private readonly string _subscriptionName;
private ServiceBusSender _sender;
private ServiceBusProcessor _processor;
private readonly ServiceBusSender _sender;
private readonly ServiceBusProcessor _processor;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
@ -134,12 +134,6 @@ public class EventBusServiceBus : IEventBus, IDisposable
await _processor.StartProcessingAsync(); await _processor.StartProcessingAsync();
} }
public void Dispose()
{
_subsManager.Clear();
_processor.CloseAsync().GetAwaiter().GetResult();
}
private Task ErrorHandler(ProcessErrorEventArgs args) private Task ErrorHandler(ProcessErrorEventArgs args)
{ {
var ex = args.Exception; var ex = args.Exception;
@ -173,7 +167,7 @@ public class EventBusServiceBus : IEventBus, IDisposable
var eventType = _subsManager.GetEventTypeByName(eventName); var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonSerializer.Deserialize(message, eventType); var integrationEvent = JsonSerializer.Deserialize(message, eventType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new[] { integrationEvent });
} }
} }
} }
@ -196,4 +190,10 @@ public class EventBusServiceBus : IEventBus, IDisposable
_logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleProperties.DefaultRuleName); _logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleProperties.DefaultRuleName);
} }
} }
public async ValueTask DisposeAsync()
{
_subsManager.Clear();
await _processor.CloseAsync();
}
} }

+ 1
- 1
src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs View File

@ -1,6 +1,6 @@
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus;
public interface IServiceBusPersisterConnection : IDisposable
public interface IServiceBusPersisterConnection : IAsyncDisposable
{ {
ServiceBusClient TopicClient { get; } ServiceBusClient TopicClient { get; }
ServiceBusAdministrationClient AdministrationClient { get; } ServiceBusAdministrationClient AdministrationClient { get; }

+ 1
- 1
src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs View File

@ -28,7 +28,7 @@ public class IntegrationEventLogService : IIntegrationEventLogService, IDisposab
var result = await _integrationEventLogContext.IntegrationEventLogs var result = await _integrationEventLogContext.IntegrationEventLogs
.Where(e => e.TransactionId == tid && e.State == EventStateEnum.NotPublished).ToListAsync(); .Where(e => e.TransactionId == tid && e.State == EventStateEnum.NotPublished).ToListAsync();
if (result != null && result.Any())
if (result.Any())
{ {
return result.OrderBy(o => o.CreationTime) return result.OrderBy(o => o.CreationTime)
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t => t.Name == e.EventTypeShortName))); .Select(e => e.DeserializeJsonContent(_eventTypes.Find(t => t.Name == e.EventTypeShortName)));


+ 3
- 3
src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs View File

@ -2,7 +2,7 @@
public class ResilientTransaction public class ResilientTransaction
{ {
private DbContext _context;
private readonly DbContext _context;
private ResilientTransaction(DbContext context) => private ResilientTransaction(DbContext context) =>
_context = context ?? throw new ArgumentNullException(nameof(context)); _context = context ?? throw new ArgumentNullException(nameof(context));
@ -15,9 +15,9 @@ public class ResilientTransaction
var strategy = _context.Database.CreateExecutionStrategy(); var strategy = _context.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(async () => await strategy.ExecuteAsync(async () =>
{ {
using var transaction = _context.Database.BeginTransaction();
await using var transaction = await _context.Database.BeginTransactionAsync();
await action(); await action();
transaction.Commit();
await transaction.CommitAsync();
}); });
} }
} }

Loading…
Cancel
Save