Fix bug with queue names

This commit is contained in:
Unai Zorrilla Castro 2017-04-20 16:44:07 +02:00
parent 09a53f0137
commit 70f50a0fce

View File

@ -31,6 +31,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
= new List<Type>(); = new List<Type>();
private IModel _consumerChannel; private IModel _consumerChannel;
private string _queueName;
public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger<EventBusRabbitMQ> logger) public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger<EventBusRabbitMQ> logger)
{ {
@ -93,7 +94,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
using (var channel = _persisterConnection.CreateModel()) using (var channel = _persisterConnection.CreateModel())
{ {
channel.QueueBind(queue: channel.QueueDeclare().QueueName, channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME, exchange: BROKER_NAME,
routingKey: eventName); routingKey: eventName);
@ -131,9 +132,16 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
using (var channel = _persisterConnection.CreateModel()) using (var channel = _persisterConnection.CreateModel())
{ {
channel.QueueUnbind(queue: channel.QueueDeclare().QueueName, channel.QueueUnbind(queue: _queueName,
exchange: BROKER_NAME, exchange: BROKER_NAME,
routingKey: eventName); routingKey: eventName);
if (_handlers.Keys.Count == 0)
{
_queueName = string.Empty;
_consumerChannel.Close();
}
} }
} }
} }
@ -142,7 +150,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
public void Dispose() public void Dispose()
{ {
_consumerChannel.Dispose(); if (_consumerChannel != null)
{
_consumerChannel.Dispose();
}
_handlers.Clear(); _handlers.Clear();
} }
@ -158,6 +170,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
channel.ExchangeDeclare(exchange: BROKER_NAME, channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct"); type: "direct");
_queueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel); var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) => consumer.Received += async (model, ea) =>
{ {
@ -167,16 +181,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
await ProcessEvent(eventName, message); await ProcessEvent(eventName, message);
}; };
channel.BasicConsume(queue: channel.QueueDeclare().QueueName, channel.BasicConsume(queue: _queueName,
noAck: true, noAck: true,
consumer: consumer); consumer: consumer);
channel.ModelShutdown += (sender, ea) =>
{
_consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel();
};
channel.CallbackException += (sender, ea) => channel.CallbackException += (sender, ea) =>
{ {
_consumerChannel.Dispose(); _consumerChannel.Dispose();