Browse Source

Merge pull request #6 from Espent1004/eventBusIsolation

Event bus isolation
pull/1240/head
Espent1004 5 years ago
committed by GitHub
parent
commit
2e7a4b2945
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 863 additions and 721 deletions
  1. +2
    -0
      src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
  2. +15
    -0
      src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs
  3. +8
    -8
      src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
  4. +75
    -36
      src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
  5. +16
    -13
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
  6. +12
    -0
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/IMultiRabbitMQPersistentConnections.cs
  7. +49
    -0
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs
  8. +30
    -0
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiRabbitMQPersistentConnections.cs
  9. +2
    -1
      src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs
  10. +2
    -2
      src/Services/Basket/Basket.API/Controllers/BasketController.cs
  11. +85
    -95
      src/Services/Basket/Basket.API/Startup.cs
  12. +2
    -2
      src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs
  13. +2
    -0
      src/Services/Catalog/Catalog.API/IntegrationEvents/EventHandling/OrderStatusChangedToAwaitingValidationIntegrationEventHandler.cs
  14. +64
    -43
      src/Services/Catalog/Catalog.API/Startup.cs
  15. +2
    -2
      src/Services/Location/Locations.API/Infrastructure/Services/LocationsService.cs
  16. +89
    -98
      src/Services/Location/Locations.API/Startup.cs
  17. +54
    -61
      src/Services/Marketing/Marketing.API/Startup.cs
  18. +4
    -1
      src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommand.cs
  19. +1
    -1
      src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommandHandler.cs
  20. +2
    -0
      src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs
  21. +1
    -1
      src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/OrderStockConfirmedIntegrationEventHandler.cs
  22. +2
    -2
      src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs
  23. +2
    -2
      src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs
  24. +46
    -54
      src/Services/Ordering/Ordering.API/Startup.cs
  25. +2
    -2
      src/Services/Ordering/Ordering.API/appsettings.json
  26. +62
    -70
      src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs
  27. +2
    -2
      src/Services/Ordering/Ordering.BackgroundTasks/Tasks/GracePeriodManagerTask.cs
  28. +2
    -2
      src/Services/Ordering/Ordering.Domain/AggregatesModel/OrderAggregate/Order.cs
  29. +7
    -0
      src/Services/Ordering/Ordering.Domain/Events/OrderStatusChangedToStockConfirmedDomainEvent.cs
  30. +91
    -87
      src/Services/Ordering/Ordering.SignalrHub/Startup.cs
  31. +6
    -3
      src/Services/Payment/Payment.API/IntegrationEvents/EventHandling/OrderStatusChangedToStockConfirmedIntegrationEventHandler.cs
  32. +66
    -72
      src/Services/Payment/Payment.API/Startup.cs
  33. +5
    -3
      src/Services/TenantCustomisations/TenantACustomisations/Startup.cs
  34. +53
    -58
      src/Services/Webhooks/Webhooks.API/Startup.cs

+ 2
- 0
src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs View File

@ -20,5 +20,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
void Unsubscribe<T, TH>() void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent; where T : IntegrationEvent;
String GetVHost();
} }
} }

+ 15
- 0
src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs View File

@ -0,0 +1,15 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
{
public interface IMultiEventBus
{
void AddEventBus(IEventBus eventBus);
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
}
}

+ 8
- 8
src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs View File

@ -10,25 +10,25 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
{ {
bool IsEmpty { get; } bool IsEmpty { get; }
event EventHandler<string> OnEventRemoved; event EventHandler<string> OnEventRemoved;
void AddDynamicSubscription<TH>(string eventName)
void AddDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler; where TH : IDynamicIntegrationEventHandler;
void AddSubscription<T, TH>()
void AddSubscription<T, TH>(String vHost)
where T : IntegrationEvent where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>; where TH : IIntegrationEventHandler<T>;
void RemoveSubscription<T, TH>()
void RemoveSubscription<T, TH>(String vHost)
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent; where T : IntegrationEvent;
void RemoveDynamicSubscription<TH>(string eventName)
void RemoveDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler; where TH : IDynamicIntegrationEventHandler;
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName);
bool HasSubscriptionsForEvent<T>(String vHost) where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName, String vHost);
Type GetEventTypeByName(string eventName); Type GetEventTypeByName(string eventName);
void Clear(); void Clear();
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>(String vHost) where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName, String vHost);
string GetEventKey<T>(); string GetEventKey<T>();
} }
} }

+ 75
- 36
src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs View File

