Browse Source

WIP

pull/1240/head
espent1004 5 years ago
parent
commit
d940d9a65d
16 changed files with 166 additions and 79 deletions
  1. +4
    -4
      src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
  2. +15
    -0
      src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs
  3. +8
    -8
      src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
  4. +54
    -36
      src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
  5. +12
    -16
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
  6. +36
    -0
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs
  7. +29
    -5
      src/Services/Basket/Basket.API/Startup.cs
  8. +1
    -1
      src/Services/Catalog/Catalog.API/Startup.cs
  9. +1
    -1
      src/Services/Location/Locations.API/Startup.cs
  10. +1
    -1
      src/Services/Marketing/Marketing.API/Startup.cs
  11. +1
    -1
      src/Services/Ordering/Ordering.API/Startup.cs
  12. +0
    -2
      src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs
  13. +1
    -1
      src/Services/Ordering/Ordering.SignalrHub/Startup.cs
  14. +1
    -1
      src/Services/Payment/Payment.API/Startup.cs
  15. +1
    -1
      src/Services/TenantCustomisations/TenantACustomisations/Startup.cs
  16. +1
    -1
      src/Services/Webhooks/Webhooks.API/Startup.cs

+ 4
- 4
src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs View File

@ -7,17 +7,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
{
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
void Subscribe<T, TH>(String vHost)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void SubscribeDynamic<TH>(string eventName)
void SubscribeDynamic<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName)
void UnsubscribeDynamic<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler;
void Unsubscribe<T, TH>()
void Unsubscribe<T, TH>(String vHost)
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
}


+ 15
- 0
src/BuildingBlocks/EventBus/EventBus/Abstractions/IMultiEventBus.cs View File

@ -0,0 +1,15 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
{
public interface IMultiEventBus
{
void AddEventBus(IEventBus eventBus);
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
}
}

+ 8
- 8
src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs View File

@ -10,25 +10,25 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
{
bool IsEmpty { get; }
event EventHandler<string> OnEventRemoved;
void AddDynamicSubscription<TH>(string eventName)
void AddDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler;
void AddSubscription<T, TH>()
void AddSubscription<T, TH>(String vHost)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void RemoveSubscription<T, TH>()
void RemoveSubscription<T, TH>(String vHost)
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
void RemoveDynamicSubscription<TH>(string eventName)
void RemoveDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler;
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName);
bool HasSubscriptionsForEvent<T>(String vHost) where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName, String vHost);
Type GetEventTypeByName(string eventName);
void Clear();
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>(String vHost) where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName, String vHost);
string GetEventKey<T>();
}
}

+ 54
- 36
src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs View File

