diff --git a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs index 4c9758a40..c84b6f56f 100644 --- a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs +++ b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs @@ -19,5 +19,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions void Unsubscribe() where TH : IIntegrationEventHandler where T : IntegrationEvent; + + void Dispose(); } } diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 415da4283..7ab1f77c1 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -4,6 +4,7 @@ using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; using Microsoft.Extensions.Logging; +using Azure.Messaging.ServiceBus; using Polly; using Polly.Retry; using RabbitMQ.Client; @@ -27,6 +28,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ private readonly IEventBusSubscriptionsManager _subsManager; private readonly ILifetimeScope _autofac; private readonly int _retryCount; + private readonly ServiceBusProcessor _processor; private IModel _consumerChannel; private string _queueName; @@ -42,6 +44,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _autofac = autofac; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; + this._processor = EventBusServiceBus.EventBusServiceBus.DeliverProcessor(); } private void SubsManager_OnEventRemoved(object sender, string eventName) @@ -178,6 +181,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ } _subsManager.Clear(); + _processor.CloseAsync().GetAwaiter().GetResult(); } private void StartBasicConsume() diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj index f8dfc42e5..00d216d19 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj @@ -7,6 +7,7 @@ + @@ -14,6 +15,7 @@ + diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index c04753894..8b1aac516 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -20,7 +20,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus private readonly string _topicName; private readonly string _subscriptionName; private ServiceBusSender _sender; - private ServiceBusProcessor _processor; + private static ServiceBusProcessor _processor; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; @@ -34,6 +34,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus _topicName = topicName; _subscriptionName = subscriptionName; _sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName); + ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false }; + _processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options); RemoveDefaultRule(); RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult(); @@ -126,14 +128,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus public void Dispose() { _subsManager.Clear(); - this._processor.CloseAsync().GetAwaiter().GetResult(); + _processor.CloseAsync().GetAwaiter().GetResult(); + } + + public static ServiceBusProcessor DeliverProcessor() + { + return _processor; } private async Task RegisterSubscriptionClientMessageHandlerAsync() { - ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false }; - this._processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options); - this._processor.ProcessMessageAsync += + _processor.ProcessMessageAsync += async (args) => { var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}"; @@ -146,8 +151,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus } }; - this._processor.ProcessErrorAsync += ErrorHandler; - await this._processor.StartProcessingAsync(); + _processor.ProcessErrorAsync += ErrorHandler; + await _processor.StartProcessingAsync(); } private Task ErrorHandler(ProcessErrorEventArgs args)