Eduard Tomas a09d7fd3a5 EventBus refactor.
Instead to register EventHandlers we register Func<EventHandlers> which solves scope problems (having transient/scoped objects owned by singletons)
2017-05-03 10:59:36 +02:00

209 lines
7.0 KiB
C#

using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
public class EventBusRabbitMQ : IEventBus, IDisposable
{
const string BROKER_NAME = "eshop_event_bus";
private readonly IRabbitMQPersistentConnection _persistentConnection;
private readonly ILogger<EventBusRabbitMQ> _logger;
private readonly IEventBusSubscriptionsManager _subsManager;
private IModel _consumerChannel;
private string _queueName;
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, IEventBusSubscriptionsManager subsManager)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_consumerChannel = CreateConsumerChannel();
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
}
private void SubsManager_OnEventRemoved(object sender, string eventName)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueUnbind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
if (_subsManager.IsEmpty)
{
_queueName = string.Empty;
_consumerChannel.Close();
}
}
}
public void Publish(IntegrationEvent @event)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var policy = RetryPolicy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex.ToString());
});
using (var channel = _persistentConnection.CreateModel())
{
var eventName = @event.GetType()
.Name;
channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct");
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
policy.Execute(() =>
{
channel.BasicPublish(exchange: BROKER_NAME,
routingKey: eventName,
basicProperties: null,
body: body);
});
}
}
public void Subscribe<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = typeof(T).Name;
var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
}
}
_subsManager.AddSubscription<T, TH>(handler);
}
public void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent
{
_subsManager.RemoveSubscription<T, TH>();
}
private static Func<IIntegrationEventHandler> FindHandlerByType(Type handlerType, IEnumerable<Func<IIntegrationEventHandler>> handlers)
{
foreach (var func in handlers)
{
if (func.GetMethodInfo().ReturnType == handlerType)
{
return func;
}
}
return null;
}
public void Dispose()
{
if (_consumerChannel != null)
{
_consumerChannel.Dispose();
}
_subsManager.Clear();
}
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct");
_queueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message);
};
channel.BasicConsume(queue: _queueName,
noAck: true,
consumer: consumer);
channel.CallbackException += (sender, ea) =>
{
_consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel();
};
return channel;
}
private async Task ProcessEvent(string eventName, string message)
{
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handlers = _subsManager.GetHandlersForEvent(eventName);
foreach (var handlerfactory in handlers)
{
var handler = handlerfactory.DynamicInvoke();
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}
}
}
}