Implementing IMultiEventBus in all baseline microservices. Still need to ensure that tenantId is set for all events.

This commit is contained in:
espent1004 2020-02-05 22:15:48 +01:00
parent 2a42053817
commit 748a0596d3
16 changed files with 605 additions and 734 deletions

View File

@ -8,10 +8,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
public class MultiEventBusRabbitMQ : IMultiEventBus public class MultiEventBusRabbitMQ : IMultiEventBus
{ {
private List<IEventBus> _eventBuses; private List<IEventBus> _eventBuses;
private Dictionary<int, String> _tenants;
public MultiEventBusRabbitMQ(List<IEventBus> eventBuses) public MultiEventBusRabbitMQ(List<IEventBus> eventBuses, Dictionary<int, String> tenants)
{ {
_eventBuses = eventBuses; _eventBuses = eventBuses;
_tenants = tenants;
} }
public void AddEventBus(IEventBus eventBus) public void AddEventBus(IEventBus eventBus)
@ -21,14 +23,18 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
public void Publish(IntegrationEvent @event) public void Publish(IntegrationEvent @event)
{ {
//TODO if (@event.TenantId == 0)//System wide event?
var actualEventBus = _eventBuses.Find(e => e.GetVHost().Equals("TenantA"));
if (actualEventBus == null)
{ {
throw new Exception(); _eventBuses.ForEach(eventBus =>
{
eventBus.Publish(@event);
});
} }
//TODO requires ALL events to have tenantId set!
_tenants.TryGetValue(@event.TenantId, out String tenantName);
var actualEventBus = _eventBuses.Find(e => e.GetVHost().Equals(tenantName));
actualEventBus.Publish(@event); actualEventBus.Publish(@event);
} }

View File

@ -5,7 +5,6 @@ using Basket.API.Infrastructure.Middlewares;
using Basket.API.IntegrationEvents.EventHandling; using Basket.API.IntegrationEvents.EventHandling;
using Basket.API.IntegrationEvents.Events; using Basket.API.IntegrationEvents.Events;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.ApplicationInsights.ServiceFabric; using Microsoft.ApplicationInsights.ServiceFabric;
using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authentication.JwtBearer;
@ -58,7 +57,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
options.Filters.Add(typeof(HttpGlobalExceptionFilter)); options.Filters.Add(typeof(HttpGlobalExceptionFilter));
options.Filters.Add(typeof(ValidateModelStateFilter)); options.Filters.Add(typeof(ValidateModelStateFilter));
}) })
.SetCompatibilityVersion(CompatibilityVersion.Version_2_2) .SetCompatibilityVersion(CompatibilityVersion.Version_2_2)
.AddControllersAsServices(); .AddControllersAsServices();
@ -85,70 +83,16 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
return ConnectionMultiplexer.Connect(configuration); return ConnectionMultiplexer.Connect(configuration);
}); });
/* if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}*/
/* else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "TenantA";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});*/
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp => services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections(); IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp)); connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp)); connections.AddConnection(GenerateConnection("TenantB", sp));
connections.AddConnection(GenerateConnection("/", sp));
return connections; return connections;
}); });
//}
RegisterEventBus(services); RegisterEventBus(services);
services.AddSwaggerGen(options => services.AddSwaggerGen(options =>
@ -170,7 +114,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token", TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token",
Scopes = new Dictionary<string, string>() Scopes = new Dictionary<string, string>()
{ {
{ "basket", "Basket API" } {"basket", "Basket API"}
} }
}); });
@ -228,7 +172,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
} }
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
} }
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@ -264,13 +207,14 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
app.UseSwagger() app.UseSwagger()
.UseSwaggerUI(c => .UseSwaggerUI(c =>
{ {
c.SwaggerEndpoint($"{ (!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty) }/swagger/v1/swagger.json", "Basket.API V1"); c.SwaggerEndpoint(
c.OAuthClientId ("basketswaggerui"); $"{(!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty)}/swagger/v1/swagger.json",
"Basket.API V1");
c.OAuthClientId("basketswaggerui");
c.OAuthAppName("Basket Swagger UI"); c.OAuthAppName("Basket Swagger UI");
}); });
ConfigureEventBus(app); ConfigureEventBus(app);
} }
private void RegisterAppInsights(IServiceCollection services) private void RegisterAppInsights(IServiceCollection services)
@ -283,6 +227,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
// Enable K8s telemetry initializer // Enable K8s telemetry initializer
services.AddApplicationInsightsKubernetesEnricher(); services.AddApplicationInsightsKubernetesEnricher();
} }
if (orchestratorType?.ToUpper() == "SF") if (orchestratorType?.ToUpper() == "SF")
{ {
// Enable SF telemetry initializer // Enable SF telemetry initializer
@ -302,7 +247,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
}).AddJwtBearer(options => }).AddJwtBearer(options =>
{ {
options.Authority = identityUrl; options.Authority = identityUrl;
@ -325,22 +269,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
/*if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{*/
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp => services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>(); var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
@ -353,49 +281,20 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
List<IEventBus> eventBuses = new List<IEventBus>(); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount)); eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount)); iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
/*multiRabbitMqPersistentConnections.GetConnections().ForEach(conn =>
{
eventBuses.Add(new EventBusRabbitMQ(conn, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount));
});*/
return new MultiEventBusRabbitMQ(eventBuses);
}); });
/* services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
List<IEventBus> testing = new List<IEventBus>();
multiRabbitMqPersistentConnections.GetConnections().ForEach(conn =>
{
testing.Add(new EventBusRabbitMQ(conn, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount));
});
Console.WriteLine(testing);
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});*/
//}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<ProductPriceChangedIntegrationEventHandler>(); services.AddTransient<ProductPriceChangedIntegrationEventHandler>();
@ -413,7 +312,8 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
public static class CustomExtensionMethods public static class CustomExtensionMethods
{ {
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services,
IConfiguration configuration)
{ {
var hcBuilder = services.AddHealthChecks(); var hcBuilder = services.AddHealthChecks();
@ -423,7 +323,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
.AddRedis( .AddRedis(
configuration["ConnectionString"], configuration["ConnectionString"],
name: "redis-check", name: "redis-check",
tags: new string[] { "redis" }); tags: new string[] {"redis"});
if (configuration.GetValue<bool>("AzureServiceBusEnabled")) if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
@ -432,7 +332,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
configuration["EventBusConnection"], configuration["EventBusConnection"],
topicName: "eshop_event_bus", topicName: "eshop_event_bus",
name: "basket-servicebus-check", name: "basket-servicebus-check",
tags: new string[] { "servicebus" }); tags: new string[] {"servicebus"});
} }
else else
{ {
@ -440,7 +340,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
.AddRabbitMQ( .AddRabbitMQ(
$"amqp://{configuration["EventBusConnection"]}", $"amqp://{configuration["EventBusConnection"]}",
name: "basket-rabbitmqbus-check", name: "basket-rabbitmqbus-check",
tags: new string[] { "rabbitmqbus" }); tags: new string[] {"rabbitmqbus"});
} }
return services; return services;

