From 8b846968e2dfcbb350dad0f7a24a988e4edb0e7f Mon Sep 17 00:00:00 2001 From: i02coroj Date: Fri, 13 Apr 2018 12:03:34 +0200 Subject: [PATCH] Update EventBusServiceBus.cs Do not complete messages until actually processed. --- .../EventBusServiceBus/EventBusServiceBus.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 446fcd7b7..cfaf1856b 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -129,10 +129,13 @@ { var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; var messageData = Encoding.UTF8.GetString(message.Body); - await ProcessEvent(eventName, messageData); + var processed = await ProcessEvent(eventName, messageData); // Complete the message so that it is not received again. - await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + if (processed) + { + await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + } }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); } @@ -148,8 +151,9 @@ return Task.CompletedTask; } - private async Task ProcessEvent(string eventName, string message) + private async Task ProcessEvent(string eventName, string message) { + var processed = false; if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) @@ -173,7 +177,9 @@ } } } + processed = true; } + return processed } private void RemoveDefaultRule()