Code re-factorings and formatting in EventBus.RabbitMQ project

This commit is contained in:
Rafsanul Hasan 2018-09-09 05:17:16 +06:00
parent 086302f5f1
commit db13fa6091
No known key found for this signature in database
GPG Key ID: FC57FD2D87BE60DD
3 changed files with 290 additions and 285 deletions

View File

@ -10,122 +10,127 @@ using System.Net.Sockets;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{ {
public class DefaultRabbitMQPersistentConnection public class DefaultRabbitMQPersistentConnection
: IRabbitMQPersistentConnection : IRabbitMQPersistentConnection
{ {
private readonly IConnectionFactory _connectionFactory; private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
private readonly int _retryCount; private readonly int _retryCount;
IConnection _connection; IConnection _connection;
bool _disposed; bool _disposed;
readonly object sync_root = new object();
object sync_root = new object(); public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_retryCount = retryCount;
}
public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_retryCount = retryCount;
}
public bool IsConnected public IModel CreateModel()
{ {
get if (!IsConnected)
{ {
return _connection != null && _connection.IsOpen && !_disposed; throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
} }
}
public IModel CreateModel() return _connection.CreateModel();
{ }
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel(); public void Dispose()
} {
if (_disposed)
{
return;
}
public void Dispose() _disposed = true;
{
if (_disposed) return;
_disposed = true; try
{
_connection.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex.ToString());
}
}
try public bool TryConnect()
{ {
_connection.Dispose(); _logger.LogInformation("RabbitMQ Client is trying to connect");
}
catch (IOException ex)
{
_logger.LogCritical(ex.ToString());
}
}
public bool TryConnect() lock (sync_root)
{ {
_logger.LogInformation("RabbitMQ Client is trying to connect"); var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex.ToString());
}
);
lock (sync_root) policy.Execute(() =>
{ {
var policy = RetryPolicy.Handle<SocketException>() _connection = _connectionFactory
.Or<BrokerUnreachableException>() .CreateConnection();
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => });
{
_logger.LogWarning(ex.ToString());
}
);
policy.Execute(() => if (IsConnected)
{ {
_connection = _connectionFactory _connection.ConnectionShutdown += OnConnectionShutdown;
.CreateConnection(); _connection.CallbackException += OnCallbackException;
}); _connection.ConnectionBlocked += OnConnectionBlocked;
if (IsConnected) _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
{
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
_logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); return true;
}
else
{
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return true; return false;
} }
else }
{ }
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false; private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
} {
} if (_disposed)
} {
return;
}
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); TryConnect();
}
TryConnect(); void OnCallbackException(object sender, CallbackExceptionEventArgs e)
} {
if (_disposed)
{
return;
}
void OnCallbackException(object sender, CallbackExceptionEventArgs e) _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); TryConnect();
}
TryConnect(); void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
} {
if (_disposed)
{
return;
}
void OnConnectionShutdown(object sender, ShutdownEventArgs reason) _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); TryConnect();
}
TryConnect(); }
}
}
} }

View File

