using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Autofac;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
    /// <summary>
    /// <see cref="IEventBus"/> implementation backed by an Azure Service Bus topic.
    /// Events are published to the topic through a <see cref="ServiceBusSender"/> and
    /// received from one subscription of that topic through a <see cref="ServiceBusProcessor"/>.
    /// One correlation rule (filtering on the message subject) is created on the
    /// subscription for each event type passed to <see cref="Subscribe{T, TH}"/>.
    /// </summary>
    public class EventBusServiceBus : IEventBus
    {
        private const string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
        private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";

        private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
        private readonly ILogger<EventBusServiceBus> _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly ILifetimeScope _autofac;
        private readonly string _topicName;
        private readonly string _subscriptionName;
        private readonly ServiceBusSender _sender;
        private ServiceBusProcessor _processor;

        /// <summary>
        /// Creates the event bus, removes the subscription's default rule and starts
        /// the message processor.
        /// </summary>
        /// <param name="serviceBusPersisterConnection">Provides the Service Bus client and administration client.</param>
        /// <param name="logger">Logger used for diagnostics.</param>
        /// <param name="subsManager">
        /// Subscription registry; when <c>null</c> an <see cref="InMemoryEventBusSubscriptionsManager"/> is used.
        /// </param>
        /// <param name="autofac">Root lifetime scope from which event handlers are resolved.</param>
        /// <param name="topicName">Name of the topic events are published to and received from.</param>
        /// <param name="subscriptionName">Name of the topic subscription this instance receives from.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="serviceBusPersisterConnection"/> or <paramref name="logger"/> is <c>null</c>.
        /// </exception>
        public EventBusServiceBus(
            IServiceBusPersisterConnection serviceBusPersisterConnection,
            ILogger<EventBusServiceBus> logger,
            IEventBusSubscriptionsManager subsManager,
            ILifetimeScope autofac,
            string topicName,
            string subscriptionName)
        {
            // The connection is dereferenced below, so fail with a descriptive exception
            // instead of a NullReferenceException.
            _serviceBusPersisterConnection = serviceBusPersisterConnection
                ?? throw new ArgumentNullException(nameof(serviceBusPersisterConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
            _autofac = autofac;
            _topicName = topicName;
            _subscriptionName = subscriptionName;
            _sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName);

            RemoveDefaultRule();
            RegisterSubscriptionClientMessageHandler();
        }

        /// <summary>
        /// Publishes <paramref name="event"/> to the topic. The call blocks until the
        /// send operation has completed.
        /// </summary>
        /// <param name="event">
        /// Event to publish. Its type name, with the "IntegrationEvent" suffix removed,
        /// becomes the message subject.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
        public void Publish(IntegrationEvent @event)
        {
            if (@event == null)
            {
                throw new ArgumentNullException(nameof(@event));
            }

            var eventName = GetEventName(@event.GetType());

            var message = new ServiceBusMessage
            {
                MessageId = Guid.NewGuid().ToString(),
                Body = new BinaryData(@event),
                Subject = eventName,
            };

            _sender.SendMessageAsync(message)
                .GetAwaiter()
                .GetResult();
        }

        /// <summary>
        /// Registers a dynamic handler for <paramref name="eventName"/> in the
        /// subscription manager. No Service Bus rule is created by this method.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event to handle.</param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).Name);

            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Registers <typeparamref name="TH"/> as a handler for <typeparamref name="T"/>.
        /// When the event type has no subscription yet, a correlation rule matching the
        /// event's subject is first created on the topic subscription.
        /// </summary>
        /// <typeparam name="T">Integration event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = GetEventName(typeof(T));

            var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
            if (!containsKey)
            {
                try
                {
                    _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(
                        _topicName,
                        _subscriptionName,
                        new CreateRuleOptions
                        {
                            Filter = new CorrelationRuleFilter() { Subject = eventName },
                            Name = eventName
                        }).GetAwaiter().GetResult();
                }
                catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
                {
                    _logger.LogWarning("The messaging entity {eventName} already exists.", eventName);
                }
                catch (ServiceBusException ex)
                {
                    // Rule creation stays best-effort (the exception is not rethrown), but a
                    // failure for any other reason must not be reported as "already exists".
                    _logger.LogError(ex, "Failed to create rule for event {EventName}: {ExceptionMessage}", eventName, ex.Message);
                }
            }

            _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).Name);

            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// Deletes the subscription rule named after <typeparamref name="T"/> and removes
        /// the <typeparamref name="TH"/> handler from the subscription manager. A rule
        /// that does not exist is logged as a warning and otherwise ignored.
        /// </summary>
        /// <typeparam name="T">Integration event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        public void Unsubscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = GetEventName(typeof(T));

            try
            {
                _serviceBusPersisterConnection
                    .AdministrationClient
                    .DeleteRuleAsync(_topicName, _subscriptionName, eventName)
                    .GetAwaiter()
                    .GetResult();
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                _logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName);
            }

            _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

            _subsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>
        /// Removes a dynamic handler for <paramref name="eventName"/> from the
        /// subscription manager.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event the handler was registered for.</param>
        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation("Unsubscribing from dynamic event {EventName}", eventName);

            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Clears all registered subscriptions and closes the message processor,
        /// blocking until the close operation has completed.
        /// </summary>
        public void Dispose()
        {
            _subsManager.Clear();

            // Wait for the close to finish; a discarded task would let Dispose return
            // while the processor is still shutting down and would hide any failure.
            _processor.CloseAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Derives the event name used as message subject and rule name: the type name
        /// with every occurrence of the "IntegrationEvent" suffix text removed.
        /// </summary>
        private static string GetEventName(Type eventType)
        {
            return eventType.Name.Replace(INTEGRATION_EVENT_SUFFIX, "");
        }

        /// <summary>
        /// Creates the processor for the topic subscription, wires the message and error
        /// handlers, and starts processing. Blocks until processing has started.
        /// </summary>
        private void RegisterSubscriptionClientMessageHandler()
        {
            ServiceBusProcessorOptions options = new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 10,
                AutoCompleteMessages = false
            };

            // Use the (topic, subscription, options) overload: the two-argument
            // (string, options) overload interprets the name as a queue name.
            _processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, _subscriptionName, options);

            _processor.ProcessMessageAsync += async (args) =>
            {
                var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}";
                string messageData = args.Message.Body.ToString();

                // Complete the message so that it is not received again.
                // A message with no registered subscription is left uncompleted.
                if (await ProcessEvent(eventName, messageData))
                {
                    await args.CompleteMessageAsync(args.Message);
                }
            };

            _processor.ProcessErrorAsync += ErrorHandler;

            // Wait for start-up so that failures surface to the caller instead of being
            // lost in an unobserved task.
            _processor.StartProcessingAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Logs an error raised by the processor together with its error source.
        /// </summary>
        /// <param name="exceptionReceivedEventArgs">Details of the processing error.</param>
        /// <returns>A completed task.</returns>
        private Task ErrorHandler(ProcessErrorEventArgs exceptionReceivedEventArgs)
        {
            var ex = exceptionReceivedEventArgs.Exception;
            var context = exceptionReceivedEventArgs.ErrorSource;

            _logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Dispatches a received message to every handler registered for
        /// <paramref name="eventName"/>, resolving handlers from a child Autofac scope.
        /// Handlers that cannot be resolved are skipped.
        /// </summary>
        /// <param name="eventName">Full event name (message subject plus "IntegrationEvent").</param>
        /// <param name="message">JSON payload of the message.</param>
        /// <returns>
        /// <c>true</c> when at least one subscription exists for the event (even if no
        /// handler could be resolved); otherwise <c>false</c>.
        /// </returns>
        private async Task<bool> ProcessEvent(string eventName, string message)
        {
            var processed = false;

            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null)
                            {
                                continue;
                            }

                            // Dynamic handlers receive the parsed JSON document rather than a typed event.
                            using dynamic eventData = JsonDocument.Parse(message);
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null)
                            {
                                continue;
                            }

                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonSerializer.Deserialize(message, eventType);

                            // The event type is only known at runtime, so Handle is invoked via
                            // reflection on the closed IIntegrationEventHandler<T> interface.
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }

                processed = true;
            }

            return processed;
        }

        /// <summary>
        /// Deletes the default rule from the topic subscription. A missing default rule
        /// is logged as a warning and otherwise ignored.
        /// </summary>
        private void RemoveDefaultRule()
        {
            try
            {
                _serviceBusPersisterConnection
                    .AdministrationClient
                    .DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName)
                    .GetAwaiter()
                    .GetResult();
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                _logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleProperties.DefaultRuleName);
            }
        }
    }
}