eShopOnContainers/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersistentConnection.cs

132 lines
4.1 KiB
C#
Raw Normal View History

using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.IO;
using System.Net.Sockets;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
2017-04-29 21:58:11 -07:00
public class DefaultRabbitMQPersistentConnection
: IRabbitMQPersistentConnection
{
private readonly IConnectionFactory _connectionFactory;
2017-04-29 21:58:11 -07:00
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
2017-10-12 09:25:01 +01:00
private readonly int _retryCount;
IConnection _connection;
bool _disposed;
object sync_root = new object();
2017-10-12 09:25:01 +01:00
public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
2017-10-12 09:25:01 +01:00
_retryCount = retryCount;
}
public bool IsConnected
{
get
{
return _connection != null && _connection.IsOpen && !_disposed;
}
}
public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel();
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex.ToString());
}
}
public bool TryConnect()
{
_logger.LogInformation("RabbitMQ Client is trying to connect");
lock (sync_root)
{
var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
2017-10-12 09:25:01 +01:00
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
2019-01-31 13:32:27 +00:00
_logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);
}
);
policy.Execute(() =>
{
_connection = _connectionFactory
.CreateConnection();
});
if (IsConnected)
{
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
2019-01-31 15:30:50 +00:00
_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);
2017-10-12 09:25:01 +01:00
return true;
}
else
{
2017-04-29 21:58:11 -07:00
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false;
}
}
}
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
TryConnect();
}
void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
TryConnect();
}
void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
TryConnect();
}
}
}