View File

@ -17,14 +17,14 @@ namespace Catalog.API.IntegrationEvents
public class CatalogIntegrationEventService : ICatalogIntegrationEventService public class CatalogIntegrationEventService : ICatalogIntegrationEventService
{ {
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory; private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus; private readonly IMultiEventBus _eventBus;
private readonly CatalogContext _catalogContext; private readonly CatalogContext _catalogContext;
private readonly IIntegrationEventLogService _eventLogService; private readonly IIntegrationEventLogService _eventLogService;
private readonly ILogger<CatalogIntegrationEventService> _logger; private readonly ILogger<CatalogIntegrationEventService> _logger;
public CatalogIntegrationEventService( public CatalogIntegrationEventService(
ILogger<CatalogIntegrationEventService> logger, ILogger<CatalogIntegrationEventService> logger,
IEventBus eventBus, IMultiEventBus eventBus,
CatalogContext catalogContext, CatalogContext catalogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory) Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
{ {

View File

@ -26,6 +26,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using System.Data.Common; using System.Data.Common;
using System.Reflection; using System.Reflection;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
@ -101,7 +102,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
protected virtual void ConfigureEventBus(IApplicationBuilder app) protected virtual void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
} }
@ -276,20 +277,6 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>(); services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnection = new ServiceBusConnectionStringBuilder(settings.EventBusConnection);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp => services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value; var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
@ -321,34 +308,57 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}); });
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp, configuration));
connections.AddConnection(GenerateConnection("TenantB", sp, configuration));
return connections;
});
return services;
} }
return services;
private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
{
factory.UserName = configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(configuration["EventBusPassword"]))
{
factory.Password = configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
} }
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{ {
var subscriptionClientName = configuration["SubscriptionClientName"]; var subscriptionClientName = configuration["SubscriptionClientName"];
/* if (configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -359,9 +369,18 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
retryCount = int.Parse(configuration["EventBusRetryCount"]); retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<OrderStatusChangedToAwaitingValidationIntegrationEventHandler>(); services.AddTransient<OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();

View File

@ -14,12 +14,12 @@
public class LocationsService : ILocationsService public class LocationsService : ILocationsService
{ {
private readonly ILocationsRepository _locationsRepository; private readonly ILocationsRepository _locationsRepository;
private readonly IEventBus _eventBus; private readonly IMultiEventBus _eventBus;
private readonly ILogger<LocationsService> _logger; private readonly ILogger<LocationsService> _logger;
public LocationsService( public LocationsService(
ILocationsRepository locationsRepository, ILocationsRepository locationsRepository,
IEventBus eventBus, IMultiEventBus eventBus,
ILogger<LocationsService> logger) ILogger<LocationsService> logger)
{ {
_locationsRepository = locationsRepository ?? throw new ArgumentNullException(nameof(locationsRepository)); _locationsRepository = locationsRepository ?? throw new ArgumentNullException(nameof(locationsRepository));

View File

@ -46,10 +46,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
services services
.AddCustomHealthCheck(Configuration) .AddCustomHealthCheck(Configuration)
.AddMvc(options => .AddMvc(options => { options.Filters.Add(typeof(HttpGlobalExceptionFilter)); })
{
options.Filters.Add(typeof(HttpGlobalExceptionFilter));
})
.SetCompatibilityVersion(CompatibilityVersion.Version_2_2) .SetCompatibilityVersion(CompatibilityVersion.Version_2_2)
.AddControllersAsServices(); .AddControllersAsServices();
@ -57,51 +54,14 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
services.Configure<LocationSettings>(Configuration); services.Configure<LocationSettings>(Configuration);
if (Configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp => IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
{ connections.AddConnection(GenerateConnection("TenantA", sp));
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>(); connections.AddConnection(GenerateConnection("TenantB", sp));
var serviceBusConnectionString = Configuration["EventBusConnection"]; return connections;
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
}); });
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "TenantA";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
RegisterEventBus(services); RegisterEventBus(services);
@ -125,12 +85,11 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token", TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token",
Scopes = new Dictionary<string, string>() Scopes = new Dictionary<string, string>()
{ {
{ "locations", "Locations API" } {"locations", "Locations API"}
} }
}); });
options.OperationFilter<AuthorizeCheckOperationFilter>(); options.OperationFilter<AuthorizeCheckOperationFilter>();
}); });
services.AddCors(options => services.AddCors(options =>
@ -155,6 +114,38 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
return new AutofacServiceProvider(container.Build()); return new AutofacServiceProvider(container.Build());
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{ {
@ -187,7 +178,9 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
app.UseSwagger() app.UseSwagger()
.UseSwaggerUI(c => .UseSwaggerUI(c =>
{ {
c.SwaggerEndpoint($"{ (!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty) }/swagger/v1/swagger.json", "Locations.API V1"); c.SwaggerEndpoint(
$"{(!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty)}/swagger/v1/swagger.json",
"Locations.API V1");
c.OAuthClientId("locationsswaggerui"); c.OAuthClientId("locationsswaggerui");
c.OAuthAppName("Locations Swagger UI"); c.OAuthAppName("Locations Swagger UI");
}); });
@ -206,6 +199,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
// Enable K8s telemetry initializer // Enable K8s telemetry initializer
services.AddApplicationInsightsKubernetesEnricher(); services.AddApplicationInsightsKubernetesEnricher();
} }
if (orchestratorType?.ToUpper() == "SF") if (orchestratorType?.ToUpper() == "SF")
{ {
// Enable SF telemetry initializer // Enable SF telemetry initializer
@ -246,24 +240,9 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
/* if (Configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -274,9 +253,18 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
} }
@ -284,7 +272,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
public static class CustomExtensionMethods public static class CustomExtensionMethods
{ {
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services,
IConfiguration configuration)
{ {
var hcBuilder = services.AddHealthChecks(); var hcBuilder = services.AddHealthChecks();
@ -294,7 +283,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
.AddMongoDb( .AddMongoDb(
configuration["ConnectionString"], configuration["ConnectionString"],
name: "locations-mongodb-check", name: "locations-mongodb-check",
tags: new string[] { "mongodb" }); tags: new string[] {"mongodb"});
if (configuration.GetValue<bool>("AzureServiceBusEnabled")) if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
@ -303,7 +292,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
configuration["EventBusConnection"], configuration["EventBusConnection"],
topicName: "eshop_event_bus", topicName: "eshop_event_bus",
name: "locations-servicebus-check", name: "locations-servicebus-check",
tags: new string[] { "servicebus" }); tags: new string[] {"servicebus"});
} }
else else
{ {
@ -311,7 +300,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
.AddRabbitMQ( .AddRabbitMQ(
$"amqp://{configuration["EventBusConnection"]}", $"amqp://{configuration["EventBusConnection"]}",
name: "locations-rabbitmqbus-check", name: "locations-rabbitmqbus-check",
tags: new string[] { "rabbitmqbus" }); tags: new string[] {"rabbitmqbus"});
} }
return services; return services;

View File

@ -81,51 +81,16 @@
//Check Client vs. Server evaluation: https://docs.microsoft.com/en-us/ef/core/querying/client-eval //Check Client vs. Server evaluation: https://docs.microsoft.com/en-us/ef/core/querying/client-eval
}); });
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"]; services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString); {
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); return connections;
}); });
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "TenantA";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
// Add framework services. // Add framework services.
services.AddSwaggerGen(options => services.AddSwaggerGen(options =>
@ -259,25 +224,10 @@
private void RegisterEventBus(IServiceCollection services) private void RegisterEventBus(IServiceCollection services)
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
/* if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp => services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>(); var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -288,7 +238,17 @@
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
} }
@ -296,9 +256,40 @@
services.AddTransient<UserLocationUpdatedIntegrationEventHandler>(); services.AddTransient<UserLocationUpdatedIntegrationEventHandler>();
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<UserLocationUpdatedIntegrationEvent, UserLocationUpdatedIntegrationEventHandler>(); eventBus.Subscribe<UserLocationUpdatedIntegrationEvent, UserLocationUpdatedIntegrationEventHandler>();
} }

View File

@ -15,12 +15,12 @@ namespace Ordering.API.Application.IntegrationEvents.EventHandling
public class UserCheckoutAcceptedIntegrationEventHandler : IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent> public class UserCheckoutAcceptedIntegrationEventHandler : IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>
{ {
private readonly IMediator _mediator; private readonly IMediator _mediator;
private readonly IEventBus _eventBus; private readonly IMultiEventBus _eventBus;
private readonly ILogger<UserCheckoutAcceptedIntegrationEventHandler> _logger; private readonly ILogger<UserCheckoutAcceptedIntegrationEventHandler> _logger;
public UserCheckoutAcceptedIntegrationEventHandler( public UserCheckoutAcceptedIntegrationEventHandler(
IMediator mediator, IMediator mediator,
ILogger<UserCheckoutAcceptedIntegrationEventHandler> logger, IEventBus eventBus) ILogger<UserCheckoutAcceptedIntegrationEventHandler> logger, IMultiEventBus eventBus)
{ {
_mediator = mediator ?? throw new ArgumentNullException(nameof(mediator)); _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));

View File

@ -19,13 +19,13 @@ namespace Ordering.API.Application.IntegrationEvents
public class OrderingIntegrationEventService : IOrderingIntegrationEventService public class OrderingIntegrationEventService : IOrderingIntegrationEventService
{ {
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory; private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus; private readonly IMultiEventBus _eventBus;
private readonly OrderingContext _orderingContext; private readonly OrderingContext _orderingContext;
private readonly IntegrationEventLogContext _eventLogContext; private readonly IntegrationEventLogContext _eventLogContext;
private readonly IIntegrationEventLogService _eventLogService; private readonly IIntegrationEventLogService _eventLogService;
private readonly ILogger<OrderingIntegrationEventService> _logger; private readonly ILogger<OrderingIntegrationEventService> _logger;
public OrderingIntegrationEventService(IEventBus eventBus, public OrderingIntegrationEventService(IMultiEventBus eventBus,
OrderingContext orderingContext, OrderingContext orderingContext,
IntegrationEventLogContext eventLogContext, IntegrationEventLogContext eventLogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory, Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory,

View File

@ -115,7 +115,7 @@
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<BuildingBlocks.EventBus.Abstractions.IEventBus>(); var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<UserCheckoutAcceptedIntegrationEvent, IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>>(); eventBus.Subscribe<UserCheckoutAcceptedIntegrationEvent, IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>>();
eventBus.Subscribe<GracePeriodConfirmedIntegrationEvent, IIntegrationEventHandler<GracePeriodConfirmedIntegrationEvent>>(); eventBus.Subscribe<GracePeriodConfirmedIntegrationEvent, IIntegrationEventHandler<GracePeriodConfirmedIntegrationEvent>>();
@ -284,25 +284,22 @@
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>(); services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
if (configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp => IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
{ connections.AddConnection(GenerateConnection("TenantA", sp, configuration));
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>(); connections.AddConnection(GenerateConnection("TenantB", sp, configuration));
var serviceBusConnectionString = configuration["EventBusConnection"]; return connections;
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
}); });
return services;
} }
else
{ private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration)
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory() var factory = new ConnectionFactory()
{ {
HostName = configuration["EventBusConnection"], HostName = configuration["EventBusConnection"],
@ -319,8 +316,7 @@
factory.Password = configuration["EventBusPassword"]; factory.Password = configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA"; factory.VirtualHost = vHost;
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
@ -329,10 +325,6 @@
} }
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
return services;
} }
public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration)
@ -364,24 +356,11 @@
{ {
var subscriptionClientName = configuration["SubscriptionClientName"]; var subscriptionClientName = configuration["SubscriptionClientName"];
/* if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger, services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -392,7 +371,17 @@
retryCount = int.Parse(configuration["EventBusRetryCount"]); retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
} }

View File

@ -15,6 +15,7 @@ using Ordering.BackgroundTasks.Configuration;
using Ordering.BackgroundTasks.Tasks; using Ordering.BackgroundTasks.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks;
@ -47,25 +48,30 @@ namespace Ordering.BackgroundTasks
//configure event bus related services //configure event bus related services
if (Configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp => IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
{ connections.AddConnection(GenerateConnection("TenantA", sp));
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>(); connections.AddConnection(GenerateConnection("TenantB", sp));
var serviceBusConnectionString = Configuration["EventBusConnection"]; return connections;
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
}); });
RegisterEventBus(services);
//create autofac based service provider
var container = new ContainerBuilder();
container.Populate(services);
return new AutofacServiceProvider(container.Build());
} }
else
{ private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory() var factory = new ConnectionFactory()
{ {
HostName = Configuration["EventBusConnection"], HostName = Configuration["EventBusConnection"],
@ -82,7 +88,7 @@ namespace Ordering.BackgroundTasks
factory.Password = Configuration["EventBusPassword"]; factory.Password = Configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA"; factory.VirtualHost = vHost;
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
@ -91,17 +97,6 @@ namespace Ordering.BackgroundTasks
} }
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
RegisterEventBus(services);
//create autofac based service provider
var container = new ContainerBuilder();
container.Populate(services);
return new AutofacServiceProvider(container.Build());
} }
@ -125,24 +120,9 @@ namespace Ordering.BackgroundTasks
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
/* if (Configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -153,9 +133,19 @@ namespace Ordering.BackgroundTasks
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
} }

View File

@ -21,12 +21,12 @@ namespace Ordering.BackgroundTasks.Tasks
{ {
private readonly ILogger<GracePeriodManagerService> _logger; private readonly ILogger<GracePeriodManagerService> _logger;
private readonly BackgroundTaskSettings _settings; private readonly BackgroundTaskSettings _settings;
private readonly IEventBus _eventBus; private readonly IMultiEventBus _eventBus;
private static readonly String identityUrl = @"http://identity.api/"; private static readonly String identityUrl = @"http://identity.api/";
public GracePeriodManagerService( public GracePeriodManagerService(
IOptions<BackgroundTaskSettings> settings, IOptions<BackgroundTaskSettings> settings,
IEventBus eventBus, IMultiEventBus eventBus,
ILogger<GracePeriodManagerService> logger) ILogger<GracePeriodManagerService> logger)
{ {
_settings = settings?.Value ?? throw new ArgumentNullException(nameof(settings)); _settings = settings?.Value ?? throw new ArgumentNullException(nameof(settings));

View File

@ -16,6 +16,7 @@ using Ordering.SignalrHub.IntegrationEvents.EventHandling;
using Ordering.SignalrHub.IntegrationEvents.Events; using Ordering.SignalrHub.IntegrationEvents.Events;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using System.IdentityModel.Tokens.Jwt; using System.IdentityModel.Tokens.Jwt;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
@ -59,25 +60,33 @@ namespace Ordering.SignalrHub
services.AddSignalR(); services.AddSignalR();
} }
if (Configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp => IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
{ connections.AddConnection(GenerateConnection("TenantA", sp));
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>(); connections.AddConnection(GenerateConnection("TenantB", sp));
var serviceBusConnectionString = Configuration["EventBusConnection"]; return connections;
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
}); });
ConfigureAuthService(services);
RegisterEventBus(services);
services.AddOptions();
//configure autofac
var container = new ContainerBuilder();
container.RegisterModule(new ApplicationModule());
container.Populate(services);
return new AutofacServiceProvider(container.Build());
} }
else
{ private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory() var factory = new ConnectionFactory()
{ {
HostName = Configuration["EventBusConnection"], HostName = Configuration["EventBusConnection"],
@ -94,7 +103,7 @@ namespace Ordering.SignalrHub
factory.Password = Configuration["EventBusPassword"]; factory.Password = Configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA"; factory.VirtualHost = vHost;
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
@ -103,22 +112,8 @@ namespace Ordering.SignalrHub
} }
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
} }
ConfigureAuthService(services);
RegisterEventBus(services);
services.AddOptions();
//configure autofac
var container = new ContainerBuilder();
container.RegisterModule(new ApplicationModule());
container.Populate(services);
return new AutofacServiceProvider(container.Build());
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, ILoggerFactory loggerFactory)
@ -161,14 +156,25 @@ namespace Ordering.SignalrHub
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>(); eventBus
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>(); .Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent,
eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>(); OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>(); eventBus
eventBus.Subscribe<OrderStatusChangedToCancelledIntegrationEvent, OrderStatusChangedToCancelledIntegrationEventHandler>(); .Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToSubmittedIntegrationEvent, OrderStatusChangedToSubmittedIntegrationEventHandler>(); eventBus
.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent,
OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToShippedIntegrationEvent,
OrderStatusChangedToShippedIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToCancelledIntegrationEvent,
OrderStatusChangedToCancelledIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToSubmittedIntegrationEvent,
OrderStatusChangedToSubmittedIntegrationEventHandler>();
} }
private void ConfigureAuthService(IServiceCollection services) private void ConfigureAuthService(IServiceCollection services)
@ -182,7 +188,6 @@ namespace Ordering.SignalrHub
{ {
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
}).AddJwtBearer(options => }).AddJwtBearer(options =>
{ {
options.Authority = identityUrl; options.Authority = identityUrl;
@ -195,24 +200,10 @@ namespace Ordering.SignalrHub
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
/* if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger, services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -223,9 +214,19 @@ namespace Ordering.SignalrHub
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
} }
@ -233,7 +234,8 @@ namespace Ordering.SignalrHub
public static class CustomExtensionMethods public static class CustomExtensionMethods
{ {
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services,
IConfiguration configuration)
{ {
var hcBuilder = services.AddHealthChecks(); var hcBuilder = services.AddHealthChecks();
@ -246,7 +248,7 @@ namespace Ordering.SignalrHub
configuration["EventBusConnection"], configuration["EventBusConnection"],
topicName: "eshop_event_bus", topicName: "eshop_event_bus",
name: "signalr-servicebus-check", name: "signalr-servicebus-check",
tags: new string[] { "servicebus" }); tags: new string[] {"servicebus"});
} }
else else
{ {
@ -254,7 +256,7 @@ namespace Ordering.SignalrHub
.AddRabbitMQ( .AddRabbitMQ(
$"amqp://{configuration["EventBusConnection"]}", $"amqp://{configuration["EventBusConnection"]}",
name: "signalr-rabbitmqbus-check", name: "signalr-rabbitmqbus-check",
tags: new string[] { "rabbitmqbus" }); tags: new string[] {"rabbitmqbus"});
} }
return services; return services;

View File

@ -11,12 +11,12 @@
public class OrderStatusChangedToStockConfirmedIntegrationEventHandler : public class OrderStatusChangedToStockConfirmedIntegrationEventHandler :
IIntegrationEventHandler<OrderStatusChangedToStockConfirmedIntegrationEvent> IIntegrationEventHandler<OrderStatusChangedToStockConfirmedIntegrationEvent>
{ {
private readonly IEventBus _eventBus; private readonly IMultiEventBus _eventBus;
private readonly PaymentSettings _settings; private readonly PaymentSettings _settings;
private readonly ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> _logger; private readonly ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> _logger;
public OrderStatusChangedToStockConfirmedIntegrationEventHandler( public OrderStatusChangedToStockConfirmedIntegrationEventHandler(
IEventBus eventBus, IMultiEventBus eventBus,
IOptionsSnapshot<PaymentSettings> settings, IOptionsSnapshot<PaymentSettings> settings,
ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> logger) ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> logger)
{ {

View File

@ -16,6 +16,7 @@ using Payment.API.IntegrationEvents.EventHandling;
using Payment.API.IntegrationEvents.Events; using Payment.API.IntegrationEvents.Events;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks;
@ -39,23 +40,26 @@ namespace Payment.API
RegisterAppInsights(services); RegisterAppInsights(services);
if (Configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp => IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
{ connections.AddConnection(GenerateConnection("TenantA", sp));
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>(); connections.AddConnection(GenerateConnection("TenantB", sp));
var serviceBusConnectionString = Configuration["EventBusConnection"]; return connections;
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
}); });
RegisterEventBus(services);
var container = new ContainerBuilder();
container.Populate(services);
return new AutofacServiceProvider(container.Build());
} }
else
{ private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory() var factory = new ConnectionFactory()
{ {
HostName = Configuration["EventBusConnection"], HostName = Configuration["EventBusConnection"],
@ -72,7 +76,7 @@ namespace Payment.API
factory.Password = Configuration["EventBusPassword"]; factory.Password = Configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA"; factory.VirtualHost = vHost;
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
@ -81,14 +85,6 @@ namespace Payment.API
} }
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
RegisterEventBus(services);
var container = new ContainerBuilder();
container.Populate(services);
return new AutofacServiceProvider(container.Build());
} }
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@ -140,24 +136,10 @@ namespace Payment.API
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
/* if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger, services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -168,9 +150,19 @@ namespace Payment.API
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddTransient<OrderStatusChangedToStockConfirmedIntegrationEventHandler>(); services.AddTransient<OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
@ -178,7 +170,7 @@ namespace Payment.API
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
} }
} }

View File

@ -126,7 +126,7 @@ namespace Webhooks.API
protected virtual void ConfigureEventBus(IApplicationBuilder app) protected virtual void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
@ -232,25 +232,9 @@ namespace Webhooks.API
{ {
var subscriptionClientName = configuration["SubscriptionClientName"]; var subscriptionClientName = configuration["SubscriptionClientName"];
/* if (configuration.GetValue<bool>("AzureServiceBusEnabled")) services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp => var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else*/
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -261,9 +245,19 @@ namespace Webhooks.API
retryCount = int.Parse(configuration["EventBusRetryCount"]); retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount); List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<ProductPriceChangedIntegrationEventHandler>(); services.AddTransient<ProductPriceChangedIntegrationEventHandler>();
@ -305,18 +299,21 @@ namespace Webhooks.API
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>( services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
sp => (DbConnection c) => new IntegrationEventLogService(c)); sp => (DbConnection c) => new IntegrationEventLogService(c));
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp => IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
{ connections.AddConnection(GenerateConnection("TenantA", sp, configuration));
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>(); connections.AddConnection(GenerateConnection("TenantB", sp, configuration));
var serviceBusConnection = new ServiceBusConnectionStringBuilder(configuration["EventBusConnection"]);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger); return connections;
}); });
return services;
} }
else
{ private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration)
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
@ -336,7 +333,7 @@ namespace Webhooks.API
factory.Password = configuration["EventBusPassword"]; factory.Password = configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA"; factory.VirtualHost = vHost;
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
@ -345,10 +342,6 @@ namespace Webhooks.API
} }
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
return services;
} }
public static IServiceCollection AddCustomAuthentication(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomAuthentication(this IServiceCollection services, IConfiguration configuration)