Moved using statements to EventBusRabbitMQ project
This commit is contained in:
		
							parent
							
								
									783c69974a
								
							
						
					
					
						commit
						4ed086f675
					
				| @ -1,131 +1,120 @@ | ||||
| using Microsoft.Extensions.Logging; | ||||
| using Polly; | ||||
| using Polly.Retry; | ||||
| using RabbitMQ.Client; | ||||
| using RabbitMQ.Client.Events; | ||||
| using RabbitMQ.Client.Exceptions; | ||||
| using System; | ||||
| using System.IO; | ||||
| using System.Net.Sockets; | ||||
| namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; | ||||
| 
 | ||||
| namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ | ||||
| public class DefaultRabbitMQPersistentConnection | ||||
|     : IRabbitMQPersistentConnection | ||||
| { | ||||
|     public class DefaultRabbitMQPersistentConnection | ||||
|        : IRabbitMQPersistentConnection | ||||
|     private readonly IConnectionFactory _connectionFactory; | ||||
|     private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; | ||||
|     private readonly int _retryCount; | ||||
|     IConnection _connection; | ||||
|     bool _disposed; | ||||
| 
 | ||||
|     object sync_root = new object(); | ||||
| 
 | ||||
|     public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) | ||||
|     { | ||||
|         private readonly IConnectionFactory _connectionFactory; | ||||
|         private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; | ||||
|         private readonly int _retryCount; | ||||
|         IConnection _connection; | ||||
|         bool _disposed; | ||||
|         _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); | ||||
|         _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
|         _retryCount = retryCount; | ||||
|     } | ||||
| 
 | ||||
|         object sync_root = new object(); | ||||
| 
 | ||||
|         public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) | ||||
|     public bool IsConnected | ||||
|     { | ||||
|         get | ||||
|         { | ||||
|             _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); | ||||
|             _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
|             _retryCount = retryCount; | ||||
|         } | ||||
| 
 | ||||
|         public bool IsConnected | ||||
|         { | ||||
|             get | ||||
|             { | ||||
|                 return _connection != null && _connection.IsOpen && !_disposed; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         public IModel CreateModel() | ||||
|         { | ||||
|             if (!IsConnected) | ||||
|             { | ||||
|                 throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); | ||||
|             } | ||||
| 
 | ||||
|             return _connection.CreateModel(); | ||||
|         } | ||||
| 
 | ||||
|         public void Dispose() | ||||
|         { | ||||
|             if (_disposed) return; | ||||
| 
 | ||||
|             _disposed = true; | ||||
| 
 | ||||
|             try | ||||
|             { | ||||
|                 _connection.Dispose(); | ||||
|             } | ||||
|             catch (IOException ex) | ||||
|             { | ||||
|                 _logger.LogCritical(ex.ToString()); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         public bool TryConnect() | ||||
|         { | ||||
|             _logger.LogInformation("RabbitMQ Client is trying to connect"); | ||||
| 
 | ||||
|             lock (sync_root) | ||||
|             { | ||||
|                 var policy = RetryPolicy.Handle<SocketException>() | ||||
|                     .Or<BrokerUnreachableException>() | ||||
|                     .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => | ||||
|                     { | ||||
|                         _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); | ||||
|                     } | ||||
|                 ); | ||||
| 
 | ||||
|                 policy.Execute(() => | ||||
|                 { | ||||
|                     _connection = _connectionFactory | ||||
|                           .CreateConnection(); | ||||
|                 }); | ||||
| 
 | ||||
|                 if (IsConnected) | ||||
|                 { | ||||
|                     _connection.ConnectionShutdown += OnConnectionShutdown; | ||||
|                     _connection.CallbackException += OnCallbackException; | ||||
|                     _connection.ConnectionBlocked += OnConnectionBlocked; | ||||
| 
 | ||||
|                     _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName); | ||||
| 
 | ||||
|                     return true; | ||||
|                 } | ||||
|                 else | ||||
|                 { | ||||
|                     _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); | ||||
| 
 | ||||
|                     return false; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) | ||||
|         { | ||||
|             if (_disposed) return; | ||||
| 
 | ||||
|             _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); | ||||
| 
 | ||||
|             TryConnect(); | ||||
|         } | ||||
| 
 | ||||