@ -10,48 +10,52 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
{ {
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
private readonly Dictionary<CompositeHandler, List<SubscriptionInfo>> _handlers;
private readonly List<Type> _eventTypes; private readonly List<Type> _eventTypes;
public event EventHandler<string> OnEventRemoved; public event EventHandler<string> OnEventRemoved;
public InMemoryEventBusSubscriptionsManager() public InMemoryEventBusSubscriptionsManager()
{ {
_handlers = new Dictionary<string, List<SubscriptionInfo>>();
//_handlers = new Dictionary<string, List<SubscriptionInfo>>();
_handlers = new Dictionary<CompositeHandler, List<SubscriptionInfo>>();
_eventTypes = new List<Type>(); _eventTypes = new List<Type>();
} }
public bool IsEmpty => !_handlers.Keys.Any(); public bool IsEmpty => !_handlers.Keys.Any();
public void Clear() => _handlers.Clear(); public void Clear() => _handlers.Clear();
public void AddDynamicSubscription<TH>(string eventName)
public void AddDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler where TH : IDynamicIntegrationEventHandler
{ {
DoAddSubscription(typeof(TH), eventName, isDynamic: true);
DoAddSubscription(typeof(TH), eventName, isDynamic: true, vHost);
} }
public void AddSubscription<T, TH>()
public void AddSubscription<T, TH>(String vHost)
where T : IntegrationEvent where T : IntegrationEvent
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
{ {
var eventName = GetEventKey<T>(); var eventName = GetEventKey<T>();
DoAddSubscription(typeof(TH), eventName, isDynamic: false);
DoAddSubscription(typeof(TH), eventName, isDynamic: false, vHost);
if (!_eventTypes.Contains(typeof(T))) if (!_eventTypes.Contains(typeof(T)))
{ {
_eventTypes.Add(typeof(T)); _eventTypes.Add(typeof(T));
} }
} }
private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic, String vHost)
{ {
if (!HasSubscriptionsForEvent(eventName))
var compositeHandler = new CompositeHandler{TenantVHostName = vHost, EventName = eventName};
if (!HasSubscriptionsForEvent(eventName, vHost))
{ {
_handlers.Add(eventName, new List<SubscriptionInfo>());
_handlers.Add(compositeHandler, new List<SubscriptionInfo>());
} }
if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
if (_handlers[compositeHandler].Any(s => s.HandlerType == handlerType))
{ {
throw new ArgumentException( throw new ArgumentException(
$"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
@ -59,41 +63,43 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
if (isDynamic) if (isDynamic)
{ {
_handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
_handlers[compositeHandler].Add(SubscriptionInfo.Dynamic(handlerType));
} }
else else
{ {
_handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
_handlers[compositeHandler].Add(SubscriptionInfo.Typed(handlerType));
} }
} }
public void RemoveDynamicSubscription<TH>(string eventName)
public void RemoveDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler where TH : IDynamicIntegrationEventHandler
{ {
var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
DoRemoveHandler(eventName, handlerToRemove);
var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName, vHost);
DoRemoveHandler(eventName, handlerToRemove, vHost);
} }
public void RemoveSubscription<T, TH>()
public void RemoveSubscription<T, TH>(String vHost)
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent where T : IntegrationEvent
{ {
var handlerToRemove = FindSubscriptionToRemove<T, TH>();
var handlerToRemove = FindSubscriptionToRemove<T, TH>(vHost);
var eventName = GetEventKey<T>(); var eventName = GetEventKey<T>();
DoRemoveHandler(eventName, handlerToRemove);
DoRemoveHandler(eventName, handlerToRemove, vHost);
} }
private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove, String vHost)
{ {
if (subsToRemove != null) if (subsToRemove != null)
{ {
_handlers[eventName].Remove(subsToRemove);
if (!_handlers[eventName].Any())
var compositeHandler = new CompositeHandler{EventName = eventName, TenantVHostName = vHost};
_handlers[compositeHandler].Remove(subsToRemove);
if (!_handlers[compositeHandler].Any())
{ {
_handlers.Remove(eventName);
_handlers.Remove(compositeHandler);
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
if (eventType != null) if (eventType != null)
{ {
@ -105,12 +111,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
} }
} }
public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>(String vHost) where T : IntegrationEvent
{ {
var key = GetEventKey<T>(); var key = GetEventKey<T>();
return GetHandlersForEvent(key);
return GetHandlersForEvent(key, vHost);
} }
public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];
public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName, String vHost) => _handlers[new CompositeHandler{EventName = eventName, TenantVHostName = vHost}];
private void RaiseOnEventRemoved(string eventName) private void RaiseOnEventRemoved(string eventName)
{ {
@ -119,38 +125,43 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
} }
private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler where TH : IDynamicIntegrationEventHandler
{ {
return DoFindSubscriptionToRemove(eventName, typeof(TH));
return DoFindSubscriptionToRemove(eventName, typeof(TH), vHost);
} }
private SubscriptionInfo FindSubscriptionToRemove<T, TH>()
private SubscriptionInfo FindSubscriptionToRemove<T, TH>(String vHost)
where T : IntegrationEvent where T : IntegrationEvent
where TH : IIntegrationEventHandler<T> where TH : IIntegrationEventHandler<T>
{ {
var eventName = GetEventKey<T>(); var eventName = GetEventKey<T>();
return DoFindSubscriptionToRemove(eventName, typeof(TH));
return DoFindSubscriptionToRemove(eventName, typeof(TH), vHost);
} }
private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType)
private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType, String vHost)
{ {
if (!HasSubscriptionsForEvent(eventName))
if (!HasSubscriptionsForEvent(eventName, vHost))
{ {
return null; return null;
} }
var compositeHandler = new CompositeHandler{EventName = eventName, TenantVHostName = vHost};
return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
return _handlers[compositeHandler].SingleOrDefault(s => s.HandlerType == handlerType);
} }
public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
public bool HasSubscriptionsForEvent<T>(String vHost) where T : IntegrationEvent
{ {
var key = GetEventKey<T>(); var key = GetEventKey<T>();
return HasSubscriptionsForEvent(key);
return HasSubscriptionsForEvent(key, vHost);
} }
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
public bool HasSubscriptionsForEvent(string eventName, String vHost) => _handlers.ContainsKey(new CompositeHandler{EventName = eventName, TenantVHostName = vHost});
public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName); public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);
@ -160,3 +171,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
} }
} }
} }
class CompositeHandler
{
public String TenantVHostName { get; set; }
public String EventName { get; set; }
protected bool Equals(CompositeHandler other)
{
return TenantVHostName == other.TenantVHostName && EventName == other.EventName;
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((CompositeHandler) obj);
}
public override int GetHashCode()
{
unchecked
{
return ((TenantVHostName != null ? TenantVHostName.GetHashCode() : 0) * 397) ^ (EventName != null ? EventName.GetHashCode() : 0);
}
}
}

