diff --git a/eShopOnContainers-ServicesAndWebApps.sln b/eShopOnContainers-ServicesAndWebApps.sln
index 27434ec86..cf2be1c26 100644
--- a/eShopOnContainers-ServicesAndWebApps.sln
+++ b/eShopOnContainers-ServicesAndWebApps.sln
@@ -74,6 +74,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Health
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Extensions.HealthChecks.SqlServer", "src\BuildingBlocks\HealthChecks\src\Microsoft.Extensions.HealthChecks.SqlServer\Microsoft.Extensions.HealthChecks.SqlServer.csproj", "{4BD76717-3102-4969-8C2C-BAAA3F0263B6}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBus.Tests", "src\BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Ad-Hoc|Any CPU = Ad-Hoc|Any CPU
@@ -952,6 +954,54 @@ Global
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x64.Build.0 = Release|Any CPU
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.ActiveCfg = Release|Any CPU
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.Build.0 = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.Build.0 = Debug|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.ActiveCfg = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.Build.0 = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.ActiveCfg = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.Build.0 = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.Build.0 = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.ActiveCfg = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.Build.0 = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.ActiveCfg = Release|Any CPU
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -987,5 +1037,6 @@ Global
{D1C47FF1-91F1-4CAF-9ABB-AD642B821502} = {FBF43D93-F2E7-4FF8-B4AB-186895949B88}
{22A0F9C1-2D4A-4107-95B7-8459E6688BC5} = {A81ECBC2-6B00-4DCD-8388-469174033379}
{4BD76717-3102-4969-8C2C-BAAA3F0263B6} = {A81ECBC2-6B00-4DCD-8388-469174033379}
+ {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F}
EndGlobalSection
EndGlobal
diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj b/src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj
new file mode 100644
index 000000000..1387a74dd
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj
@@ -0,0 +1,22 @@
+
+
+
+ netcoreapp1.1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs
new file mode 100644
index 000000000..dd5f7f5b4
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs
@@ -0,0 +1,56 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
+using System;
+using System.Linq;
+using Xunit;
+
+namespace EventBus.Tests
+{
+ public class InMemory_SubscriptionManager_Tests
+ {
+ [Fact]
+ public void After_Creation_Should_Be_Empty()
+ {
+ var manager = new InMemoryEventBusSubscriptionsManager();
+ Assert.True(manager.IsEmpty);
+ }
+
+ [Fact]
+ public void After_One_Event_Subscription_Should_Contain_The_Event()
+ {
+ var manager = new InMemoryEventBusSubscriptionsManager();
+ manager.AddSubscription(() => new TestIntegrationEventHandler());
+ Assert.True(manager.HasSubscriptionsForEvent());
+ }
+
+ [Fact]
+ public void After_All_Subscriptions_Are_Deleted_Event_Should_No_Longer_Exists()
+ {
+ var manager = new InMemoryEventBusSubscriptionsManager();
+ manager.AddSubscription(() => new TestIntegrationEventHandler());
+ manager.RemoveSubscription();
+ Assert.False(manager.HasSubscriptionsForEvent());
+ }
+
+ [Fact]
+ public void Deleting_Last_Subscription_Should_Raise_On_Deleted_Event()
+ {
+ bool raised = false;
+ var manager = new InMemoryEventBusSubscriptionsManager();
+ manager.OnEventRemoved += (o, e) => raised = true;
+ manager.AddSubscription(() => new TestIntegrationEventHandler());
+ manager.RemoveSubscription();
+ Assert.True(raised);
+ }
+
+ [Fact]
+ public void Get_Handlers_For_Event_Should_Return_All_Handlers()
+ {
+ var manager = new InMemoryEventBusSubscriptionsManager();
+ manager.AddSubscription(() => new TestIntegrationEventHandler());
+ manager.AddSubscription(() => new TestIntegrationOtherEventHandler());
+ var handlers = manager.GetHandlersForEvent();
+ Assert.Equal(2, handlers.Count());
+ }
+
+ }
+}
diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs
new file mode 100644
index 000000000..a77f3ef6f
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs
@@ -0,0 +1,11 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace EventBus.Tests
+{
+ public class TestIntegrationEvent : IntegrationEvent
+ {
+ }
+}
diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs
new file mode 100644
index 000000000..0b5b793ee
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs
@@ -0,0 +1,23 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace EventBus.Tests
+{
+ public class TestIntegrationOtherEventHandler : IIntegrationEventHandler
+ {
+ public bool Handled { get; private set; }
+
+ public TestIntegrationOtherEventHandler()
+ {
+ Handled = false;
+ }
+
+ public async Task Handle(TestIntegrationEvent @event)
+ {
+ Handled = true;
+ }
+ }
+}
diff --git a/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs
new file mode 100644
index 000000000..72e1ed2cd
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs
@@ -0,0 +1,23 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace EventBus.Tests
+{
+ public class TestIntegrationEventHandler : IIntegrationEventHandler
+ {
+ public bool Handled { get; private set; }
+
+ public TestIntegrationEventHandler()
+ {
+ Handled = false;
+ }
+
+ public async Task Handle(TestIntegrationEvent @event)
+ {
+ Handled = true;
+ }
+ }
+}
diff --git a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
index 63f9f1b99..9ab7a4499 100644
--- a/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
+++ b/src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
@@ -1,11 +1,17 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
+using System;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
{
public interface IEventBus
{
- void Subscribe(IIntegrationEventHandler handler) where T: IntegrationEvent;
- void Unsubscribe(IIntegrationEventHandler handler) where T : IntegrationEvent;
+ void Subscribe(Func handler)
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler;
+ void Unsubscribe()
+ where TH : IIntegrationEventHandler
+ where T : IntegrationEvent;
+
void Publish(IntegrationEvent @event);
}
}
diff --git a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
new file mode 100644
index 000000000..2fdefc039
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
@@ -0,0 +1,26 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
+using System;
+using System.Collections.Generic;
+
+namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
+{
+ public interface IEventBusSubscriptionsManager
+ {
+ bool IsEmpty { get; }
+ event EventHandler OnEventRemoved;
+ void AddSubscription(Func handler)
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler;
+
+ void RemoveSubscription()
+ where TH : IIntegrationEventHandler
+ where T : IntegrationEvent;
+ bool HasSubscriptionsForEvent() where T : IntegrationEvent;
+ bool HasSubscriptionsForEvent(string eventName);
+ Type GetEventTypeByName(string eventName);
+ void Clear();
+ IEnumerable GetHandlersForEvent() where T : IntegrationEvent;
+ IEnumerable GetHandlersForEvent(string eventName);
+ }
+}
\ No newline at end of file
diff --git a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
new file mode 100644
index 000000000..11fdba3c5
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
@@ -0,0 +1,115 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+
+namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
+{
+ public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
+ {
+ private readonly Dictionary> _handlers;
+ private readonly List _eventTypes;
+
+ public event EventHandler OnEventRemoved;
+
+ public InMemoryEventBusSubscriptionsManager()
+ {
+ _handlers = new Dictionary>();
+ _eventTypes = new List();
+ }
+
+ public bool IsEmpty => !_handlers.Keys.Any();
+ public void Clear() => _handlers.Clear();
+
+ public void AddSubscription(Func handler)
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler
+ {
+ var key = GetEventKey();
+ if (!HasSubscriptionsForEvent())
+ {
+ _handlers.Add(key, new List());
+ }
+ _handlers[key].Add(handler);
+ _eventTypes.Add(typeof(T));
+ }
+
+ public void RemoveSubscription()
+ where TH : IIntegrationEventHandler
+ where T : IntegrationEvent
+ {
+ var handlerToRemove = FindHandlerToRemove();
+ if (handlerToRemove != null)
+ {
+ var key = GetEventKey();
+ _handlers[key].Remove(handlerToRemove);
+ if (!_handlers[key].Any())
+ {
+ _handlers.Remove(key);
+ var eventType = _eventTypes.SingleOrDefault(e => e.Name == key);
+ if (eventType != null)
+ {
+ _eventTypes.Remove(eventType);
+ RaiseOnEventRemoved(eventType.Name);
+ }
+ }
+
+ }
+ }
+
+ public IEnumerable GetHandlersForEvent() where T : IntegrationEvent
+ {
+ var key = GetEventKey();
+ return GetHandlersForEvent(key);
+ }
+ public IEnumerable GetHandlersForEvent(string eventName) => _handlers[eventName];
+
+ private void RaiseOnEventRemoved(string eventName)
+ {
+ var handler = OnEventRemoved;
+ if (handler != null)
+ {
+ OnEventRemoved(this, eventName);
+ }
+ }
+
+ private Delegate FindHandlerToRemove()
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler
+ {
+ if (!HasSubscriptionsForEvent())
+ {
+ return null;
+ }
+
+ var key = GetEventKey();
+ foreach (var func in _handlers[key])
+ {
+ var genericArgs = func.GetType().GetGenericArguments();
+ if (genericArgs.SingleOrDefault() == typeof(TH))
+ {
+ return func;
+ }
+ }
+
+ return null;
+ }
+
+ public bool HasSubscriptionsForEvent() where T : IntegrationEvent
+ {
+ var key = GetEventKey();
+ return HasSubscriptionsForEvent(key);
+ }
+ public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
+
+ public Type GetEventTypeByName(string eventName) => _eventTypes.Single(t => t.Name == eventName);
+
+ private string GetEventKey()
+ {
+ return typeof(T).Name;
+ }
+ }
+}
diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
index 0eb29b72b..adbc52ad1 100644
--- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
+++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
@@ -1,4 +1,5 @@
-using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
@@ -23,22 +24,41 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private readonly IRabbitMQPersistentConnection _persistentConnection;
private readonly ILogger _logger;
-
- private readonly Dictionary> _handlers
- = new Dictionary>();
-
- private readonly List _eventTypes
- = new List();
+ private readonly IEventBusSubscriptionsManager _subsManager;
+
private IModel _consumerChannel;
private string _queueName;
- public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger)
+ public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, IEventBusSubscriptionsManager subsManager)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
-
+ _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_consumerChannel = CreateConsumerChannel();
+
+ _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
+ }
+
+ private void SubsManager_OnEventRemoved(object sender, string eventName)
+ {
+ if (!_persistentConnection.IsConnected)
+ {
+ _persistentConnection.TryConnect();
+ }
+
+ using (var channel = _persistentConnection.CreateModel())
+ {
+ channel.QueueUnbind(queue: _queueName,
+ exchange: BROKER_NAME,
+ routingKey: eventName);
+
+ if (_subsManager.IsEmpty)
+ {
+ _queueName = string.Empty;
+ _consumerChannel.Close();
+ }
+ }
}
public void Publish(IntegrationEvent @event)
@@ -76,15 +96,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
}
- public void Subscribe(IIntegrationEventHandler handler) where T : IntegrationEvent
+ public void Subscribe(Func handler)
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler
{
var eventName = typeof(T).Name;
-
- if (_handlers.ContainsKey(eventName))
- {
- _handlers[eventName].Add(handler);
- }
- else
+ var containsKey = _subsManager.HasSubscriptionsForEvent();
+ if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
@@ -96,55 +114,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
-
- _handlers.Add(eventName, new List());
- _handlers[eventName].Add(handler);
- _eventTypes.Add(typeof(T));
}
-
}
+ _subsManager.AddSubscription(handler);
+
}
- public void Unsubscribe(IIntegrationEventHandler handler) where T : IntegrationEvent
+ public void Unsubscribe()
+ where TH : IIntegrationEventHandler
+ where T : IntegrationEvent
{
- var eventName = typeof(T).Name;
+ _subsManager.RemoveSubscription();
+ }
- if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler))
+ private static Func FindHandlerByType(Type handlerType, IEnumerable> handlers)
+ {
+ foreach (var func in handlers)
{
- _handlers[eventName].Remove(handler);
-
- if (_handlers[eventName].Count == 0)
+ if (func.GetMethodInfo().ReturnType == handlerType)
{
- _handlers.Remove(eventName);
-
- var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
-
- if (eventType != null)
- {
- _eventTypes.Remove(eventType);
-
- if (!_persistentConnection.IsConnected)
- {
- _persistentConnection.TryConnect();
- }
-
- using (var channel = _persistentConnection.CreateModel())
- {
- channel.QueueUnbind(queue: _queueName,
- exchange: BROKER_NAME,
- routingKey: eventName);
-
- if (_handlers.Keys.Count == 0)
- {
- _queueName = string.Empty;
-
- _consumerChannel.Close();
- }
- }
- }
+ return func;
}
}
+
+ return null;
}
public void Dispose()
@@ -153,8 +147,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
_consumerChannel.Dispose();
}
-
- _handlers.Clear();
+
+ _subsManager.Clear();
}
private IModel CreateConsumerChannel()
@@ -195,15 +189,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent(string eventName, string message)
{
- if (_handlers.ContainsKey(eventName))
- {
- Type eventType = _eventTypes.Single(t => t.Name == eventName);
+
+ if (_subsManager.HasSubscriptionsForEvent(eventName))
+ {
+ var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
- var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
- var handlers = _handlers[eventName];
+ var handlers = _subsManager.GetHandlersForEvent(eventName);
- foreach (var handler in handlers)
+ foreach (var handlerfactory in handlers)
{
+ var handler = handlerfactory.DynamicInvoke();
+ var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}
diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs
index da9baac48..855312a65 100644
--- a/src/Services/Basket/Basket.API/Startup.cs
+++ b/src/Services/Basket/Basket.API/Startup.cs
@@ -3,6 +3,7 @@ using Basket.API.IntegrationEvents.EventHandling;
using Basket.API.IntegrationEvents.Events;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.Services.Basket.API.Auth.Server;
@@ -19,6 +20,7 @@ using StackExchange.Redis;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
+using System;
namespace Microsoft.eShopOnContainers.Services.Basket.API
{
@@ -80,8 +82,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
return new DefaultRabbitMQPersistentConnection(factory, logger);
});
- services.AddSingleton();
-
services.AddSwaggerGen();
services.ConfigureSwaggerGen(options =>
@@ -108,9 +108,16 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
});
services.AddTransient();
- services.AddTransient, ProductPriceChangedIntegrationEventHandler>();
- services.AddTransient, OrderStartedIntegrationEventHandler>();
+ RegisterServiceBus(services);
+ }
+ private void RegisterServiceBus(IServiceCollection services)
+ {
+ services.AddSingleton();
+ services.AddSingleton();
+
+ services.AddTransient();
+ services.AddTransient();
}
@@ -155,11 +162,13 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
var orderStartedHandler = app.ApplicationServices
.GetService>();
- var eventBus = app.ApplicationServices
- .GetRequiredService();
+ var eventBus = app.ApplicationServices.GetRequiredService();
+
+ eventBus.Subscribe
+ (() => app.ApplicationServices.GetRequiredService());
- eventBus.Subscribe(catalogPriceHandler);
- eventBus.Subscribe(orderStartedHandler);
+ eventBus.Subscribe
+ (() => app.ApplicationServices.GetRequiredService());
}
}
}
diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs
index 1c1408b67..9eb195674 100644
--- a/src/Services/Catalog/Catalog.API/Startup.cs
+++ b/src/Services/Catalog/Catalog.API/Startup.cs
@@ -6,6 +6,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
+ using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
@@ -115,6 +116,7 @@
return new DefaultRabbitMQPersistentConnection(factory, logger);
});
+ services.AddSingleton();
services.AddSingleton();
}
diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs
index 6665908a6..58d8f1cbe 100644
--- a/src/Services/Ordering/Ordering.API/Startup.cs
+++ b/src/Services/Ordering/Ordering.API/Startup.cs
@@ -13,6 +13,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
+ using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
@@ -119,6 +120,7 @@
return new DefaultRabbitMQPersistentConnection(factory, logger);
});
+ services.AddSingleton();
services.AddSingleton();
services.AddOptions();
| | | |