|         void OnCallbackException(object sender, CallbackExceptionEventArgs e) | ||||
|         { | ||||
|             if (_disposed) return; | ||||
| 
 | ||||
|             _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); | ||||
| 
 | ||||
|             TryConnect(); | ||||
|         } | ||||
| 
 | ||||
|         void OnConnectionShutdown(object sender, ShutdownEventArgs reason) | ||||
|         { | ||||
|             if (_disposed) return; | ||||
| 
 | ||||
|             _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); | ||||
| 
 | ||||
|             TryConnect(); | ||||
|             return _connection != null && _connection.IsOpen && !_disposed; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public IModel CreateModel() | ||||
|     { | ||||
|         if (!IsConnected) | ||||
|         { | ||||
|             throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); | ||||
|         } | ||||
| 
 | ||||
|         return _connection.CreateModel(); | ||||
|     } | ||||
| 
 | ||||
|     public void Dispose() | ||||
|     { | ||||
|         if (_disposed) return; | ||||
| 
 | ||||
|         _disposed = true; | ||||
| 
 | ||||
|         try | ||||
|         { | ||||
|             _connection.Dispose(); | ||||
|         } | ||||
|         catch (IOException ex) | ||||
|         { | ||||
|             _logger.LogCritical(ex.ToString()); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public bool TryConnect() | ||||
|     { | ||||
|         _logger.LogInformation("RabbitMQ Client is trying to connect"); | ||||
| 
 | ||||
|         lock (sync_root) | ||||
|         { | ||||
|             var policy = RetryPolicy.Handle<SocketException>() | ||||
|                 .Or<BrokerUnreachableException>() | ||||
|                 .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => | ||||
|                 { | ||||
|                     _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); | ||||
|                 } | ||||
|             ); | ||||
| 
 | ||||
|             policy.Execute(() => | ||||
|             { | ||||
|                 _connection = _connectionFactory | ||||
|                         .CreateConnection(); | ||||
|             }); | ||||
| 
 | ||||
|             if (IsConnected) | ||||
|             { | ||||
|                 _connection.ConnectionShutdown += OnConnectionShutdown; | ||||
|                 _connection.CallbackException += OnCallbackException; | ||||
|                 _connection.ConnectionBlocked += OnConnectionBlocked; | ||||
| 
 | ||||
|                 _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName); | ||||
| 
 | ||||
|                 return true; | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); | ||||
| 
 | ||||
|                 return false; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) | ||||
|     { | ||||
|         if (_disposed) return; | ||||
| 
 | ||||
|         _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); | ||||
| 
 | ||||
|         TryConnect(); | ||||
|     } | ||||
| 
 | ||||
|     void OnCallbackException(object sender, CallbackExceptionEventArgs e) | ||||
|     { | ||||
|         if (_disposed) return; | ||||
| 
 | ||||
|         _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); | ||||
| 
 | ||||
|         TryConnect(); | ||||
|     } | ||||
| 
 | ||||
|     void OnConnectionShutdown(object sender, ShutdownEventArgs reason) | ||||
|     { | ||||
|         if (_disposed) return; | ||||
| 
 | ||||
|         _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); | ||||
| 
 | ||||