@ -10,48 +10,52 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
{
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
private readonly Dictionary<CompositeHandler, List<SubscriptionInfo>> _handlers;
private readonly List<Type> _eventTypes;
public event EventHandler<string> OnEventRemoved;
public InMemoryEventBusSubscriptionsManager()
{
_handlers = new Dictionary<string, List<SubscriptionInfo>>();
//_handlers = new Dictionary<string, List<SubscriptionInfo>>();
_handlers = new Dictionary<CompositeHandler, List<SubscriptionInfo>>();
_eventTypes = new List<Type>();
}
public bool IsEmpty => !_handlers.Keys.Any();
public void Clear() => _handlers.Clear();
public void AddDynamicSubscription<TH>(string eventName)
public void AddDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler
{
DoAddSubscription(typeof(TH), eventName, isDynamic: true);
DoAddSubscription(typeof(TH), eventName, isDynamic: true, vHost);
}
public void AddSubscription<T, TH>()
public void AddSubscription<T, TH>(String vHost)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = GetEventKey<T>();
DoAddSubscription(typeof(TH), eventName, isDynamic: false);
DoAddSubscription(typeof(TH), eventName, isDynamic: false, vHost);
if (!_eventTypes.Contains(typeof(T)))
{
_eventTypes.Add(typeof(T));
}
}
private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic, String vHost)
{
if (!HasSubscriptionsForEvent(eventName))
var compositeHandler = new CompositeHandler{TenantVHostName = vHost, EventName = eventName};
if (!HasSubscriptionsForEvent(eventName, vHost))
{
_handlers.Add(eventName, new List<SubscriptionInfo>());
_handlers.Add(compositeHandler, new List<SubscriptionInfo>());
}
if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
if (_handlers[compositeHandler].Any(s => s.HandlerType == handlerType))
{
throw new ArgumentException(
$"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
@ -59,41 +63,43 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
if (isDynamic)
{
_handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
_handlers[compositeHandler].Add(SubscriptionInfo.Dynamic(handlerType));
}
else
{
_handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
_handlers[compositeHandler].Add(SubscriptionInfo.Typed(handlerType));
}
}
public void RemoveDynamicSubscription<TH>(string eventName)
public void RemoveDynamicSubscription<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler
{
var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
DoRemoveHandler(eventName, handlerToRemove);
var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName, vHost);
DoRemoveHandler(eventName, handlerToRemove, vHost);
}
public void RemoveSubscription<T, TH>()
public void RemoveSubscription<T, TH>(String vHost)
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent
{
var handlerToRemove = FindSubscriptionToRemove<T, TH>();
var handlerToRemove = FindSubscriptionToRemove<T, TH>(vHost);
var eventName = GetEventKey<T>();
DoRemoveHandler(eventName, handlerToRemove);
DoRemoveHandler(eventName, handlerToRemove, vHost);
}
private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove, String vHost)
{
if (subsToRemove != null)
{
_handlers[eventName].Remove(subsToRemove);
if (!_handlers[eventName].Any())
var compositeHandler = new CompositeHandler{EventName = eventName, TenantVHostName = vHost};
_handlers[compositeHandler].Remove(subsToRemove);
if (!_handlers[compositeHandler].Any())
{
_handlers.Remove(eventName);
_handlers.Remove(compositeHandler);
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
if (eventType != null)
{
@ -105,12 +111,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
}
}
public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>(String vHost) where T : IntegrationEvent
{
var key = GetEventKey<T>();
return GetHandlersForEvent(key);
return GetHandlersForEvent(key, vHost);
}
public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];
public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName, String vHost) => _handlers[new CompositeHandler{EventName = eventName, TenantVHostName = vHost}];
private void RaiseOnEventRemoved(string eventName)
{
@ -119,38 +125,43 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
}
private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler
{
return DoFindSubscriptionToRemove(eventName, typeof(TH));
return DoFindSubscriptionToRemove(eventName, typeof(TH), vHost);
}
private SubscriptionInfo FindSubscriptionToRemove<T, TH>()
private SubscriptionInfo FindSubscriptionToRemove<T, TH>(String vHost)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = GetEventKey<T>();
return DoFindSubscriptionToRemove(eventName, typeof(TH));
return DoFindSubscriptionToRemove(eventName, typeof(TH), vHost);
}
private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType)
private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType, String vHost)
{
if (!HasSubscriptionsForEvent(eventName))
if (!HasSubscriptionsForEvent(eventName, vHost))
{
return null;
}
var compositeHandler = new CompositeHandler{EventName = eventName, TenantVHostName = vHost};
return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
return _handlers[compositeHandler].SingleOrDefault(s => s.HandlerType == handlerType);
}
public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
public bool HasSubscriptionsForEvent<T>(String vHost) where T : IntegrationEvent
{
var key = GetEventKey<T>();
return HasSubscriptionsForEvent(key);
return HasSubscriptionsForEvent(key, vHost);
}
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
public bool HasSubscriptionsForEvent(string eventName, String vHost) => _handlers.ContainsKey(new CompositeHandler{EventName = eventName, TenantVHostName = vHost});
public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);
@ -160,3 +171,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
}
}
}
class CompositeHandler
{
public String TenantVHostName { get; set; }
public String EventName { get; set; }
}

+ 12
- 16
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -12,12 +12,8 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Net.Mime;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
@ -130,34 +126,34 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
}
public void SubscribeDynamic<TH>(string eventName)
public void SubscribeDynamic<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler
{
_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName,
typeof(TH).GetGenericTypeName());
DoInternalSubscription(eventName);
_subsManager.AddDynamicSubscription<TH>(eventName);
DoInternalSubscription(eventName, vHost);
_subsManager.AddDynamicSubscription<TH>(eventName, vHost);
StartBasicConsume();
}
public void Subscribe<T, TH>()
public void Subscribe<T, TH>(String vHost)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = _subsManager.GetEventKey<T>();
DoInternalSubscription(eventName);
DoInternalSubscription(eventName, vHost);
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName,
typeof(TH).GetGenericTypeName());
_subsManager.AddSubscription<T, TH>();
_subsManager.AddSubscription<T, TH>(vHost);
StartBasicConsume();
}
private void DoInternalSubscription(string eventName)
private void DoInternalSubscription(string eventName, String vHost)
{
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName, vHost);
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
@ -174,7 +170,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
}
public void Unsubscribe<T, TH>()
public void Unsubscribe<T, TH>(String vHost)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
@ -182,13 +178,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_logger.LogInformation("Unsubscribing from event {EventName}", eventName);
_subsManager.RemoveSubscription<T, TH>();
_subsManager.RemoveSubscription<T, TH>(vHost);
}
public void UnsubscribeDynamic<TH>(string eventName)
public void UnsubscribeDynamic<TH>(string eventName, String vHost)
where TH : IDynamicIntegrationEventHandler
{
_subsManager.RemoveDynamicSubscription<TH>(eventName);
_subsManager.RemoveDynamicSubscription<TH>(eventName, vHost);
}
public void Dispose()


