diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs index 881c7668e..0dff4154b 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs @@ -1,27 +1,51 @@ using Microsoft.Azure.ServiceBus; -using Microsoft.Extensions.Logging; using System; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection { - private readonly ILogger _logger; private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; + private readonly string _subscriptionClientName; + private SubscriptionClient _subscriptionClient; private ITopicClient _topicClient; bool _disposed; public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, - ILogger logger) + string subscriptionClientName) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); + _subscriptionClientName = subscriptionClientName; + _subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, subscriptionClientName); _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); } + public ITopicClient TopicClient + { + get + { + if (_topicClient.IsClosedOrClosing) + { + _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); + } + return _topicClient; + } + } + + public ISubscriptionClient SubscriptionClient + { + get + { + if (_subscriptionClient.IsClosedOrClosing) + { + _subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, _subscriptionClientName); + } + return _subscriptionClient; + } + } + public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; public ITopicClient CreateModel() diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 53ae33ae0..841c303f7 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -17,21 +17,16 @@ private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; - private readonly SubscriptionClient _subscriptionClient; private readonly ILifetimeScope _autofac; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, - ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, - ILifetimeScope autofac) + ILogger logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac) { _serviceBusPersisterConnection = serviceBusPersisterConnection; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - - _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, - subscriptionClientName); _autofac = autofac; RemoveDefaultRule(); @@ -51,9 +46,7 @@ Label = eventName, }; - var topicClient = _serviceBusPersisterConnection.CreateModel(); - - topicClient.SendAsync(message) + _serviceBusPersisterConnection.TopicClient.SendAsync(message) .GetAwaiter() .GetResult(); } @@ -77,7 +70,7 @@ { try { - _subscriptionClient.AddRuleAsync(new RuleDescription + _serviceBusPersisterConnection.SubscriptionClient.AddRuleAsync(new RuleDescription { Filter = new CorrelationFilter { Label = eventName }, Name = eventName @@ -102,10 +95,11 @@ try { - _subscriptionClient - .RemoveRuleAsync(eventName) - .GetAwaiter() - .GetResult(); + _serviceBusPersisterConnection + .SubscriptionClient + .RemoveRuleAsync(eventName) + .GetAwaiter() + .GetResult(); } catch (MessagingEntityNotFoundException) { @@ -132,7 +126,7 @@ private void RegisterSubscriptionClientMessageHandler() { - _subscriptionClient.RegisterMessageHandler( + _serviceBusPersisterConnection.SubscriptionClient.RegisterMessageHandler( async (message, token) => { var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}"; @@ -141,7 +135,7 @@ // Complete the message so that it is not received again. if (await ProcessEvent(eventName, messageData)) { - await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); + await _serviceBusPersisterConnection.SubscriptionClient.CompleteAsync(message.SystemProperties.LockToken); } }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); @@ -194,10 +188,11 @@ { try { - _subscriptionClient - .RemoveRuleAsync(RuleDescription.DefaultRuleName) - .GetAwaiter() - .GetResult(); + _serviceBusPersisterConnection + .SubscriptionClient + .RemoveRuleAsync(RuleDescription.DefaultRuleName) + .GetAwaiter() + .GetResult(); } catch (MessagingEntityNotFoundException) { diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs index 52737cef7..8863db62e 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs @@ -5,8 +5,7 @@ public interface IServiceBusPersisterConnection : IDisposable { - ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; } - - ITopicClient CreateModel(); + ITopicClient TopicClient { get; } + ISubscriptionClient SubscriptionClient { get; } } } \ No newline at end of file diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index daa4e8342..2f53cbcb0 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -121,12 +121,11 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API { services.AddSingleton(sp => { - var logger = sp.GetRequiredService>(); - var serviceBusConnectionString = Configuration["EventBusConnection"]; var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + var subscriptionClientName = Configuration["SubscriptionClientName"]; + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); } else @@ -278,8 +277,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API private void RegisterEventBus(IServiceCollection services) { - var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => @@ -290,13 +287,14 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API var eventBusSubcriptionsManager = sp.GetRequiredService(); return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + eventBusSubcriptionsManager, iLifetimeScope); }); } else { services.AddSingleton(sp => { + var subscriptionClientName = Configuration["SubscriptionClientName"]; var rabbitMQPersistentConnection = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index 5747f83b4..557183e38 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -280,11 +280,10 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API services.AddSingleton(sp => { var settings = sp.GetRequiredService>().Value; - var logger = sp.GetRequiredService>(); - var serviceBusConnection = new ServiceBusConnectionStringBuilder(settings.EventBusConnection); + var subscriptionClientName = configuration["SubscriptionClientName"]; - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); } else @@ -325,8 +324,6 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) { - var subscriptionClientName = configuration["SubscriptionClientName"]; - if (configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => @@ -337,7 +334,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API var eventBusSubcriptionsManager = sp.GetRequiredService(); return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + eventBusSubcriptionsManager, iLifetimeScope); }); } @@ -345,6 +342,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API { services.AddSingleton(sp => { + var subscriptionClientName = configuration["SubscriptionClientName"]; var rabbitMQPersistentConnection = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index 445d0b873..d55876203 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -297,12 +297,11 @@ { services.AddSingleton(sp => { - var logger = sp.GetRequiredService>(); - var serviceBusConnectionString = configuration["EventBusConnection"]; var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); + var subscriptionClientName = configuration["SubscriptionClientName"]; - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); } else @@ -368,8 +367,6 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) { - var subscriptionClientName = configuration["SubscriptionClientName"]; - if (configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => @@ -380,13 +377,14 @@ var eventBusSubcriptionsManager = sp.GetRequiredService(); return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + eventBusSubcriptionsManager, iLifetimeScope); }); } else { services.AddSingleton(sp => { + var subscriptionClientName = configuration["SubscriptionClientName"]; var rabbitMQPersistentConnection = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs b/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs index 82ffae84f..ffdf68161 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs @@ -53,12 +53,10 @@ namespace Ordering.BackgroundTasks.Extensions { services.AddSingleton(sp => { - var logger = sp.GetRequiredService>(); - var serviceBusConnectionString = configuration["EventBusConnection"]; var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); services.AddSingleton(sp => @@ -68,7 +66,7 @@ namespace Ordering.BackgroundTasks.Extensions var logger = sp.GetRequiredService>(); var eventBusSubcriptionsManager = sp.GetRequiredService(); - return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, iLifetimeScope); }); } else diff --git a/src/Services/Ordering/Ordering.SignalrHub/Startup.cs b/src/Services/Ordering/Ordering.SignalrHub/Startup.cs index 11f239119..a4a5d08c7 100644 --- a/src/Services/Ordering/Ordering.SignalrHub/Startup.cs +++ b/src/Services/Ordering/Ordering.SignalrHub/Startup.cs @@ -64,12 +64,12 @@ namespace Ordering.SignalrHub { services.AddSingleton(sp => { - var logger = sp.GetRequiredService>(); - var serviceBusConnectionString = Configuration["EventBusConnection"]; var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + var subscriptionClientName = Configuration["SubscriptionClientName"]; + + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); } else @@ -205,8 +205,6 @@ namespace Ordering.SignalrHub private void RegisterEventBus(IServiceCollection services) { - var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => @@ -217,13 +215,14 @@ namespace Ordering.SignalrHub var eventBusSubcriptionsManager = sp.GetRequiredService(); return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + eventBusSubcriptionsManager, iLifetimeScope); }); } else { services.AddSingleton(sp => { + var subscriptionClientName = Configuration["SubscriptionClientName"]; var rabbitMQPersistentConnection = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); diff --git a/src/Services/Payment/Payment.API/Startup.cs b/src/Services/Payment/Payment.API/Startup.cs index 77b8d55fd..a39d3b97d 100644 --- a/src/Services/Payment/Payment.API/Startup.cs +++ b/src/Services/Payment/Payment.API/Startup.cs @@ -40,12 +40,11 @@ namespace Payment.API { services.AddSingleton(sp => { - var logger = sp.GetRequiredService>(); - var serviceBusConnectionString = Configuration["EventBusConnection"]; var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); + var subscriptionClientName = Configuration["SubscriptionClientName"]; - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); } else @@ -123,8 +122,6 @@ namespace Payment.API private void RegisterEventBus(IServiceCollection services) { - var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => @@ -135,13 +132,14 @@ namespace Payment.API var eventBusSubcriptionsManager = sp.GetRequiredService(); return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + eventBusSubcriptionsManager, iLifetimeScope); }); } else { services.AddSingleton(sp => { + var subscriptionClientName = Configuration["SubscriptionClientName"]; var rabbitMQPersistentConnection = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); diff --git a/src/Services/Webhooks/Webhooks.API/Startup.cs b/src/Services/Webhooks/Webhooks.API/Startup.cs index f3b3efbb8..4744e9d24 100644 --- a/src/Services/Webhooks/Webhooks.API/Startup.cs +++ b/src/Services/Webhooks/Webhooks.API/Startup.cs @@ -207,8 +207,6 @@ namespace Webhooks.API } public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) { - var subscriptionClientName = configuration["SubscriptionClientName"]; - if (configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => @@ -219,7 +217,7 @@ namespace Webhooks.API var eventBusSubcriptionsManager = sp.GetRequiredService(); return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); + eventBusSubcriptionsManager, iLifetimeScope); }); } @@ -227,6 +225,7 @@ namespace Webhooks.API { services.AddSingleton(sp => { + var subscriptionClientName = configuration["SubscriptionClientName"]; var rabbitMQPersistentConnection = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); @@ -286,9 +285,9 @@ namespace Webhooks.API { services.AddSingleton(sp => { - var logger = sp.GetRequiredService>(); var serviceBusConnection = new ServiceBusConnectionStringBuilder(configuration["EventBusConnection"]); - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); + var subscriptionClientName = configuration["SubscriptionClientName"]; + return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName); }); } else