using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Autofac;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
    /// <summary>
    /// <see cref="IEventBus"/> implementation that publishes integration events to a
    /// Service Bus topic and dispatches messages received on a subscription to the
    /// handlers registered in an <see cref="IEventBusSubscriptionsManager"/>.
    /// </summary>
    /// <remarks>
    /// The message <c>Subject</c> carries the event type name with the
    /// "IntegrationEvent" suffix removed; the same short name is used as the name and
    /// correlation filter of the subscription rule created by <c>Subscribe</c>.
    /// </remarks>
    public class EventBusServiceBus : IEventBus
    {
        private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
        private readonly ILogger _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly ILifetimeScope _autofac;
        private readonly string _topicName;
        private readonly string _subscriptionName;
        private ServiceBusSender _sender;
        private ServiceBusProcessor _processor;

        // Tag passed to BeginLifetimeScope for every handled message.
        private const string AUTOFAC_SCOPE_NAME = "eshop_event_bus";

        // Suffix stripped from event type names when publishing/subscribing and
        // re-appended when a message is received.
        private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";

        /// <summary>
        /// Creates the sender for <paramref name="topicName"/>, removes the default rule
        /// from the subscription and starts the message processor.
        /// </summary>
        /// <param name="serviceBusPersisterConnection">Provides the topic client and the administration client.</param>
        /// <param name="logger">Logger; must not be null.</param>
        /// <param name="subsManager">Subscription registry; when null an <see cref="InMemoryEventBusSubscriptionsManager"/> is created.</param>
        /// <param name="autofac">Root lifetime scope from which per-message scopes are created.</param>
        /// <param name="topicName">Topic that events are published to.</param>
        /// <param name="subscriptionName">Subscription whose rules are managed by this instance.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
        /// <remarks>
        /// The constructor blocks on the asynchronous rule deletion and processor start-up.
        /// </remarks>
        public EventBusServiceBus(
            IServiceBusPersisterConnection serviceBusPersisterConnection,
            ILogger logger,
            IEventBusSubscriptionsManager subsManager,
            ILifetimeScope autofac,
            string topicName,
            string subscriptionName)
        {
            _serviceBusPersisterConnection = serviceBusPersisterConnection;
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
            _autofac = autofac;
            _topicName = topicName;
            _subscriptionName = subscriptionName;
            _sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName);

            RemoveDefaultRule();
            RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Sends <paramref name="event"/> to the topic, blocking until the send completes.
        /// </summary>
        /// <param name="event">Event to publish; its type name without the "IntegrationEvent" suffix becomes the message subject.</param>
        public void Publish(IntegrationEvent @event)
        {
            var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, "");

            var message = new ServiceBusMessage
            {
                MessageId = Guid.NewGuid().ToString(),
                Body = new BinaryData(@event),
                Subject = eventName,
            };

            _sender.SendMessageAsync(message)
                .GetAwaiter()
                .GetResult();
        }

        /// <summary>
        /// Registers the dynamic handler <typeparamref name="TH"/> for <paramref name="eventName"/>
        /// in the subscriptions manager.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event to subscribe to.</param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).Name);

            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Registers handler <typeparamref name="TH"/> for event <typeparamref name="T"/>.
        /// When the event has no subscriptions yet, a subscription rule filtering on the
        /// event's short name is created first.
        /// </summary>
        /// <typeparam name="T">Integration event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, "");

            var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
            if (!containsKey)
            {
                try
                {
                    _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(_topicName, _subscriptionName, new CreateRuleOptions
                    {
                        Filter = new CorrelationRuleFilter() { Subject = eventName },
                        Name = eventName
                    }).GetAwaiter().GetResult();
                }
                // Only an already-existing rule is expected here; any other failure
                // propagates, as it does in Unsubscribe and RemoveDefaultRule.
                catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
                {
                    _logger.LogWarning("The messaging entity {eventName} already exists.", eventName);
                }
            }

            _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).Name);

            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// Deletes the subscription rule named after event <typeparamref name="T"/> and removes
        /// handler <typeparamref name="TH"/> from the subscriptions manager. A rule that cannot
        /// be found is logged as a warning and otherwise ignored.
        /// </summary>
        /// <typeparam name="T">Integration event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        public void Unsubscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, "");

            try
            {
                _serviceBusPersisterConnection
                    .AdministrationClient
                    .DeleteRuleAsync(_topicName, _subscriptionName, eventName)
                    .GetAwaiter()
                    .GetResult();
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                _logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName);
            }

            _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

            _subsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>
        /// Removes the dynamic handler <typeparamref name="TH"/> for <paramref name="eventName"/>
        /// from the subscriptions manager.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Name of the event to unsubscribe from.</param>
        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation("Unsubscribing from dynamic event {EventName}", eventName);

            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Clears the subscriptions manager and closes the message processor, blocking
        /// until the close completes.
        /// </summary>
        public void Dispose()
        {
            _subsManager.Clear();
            _processor.CloseAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Creates the processor for the topic, wires the message and error callbacks
        /// and starts processing.
        /// </summary>
        private async Task RegisterSubscriptionClientMessageHandlerAsync()
        {
            // Auto-completion is off: a message is completed explicitly below, and only
            // when ProcessEvent reports that it was handled.
            var options = new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 10,
                AutoCompleteMessages = false
            };

            _processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options);

            _processor.ProcessMessageAsync += async (args) =>
            {
                // The subject holds the short event name; re-append the suffix to get
                // the name under which handlers are registered.
                var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}";
                string messageData = args.Message.Body.ToString();

                // Complete the message so that it is not received again.
                if (await ProcessEvent(eventName, messageData))
                {
                    await args.CompleteMessageAsync(args.Message);
                }
            };

            _processor.ProcessErrorAsync += ErrorHandler;

            await _processor.StartProcessingAsync();
        }

        /// <summary>
        /// Logs an error raised by the processor together with its error source.
        /// </summary>
        /// <param name="args">Error details supplied by the processor.</param>
        /// <returns>A completed task.</returns>
        private Task ErrorHandler(ProcessErrorEventArgs args)
        {
            var ex = args.Exception;
            var context = args.ErrorSource;

            _logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Invokes every handler registered for <paramref name="eventName"/> with the
        /// payload in <paramref name="message"/>.
        /// </summary>
        /// <param name="eventName">Full event name (including the "IntegrationEvent" suffix).</param>
        /// <param name="message">JSON payload of the event.</param>
        /// <returns>
        /// <c>true</c> when the event has subscriptions (even if some handlers could not be
        /// resolved and were skipped); <c>false</c> when there are no subscriptions for it.
        /// </returns>
        private async Task<bool> ProcessEvent(string eventName, string message)
        {
            var processed = false;

            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null)
                            {
                                continue;
                            }

                            // Dynamic handlers receive the parsed JSON document rather
                            // than a typed event.
                            using dynamic eventData = JsonDocument.Parse(message);
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null)
                            {
                                continue;
                            }

                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonSerializer.Deserialize(message, eventType);

                            // The event type is only known at runtime, so Handle is
                            // invoked through the closed generic handler interface.
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }

                processed = true;
            }

            return processed;
        }

        /// <summary>
        /// Deletes the rule named <see cref="RuleProperties.DefaultRuleName"/> from the
        /// subscription. A rule that cannot be found is logged as a warning and otherwise
        /// ignored.
        /// </summary>
        private void RemoveDefaultRule()
        {
            try
            {
                _serviceBusPersisterConnection
                    .AdministrationClient
                    .DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName)
                    .GetAwaiter()
                    .GetResult();
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                _logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleProperties.DefaultRuleName);
            }
        }
    }
}