Browse Source

Split RabbitMQ channel create and consumer create.

Because when channel create the Subscribe is not register, so the previously stored message may lose. This fix split the channel create and consumer create, when Subscribe registered then call the consumer create function.
pull/972/head
liubaishui 5 years ago
committed by GitHub
parent
commit
a92383e123
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 17 deletions
  1. +29
    -17
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs

+ 29
- 17
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -98,7 +98,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
channel.BasicPublish(exchange: BROKER_NAME,
routingKey: eventName,
mandatory:true,
mandatory: true,
basicProperties: properties,
body: body);
});
@ -112,6 +112,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
DoInternalSubscription(eventName);
_subsManager.AddDynamicSubscription<TH>(eventName);
StartBasicConsume();
}
public void Subscribe<T, TH>()
@ -124,6 +125,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
_subsManager.AddSubscription<T, TH>();
StartBasicConsume();
}
private void DoInternalSubscription(string eventName)
@ -172,6 +174,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_subsManager.Clear();
}
private void StartBasicConsume()
{
if (_consumerChannel != null)
{
var consumer = new EventingBasicConsumer(_consumerChannel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message);
_consumerChannel.BasicAck(ea.DeliveryTag, multiple: false);
};
_consumerChannel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
}
else
{
_logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false");
}
}
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
@ -190,26 +217,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message);
channel.BasicAck(ea.DeliveryTag,multiple:false);
};
channel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
channel.CallbackException += (sender, ea) =>
{
_consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel();
StartBasicConsume();
};
return channel;


Loading…
Cancel
Save