|         TryConnect(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,297 +1,279 @@ | ||||
| using Autofac; | ||||
| using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; | ||||
| using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | ||||
| using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | ||||
| using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; | ||||
| using Microsoft.Extensions.Logging; | ||||
| using Polly; | ||||
| using Polly.Retry; | ||||
| using RabbitMQ.Client; | ||||
| using RabbitMQ.Client.Events; | ||||
| using RabbitMQ.Client.Exceptions; | ||||
| using System; | ||||
| using System.Net.Sockets; | ||||
| using System.Text; | ||||
| using System.Threading.Tasks; | ||||
| using System.Text.Json; | ||||
| namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; | ||||
| 
 | ||||
| namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ | ||||
| public class EventBusRabbitMQ : IEventBus, IDisposable | ||||
| { | ||||
|     public class EventBusRabbitMQ : IEventBus, IDisposable | ||||
|     const string BROKER_NAME = "eshop_event_bus"; | ||||
|     const string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; | ||||
| 
 | ||||
|     private readonly IRabbitMQPersistentConnection _persistentConnection; | ||||
|     private readonly ILogger<EventBusRabbitMQ> _logger; | ||||
|     private readonly IEventBusSubscriptionsManager _subsManager; | ||||
|     private readonly ILifetimeScope _autofac; | ||||
|     private readonly int _retryCount; | ||||
| 
 | ||||
|     private IModel _consumerChannel; | ||||
|     private string _queueName; | ||||
| 
 | ||||
|     public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, | ||||
|         ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) | ||||
|     { | ||||
|         const string BROKER_NAME = "eshop_event_bus"; | ||||
|         const string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; | ||||
|         _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); | ||||
|         _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
|         _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); | ||||
|         _queueName = queueName; | ||||
|         _consumerChannel = CreateConsumerChannel(); | ||||
|         _autofac = autofac; | ||||
|         _retryCount = retryCount; | ||||
|         _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; | ||||
|     } | ||||
| 
 | ||||
|         private readonly IRabbitMQPersistentConnection _persistentConnection; | ||||
|         private readonly ILogger<EventBusRabbitMQ> _logger; | ||||
|         private readonly IEventBusSubscriptionsManager _subsManager; | ||||
|         private readonly ILifetimeScope _autofac; | ||||
|         private readonly int _retryCount; | ||||
| 
 | ||||
|         private IModel _consumerChannel; | ||||
|         private string _queueName; | ||||
| 
 | ||||
|         public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, | ||||
|             ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) | ||||
|     private void SubsManager_OnEventRemoved(object sender, string eventName) | ||||
|     { | ||||
|         if (!_persistentConnection.IsConnected) | ||||
|         { | ||||
|             _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); | ||||
|             _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
|             _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); | ||||
|             _queueName = queueName; | ||||
|             _consumerChannel = CreateConsumerChannel(); | ||||
|             _autofac = autofac; | ||||
|             _retryCount = retryCount; | ||||
|             _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; | ||||
|             _persistentConnection.TryConnect(); | ||||
|         } | ||||
| 
 | ||||
|         private void SubsManager_OnEventRemoved(object sender, string eventName) | ||||
|         using (var channel = _persistentConnection.CreateModel()) | ||||
|         { | ||||
|             if (!_persistentConnection.IsConnected) | ||||
|             channel.QueueUnbind(queue: _queueName, | ||||
|                 exchange: BROKER_NAME, | ||||
|                 routingKey: eventName); | ||||
| 
 | ||||
|             if (_subsManager.IsEmpty) | ||||
|             { | ||||
|                 _persistentConnection.TryConnect(); | ||||
|             } | ||||
| 
 | ||||
|             using (var channel = _persistentConnection.CreateModel()) | ||||
|             { | ||||
|                 channel.QueueUnbind(queue: _queueName, | ||||
|                     exchange: BROKER_NAME, | ||||
|                     routingKey: eventName); | ||||
| 
 | ||||
|                 if (_subsManager.IsEmpty) | ||||
|                 { | ||||
|                     _queueName = string.Empty; | ||||
|                     _consumerChannel.Close(); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         public void Publish(IntegrationEvent @event) | ||||
|         { | ||||
|             if (!_persistentConnection.IsConnected) | ||||
|             { | ||||
|                 _persistentConnection.TryConnect(); | ||||
|             } | ||||
| 
 | ||||
|             var policy = RetryPolicy.Handle<BrokerUnreachableException>() | ||||
|                 .Or<SocketException>() | ||||
|                 .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => | ||||
|                 { | ||||
|                     _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); | ||||
|                 }); | ||||
| 
 | ||||
|             var eventName = @event.GetType().Name; | ||||
| 
 | ||||
|             _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); | ||||
| 
 | ||||
|             using (var channel = _persistentConnection.CreateModel()) | ||||
|             { | ||||
|                 _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); | ||||
| 
 | ||||
|                 channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); | ||||
|                                  | ||||
|                 var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions | ||||
|                 { | ||||
|                     WriteIndented = true | ||||
|                 }); | ||||
| 
 | ||||
