diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs index a3f563c2f..535fd9cd3 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs @@ -1,44 +1,48 @@ using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Logging; -using System; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { - public class DefaultServiceBusPersisterConnection :IServiceBusPersisterConnection - { - private readonly ILogger _logger; - private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; - private ITopicClient _topicClient; - - bool _disposed; - - public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, - ILogger logger) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - - _serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? - throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); - _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); - } - - public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; - - public ITopicClient CreateModel() - { - if(_topicClient.IsClosedOrClosing) - { - _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); - } - - return _topicClient; - } - - public void Dispose() - { - if (_disposed) return; - - _disposed = true; - } - } + using ArgumentNullException = System.ArgumentNullException; + + public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection + { + private readonly ILogger _logger; + private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; + private ITopicClient _topicClient; + + bool _disposed; + + public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, + ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? + throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); + _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); + } + + public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; + + public ITopicClient CreateModel() + { + if (_topicClient.IsClosedOrClosing) + { + _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); + } + + return _topicClient; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + } + } } diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 2cd86669b..bd70c2c4a 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -1,199 +1,199 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { - using Autofac; - using Microsoft.Azure.ServiceBus; - using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; - using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; - using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; - using Microsoft.Extensions.Logging; - using Newtonsoft.Json; - using Newtonsoft.Json.Linq; - using System; - using System.Text; - using System.Threading.Tasks; - - public class EventBusServiceBus : IEventBus - { - private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; - private readonly ILogger _logger; - private readonly IEventBusSubscriptionsManager _subsManager; - private readonly SubscriptionClient _subscriptionClient; - private readonly ILifetimeScope _autofac; - private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; - private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; - - public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, - ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, - ILifetimeScope autofac) - { - _serviceBusPersisterConnection = serviceBusPersisterConnection; - _logger = logger; - _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - - _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, - subscriptionClientName); - _autofac = autofac; - - RemoveDefaultRule(); - RegisterSubscriptionClientMessageHandler(); - } - - public void Publish(IntegrationEvent @event) - { - var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFIX, ""); - var jsonMessage = JsonConvert.SerializeObject(@event); - var body = Encoding.UTF8.GetBytes(jsonMessage); - - var message = new Message - { - MessageId = Guid.NewGuid().ToString(), - Body = body, - Label = eventName, - }; - - var topicClient = _serviceBusPersisterConnection.CreateModel(); - - topicClient.SendAsync(message) - .GetAwaiter() - .GetResult(); - } - - public void SubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler - { - _subsManager.AddDynamicSubscription(eventName); - } - - public void Subscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler - { - var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); - - var containsKey = _subsManager.HasSubscriptionsForEvent(); - if (!containsKey) - { - try - { - _subscriptionClient.AddRuleAsync(new RuleDescription - { - Filter = new CorrelationFilter { Label = eventName }, - Name = eventName - }).GetAwaiter().GetResult(); - } - catch (ServiceBusException) - { - _logger.LogInformation($"The messaging entity {eventName} already exists."); - } - } - - _subsManager.AddSubscription(); - } - - public void Unsubscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler - { - var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); - - try - { - _subscriptionClient - .RemoveRuleAsync(eventName) - .GetAwaiter() - .GetResult(); - } - catch (MessagingEntityNotFoundException) - { - _logger.LogInformation($"The messaging entity {eventName} Could not be found."); - } - - _subsManager.RemoveSubscription(); - } - - public void UnsubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler - { - _subsManager.RemoveDynamicSubscription(eventName); - } - - public void Dispose() - { - _subsManager.Clear(); - } - - private void RegisterSubscriptionClientMessageHandler() - { - _subscriptionClient.RegisterMessageHandler( - async (message, token) => - { - var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; - var messageData = Encoding.UTF8.GetString(message.Body); - - // Complete the message so that it is not received again. - if (await ProcessEvent(eventName, messageData)) - { - await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); - } - }, - new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); - } - - private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) - { - Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}."); - var context = exceptionReceivedEventArgs.ExceptionReceivedContext; - Console.WriteLine("Exception context for troubleshooting:"); - Console.WriteLine($"- Endpoint: {context.Endpoint}"); - Console.WriteLine($"- Entity Path: {context.EntityPath}"); - Console.WriteLine($"- Executing Action: {context.Action}"); - return Task.CompletedTask; - } - - private async Task ProcessEvent(string eventName, string message) - { - var processed = false; - if (_subsManager.HasSubscriptionsForEvent(eventName)) - { - using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) - { - var subscriptions = _subsManager.GetHandlersForEvent(eventName); - foreach (var subscription in subscriptions) - { - if (subscription.IsDynamic) - { - var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; - dynamic eventData = JObject.Parse(message); - await handler.Handle(eventData); - } - else - { - var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonConvert.DeserializeObject(message, eventType); - var handler = scope.ResolveOptional(subscription.HandlerType); - var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); - await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); - } - } - } - processed = true; - } - return processed; - } - - private void RemoveDefaultRule() - { - try - { - _subscriptionClient - .RemoveRuleAsync(RuleDescription.DefaultRuleName) - .GetAwaiter() - .GetResult(); - } - catch (MessagingEntityNotFoundException) - { - _logger.LogInformation($"The messaging entity { RuleDescription.DefaultRuleName } Could not be found."); - } - } - } + using Autofac; + using Microsoft.Azure.ServiceBus; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; + using Microsoft.Extensions.Logging; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + using System; + using System.Text; + using System.Threading.Tasks; + + public class EventBusServiceBus : IEventBus + { + private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly SubscriptionClient _subscriptionClient; + private readonly ILifetimeScope _autofac; + private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; + private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; + + public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, + ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, + ILifetimeScope autofac) + { + _serviceBusPersisterConnection = serviceBusPersisterConnection; + _logger = logger; + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); + + _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, + subscriptionClientName); + _autofac = autofac; + + RemoveDefaultRule(); + RegisterSubscriptionClientMessageHandler(); + } + + public void Publish(IntegrationEvent @event) + { + var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFIX, ""); + var jsonMessage = JsonConvert.SerializeObject(@event); + var body = Encoding.UTF8.GetBytes(jsonMessage); + + var message = new Message + { + MessageId = Guid.NewGuid().ToString(), + Body = body, + Label = eventName, + }; + + var topicClient = _serviceBusPersisterConnection.CreateModel(); + + topicClient.SendAsync(message) + .GetAwaiter() + .GetResult(); + } + + public void SubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _subsManager.AddDynamicSubscription(eventName); + } + + public void Subscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); + + var containsKey = _subsManager.HasSubscriptionsForEvent(); + if (!containsKey) + { + try + { + _subscriptionClient.AddRuleAsync(new RuleDescription + { + Filter = new CorrelationFilter { Label = eventName }, + Name = eventName + }).GetAwaiter().GetResult(); + } + catch (ServiceBusException) + { + _logger.LogInformation($"The messaging entity {eventName} already exists."); + } + } + + _subsManager.AddSubscription(); + } + + public void Unsubscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); + + try + { + _subscriptionClient + .RemoveRuleAsync(eventName) + .GetAwaiter() + .GetResult(); + } + catch (MessagingEntityNotFoundException) + { + _logger.LogInformation($"The messaging entity {eventName} Could not be found."); + } + + _subsManager.RemoveSubscription(); + } + + public void UnsubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _subsManager.RemoveDynamicSubscription(eventName); + } + + public void Dispose() + { + _subsManager.Clear(); + } + + private void RegisterSubscriptionClientMessageHandler() + { + _subscriptionClient.RegisterMessageHandler( + async (message, token) => + { + var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; + var messageData = Encoding.UTF8.GetString(message.Body); + + // Complete the message so that it is not received again. + if (await ProcessEvent(eventName, messageData)) + { + await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + } + }, + new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); + } + + private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) + { + Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}."); + var context = exceptionReceivedEventArgs.ExceptionReceivedContext; + Console.WriteLine("Exception context for troubleshooting:"); + Console.WriteLine($"- Endpoint: {context.Endpoint}"); + Console.WriteLine($"- Entity Path: {context.EntityPath}"); + Console.WriteLine($"- Executing Action: {context.Action}"); + return Task.CompletedTask; + } + + private async Task ProcessEvent(string eventName, string message) + { + var processed = false; + if (_subsManager.HasSubscriptionsForEvent(eventName)) + { + using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) + { + var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) + { + if (subscription.IsDynamic) + { + var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; + dynamic eventData = JObject.Parse(message); + await handler.Handle(eventData); + } + else + { + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonConvert.DeserializeObject(message, eventType); + var handler = scope.ResolveOptional(subscription.HandlerType); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); + } + } + } + processed = true; + } + return processed; + } + + private void RemoveDefaultRule() + { + try + { + _subscriptionClient + .RemoveRuleAsync(RuleDescription.DefaultRuleName) + .GetAwaiter() + .GetResult(); + } + catch (MessagingEntityNotFoundException) + { + _logger.LogInformation($"The messaging entity { RuleDescription.DefaultRuleName } Could not be found."); + } + } + } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs index 52737cef7..8a3b5caad 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs @@ -1,12 +1,14 @@ -namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +using IDisposable = System.IDisposable; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { - using Microsoft.Azure.ServiceBus; - using System; + using ITopicClient = Azure.ServiceBus.ITopicClient; + using ServiceBusConnectionStringBuilder = Azure.ServiceBus.ServiceBusConnectionStringBuilder; - public interface IServiceBusPersisterConnection : IDisposable - { - ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; } + public interface IServiceBusPersisterConnection : IDisposable + { + ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; } - ITopicClient CreateModel(); - } + ITopicClient CreateModel(); + } } \ No newline at end of file