namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { using System; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.Extensions.Logging; using Microsoft.Azure.ServiceBus; using Newtonsoft.Json; using System.Text; using System.Threading.Tasks; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using System.Reflection; using Microsoft.Azure.ServiceBus.Filters; public class EventBusServiceBus : IEventBus { private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; private ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly SubscriptionClient _subscriptionClient; public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName) { _serviceBusPersisterConnection = serviceBusPersisterConnection; _logger = logger; _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, subscriptionClientName); RemoveDefaultRule(); RegisterSubscriptionClientMessageHandler(); } public void Publish(IntegrationEvent @event) { var eventName = @event.GetType().Name; var jsonMessage = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(jsonMessage); var message = new Message { MessageId = new Guid().ToString(), Body = Encoding.UTF8.GetBytes(jsonMessage), Label = eventName, }; var topicClient = _serviceBusPersisterConnection.CreateModel(); topicClient.SendAsync(message) .GetAwaiter() .GetResult(); } public void Subscribe(Func handler) where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = typeof(T).Name; var containsKey = _subsManager.HasSubscriptionsForEvent(); if (!containsKey) { try { _subscriptionClient.AddRuleAsync(new RuleDescription { Filter = new CorrelationFilter { Label = eventName }, Name = eventName }).GetAwaiter().GetResult(); } catch(ServiceBusException) { _logger.LogInformation($"The messaging entity {eventName} already exists."); } } _subsManager.AddSubscription(handler); } public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = typeof(T).Name; try { _subscriptionClient .RemoveRuleAsync(eventName) .GetAwaiter() .GetResult(); } catch (MessagingEntityNotFoundException) { _logger.LogInformation($"The messaging entity {eventName} Could not be found."); } _subsManager.RemoveSubscription(); } public void Dispose() { _subsManager.Clear(); } private void RegisterSubscriptionClientMessageHandler() { _subscriptionClient.RegisterMessageHandler( async (message, token) => { var eventName = message.Label; var messageData = Encoding.UTF8.GetString(message.Body); await ProcessEvent(eventName, messageData); }, new MessageHandlerOptions() { MaxConcurrentCalls = 10, AutoComplete = true }); } private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handlers = _subsManager.GetHandlersForEvent(eventName); foreach (var handlerfactory in handlers) { var handler = handlerfactory.DynamicInvoke(); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } private void RemoveDefaultRule() { try { _subscriptionClient .RemoveRuleAsync(SubscriptionClient.DefaultRule) .GetAwaiter() .GetResult(); } catch (MessagingEntityNotFoundException) { _logger.LogInformation($"The messaging entity {SubscriptionClient.DefaultRule} Could not be found."); } } } }