+ 16
- 13
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -12,12 +12,8 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Events; using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Exceptions;
using System; using System;
using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Http; using System.Net.Http;
using System.Net.Mime;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -37,14 +33,15 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private static readonly String tenantACustomisationUrl = @"http://tenantacustomisation/"; private static readonly String tenantACustomisationUrl = @"http://tenantacustomisation/";
private static readonly String tenantManagerUrl = @"http://tenantmanager/"; private static readonly String tenantManagerUrl = @"http://tenantmanager/";
private readonly int _retryCount; private readonly int _retryCount;
private readonly Dictionary<int, String> _tenantInfo;
private readonly Dictionary<int, String> _tenantInfo;
public String vHost { get; set; }
private IModel _consumerChannel; private IModel _consumerChannel;
private string _queueName; private string _queueName;
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null,
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, String vhost, string queueName = null,
int retryCount = 5) int retryCount = 5)
{ {
_persistentConnection = _persistentConnection =
@ -59,6 +56,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_tenantInfo = new Dictionary<int, string>(); _tenantInfo = new Dictionary<int, string>();
_tenantInfo.Add(1, "TenantA"); _tenantInfo.Add(1, "TenantA");
_tenantInfo.Add(2, "TenantB"); _tenantInfo.Add(2, "TenantB");
vHost = vhost;
} }
private void SubsManager_OnEventRemoved(object sender, string eventName) private void SubsManager_OnEventRemoved(object sender, string eventName)
@ -137,7 +135,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
typeof(TH).GetGenericTypeName()); typeof(TH).GetGenericTypeName());
DoInternalSubscription(eventName); DoInternalSubscription(eventName);
_subsManager.AddDynamicSubscription<TH>(eventName);
_subsManager.AddDynamicSubscription<TH>(eventName, vHost);
StartBasicConsume(); StartBasicConsume();
} }
@ -151,13 +149,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName,
typeof(TH).GetGenericTypeName()); typeof(TH).GetGenericTypeName());
_subsManager.AddSubscription<T, TH>();
_subsManager.AddSubscription<T, TH>(vHost);
StartBasicConsume(); StartBasicConsume();
} }
private void DoInternalSubscription(string eventName) private void DoInternalSubscription(string eventName)
{ {
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName, vHost);
if (!containsKey) if (!containsKey)
{ {
if (!_persistentConnection.IsConnected) if (!_persistentConnection.IsConnected)
@ -182,13 +180,18 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_logger.LogInformation("Unsubscribing from event {EventName}", eventName); _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
_subsManager.RemoveSubscription<T, TH>();
_subsManager.RemoveSubscription<T, TH>(vHost);
}
public string GetVHost()
{
return vHost;
} }
public void UnsubscribeDynamic<TH>(string eventName) public void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler where TH : IDynamicIntegrationEventHandler
{ {
_subsManager.RemoveDynamicSubscription<TH>(eventName);
_subsManager.RemoveDynamicSubscription<TH>(eventName, vHost);
} }
public void Dispose() public void Dispose()
@ -341,11 +344,11 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{ {
_logger.LogWarning("Processing RabbitMQ event: {EventName}", eventName); _logger.LogWarning("Processing RabbitMQ event: {EventName}", eventName);
if (_subsManager.HasSubscriptionsForEvent(eventName))
if (_subsManager.HasSubscriptionsForEvent(eventName, vHost))
{ {
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{ {
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
var subscriptions = _subsManager.GetHandlersForEvent(eventName, vHost);
foreach (var subscription in subscriptions) foreach (var subscription in subscriptions)
{ {
if (subscription.IsDynamic) if (subscription.IsDynamic)


+ 12
- 0
src/BuildingBlocks/EventBus/EventBusRabbitMQ/IMultiRabbitMQPersistentConnections.cs View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
public interface IMultiRabbitMQPersistentConnections
{
List<IRabbitMQPersistentConnection> GetConnections();
void AddConnection(IRabbitMQPersistentConnection connection);
void RemoveConnection(IRabbitMQPersistentConnection connection);
}
}

+ 49
- 0
src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs View File

@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
public class MultiEventBusRabbitMQ : IMultiEventBus
{
private List<IEventBus> _eventBuses;
private Dictionary<int, String> _tenants;
public MultiEventBusRabbitMQ(List<IEventBus> eventBuses, Dictionary<int, String> tenants)
{
_eventBuses = eventBuses;
_tenants = tenants;
}
public void AddEventBus(IEventBus eventBus)
{
_eventBuses.Add(eventBus);
}
public void Publish(IntegrationEvent @event)
{
if (@event.TenantId == 0)//System wide event?
{
_eventBuses.ForEach(eventBus =>
{
eventBus.Publish(@event);
});
}
else
{
//TODO requires ALL events to have tenantId set!
_tenants.TryGetValue(@event.TenantId, out String tenantName);
var actualEventBus = _eventBuses.Find(e => e.GetVHost().Equals(tenantName));
actualEventBus.Publish(@event);
}
}
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
_eventBuses.ForEach(e => { e.Subscribe<T, TH>(); });
}
}
}

+ 30
- 0
src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiRabbitMQPersistentConnections.cs View File

@ -0,0 +1,30 @@
using System.Collections.Generic;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
public class MultiRabbitMQPersistentConnections : IMultiRabbitMQPersistentConnections
{
public List<IRabbitMQPersistentConnection> Connections;
public MultiRabbitMQPersistentConnections()
{
Connections = new List<IRabbitMQPersistentConnection>();
}
public List<IRabbitMQPersistentConnection> GetConnections()
{
return Connections;
}
public void AddConnection(IRabbitMQPersistentConnection connection)
{
Connections.Add((connection));
}
public void RemoveConnection(IRabbitMQPersistentConnection connection)
{
Connections.Remove(connection);
}
}
}

+ 2
- 1
src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs View File

@ -1,4 +1,4 @@
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
/*namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{ {
using Autofac; using Autofac;
using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus;
@ -206,3 +206,4 @@
} }
} }
} }
*/

+ 2
- 2
src/Services/Basket/Basket.API/Controllers/BasketController.cs View File

@ -21,14 +21,14 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API.Controllers
{ {
private readonly IBasketRepository _repository; private readonly IBasketRepository _repository;
private readonly IIdentityService _identityService; private readonly IIdentityService _identityService;
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private readonly ILogger<BasketController> _logger; private readonly ILogger<BasketController> _logger;
public BasketController( public BasketController(
ILogger<BasketController> logger, ILogger<BasketController> logger,
IBasketRepository repository, IBasketRepository repository,
IIdentityService identityService, IIdentityService identityService,
IEventBus eventBus)
IMultiEventBus eventBus)
{ {
_logger = logger; _logger = logger;
_repository = repository; _repository = repository;


+ 85
- 95
src/Services/Basket/Basket.API/Startup.cs View File

@ -5,7 +5,6 @@ using Basket.API.Infrastructure.Middlewares;
using Basket.API.IntegrationEvents.EventHandling; using Basket.API.IntegrationEvents.EventHandling;
using Basket.API.IntegrationEvents.Events; using Basket.API.IntegrationEvents.Events;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.ApplicationInsights.ServiceFabric; using Microsoft.ApplicationInsights.ServiceFabric;
using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authentication.JwtBearer;
@ -58,7 +57,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
options.Filters.Add(typeof(HttpGlobalExceptionFilter)); options.Filters.Add(typeof(HttpGlobalExceptionFilter));
options.Filters.Add(typeof(ValidateModelStateFilter)); options.Filters.Add(typeof(ValidateModelStateFilter));
}) })
.SetCompatibilityVersion(CompatibilityVersion.Version_2_2) .SetCompatibilityVersion(CompatibilityVersion.Version_2_2)
.AddControllersAsServices(); .AddControllersAsServices();
@ -85,50 +83,15 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
return ConnectionMultiplexer.Connect(configuration); return ConnectionMultiplexer.Connect(configuration);
}); });
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
return connections;
});
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
RegisterEventBus(services); RegisterEventBus(services);
@ -151,7 +114,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token", TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token",
Scopes = new Dictionary<string, string>() Scopes = new Dictionary<string, string>()
{ {
{ "basket", "Basket API" }
{"basket", "Basket API"}
} }
}); });
@ -162,10 +125,10 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
options.AddPolicy("CorsPolicy", options.AddPolicy("CorsPolicy",
builder => builder builder => builder
.SetIsOriginAllowed((host) => true)
.AllowAnyMethod()
.AllowAnyHeader()
.AllowCredentials());
.SetIsOriginAllowed((host) => true)
.AllowAnyMethod()
.AllowAnyHeader()
.AllowCredentials());
}); });
services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>(); services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
services.AddTransient<IBasketRepository, RedisBasketRepository>(); services.AddTransient<IBasketRepository, RedisBasketRepository>();
@ -178,7 +141,38 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
return new AutofacServiceProvider(container.Build()); return new AutofacServiceProvider(container.Build());
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
@ -203,7 +197,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
Predicate = r => r.Name.Contains("self") Predicate = r => r.Name.Contains("self")
}); });
app.UseStaticFiles();
app.UseStaticFiles();
app.UseCors("CorsPolicy"); app.UseCors("CorsPolicy");
ConfigureAuth(app); ConfigureAuth(app);
@ -211,27 +205,29 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
app.UseMvcWithDefaultRoute(); app.UseMvcWithDefaultRoute();
app.UseSwagger() app.UseSwagger()
.UseSwaggerUI(c =>
{
c.SwaggerEndpoint($"{ (!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty) }/swagger/v1/swagger.json", "Basket.API V1");
c.OAuthClientId ("basketswaggerui");
c.OAuthAppName("Basket Swagger UI");
});
.UseSwaggerUI(c =>
{
c.SwaggerEndpoint(
$"{(!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty)}/swagger/v1/swagger.json",
"Basket.API V1");
c.OAuthClientId("basketswaggerui");
c.OAuthAppName("Basket Swagger UI");
});
ConfigureEventBus(app); ConfigureEventBus(app);
} }
private void RegisterAppInsights(IServiceCollection services) private void RegisterAppInsights(IServiceCollection services)
{ {
services.AddApplicationInsightsTelemetry(Configuration); services.AddApplicationInsightsTelemetry(Configuration);
var orchestratorType = Configuration.GetValue<string>("OrchestratorType"); var orchestratorType = Configuration.GetValue<string>("OrchestratorType");
if (orchestratorType?.ToUpper() == "K8S") if (orchestratorType?.ToUpper() == "K8S")
{ {
// Enable K8s telemetry initializer // Enable K8s telemetry initializer
services.AddApplicationInsightsKubernetesEnricher(); services.AddApplicationInsightsKubernetesEnricher();
} }
if (orchestratorType?.ToUpper() == "SF") if (orchestratorType?.ToUpper() == "SF")
{ {
// Enable SF telemetry initializer // Enable SF telemetry initializer
@ -245,13 +241,12 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
// prevent from mapping "sub" claim to nameidentifier. // prevent from mapping "sub" claim to nameidentifier.
JwtSecurityTokenHandler.DefaultInboundClaimTypeMap.Clear(); JwtSecurityTokenHandler.DefaultInboundClaimTypeMap.Clear();
var identityUrl = Configuration.GetValue<string>("IdentityUrl");
var identityUrl = Configuration.GetValue<string>("IdentityUrl");
services.AddAuthentication(options => services.AddAuthentication(options =>
{ {
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
}).AddJwtBearer(options => }).AddJwtBearer(options =>
{ {
options.Authority = identityUrl; options.Authority = identityUrl;
@ -274,37 +269,31 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
@ -314,26 +303,27 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>(); eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>();
}
}
} }
public static class CustomExtensionMethods public static class CustomExtensionMethods
{ {
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration)
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services,
IConfiguration configuration)
{ {
var hcBuilder = services.AddHealthChecks(); var hcBuilder = services.AddHealthChecks();
hcBuilder.AddCheck("self", () => HealthCheckResult.Healthy()); hcBuilder.AddCheck("self", () => HealthCheckResult.Healthy());
hcBuilder
hcBuilder
.AddRedis( .AddRedis(
configuration["ConnectionString"], configuration["ConnectionString"],
name: "redis-check", name: "redis-check",
tags: new string[] { "redis" });
tags: new string[] {"redis"});
if (configuration.GetValue<bool>("AzureServiceBusEnabled")) if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
@ -342,7 +332,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
configuration["EventBusConnection"], configuration["EventBusConnection"],
topicName: "eshop_event_bus", topicName: "eshop_event_bus",
name: "basket-servicebus-check", name: "basket-servicebus-check",
tags: new string[] { "servicebus" });
tags: new string[] {"servicebus"});
} }
else else
{ {
@ -350,10 +340,10 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
.AddRabbitMQ( .AddRabbitMQ(
$"amqp://{configuration["EventBusConnection"]}", $"amqp://{configuration["EventBusConnection"]}",
name: "basket-rabbitmqbus-check", name: "basket-rabbitmqbus-check",
tags: new string[] { "rabbitmqbus" });
tags: new string[] {"rabbitmqbus"});
} }
return services; return services;
} }
} }
}
}

+ 2
- 2
src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs View File

@ -17,14 +17,14 @@ namespace Catalog.API.IntegrationEvents
public class CatalogIntegrationEventService : ICatalogIntegrationEventService public class CatalogIntegrationEventService : ICatalogIntegrationEventService
{ {
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory; private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private readonly CatalogContext _catalogContext; private readonly CatalogContext _catalogContext;
private readonly IIntegrationEventLogService _eventLogService; private readonly IIntegrationEventLogService _eventLogService;
private readonly ILogger<CatalogIntegrationEventService> _logger; private readonly ILogger<CatalogIntegrationEventService> _logger;
public CatalogIntegrationEventService( public CatalogIntegrationEventService(
ILogger<CatalogIntegrationEventService> logger, ILogger<CatalogIntegrationEventService> logger,
IEventBus eventBus,
IMultiEventBus eventBus,
CatalogContext catalogContext, CatalogContext catalogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory) Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
{ {


+ 2
- 0
src/Services/Catalog/Catalog.API/IntegrationEvents/EventHandling/OrderStatusChangedToAwaitingValidationIntegrationEventHandler.cs View File

@ -49,6 +49,8 @@
? (IntegrationEvent)new OrderStockRejectedIntegrationEvent(@event.OrderId, confirmedOrderStockItems) ? (IntegrationEvent)new OrderStockRejectedIntegrationEvent(@event.OrderId, confirmedOrderStockItems)
: new OrderStockConfirmedIntegrationEvent(@event.OrderId); : new OrderStockConfirmedIntegrationEvent(@event.OrderId);
confirmedIntegrationEvent.TenantId = @event.TenantId;
await _catalogIntegrationEventService.SaveEventAndCatalogContextChangesAsync(confirmedIntegrationEvent); await _catalogIntegrationEventService.SaveEventAndCatalogContextChangesAsync(confirmedIntegrationEvent);
await _catalogIntegrationEventService.PublishThroughEventBusAsync(confirmedIntegrationEvent); await _catalogIntegrationEventService.PublishThroughEventBusAsync(confirmedIntegrationEvent);


+ 64
- 43
src/Services/Catalog/Catalog.API/Startup.cs View File

@ -26,6 +26,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using System.Data.Common; using System.Data.Common;
using System.Reflection; using System.Reflection;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
@ -101,7 +102,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
protected virtual void ConfigureEventBus(IApplicationBuilder app) protected virtual void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
} }
@ -275,21 +276,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
sp => (DbConnection c) => new IntegrationEventLogService(c)); sp => (DbConnection c) => new IntegrationEventLogService(c));
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>(); services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnection = new ServiceBusConnectionStringBuilder(settings.EventBusConnection);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp => services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{ {
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value; var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
@ -310,6 +297,8 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
{ {
factory.Password = configuration["EventBusPassword"]; factory.Password = configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA";
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
@ -319,47 +308,79 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}); });
}
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp, configuration));
connections.AddConnection(GenerateConnection("TenantB", sp, configuration));
return connections;
});
return services; return services;
} }
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration)
{ {
var subscriptionClientName = configuration["SubscriptionClientName"];
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
var factory = new ConnectionFactory()
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
{
factory.UserName = configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(configuration["EventBusPassword"]))
{
factory.Password = configuration["EventBusPassword"];
} }
else
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
var subscriptionClientName = configuration["SubscriptionClientName"];
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<OrderStatusChangedToAwaitingValidationIntegrationEventHandler>(); services.AddTransient<OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();


+ 2
- 2
src/Services/Location/Locations.API/Infrastructure/Services/LocationsService.cs View File

@ -14,12 +14,12 @@
public class LocationsService : ILocationsService public class LocationsService : ILocationsService
{ {
private readonly ILocationsRepository _locationsRepository; private readonly ILocationsRepository _locationsRepository;
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private readonly ILogger<LocationsService> _logger; private readonly ILogger<LocationsService> _logger;
public LocationsService( public LocationsService(
ILocationsRepository locationsRepository, ILocationsRepository locationsRepository,
IEventBus eventBus,
IMultiEventBus eventBus,
ILogger<LocationsService> logger) ILogger<LocationsService> logger)
{ {
_locationsRepository = locationsRepository ?? throw new ArgumentNullException(nameof(locationsRepository)); _locationsRepository = locationsRepository ?? throw new ArgumentNullException(nameof(locationsRepository));


+ 89
- 98
src/Services/Location/Locations.API/Startup.cs View File

@ -46,10 +46,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
services services
.AddCustomHealthCheck(Configuration) .AddCustomHealthCheck(Configuration)
.AddMvc(options =>
{
options.Filters.Add(typeof(HttpGlobalExceptionFilter));
})
.AddMvc(options => { options.Filters.Add(typeof(HttpGlobalExceptionFilter)); })
.SetCompatibilityVersion(CompatibilityVersion.Version_2_2) .SetCompatibilityVersion(CompatibilityVersion.Version_2_2)
.AddControllersAsServices(); .AddControllersAsServices();
@ -57,49 +54,14 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
services.Configure<LocationSettings>(Configuration); services.Configure<LocationSettings>(Configuration);
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
return connections;
});
RegisterEventBus(services); RegisterEventBus(services);
@ -123,22 +85,21 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token", TokenUrl = $"{Configuration.GetValue<string>("IdentityUrlExternal")}/connect/token",
Scopes = new Dictionary<string, string>() Scopes = new Dictionary<string, string>()
{ {
{ "locations", "Locations API" }
{"locations", "Locations API"}
} }
}); });
options.OperationFilter<AuthorizeCheckOperationFilter>(); options.OperationFilter<AuthorizeCheckOperationFilter>();
}); });
services.AddCors(options => services.AddCors(options =>
{ {
options.AddPolicy("CorsPolicy", options.AddPolicy("CorsPolicy",
builder => builder builder => builder
.SetIsOriginAllowed((host) => true)
.AllowAnyMethod()
.AllowAnyHeader()
.AllowCredentials());
.SetIsOriginAllowed((host) => true)
.AllowAnyMethod()
.AllowAnyHeader()
.AllowCredentials());
}); });
services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>(); services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
@ -153,6 +114,38 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
return new AutofacServiceProvider(container.Build()); return new AutofacServiceProvider(container.Build());
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{ {
@ -183,12 +176,14 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
app.UseMvcWithDefaultRoute(); app.UseMvcWithDefaultRoute();
app.UseSwagger() app.UseSwagger()
.UseSwaggerUI(c =>
{
c.SwaggerEndpoint($"{ (!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty) }/swagger/v1/swagger.json", "Locations.API V1");
c.OAuthClientId("locationsswaggerui");
c.OAuthAppName("Locations Swagger UI");
});
.UseSwaggerUI(c =>
{
c.SwaggerEndpoint(
$"{(!string.IsNullOrEmpty(pathBase) ? pathBase : string.Empty)}/swagger/v1/swagger.json",
"Locations.API V1");
c.OAuthClientId("locationsswaggerui");
c.OAuthAppName("Locations Swagger UI");
});
LocationsContextSeed.SeedAsync(app, loggerFactory) LocationsContextSeed.SeedAsync(app, loggerFactory)
.Wait(); .Wait();
@ -204,6 +199,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
// Enable K8s telemetry initializer // Enable K8s telemetry initializer
services.AddApplicationInsightsKubernetesEnricher(); services.AddApplicationInsightsKubernetesEnricher();
} }
if (orchestratorType?.ToUpper() == "SF") if (orchestratorType?.ToUpper() == "SF")
{ {
// Enable SF telemetry initializer // Enable SF telemetry initializer
@ -218,16 +214,16 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
JwtSecurityTokenHandler.DefaultInboundClaimTypeMap.Clear(); JwtSecurityTokenHandler.DefaultInboundClaimTypeMap.Clear();
services.AddAuthentication(options => services.AddAuthentication(options =>
{
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
})
.AddJwtBearer(options =>
{
options.Authority = Configuration.GetValue<string>("IdentityUrl");
options.Audience = "locations";
options.RequireHttpsMetadata = false;
});
{
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
})
.AddJwtBearer(options =>
{
options.Authority = Configuration.GetValue<string>("IdentityUrl");
options.Audience = "locations";
options.RequireHttpsMetadata = false;
});
} }
protected virtual void ConfigureAuth(IApplicationBuilder app) protected virtual void ConfigureAuth(IApplicationBuilder app)
@ -244,37 +240,31 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
} }
@ -282,7 +272,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
public static class CustomExtensionMethods public static class CustomExtensionMethods
{ {
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration)
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services,
IConfiguration configuration)
{ {
var hcBuilder = services.AddHealthChecks(); var hcBuilder = services.AddHealthChecks();
@ -292,7 +283,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
.AddMongoDb( .AddMongoDb(
configuration["ConnectionString"], configuration["ConnectionString"],
name: "locations-mongodb-check", name: "locations-mongodb-check",
tags: new string[] { "mongodb" });
tags: new string[] {"mongodb"});
if (configuration.GetValue<bool>("AzureServiceBusEnabled")) if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
@ -301,7 +292,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
configuration["EventBusConnection"], configuration["EventBusConnection"],
topicName: "eshop_event_bus", topicName: "eshop_event_bus",
name: "locations-servicebus-check", name: "locations-servicebus-check",
tags: new string[] { "servicebus" });
tags: new string[] {"servicebus"});
} }
else else
{ {
@ -309,10 +300,10 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
.AddRabbitMQ( .AddRabbitMQ(
$"amqp://{configuration["EventBusConnection"]}", $"amqp://{configuration["EventBusConnection"]}",
name: "locations-rabbitmqbus-check", name: "locations-rabbitmqbus-check",
tags: new string[] { "rabbitmqbus" });
tags: new string[] {"rabbitmqbus"});
} }
return services; return services;
} }
} }
}
}

+ 54
- 61
src/Services/Marketing/Marketing.API/Startup.cs View File

@ -80,50 +80,17 @@
options.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning)); options.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning));
//Check Client vs. Server evaluation: https://docs.microsoft.com/en-us/ef/core/querying/client-eval //Check Client vs. Server evaluation: https://docs.microsoft.com/en-us/ef/core/querying/client-eval
}); });
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
return connections;
});
// Add framework services. // Add framework services.
services.AddSwaggerGen(options => services.AddSwaggerGen(options =>
@ -257,25 +224,10 @@
private void RegisterEventBus(IServiceCollection services) private void RegisterEventBus(IServiceCollection services)
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -286,17 +238,58 @@
retryCount = int.Parse(Configuration["EventBusRetryCount"]); retryCount = int.Parse(Configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
} }
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<UserLocationUpdatedIntegrationEventHandler>(); services.AddTransient<UserLocationUpdatedIntegrationEventHandler>();
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<UserLocationUpdatedIntegrationEvent, UserLocationUpdatedIntegrationEventHandler>(); eventBus.Subscribe<UserLocationUpdatedIntegrationEvent, UserLocationUpdatedIntegrationEventHandler>();
} }


+ 4
- 1
src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommand.cs View File

@ -12,10 +12,13 @@ namespace Ordering.API.Application.Commands
[DataMember] [DataMember]
public int OrderNumber { get; private set; } public int OrderNumber { get; private set; }
[DataMember]
public int TenantId { get; private set; }
public SetStockConfirmedOrderStatusCommand(int orderNumber)
public SetStockConfirmedOrderStatusCommand(int orderNumber, int tenantId)
{ {
OrderNumber = orderNumber; OrderNumber = orderNumber;
TenantId = tenantId;
} }
} }
} }

+ 1
- 1
src/Services/Ordering/Ordering.API/Application/Commands/SetStockConfirmedOrderStatusCommandHandler.cs View File

@ -35,7 +35,7 @@ namespace Ordering.API.Application.Commands
return false; return false;
} }
orderToUpdate.SetStockConfirmedStatus();
orderToUpdate.SetStockConfirmedStatus(command.TenantId);
return await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken); return await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
} }
} }


+ 2
- 0
src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs View File

@ -41,6 +41,8 @@
var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString()); var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString());
var orderStatusChangedToStockConfirmedIntegrationEvent = new OrderStatusChangedToStockConfirmedIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name); var orderStatusChangedToStockConfirmedIntegrationEvent = new OrderStatusChangedToStockConfirmedIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name);
orderStatusChangedToStockConfirmedIntegrationEvent.TenantId =
orderStatusChangedToStockConfirmedDomainEvent.TenantId;
await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToStockConfirmedIntegrationEvent); await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToStockConfirmedIntegrationEvent);
} }
} }

+ 1
- 1
src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/OrderStockConfirmedIntegrationEventHandler.cs View File

@ -33,7 +33,7 @@
{ {
_logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", @event.Id, Program.AppName, @event); _logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", @event.Id, Program.AppName, @event);
var command = new SetStockConfirmedOrderStatusCommand(@event.OrderId);
var command = new SetStockConfirmedOrderStatusCommand(@event.OrderId, @event.TenantId);
_logger.LogInformation( _logger.LogInformation(
"----- Sending command: {CommandName} - {IdProperty}: {CommandId} ({@Command})", "----- Sending command: {CommandName} - {IdProperty}: {CommandId} ({@Command})",


+ 2
- 2
src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs View File

@ -15,12 +15,12 @@ namespace Ordering.API.Application.IntegrationEvents.EventHandling
public class UserCheckoutAcceptedIntegrationEventHandler : IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent> public class UserCheckoutAcceptedIntegrationEventHandler : IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>
{ {
private readonly IMediator _mediator; private readonly IMediator _mediator;
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private readonly ILogger<UserCheckoutAcceptedIntegrationEventHandler> _logger; private readonly ILogger<UserCheckoutAcceptedIntegrationEventHandler> _logger;
public UserCheckoutAcceptedIntegrationEventHandler( public UserCheckoutAcceptedIntegrationEventHandler(
IMediator mediator, IMediator mediator,
ILogger<UserCheckoutAcceptedIntegrationEventHandler> logger, IEventBus eventBus)
ILogger<UserCheckoutAcceptedIntegrationEventHandler> logger, IMultiEventBus eventBus)
{ {
_mediator = mediator ?? throw new ArgumentNullException(nameof(mediator)); _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));


+ 2
- 2
src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs View File

@ -19,13 +19,13 @@ namespace Ordering.API.Application.IntegrationEvents
public class OrderingIntegrationEventService : IOrderingIntegrationEventService public class OrderingIntegrationEventService : IOrderingIntegrationEventService
{ {
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory; private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private readonly OrderingContext _orderingContext; private readonly OrderingContext _orderingContext;
private readonly IntegrationEventLogContext _eventLogContext; private readonly IntegrationEventLogContext _eventLogContext;
private readonly IIntegrationEventLogService _eventLogService; private readonly IIntegrationEventLogService _eventLogService;
private readonly ILogger<OrderingIntegrationEventService> _logger; private readonly ILogger<OrderingIntegrationEventService> _logger;
public OrderingIntegrationEventService(IEventBus eventBus,
public OrderingIntegrationEventService(IMultiEventBus eventBus,
OrderingContext orderingContext, OrderingContext orderingContext,
IntegrationEventLogContext eventLogContext, IntegrationEventLogContext eventLogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory, Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory,


+ 46
- 54
src/Services/Ordering/Ordering.API/Startup.cs View File

@ -115,7 +115,7 @@
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<BuildingBlocks.EventBus.Abstractions.IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<UserCheckoutAcceptedIntegrationEvent, IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>>(); eventBus.Subscribe<UserCheckoutAcceptedIntegrationEvent, IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>>();
eventBus.Subscribe<GracePeriodConfirmedIntegrationEvent, IIntegrationEventHandler<GracePeriodConfirmedIntegrationEvent>>(); eventBus.Subscribe<GracePeriodConfirmedIntegrationEvent, IIntegrationEventHandler<GracePeriodConfirmedIntegrationEvent>>();
@ -284,52 +284,47 @@
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>(); services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp, configuration));
connections.AddConnection(GenerateConnection("TenantB", sp, configuration));
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
return connections;
});
return services;
}
private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
{
factory.UserName = configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
{
factory.UserName = configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(configuration["EventBusPassword"]))
{
factory.Password = configuration["EventBusPassword"];
}
if (!string.IsNullOrEmpty(configuration["EventBusPassword"]))
{
factory.Password = configuration["EventBusPassword"];
}
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
factory.VirtualHost = vHost;
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return services;
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
} }
public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration)
@ -360,25 +355,12 @@
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{ {
var subscriptionClientName = configuration["SubscriptionClientName"]; var subscriptionClientName = configuration["SubscriptionClientName"];
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -389,7 +371,17 @@
retryCount = int.Parse(configuration["EventBusRetryCount"]); retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
} }


+ 2
- 2
src/Services/Ordering/Ordering.API/appsettings.json View File

@ -6,10 +6,10 @@
"SeqServerUrl": null, "SeqServerUrl": null,
"LogstashgUrl": null, "LogstashgUrl": null,
"MinimumLevel": { "MinimumLevel": {
"Default": "Information",
"Default": "Verbose",
"Override": { "Override": {
"Microsoft": "Warning", "Microsoft": "Warning",
"Microsoft.eShopOnContainers": "Information",
"Microsoft.eShopOnContainers": "Verbose",
"System": "Warning" "System": "Warning"
} }
} }


+ 62
- 70
src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs View File

@ -15,6 +15,7 @@ using Ordering.BackgroundTasks.Configuration;
using Ordering.BackgroundTasks.Tasks; using Ordering.BackgroundTasks.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks;
@ -46,60 +47,56 @@ namespace Ordering.BackgroundTasks
services.AddSingleton<IHostedService, GracePeriodManagerService>(); services.AddSingleton<IHostedService, GracePeriodManagerService>();
//configure event bus related services //configure event bus related services
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return connections;
});
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
RegisterEventBus(services);
//create autofac based service provider
var container = new ContainerBuilder();
container.Populate(services);
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
return new AutofacServiceProvider(container.Build());
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
} }
RegisterEventBus(services);
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
//create autofac based service provider
var container = new ContainerBuilder();
container.Populate(services);
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new AutofacServiceProvider(container.Build());
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
} }
@ -122,38 +119,33 @@ namespace Ordering.BackgroundTasks
private void RegisterEventBus(IServiceCollection services) private void RegisterEventBus(IServiceCollection services)
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
} }


