Merge pull request #582 from i02coroj/fix-messages-lost

Update EventBusServiceBus.cs
This commit is contained in:
Miguel Veloso 2018-07-26 20:02:33 +01:00 committed by GitHub
commit c3983b30c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,7 +22,7 @@
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent";
public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName,
ILifetimeScope autofac) ILifetimeScope autofac)
{ {
@ -30,7 +30,7 @@
_logger = logger; _logger = logger;
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder,
subscriptionClientName); subscriptionClientName);
_autofac = autofac; _autofac = autofac;
@ -68,7 +68,7 @@
where T : IntegrationEvent where T : IntegrationEvent
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
{ {
var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, "");
var containsKey = _subsManager.HasSubscriptionsForEvent<T>(); var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
if (!containsKey) if (!containsKey)
@ -81,7 +81,7 @@
Name = eventName Name = eventName
}).GetAwaiter().GetResult(); }).GetAwaiter().GetResult();
} }
catch(ServiceBusException) catch (ServiceBusException)
{ {
_logger.LogInformation($"The messaging entity {eventName} already exists."); _logger.LogInformation($"The messaging entity {eventName} already exists.");
} }
@ -129,10 +129,12 @@
{ {
var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}";
var messageData = Encoding.UTF8.GetString(message.Body); var messageData = Encoding.UTF8.GetString(message.Body);
await ProcessEvent(eventName, messageData);
// Complete the message so that it is not received again. // Complete the message so that it is not received again.
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); if (await ProcessEvent(eventName, messageData))
{
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
}, },
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false });
} }
@ -148,8 +150,9 @@
return Task.CompletedTask; return Task.CompletedTask;
} }
private async Task ProcessEvent(string eventName, string message) private async Task<bool> ProcessEvent(string eventName, string message)
{ {
var processed = false;
if (_subsManager.HasSubscriptionsForEvent(eventName)) if (_subsManager.HasSubscriptionsForEvent(eventName))
{ {
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
@ -173,7 +176,9 @@
} }
} }
} }
processed = true;
} }
return processed;
} }
private void RemoveDefaultRule() private void RemoveDefaultRule()
@ -191,4 +196,4 @@
} }
} }
} }
} }