|                 policy.Execute(() => | ||||
|                 { | ||||
|                     var properties = channel.CreateBasicProperties(); | ||||
|                     properties.DeliveryMode = 2; // persistent | ||||
| 
 | ||||
|                     _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id); | ||||
| 
 | ||||
|                     channel.BasicPublish( | ||||
|                         exchange: BROKER_NAME, | ||||
|                         routingKey: eventName, | ||||
|                         mandatory: true, | ||||
|                         basicProperties: properties, | ||||
|                         body: body); | ||||
|                 }); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         public void SubscribeDynamic<TH>(string eventName) | ||||
|             where TH : IDynamicIntegrationEventHandler | ||||
|         { | ||||
|             _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); | ||||
| 
 | ||||
|             DoInternalSubscription(eventName); | ||||
|             _subsManager.AddDynamicSubscription<TH>(eventName); | ||||
|             StartBasicConsume(); | ||||
|         } | ||||
| 
 | ||||
|         public void Subscribe<T, TH>() | ||||
|             where T : IntegrationEvent | ||||
|             where TH : IIntegrationEventHandler<T> | ||||
|         { | ||||
|             var eventName = _subsManager.GetEventKey<T>(); | ||||
|             DoInternalSubscription(eventName); | ||||
| 
 | ||||
|             _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); | ||||
| 
 | ||||
|             _subsManager.AddSubscription<T, TH>(); | ||||
|             StartBasicConsume(); | ||||
|         } | ||||
| 
 | ||||
|         private void DoInternalSubscription(string eventName) | ||||
|         { | ||||
|             var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); | ||||
|             if (!containsKey) | ||||
|             { | ||||
|                 if (!_persistentConnection.IsConnected) | ||||
|                 { | ||||
|                     _persistentConnection.TryConnect(); | ||||
|                 } | ||||
|   | ||||
|                 _consumerChannel.QueueBind(queue: _queueName, | ||||
|                                     exchange: BROKER_NAME, | ||||
|                                     routingKey: eventName); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         public void Unsubscribe<T, TH>() | ||||
|             where T : IntegrationEvent | ||||
|             where TH : IIntegrationEventHandler<T> | ||||
|         { | ||||
|             var eventName = _subsManager.GetEventKey<T>(); | ||||
| 
 | ||||
|             _logger.LogInformation("Unsubscribing from event {EventName}", eventName); | ||||
| 
 | ||||
|             _subsManager.RemoveSubscription<T, TH>(); | ||||
|         } | ||||
| 
 | ||||
|         public void UnsubscribeDynamic<TH>(string eventName) | ||||
|             where TH : IDynamicIntegrationEventHandler | ||||
|         { | ||||
|             _subsManager.RemoveDynamicSubscription<TH>(eventName); | ||||
|         } | ||||
| 
 | ||||
|         public void Dispose() | ||||
|         { | ||||
|             if (_consumerChannel != null) | ||||
|             { | ||||
|                 _consumerChannel.Dispose(); | ||||
|             } | ||||
| 
 | ||||
|             _subsManager.Clear(); | ||||
|         } | ||||
| 
 | ||||
|         private void StartBasicConsume() | ||||
|         { | ||||
|             _logger.LogTrace("Starting RabbitMQ basic consume"); | ||||
| 
 | ||||
|             if (_consumerChannel != null) | ||||
|             { | ||||
|                 var consumer = new AsyncEventingBasicConsumer(_consumerChannel); | ||||
| 
 | ||||
|                 consumer.Received += Consumer_Received; | ||||
| 
 | ||||
|                 _consumerChannel.BasicConsume( | ||||
|                     queue: _queueName, | ||||
|                     autoAck: false, | ||||
|                     consumer: consumer); | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 _logger.LogError("StartBasicConsume can't call on _consumerChannel == null"); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) | ||||
|         { | ||||
|             var eventName = eventArgs.RoutingKey; | ||||
|             var message = Encoding.UTF8.GetString(eventArgs.Body.Span); | ||||
| 
 | ||||
|             try | ||||
|             { | ||||
|                 if (message.ToLowerInvariant().Contains("throw-fake-exception")) | ||||
|                 { | ||||
|                     throw new InvalidOperationException($"Fake exception requested: \"{message}\""); | ||||
|                 } | ||||
| 
 | ||||
|                 await ProcessEvent(eventName, message); | ||||
|             } | ||||
|             catch (Exception ex) | ||||
|             { | ||||
|                 _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message); | ||||
|             } | ||||
| 
 | ||||
|             // Even on exception we take the message off the queue. | ||||
|             // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).  | ||||
|             // For more information see: https://www.rabbitmq.com/dlx.html | ||||
|             _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); | ||||
|         } | ||||
| 
 | ||||
