|
|
@ -21,7 +21,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
{ |
|
|
|
const string BROKER_NAME = "eshop_event_bus"; |
|
|
|
|
|
|
|
private readonly IRabbitMQPersisterConnection _persisterConnection; |
|
|
|
private readonly IRabbitMQPersistentConnection _persistentConnection; |
|
|
|
private readonly ILogger<EventBusRabbitMQ> _logger; |
|
|
|
|
|
|
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers |
|
|
@ -33,20 +33,19 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
private IModel _consumerChannel; |
|
|
|
private string _queueName; |
|
|
|
|
|
|
|
public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger<EventBusRabbitMQ> logger) |
|
|
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger) |
|
|
|
{ |
|
|
|
_persisterConnection = persisterConnection ?? throw new ArgumentNullException(nameof(persisterConnection)); |
|
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); |
|
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
|
|
|
|
|
_consumerChannel = CreateConsumerChannel(); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void Publish(IntegrationEvent @event) |
|
|
|
{ |
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
_persistentConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
var policy = RetryPolicy.Handle<BrokerUnreachableException>() |
|
|
@ -56,7 +55,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
_logger.LogWarning(ex.ToString()); |
|
|
|
}); |
|
|
|
|
|
|
|
using (var channel = _persisterConnection.CreateModel()) |
|
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
|
{ |
|
|
|
var eventName = @event.GetType() |
|
|
|
.Name; |
|
|
@ -87,12 +86,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
_persistentConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
using (var channel = _persisterConnection.CreateModel()) |
|
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
|
{ |
|
|
|
channel.QueueBind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
@ -125,12 +124,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
{ |
|
|
|
_eventTypes.Remove(eventType); |
|
|
|
|
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
_persistentConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
using (var channel = _persisterConnection.CreateModel()) |
|
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
|
{ |
|
|
|
channel.QueueUnbind(queue: _queueName, |
|
|
|
exchange: BROKER_NAME, |
|
|
@ -160,12 +159,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
|
|
|
|
private IModel CreateConsumerChannel() |
|
|
|
{ |
|
|
|
if (!_persisterConnection.IsConnected) |
|
|
|
if (!_persistentConnection.IsConnected) |
|
|
|
{ |
|
|
|
_persisterConnection.TryConnect(); |
|
|
|
_persistentConnection.TryConnect(); |
|
|
|
} |
|
|
|
|
|
|
|
var channel = _persisterConnection.CreateModel(); |
|
|
|
var channel = _persistentConnection.CreateModel(); |
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, |
|
|
|
type: "direct"); |
|
|
|