Browse Source

Merge pull request #987 from dotnet-architecture/fix/888-rabbitmq-message-processing-problem

Use AsyncEventingBasicConsumer in RabbitMQ
pull/988/head
Miguel Veloso 5 years ago
committed by GitHub
parent
commit
167bb167ff
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 51 additions and 23 deletions
  1. +33
    -14
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
  2. +2
    -1
      src/Services/Basket/Basket.API/Startup.cs
  3. +2
    -1
      src/Services/Catalog/Catalog.API/Startup.cs
  4. +2
    -1
      src/Services/Location/Locations.API/Startup.cs
  5. +2
    -1
      src/Services/Marketing/Marketing.API/Startup.cs
  6. +2
    -1
      src/Services/Ordering/Ordering.API/Startup.cs
  7. +2
    -1
      src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs
  8. +2
    -1
      src/Services/Ordering/Ordering.SignalrHub/Startup.cs
  9. +2
    -1
      src/Services/Payment/Payment.API/Startup.cs
  10. +2
    -1
      src/Services/Webhooks/Webhooks.API/Startup.cs

+ 33
- 14
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -178,27 +178,46 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
if (_consumerChannel != null)
{
var consumer = new EventingBasicConsumer(_consumerChannel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message);
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
_consumerChannel.BasicAck(ea.DeliveryTag, multiple: false);
};
consumer.Received += Consumer_Received;
_consumerChannel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
_consumerChannel.BasicConsume(
queue: _queueName,
autoAck: false,
consumer: consumer);
}
else
{
_logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false");
_logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
}
}
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
var eventName = eventArgs.RoutingKey;
var message = Encoding.UTF8.GetString(eventArgs.Body);
try
{
if (message.ToLowerInvariant().Contains("throw-fake-exception"))
{
throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
}
await ProcessEvent(eventName, message);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
}
// Even on exception we take the message off the queue.
// in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
// For more information see: https://www.rabbitmq.com/dlx.html
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
}
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
@ -209,7 +228,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct");
type: "direct");
channel.QueueDeclare(queue: _queueName,
durable: true,


+ 2
- 1
src/Services/Basket/Basket.API/Startup.cs View File

@ -105,7 +105,8 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Catalog/Catalog.API/Startup.cs View File

@ -297,7 +297,8 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"]
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Location/Locations.API/Startup.cs View File

@ -77,7 +77,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Marketing/Marketing.API/Startup.cs View File

@ -101,7 +101,8 @@
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Ordering/Ordering.API/Startup.cs View File

@ -305,7 +305,8 @@
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"]
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs View File

@ -68,7 +68,8 @@ namespace Ordering.BackgroundTasks
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Ordering/Ordering.SignalrHub/Startup.cs View File

@ -80,7 +80,8 @@ namespace Ordering.SignalrHub
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Payment/Payment.API/Startup.cs View File

@ -58,7 +58,8 @@ namespace Payment.API
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))


+ 2
- 1
src/Services/Webhooks/Webhooks.API/Startup.cs View File

@ -320,7 +320,8 @@ namespace Webhooks.API
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"]
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))


Loading…
Cancel
Save