@ -17,220 +17,220 @@ using System.Threading.Tasks;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{ {
public class EventBusRabbitMQ : IEventBus, IDisposable public class EventBusRabbitMQ : IEventBus, IDisposable
{ {
const string BROKER_NAME = "eshop_event_bus"; const string BROKER_NAME = "eshop_event_bus";
private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly IRabbitMQPersistentConnection _persistentConnection;
private readonly ILogger<EventBusRabbitMQ> _logger; private readonly ILogger<EventBusRabbitMQ> _logger;
private readonly IEventBusSubscriptionsManager _subsManager; private readonly IEventBusSubscriptionsManager _subsManager;
private readonly ILifetimeScope _autofac; private readonly ILifetimeScope _autofac;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private readonly int _retryCount; private readonly int _retryCount;
private IModel _consumerChannel; private IModel _consumerChannel;
private string _queueName; private string _queueName;
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
{ {
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_queueName = queueName; _queueName = queueName;
_consumerChannel = CreateConsumerChannel(); _consumerChannel = CreateConsumerChannel();
_autofac = autofac; _autofac = autofac;
_retryCount = retryCount; _retryCount = retryCount;
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
} }
private void SubsManager_OnEventRemoved(object sender, string eventName) private void SubsManager_OnEventRemoved(object sender, string eventName)
{ {
if (!_persistentConnection.IsConnected) if (!_persistentConnection.IsConnected)
{ {
_persistentConnection.TryConnect(); _persistentConnection.TryConnect();
} }
using (var channel = _persistentConnection.CreateModel()) using (var channel = _persistentConnection.CreateModel())
{ {
channel.QueueUnbind(queue: _queueName, channel.QueueUnbind(queue: _queueName,
exchange: BROKER_NAME, exchange: BROKER_NAME,
routingKey: eventName); routingKey: eventName);
if (_subsManager.IsEmpty) if (_subsManager.IsEmpty)
{ {
_queueName = string.Empty; _queueName = string.Empty;
_consumerChannel.Close(); _consumerChannel.Close();
} }
} }
} }
public void Publish(IntegrationEvent @event) public void Publish(IntegrationEvent @event)
{ {
if (!_persistentConnection.IsConnected) if (!_persistentConnection.IsConnected)
{ {
_persistentConnection.TryConnect(); _persistentConnection.TryConnect();
} }
var policy = RetryPolicy.Handle<BrokerUnreachableException>() var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>() .Or<SocketException>()
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{ {
_logger.LogWarning(ex.ToString()); _logger.LogWarning(ex.ToString());
}); });
using (var channel = _persistentConnection.CreateModel()) using (var channel = _persistentConnection.CreateModel())
{ {
var eventName = @event.GetType() var eventName = @event.GetType()
.Name; .Name;
channel.ExchangeDeclare(exchange: BROKER_NAME, channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct"); type: "direct");
var message = JsonConvert.SerializeObject(@event); var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message); var body = Encoding.UTF8.GetBytes(message);
policy.Execute(() => policy.Execute(() =>
{ {
var properties = channel.CreateBasicProperties(); var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // persistent properties.DeliveryMode = 2; // persistent
channel.BasicPublish(exchange: BROKER_NAME, channel.BasicPublish(exchange: BROKER_NAME,
routingKey: eventName, routingKey: eventName,
mandatory:true, mandatory: true,
basicProperties: properties, basicProperties: properties,
body: body); body: body);
}); });
} }
} }
public void SubscribeDynamic<TH>(string eventName) public void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler where TH : IDynamicIntegrationEventHandler
{ {
DoInternalSubscription(eventName); DoInternalSubscription(eventName);
_subsManager.AddDynamicSubscription<TH>(eventName); _subsManager.AddDynamicSubscription<TH>(eventName);
} }
public void Subscribe<T, TH>() public void Subscribe<T, TH>()
where T : IntegrationEvent where T : IntegrationEvent
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
{ {
var eventName = _subsManager.GetEventKey<T>(); var eventName = _subsManager.GetEventKey<T>();
DoInternalSubscription(eventName); DoInternalSubscription(eventName);
_subsManager.AddSubscription<T, TH>(); _subsManager.AddSubscription<T, TH>();
} }
private void DoInternalSubscription(string eventName) private void DoInternalSubscription(string eventName)
{ {
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
if (!containsKey) if (!containsKey)
{ {
if (!_persistentConnection.IsConnected) if (!_persistentConnection.IsConnected)
{ {
_persistentConnection.TryConnect(); _persistentConnection.TryConnect();
} }
using (var channel = _persistentConnection.CreateModel()) using (var channel = _persistentConnection.CreateModel())
{ {
channel.QueueBind(queue: _queueName, channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME, exchange: BROKER_NAME,
routingKey: eventName); routingKey: eventName);
} }
} }
} }
public void Unsubscribe<T, TH>() public void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent where T : IntegrationEvent
{ {
_subsManager.RemoveSubscription<T, TH>(); _subsManager.RemoveSubscription<T, TH>();
} }
public void UnsubscribeDynamic<TH>(string eventName) public void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler where TH : IDynamicIntegrationEventHandler
{ {
_subsManager.RemoveDynamicSubscription<TH>(eventName); _subsManager.RemoveDynamicSubscription<TH>(eventName);
} }
public void Dispose() public void Dispose()
{ {
if (_consumerChannel != null) if (_consumerChannel != null)
{ {
_consumerChannel.Dispose(); _consumerChannel.Dispose();
} }
_subsManager.Clear(); _subsManager.Clear();
} }
private IModel CreateConsumerChannel() private IModel CreateConsumerChannel()
{ {
if (!_persistentConnection.IsConnected) if (!_persistentConnection.IsConnected)
{ {
_persistentConnection.TryConnect(); _persistentConnection.TryConnect();
} }
var channel = _persistentConnection.CreateModel(); var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: BROKER_NAME, channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct"); type: "direct");
channel.QueueDeclare(queue: _queueName, channel.QueueDeclare(queue: _queueName,
durable: true, durable: true,
exclusive: false, exclusive: false,
autoDelete: false, autoDelete: false,
arguments: null); arguments: null);
var consumer = new EventingBasicConsumer(channel); var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) => consumer.Received += async (model, ea) =>
{ {
var eventName = ea.RoutingKey; var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body); var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message); await ProcessEvent(eventName, message);
channel.BasicAck(ea.DeliveryTag,multiple:false); channel.BasicAck(ea.DeliveryTag, multiple: false);
}; };
channel.BasicConsume(queue: _queueName, channel.BasicConsume(queue: _queueName,
autoAck: false, autoAck: false,
consumer: consumer); consumer: consumer);
channel.CallbackException += (sender, ea) => channel.CallbackException += (sender, ea) =>
{ {
_consumerChannel.Dispose(); _consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel(); _consumerChannel = CreateConsumerChannel();
}; };
return channel; return channel;
} }
private async Task ProcessEvent(string eventName, string message) private async Task ProcessEvent(string eventName, string message)
{ {
if (_subsManager.HasSubscriptionsForEvent(eventName)) if (_subsManager.HasSubscriptionsForEvent(eventName))
{ {
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{ {
var subscriptions = _subsManager.GetHandlersForEvent(eventName); var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions) foreach (var subscription in subscriptions)
{ {
if (subscription.IsDynamic) if (subscription.IsDynamic)
{ {
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
dynamic eventData = JObject.Parse(message); dynamic eventData = JObject.Parse(message);
await handler.Handle(eventData); await handler.Handle(eventData);
} }
else else
{ {
var eventType = _subsManager.GetEventTypeByName(eventName); var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handler = scope.ResolveOptional(subscription.HandlerType); var handler = scope.ResolveOptional(subscription.HandlerType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
} }
} }
} }
} }
} }
} }
} }

View File

@ -3,13 +3,13 @@ using System;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{ {
public interface IRabbitMQPersistentConnection public interface IRabbitMQPersistentConnection
: IDisposable : IDisposable
{ {
bool IsConnected { get; } bool IsConnected { get; }
bool TryConnect(); bool TryConnect();
IModel CreateModel(); IModel CreateModel();
} }
} }