using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { public class EventBusRabbitMQ : IEventBus, IDisposable { private readonly string _brokerName = "eshop_event_bus"; private readonly string _connectionString; private readonly Dictionary> _handlers; private readonly List _eventTypes; private IModel _model; private IConnection _connection; private string _queueName; public EventBusRabbitMQ(string connectionString) { _connectionString = connectionString; _handlers = new Dictionary>(); _eventTypes = new List(); } public void Publish(IntegrationEvent @event) { var eventName = @event.GetType() .Name; var factory = new ConnectionFactory() { HostName = _connectionString }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: _brokerName, type: "direct"); string message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: _brokerName, routingKey: eventName, basicProperties: null, body: body); } } public void Subscribe(IIntegrationEventHandler handler) where T : IntegrationEvent { var eventName = typeof(T).Name; if (_handlers.ContainsKey(eventName)) { _handlers[eventName].Add(handler); } else { var channel = GetChannel(); channel.QueueBind(queue: _queueName, exchange: _brokerName, routingKey: eventName); _handlers.Add(eventName, new List()); _handlers[eventName].Add(handler); _eventTypes.Add(typeof(T)); } } public void Unsubscribe(IIntegrationEventHandler handler) where T : IntegrationEvent { var eventName = typeof(T).Name; if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler)) { _handlers[eventName].Remove(handler); if (_handlers[eventName].Count == 0) { _handlers.Remove(eventName); var eventType = _eventTypes.Single(e => e.Name == eventName); _eventTypes.Remove(eventType); _model.QueueUnbind(queue: _queueName, exchange: _brokerName, routingKey: eventName); if (_handlers.Keys.Count == 0) { _queueName = string.Empty; _model.Dispose(); _connection.Dispose(); } } } } public void Dispose() { _handlers.Clear(); _model?.Dispose(); _connection?.Dispose(); } private IModel GetChannel() { if (_model != null) { return _model; } else { (_model, _connection) = CreateConnection(); return _model; } } private (IModel model, IConnection connection) CreateConnection() { var factory = new ConnectionFactory() { HostName = _connectionString }; var con = factory.CreateConnection(); var channel = con.CreateModel(); channel.ExchangeDeclare(exchange: _brokerName, type: "direct"); if (string.IsNullOrEmpty(_queueName)) { _queueName = channel.QueueDeclare().QueueName; } var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var eventName = ea.RoutingKey; var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(eventName, message); }; channel.BasicConsume(queue: _queueName, noAck: true, consumer: consumer); return (channel, con); } private async Task ProcessEvent(string eventName, string message) { if (_handlers.ContainsKey(eventName)) { Type eventType = _eventTypes.Single(t => t.Name == eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); var handlers = _handlers[eventName]; foreach (var handler in handlers) { await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }