diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 446fcd7b7..2cd86669b 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -22,7 +22,7 @@ private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; - public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, + public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, ILifetimeScope autofac) { @@ -30,7 +30,7 @@ _logger = logger; _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, + _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, subscriptionClientName); _autofac = autofac; @@ -68,7 +68,7 @@ where T : IntegrationEvent where TH : IIntegrationEventHandler { - var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); + var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); var containsKey = _subsManager.HasSubscriptionsForEvent(); if (!containsKey) @@ -81,7 +81,7 @@ Name = eventName }).GetAwaiter().GetResult(); } - catch(ServiceBusException) + catch (ServiceBusException) { _logger.LogInformation($"The messaging entity {eventName} already exists."); } @@ -129,10 +129,12 @@ { var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; var messageData = Encoding.UTF8.GetString(message.Body); - await ProcessEvent(eventName, messageData); - + // Complete the message so that it is not received again. - await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + if (await ProcessEvent(eventName, messageData)) + { + await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + } }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); } @@ -148,8 +150,9 @@ return Task.CompletedTask; } - private async Task ProcessEvent(string eventName, string message) + private async Task ProcessEvent(string eventName, string message) { + var processed = false; if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) @@ -173,7 +176,9 @@ } } } + processed = true; } + return processed; } private void RemoveDefaultRule() @@ -191,4 +196,4 @@ } } } -} +} \ No newline at end of file