|
|
@ -1,4 +1,5 @@ |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; |
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
using Newtonsoft.Json; |
|
|
@ -23,22 +24,41 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
|
|
|
|
private readonly IRabbitMQPersistentConnection _persistentConnection; |
|
|
|
private readonly ILogger<EventBusRabbitMQ> _logger; |
|
|
|
|
|
|
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers |
|
|
|
= new Dictionary<string, List<IIntegrationEventHandler>>(); |
|
|
|
|
|
|
|
private readonly List<Type> _eventTypes |
|
|
|
= new List<Type>(); |
|
|
|
private readonly IEventBusSubscriptionsManager _subsManager; |
|
|
|
|
|
|
|
|
|
|
|
private IModel _consumerChannel; |
|
|
|
private string _queueName; |
|
|
|
|
|
|
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger) |
|
|
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, IEventBusSubscriptionsManager subsManager) |
|
|
|
{ |
|
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); |
|
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
|
|
|
|
|
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); |
|
|
|
_consumerChannel = CreateConsumerChannel(); |
|
|
|
|
|
|
|
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved; |
|
|
|
} |
|
|
|
|
|
|
|
private void SubsManager_OnEventRemoved(object sender, string eventName) |
|
|
|
{ |
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persistentConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
|
{ |
|
|
|
channel.QueueUnbind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
if (_subsManager.IsEmpty) |
|
|
|
{ |
|
|
|
_queueName = string.Empty; |
|
|
|
_consumerChannel.Close(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void Publish(IntegrationEvent @event) |
|
|
@ -76,15 +96,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void Subscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent |
|
|
|
public void Subscribe<T, TH>(Func<TH> handler) |
|
|
|
where T : IntegrationEvent |
|
|
|
where TH : IIntegrationEventHandler<T> |
|
|
|
{ |
|
|
|
var eventName = typeof(T).Name; |
|
|
|
|
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
{ |
|
|
|
_handlers[eventName].Add(handler); |
|
|
|
} |
|
|
|
else |
|
|
|
var containsKey = _subsManager.HasSubscriptionsForEvent<T>(); |
|
|
|
if (!containsKey) |
|
|
|
{ |
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
@ -96,55 +114,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
channel.QueueBind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
_handlers.Add(eventName, new List<IIntegrationEventHandler>()); |
|
|
|
_handlers[eventName].Add(handler); |
|
|
|
_eventTypes.Add(typeof(T)); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
_subsManager.AddSubscription<T, TH>(handler); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent |
|
|
|
public void Unsubscribe<T, TH>() |
|
|
|
where TH : IIntegrationEventHandler<T> |
|
|
|
where T : IntegrationEvent |
|
|
|
{ |
|
|
|
var eventName = typeof(T).Name; |
|
|
|
_subsManager.RemoveSubscription<T, TH>(); |
|
|
|
} |
|
|
|
|
|
|
|
if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler)) |
|
|
|
private static Func<IIntegrationEventHandler> FindHandlerByType(Type handlerType, IEnumerable<Func<IIntegrationEventHandler>> handlers) |
|
|
|
{ |
|
|
|
foreach (var func in handlers) |
|
|
|
{ |
|
|
|
_handlers[eventName].Remove(handler); |
|
|
|
|
|
|
|
if (_handlers[eventName].Count == 0) |
|
|
|
if (func.GetMethodInfo().ReturnType == handlerType) |
|
|
|
{ |
|
|
|
_handlers.Remove(eventName); |
|
|
|
|
|
|
|
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); |
|
|
|
|
|
|
|
if (eventType != null) |
|
|
|
{ |
|
|
|
_eventTypes.Remove(eventType); |
|
|
|
|
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persistentConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
|
{ |
|
|
|
channel.QueueUnbind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
if (_handlers.Keys.Count == 0) |
|
|
|
{ |
|
|
|
_queueName = string.Empty; |
|
|
|
|
|
|
|
_consumerChannel.Close(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return func; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
public void Dispose() |
|
|
@ -153,8 +147,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
{ |
|
|
|
_consumerChannel.Dispose(); |
|
|
|
} |
|
|
|
|
|
|
|
_handlers.Clear(); |
|
|
|
|
|
|
|
_subsManager.Clear(); |
|
|
|
} |
|
|
|
|
|
|
|
private IModel CreateConsumerChannel() |
|
|
@ -195,15 +189,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
|
|
|
|
private async Task ProcessEvent(string eventName, string message) |
|
|
|
{ |
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
{ |
|
|
|
Type eventType = _eventTypes.Single(t => t.Name == eventName); |
|
|
|
|
|
|
|
if (_subsManager.HasSubscriptionsForEvent(eventName)) |
|
|
|
{ |
|
|
|
var eventType = _subsManager.GetEventTypeByName(eventName); |
|
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
|
var handlers = _handlers[eventName]; |
|
|
|
var handlers = _subsManager.GetHandlersForEvent(eventName); |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
foreach (var handlerfactory in handlers) |
|
|
|
{ |
|
|
|
var handler = handlerfactory.DynamicInvoke(); |
|
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
|
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); |
|
|
|
} |
|
|
|
} |
|
|
|