+ 36
- 0
src/BuildingBlocks/EventBus/EventBusRabbitMQ/MultiEventBusRabbitMQ.cs View File

@ -0,0 +1,36 @@
using System.Collections.Generic;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
public class MultiEventBusRabbitMQ : IMultiEventBus
{
private List<IEventBus> _eventBuses;
public MultiEventBusRabbitMQ(List<IEventBus> eventBuses)
{
_eventBuses = eventBuses;
}
public void AddEventBus(IEventBus eventBus)
{
_eventBuses.Add(eventBus);
}
public void Publish(IntegrationEvent @event)
{
throw new System.NotImplementedException();
}
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
_eventBuses.ForEach(e =>
{
e.Subscribe<T, TH>();
});
}
}
}

+ 29
- 5
src/Services/Basket/Basket.API/Startup.cs View File

@ -120,7 +120,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
@ -138,7 +138,8 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
services.AddSingleton<IMultiRabbitMQPersistentConnections>(sp =>
{
IMultiRabbitMQPersistentConnections connections = new MultiRabbitMQPersistentConnections();
connections.AddConnection(GenerateConnection("customisation", sp));
connections.AddConnection(GenerateConnection("TenantA", sp));
connections.AddConnection(GenerateConnection("TenantB", sp));
connections.AddConnection(GenerateConnection("/", sp));
return connections;
@ -338,7 +339,30 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
services.AddSingleton<IMultiEventBus, MultiEventBusRabbitMQ>(sp =>
{
var multiRabbitMqPersistentConnections = sp.GetRequiredService<IMultiRabbitMQPersistentConnections>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
List<IEventBus> eventBuses = new List<IEventBus>();
multiRabbitMqPersistentConnections.GetConnections().ForEach(conn =>
{
eventBuses.Add(new EventBusRabbitMQ(conn, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount));
});
return new MultiEventBusRabbitMQ(eventBuses);
});
/* services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
@ -363,7 +387,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
Console.WriteLine(testing);
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
});*/
}
@ -375,7 +399,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
private void ConfigureEventBus(IApplicationBuilder app)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IMultiEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>();


+ 1
- 1
src/Services/Catalog/Catalog.API/Startup.cs View File

@ -311,7 +311,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
factory.Password = configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))


+ 1
- 1
src/Services/Location/Locations.API/Startup.cs View File

@ -91,7 +91,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))


+ 1
- 1
src/Services/Marketing/Marketing.API/Startup.cs View File

@ -115,7 +115,7 @@
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
// factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))


+ 1
- 1
src/Services/Ordering/Ordering.API/Startup.cs View File

@ -319,7 +319,7 @@
factory.Password = configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;


+ 0
- 2
src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs View File

@ -82,8 +82,6 @@ namespace Ordering.BackgroundTasks
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{


+ 1
- 1
src/Services/Ordering/Ordering.SignalrHub/Startup.cs View File

@ -94,7 +94,7 @@ namespace Ordering.SignalrHub
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))


+ 1
- 1
src/Services/Payment/Payment.API/Startup.cs View File

@ -72,7 +72,7 @@ namespace Payment.API
factory.Password = Configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
// factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))


+ 1
- 1
src/Services/TenantCustomisations/TenantACustomisations/Startup.cs View File

@ -311,7 +311,7 @@
factory.Password = configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))


+ 1
- 1
src/Services/Webhooks/Webhooks.API/Startup.cs View File

@ -336,7 +336,7 @@ namespace Webhooks.API
factory.Password = configuration["EventBusPassword"];
}
factory.VirtualHost = "customisation";
//factory.VirtualHost = "customisation";
var retryCount = 5;
if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))


Loading…
Cancel
Save