From 396d33fe7b95de424b7cb88c344851d44db3f746 Mon Sep 17 00:00:00 2001 From: Miguel Veloso Date: Tue, 2 Apr 2019 15:36:20 +0100 Subject: [PATCH] Use AsyncEventingBasicConsumer in RabbitMQ to properly use async event handlers --- .../EventBusRabbitMQ/EventBusRabbitMQ.cs | 47 +++++++++++++------ src/Services/Basket/Basket.API/Startup.cs | 3 +- src/Services/Catalog/Catalog.API/Startup.cs | 3 +- .../Location/Locations.API/Startup.cs | 3 +- .../Marketing/Marketing.API/Startup.cs | 3 +- src/Services/Ordering/Ordering.API/Startup.cs | 3 +- .../Ordering.BackgroundTasks/Startup.cs | 3 +- .../Ordering/Ordering.SignalrHub/Startup.cs | 3 +- src/Services/Payment/Payment.API/Startup.cs | 3 +- src/Services/Webhooks/Webhooks.API/Startup.cs | 3 +- 10 files changed, 51 insertions(+), 23 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 9044a4283..397b75017 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -178,27 +178,46 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { if (_consumerChannel != null) { - var consumer = new EventingBasicConsumer(_consumerChannel); - consumer.Received += async (model, ea) => - { - var eventName = ea.RoutingKey; - var message = Encoding.UTF8.GetString(ea.Body); - - await ProcessEvent(eventName, message); + var consumer = new AsyncEventingBasicConsumer(_consumerChannel); - _consumerChannel.BasicAck(ea.DeliveryTag, multiple: false); - }; + consumer.Received += Consumer_Received; - _consumerChannel.BasicConsume(queue: _queueName, - autoAck: false, - consumer: consumer); + _consumerChannel.BasicConsume( + queue: _queueName, + autoAck: false, + consumer: consumer); } else { - _logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false"); + _logger.LogError("StartBasicConsume can't call on _consumerChannel == null"); } } + private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) + { + var eventName = eventArgs.RoutingKey; + var message = Encoding.UTF8.GetString(eventArgs.Body); + + try + { + if (message.ToLowerInvariant().Contains("throw-fake-exception")) + { + throw new InvalidOperationException($"Fake exception requested: \"{message}\""); + } + + await ProcessEvent(eventName, message); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message); + } + + // Even on exception we take the message off the queue. + // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). + // For more information see: https://www.rabbitmq.com/dlx.html + _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); + } + private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) @@ -209,7 +228,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, - type: "direct"); + type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 235b787d0..97ede1879 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -105,7 +105,8 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API var factory = new ConnectionFactory() { - HostName = Configuration["EventBusConnection"] + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index 0258a0a98..1a51a86fb 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -297,7 +297,8 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API var factory = new ConnectionFactory() { - HostName = configuration["EventBusConnection"] + HostName = configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) diff --git a/src/Services/Location/Locations.API/Startup.cs b/src/Services/Location/Locations.API/Startup.cs index 6d5fe3200..4664381d0 100644 --- a/src/Services/Location/Locations.API/Startup.cs +++ b/src/Services/Location/Locations.API/Startup.cs @@ -77,7 +77,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API var factory = new ConnectionFactory() { - HostName = Configuration["EventBusConnection"] + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) diff --git a/src/Services/Marketing/Marketing.API/Startup.cs b/src/Services/Marketing/Marketing.API/Startup.cs index 75c749452..7f990e3ad 100644 --- a/src/Services/Marketing/Marketing.API/Startup.cs +++ b/src/Services/Marketing/Marketing.API/Startup.cs @@ -101,7 +101,8 @@ var factory = new ConnectionFactory() { - HostName = Configuration["EventBusConnection"] + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index cd34f98dd..b77354052 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -305,7 +305,8 @@ var factory = new ConnectionFactory() { - HostName = configuration["EventBusConnection"] + HostName = configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs b/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs index 683fe1d0c..9d6a78e38 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs @@ -68,7 +68,8 @@ namespace Ordering.BackgroundTasks var factory = new ConnectionFactory() { - HostName = Configuration["EventBusConnection"] + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) diff --git a/src/Services/Ordering/Ordering.SignalrHub/Startup.cs b/src/Services/Ordering/Ordering.SignalrHub/Startup.cs index e8bb7ecd1..edcc80521 100644 --- a/src/Services/Ordering/Ordering.SignalrHub/Startup.cs +++ b/src/Services/Ordering/Ordering.SignalrHub/Startup.cs @@ -80,7 +80,8 @@ namespace Ordering.SignalrHub var factory = new ConnectionFactory() { - HostName = Configuration["EventBusConnection"] + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) diff --git a/src/Services/Payment/Payment.API/Startup.cs b/src/Services/Payment/Payment.API/Startup.cs index 4d5010868..39bb78f91 100644 --- a/src/Services/Payment/Payment.API/Startup.cs +++ b/src/Services/Payment/Payment.API/Startup.cs @@ -58,7 +58,8 @@ namespace Payment.API var logger = sp.GetRequiredService>(); var factory = new ConnectionFactory() { - HostName = Configuration["EventBusConnection"] + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) diff --git a/src/Services/Webhooks/Webhooks.API/Startup.cs b/src/Services/Webhooks/Webhooks.API/Startup.cs index 654b1c897..9d39da719 100644 --- a/src/Services/Webhooks/Webhooks.API/Startup.cs +++ b/src/Services/Webhooks/Webhooks.API/Startup.cs @@ -320,7 +320,8 @@ namespace Webhooks.API var factory = new ConnectionFactory() { - HostName = configuration["EventBusConnection"] + HostName = configuration["EventBusConnection"], + DispatchConsumersAsync = true }; if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))