From a92383e123b9e2abb7971442dc708ffc60514552 Mon Sep 17 00:00:00 2001 From: liubaishui Date: Fri, 15 Mar 2019 21:03:42 +0800 Subject: [PATCH] Split RabbitMQ channel create and consumer create. Because when channel create the Subscribe is not register, so the previously stored message may lose. This fix split the channel create and consumer create, when Subscribe registered then call the consumer create function. --- .../EventBusRabbitMQ/EventBusRabbitMQ.cs | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index ac379d50a..9044a4283 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -98,7 +98,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, - mandatory:true, + mandatory: true, basicProperties: properties, body: body); }); @@ -112,6 +112,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ DoInternalSubscription(eventName); _subsManager.AddDynamicSubscription(eventName); + StartBasicConsume(); } public void Subscribe() @@ -124,6 +125,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); _subsManager.AddSubscription(); + StartBasicConsume(); } private void DoInternalSubscription(string eventName) @@ -172,6 +174,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _subsManager.Clear(); } + private void StartBasicConsume() + { + if (_consumerChannel != null) + { + var consumer = new EventingBasicConsumer(_consumerChannel); + consumer.Received += async (model, ea) => + { + var eventName = ea.RoutingKey; + var message = Encoding.UTF8.GetString(ea.Body); + + await ProcessEvent(eventName, message); + + _consumerChannel.BasicAck(ea.DeliveryTag, multiple: false); + }; + + _consumerChannel.BasicConsume(queue: _queueName, + autoAck: false, + consumer: consumer); + } + else + { + _logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false"); + } + } + private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) @@ -190,26 +217,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ autoDelete: false, arguments: null); - - var consumer = new EventingBasicConsumer(channel); - consumer.Received += async (model, ea) => - { - var eventName = ea.RoutingKey; - var message = Encoding.UTF8.GetString(ea.Body); - - await ProcessEvent(eventName, message); - - channel.BasicAck(ea.DeliveryTag,multiple:false); - }; - - channel.BasicConsume(queue: _queueName, - autoAck: false, - consumer: consumer); - channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); + StartBasicConsume(); }; return channel;