Browse Source

Service Bus persisted connection for ISubscriptionClient (#1521)

pull/1629/head
Roman Marusyk 3 years ago
committed by GitHub
parent
commit
9b1a08a761
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 73 additions and 67 deletions
  1. +29
    -5
      src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs
  2. +15
    -20
      src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs
  3. +2
    -3
      src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs
  4. +4
    -6
      src/Services/Basket/Basket.API/Startup.cs
  5. +4
    -6
      src/Services/Catalog/Catalog.API/Startup.cs
  6. +4
    -6
      src/Services/Ordering/Ordering.API/Startup.cs
  7. +2
    -4
      src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs
  8. +5
    -6
      src/Services/Ordering/Ordering.SignalrHub/Startup.cs
  9. +4
    -6
      src/Services/Payment/Payment.API/Startup.cs
  10. +4
    -5
      src/Services/Webhooks/Webhooks.API/Startup.cs

+ 29
- 5
src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs View File

@ -1,27 +1,51 @@
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
using System;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection
{
private readonly ILogger<DefaultServiceBusPersisterConnection> _logger;
private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder;
private readonly string _subscriptionClientName;
private SubscriptionClient _subscriptionClient;
private ITopicClient _topicClient;
bool _disposed;
public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder,
ILogger<DefaultServiceBusPersisterConnection> logger)
string subscriptionClientName)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ??
throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder));
_subscriptionClientName = subscriptionClientName;
_subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, subscriptionClientName);
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default);
}
public ITopicClient TopicClient
{
get
{
if (_topicClient.IsClosedOrClosing)
{
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default);
}
return _topicClient;
}
}
public ISubscriptionClient SubscriptionClient
{
get
{
if (_subscriptionClient.IsClosedOrClosing)
{
_subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, _subscriptionClientName);
}
return _subscriptionClient;
}
}
public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder;
public ITopicClient CreateModel()


+ 15
- 20
src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs View File

@ -17,21 +17,16 @@
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
private readonly ILogger<EventBusServiceBus> _logger;
private readonly IEventBusSubscriptionsManager _subsManager;
private readonly SubscriptionClient _subscriptionClient;
private readonly ILifetimeScope _autofac;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName,
ILifetimeScope autofac)
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac)
{
_serviceBusPersisterConnection = serviceBusPersisterConnection;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder,
subscriptionClientName);
_autofac = autofac;
RemoveDefaultRule();
@ -51,9 +46,7 @@
Label = eventName,
};
var topicClient = _serviceBusPersisterConnection.CreateModel();
topicClient.SendAsync(message)
_serviceBusPersisterConnection.TopicClient.SendAsync(message)
.GetAwaiter()
.GetResult();
}
@ -77,7 +70,7 @@
{
try
{
_subscriptionClient.AddRuleAsync(new RuleDescription
_serviceBusPersisterConnection.SubscriptionClient.AddRuleAsync(new RuleDescription
{
Filter = new CorrelationFilter { Label = eventName },
Name = eventName
@ -102,10 +95,11 @@
try
{
_subscriptionClient
.RemoveRuleAsync(eventName)
.GetAwaiter()
.GetResult();
_serviceBusPersisterConnection
.SubscriptionClient
.RemoveRuleAsync(eventName)
.GetAwaiter()
.GetResult();
}
catch (MessagingEntityNotFoundException)
{
@ -132,7 +126,7 @@
private void RegisterSubscriptionClientMessageHandler()
{
_subscriptionClient.RegisterMessageHandler(
_serviceBusPersisterConnection.SubscriptionClient.RegisterMessageHandler(
async (message, token) =>
{
var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}";
@ -141,7 +135,7 @@
// Complete the message so that it is not received again.
if (await ProcessEvent(eventName, messageData))
{
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
await _serviceBusPersisterConnection.SubscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
},
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false });
@ -194,10 +188,11 @@
{
try
{
_subscriptionClient
.RemoveRuleAsync(RuleDescription.DefaultRuleName)
.GetAwaiter()
.GetResult();
_serviceBusPersisterConnection
.SubscriptionClient
.RemoveRuleAsync(RuleDescription.DefaultRuleName)
.GetAwaiter()
.GetResult();
}
catch (MessagingEntityNotFoundException)
{


+ 2
- 3
src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs View File

@ -5,8 +5,7 @@
public interface IServiceBusPersisterConnection : IDisposable
{
ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; }
ITopicClient CreateModel();
ITopicClient TopicClient { get; }
ISubscriptionClient SubscriptionClient { get; }
}
}

+ 4
- 6
src/Services/Basket/Basket.API/Startup.cs View File

@ -121,12 +121,11 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
var subscriptionClientName = Configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
@ -278,8 +277,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
private void RegisterEventBus(IServiceCollection services)
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -290,13 +287,14 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();


+ 4
- 6
src/Services/Catalog/Catalog.API/Startup.cs View File

@ -280,11 +280,10 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnection = new ServiceBusConnectionStringBuilder(settings.EventBusConnection);
var subscriptionClientName = configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
@ -325,8 +324,6 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
var subscriptionClientName = configuration["SubscriptionClientName"];
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -337,7 +334,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
@ -345,6 +342,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();


+ 4
- 6
src/Services/Ordering/Ordering.API/Startup.cs View File

@ -297,12 +297,11 @@
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
var subscriptionClientName = configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
@ -368,8 +367,6 @@
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
var subscriptionClientName = configuration["SubscriptionClientName"];
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -380,13 +377,14 @@
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();


+ 2
- 4
src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs View File

@ -53,12 +53,10 @@ namespace Ordering.BackgroundTasks.Extensions
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -68,7 +66,7 @@ namespace Ordering.BackgroundTasks.Extensions
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, iLifetimeScope);
});
}
else


+ 5
- 6
src/Services/Ordering/Ordering.SignalrHub/Startup.cs View File

@ -64,12 +64,12 @@ namespace Ordering.SignalrHub
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
var subscriptionClientName = Configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
@ -205,8 +205,6 @@ namespace Ordering.SignalrHub
private void RegisterEventBus(IServiceCollection services)
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -217,13 +215,14 @@ namespace Ordering.SignalrHub
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();


+ 4
- 6
src/Services/Payment/Payment.API/Startup.cs View File

@ -40,12 +40,11 @@ namespace Payment.API
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
var subscriptionClientName = Configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
@ -123,8 +122,6 @@ namespace Payment.API
private void RegisterEventBus(IServiceCollection services)
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -135,13 +132,14 @@ namespace Payment.API
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();


+ 4
- 5
src/Services/Webhooks/Webhooks.API/Startup.cs View File

@ -207,8 +207,6 @@ namespace Webhooks.API
}
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
var subscriptionClientName = configuration["SubscriptionClientName"];
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
@ -219,7 +217,7 @@ namespace Webhooks.API
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
@ -227,6 +225,7 @@ namespace Webhooks.API
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
@ -286,9 +285,9 @@ namespace Webhooks.API
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnection = new ServiceBusConnectionStringBuilder(configuration["EventBusConnection"]);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
var subscriptionClientName = configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else


Loading…
Cancel
Save