namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { using Autofac; using Microsoft.Azure.ServiceBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; using System.Text; using System.Threading.Tasks; public class EventBusServiceBus : IEventBus { private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly SubscriptionClient _subscriptionClient; private readonly ILifetimeScope _autofac; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, ILifetimeScope autofac) { _serviceBusPersisterConnection = serviceBusPersisterConnection; _logger = logger; _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, subscriptionClientName); _autofac = autofac; RemoveDefaultRule(); RegisterSubscriptionClientMessageHandler(); } public void Publish(IntegrationEvent @event) { var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFIX, ""); var jsonMessage = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(jsonMessage); var message = new Message { MessageId = Guid.NewGuid().ToString(), Body = body, Label = eventName, }; var topicClient = _serviceBusPersisterConnection.CreateModel(); topicClient.SendAsync(message) .GetAwaiter() .GetResult(); } public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.AddDynamicSubscription(eventName); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); var containsKey = _subsManager.HasSubscriptionsForEvent(); if (!containsKey) { try { _subscriptionClient.AddRuleAsync(new RuleDescription { Filter = new CorrelationFilter { Label = eventName }, Name = eventName }).GetAwaiter().GetResult(); } catch (ServiceBusException) { _logger.LogInformation($"The messaging entity {eventName} already exists."); } } _subsManager.AddSubscription(); } public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); try { _subscriptionClient .RemoveRuleAsync(eventName) .GetAwaiter() .GetResult(); } catch (MessagingEntityNotFoundException) { _logger.LogInformation($"The messaging entity {eventName} Could not be found."); } _subsManager.RemoveSubscription(); } public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.RemoveDynamicSubscription(eventName); } public void Dispose() { _subsManager.Clear(); } private void RegisterSubscriptionClientMessageHandler() { _subscriptionClient.RegisterMessageHandler( async (message, token) => { var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; var messageData = Encoding.UTF8.GetString(message.Body); // Complete the message so that it is not received again. if (await ProcessEvent(eventName, messageData)) { await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); } }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); } private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}."); var context = exceptionReceivedEventArgs.ExceptionReceivedContext; Console.WriteLine("Exception context for troubleshooting:"); Console.WriteLine($"- Endpoint: {context.Endpoint}"); Console.WriteLine($"- Entity Path: {context.EntityPath}"); Console.WriteLine($"- Executing Action: {context.Action}"); return Task.CompletedTask; } private async Task ProcessEvent(string eventName, string message) { var processed = false; if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } processed = true; } return processed; } private void RemoveDefaultRule() { try { _subscriptionClient .RemoveRuleAsync(RuleDescription.DefaultRuleName) .GetAwaiter() .GetResult(); } catch (MessagingEntityNotFoundException) { _logger.LogInformation($"The messaging entity { RuleDescription.DefaultRuleName } Could not be found."); } } } }