diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index cbdb233f8..e7a493c10 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -31,6 +31,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ = new List(); private IModel _consumerChannel; + private string _queueName; public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger logger) { @@ -93,7 +94,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ using (var channel = _persisterConnection.CreateModel()) { - channel.QueueBind(queue: channel.QueueDeclare().QueueName, + channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); @@ -131,9 +132,16 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ using (var channel = _persisterConnection.CreateModel()) { - channel.QueueUnbind(queue: channel.QueueDeclare().QueueName, + channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); + + if (_handlers.Keys.Count == 0) + { + _queueName = string.Empty; + + _consumerChannel.Close(); + } } } } @@ -142,7 +150,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ public void Dispose() { - _consumerChannel.Dispose(); + if (_consumerChannel != null) + { + _consumerChannel.Dispose(); + } + _handlers.Clear(); } @@ -158,6 +170,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); + _queueName = channel.QueueDeclare().QueueName; + var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { @@ -167,16 +181,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ await ProcessEvent(eventName, message); }; - channel.BasicConsume(queue: channel.QueueDeclare().QueueName, + channel.BasicConsume(queue: _queueName, noAck: true, consumer: consumer); - channel.ModelShutdown += (sender, ea) => - { - _consumerChannel.Dispose(); - _consumerChannel = CreateConsumerChannel(); - }; - channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose();