diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs index 2e0555e61..e3c8f3844 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs @@ -10,122 +10,127 @@ using System.Net.Sockets; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { - public class DefaultRabbitMQPersistentConnection - : IRabbitMQPersistentConnection - { - private readonly IConnectionFactory _connectionFactory; - private readonly ILogger _logger; - private readonly int _retryCount; - IConnection _connection; - bool _disposed; - - object sync_root = new object(); - - public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger logger, int retryCount = 5) - { - _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _retryCount = retryCount; - } - - public bool IsConnected - { - get - { - return _connection != null && _connection.IsOpen && !_disposed; - } - } - - public IModel CreateModel() - { - if (!IsConnected) - { - throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); - } - - return _connection.CreateModel(); - } - - public void Dispose() - { - if (_disposed) return; - - _disposed = true; - - try - { - _connection.Dispose(); - } - catch (IOException ex) - { - _logger.LogCritical(ex.ToString()); - } - } - - public bool TryConnect() - { - _logger.LogInformation("RabbitMQ Client is trying to connect"); - - lock (sync_root) - { - var policy = RetryPolicy.Handle() - .Or() - .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => - { - _logger.LogWarning(ex.ToString()); - } - ); - - policy.Execute(() => - { - _connection = _connectionFactory - .CreateConnection(); - }); - - if (IsConnected) - { - _connection.ConnectionShutdown += OnConnectionShutdown; - _connection.CallbackException += OnCallbackException; - _connection.ConnectionBlocked += OnConnectionBlocked; - - _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); - - return true; - } - else - { - _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); - - return false; - } - } - } - - private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) - { - if (_disposed) return; - - _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); - - TryConnect(); - } - - void OnCallbackException(object sender, CallbackExceptionEventArgs e) - { - if (_disposed) return; - - _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); - - TryConnect(); - } - - void OnConnectionShutdown(object sender, ShutdownEventArgs reason) - { - if (_disposed) return; - - _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); - - TryConnect(); - } - } + public class DefaultRabbitMQPersistentConnection + : IRabbitMQPersistentConnection + { + private readonly IConnectionFactory _connectionFactory; + private readonly ILogger _logger; + private readonly int _retryCount; + IConnection _connection; + bool _disposed; + readonly object sync_root = new object(); + + public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger logger, int retryCount = 5) + { + _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _retryCount = retryCount; + } + + public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed; + + public IModel CreateModel() + { + if (!IsConnected) + { + throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); + } + + return _connection.CreateModel(); + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + + try + { + _connection.Dispose(); + } + catch (IOException ex) + { + _logger.LogCritical(ex.ToString()); + } + } + + public bool TryConnect() + { + _logger.LogInformation("RabbitMQ Client is trying to connect"); + + lock (sync_root) + { + var policy = RetryPolicy.Handle() + .Or() + .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => + { + _logger.LogWarning(ex.ToString()); + } + ); + + policy.Execute(() => + { + _connection = _connectionFactory + .CreateConnection(); + }); + + if (IsConnected) + { + _connection.ConnectionShutdown += OnConnectionShutdown; + _connection.CallbackException += OnCallbackException; + _connection.ConnectionBlocked += OnConnectionBlocked; + + _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); + + return true; + } + else + { + _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); + + return false; + } + } + } + + private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + { + return; + } + + _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); + + TryConnect(); + } + + void OnCallbackException(object sender, CallbackExceptionEventArgs e) + { + if (_disposed) + { + return; + } + + _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); + + TryConnect(); + } + + void OnConnectionShutdown(object sender, ShutdownEventArgs reason) + { + if (_disposed) + { + return; + } + + _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); + + TryConnect(); + } + } } diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 49a417635..c62b0831b 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -17,220 +17,220 @@ using System.Threading.Tasks; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { - public class EventBusRabbitMQ : IEventBus, IDisposable - { - const string BROKER_NAME = "eshop_event_bus"; - - private readonly IRabbitMQPersistentConnection _persistentConnection; - private readonly ILogger _logger; - private readonly IEventBusSubscriptionsManager _subsManager; - private readonly ILifetimeScope _autofac; - private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; - private readonly int _retryCount; - - private IModel _consumerChannel; - private string _queueName; - - public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, - ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) - { - _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - _queueName = queueName; - _consumerChannel = CreateConsumerChannel(); - _autofac = autofac; - _retryCount = retryCount; - _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; - } - - private void SubsManager_OnEventRemoved(object sender, string eventName) - { - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - - using (var channel = _persistentConnection.CreateModel()) - { - channel.QueueUnbind(queue: _queueName, - exchange: BROKER_NAME, - routingKey: eventName); - - if (_subsManager.IsEmpty) - { - _queueName = string.Empty; - _consumerChannel.Close(); - } - } - } - - public void Publish(IntegrationEvent @event) - { - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - - var policy = RetryPolicy.Handle() - .Or() - .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => - { - _logger.LogWarning(ex.ToString()); - }); - - using (var channel = _persistentConnection.CreateModel()) - { - var eventName = @event.GetType() - .Name; - - channel.ExchangeDeclare(exchange: BROKER_NAME, - type: "direct"); - - var message = JsonConvert.SerializeObject(@event); - var body = Encoding.UTF8.GetBytes(message); - - policy.Execute(() => - { - var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = 2; // persistent - - channel.BasicPublish(exchange: BROKER_NAME, - routingKey: eventName, - mandatory:true, - basicProperties: properties, - body: body); - }); - } - } - - public void SubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler - { - DoInternalSubscription(eventName); - _subsManager.AddDynamicSubscription(eventName); - } - - public void Subscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler - { - var eventName = _subsManager.GetEventKey(); - DoInternalSubscription(eventName); - _subsManager.AddSubscription(); - } - - private void DoInternalSubscription(string eventName) - { - var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); - if (!containsKey) - { - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - - using (var channel = _persistentConnection.CreateModel()) - { - channel.QueueBind(queue: _queueName, - exchange: BROKER_NAME, - routingKey: eventName); - } - } - } - - public void Unsubscribe() - where TH : IIntegrationEventHandler - where T : IntegrationEvent - { - _subsManager.RemoveSubscription(); - } - - public void UnsubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler - { - _subsManager.RemoveDynamicSubscription(eventName); - } - - public void Dispose() - { - if (_consumerChannel != null) - { - _consumerChannel.Dispose(); - } - - _subsManager.Clear(); - } - - private IModel CreateConsumerChannel() - { - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - - var channel = _persistentConnection.CreateModel(); - - channel.ExchangeDeclare(exchange: BROKER_NAME, - type: "direct"); - - channel.QueueDeclare(queue: _queueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null); - - - var consumer = new EventingBasicConsumer(channel); - consumer.Received += async (model, ea) => - { - var eventName = ea.RoutingKey; - var message = Encoding.UTF8.GetString(ea.Body); - - await ProcessEvent(eventName, message); - - channel.BasicAck(ea.DeliveryTag,multiple:false); - }; - - channel.BasicConsume(queue: _queueName, - autoAck: false, - consumer: consumer); - - channel.CallbackException += (sender, ea) => - { - _consumerChannel.Dispose(); - _consumerChannel = CreateConsumerChannel(); - }; - - return channel; - } - - private async Task ProcessEvent(string eventName, string message) - { - if (_subsManager.HasSubscriptionsForEvent(eventName)) - { - using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) - { - var subscriptions = _subsManager.GetHandlersForEvent(eventName); - foreach (var subscription in subscriptions) - { - if (subscription.IsDynamic) - { - var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; - dynamic eventData = JObject.Parse(message); - await handler.Handle(eventData); - } - else - { - var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonConvert.DeserializeObject(message, eventType); - var handler = scope.ResolveOptional(subscription.HandlerType); - var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); - await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); - } - } - } - } - } - } + public class EventBusRabbitMQ : IEventBus, IDisposable + { + const string BROKER_NAME = "eshop_event_bus"; + + private readonly IRabbitMQPersistentConnection _persistentConnection; + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly ILifetimeScope _autofac; + private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; + private readonly int _retryCount; + + private IModel _consumerChannel; + private string _queueName; + + public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, + ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) + { + _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); + _queueName = queueName; + _consumerChannel = CreateConsumerChannel(); + _autofac = autofac; + _retryCount = retryCount; + _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; + } + + private void SubsManager_OnEventRemoved(object sender, string eventName) + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + using (var channel = _persistentConnection.CreateModel()) + { + channel.QueueUnbind(queue: _queueName, + exchange: BROKER_NAME, + routingKey: eventName); + + if (_subsManager.IsEmpty) + { + _queueName = string.Empty; + _consumerChannel.Close(); + } + } + } + + public void Publish(IntegrationEvent @event) + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + var policy = Policy.Handle() + .Or() + .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => + { + _logger.LogWarning(ex.ToString()); + }); + + using (var channel = _persistentConnection.CreateModel()) + { + var eventName = @event.GetType() + .Name; + + channel.ExchangeDeclare(exchange: BROKER_NAME, + type: "direct"); + + var message = JsonConvert.SerializeObject(@event); + var body = Encoding.UTF8.GetBytes(message); + + policy.Execute(() => + { + var properties = channel.CreateBasicProperties(); + properties.DeliveryMode = 2; // persistent + + channel.BasicPublish(exchange: BROKER_NAME, + routingKey: eventName, + mandatory: true, + basicProperties: properties, + body: body); + }); + } + } + + public void SubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + DoInternalSubscription(eventName); + _subsManager.AddDynamicSubscription(eventName); + } + + public void Subscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = _subsManager.GetEventKey(); + DoInternalSubscription(eventName); + _subsManager.AddSubscription(); + } + + private void DoInternalSubscription(string eventName) + { + var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); + if (!containsKey) + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + using (var channel = _persistentConnection.CreateModel()) + { + channel.QueueBind(queue: _queueName, + exchange: BROKER_NAME, + routingKey: eventName); + } + } + } + + public void Unsubscribe() + where TH : IIntegrationEventHandler + where T : IntegrationEvent + { + _subsManager.RemoveSubscription(); + } + + public void UnsubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _subsManager.RemoveDynamicSubscription(eventName); + } + + public void Dispose() + { + if (_consumerChannel != null) + { + _consumerChannel.Dispose(); + } + + _subsManager.Clear(); + } + + private IModel CreateConsumerChannel() + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + var channel = _persistentConnection.CreateModel(); + + channel.ExchangeDeclare(exchange: BROKER_NAME, + type: "direct"); + + channel.QueueDeclare(queue: _queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null); + + + var consumer = new EventingBasicConsumer(channel); + consumer.Received += async (model, ea) => + { + var eventName = ea.RoutingKey; + var message = Encoding.UTF8.GetString(ea.Body); + + await ProcessEvent(eventName, message); + + channel.BasicAck(ea.DeliveryTag, multiple: false); + }; + + channel.BasicConsume(queue: _queueName, + autoAck: false, + consumer: consumer); + + channel.CallbackException += (sender, ea) => + { + _consumerChannel.Dispose(); + _consumerChannel = CreateConsumerChannel(); + }; + + return channel; + } + + private async Task ProcessEvent(string eventName, string message) + { + if (_subsManager.HasSubscriptionsForEvent(eventName)) + { + using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) + { + var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) + { + if (subscription.IsDynamic) + { + var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; + dynamic eventData = JObject.Parse(message); + await handler.Handle(eventData); + } + else + { + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonConvert.DeserializeObject(message, eventType); + var handler = scope.ResolveOptional(subscription.HandlerType); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); + } + } + } + } + } + } } diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs index 5893791c5..949d90f1c 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs @@ -3,13 +3,13 @@ using System; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { - public interface IRabbitMQPersistentConnection - : IDisposable - { - bool IsConnected { get; } + public interface IRabbitMQPersistentConnection + : IDisposable + { + bool IsConnected { get; } - bool TryConnect(); + bool TryConnect(); - IModel CreateModel(); - } + IModel CreateModel(); + } }