diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index ac379d50a..9044a4283 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -98,7 +98,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, - mandatory:true, + mandatory: true, basicProperties: properties, body: body); }); @@ -112,6 +112,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ DoInternalSubscription(eventName); _subsManager.AddDynamicSubscription(eventName); + StartBasicConsume(); } public void Subscribe() @@ -124,6 +125,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); _subsManager.AddSubscription(); + StartBasicConsume(); } private void DoInternalSubscription(string eventName) @@ -172,6 +174,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _subsManager.Clear(); } + private void StartBasicConsume() + { + if (_consumerChannel != null) + { + var consumer = new EventingBasicConsumer(_consumerChannel); + consumer.Received += async (model, ea) => + { + var eventName = ea.RoutingKey; + var message = Encoding.UTF8.GetString(ea.Body); + + await ProcessEvent(eventName, message); + + _consumerChannel.BasicAck(ea.DeliveryTag, multiple: false); + }; + + _consumerChannel.BasicConsume(queue: _queueName, + autoAck: false, + consumer: consumer); + } + else + { + _logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false"); + } + } + private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) @@ -190,26 +217,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ autoDelete: false, arguments: null); - - var consumer = new EventingBasicConsumer(channel); - consumer.Received += async (model, ea) => - { - var eventName = ea.RoutingKey; - var message = Encoding.UTF8.GetString(ea.Body); - - await ProcessEvent(eventName, message); - - channel.BasicAck(ea.DeliveryTag,multiple:false); - }; - - channel.BasicConsume(queue: _queueName, - autoAck: false, - consumer: consumer); - channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); + StartBasicConsume(); }; return channel;