commit
665392fa0b
@ -178,27 +178,46 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
{
|
{
|
||||||
if (_consumerChannel != null)
|
if (_consumerChannel != null)
|
||||||
{
|
{
|
||||||
var consumer = new EventingBasicConsumer(_consumerChannel);
|
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
|
||||||
consumer.Received += async (model, ea) =>
|
|
||||||
{
|
|
||||||
var eventName = ea.RoutingKey;
|
|
||||||
var message = Encoding.UTF8.GetString(ea.Body);
|
|
||||||
|
|
||||||
await ProcessEvent(eventName, message);
|
consumer.Received += Consumer_Received;
|
||||||
|
|
||||||
_consumerChannel.BasicAck(ea.DeliveryTag, multiple: false);
|
_consumerChannel.BasicConsume(
|
||||||
};
|
queue: _queueName,
|
||||||
|
autoAck: false,
|
||||||
_consumerChannel.BasicConsume(queue: _queueName,
|
consumer: consumer);
|
||||||
autoAck: false,
|
|
||||||
consumer: consumer);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
_logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false");
|
_logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
|
||||||
|
{
|
||||||
|
var eventName = eventArgs.RoutingKey;
|
||||||
|
var message = Encoding.UTF8.GetString(eventArgs.Body);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (message.ToLowerInvariant().Contains("throw-fake-exception"))
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
|
||||||
|
}
|
||||||
|
|
||||||
|
await ProcessEvent(eventName, message);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Even on exception we take the message off the queue.
|
||||||
|
// in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
|
||||||
|
// For more information see: https://www.rabbitmq.com/dlx.html
|
||||||
|
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
|
||||||
|
}
|
||||||
|
|
||||||
private IModel CreateConsumerChannel()
|
private IModel CreateConsumerChannel()
|
||||||
{
|
{
|
||||||
if (!_persistentConnection.IsConnected)
|
if (!_persistentConnection.IsConnected)
|
||||||
@ -209,7 +228,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
var channel = _persistentConnection.CreateModel();
|
var channel = _persistentConnection.CreateModel();
|
||||||
|
|
||||||
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
||||||
type: "direct");
|
type: "direct");
|
||||||
|
|
||||||
channel.QueueDeclare(queue: _queueName,
|
channel.QueueDeclare(queue: _queueName,
|
||||||
durable: true,
|
durable: true,
|
||||||
|
@ -105,7 +105,8 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
||||||
|
@ -297,7 +297,8 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = configuration["EventBusConnection"]
|
HostName = configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
|
||||||
|
@ -77,7 +77,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
||||||
|
@ -101,7 +101,8 @@
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
||||||
|
@ -305,7 +305,8 @@
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = configuration["EventBusConnection"]
|
HostName = configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
|
||||||
|
@ -68,7 +68,8 @@ namespace Ordering.BackgroundTasks
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
||||||
|
@ -80,7 +80,8 @@ namespace Ordering.SignalrHub
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
||||||
|
@ -58,7 +58,8 @@ namespace Payment.API
|
|||||||
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
|
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
|
||||||
|
@ -320,7 +320,8 @@ namespace Webhooks.API
|
|||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = configuration["EventBusConnection"]
|
HostName = configuration["EventBusConnection"],
|
||||||
|
DispatchConsumersAsync = true
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
|
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user