diff --git a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs index dde05e1e3..3b58f2e49 100644 --- a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs +++ b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs @@ -20,5 +20,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions void Unsubscribe() where TH : IIntegrationEventHandler where T : IntegrationEvent; + + String GetVHost(); } } diff --git a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs new file mode 100644 index 000000000..4057b261e --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs @@ -0,0 +1,15 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions +{ + public interface IMultiEventBus + { + void AddEventBus(IEventBus eventBus); + + void Publish(IntegrationEvent @event); + + void Subscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler; + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs index c83c505b1..0c791816e 100644 --- a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs +++ b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs @@ -10,25 +10,25 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus { bool IsEmpty { get; } event EventHandler OnEventRemoved; - void AddDynamicSubscription(string eventName) + void AddDynamicSubscription(string eventName, String vHost) where TH : IDynamicIntegrationEventHandler; - void AddSubscription() + void AddSubscription(String vHost) where T : IntegrationEvent where TH : IIntegrationEventHandler; - void RemoveSubscription() + void RemoveSubscription(String vHost) where TH : IIntegrationEventHandler where T : IntegrationEvent; - void RemoveDynamicSubscription(string eventName) + void RemoveDynamicSubscription(string eventName, String vHost) where TH : IDynamicIntegrationEventHandler; - bool HasSubscriptionsForEvent() where T : IntegrationEvent; - bool HasSubscriptionsForEvent(string eventName); + bool HasSubscriptionsForEvent(String vHost) where T : IntegrationEvent; + bool HasSubscriptionsForEvent(string eventName, String vHost); Type GetEventTypeByName(string eventName); void Clear(); - IEnumerable GetHandlersForEvent() where T : IntegrationEvent; - IEnumerable GetHandlersForEvent(string eventName); + IEnumerable GetHandlersForEvent(String vHost) where T : IntegrationEvent; + IEnumerable GetHandlersForEvent(string eventName, String vHost); string GetEventKey(); } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs index ea1aca61e..e13881fbe 100644 --- a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs +++ b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs @@ -10,48 +10,52 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus { - private readonly Dictionary> _handlers; + //private readonly Dictionary> _handlers; + private readonly Dictionary> _handlers; private readonly List _eventTypes; public event EventHandler OnEventRemoved; public InMemoryEventBusSubscriptionsManager() { - _handlers = new Dictionary>(); + //_handlers = new Dictionary>(); + _handlers = new Dictionary>(); _eventTypes = new List(); } public bool IsEmpty => !_handlers.Keys.Any(); public void Clear() => _handlers.Clear(); - public void AddDynamicSubscription(string eventName) + public void AddDynamicSubscription(string eventName, String vHost) where TH : IDynamicIntegrationEventHandler { - DoAddSubscription(typeof(TH), eventName, isDynamic: true); + DoAddSubscription(typeof(TH), eventName, isDynamic: true, vHost); } - public void AddSubscription() + public void AddSubscription(String vHost) where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = GetEventKey(); - DoAddSubscription(typeof(TH), eventName, isDynamic: false); - + DoAddSubscription(typeof(TH), eventName, isDynamic: false, vHost); + if (!_eventTypes.Contains(typeof(T))) { _eventTypes.Add(typeof(T)); } } - private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic) + private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic, String vHost) { - if (!HasSubscriptionsForEvent(eventName)) + var compositeHandler = new CompositeHandler{TenantVHostName = vHost, EventName = eventName}; + + if (!HasSubscriptionsForEvent(eventName, vHost)) { - _handlers.Add(eventName, new List()); + _handlers.Add(compositeHandler, new List()); } - if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) + if (_handlers[compositeHandler].Any(s => s.HandlerType == handlerType)) { throw new ArgumentException( $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); @@ -59,41 +63,43 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus if (isDynamic) { - _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType)); + _handlers[compositeHandler].Add(SubscriptionInfo.Dynamic(handlerType)); } else { - _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType)); + _handlers[compositeHandler].Add(SubscriptionInfo.Typed(handlerType)); } } - public void RemoveDynamicSubscription(string eventName) + public void RemoveDynamicSubscription(string eventName, String vHost) where TH : IDynamicIntegrationEventHandler { - var handlerToRemove = FindDynamicSubscriptionToRemove(eventName); - DoRemoveHandler(eventName, handlerToRemove); + var handlerToRemove = FindDynamicSubscriptionToRemove(eventName, vHost); + DoRemoveHandler(eventName, handlerToRemove, vHost); } - public void RemoveSubscription() + public void RemoveSubscription(String vHost) where TH : IIntegrationEventHandler where T : IntegrationEvent { - var handlerToRemove = FindSubscriptionToRemove(); + var handlerToRemove = FindSubscriptionToRemove(vHost); var eventName = GetEventKey(); - DoRemoveHandler(eventName, handlerToRemove); + DoRemoveHandler(eventName, handlerToRemove, vHost); } - private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove) + private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove, String vHost) { if (subsToRemove != null) { - _handlers[eventName].Remove(subsToRemove); - if (!_handlers[eventName].Any()) + + var compositeHandler = new CompositeHandler{EventName = eventName, TenantVHostName = vHost}; + _handlers[compositeHandler].Remove(subsToRemove); + if (!_handlers[compositeHandler].Any()) { - _handlers.Remove(eventName); + _handlers.Remove(compositeHandler); var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); if (eventType != null) { @@ -105,12 +111,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus } } - public IEnumerable GetHandlersForEvent() where T : IntegrationEvent + public IEnumerable GetHandlersForEvent(String vHost) where T : IntegrationEvent { var key = GetEventKey(); - return GetHandlersForEvent(key); + return GetHandlersForEvent(key, vHost); } - public IEnumerable GetHandlersForEvent(string eventName) => _handlers[eventName]; + public IEnumerable GetHandlersForEvent(string eventName, String vHost) => _handlers[new CompositeHandler{EventName = eventName, TenantVHostName = vHost}]; private void RaiseOnEventRemoved(string eventName) { @@ -119,38 +125,43 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus } - private SubscriptionInfo FindDynamicSubscriptionToRemove(string eventName) + private SubscriptionInfo FindDynamicSubscriptionToRemove(string eventName, String vHost) where TH : IDynamicIntegrationEventHandler { - return DoFindSubscriptionToRemove(eventName, typeof(TH)); + return DoFindSubscriptionToRemove(eventName, typeof(TH), vHost); } - private SubscriptionInfo FindSubscriptionToRemove() + private SubscriptionInfo FindSubscriptionToRemove(String vHost) where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = GetEventKey(); - return DoFindSubscriptionToRemove(eventName, typeof(TH)); + + return DoFindSubscriptionToRemove(eventName, typeof(TH), vHost); } - private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType) + private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType, String vHost) { - if (!HasSubscriptionsForEvent(eventName)) + + if (!HasSubscriptionsForEvent(eventName, vHost)) { return null; } + + var compositeHandler = new CompositeHandler{EventName = eventName, TenantVHostName = vHost}; - return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType); + return _handlers[compositeHandler].SingleOrDefault(s => s.HandlerType == handlerType); } - public bool HasSubscriptionsForEvent() where T : IntegrationEvent + public bool HasSubscriptionsForEvent(String vHost) where T : IntegrationEvent { var key = GetEventKey(); - return HasSubscriptionsForEvent(key); + + return HasSubscriptionsForEvent(key, vHost); } - public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); + public bool HasSubscriptionsForEvent(string eventName, String vHost) => _handlers.ContainsKey(new CompositeHandler{EventName = eventName, TenantVHostName = vHost}); public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName); @@ -160,3 +171,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus } } } + + +class CompositeHandler +{ + public String TenantVHostName { get; set; } + public String EventName { get; set; } + + protected bool Equals(CompositeHandler other) + { + return TenantVHostName == other.TenantVHostName && EventName == other.EventName; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((CompositeHandler) obj); + } + + public override int GetHashCode() + { + unchecked + { + return ((TenantVHostName != null ? TenantVHostName.GetHashCode() : 0) * 397) ^ (EventName != null ? EventName.GetHashCode() : 0); + } + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 5f68648d8..84d39c979 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -12,12 +12,8 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using System; -using System.Collections; using System.Collections.Generic; -using System.Diagnostics; -using System.Net; using System.Net.Http; -using System.Net.Mime; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; @@ -37,14 +33,15 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ private static readonly String tenantACustomisationUrl = @"http://tenantacustomisation/"; private static readonly String tenantManagerUrl = @"http://tenantmanager/"; private readonly int _retryCount; - private readonly Dictionary _tenantInfo; + private readonly Dictionary _tenantInfo; + public String vHost { get; set; } private IModel _consumerChannel; private string _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, - ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, + ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, String vhost, string queueName = null, int retryCount = 5) { _persistentConnection = @@ -59,6 +56,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _tenantInfo = new Dictionary(); _tenantInfo.Add(1, "TenantA"); _tenantInfo.Add(2, "TenantB"); + vHost = vhost; } private void SubsManager_OnEventRemoved(object sender, string eventName) @@ -137,7 +135,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ typeof(TH).GetGenericTypeName()); DoInternalSubscription(eventName); - _subsManager.AddDynamicSubscription(eventName); + _subsManager.AddDynamicSubscription(eventName, vHost); StartBasicConsume(); } @@ -151,13 +149,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); - _subsManager.AddSubscription(); + _subsManager.AddSubscription(vHost); StartBasicConsume(); } private void DoInternalSubscription(string eventName) { - var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); + var containsKey = _subsManager.HasSubscriptionsForEvent(eventName, vHost); if (!containsKey) { if (!_persistentConnection.IsConnected) @@ -182,13 +180,18 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ _logger.LogInformation("Unsubscribing from event {EventName}", eventName); - _subsManager.RemoveSubscription(); + _subsManager.RemoveSubscription(vHost); + } + + public string GetVHost() + { + return vHost; } public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { - _subsManager.RemoveDynamicSubscription(eventName); + _subsManager.RemoveDynamicSubscription(eventName, vHost); } public void Dispose() @@ -341,11 +344,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { _logger.LogWarning("Processing RabbitMQ event: {EventName}", eventName); - if (_subsManager.HasSubscriptionsForEvent(eventName)) + if (_subsManager.HasSubscriptionsForEvent(eventName, vHost)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { - var subscriptions = _subsManager.GetHandlersForEvent(eventName); + var subscriptions = _subsManager.GetHandlersForEvent(eventName, vHost); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IMultiRabbitMQPersistentConnections.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IMultiRabbitMQPersistentConnections.cs new file mode 100644 index 000000000..f74d6dea8 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/IMultiRabbitMQPersistentConnections.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ +{ + public interface IMultiRabbitMQPersistentConnections + { + List GetConnections(); + void AddConnection(IRabbitMQPersistentConnection connection); + void RemoveConnection(IRabbitMQPersistentConnection connection); + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs new file mode 100644 index 000000000..f2c9fd0c5 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ +{ + public class MultiEventBusRabbitMQ : IMultiEventBus + { + private List _eventBuses; + private Dictionary _tenants; + + public MultiEventBusRabbitMQ(List eventBuses, Dictionary tenants) + { + _eventBuses = eventBuses; + _tenants = tenants; + } + + public void AddEventBus(IEventBus eventBus) + { + _eventBuses.Add(eventBus); + } + + public void Publish(IntegrationEvent @event) + { + if (@event.TenantId == 0)//System wide event? + { + _eventBuses.ForEach(eventBus => + { + eventBus.Publish(@event); + }); + } + else + { + //TODO requires ALL events to have tenantId set! + _tenants.TryGetValue(@event.TenantId, out String tenantName); + var actualEventBus = _eventBuses.Find(e => e.GetVHost().Equals(tenantName)); + + actualEventBus.Publish(@event); + } + + } + + public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + _eventBuses.ForEach(e => { e.Subscribe(); }); + } + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiRabbitMQPersistentConnections.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiRabbitMQPersistentConnections.cs new file mode 100644 index 000000000..a2e794695 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiRabbitMQPersistentConnections.cs @@ -0,0 +1,30 @@ +using System.Collections.Generic; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ +{ + public class MultiRabbitMQPersistentConnections : IMultiRabbitMQPersistentConnections + { + public List Connections; + + public MultiRabbitMQPersistentConnections() + { + Connections = new List(); + } + + + public List GetConnections() + { + return Connections; + } + + public void AddConnection(IRabbitMQPersistentConnection connection) + { + Connections.Add((connection)); + } + + public void RemoveConnection(IRabbitMQPersistentConnection connection) + { + Connections.Remove(connection); + } + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 53ae33ae0..f88361688 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -1,4 +1,4 @@ -namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +/*namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus { using Autofac; using Microsoft.Azure.ServiceBus; @@ -206,3 +206,4 @@ } } } +*/ \ No newline at end of file diff --git a/src/Services/Basket/Basket.API/Controllers/BasketController.cs b/src/Services/Basket/Basket.API/Controllers/BasketController.cs index c69d82a89..10036c283 100644 --- a/src/Services/Basket/Basket.API/Controllers/BasketController.cs +++ b/src/Services/Basket/Basket.API/Controllers/BasketController.cs @@ -21,14 +21,14 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API.Controllers { private readonly IBasketRepository _repository; private readonly IIdentityService _identityService; - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private readonly ILogger _logger; public BasketController( ILogger logger, IBasketRepository repository, IIdentityService identityService, - IEventBus eventBus) + IMultiEventBus eventBus) { _logger = logger; _repository = repository; diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 9cfabf20a..11413e929 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -5,7 +5,6 @@ using Basket.API.Infrastructure.Middlewares; using Basket.API.IntegrationEvents.EventHandling; using Basket.API.IntegrationEvents.Events; using HealthChecks.UI.Client; - using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights.ServiceFabric; using Microsoft.AspNetCore.Authentication.JwtBearer; @@ -58,7 +57,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API { options.Filters.Add(typeof(HttpGlobalExceptionFilter)); options.Filters.Add(typeof(ValidateModelStateFilter)); - }) .SetCompatibilityVersion(CompatibilityVersion.Version_2_2) .AddControllersAsServices(); @@ -85,50 +83,15 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API return ConnectionMultiplexer.Connect(configuration); }); - - if (Configuration.GetValue("AzureServiceBusEnabled")) + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var serviceBusConnectionString = Configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var factory = new ConnectionFactory() - { - HostName = Configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; - - if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) - { - factory.UserName = Configuration["EventBusUserName"]; - } + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp)); + connections.AddConnection(GenerateConnection("TenantB", sp)); - if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) - { - factory.Password = Configuration["EventBusPassword"]; - } + return connections; + }); - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); - } RegisterEventBus(services); @@ -151,7 +114,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API TokenUrl = $"{Configuration.GetValue("IdentityUrlExternal")}/connect/token", Scopes = new Dictionary() { - { "basket", "Basket API" } + {"basket", "Basket API"} } }); @@ -162,10 +125,10 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API { options.AddPolicy("CorsPolicy", builder => builder - .SetIsOriginAllowed((host) => true) - .AllowAnyMethod() - .AllowAnyHeader() - .AllowCredentials()); + .SetIsOriginAllowed((host) => true) + .AllowAnyMethod() + .AllowAnyHeader() + .AllowCredentials()); }); services.AddSingleton(); services.AddTransient(); @@ -178,7 +141,38 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API return new AutofacServiceProvider(container.Build()); } - + + + private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp) + { + var logger = sp.GetRequiredService>(); + + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; + + if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) + { + factory.UserName = Configuration["EventBusUserName"]; + } + + if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) + { + factory.Password = Configuration["EventBusPassword"]; + } + + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) @@ -203,7 +197,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API Predicate = r => r.Name.Contains("self") }); - app.UseStaticFiles(); + app.UseStaticFiles(); app.UseCors("CorsPolicy"); ConfigureAuth(app); @@ -211,27 +205,29 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API app.UseMvcWithDefaultRoute(); app.UseSwagger() - .UseSwaggerUI(c => - { - c.SwaggerEndpoint($"{ (!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty) }/swagger/v1/swagger.json", "Basket.API V1"); - c.OAuthClientId ("basketswaggerui"); - c.OAuthAppName("Basket Swagger UI"); - }); + .UseSwaggerUI(c => + { + c.SwaggerEndpoint( + $"{(!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty)}/swagger/v1/swagger.json", + "Basket.API V1"); + c.OAuthClientId("basketswaggerui"); + c.OAuthAppName("Basket Swagger UI"); + }); ConfigureEventBus(app); - } private void RegisterAppInsights(IServiceCollection services) { services.AddApplicationInsightsTelemetry(Configuration); var orchestratorType = Configuration.GetValue("OrchestratorType"); - + if (orchestratorType?.ToUpper() == "K8S") { // Enable K8s telemetry initializer services.AddApplicationInsightsKubernetesEnricher(); } + if (orchestratorType?.ToUpper() == "SF") { // Enable SF telemetry initializer @@ -245,13 +241,12 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API // prevent from mapping "sub" claim to nameidentifier. JwtSecurityTokenHandler.DefaultInboundClaimTypeMap.Clear(); - var identityUrl = Configuration.GetValue("IdentityUrl"); - + var identityUrl = Configuration.GetValue("IdentityUrl"); + services.AddAuthentication(options => { options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; - }).AddJwtBearer(options => { options.Authority = identityUrl; @@ -274,37 +269,31 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API { var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else - { - services.AddSingleton(sp => + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } + List eventBuses = new List(); - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); - } + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); + }); services.AddSingleton(); @@ -314,26 +303,27 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API private void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); eventBus.Subscribe(); eventBus.Subscribe(); - } + } } public static class CustomExtensionMethods { - public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration) + public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, + IConfiguration configuration) { var hcBuilder = services.AddHealthChecks(); hcBuilder.AddCheck("self", () => HealthCheckResult.Healthy()); - hcBuilder + hcBuilder .AddRedis( configuration["ConnectionString"], name: "redis-check", - tags: new string[] { "redis" }); + tags: new string[] {"redis"}); if (configuration.GetValue("AzureServiceBusEnabled")) { @@ -342,7 +332,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API configuration["EventBusConnection"], topicName: "eshop_event_bus", name: "basket-servicebus-check", - tags: new string[] { "servicebus" }); + tags: new string[] {"servicebus"}); } else { @@ -350,10 +340,10 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API .AddRabbitMQ( $"amqp://{configuration["EventBusConnection"]}", name: "basket-rabbitmqbus-check", - tags: new string[] { "rabbitmqbus" }); + tags: new string[] {"rabbitmqbus"}); } return services; } } -} +} \ No newline at end of file diff --git a/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs b/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs index bb3a23d40..ce4d12728 100644 --- a/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs +++ b/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs @@ -17,14 +17,14 @@ namespace Catalog.API.IntegrationEvents public class CatalogIntegrationEventService : ICatalogIntegrationEventService { private readonly Func _integrationEventLogServiceFactory; - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private readonly CatalogContext _catalogContext; private readonly IIntegrationEventLogService _eventLogService; private readonly ILogger _logger; public CatalogIntegrationEventService( ILogger logger, - IEventBus eventBus, + IMultiEventBus eventBus, CatalogContext catalogContext, Func integrationEventLogServiceFactory) { diff --git a/src/Services/Catalog/Catalog.API/IntegrationEvents/EventHandling/OrderStatusChangedToAwaitingValidationIntegrationEventHandler.cs b/src/Services/Catalog/Catalog.API/IntegrationEvents/EventHandling/OrderStatusChangedToAwaitingValidationIntegrationEventHandler.cs index 493a271cc..b402f48b6 100644 --- a/src/Services/Catalog/Catalog.API/IntegrationEvents/EventHandling/OrderStatusChangedToAwaitingValidationIntegrationEventHandler.cs +++ b/src/Services/Catalog/Catalog.API/IntegrationEvents/EventHandling/OrderStatusChangedToAwaitingValidationIntegrationEventHandler.cs @@ -49,6 +49,8 @@ ? (IntegrationEvent)new OrderStockRejectedIntegrationEvent(@event.OrderId, confirmedOrderStockItems) : new OrderStockConfirmedIntegrationEvent(@event.OrderId); + confirmedIntegrationEvent.TenantId = @event.TenantId; + await _catalogIntegrationEventService.SaveEventAndCatalogContextChangesAsync(confirmedIntegrationEvent); await _catalogIntegrationEventService.PublishThroughEventBusAsync(confirmedIntegrationEvent); diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index 1a51a86fb..ca9a24457 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -26,6 +26,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using System; +using System.Collections.Generic; using System.Data.Common; using System.Reflection; using HealthChecks.UI.Client; @@ -101,7 +102,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API protected virtual void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); eventBus.Subscribe(); eventBus.Subscribe(); } @@ -275,21 +276,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API sp => (DbConnection c) => new IntegrationEventLogService(c)); services.AddTransient(); - - if (configuration.GetValue("AzureServiceBusEnabled")) - { - services.AddSingleton(sp => - { - var settings = sp.GetRequiredService>().Value; - var logger = sp.GetRequiredService>(); - - var serviceBusConnection = new ServiceBusConnectionStringBuilder(settings.EventBusConnection); - - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { + services.AddSingleton(sp => { var settings = sp.GetRequiredService>().Value; @@ -310,6 +297,8 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API { factory.Password = configuration["EventBusPassword"]; } + + factory.VirtualHost = "TenantA"; var retryCount = 5; if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) @@ -319,47 +308,79 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); }); - } + + services.AddSingleton(sp => + { + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp, configuration)); + connections.AddConnection(GenerateConnection("TenantB", sp, configuration)); + return connections; + }); return services; } - public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) + + private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration) { - var subscriptionClientName = configuration["SubscriptionClientName"]; + var logger = sp.GetRequiredService>(); - if (configuration.GetValue("AzureServiceBusEnabled")) + var factory = new ConnectionFactory() { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); + HostName = configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); + if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) + { + factory.UserName = configuration["EventBusUserName"]; + } + if (!string.IsNullOrEmpty(configuration["EventBusPassword"])) + { + factory.Password = configuration["EventBusPassword"]; } - else + + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } + + public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) + { + var subscriptionClientName = configuration["SubscriptionClientName"]; + + services.AddSingleton(sp => { - services.AddSingleton(sp => + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); + + var retryCount = 5; + if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); + retryCount = int.Parse(configuration["EventBusRetryCount"]); + } - var retryCount = 5; - if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(configuration["EventBusRetryCount"]); - } + List eventBuses = new List(); - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); - } + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); + }); services.AddSingleton(); services.AddTransient(); diff --git a/src/Services/Location/Locations.API/Infrastructure/Services/LocationsService.cs b/src/Services/Location/Locations.API/Infrastructure/Services/LocationsService.cs index f6b9ed708..27eb53dc9 100644 --- a/src/Services/Location/Locations.API/Infrastructure/Services/LocationsService.cs +++ b/src/Services/Location/Locations.API/Infrastructure/Services/LocationsService.cs @@ -14,12 +14,12 @@ public class LocationsService : ILocationsService { private readonly ILocationsRepository _locationsRepository; - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private readonly ILogger _logger; public LocationsService( ILocationsRepository locationsRepository, - IEventBus eventBus, + IMultiEventBus eventBus, ILogger logger) { _locationsRepository = locationsRepository ?? throw new ArgumentNullException(nameof(locationsRepository)); diff --git a/src/Services/Location/Locations.API/Startup.cs b/src/Services/Location/Locations.API/Startup.cs index 4664381d0..742dd18ac 100644 --- a/src/Services/Location/Locations.API/Startup.cs +++ b/src/Services/Location/Locations.API/Startup.cs @@ -46,10 +46,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API services .AddCustomHealthCheck(Configuration) - .AddMvc(options => - { - options.Filters.Add(typeof(HttpGlobalExceptionFilter)); - }) + .AddMvc(options => { options.Filters.Add(typeof(HttpGlobalExceptionFilter)); }) .SetCompatibilityVersion(CompatibilityVersion.Version_2_2) .AddControllersAsServices(); @@ -57,49 +54,14 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API services.Configure(Configuration); - if (Configuration.GetValue("AzureServiceBusEnabled")) + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var serviceBusConnectionString = Configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var factory = new ConnectionFactory() - { - HostName = Configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; - - if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) - { - factory.UserName = Configuration["EventBusUserName"]; - } + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp)); + connections.AddConnection(GenerateConnection("TenantB", sp)); - if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) - { - factory.Password = Configuration["EventBusPassword"]; - } - - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); - } + return connections; + }); RegisterEventBus(services); @@ -123,22 +85,21 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API TokenUrl = $"{Configuration.GetValue("IdentityUrlExternal")}/connect/token", Scopes = new Dictionary() { - { "locations", "Locations API" } + {"locations", "Locations API"} } }); options.OperationFilter(); - }); services.AddCors(options => { options.AddPolicy("CorsPolicy", builder => builder - .SetIsOriginAllowed((host) => true) - .AllowAnyMethod() - .AllowAnyHeader() - .AllowCredentials()); + .SetIsOriginAllowed((host) => true) + .AllowAnyMethod() + .AllowAnyHeader() + .AllowCredentials()); }); services.AddSingleton(); @@ -153,6 +114,38 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API return new AutofacServiceProvider(container.Build()); } + + private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp) + { + var logger = sp.GetRequiredService>(); + + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; + + if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) + { + factory.UserName = Configuration["EventBusUserName"]; + } + + if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) + { + factory.Password = Configuration["EventBusPassword"]; + } + + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { @@ -183,12 +176,14 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API app.UseMvcWithDefaultRoute(); app.UseSwagger() - .UseSwaggerUI(c => - { - c.SwaggerEndpoint($"{ (!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty) }/swagger/v1/swagger.json", "Locations.API V1"); - c.OAuthClientId("locationsswaggerui"); - c.OAuthAppName("Locations Swagger UI"); - }); + .UseSwaggerUI(c => + { + c.SwaggerEndpoint( + $"{(!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty)}/swagger/v1/swagger.json", + "Locations.API V1"); + c.OAuthClientId("locationsswaggerui"); + c.OAuthAppName("Locations Swagger UI"); + }); LocationsContextSeed.SeedAsync(app, loggerFactory) .Wait(); @@ -204,6 +199,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API // Enable K8s telemetry initializer services.AddApplicationInsightsKubernetesEnricher(); } + if (orchestratorType?.ToUpper() == "SF") { // Enable SF telemetry initializer @@ -218,16 +214,16 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API JwtSecurityTokenHandler.DefaultInboundClaimTypeMap.Clear(); services.AddAuthentication(options => - { - options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; - options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; - }) - .AddJwtBearer(options => - { - options.Authority = Configuration.GetValue("IdentityUrl"); - options.Audience = "locations"; - options.RequireHttpsMetadata = false; - }); + { + options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; + options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; + }) + .AddJwtBearer(options => + { + options.Authority = Configuration.GetValue("IdentityUrl"); + options.Audience = "locations"; + options.RequireHttpsMetadata = false; + }); } protected virtual void ConfigureAuth(IApplicationBuilder app) @@ -244,37 +240,31 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API { var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else - { - services.AddSingleton(sp => + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } + List eventBuses = new List(); - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); - } + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); + }); services.AddSingleton(); } @@ -282,7 +272,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API public static class CustomExtensionMethods { - public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration) + public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, + IConfiguration configuration) { var hcBuilder = services.AddHealthChecks(); @@ -292,7 +283,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API .AddMongoDb( configuration["ConnectionString"], name: "locations-mongodb-check", - tags: new string[] { "mongodb" }); + tags: new string[] {"mongodb"}); if (configuration.GetValue("AzureServiceBusEnabled")) { @@ -301,7 +292,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API configuration["EventBusConnection"], topicName: "eshop_event_bus", name: "locations-servicebus-check", - tags: new string[] { "servicebus" }); + tags: new string[] {"servicebus"}); } else { @@ -309,10 +300,10 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API .AddRabbitMQ( $"amqp://{configuration["EventBusConnection"]}", name: "locations-rabbitmqbus-check", - tags: new string[] { "rabbitmqbus" }); + tags: new string[] {"rabbitmqbus"}); } return services; } } -} +} \ No newline at end of file diff --git a/src/Services/Marketing/Marketing.API/Startup.cs b/src/Services/Marketing/Marketing.API/Startup.cs index 7f990e3ad..533f2adc2 100644 --- a/src/Services/Marketing/Marketing.API/Startup.cs +++ b/src/Services/Marketing/Marketing.API/Startup.cs @@ -80,50 +80,17 @@ options.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning)); //Check Client vs. Server evaluation: https://docs.microsoft.com/en-us/ef/core/querying/client-eval }); - - if (Configuration.GetValue("AzureServiceBusEnabled")) + + + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp)); + connections.AddConnection(GenerateConnection("TenantB", sp)); - var serviceBusConnectionString = Configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var factory = new ConnectionFactory() - { - HostName = Configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; - - if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) - { - factory.UserName = Configuration["EventBusUserName"]; - } - - if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) - { - factory.Password = Configuration["EventBusPassword"]; - } - - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); - } + return connections; + }); + // Add framework services. services.AddSwaggerGen(options => @@ -257,25 +224,10 @@ private void RegisterEventBus(IServiceCollection services) { var subscriptionClientName = Configuration["SubscriptionClientName"]; - - if (Configuration.GetValue("AzureServiceBusEnabled")) { - services.AddSingleton(sp => + services.AddSingleton(sp => { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else - { - services.AddSingleton(sp => - { - var rabbitMQPersistentConnection = sp.GetRequiredService(); + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); var eventBusSubcriptionsManager = sp.GetRequiredService(); @@ -286,17 +238,58 @@ retryCount = int.Parse(Configuration["EventBusRetryCount"]); } - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); + List eventBuses = new List(); + + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); }); } services.AddSingleton(); services.AddTransient(); } + + private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp) + { + var logger = sp.GetRequiredService>(); + + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; + + if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) + { + factory.UserName = Configuration["EventBusUserName"]; + } + + if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) + { + factory.Password = Configuration["EventBusPassword"]; + } + + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } private void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); eventBus.Subscribe(); } diff --git a/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommand.cs b/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommand.cs index 74f002e21..8668214e4 100644 --- a/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommand.cs +++ b/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommand.cs @@ -12,10 +12,13 @@ namespace Ordering.API.Application.Commands [DataMember] public int OrderNumber { get; private set; } + [DataMember] + public int TenantId { get; private set; } - public SetStockConfirmedOrderStatusCommand(int orderNumber) + public SetStockConfirmedOrderStatusCommand(int orderNumber, int tenantId) { OrderNumber = orderNumber; + TenantId = tenantId; } } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommandHandler.cs b/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommandHandler.cs index b76c16ec6..f82b37aca 100644 --- a/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommandHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommandHandler.cs @@ -35,7 +35,7 @@ namespace Ordering.API.Application.Commands return false; } - orderToUpdate.SetStockConfirmedStatus(); + orderToUpdate.SetStockConfirmedStatus(command.TenantId); return await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken); } } diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs index 0e9ca4f08..6c4bb4dad 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs @@ -41,6 +41,8 @@ var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString()); var orderStatusChangedToStockConfirmedIntegrationEvent = new OrderStatusChangedToStockConfirmedIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name); + orderStatusChangedToStockConfirmedIntegrationEvent.TenantId = + orderStatusChangedToStockConfirmedDomainEvent.TenantId; await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToStockConfirmedIntegrationEvent); } } diff --git a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/OrderStockConfirmedIntegrationEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/OrderStockConfirmedIntegrationEventHandler.cs index 6438b01d0..c1b79ed54 100644 --- a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/OrderStockConfirmedIntegrationEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/OrderStockConfirmedIntegrationEventHandler.cs @@ -33,7 +33,7 @@ { _logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", @event.Id, Program.AppName, @event); - var command = new SetStockConfirmedOrderStatusCommand(@event.OrderId); + var command = new SetStockConfirmedOrderStatusCommand(@event.OrderId, @event.TenantId); _logger.LogInformation( "----- Sending command: {CommandName} - {IdProperty}: {CommandId} ({@Command})", diff --git a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs index 6b8ed53e4..f67661450 100644 --- a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs @@ -15,12 +15,12 @@ namespace Ordering.API.Application.IntegrationEvents.EventHandling public class UserCheckoutAcceptedIntegrationEventHandler : IIntegrationEventHandler { private readonly IMediator _mediator; - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private readonly ILogger _logger; public UserCheckoutAcceptedIntegrationEventHandler( IMediator mediator, - ILogger logger, IEventBus eventBus) + ILogger logger, IMultiEventBus eventBus) { _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); diff --git a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs index cb7ce5513..08fa966b3 100644 --- a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs +++ b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs @@ -19,13 +19,13 @@ namespace Ordering.API.Application.IntegrationEvents public class OrderingIntegrationEventService : IOrderingIntegrationEventService { private readonly Func _integrationEventLogServiceFactory; - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private readonly OrderingContext _orderingContext; private readonly IntegrationEventLogContext _eventLogContext; private readonly IIntegrationEventLogService _eventLogService; private readonly ILogger _logger; - public OrderingIntegrationEventService(IEventBus eventBus, + public OrderingIntegrationEventService(IMultiEventBus eventBus, OrderingContext orderingContext, IntegrationEventLogContext eventLogContext, Func integrationEventLogServiceFactory, diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index 8be92a453..d4fe19ea3 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -115,7 +115,7 @@ private void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); eventBus.Subscribe>(); eventBus.Subscribe>(); @@ -284,52 +284,47 @@ services.AddTransient(); - if (configuration.GetValue("AzureServiceBusEnabled")) + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var serviceBusConnectionString = configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp, configuration)); + connections.AddConnection(GenerateConnection("TenantB", sp, configuration)); - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); + return connections; + }); + return services; + } + + private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration) + { + var logger = sp.GetRequiredService>(); - var factory = new ConnectionFactory() - { - HostName = configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; + var factory = new ConnectionFactory() + { + HostName = configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; - if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) - { - factory.UserName = configuration["EventBusUserName"]; - } + if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) + { + factory.UserName = configuration["EventBusUserName"]; + } - if (!string.IsNullOrEmpty(configuration["EventBusPassword"])) - { - factory.Password = configuration["EventBusPassword"]; - } + if (!string.IsNullOrEmpty(configuration["EventBusPassword"])) + { + factory.Password = configuration["EventBusPassword"]; + } - var retryCount = 5; - if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(configuration["EventBusRetryCount"]); - } + factory.VirtualHost = vHost; - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); + var retryCount = 5; + if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(configuration["EventBusRetryCount"]); } - return services; + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); } public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration) @@ -360,25 +355,12 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) { var subscriptionClientName = configuration["SubscriptionClientName"]; - - if (configuration.GetValue("AzureServiceBusEnabled")) + { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else - { - services.AddSingleton(sp => + services.AddSingleton(sp => { - var rabbitMQPersistentConnection = sp.GetRequiredService(); + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); var eventBusSubcriptionsManager = sp.GetRequiredService(); @@ -389,7 +371,17 @@ retryCount = int.Parse(configuration["EventBusRetryCount"]); } - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); + List eventBuses = new List(); + + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); }); } diff --git a/src/Services/Ordering/Ordering.API/appsettings.json b/src/Services/Ordering/Ordering.API/appsettings.json index f74c312b1..ddb377d46 100644 --- a/src/Services/Ordering/Ordering.API/appsettings.json +++ b/src/Services/Ordering/Ordering.API/appsettings.json @@ -6,10 +6,10 @@ "SeqServerUrl": null, "LogstashgUrl": null, "MinimumLevel": { - "Default": "Information", + "Default": "Verbose", "Override": { "Microsoft": "Warning", - "Microsoft.eShopOnContainers": "Information", + "Microsoft.eShopOnContainers": "Verbose", "System": "Warning" } } diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs b/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs index 9d6a78e38..7d39ead70 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs @@ -15,6 +15,7 @@ using Ordering.BackgroundTasks.Configuration; using Ordering.BackgroundTasks.Tasks; using RabbitMQ.Client; using System; +using System.Collections.Generic; using HealthChecks.UI.Client; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks; @@ -46,60 +47,56 @@ namespace Ordering.BackgroundTasks services.AddSingleton(); //configure event bus related services - - if (Configuration.GetValue("AzureServiceBusEnabled")) + + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp)); + connections.AddConnection(GenerateConnection("TenantB", sp)); - var serviceBusConnectionString = Configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); + return connections; + }); + - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); + RegisterEventBus(services); + //create autofac based service provider + var container = new ContainerBuilder(); + container.Populate(services); - var factory = new ConnectionFactory() - { - HostName = Configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; - if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) - { - factory.UserName = Configuration["EventBusUserName"]; - } + return new AutofacServiceProvider(container.Build()); + } - if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) - { - factory.Password = Configuration["EventBusPassword"]; - } + private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp) + { + var logger = sp.GetRequiredService>(); - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); + if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) + { + factory.UserName = Configuration["EventBusUserName"]; } - RegisterEventBus(services); + if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) + { + factory.Password = Configuration["EventBusPassword"]; + } - //create autofac based service provider - var container = new ContainerBuilder(); - container.Populate(services); + factory.VirtualHost = vHost; + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } - return new AutofacServiceProvider(container.Build()); + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); } @@ -122,38 +119,33 @@ namespace Ordering.BackgroundTasks private void RegisterEventBus(IServiceCollection services) { var subscriptionClientName = Configuration["SubscriptionClientName"]; - - if (Configuration.GetValue("AzureServiceBusEnabled")) - { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else + + services.AddSingleton(sp => { - services.AddSingleton(sp => + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); - } + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + List eventBuses = new List(); + + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); + }); + services.AddSingleton(); } diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Tasks/GracePeriodManagerTask.cs b/src/Services/Ordering/Ordering.BackgroundTasks/Tasks/GracePeriodManagerTask.cs index b667615fb..6f8282350 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Tasks/GracePeriodManagerTask.cs +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Tasks/GracePeriodManagerTask.cs @@ -21,12 +21,12 @@ namespace Ordering.BackgroundTasks.Tasks { private readonly ILogger _logger; private readonly BackgroundTaskSettings _settings; - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private static readonly String identityUrl = @"http://identity.api/"; public GracePeriodManagerService( IOptions settings, - IEventBus eventBus, + IMultiEventBus eventBus, ILogger logger) { _settings = settings?.Value ?? throw new ArgumentNullException(nameof(settings)); diff --git a/src/Services/Ordering/Ordering.Domain/AggregatesModel/OrderAggregate/Order.cs b/src/Services/Ordering/Ordering.Domain/AggregatesModel/OrderAggregate/Order.cs index 59c981935..e2ac9f134 100644 --- a/src/Services/Ordering/Ordering.Domain/AggregatesModel/OrderAggregate/Order.cs +++ b/src/Services/Ordering/Ordering.Domain/AggregatesModel/OrderAggregate/Order.cs @@ -114,11 +114,11 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.O } } - public void SetStockConfirmedStatus() + public void SetStockConfirmedStatus(int tenantId) { if (_orderStatusId == OrderStatus.AwaitingValidation.Id) { - AddDomainEvent(new OrderStatusChangedToStockConfirmedDomainEvent(Id)); + AddDomainEvent(new OrderStatusChangedToStockConfirmedDomainEvent(Id).withTenantId(tenantId)); _orderStatusId = OrderStatus.StockConfirmed.Id; _description = "All the items were confirmed with available stock."; diff --git a/src/Services/Ordering/Ordering.Domain/Events/OrderStatusChangedToStockConfirmedDomainEvent.cs b/src/Services/Ordering/Ordering.Domain/Events/OrderStatusChangedToStockConfirmedDomainEvent.cs index b16bebbcc..c68a69855 100644 --- a/src/Services/Ordering/Ordering.Domain/Events/OrderStatusChangedToStockConfirmedDomainEvent.cs +++ b/src/Services/Ordering/Ordering.Domain/Events/OrderStatusChangedToStockConfirmedDomainEvent.cs @@ -9,8 +9,15 @@ : INotification { public int OrderId { get; } + public int TenantId { get; set; } public OrderStatusChangedToStockConfirmedDomainEvent(int orderId) => OrderId = orderId; + + public OrderStatusChangedToStockConfirmedDomainEvent withTenantId(int tenantId) + { + this.TenantId = tenantId; + return this; + } } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.SignalrHub/Startup.cs b/src/Services/Ordering/Ordering.SignalrHub/Startup.cs index edcc80521..f335f3986 100644 --- a/src/Services/Ordering/Ordering.SignalrHub/Startup.cs +++ b/src/Services/Ordering/Ordering.SignalrHub/Startup.cs @@ -16,6 +16,7 @@ using Ordering.SignalrHub.IntegrationEvents.EventHandling; using Ordering.SignalrHub.IntegrationEvents.Events; using RabbitMQ.Client; using System; +using System.Collections.Generic; using System.IdentityModel.Tokens.Jwt; using HealthChecks.UI.Client; using Microsoft.AspNetCore.Diagnostics.HealthChecks; @@ -42,10 +43,10 @@ namespace Ordering.SignalrHub { options.AddPolicy("CorsPolicy", builder => builder - .AllowAnyMethod() - .AllowAnyHeader() - .SetIsOriginAllowed((host) => true) - .AllowCredentials()); + .AllowAnyMethod() + .AllowAnyHeader() + .SetIsOriginAllowed((host) => true) + .AllowCredentials()); }); if (Configuration.GetValue("IsClusterEnv") == bool.TrueString) @@ -59,50 +60,14 @@ namespace Ordering.SignalrHub services.AddSignalR(); } - if (Configuration.GetValue("AzureServiceBusEnabled")) + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var serviceBusConnectionString = Configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - - var factory = new ConnectionFactory() - { - HostName = Configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; - - if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) - { - factory.UserName = Configuration["EventBusUserName"]; - } - - if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) - { - factory.Password = Configuration["EventBusPassword"]; - } + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp)); + connections.AddConnection(GenerateConnection("TenantB", sp)); - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); - } + return connections; + }); ConfigureAuthService(services); @@ -118,6 +83,38 @@ namespace Ordering.SignalrHub return new AutofacServiceProvider(container.Build()); } + private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp) + { + var logger = sp.GetRequiredService>(); + + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; + + if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) + { + factory.UserName = Configuration["EventBusUserName"]; + } + + if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) + { + factory.Password = Configuration["EventBusPassword"]; + } + + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, ILoggerFactory loggerFactory) { @@ -159,14 +156,25 @@ namespace Ordering.SignalrHub private void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); - - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + var eventBus = app.ApplicationServices.GetRequiredService(); + + eventBus + .Subscribe(); + eventBus + .Subscribe(); + eventBus + .Subscribe(); + eventBus + .Subscribe(); + eventBus + .Subscribe(); + eventBus + .Subscribe(); } private void ConfigureAuthService(IServiceCollection services) @@ -180,7 +188,6 @@ namespace Ordering.SignalrHub { options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; - }).AddJwtBearer(options => { options.Authority = identityUrl; @@ -193,37 +200,33 @@ namespace Ordering.SignalrHub { var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) - { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else + services.AddSingleton(sp => { - services.AddSingleton(sp => + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); - } + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + List eventBuses = new List(); + + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); + }); + services.AddSingleton(); } @@ -231,7 +234,8 @@ namespace Ordering.SignalrHub public static class CustomExtensionMethods { - public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration) + public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, + IConfiguration configuration) { var hcBuilder = services.AddHealthChecks(); @@ -244,7 +248,7 @@ namespace Ordering.SignalrHub configuration["EventBusConnection"], topicName: "eshop_event_bus", name: "signalr-servicebus-check", - tags: new string[] { "servicebus" }); + tags: new string[] {"servicebus"}); } else { @@ -252,10 +256,10 @@ namespace Ordering.SignalrHub .AddRabbitMQ( $"amqp://{configuration["EventBusConnection"]}", name: "signalr-rabbitmqbus-check", - tags: new string[] { "rabbitmqbus" }); + tags: new string[] {"rabbitmqbus"}); } return services; } } -} +} \ No newline at end of file diff --git a/src/Services/Payment/Payment.API/IntegrationEvents/EventHandling/OrderStatusChangedToStockConfirmedIntegrationEventHandler.cs b/src/Services/Payment/Payment.API/IntegrationEvents/EventHandling/OrderStatusChangedToStockConfirmedIntegrationEventHandler.cs index c2f31b4cf..31d126cd0 100644 --- a/src/Services/Payment/Payment.API/IntegrationEvents/EventHandling/OrderStatusChangedToStockConfirmedIntegrationEventHandler.cs +++ b/src/Services/Payment/Payment.API/IntegrationEvents/EventHandling/OrderStatusChangedToStockConfirmedIntegrationEventHandler.cs @@ -1,4 +1,6 @@ -namespace Payment.API.IntegrationEvents.EventHandling +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; + +namespace Payment.API.IntegrationEvents.EventHandling { using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; @@ -11,12 +13,12 @@ public class OrderStatusChangedToStockConfirmedIntegrationEventHandler : IIntegrationEventHandler { - private readonly IEventBus _eventBus; + private readonly IMultiEventBus _eventBus; private readonly PaymentSettings _settings; private readonly ILogger _logger; public OrderStatusChangedToStockConfirmedIntegrationEventHandler( - IEventBus eventBus, + IMultiEventBus eventBus, IOptionsSnapshot settings, ILogger logger) { @@ -50,6 +52,7 @@ orderPaymentIntegrationEvent = new OrderPaymentFailedIntegrationEvent(@event.OrderId); } + orderPaymentIntegrationEvent.TenantId = @event.TenantId; _logger.LogInformation("----- Publishing integration event: {IntegrationEventId} from {AppName} - ({@IntegrationEvent})", orderPaymentIntegrationEvent.Id, Program.AppName, orderPaymentIntegrationEvent); _eventBus.Publish(orderPaymentIntegrationEvent); diff --git a/src/Services/Payment/Payment.API/Startup.cs b/src/Services/Payment/Payment.API/Startup.cs index 39bb78f91..cdc7ebb63 100644 --- a/src/Services/Payment/Payment.API/Startup.cs +++ b/src/Services/Payment/Payment.API/Startup.cs @@ -16,6 +16,7 @@ using Payment.API.IntegrationEvents.EventHandling; using Payment.API.IntegrationEvents.Events; using RabbitMQ.Client; using System; +using System.Collections.Generic; using HealthChecks.UI.Client; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks; @@ -38,50 +39,16 @@ namespace Payment.API services.Configure(Configuration); RegisterAppInsights(services); - - if (Configuration.GetValue("AzureServiceBusEnabled")) + + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - var serviceBusConnectionString = Configuration["EventBusConnection"]; - var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); - - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - var factory = new ConnectionFactory() - { - HostName = Configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; - - if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) - { - factory.UserName = Configuration["EventBusUserName"]; - } - - if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) - { - factory.Password = Configuration["EventBusPassword"]; - } - - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); - } + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp)); + connections.AddConnection(GenerateConnection("TenantB", sp)); + return connections; + }); + RegisterEventBus(services); var container = new ContainerBuilder(); @@ -89,6 +56,37 @@ namespace Payment.API return new AutofacServiceProvider(container.Build()); } + private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp) + { + var logger = sp.GetRequiredService>(); + + var factory = new ConnectionFactory() + { + HostName = Configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; + + if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) + { + factory.UserName = Configuration["EventBusUserName"]; + } + + if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) + { + factory.Password = Configuration["EventBusPassword"]; + } + + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) { @@ -138,37 +136,33 @@ namespace Payment.API { var subscriptionClientName = Configuration["SubscriptionClientName"]; - if (Configuration.GetValue("AzureServiceBusEnabled")) + + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - } - else - { - services.AddSingleton(sp => + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); + + var retryCount = 5; + if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - var retryCount = 5; - if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(Configuration["EventBusRetryCount"]); - } - - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); - } + retryCount = int.Parse(Configuration["EventBusRetryCount"]); + } + + List eventBuses = new List(); + + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); + }); + services.AddTransient(); services.AddSingleton(); @@ -176,7 +170,7 @@ namespace Payment.API private void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); eventBus.Subscribe(); } } diff --git a/src/Services/TenantCustomisations/TenantACustomisations/Startup.cs b/src/Services/TenantCustomisations/TenantACustomisations/Startup.cs index c8f551829..b61568d67 100644 --- a/src/Services/TenantCustomisations/TenantACustomisations/Startup.cs +++ b/src/Services/TenantCustomisations/TenantACustomisations/Startup.cs @@ -311,6 +311,8 @@ factory.Password = configuration["EventBusPassword"]; } + factory.VirtualHost = "TenantA"; + var retryCount = 5; if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) { @@ -353,7 +355,7 @@ { var subscriptionClientName = configuration["SubscriptionClientName"]; - if (configuration.GetValue("AzureServiceBusEnabled")) +/* if (configuration.GetValue("AzureServiceBusEnabled")) { services.AddSingleton(sp => { @@ -366,7 +368,7 @@ eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); }); } - else + else*/ { services.AddSingleton(sp => { @@ -381,7 +383,7 @@ retryCount = int.Parse(configuration["EventBusRetryCount"]); } - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); + return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); }); } diff --git a/src/Services/Webhooks/Webhooks.API/Startup.cs b/src/Services/Webhooks/Webhooks.API/Startup.cs index f1a89d329..8186f1cde 100644 --- a/src/Services/Webhooks/Webhooks.API/Startup.cs +++ b/src/Services/Webhooks/Webhooks.API/Startup.cs @@ -126,7 +126,7 @@ namespace Webhooks.API protected virtual void ConfigureEventBus(IApplicationBuilder app) { - var eventBus = app.ApplicationServices.GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); eventBus.Subscribe(); eventBus.Subscribe(); eventBus.Subscribe(); @@ -231,26 +231,10 @@ namespace Webhooks.API public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) { var subscriptionClientName = configuration["SubscriptionClientName"]; - - if (configuration.GetValue("AzureServiceBusEnabled")) - { - services.AddSingleton(sp => - { - var serviceBusPersisterConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - return new EventBusServiceBus(serviceBusPersisterConnection, logger, - eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); - }); - - } - else - { - services.AddSingleton(sp => + + services.AddSingleton(sp => { - var rabbitMQPersistentConnection = sp.GetRequiredService(); + var multiRabbitMqPersistentConnections = sp.GetRequiredService(); var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); var eventBusSubcriptionsManager = sp.GetRequiredService(); @@ -261,9 +245,19 @@ namespace Webhooks.API retryCount = int.Parse(configuration["EventBusRetryCount"]); } - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); + List eventBuses = new List(); + + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); + eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, + iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); + Dictionary tenants = new Dictionary(); + tenants.Add(1, "TenantA"); + tenants.Add(2, "TenantB"); + + return new MultiEventBusRabbitMQ(eventBuses, tenants); }); - } + services.AddSingleton(); services.AddTransient(); @@ -304,51 +298,52 @@ namespace Webhooks.API { services.AddTransient>( sp => (DbConnection c) => new IntegrationEventLogService(c)); - - if (configuration.GetValue("AzureServiceBusEnabled")) - { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - var serviceBusConnection = new ServiceBusConnectionStringBuilder(configuration["EventBusConnection"]); - return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); - }); - } - else + + + services.AddSingleton(sp => { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); + IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); + connections.AddConnection(GenerateConnection("TenantA", sp, configuration)); + connections.AddConnection(GenerateConnection("TenantB", sp, configuration)); - var factory = new ConnectionFactory() - { - HostName = configuration["EventBusConnection"], - DispatchConsumersAsync = true - }; + return connections; + }); + - if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) - { - factory.UserName = configuration["EventBusUserName"]; - } + return services; + } - if (!string.IsNullOrEmpty(configuration["EventBusPassword"])) - { - factory.Password = configuration["EventBusPassword"]; - } + private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration) + { + var logger = sp.GetRequiredService>(); - var retryCount = 5; - if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) - { - retryCount = int.Parse(configuration["EventBusRetryCount"]); - } + var factory = new ConnectionFactory() + { + HostName = configuration["EventBusConnection"], + DispatchConsumersAsync = true + }; - return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); - }); + if (!string.IsNullOrEmpty(configuration["EventBusUserName"])) + { + factory.UserName = configuration["EventBusUserName"]; } - return services; - } + if (!string.IsNullOrEmpty(configuration["EventBusPassword"])) + { + factory.Password = configuration["EventBusPassword"]; + } + factory.VirtualHost = vHost; + + var retryCount = 5; + if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) + { + retryCount = int.Parse(configuration["EventBusRetryCount"]); + } + + return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); + } + public static IServiceCollection AddCustomAuthentication(this IServiceCollection services, IConfiguration configuration) { // prevent from mapping "sub" claim to nameidentifier.