From 44e39a685c1038c4a74036789f4e8883e4029f67 Mon Sep 17 00:00:00 2001 From: Unai Zorrilla Castro Date: Thu, 20 Apr 2017 10:53:17 +0200 Subject: [PATCH] Added IRabbitMQPersisterConnection and more resilient work on rabbitmq event bus --- .../DefaultRabbitMQPersisterConnection.cs | 130 +++++++++++++ .../EventBusRabbitMQ/EventBusRabbitMQ.cs | 175 +++++++++++------- .../EventBusRabbitMQ/EventBusRabbitMQ.csproj | 2 + .../IRabbitMQPersisterConnection.cs | 16 ++ src/Services/Basket/Basket.API/Startup.cs | 20 +- src/Services/Catalog/Catalog.API/Startup.cs | 18 +- src/Services/Ordering/Ordering.API/Startup.cs | 17 +- 7 files changed, 295 insertions(+), 83 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs new file mode 100644 index 000000000..894afb4e4 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/DefaultRabbitMQPersisterConnection.cs @@ -0,0 +1,130 @@ +using Microsoft.Extensions.Logging; +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using System; +using System.IO; +using System.Net.Sockets; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ +{ + public class DefaultRabbitMQPersisterConnection + : IRabbitMQPersisterConnection + { + private readonly IConnectionFactory _connectionFactory; + private readonly ILogger _logger; + + IConnection _connection; + bool _disposed; + + object sync_root = new object(); + + public DefaultRabbitMQPersisterConnection(IConnectionFactory connectionFactory,ILogger logger) + { + _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public bool IsConnected + { + get + { + return _connection != null && _connection.IsOpen && !_disposed; + } + } + + public IModel CreateModel() + { + if (!IsConnected) + { + throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); + } + + return _connection.CreateModel(); + } + + public void Dispose() + { + if (_disposed) return; + + _disposed = true; + + try + { + _connection.Dispose(); + } + catch (IOException ex) + { + _logger.LogCritical(ex.ToString()); + } + } + + public bool TryConnect() + { + _logger.LogInformation("RabbitMQ Client is trying to connect"); + + lock (sync_root) + { + var policy = RetryPolicy.Handle() + .Or() + .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => + { + _logger.LogWarning(ex.ToString()); + } + ); + + policy.Execute(() => + { + _connection = _connectionFactory + .CreateConnection(); + }); + + if (IsConnected) + { + _connection.ConnectionShutdown += OnConnectionShutdown; + _connection.CallbackException += OnCallbackException; + _connection.ConnectionBlocked += OnConnectionBlocked; + + _logger.LogInformation($"RabbitMQ persister connection acquire a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); + + return true; + } + else + { + _logger.LogCritical("FATAL ERROR: RabbitMQ connections can't be created and opened"); + + return false; + } + } + } + + private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) return; + + _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); + + TryConnect(); + } + + void OnCallbackException(object sender, CallbackExceptionEventArgs e) + { + if (_disposed) return; + + _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); + + TryConnect(); + } + + void OnConnectionShutdown(object sender, ShutdownEventArgs reason) + { + if (_disposed) return; + + _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); + + TryConnect(); + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 903601378..cbdb233f8 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -1,12 +1,16 @@ - -using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using Microsoft.Extensions.Logging; using Newtonsoft.Json; +using Polly; +using Polly.Retry; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; using System; using System.Collections.Generic; using System.Linq; +using System.Net.Sockets; using System.Reflection; using System.Text; using System.Threading.Tasks; @@ -15,70 +19,97 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { public class EventBusRabbitMQ : IEventBus, IDisposable { - private readonly string _brokerName = "eshop_event_bus"; - private readonly string _connectionString; - private readonly Dictionary> _handlers; - private readonly List _eventTypes; + const string BROKER_NAME = "eshop_event_bus"; - private IModel _model; - private IConnection _connection; - private string _queueName; - + private readonly IRabbitMQPersisterConnection _persisterConnection; + private readonly ILogger _logger; - public EventBusRabbitMQ(string connectionString) + private readonly Dictionary> _handlers + = new Dictionary>(); + + private readonly List _eventTypes + = new List(); + + private IModel _consumerChannel; + + public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger logger) { - _connectionString = connectionString; - _handlers = new Dictionary>(); - _eventTypes = new List(); + _persisterConnection = persisterConnection ?? throw new ArgumentNullException(nameof(persisterConnection)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _consumerChannel = CreateConsumerChannel(); } + public void Publish(IntegrationEvent @event) { - var eventName = @event.GetType() - .Name; + if (!_persisterConnection.IsConnected) + { + _persisterConnection.TryConnect(); + } - var factory = new ConnectionFactory() { HostName = _connectionString }; - using (var connection = factory.CreateConnection()) - using (var channel = connection.CreateModel()) + var policy = RetryPolicy.Handle() + .Or() + .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => + { + _logger.LogWarning(ex.ToString()); + }); + + using (var channel = _persisterConnection.CreateModel()) { - channel.ExchangeDeclare(exchange: _brokerName, + var eventName = @event.GetType() + .Name; + + channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); - string message = JsonConvert.SerializeObject(@event); + var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); - channel.BasicPublish(exchange: _brokerName, + policy.Execute(() => + { + channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, basicProperties: null, - body: body); + body: body); + }); } - } public void Subscribe(IIntegrationEventHandler handler) where T : IntegrationEvent { var eventName = typeof(T).Name; - if (_handlers.ContainsKey(eventName)) + + if (_handlers.ContainsKey(eventName)) { _handlers[eventName].Add(handler); } else { - var channel = GetChannel(); - channel.QueueBind(queue: _queueName, - exchange: _brokerName, - routingKey: eventName); - - _handlers.Add(eventName, new List()); - _handlers[eventName].Add(handler); - _eventTypes.Add(typeof(T)); + if (!_persisterConnection.IsConnected) + { + _persisterConnection.TryConnect(); + } + + using (var channel = _persisterConnection.CreateModel()) + { + channel.QueueBind(queue: channel.QueueDeclare().QueueName, + exchange: BROKER_NAME, + routingKey: eventName); + + _handlers.Add(eventName, new List()); + _handlers[eventName].Add(handler); + _eventTypes.Add(typeof(T)); + } + } - + } public void Unsubscribe(IIntegrationEventHandler handler) where T : IntegrationEvent { var eventName = typeof(T).Name; + if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler)) { _handlers[eventName].Remove(handler); @@ -86,56 +117,46 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ if (_handlers[eventName].Count == 0) { _handlers.Remove(eventName); - var eventType = _eventTypes.Single(e => e.Name == eventName); - _eventTypes.Remove(eventType); - _model.QueueUnbind(queue: _queueName, - exchange: _brokerName, - routingKey: eventName); - if (_handlers.Keys.Count == 0) + var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); + + if (eventType != null) { - _queueName = string.Empty; - _model.Dispose(); - _connection.Dispose(); + _eventTypes.Remove(eventType); + + if (!_persisterConnection.IsConnected) + { + _persisterConnection.TryConnect(); + } + + using (var channel = _persisterConnection.CreateModel()) + { + channel.QueueUnbind(queue: channel.QueueDeclare().QueueName, + exchange: BROKER_NAME, + routingKey: eventName); + } } - } } } public void Dispose() { + _consumerChannel.Dispose(); _handlers.Clear(); - _model?.Dispose(); - _connection?.Dispose(); } - private IModel GetChannel() + private IModel CreateConsumerChannel() { - if (_model != null) - { - return _model; - } - else + if (!_persisterConnection.IsConnected) { - (_model, _connection) = CreateConnection(); - return _model; + _persisterConnection.TryConnect(); } - } + var channel = _persisterConnection.CreateModel(); - private (IModel model, IConnection connection) CreateConnection() - { - var factory = new ConnectionFactory() { HostName = _connectionString }; - var con = factory.CreateConnection(); - var channel = con.CreateModel(); - - channel.ExchangeDeclare(exchange: _brokerName, - type: "direct"); - if (string.IsNullOrEmpty(_queueName)) - { - _queueName = channel.QueueDeclare().QueueName; - } + channel.ExchangeDeclare(exchange: BROKER_NAME, + type: "direct"); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => @@ -145,11 +166,24 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ await ProcessEvent(eventName, message); }; - channel.BasicConsume(queue: _queueName, + + channel.BasicConsume(queue: channel.QueueDeclare().QueueName, noAck: true, consumer: consumer); - return (channel, con); + channel.ModelShutdown += (sender, ea) => + { + _consumerChannel.Dispose(); + _consumerChannel = CreateConsumerChannel(); + }; + + channel.CallbackException += (sender, ea) => + { + _consumerChannel.Dispose(); + _consumerChannel = CreateConsumerChannel(); + }; + + return channel; } private async Task ProcessEvent(string eventName, string message) @@ -157,7 +191,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ if (_handlers.ContainsKey(eventName)) { Type eventType = _eventTypes.Single(t => t.Name == eventName); - var integrationEvent = JsonConvert.DeserializeObject(message, eventType); + var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); var handlers = _handlers[eventName]; @@ -167,6 +201,5 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ } } } - } } diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj index cf36a2222..023a5d5ec 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj @@ -7,7 +7,9 @@ + + diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs new file mode 100644 index 000000000..b9debe743 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IRabbitMQPersisterConnection.cs @@ -0,0 +1,16 @@ +using RabbitMQ.Client; +using System; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ +{ + + public interface IRabbitMQPersisterConnection + : IDisposable + { + bool IsConnected { get; } + + bool TryConnect(); + + IModel CreateModel(); + } +} diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index f24818ab6..60fc46de2 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -14,6 +14,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.HealthChecks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using RabbitMQ.Client; using StackExchange.Redis; using System.Linq; using System.Net; @@ -58,22 +59,31 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API //and then creating the connection it seems reasonable to move //that cost to startup instead of having the first request pay the //penalty. - services.AddSingleton(sp => { + services.AddSingleton(sp => + { var settings = sp.GetRequiredService>().Value; var ips = Dns.GetHostAddressesAsync(settings.ConnectionString).Result; return ConnectionMultiplexer.Connect(ips.First().ToString()); }); - services.AddSingleton(sp => + + services.AddSingleton(sp => { var settings = sp.GetRequiredService>().Value; + var logger = sp.GetRequiredService>(); + var factory = new ConnectionFactory() + { + HostName = settings.EventBusConnection + }; - return new EventBusRabbitMQ(settings.EventBusConnection); + return new DefaultRabbitMQPersisterConnection(factory, logger); }); + services.AddSingleton(); + services.AddSwaggerGen(); - + services.ConfigureSwaggerGen(options => { options.OperationFilter(); @@ -101,7 +111,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API services.AddTransient, ProductPriceChangedIntegrationEventHandler>(); services.AddTransient, OrderStartedIntegrationEventHandler>(); - + } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index e3f671652..c13ac2d1b 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -16,6 +16,7 @@ using Microsoft.Extensions.HealthChecks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; + using RabbitMQ.Client; using System; using System.Data.Common; using System.Data.SqlClient; @@ -102,14 +103,19 @@ services.AddTransient(); - - var serviceProvider = services.BuildServiceProvider(); - - services.AddSingleton(sp => + services.AddSingleton(sp => { - var settings = serviceProvider.GetRequiredService>().Value; - return new EventBusRabbitMQ(settings.EventBusConnection); + var settings = sp.GetRequiredService>().Value; + var logger = sp.GetRequiredService>(); + var factory = new ConnectionFactory() + { + HostName = settings.EventBusConnection + }; + + return new DefaultRabbitMQPersisterConnection(factory, logger); }); + + services.AddSingleton(); } public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index ae5c4c3ff..0d6e222b6 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -22,6 +22,7 @@ using Microsoft.Extensions.HealthChecks; using Microsoft.Extensions.Logging; using Ordering.Infrastructure; + using RabbitMQ.Client; using System; using System.Data.Common; using System.Reflection; @@ -105,7 +106,21 @@ sp => (DbConnection c) => new IntegrationEventLogService(c)); var serviceProvider = services.BuildServiceProvider(); services.AddTransient(); - services.AddSingleton(new EventBusRabbitMQ(Configuration["EventBusConnection"])); + + services.AddSingleton(sp => + { + var logger = sp.GetRequiredService>(); + + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"] + }; + + return new DefaultRabbitMQPersisterConnection(factory, logger); + }); + + services.AddSingleton(); + services.AddOptions(); //configure autofac