|         private IModel CreateConsumerChannel() | ||||
|         { | ||||
|             if (!_persistentConnection.IsConnected) | ||||
|             { | ||||
|                 _persistentConnection.TryConnect(); | ||||
|             } | ||||
| 
 | ||||
|             _logger.LogTrace("Creating RabbitMQ consumer channel"); | ||||
| 
 | ||||
|             var channel = _persistentConnection.CreateModel(); | ||||
| 
 | ||||
|             channel.ExchangeDeclare(exchange: BROKER_NAME, | ||||
|                                     type: "direct"); | ||||
| 
 | ||||
|             channel.QueueDeclare(queue: _queueName, | ||||
|                                  durable: true, | ||||
|                                  exclusive: false, | ||||
|                                  autoDelete: false, | ||||
|                                  arguments: null); | ||||
| 
 | ||||
|             channel.CallbackException += (sender, ea) => | ||||
|             { | ||||
|                 _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); | ||||
| 
 | ||||
|                 _consumerChannel.Dispose(); | ||||
|                 _consumerChannel = CreateConsumerChannel(); | ||||
|                 StartBasicConsume(); | ||||
|             }; | ||||
| 
 | ||||
|             return channel; | ||||
|         } | ||||
| 
 | ||||
|         private async Task ProcessEvent(string eventName, string message) | ||||
|         { | ||||
|             _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName); | ||||
| 
 | ||||
|             if (_subsManager.HasSubscriptionsForEvent(eventName)) | ||||
|             { | ||||
|                 using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) | ||||
|                 { | ||||
|                     var subscriptions = _subsManager.GetHandlersForEvent(eventName); | ||||
|                     foreach (var subscription in subscriptions) | ||||
|                     { | ||||
|                         if (subscription.IsDynamic) | ||||
|                         { | ||||
|                             var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; | ||||
|                             if (handler == null) continue; | ||||
|                             using dynamic eventData = JsonDocument.Parse(message);                             | ||||
|                             await Task.Yield(); | ||||
|                             await handler.Handle(eventData); | ||||
|                         } | ||||
|                         else | ||||
|                         { | ||||
|                             var handler = scope.ResolveOptional(subscription.HandlerType); | ||||
|                             if (handler == null) continue; | ||||
|                             var eventType = _subsManager.GetEventTypeByName(eventName); | ||||
|                             var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive= true});                             | ||||
|                             var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); | ||||
| 
 | ||||
|                             await Task.Yield(); | ||||
|                             await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName); | ||||
|                 _queueName = string.Empty; | ||||
|                 _consumerChannel.Close(); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public void Publish(IntegrationEvent @event) | ||||
|     { | ||||
|         if (!_persistentConnection.IsConnected) | ||||
|         { | ||||
|             _persistentConnection.TryConnect(); | ||||
|         } | ||||
| 
 | ||||
|         var policy = RetryPolicy.Handle<BrokerUnreachableException>() | ||||
|             .Or<SocketException>() | ||||
|             .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => | ||||
|             { | ||||
|                 _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); | ||||
|             }); | ||||
| 
 | ||||
|         var eventName = @event.GetType().Name; | ||||
| 
 | ||||
|         _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); | ||||
| 
 | ||||
