From 79f8f1b949b32c9b5782cc221137d201ec020c19 Mon Sep 17 00:00:00 2001 From: Eduard Tomas Date: Wed, 3 May 2017 10:59:36 +0200 Subject: [PATCH] EventBus refactor. Instead to register EventHandlers we register Func which solves scope problems (having transient/scoped objects owned by singletons) --- eShopOnContainers-ServicesAndWebApps.sln | 51 ++++++++ .../EventBus.Tests/EventBus.Tests.csproj | 22 ++++ .../InMemory_SubscriptionManager_Tests.cs | 56 ++++++++ .../EventBus.Tests/TestIntegrationEvent.cs | 11 ++ .../TestIntegrationEventHandler.cs | 23 ++++ .../TestIntegrationOtherEventHandler.cs | 23 ++++ .../EventBus/Abstractions/IEventBus.cs | 10 +- .../EventBus/IEventBusSubscriptionsManager.cs | 26 ++++ .../InMemoryEventBusSubscriptionsManager.cs | 115 +++++++++++++++++ .../EventBusRabbitMQ/EventBusRabbitMQ.cs | 120 +++++++++--------- src/Services/Basket/Basket.API/Startup.cs | 25 ++-- src/Services/Catalog/Catalog.API/Startup.cs | 2 + src/Services/Ordering/Ordering.API/Startup.cs | 2 + 13 files changed, 414 insertions(+), 72 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj create mode 100644 src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs create mode 100644 src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs create mode 100644 src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs create mode 100644 src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs create mode 100644 src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs create mode 100644 src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs diff --git a/eShopOnContainers-ServicesAndWebApps.sln b/eShopOnContainers-ServicesAndWebApps.sln index 27434ec86..cf2be1c26 100644 --- a/eShopOnContainers-ServicesAndWebApps.sln +++ b/eShopOnContainers-ServicesAndWebApps.sln @@ -74,6 +74,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Health EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Extensions.HealthChecks.SqlServer", "src\BuildingBlocks\HealthChecks\src\Microsoft.Extensions.HealthChecks.SqlServer\Microsoft.Extensions.HealthChecks.SqlServer.csproj", "{4BD76717-3102-4969-8C2C-BAAA3F0263B6}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBus.Tests", "src\BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Ad-Hoc|Any CPU = Ad-Hoc|Any CPU @@ -952,6 +954,54 @@ Global {4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x64.Build.0 = Release|Any CPU {4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.ActiveCfg = Release|Any CPU {4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.Build.0 = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.ActiveCfg = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.Build.0 = Debug|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.Build.0 = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.ActiveCfg = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.Build.0 = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.ActiveCfg = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.Build.0 = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.Build.0 = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.ActiveCfg = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.Build.0 = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.ActiveCfg = Release|Any CPU + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -987,5 +1037,6 @@ Global {D1C47FF1-91F1-4CAF-9ABB-AD642B821502} = {FBF43D93-F2E7-4FF8-B4AB-186895949B88} {22A0F9C1-2D4A-4107-95B7-8459E6688BC5} = {A81ECBC2-6B00-4DCD-8388-469174033379} {4BD76717-3102-4969-8C2C-BAAA3F0263B6} = {A81ECBC2-6B00-4DCD-8388-469174033379} + {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F} EndGlobalSection EndGlobal diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj b/src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj new file mode 100644 index 000000000..1387a74dd --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj @@ -0,0 +1,22 @@ + + + + netcoreapp1.1 + + + + + + + + + + + + + + + + + + diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs new file mode 100644 index 000000000..dd5f7f5b4 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs @@ -0,0 +1,56 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +using System; +using System.Linq; +using Xunit; + +namespace EventBus.Tests +{ + public class InMemory_SubscriptionManager_Tests + { + [Fact] + public void After_Creation_Should_Be_Empty() + { + var manager = new InMemoryEventBusSubscriptionsManager(); + Assert.True(manager.IsEmpty); + } + + [Fact] + public void After_One_Event_Subscription_Should_Contain_The_Event() + { + var manager = new InMemoryEventBusSubscriptionsManager(); + manager.AddSubscription(() => new TestIntegrationEventHandler()); + Assert.True(manager.HasSubscriptionsForEvent()); + } + + [Fact] + public void After_All_Subscriptions_Are_Deleted_Event_Should_No_Longer_Exists() + { + var manager = new InMemoryEventBusSubscriptionsManager(); + manager.AddSubscription(() => new TestIntegrationEventHandler()); + manager.RemoveSubscription(); + Assert.False(manager.HasSubscriptionsForEvent()); + } + + [Fact] + public void Deleting_Last_Subscription_Should_Raise_On_Deleted_Event() + { + bool raised = false; + var manager = new InMemoryEventBusSubscriptionsManager(); + manager.OnEventRemoved += (o, e) => raised = true; + manager.AddSubscription(() => new TestIntegrationEventHandler()); + manager.RemoveSubscription(); + Assert.True(raised); + } + + [Fact] + public void Get_Handlers_For_Event_Should_Return_All_Handlers() + { + var manager = new InMemoryEventBusSubscriptionsManager(); + manager.AddSubscription(() => new TestIntegrationEventHandler()); + manager.AddSubscription(() => new TestIntegrationOtherEventHandler()); + var handlers = manager.GetHandlersForEvent(); + Assert.Equal(2, handlers.Count()); + } + + } +} diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs new file mode 100644 index 000000000..a77f3ef6f --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs @@ -0,0 +1,11 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System; +using System.Collections.Generic; +using System.Text; + +namespace EventBus.Tests +{ + public class TestIntegrationEvent : IntegrationEvent + { + } +} diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs new file mode 100644 index 000000000..0b5b793ee --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs @@ -0,0 +1,23 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace EventBus.Tests +{ + public class TestIntegrationOtherEventHandler : IIntegrationEventHandler + { + public bool Handled { get; private set; } + + public TestIntegrationOtherEventHandler() + { + Handled = false; + } + + public async Task Handle(TestIntegrationEvent @event) + { + Handled = true; + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs new file mode 100644 index 000000000..72e1ed2cd --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs @@ -0,0 +1,23 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace EventBus.Tests +{ + public class TestIntegrationEventHandler : IIntegrationEventHandler + { + public bool Handled { get; private set; } + + public TestIntegrationEventHandler() + { + Handled = false; + } + + public async Task Handle(TestIntegrationEvent @event) + { + Handled = true; + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs index 63f9f1b99..9ab7a4499 100644 --- a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs +++ b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs @@ -1,11 +1,17 @@ using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System; namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions { public interface IEventBus { - void Subscribe(IIntegrationEventHandler handler) where T: IntegrationEvent; - void Unsubscribe(IIntegrationEventHandler handler) where T : IntegrationEvent; + void Subscribe(Func handler) + where T : IntegrationEvent + where TH : IIntegrationEventHandler; + void Unsubscribe() + where TH : IIntegrationEventHandler + where T : IntegrationEvent; + void Publish(IntegrationEvent @event); } } diff --git a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs new file mode 100644 index 000000000..2fdefc039 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs @@ -0,0 +1,26 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System; +using System.Collections.Generic; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus +{ + public interface IEventBusSubscriptionsManager + { + bool IsEmpty { get; } + event EventHandler OnEventRemoved; + void AddSubscription(Func handler) + where T : IntegrationEvent + where TH : IIntegrationEventHandler; + + void RemoveSubscription() + where TH : IIntegrationEventHandler + where T : IntegrationEvent; + bool HasSubscriptionsForEvent() where T : IntegrationEvent; + bool HasSubscriptionsForEvent(string eventName); + Type GetEventTypeByName(string eventName); + void Clear(); + IEnumerable GetHandlersForEvent() where T : IntegrationEvent; + IEnumerable GetHandlersForEvent(string eventName); + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs new file mode 100644 index 000000000..11fdba3c5 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs @@ -0,0 +1,115 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus +{ + public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager + { + private readonly Dictionary> _handlers; + private readonly List _eventTypes; + + public event EventHandler OnEventRemoved; + + public InMemoryEventBusSubscriptionsManager() + { + _handlers = new Dictionary>(); + _eventTypes = new List(); + } + + public bool IsEmpty => !_handlers.Keys.Any(); + public void Clear() => _handlers.Clear(); + + public void AddSubscription(Func handler) + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var key = GetEventKey(); + if (!HasSubscriptionsForEvent()) + { + _handlers.Add(key, new List()); + } + _handlers[key].Add(handler); + _eventTypes.Add(typeof(T)); + } + + public void RemoveSubscription() + where TH : IIntegrationEventHandler + where T : IntegrationEvent + { + var handlerToRemove = FindHandlerToRemove(); + if (handlerToRemove != null) + { + var key = GetEventKey(); + _handlers[key].Remove(handlerToRemove); + if (!_handlers[key].Any()) + { + _handlers.Remove(key); + var eventType = _eventTypes.SingleOrDefault(e => e.Name == key); + if (eventType != null) + { + _eventTypes.Remove(eventType); + RaiseOnEventRemoved(eventType.Name); + } + } + + } + } + + public IEnumerable GetHandlersForEvent() where T : IntegrationEvent + { + var key = GetEventKey(); + return GetHandlersForEvent(key); + } + public IEnumerable GetHandlersForEvent(string eventName) => _handlers[eventName]; + + private void RaiseOnEventRemoved(string eventName) + { + var handler = OnEventRemoved; + if (handler != null) + { + OnEventRemoved(this, eventName); + } + } + + private Delegate FindHandlerToRemove() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + if (!HasSubscriptionsForEvent()) + { + return null; + } + + var key = GetEventKey(); + foreach (var func in _handlers[key]) + { + var genericArgs = func.GetType().GetGenericArguments(); + if (genericArgs.SingleOrDefault() == typeof(TH)) + { + return func; + } + } + + return null; + } + + public bool HasSubscriptionsForEvent() where T : IntegrationEvent + { + var key = GetEventKey(); + return HasSubscriptionsForEvent(key); + } + public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); + + public Type GetEventTypeByName(string eventName) => _eventTypes.Single(t => t.Name == eventName); + + private string GetEventKey() + { + return typeof(T).Name; + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 0eb29b72b..adbc52ad1 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -1,4 +1,5 @@ -using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -23,22 +24,41 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger _logger; - - private readonly Dictionary> _handlers - = new Dictionary>(); - - private readonly List _eventTypes - = new List(); + private readonly IEventBusSubscriptionsManager _subsManager; + private IModel _consumerChannel; private string _queueName; - public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger) + public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, IEventBusSubscriptionsManager subsManager) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _consumerChannel = CreateConsumerChannel(); + + _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; + } + + private void SubsManager_OnEventRemoved(object sender, string eventName) + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + using (var channel = _persistentConnection.CreateModel()) + { + channel.QueueUnbind(queue: _queueName, + exchange: BROKER_NAME, + routingKey: eventName); + + if (_subsManager.IsEmpty) + { + _queueName = string.Empty; + _consumerChannel.Close(); + } + } } public void Publish(IntegrationEvent @event) @@ -76,15 +96,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ } } - public void Subscribe(IIntegrationEventHandler handler) where T : IntegrationEvent + public void Subscribe(Func handler) + where T : IntegrationEvent + where TH : IIntegrationEventHandler { var eventName = typeof(T).Name; - - if (_handlers.ContainsKey(eventName)) - { - _handlers[eventName].Add(handler); - } - else + var containsKey = _subsManager.HasSubscriptionsForEvent(); + if (!containsKey) { if (!_persistentConnection.IsConnected) { @@ -96,55 +114,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); - - _handlers.Add(eventName, new List()); - _handlers[eventName].Add(handler); - _eventTypes.Add(typeof(T)); } - } + _subsManager.AddSubscription(handler); + } - public void Unsubscribe(IIntegrationEventHandler handler) where T : IntegrationEvent + public void Unsubscribe() + where TH : IIntegrationEventHandler + where T : IntegrationEvent { - var eventName = typeof(T).Name; + _subsManager.RemoveSubscription(); + } - if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler)) + private static Func FindHandlerByType(Type handlerType, IEnumerable> handlers) + { + foreach (var func in handlers) { - _handlers[eventName].Remove(handler); - - if (_handlers[eventName].Count == 0) + if (func.GetMethodInfo().ReturnType == handlerType) { - _handlers.Remove(eventName); - - var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); - - if (eventType != null) - { - _eventTypes.Remove(eventType); - - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - - using (var channel = _persistentConnection.CreateModel()) - { - channel.QueueUnbind(queue: _queueName, - exchange: BROKER_NAME, - routingKey: eventName); - - if (_handlers.Keys.Count == 0) - { - _queueName = string.Empty; - - _consumerChannel.Close(); - } - } - } + return func; } } + + return null; } public void Dispose() @@ -153,8 +147,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ { _consumerChannel.Dispose(); } - - _handlers.Clear(); + + _subsManager.Clear(); } private IModel CreateConsumerChannel() @@ -195,15 +189,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ private async Task ProcessEvent(string eventName, string message) { - if (_handlers.ContainsKey(eventName)) - { - Type eventType = _eventTypes.Single(t => t.Name == eventName); + + if (_subsManager.HasSubscriptionsForEvent(eventName)) + { + var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); - var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); - var handlers = _handlers[eventName]; + var handlers = _subsManager.GetHandlersForEvent(eventName); - foreach (var handler in handlers) + foreach (var handlerfactory in handlers) { + var handler = handlerfactory.DynamicInvoke(); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index da9baac48..855312a65 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -3,6 +3,7 @@ using Basket.API.IntegrationEvents.EventHandling; using Basket.API.IntegrationEvents.Events; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; using Microsoft.eShopOnContainers.Services.Basket.API.Auth.Server; @@ -19,6 +20,7 @@ using StackExchange.Redis; using System.Linq; using System.Net; using System.Threading.Tasks; +using System; namespace Microsoft.eShopOnContainers.Services.Basket.API { @@ -80,8 +82,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API return new DefaultRabbitMQPersistentConnection(factory, logger); }); - services.AddSingleton(); - services.AddSwaggerGen(); services.ConfigureSwaggerGen(options => @@ -108,9 +108,16 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API }); services.AddTransient(); - services.AddTransient, ProductPriceChangedIntegrationEventHandler>(); - services.AddTransient, OrderStartedIntegrationEventHandler>(); + RegisterServiceBus(services); + } + private void RegisterServiceBus(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + + services.AddTransient(); + services.AddTransient(); } @@ -155,11 +162,13 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API var orderStartedHandler = app.ApplicationServices .GetService>(); - var eventBus = app.ApplicationServices - .GetRequiredService(); + var eventBus = app.ApplicationServices.GetRequiredService(); + + eventBus.Subscribe + (() => app.ApplicationServices.GetRequiredService()); - eventBus.Subscribe(catalogPriceHandler); - eventBus.Subscribe(orderStartedHandler); + eventBus.Subscribe + (() => app.ApplicationServices.GetRequiredService()); } } } diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index 1c1408b67..9eb195674 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -6,6 +6,7 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; @@ -115,6 +116,7 @@ return new DefaultRabbitMQPersistentConnection(factory, logger); }); + services.AddSingleton(); services.AddSingleton(); } diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index 6665908a6..58d8f1cbe 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -13,6 +13,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.EntityFrameworkCore; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; @@ -119,6 +120,7 @@ return new DefaultRabbitMQPersistentConnection(factory, logger); }); + services.AddSingleton(); services.AddSingleton(); services.AddOptions();