+ 2
- 2
src/Services/Ordering/Ordering.BackgroundTasks/Tasks/GracePeriodManagerTask.cs View File

@ -21,12 +21,12 @@ namespace Ordering.BackgroundTasks.Tasks
{ {
private readonly ILogger<GracePeriodManagerService> _logger; private readonly ILogger<GracePeriodManagerService> _logger;
private readonly BackgroundTaskSettings _settings; private readonly BackgroundTaskSettings _settings;
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private static readonly String identityUrl = @"http://identity.api/"; private static readonly String identityUrl = @"http://identity.api/";
public GracePeriodManagerService( public GracePeriodManagerService(
IOptions<BackgroundTaskSettings> settings, IOptions<BackgroundTaskSettings> settings,
IEventBus eventBus,
IMultiEventBus eventBus,
ILogger<GracePeriodManagerService> logger) ILogger<GracePeriodManagerService> logger)
{ {
_settings = settings?.Value ?? throw new ArgumentNullException(nameof(settings)); _settings = settings?.Value ?? throw new ArgumentNullException(nameof(settings));


+ 2
- 2
src/Services/Ordering/Ordering.Domain/AggregatesModel/OrderAggregate/Order.cs View File

@ -114,11 +114,11 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.O
} }
} }
public void SetStockConfirmedStatus()
public void SetStockConfirmedStatus(int tenantId)
{ {
if (_orderStatusId == OrderStatus.AwaitingValidation.Id) if (_orderStatusId == OrderStatus.AwaitingValidation.Id)
{ {
AddDomainEvent(new OrderStatusChangedToStockConfirmedDomainEvent(Id));
AddDomainEvent(new OrderStatusChangedToStockConfirmedDomainEvent(Id).withTenantId(tenantId));
_orderStatusId = OrderStatus.StockConfirmed.Id; _orderStatusId = OrderStatus.StockConfirmed.Id;
_description = "All the items were confirmed with available stock."; _description = "All the items were confirmed with available stock.";


+ 7
- 0
src/Services/Ordering/Ordering.Domain/Events/OrderStatusChangedToStockConfirmedDomainEvent.cs View File

@ -9,8 +9,15 @@
: INotification : INotification
{ {
public int OrderId { get; } public int OrderId { get; }
public int TenantId { get; set; }
public OrderStatusChangedToStockConfirmedDomainEvent(int orderId) public OrderStatusChangedToStockConfirmedDomainEvent(int orderId)
=> OrderId = orderId; => OrderId = orderId;
public OrderStatusChangedToStockConfirmedDomainEvent withTenantId(int tenantId)
{
this.TenantId = tenantId;
return this;
}
} }
} }

+ 91
- 87
src/Services/Ordering/Ordering.SignalrHub/Startup.cs View File

@ -16,6 +16,7 @@ using Ordering.SignalrHub.IntegrationEvents.EventHandling;
using Ordering.SignalrHub.IntegrationEvents.Events; using Ordering.SignalrHub.IntegrationEvents.Events;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using System.IdentityModel.Tokens.Jwt; using System.IdentityModel.Tokens.Jwt;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
@ -42,10 +43,10 @@ namespace Ordering.SignalrHub
{ {
options.AddPolicy("CorsPolicy", options.AddPolicy("CorsPolicy",
builder => builder builder => builder
.AllowAnyMethod()
.AllowAnyHeader()
.SetIsOriginAllowed((host) => true)
.AllowCredentials());
.AllowAnyMethod()
.AllowAnyHeader()
.SetIsOriginAllowed((host) => true)
.AllowCredentials());
}); });
if (Configuration.GetValue<string>("IsClusterEnv") == bool.TrueString) if (Configuration.GetValue<string>("IsClusterEnv") == bool.TrueString)
@ -59,50 +60,14 @@ namespace Ordering.SignalrHub
services.AddSignalR(); services.AddSignalR();
} }
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
return connections;
});
ConfigureAuthService(services); ConfigureAuthService(services);
@ -118,6 +83,38 @@ namespace Ordering.SignalrHub
return new AutofacServiceProvider(container.Build()); return new AutofacServiceProvider(container.Build());
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, ILoggerFactory loggerFactory)
{ {
@ -159,14 +156,25 @@ namespace Ordering.SignalrHub
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToCancelledIntegrationEvent, OrderStatusChangedToCancelledIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToSubmittedIntegrationEvent, OrderStatusChangedToSubmittedIntegrationEventHandler>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus
.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent,
OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent,
OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToShippedIntegrationEvent,
OrderStatusChangedToShippedIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToCancelledIntegrationEvent,
OrderStatusChangedToCancelledIntegrationEventHandler>();
eventBus
.Subscribe<OrderStatusChangedToSubmittedIntegrationEvent,
OrderStatusChangedToSubmittedIntegrationEventHandler>();
} }
private void ConfigureAuthService(IServiceCollection services) private void ConfigureAuthService(IServiceCollection services)
@ -180,7 +188,6 @@ namespace Ordering.SignalrHub
{ {
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme; options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
}).AddJwtBearer(options => }).AddJwtBearer(options =>
{ {
options.Authority = identityUrl; options.Authority = identityUrl;
@ -193,37 +200,33 @@ namespace Ordering.SignalrHub
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
} }
@ -231,7 +234,8 @@ namespace Ordering.SignalrHub
public static class CustomExtensionMethods public static class CustomExtensionMethods
{ {
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration)
public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services,
IConfiguration configuration)
{ {
var hcBuilder = services.AddHealthChecks(); var hcBuilder = services.AddHealthChecks();
@ -244,7 +248,7 @@ namespace Ordering.SignalrHub
configuration["EventBusConnection"], configuration["EventBusConnection"],
topicName: "eshop_event_bus", topicName: "eshop_event_bus",
name: "signalr-servicebus-check", name: "signalr-servicebus-check",
tags: new string[] { "servicebus" });
tags: new string[] {"servicebus"});
} }
else else
{ {
@ -252,10 +256,10 @@ namespace Ordering.SignalrHub
.AddRabbitMQ( .AddRabbitMQ(
$"amqp://{configuration["EventBusConnection"]}", $"amqp://{configuration["EventBusConnection"]}",
name: "signalr-rabbitmqbus-check", name: "signalr-rabbitmqbus-check",
tags: new string[] { "rabbitmqbus" });
tags: new string[] {"rabbitmqbus"});
} }
return services; return services;
} }
} }
}
}

+ 6
- 3
src/Services/Payment/Payment.API/IntegrationEvents/EventHandling/OrderStatusChangedToStockConfirmedIntegrationEventHandler.cs View File

