Refactoring to better terms
This commit is contained in:
parent
ab45bb9772
commit
6f8cd174c7
@ -10,18 +10,18 @@ using System.Net.Sockets;
|
|||||||
|
|
||||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||||
{
|
{
|
||||||
public class DefaultRabbitMQPersisterConnection
|
public class DefaultRabbitMQPersistentConnection
|
||||||
: IRabbitMQPersisterConnection
|
: IRabbitMQPersistentConnection
|
||||||
{
|
{
|
||||||
private readonly IConnectionFactory _connectionFactory;
|
private readonly IConnectionFactory _connectionFactory;
|
||||||
private readonly ILogger<DefaultRabbitMQPersisterConnection> _logger;
|
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
|
||||||
|
|
||||||
IConnection _connection;
|
IConnection _connection;
|
||||||
bool _disposed;
|
bool _disposed;
|
||||||
|
|
||||||
object sync_root = new object();
|
object sync_root = new object();
|
||||||
|
|
||||||
public DefaultRabbitMQPersisterConnection(IConnectionFactory connectionFactory,ILogger<DefaultRabbitMQPersisterConnection> logger)
|
public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory,ILogger<DefaultRabbitMQPersistentConnection> logger)
|
||||||
{
|
{
|
||||||
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
|
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
|
||||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
@ -87,13 +87,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
_connection.CallbackException += OnCallbackException;
|
_connection.CallbackException += OnCallbackException;
|
||||||
_connection.ConnectionBlocked += OnConnectionBlocked;
|
_connection.ConnectionBlocked += OnConnectionBlocked;
|
||||||
|
|
||||||
_logger.LogInformation($"RabbitMQ persister connection acquire a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
|
_logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
_logger.LogCritical("FATAL ERROR: RabbitMQ connections can't be created and opened");
|
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
{
|
{
|
||||||
const string BROKER_NAME = "eshop_event_bus";
|
const string BROKER_NAME = "eshop_event_bus";
|
||||||
|
|
||||||
private readonly IRabbitMQPersisterConnection _persisterConnection;
|
private readonly IRabbitMQPersistentConnection _persistentConnection;
|
||||||
private readonly ILogger<EventBusRabbitMQ> _logger;
|
private readonly ILogger<EventBusRabbitMQ> _logger;
|
||||||
|
|
||||||
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers
|
||||||
@ -33,20 +33,19 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
private IModel _consumerChannel;
|
private IModel _consumerChannel;
|
||||||
private string _queueName;
|
private string _queueName;
|
||||||
|
|
||||||
public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger<EventBusRabbitMQ> logger)
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger)
|
||||||
{
|
{
|
||||||
_persisterConnection = persisterConnection ?? throw new ArgumentNullException(nameof(persisterConnection));
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
||||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
|
|
||||||
_consumerChannel = CreateConsumerChannel();
|
_consumerChannel = CreateConsumerChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void Publish(IntegrationEvent @event)
|
public void Publish(IntegrationEvent @event)
|
||||||
{
|
{
|
||||||
if (!_persisterConnection.IsConnected)
|
if (!_persistentConnection.IsConnected)
|
||||||
{
|
{
|
||||||
_persisterConnection.TryConnect();
|
_persistentConnection.TryConnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
var policy = RetryPolicy.Handle<BrokerUnreachableException>()
|
var policy = RetryPolicy.Handle<BrokerUnreachableException>()
|
||||||
@ -56,7 +55,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
_logger.LogWarning(ex.ToString());
|
_logger.LogWarning(ex.ToString());
|
||||||
});
|
});
|
||||||
|
|
||||||
using (var channel = _persisterConnection.CreateModel())
|
using (var channel = _persistentConnection.CreateModel())
|
||||||
{
|
{
|
||||||
var eventName = @event.GetType()
|
var eventName = @event.GetType()
|
||||||
.Name;
|
.Name;
|
||||||
@ -87,12 +86,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!_persisterConnection.IsConnected)
|
if (!_persistentConnection.IsConnected)
|
||||||
{
|
{
|
||||||
_persisterConnection.TryConnect();
|
_persistentConnection.TryConnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
using (var channel = _persisterConnection.CreateModel())
|
using (var channel = _persistentConnection.CreateModel())
|
||||||
{
|
{
|
||||||
channel.QueueBind(queue: _queueName,
|
channel.QueueBind(queue: _queueName,
|
||||||
exchange: BROKER_NAME,
|
exchange: BROKER_NAME,
|
||||||
@ -125,12 +124,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
{
|
{
|
||||||
_eventTypes.Remove(eventType);
|
_eventTypes.Remove(eventType);
|
||||||
|
|
||||||
if (!_persisterConnection.IsConnected)
|
if (!_persistentConnection.IsConnected)
|
||||||
{
|
{
|
||||||
_persisterConnection.TryConnect();
|
_persistentConnection.TryConnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
using (var channel = _persisterConnection.CreateModel())
|
using (var channel = _persistentConnection.CreateModel())
|
||||||
{
|
{
|
||||||
channel.QueueUnbind(queue: _queueName,
|
channel.QueueUnbind(queue: _queueName,
|
||||||
exchange: BROKER_NAME,
|
exchange: BROKER_NAME,
|
||||||
@ -160,12 +159,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
|
|
||||||
private IModel CreateConsumerChannel()
|
private IModel CreateConsumerChannel()
|
||||||
{
|
{
|
||||||
if (!_persisterConnection.IsConnected)
|
if (!_persistentConnection.IsConnected)
|
||||||
{
|
{
|
||||||
_persisterConnection.TryConnect();
|
_persistentConnection.TryConnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
var channel = _persisterConnection.CreateModel();
|
var channel = _persistentConnection.CreateModel();
|
||||||
|
|
||||||
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
||||||
type: "direct");
|
type: "direct");
|
||||||
|
@ -3,8 +3,7 @@ using System;
|
|||||||
|
|
||||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||||
{
|
{
|
||||||
|
public interface IRabbitMQPersistentConnection
|
||||||
public interface IRabbitMQPersisterConnection
|
|
||||||
: IDisposable
|
: IDisposable
|
||||||
{
|
{
|
||||||
bool IsConnected { get; }
|
bool IsConnected { get; }
|
||||||
|
@ -68,16 +68,16 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
services.AddSingleton<IRabbitMQPersisterConnection>(sp =>
|
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
|
||||||
{
|
{
|
||||||
var settings = sp.GetRequiredService<IOptions<BasketSettings>>().Value;
|
var settings = sp.GetRequiredService<IOptions<BasketSettings>>().Value;
|
||||||
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersisterConnection>>();
|
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = settings.EventBusConnection
|
HostName = settings.EventBusConnection
|
||||||
};
|
};
|
||||||
|
|
||||||
return new DefaultRabbitMQPersisterConnection(factory, logger);
|
return new DefaultRabbitMQPersistentConnection(factory, logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||||
|
@ -103,16 +103,16 @@
|
|||||||
|
|
||||||
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
|
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
|
||||||
|
|
||||||
services.AddSingleton<IRabbitMQPersisterConnection>(sp =>
|
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
|
||||||
{
|
{
|
||||||
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
|
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
|
||||||
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersisterConnection>>();
|
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = settings.EventBusConnection
|
HostName = settings.EventBusConnection
|
||||||
};
|
};
|
||||||
|
|
||||||
return new DefaultRabbitMQPersisterConnection(factory, logger);
|
return new DefaultRabbitMQPersistentConnection(factory, logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||||
|
@ -107,16 +107,16 @@
|
|||||||
var serviceProvider = services.BuildServiceProvider();
|
var serviceProvider = services.BuildServiceProvider();
|
||||||
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
|
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
|
||||||
|
|
||||||
services.AddSingleton<IRabbitMQPersisterConnection>(sp =>
|
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
|
||||||
{
|
{
|
||||||
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersisterConnection>>();
|
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
|
||||||
|
|
||||||
var factory = new ConnectionFactory()
|
var factory = new ConnectionFactory()
|
||||||
{
|
{
|
||||||
HostName = Configuration["EventBusConnection"]
|
HostName = Configuration["EventBusConnection"]
|
||||||
};
|
};
|
||||||
|
|
||||||
return new DefaultRabbitMQPersisterConnection(factory, logger);
|
return new DefaultRabbitMQPersistentConnection(factory, logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user