|         using (var channel = _persistentConnection.CreateModel()) | ||||
|         { | ||||
|             _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); | ||||
| 
 | ||||
|             channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); | ||||
|                                  | ||||
|             var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions | ||||
|             { | ||||
|                 WriteIndented = true | ||||
|             }); | ||||
| 
 | ||||
|             policy.Execute(() => | ||||
|             { | ||||
|                 var properties = channel.CreateBasicProperties(); | ||||
|                 properties.DeliveryMode = 2; // persistent | ||||
| 
 | ||||
|                 _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id); | ||||
| 
 | ||||
|                 channel.BasicPublish( | ||||
|                     exchange: BROKER_NAME, | ||||
|                     routingKey: eventName, | ||||
|                     mandatory: true, | ||||
|                     basicProperties: properties, | ||||
|                     body: body); | ||||
|             }); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public void SubscribeDynamic<TH>(string eventName) | ||||
|         where TH : IDynamicIntegrationEventHandler | ||||
|     { | ||||
|         _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); | ||||
| 
 | ||||
|         DoInternalSubscription(eventName); | ||||
|         _subsManager.AddDynamicSubscription<TH>(eventName); | ||||
|         StartBasicConsume(); | ||||
|     } | ||||
| 
 | ||||
|     public void Subscribe<T, TH>() | ||||
|         where T : IntegrationEvent | ||||
|         where TH : IIntegrationEventHandler<T> | ||||
|     { | ||||
|         var eventName = _subsManager.GetEventKey<T>(); | ||||
|         DoInternalSubscription(eventName); | ||||
| 
 | ||||
|         _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); | ||||
| 
 | ||||
|         _subsManager.AddSubscription<T, TH>(); | ||||
|         StartBasicConsume(); | ||||
|     } | ||||
| 
 | ||||
|     private void DoInternalSubscription(string eventName) | ||||
|     { | ||||
|         var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); | ||||
|         if (!containsKey) | ||||
|         { | ||||
|             if (!_persistentConnection.IsConnected) | ||||
|             { | ||||
|                 _persistentConnection.TryConnect(); | ||||
|             } | ||||
|   | ||||
|             _consumerChannel.QueueBind(queue: _queueName, | ||||
|                                 exchange: BROKER_NAME, | ||||
|                                 routingKey: eventName); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public void Unsubscribe<T, TH>() | ||||
|         where T : IntegrationEvent | ||||
|         where TH : IIntegrationEventHandler<T> | ||||
|     { | ||||
|         var eventName = _subsManager.GetEventKey<T>(); | ||||
| 
 | ||||
|         _logger.LogInformation("Unsubscribing from event {EventName}", eventName); | ||||
| 
 | ||||
|         _subsManager.RemoveSubscription<T, TH>(); | ||||
|     } | ||||
| 
 | ||||
|     public void UnsubscribeDynamic<TH>(string eventName) | ||||
|         where TH : IDynamicIntegrationEventHandler | ||||
|     { | ||||
|         _subsManager.RemoveDynamicSubscription<TH>(eventName); | ||||
|     } | ||||
| 
 | ||||
|     public void Dispose() | ||||
|     { | ||||
|         if (_consumerChannel != null) | ||||
|         { | ||||
|             _consumerChannel.Dispose(); | ||||
|         } | ||||
| 
 | ||||
|         _subsManager.Clear(); | ||||
|     } | ||||
| 
 | ||||
|     private void StartBasicConsume() | ||||
|     { | ||||
|         _logger.LogTrace("Starting RabbitMQ basic consume"); | ||||
| 
 | ||||
|         if (_consumerChannel != null) | ||||
|         { | ||||
|             var consumer = new AsyncEventingBasicConsumer(_consumerChannel); | ||||
| 
 | ||||
|             consumer.Received += Consumer_Received; | ||||
| 
 | ||||
|             _consumerChannel.BasicConsume( | ||||
|                 queue: _queueName, | ||||
|                 autoAck: false, | ||||
|                 consumer: consumer); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             _logger.LogError("StartBasicConsume can't call on _consumerChannel == null"); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) | ||||
|     { | ||||
|         var eventName = eventArgs.RoutingKey; | ||||
|         var message = Encoding.UTF8.GetString(eventArgs.Body.Span); | ||||
| 
 | ||||
|         try | ||||
|         { | ||||
|             if (message.ToLowerInvariant().Contains("throw-fake-exception")) | ||||
|             { | ||||
|                 throw new InvalidOperationException($"Fake exception requested: \"{message}\""); | ||||
|             } | ||||
| 
 | ||||
|             await ProcessEvent(eventName, message); | ||||
|         } | ||||
|         catch (Exception ex) | ||||
|         { | ||||
|             _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message); | ||||
|         } | ||||
| 
 | ||||
|         // Even on exception we take the message off the queue. | ||||
|         // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).  | ||||
|         // For more information see: https://www.rabbitmq.com/dlx.html | ||||
|         _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); | ||||
|     } | ||||
| 
 | ||||
