|
|
@ -1,13 +1,16 @@ |
|
|
|
|
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; |
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
using Newtonsoft.Json; |
|
|
|
using Polly; |
|
|
|
using Polly.Retry; |
|
|
|
using RabbitMQ.Client; |
|
|
|
using RabbitMQ.Client.Events; |
|
|
|
using RabbitMQ.Client.Exceptions; |
|
|
|
using System; |
|
|
|
using System.Collections; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Linq; |
|
|
|
using System.Net.Sockets; |
|
|
|
using System.Reflection; |
|
|
|
using System.Text; |
|
|
|
using System.Threading.Tasks; |
|
|
@ -16,68 +19,98 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
{ |
|
|
|
public class EventBusRabbitMQ : IEventBus, IDisposable |
|
|
|
{ |
|
|
|
private readonly string _brokerName = "eshop_event_bus"; |
|
|
|
private readonly string _connectionString; |
|
|
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers; |
|
|
|
private readonly List<Type> _eventTypes; |
|
|
|
const string BROKER_NAME = "eshop_event_bus"; |
|
|
|
|
|
|
|
private IModel _model; |
|
|
|
private IConnection _connection; |
|
|
|
private readonly IRabbitMQPersisterConnection _persisterConnection; |
|
|
|
private readonly ILogger<EventBusRabbitMQ> _logger; |
|
|
|
|
|
|
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers |
|
|
|
= new Dictionary<string, List<IIntegrationEventHandler>>(); |
|
|
|
|
|
|
|
private readonly List<Type> _eventTypes |
|
|
|
= new List<Type>(); |
|
|
|
|
|
|
|
private IModel _consumerChannel; |
|
|
|
private string _queueName; |
|
|
|
|
|
|
|
|
|
|
|
public EventBusRabbitMQ(string connectionString) |
|
|
|
public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger<EventBusRabbitMQ> logger) |
|
|
|
{ |
|
|
|
_connectionString = connectionString; |
|
|
|
_handlers = new Dictionary<string, List<IIntegrationEventHandler>>(); |
|
|
|
_eventTypes = new List<Type>(); |
|
|
|
_persisterConnection = persisterConnection ?? throw new ArgumentNullException(nameof(persisterConnection)); |
|
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
|
|
|
|
|
_consumerChannel = CreateConsumerChannel(); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void Publish(IntegrationEvent @event) |
|
|
|
{ |
|
|
|
var eventName = @event.GetType().Name; |
|
|
|
var factory = new ConnectionFactory() { HostName = _connectionString }; |
|
|
|
using (var connection = factory.CreateConnection()) |
|
|
|
using (var channel = connection.CreateModel()) |
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
{ |
|
|
|
channel.ExchangeDeclare(exchange: _brokerName, |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
var policy = RetryPolicy.Handle<BrokerUnreachableException>() |
|
|
|
.Or<SocketException>() |
|
|
|
.WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => |
|
|
|
{ |
|
|
|
_logger.LogWarning(ex.ToString()); |
|
|
|
}); |
|
|
|
|
|
|
|
using (var channel = _persisterConnection.CreateModel()) |
|
|
|
{ |
|
|
|
var eventName = @event.GetType() |
|
|
|
.Name; |
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, |
|
|
|
type: "direct"); |
|
|
|
|
|
|
|
string message = JsonConvert.SerializeObject(@event); |
|
|
|
var message = JsonConvert.SerializeObject(@event); |
|
|
|
var body = Encoding.UTF8.GetBytes(message); |
|
|
|
|
|
|
|
channel.BasicPublish(exchange: _brokerName, |
|
|
|
policy.Execute(() => |
|
|
|
{ |
|
|
|
channel.BasicPublish(exchange: BROKER_NAME, |
|
|
|
routingKey: eventName, |
|
|
|
basicProperties: null, |
|
|
|
body: body); |
|
|
|
body: body); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
public void Subscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent |
|
|
|
{ |
|
|
|
var eventName = typeof(T).Name; |
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
|
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
{ |
|
|
|
_handlers[eventName].Add(handler); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
var channel = GetChannel(); |
|
|
|
channel.QueueBind(queue: _queueName, |
|
|
|
exchange: _brokerName, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
_handlers.Add(eventName, new List<IIntegrationEventHandler>()); |
|
|
|
_handlers[eventName].Add(handler); |
|
|
|
_eventTypes.Add(typeof(T)); |
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
using (var channel = _persisterConnection.CreateModel()) |
|
|
|
{ |
|
|
|
channel.QueueBind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
_handlers.Add(eventName, new List<IIntegrationEventHandler>()); |
|
|
|
_handlers[eventName].Add(handler); |
|
|
|
_eventTypes.Add(typeof(T)); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent |
|
|
|
{ |
|
|
|
var eventName = typeof(T).Name; |
|
|
|
|
|
|
|
if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler)) |
|
|
|
{ |
|
|
|
_handlers[eventName].Remove(handler); |
|
|
@ -85,56 +118,59 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
if (_handlers[eventName].Count == 0) |
|
|
|
{ |
|
|
|
_handlers.Remove(eventName); |
|
|
|
var eventType = _eventTypes.Single(e => e.Name == eventName); |
|
|
|
_eventTypes.Remove(eventType); |
|
|
|
_model.QueueUnbind(queue: _queueName, |
|
|
|
exchange: _brokerName, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
if (_handlers.Keys.Count == 0) |
|
|
|
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); |
|
|
|
|
|
|
|
if (eventType != null) |
|
|
|
{ |
|
|
|
_queueName = string.Empty; |
|
|
|
_model.Dispose(); |
|
|
|
_connection.Dispose(); |
|
|
|
_eventTypes.Remove(eventType); |
|
|
|
|
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
using (var channel = _persisterConnection.CreateModel()) |
|
|
|
{ |
|
|
|
channel.QueueUnbind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
if (_handlers.Keys.Count == 0) |
|
|
|
{ |
|
|
|
_queueName = string.Empty; |
|
|
|
|
|
|
|
_consumerChannel.Close(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void Dispose() |
|
|
|
{ |
|
|
|
if (_consumerChannel != null) |
|
|
|
{ |
|
|
|
_consumerChannel.Dispose(); |
|
|
|
} |
|
|
|
|
|
|
|
_handlers.Clear(); |
|
|
|
_model?.Dispose(); |
|
|
|
_connection?.Dispose(); |
|
|
|
} |
|
|
|
|
|
|
|
private IModel GetChannel() |
|
|
|
private IModel CreateConsumerChannel() |
|
|
|
{ |
|
|
|
if (_model != null) |
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
{ |
|
|
|
return _model; |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
(_model, _connection) = CreateConnection(); |
|
|
|
return _model; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var channel = _persisterConnection.CreateModel(); |
|
|
|
|
|
|
|
private (IModel model, IConnection connection) CreateConnection() |
|
|
|
{ |
|
|
|
var factory = new ConnectionFactory() { HostName = _connectionString }; |
|
|
|
var con = factory.CreateConnection(); |
|
|
|
var channel = con.CreateModel(); |
|
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, |
|
|
|
type: "direct"); |
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: _brokerName, |
|
|
|
type: "direct"); |
|
|
|
if (string.IsNullOrEmpty(_queueName)) |
|
|
|
{ |
|
|
|
_queueName = channel.QueueDeclare().QueueName; |
|
|
|
} |
|
|
|
_queueName = channel.QueueDeclare().QueueName; |
|
|
|
|
|
|
|
var consumer = new EventingBasicConsumer(channel); |
|
|
|
consumer.Received += async (model, ea) => |
|
|
@ -144,11 +180,18 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
|
|
|
|
await ProcessEvent(eventName, message); |
|
|
|
}; |
|
|
|
|
|
|
|
channel.BasicConsume(queue: _queueName, |
|
|
|
noAck: true, |
|
|
|
consumer: consumer); |
|
|
|
|
|
|
|
return (channel, con); |
|
|
|
channel.CallbackException += (sender, ea) => |
|
|
|
{ |
|
|
|
_consumerChannel.Dispose(); |
|
|
|
_consumerChannel = CreateConsumerChannel(); |
|
|
|
}; |
|
|
|
|
|
|
|
return channel; |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ProcessEvent(string eventName, string message) |
|
|
@ -156,7 +199,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
{ |
|
|
|
Type eventType = _eventTypes.Single(t => t.Name == eventName); |
|
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
|
var handlers = _handlers[eventName]; |
|
|
|
|
|
|
@ -166,6 +209,5 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |