From 2edb81974ef566c089b8f532210b58a5d73877c9 Mon Sep 17 00:00:00 2001 From: Georgy Sayganov Date: Mon, 24 May 2021 14:33:22 +0300 Subject: [PATCH] Remove Autofac from the EventBusRabbitMQ project Refactor the EventBusRabbitMQ class --- .../EventBusRabbitMQ/EventBusRabbitMQ.cs | 188 ++++++++++-------- .../EventBusRabbitMQ/EventBusRabbitMQ.csproj | 1 - 2 files changed, 102 insertions(+), 87 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 415da4283..c8d773a56 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -1,45 +1,47 @@ -using Autofac; +using System; +using System.Net.Sockets; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Polly; -using Polly.Retry; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; -using System; -using System.Net.Sockets; -using System.Text; -using System.Threading.Tasks; -using System.Text.Json; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { public class EventBusRabbitMQ : IEventBus, IDisposable { - const string BROKER_NAME = "eshop_event_bus"; - const string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; + private const string BrokerName = "eshop_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; - private readonly ILifetimeScope _autofac; + private readonly IServiceScopeFactory _serviceScopeFactory; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; - public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, - ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) + public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, + ILogger logger, + IServiceScopeFactory serviceScopeFactory, + IEventBusSubscriptionsManager subsManager, + string queueName = null, + int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); - _autofac = autofac; + _serviceScopeFactory = serviceScopeFactory; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; } @@ -53,15 +55,15 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ using (var channel = _persistentConnection.CreateModel()) { - channel.QueueUnbind(queue: _queueName, - exchange: BROKER_NAME, - routingKey: eventName); + channel.QueueUnbind(_queueName, BrokerName, eventName); - if (_subsManager.IsEmpty) + if (!_subsManager.IsEmpty) { - _queueName = string.Empty; - _consumerChannel.Close(); + return; } + + _queueName = string.Empty; + _consumerChannel.Close(); } } @@ -72,23 +74,27 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _persistentConnection.TryConnect(); } - var policy = RetryPolicy.Handle() + var policy = Policy.Handle() .Or() - .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => - { - _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); - }); + .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogWarning(ex, + "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, + $"{time.TotalSeconds:n1}", ex.Message); + }); var eventName = @event.GetType().Name; - _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); + _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, + eventName); using (var channel = _persistentConnection.CreateModel()) { _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); - channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); - + channel.ExchangeDeclare(BrokerName, "direct"); + var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions { WriteIndented = true @@ -97,86 +103,81 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ policy.Execute(() => { var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = 2; // persistent + properties.DeliveryMode = 2; _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id); channel.BasicPublish( - exchange: BROKER_NAME, - routingKey: eventName, - mandatory: true, - basicProperties: properties, - body: body); + BrokerName, + eventName, + true, + properties, + body); }); } } - public void SubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler + public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { - _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); + _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, + typeof(TH).GetGenericTypeName()); DoInternalSubscription(eventName); + _subsManager.AddDynamicSubscription(eventName); + StartBasicConsume(); } - public void Subscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler + public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = _subsManager.GetEventKey(); + DoInternalSubscription(eventName); - _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); - + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, + typeof(TH).GetGenericTypeName()); _subsManager.AddSubscription(); + StartBasicConsume(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); - if (!containsKey) - { - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - using (var channel = _persistentConnection.CreateModel()) - { - channel.QueueBind(queue: _queueName, - exchange: BROKER_NAME, - routingKey: eventName); - } + if (containsKey) + { + return; + } + + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + using (var channel = _persistentConnection.CreateModel()) + { + channel.QueueBind(_queueName, BrokerName, eventName); } } - public void Unsubscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler + public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = _subsManager.GetEventKey(); _logger.LogInformation("Unsubscribing from event {EventName}", eventName); - _subsManager.RemoveSubscription(); } - public void UnsubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler + public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.RemoveDynamicSubscription(eventName); } public void Dispose() { - if (_consumerChannel != null) - { - _consumerChannel.Dispose(); - } - + _consumerChannel?.Dispose(); _subsManager.Clear(); } @@ -191,9 +192,9 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ consumer.Received += Consumer_Received; _consumerChannel.BasicConsume( - queue: _queueName, - autoAck: false, - consumer: consumer); + _queueName, + false, + consumer); } else { @@ -223,7 +224,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ // Even on exception we take the message off the queue. // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). // For more information see: https://www.rabbitmq.com/dlx.html - _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); + _consumerChannel.BasicAck(eventArgs.DeliveryTag, false); } private IModel CreateConsumerChannel() @@ -237,21 +238,19 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ var channel = _persistentConnection.CreateModel(); - channel.ExchangeDeclare(exchange: BROKER_NAME, - type: "direct"); - - channel.QueueDeclare(queue: _queueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null); + channel.ExchangeDeclare(BrokerName, "direct"); + channel.QueueDeclare(_queueName, + true, + false, + false, + null); channel.CallbackException += (sender, ea) => { _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); - _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); + StartBasicConsume(); }; @@ -264,29 +263,46 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ if (_subsManager.HasSubscriptionsForEvent(eventName)) { - using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) + using (var scope = _serviceScopeFactory.CreateScope()) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { - var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; - if (handler == null) continue; - using dynamic eventData = JsonDocument.Parse(message); + if (!(scope.ServiceProvider.GetService(subscription.HandlerType) is + IDynamicIntegrationEventHandler handler)) + { + continue; + } + + using dynamic eventData = JsonDocument.Parse(message); + await Task.Yield(); await handler.Handle(eventData); } else { - var handler = scope.ResolveOptional(subscription.HandlerType); - if (handler == null) continue; + var handler = scope.ServiceProvider.GetService(subscription.HandlerType); + if (handler == null) + { + continue; + } + var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive= true}); + var integrationEvent = JsonSerializer.Deserialize(message, eventType, + new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await Task.Yield(); - await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); + await (Task) concreteType.GetMethod("Handle").Invoke(handler, new[] + { + integrationEvent + }); } } } @@ -297,4 +313,4 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ } } } -} +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj index f8dfc42e5..e998a30e7 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj @@ -6,7 +6,6 @@ -