using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
    /// <summary>
    /// <see cref="IEventBus"/> implementation on top of RabbitMQ.
    /// Events are published to the direct exchange <c>eshop_event_bus</c> using the
    /// event's type name as routing key, and consumed from a single server-named
    /// queue that is bound to the exchange once per subscribed event name.
    /// </summary>
    public class EventBusRabbitMQ : IEventBus, IDisposable
    {
        const string BROKER_NAME = "eshop_event_bus";

        private readonly IRabbitMQPersistentConnection _persistentConnection;
        private readonly ILogger _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;

        private IModel _consumerChannel;
        private string _queueName;

        /// <summary>
        /// Creates the event bus, opens the consumer channel and starts listening for
        /// subscription removals on the subscriptions manager.
        /// </summary>
        /// <param name="persistentConnection">Connection used to create channels.</param>
        /// <param name="logger">Logger used for retry warnings and processing errors.</param>
        /// <param name="subsManager">
        /// Subscription registry; when <c>null</c> an
        /// <see cref="InMemoryEventBusSubscriptionsManager"/> is created.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="persistentConnection"/> or <paramref name="logger"/> is <c>null</c>.
        /// </exception>
        public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, IEventBusSubscriptionsManager subsManager)
        {
            _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
            _consumerChannel = CreateConsumerChannel();
            _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
        }

        /// <summary>
        /// Unbinds the consumer queue from <paramref name="eventName"/> once its last
        /// subscription is removed, and closes the consumer channel when no
        /// subscriptions remain at all.
        /// </summary>
        private void SubsManager_OnEventRemoved(object sender, string eventName)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueUnbind(queue: _queueName,
                    exchange: BROKER_NAME,
                    routingKey: eventName);

                if (_subsManager.IsEmpty)
                {
                    // NOTE(review): after this point _queueName is empty and the consumer
                    // channel is closed; a later Subscribe would bind against an empty
                    // queue name - confirm whether re-subscription is expected to work.
                    _queueName = string.Empty;
                    _consumerChannel.Close();
                }
            }
        }

        /// <summary>
        /// Serializes <paramref name="event"/> to JSON and publishes it to the exchange,
        /// using the event's type name as routing key. The publish itself is retried up
        /// to 5 times with exponential back-off (2^attempt seconds) on broker or socket
        /// failures.
        /// </summary>
        /// <param name="event">The integration event to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
        public void Publish(IntegrationEvent @event)
        {
            if (@event == null)
            {
                throw new ArgumentNullException(nameof(@event));
            }

            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    // Pass the exception as an argument so its text is never
                    // interpreted as a message template.
                    _logger.LogWarning("{Exception}", ex);
                });

            using (var channel = _persistentConnection.CreateModel())
            {
                var eventName = @event.GetType().Name;

                channel.ExchangeDeclare(exchange: BROKER_NAME,
                    type: "direct");

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                policy.Execute(() =>
                {
                    channel.BasicPublish(exchange: BROKER_NAME,
                        routingKey: eventName,
                        basicProperties: null,
                        body: body);
                });
            }
        }

        /// <summary>
        /// Subscribes a dynamic handler to the event identified by <paramref name="eventName"/>.
        /// </summary>
        /// <typeparam name="TH">Dynamic handler type.</typeparam>
        /// <param name="eventName">Routing key / event name to listen for.</param>
        /// <param name="handler">Factory producing the handler instance for each delivery.</param>
        public void SubscribeDynamic<TH>(string eventName, Func<TH> handler)
            where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);
            _subsManager.AddDynamicSubscription(eventName, handler);
        }

        /// <summary>
        /// Subscribes a typed handler to integration events of type <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Integration event type.</typeparam>
        /// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
        /// <param name="handler">Factory producing the handler instance for each delivery.</param>
        public void Subscribe<T, TH>(Func<TH> handler)
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>(handler);
        }

        /// <summary>
        /// Binds the consumer queue to <paramref name="eventName"/> unless the
        /// subscriptions manager already has subscriptions for that event.
        /// </summary>
        private void DoInternalSubscription(string eventName)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                // Queue is already bound for this routing key.
                return;
            }

            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                    exchange: BROKER_NAME,
                    routingKey: eventName);
            }
        }

        /// <summary>
        /// Removes the subscription of handler <typeparamref name="TH"/> for events of
        /// type <typeparamref name="T"/>.
        /// </summary>
        public void Unsubscribe<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent
        {
            _subsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>
        /// Removes the dynamic subscription of handler <typeparamref name="TH"/> for
        /// <paramref name="eventName"/>.
        /// </summary>
        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Disposes the consumer channel, clears all subscriptions and detaches from
        /// the subscriptions manager.
        /// </summary>
        public void Dispose()
        {
            if (_consumerChannel != null)
            {
                _consumerChannel.Dispose();
            }

            _subsManager.Clear();

            // The manager may be injected and outlive this bus; detach so it neither
            // keeps this instance alive nor calls back into a disposed channel.
            _subsManager.OnEventRemoved -= SubsManager_OnEventRemoved;
        }

        /// <summary>
        /// Creates the channel used for consuming: declares the exchange, declares a
        /// server-named queue (stored in <c>_queueName</c>), and attaches a consumer
        /// that forwards each delivery to <see cref="ProcessEvent"/>.
        /// </summary>
        /// <returns>The newly created consumer channel.</returns>
        private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var channel = _persistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME,
                type: "direct");

            _queueName = channel.QueueDeclare().QueueName;

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;

                // This lambda is effectively "async void": an exception escaping it
                // cannot be observed by the caller, so it is caught and logged here.
                try
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    await ProcessEvent(eventName, message);
                }
                catch (Exception ex)
                {
                    _logger.LogError("Error processing event {EventName}: {Exception}", eventName, ex);
                }
            };

            channel.BasicConsume(queue: _queueName,
                noAck: true,
                consumer: consumer);

            channel.CallbackException += (sender, ea) =>
            {
                // NOTE(review): the replacement channel declares a new server-named
                // queue; existing bindings are not re-created here - confirm whether
                // subscriptions are expected to survive a channel failure.
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }

        /// <summary>
        /// Dispatches a received message to every handler registered for
        /// <paramref name="eventName"/>. Dynamic handlers receive the payload parsed as
        /// a <see cref="JObject"/>; typed handlers receive the payload deserialized to
        /// the registered event type.
        /// </summary>
        /// <param name="eventName">Routing key of the delivery.</param>
        /// <param name="message">UTF-8 decoded JSON payload.</param>
        private async Task ProcessEvent(string eventName, string message)
        {
            if (!_subsManager.HasSubscriptionsForEvent(eventName))
            {
                return;
            }

            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                {
                    // Direct cast: a factory returning the wrong type fails here with
                    // InvalidCastException instead of a later NullReferenceException.
                    var handler = (IDynamicIntegrationEventHandler)subscription.Factory.DynamicInvoke();
                    dynamic eventData = JObject.Parse(message);
                    await handler.Handle(eventData);
                }
                else
                {
                    var eventType = _subsManager.GetEventTypeByName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    var handler = subscription.Factory.DynamicInvoke();

                    // Handlers are stored untyped, so Handle is resolved through the
                    // closed generic interface for the event's runtime type.
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                    await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}