|     private IModel CreateConsumerChannel() | ||||
|     { | ||||
|         if (!_persistentConnection.IsConnected) | ||||
|         { | ||||
|             _persistentConnection.TryConnect(); | ||||
|         } | ||||
| 
 | ||||
|         _logger.LogTrace("Creating RabbitMQ consumer channel"); | ||||
| 
 | ||||
|         var channel = _persistentConnection.CreateModel(); | ||||
| 
 | ||||
|         channel.ExchangeDeclare(exchange: BROKER_NAME, | ||||
|                                 type: "direct"); | ||||
| 
 | ||||
|         channel.QueueDeclare(queue: _queueName, | ||||
|                                 durable: true, | ||||
|                                 exclusive: false, | ||||
|                                 autoDelete: false, | ||||
|                                 arguments: null); | ||||
| 
 | ||||
|         channel.CallbackException += (sender, ea) => | ||||
|         { | ||||
|             _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); | ||||
| 
 | ||||
|             _consumerChannel.Dispose(); | ||||
|             _consumerChannel = CreateConsumerChannel(); | ||||
|             StartBasicConsume(); | ||||
|         }; | ||||
| 
 | ||||
|         return channel; | ||||
|     } | ||||
| 
 | ||||
|     private async Task ProcessEvent(string eventName, string message) | ||||
|     { | ||||
|         _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName); | ||||
| 
 | ||||
|         if (_subsManager.HasSubscriptionsForEvent(eventName)) | ||||
|         { | ||||
|             using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) | ||||
|             { | ||||
|                 var subscriptions = _subsManager.GetHandlersForEvent(eventName); | ||||
|                 foreach (var subscription in subscriptions) | ||||
|                 { | ||||
|                     if (subscription.IsDynamic) | ||||
|                     { | ||||
|                         var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; | ||||
|                         if (handler == null) continue; | ||||
|                         using dynamic eventData = JsonDocument.Parse(message);                             | ||||
|                         await Task.Yield(); | ||||
|                         await handler.Handle(eventData); | ||||
|                     } | ||||
|                     else | ||||
|                     { | ||||
|                         var handler = scope.ResolveOptional(subscription.HandlerType); | ||||
|                         if (handler == null) continue; | ||||
|                         var eventType = _subsManager.GetEventTypeByName(eventName); | ||||
|                         var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive= true});                             | ||||
|                         var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); | ||||
| 
 | ||||
|                         await Task.Yield(); | ||||
|                         await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,15 +1,11 @@ | ||||
| using RabbitMQ.Client; | ||||
| using System; | ||||
| namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; | ||||
| 
 | ||||
| namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ | ||||
| public interface IRabbitMQPersistentConnection | ||||
|     : IDisposable | ||||
| { | ||||
|     public interface IRabbitMQPersistentConnection | ||||
|         : IDisposable | ||||
|     { | ||||
|         bool IsConnected { get; } | ||||
|     bool IsConnected { get; } | ||||
| 
 | ||||
|         bool TryConnect(); | ||||
|     bool TryConnect(); | ||||
| 
 | ||||
|         IModel CreateModel(); | ||||
|     } | ||||
|     IModel CreateModel(); | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user