From 03b80d037efae521abb443ad7c6300f0238f6b03 Mon Sep 17 00:00:00 2001 From: Sumit Ghosh Date: Wed, 13 Oct 2021 16:47:51 +0530 Subject: [PATCH] Moved using statements to globalusing for EventBusServiceBus --- .../DefaultServiceBusPersisterConnection.cs | 100 +++--- .../EventBusServiceBus/EventBusServiceBus.cs | 322 +++++++++--------- .../IServiceBusPersisterConnection.cs | 16 +- 3 files changed, 209 insertions(+), 229 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs index 0dff4154b..84aaa7faa 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs @@ -1,68 +1,64 @@ -using Microsoft.Azure.ServiceBus; -using System; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; -namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection { - public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection + private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; + private readonly string _subscriptionClientName; + private SubscriptionClient _subscriptionClient; + private ITopicClient _topicClient; + + bool _disposed; + + public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, + string subscriptionClientName) { - private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; - private readonly string _subscriptionClientName; - private SubscriptionClient _subscriptionClient; - private ITopicClient _topicClient; + _serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? + throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); + _subscriptionClientName = subscriptionClientName; + _subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, subscriptionClientName); + _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); + } - bool _disposed; - - public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, - string subscriptionClientName) - { - _serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? - throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); - _subscriptionClientName = subscriptionClientName; - _subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, subscriptionClientName); - _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); - } - - public ITopicClient TopicClient - { - get - { - if (_topicClient.IsClosedOrClosing) - { - _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); - } - return _topicClient; - } - } - - public ISubscriptionClient SubscriptionClient - { - get - { - if (_subscriptionClient.IsClosedOrClosing) - { - _subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, _subscriptionClientName); - } - return _subscriptionClient; - } - } - - public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; - - public ITopicClient CreateModel() + public ITopicClient TopicClient + { + get { if (_topicClient.IsClosedOrClosing) { _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); } - return _topicClient; } + } - public void Dispose() + public ISubscriptionClient SubscriptionClient + { + get { - if (_disposed) return; - - _disposed = true; + if (_subscriptionClient.IsClosedOrClosing) + { + _subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, _subscriptionClientName); + } + return _subscriptionClient; } } + + public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; + + public ITopicClient CreateModel() + { + if (_topicClient.IsClosedOrClosing) + { + _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); + } + + return _topicClient; + } + + public void Dispose() + { + if (_disposed) return; + + _disposed = true; + } } diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 84229b238..efe59f438 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -1,203 +1,191 @@ -namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; + +public class EventBusServiceBus : IEventBus { - using Autofac; - using Microsoft.Azure.ServiceBus; - using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; - using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; - using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; - using Microsoft.Extensions.Logging; - using System; - using System.Text; - using System.Text.Json; - using System.Threading.Tasks; + private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly ILifetimeScope _autofac; + private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; + private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; - public class EventBusServiceBus : IEventBus + public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, + ILogger logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac) { - private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; - private readonly ILogger _logger; - private readonly IEventBusSubscriptionsManager _subsManager; - private readonly ILifetimeScope _autofac; - private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; - private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; + _serviceBusPersisterConnection = serviceBusPersisterConnection; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); + _autofac = autofac; - public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, - ILogger logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac) + RemoveDefaultRule(); + RegisterSubscriptionClientMessageHandler(); + } + + public void Publish(IntegrationEvent @event) + { + var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + var jsonMessage = JsonSerializer.Serialize(@event); + var body = Encoding.UTF8.GetBytes(jsonMessage); + + var message = new Message { - _serviceBusPersisterConnection = serviceBusPersisterConnection; - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - _autofac = autofac; + MessageId = Guid.NewGuid().ToString(), + Body = body, + Label = eventName, + }; - RemoveDefaultRule(); - RegisterSubscriptionClientMessageHandler(); + _serviceBusPersisterConnection.TopicClient.SendAsync(message) + .GetAwaiter() + .GetResult(); + } + + public void SubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).Name); + + _subsManager.AddDynamicSubscription(eventName); + } + + public void Subscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + + var containsKey = _subsManager.HasSubscriptionsForEvent(); + if (!containsKey) + { + try + { + _serviceBusPersisterConnection.SubscriptionClient.AddRuleAsync(new RuleDescription + { + Filter = new CorrelationFilter { Label = eventName }, + Name = eventName + }).GetAwaiter().GetResult(); + } + catch (ServiceBusException) + { + _logger.LogWarning("The messaging entity {eventName} already exists.", eventName); + } } - public void Publish(IntegrationEvent @event) + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).Name); + + _subsManager.AddSubscription(); + } + + public void Unsubscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + + try { - var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); - var jsonMessage = JsonSerializer.Serialize(@event); - var body = Encoding.UTF8.GetBytes(jsonMessage); - - var message = new Message - { - MessageId = Guid.NewGuid().ToString(), - Body = body, - Label = eventName, - }; - - _serviceBusPersisterConnection.TopicClient.SendAsync(message) + _serviceBusPersisterConnection + .SubscriptionClient + .RemoveRuleAsync(eventName) .GetAwaiter() .GetResult(); } - - public void SubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler + catch (MessagingEntityNotFoundException) { - _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).Name); - - _subsManager.AddDynamicSubscription(eventName); + _logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName); } - public void Subscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler - { - var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + _logger.LogInformation("Unsubscribing from event {EventName}", eventName); - var containsKey = _subsManager.HasSubscriptionsForEvent(); - if (!containsKey) + _subsManager.RemoveSubscription(); + } + + public void UnsubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _logger.LogInformation("Unsubscribing from dynamic event {EventName}", eventName); + + _subsManager.RemoveDynamicSubscription(eventName); + } + + public void Dispose() + { + _subsManager.Clear(); + } + + private void RegisterSubscriptionClientMessageHandler() + { + _serviceBusPersisterConnection.SubscriptionClient.RegisterMessageHandler( + async (message, token) => { - try + var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}"; + var messageData = Encoding.UTF8.GetString(message.Body); + + // Complete the message so that it is not received again. + if (await ProcessEvent(eventName, messageData)) { - _serviceBusPersisterConnection.SubscriptionClient.AddRuleAsync(new RuleDescription - { - Filter = new CorrelationFilter { Label = eventName }, - Name = eventName - }).GetAwaiter().GetResult(); + await _serviceBusPersisterConnection.SubscriptionClient.CompleteAsync(message.SystemProperties.LockToken); } - catch (ServiceBusException) - { - _logger.LogWarning("The messaging entity {eventName} already exists.", eventName); - } - } + }, + new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); + } - _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).Name); + private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) + { + var ex = exceptionReceivedEventArgs.Exception; + var context = exceptionReceivedEventArgs.ExceptionReceivedContext; - _subsManager.AddSubscription(); - } + _logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context); - public void Unsubscribe() - where T : IntegrationEvent - where TH : IIntegrationEventHandler + return Task.CompletedTask; + } + + private async Task ProcessEvent(string eventName, string message) + { + var processed = false; + if (_subsManager.HasSubscriptionsForEvent(eventName)) { - var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); - - try + using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { - _serviceBusPersisterConnection - .SubscriptionClient - .RemoveRuleAsync(eventName) - .GetAwaiter() - .GetResult(); - } - catch (MessagingEntityNotFoundException) - { - _logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName); - } - - _logger.LogInformation("Unsubscribing from event {EventName}", eventName); - - _subsManager.RemoveSubscription(); - } - - public void UnsubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler - { - _logger.LogInformation("Unsubscribing from dynamic event {EventName}", eventName); - - _subsManager.RemoveDynamicSubscription(eventName); - } - - public void Dispose() - { - _subsManager.Clear(); - } - - private void RegisterSubscriptionClientMessageHandler() - { - _serviceBusPersisterConnection.SubscriptionClient.RegisterMessageHandler( - async (message, token) => + var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) { - var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}"; - var messageData = Encoding.UTF8.GetString(message.Body); - - // Complete the message so that it is not received again. - if (await ProcessEvent(eventName, messageData)) + if (subscription.IsDynamic) { - await _serviceBusPersisterConnection.SubscriptionClient.CompleteAsync(message.SystemProperties.LockToken); - } - }, - new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); - } - - private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) - { - var ex = exceptionReceivedEventArgs.Exception; - var context = exceptionReceivedEventArgs.ExceptionReceivedContext; - - _logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context); - - return Task.CompletedTask; - } - - private async Task ProcessEvent(string eventName, string message) - { - var processed = false; - if (_subsManager.HasSubscriptionsForEvent(eventName)) - { - using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) - { - var subscriptions = _subsManager.GetHandlersForEvent(eventName); - foreach (var subscription in subscriptions) - { - if (subscription.IsDynamic) - { - var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; - if (handler == null) continue; + var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; + if (handler == null) continue; - using dynamic eventData = JsonDocument.Parse(message); - await handler.Handle(eventData); - } - else - { - var handler = scope.ResolveOptional(subscription.HandlerType); - if (handler == null) continue; - var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonSerializer.Deserialize(message, eventType); - var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); - await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); - } + using dynamic eventData = JsonDocument.Parse(message); + await handler.Handle(eventData); + } + else + { + var handler = scope.ResolveOptional(subscription.HandlerType); + if (handler == null) continue; + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonSerializer.Deserialize(message, eventType); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } - processed = true; } - return processed; + processed = true; } + return processed; + } - private void RemoveDefaultRule() + private void RemoveDefaultRule() + { + try { - try - { - _serviceBusPersisterConnection - .SubscriptionClient - .RemoveRuleAsync(RuleDescription.DefaultRuleName) - .GetAwaiter() - .GetResult(); - } - catch (MessagingEntityNotFoundException) - { - _logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleDescription.DefaultRuleName); - } + _serviceBusPersisterConnection + .SubscriptionClient + .RemoveRuleAsync(RuleDescription.DefaultRuleName) + .GetAwaiter() + .GetResult(); + } + catch (MessagingEntityNotFoundException) + { + _logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleDescription.DefaultRuleName); } } } diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs index 8863db62e..c02e957d4 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs @@ -1,11 +1,7 @@ -namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus -{ - using Microsoft.Azure.ServiceBus; - using System; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; - public interface IServiceBusPersisterConnection : IDisposable - { - ITopicClient TopicClient { get; } - ISubscriptionClient SubscriptionClient { get; } - } -} \ No newline at end of file +public interface IServiceBusPersisterConnection : IDisposable +{ + ITopicClient TopicClient { get; } + ISubscriptionClient SubscriptionClient { get; } +}