Browse Source

Merge pull request #972 from liubaishui/dev

Split RabbitMQ channel create and consumer create.
pull/980/head
Miguel Veloso 5 years ago
committed by GitHub
parent
commit
61ba545a9b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 17 deletions
  1. +29
    -17
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs

+ 29
- 17
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -98,7 +98,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
channel.BasicPublish(exchange: BROKER_NAME, channel.BasicPublish(exchange: BROKER_NAME,
routingKey: eventName, routingKey: eventName,
mandatory:true,
mandatory: true,
basicProperties: properties, basicProperties: properties,
body: body); body: body);
}); });
@ -112,6 +112,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
DoInternalSubscription(eventName); DoInternalSubscription(eventName);
_subsManager.AddDynamicSubscription<TH>(eventName); _subsManager.AddDynamicSubscription<TH>(eventName);
StartBasicConsume();
} }
public void Subscribe<T, TH>() public void Subscribe<T, TH>()
@ -124,6 +125,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
_subsManager.AddSubscription<T, TH>(); _subsManager.AddSubscription<T, TH>();
StartBasicConsume();
} }
private void DoInternalSubscription(string eventName) private void DoInternalSubscription(string eventName)
@ -172,6 +174,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_subsManager.Clear(); _subsManager.Clear();
} }
private void StartBasicConsume()
{
if (_consumerChannel != null)
{
var consumer = new EventingBasicConsumer(_consumerChannel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message);
_consumerChannel.BasicAck(ea.DeliveryTag, multiple: false);
};
_consumerChannel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
}
else
{
_logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false");
}
}
private IModel CreateConsumerChannel() private IModel CreateConsumerChannel()
{ {
if (!_persistentConnection.IsConnected) if (!_persistentConnection.IsConnected)
@ -190,26 +217,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
autoDelete: false, autoDelete: false,
arguments: null); arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(eventName, message);
channel.BasicAck(ea.DeliveryTag,multiple:false);
};
channel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
channel.CallbackException += (sender, ea) => channel.CallbackException += (sender, ea) =>
{ {
_consumerChannel.Dispose(); _consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel(); _consumerChannel = CreateConsumerChannel();
StartBasicConsume();
}; };
return channel; return channel;


Loading…
Cancel
Save