From 8b846968e2dfcbb350dad0f7a24a988e4edb0e7f Mon Sep 17 00:00:00 2001 From: i02coroj Date: Fri, 13 Apr 2018 12:03:34 +0200 Subject: [PATCH 1/3] Update EventBusServiceBus.cs Do not complete messages until actually processed. --- .../EventBusServiceBus/EventBusServiceBus.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 446fcd7b7..cfaf1856b 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -129,10 +129,13 @@ { var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; var messageData = Encoding.UTF8.GetString(message.Body); - await ProcessEvent(eventName, messageData); + var processed = await ProcessEvent(eventName, messageData); // Complete the message so that it is not received again. - await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + if (processed) + { + await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + } }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); } @@ -148,8 +151,9 @@ return Task.CompletedTask; } - private async Task ProcessEvent(string eventName, string message) + private async Task ProcessEvent(string eventName, string message) { + var processed = false; if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) @@ -173,7 +177,9 @@ } } } + processed = true; } + return processed } private void RemoveDefaultRule() From 44f43cdc0d533a985788ffdf759152dfd2ca2d0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Alberto?= Date: Wed, 30 May 2018 22:51:49 +0200 Subject: [PATCH 2/3] Remove unneeded local variable. --- .../EventBusServiceBus/EventBusServiceBus.cs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index cfaf1856b..2bc6390ef 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -22,7 +22,7 @@ private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; - public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, + public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, ILifetimeScope autofac) { @@ -30,7 +30,7 @@ _logger = logger; _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, + _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, subscriptionClientName); _autofac = autofac; @@ -68,7 +68,7 @@ where T : IntegrationEvent where TH : IIntegrationEventHandler { - var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); + var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); var containsKey = _subsManager.HasSubscriptionsForEvent(); if (!containsKey) @@ -81,7 +81,7 @@ Name = eventName }).GetAwaiter().GetResult(); } - catch(ServiceBusException) + catch (ServiceBusException) { _logger.LogInformation($"The messaging entity {eventName} already exists."); } @@ -129,10 +129,9 @@ { var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; var messageData = Encoding.UTF8.GetString(message.Body); - var processed = await ProcessEvent(eventName, messageData); - + // Complete the message so that it is not received again. - if (processed) + if (await ProcessEvent(eventName, messageData)) { await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); } @@ -197,4 +196,4 @@ } } } -} +} \ No newline at end of file From bee5286bc4744ecea3840406034fd70d20d359e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Alberto?= Date: Wed, 30 May 2018 22:57:27 +0200 Subject: [PATCH 3/3] Remove unneeded local variable. --- .../EventBus/EventBusServiceBus/EventBusServiceBus.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 2bc6390ef..2cd86669b 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -178,7 +178,7 @@ } processed = true; } - return processed + return processed; } private void RemoveDefaultRule()