@ -1,4 +1,6 @@
namespace Payment.API.IntegrationEvents.EventHandling
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions;
namespace Payment.API.IntegrationEvents.EventHandling
{ {
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
@ -11,12 +13,12 @@
public class OrderStatusChangedToStockConfirmedIntegrationEventHandler : public class OrderStatusChangedToStockConfirmedIntegrationEventHandler :
IIntegrationEventHandler<OrderStatusChangedToStockConfirmedIntegrationEvent> IIntegrationEventHandler<OrderStatusChangedToStockConfirmedIntegrationEvent>
{ {
private readonly IEventBus _eventBus;
private readonly IMultiEventBus _eventBus;
private readonly PaymentSettings _settings; private readonly PaymentSettings _settings;
private readonly ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> _logger; private readonly ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> _logger;
public OrderStatusChangedToStockConfirmedIntegrationEventHandler( public OrderStatusChangedToStockConfirmedIntegrationEventHandler(
IEventBus eventBus,
IMultiEventBus eventBus,
IOptionsSnapshot<PaymentSettings> settings, IOptionsSnapshot<PaymentSettings> settings,
ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> logger) ILogger<OrderStatusChangedToStockConfirmedIntegrationEventHandler> logger)
{ {
@ -50,6 +52,7 @@
orderPaymentIntegrationEvent = new OrderPaymentFailedIntegrationEvent(@event.OrderId); orderPaymentIntegrationEvent = new OrderPaymentFailedIntegrationEvent(@event.OrderId);
} }
orderPaymentIntegrationEvent.TenantId = @event.TenantId;
_logger.LogInformation("----- Publishing integration event: {IntegrationEventId} from {AppName} - ({@IntegrationEvent})", orderPaymentIntegrationEvent.Id, Program.AppName, orderPaymentIntegrationEvent); _logger.LogInformation("----- Publishing integration event: {IntegrationEventId} from {AppName} - ({@IntegrationEvent})", orderPaymentIntegrationEvent.Id, Program.AppName, orderPaymentIntegrationEvent);
_eventBus.Publish(orderPaymentIntegrationEvent); _eventBus.Publish(orderPaymentIntegrationEvent);


+ 66
- 72
src/Services/Payment/Payment.API/Startup.cs View File

@ -16,6 +16,7 @@ using Payment.API.IntegrationEvents.EventHandling;
using Payment.API.IntegrationEvents.Events; using Payment.API.IntegrationEvents.Events;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
using HealthChecks.UI.Client; using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks;
@ -38,50 +39,16 @@ namespace Payment.API
services.Configure<PaymentSettings>(Configuration); services.Configure<PaymentSettings>(Configuration);
RegisterAppInsights(services); RegisterAppInsights(services);
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
}
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
return connections;
});
RegisterEventBus(services); RegisterEventBus(services);
var container = new ContainerBuilder(); var container = new ContainerBuilder();
@ -89,6 +56,37 @@ namespace Payment.API
return new AutofacServiceProvider(container.Build()); return new AutofacServiceProvider(container.Build());
} }
private IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{ {
@ -138,37 +136,33 @@ namespace Payment.API
{ {
var subscriptionClientName = Configuration["SubscriptionClientName"]; var subscriptionClientName = Configuration["SubscriptionClientName"];
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
});
services.AddTransient<OrderStatusChangedToStockConfirmedIntegrationEventHandler>(); services.AddTransient<OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
@ -176,7 +170,7 @@ namespace Payment.API
private void ConfigureEventBus(IApplicationBuilder app) private void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
} }
} }


+ 5
- 3
src/Services/TenantCustomisations/TenantACustomisations/Startup.cs View File

@ -311,6 +311,8 @@
factory.Password = configuration["EventBusPassword"]; factory.Password = configuration["EventBusPassword"];
} }
factory.VirtualHost = "TenantA";
var retryCount = 5; var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"])) if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{ {
@ -353,7 +355,7 @@
{ {
var subscriptionClientName = configuration["SubscriptionClientName"]; var subscriptionClientName = configuration["SubscriptionClientName"];
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
/* if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{ {
services.AddSingleton<IEventBus, EventBusServiceBus>(sp => services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{ {
@ -366,7 +368,7 @@
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope); eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
}); });
} }
else
else*/
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{ {
@ -381,7 +383,7 @@
retryCount = int.Parse(configuration["EventBusRetryCount"]); retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount);
}); });
} }


+ 53
- 58
src/Services/Webhooks/Webhooks.API/Startup.cs View File

@ -126,7 +126,7 @@ namespace Webhooks.API
protected virtual void ConfigureEventBus(IApplicationBuilder app) protected virtual void ConfigureEventBus(IApplicationBuilder app)
{ {
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>();
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>(); eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
@ -231,26 +231,10 @@ namespace Webhooks.API
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{ {
var subscriptionClientName = configuration["SubscriptionClientName"]; var subscriptionClientName = configuration["SubscriptionClientName"];
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
{
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{ {
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
@ -261,9 +245,19 @@ namespace Webhooks.API
retryCount = int.Parse(configuration["EventBusRetryCount"]); retryCount = int.Parse(configuration["EventBusRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
List<IEventBus> eventBuses = new List<IEventBus>();
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[0], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantA", subscriptionClientName, retryCount));
eventBuses.Add(new EventBusRabbitMQ(multiRabbitMqPersistentConnections.GetConnections()[1], logger,
iLifetimeScope, eventBusSubcriptionsManager, "TenantB", subscriptionClientName, retryCount));
Dictionary<int, String> tenants = new Dictionary<int, string>();
tenants.Add(1, "TenantA");
tenants.Add(2, "TenantB");
return new MultiEventBusRabbitMQ(eventBuses, tenants);
}); });
}
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>(); services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<ProductPriceChangedIntegrationEventHandler>(); services.AddTransient<ProductPriceChangedIntegrationEventHandler>();
@ -304,51 +298,52 @@ namespace Webhooks.API
{ {
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>( services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
sp => (DbConnection c) => new IntegrationEventLogService(c)); sp => (DbConnection c) => new IntegrationEventLogService(c));
if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();
var serviceBusConnection = new ServiceBusConnectionStringBuilder(configuration["EventBusConnection"]);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
});
}
else
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{ {
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("TenantA", sp, configuration));
connections.AddConnection(GenerateConnection("TenantB", sp, configuration));
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
return connections;
});
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
{
factory.UserName = configuration["EventBusUserName"];
}
return services;
}
if (!string.IsNullOrEmpty(configuration["EventBusPassword"]))
{
factory.Password = configuration["EventBusPassword"];
}
private static IRabbitMQPersistentConnection GenerateConnection(String vHost, IServiceProvider sp, IConfiguration configuration)
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
{
factory.UserName = configuration["EventBusUserName"];
} }
return services;
}
if (!string.IsNullOrEmpty(configuration["EventBusPassword"]))
{
factory.Password = configuration["EventBusPassword"];
}
factory.VirtualHost = vHost;
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
}
public static IServiceCollection AddCustomAuthentication(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddCustomAuthentication(this IServiceCollection services, IConfiguration configuration)
{ {
// prevent from mapping "sub" claim to nameidentifier. // prevent from mapping "sub" claim to nameidentifier.


Loading…
Cancel
Save