Merge branch 'refactor-event-bus' into dev
This commit is contained in:
commit
f5c196925a
@ -74,6 +74,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Health
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Extensions.HealthChecks.SqlServer", "src\BuildingBlocks\HealthChecks\src\Microsoft.Extensions.HealthChecks.SqlServer\Microsoft.Extensions.HealthChecks.SqlServer.csproj", "{4BD76717-3102-4969-8C2C-BAAA3F0263B6}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBus.Tests", "src\BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Ad-Hoc|Any CPU = Ad-Hoc|Any CPU
|
||||
@ -952,6 +954,54 @@ Global
|
||||
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x64.Build.0 = Release|Any CPU
|
||||
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.ActiveCfg = Release|Any CPU
|
||||
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.Build.0 = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.Build.0 = Debug|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.ActiveCfg = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.Build.0 = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.ActiveCfg = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.Build.0 = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.Build.0 = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.ActiveCfg = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.Build.0 = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.ActiveCfg = Release|Any CPU
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
@ -987,5 +1037,6 @@ Global
|
||||
{D1C47FF1-91F1-4CAF-9ABB-AD642B821502} = {FBF43D93-F2E7-4FF8-B4AB-186895949B88}
|
||||
{22A0F9C1-2D4A-4107-95B7-8459E6688BC5} = {A81ECBC2-6B00-4DCD-8388-469174033379}
|
||||
{4BD76717-3102-4969-8C2C-BAAA3F0263B6} = {A81ECBC2-6B00-4DCD-8388-469174033379}
|
||||
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
||||
|
@ -0,0 +1,22 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>netcoreapp1.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
|
||||
<PackageReference Include="xunit" Version="2.2.0" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\EventBusRabbitMQ\EventBusRabbitMQ.csproj" />
|
||||
<ProjectReference Include="..\EventBus\EventBus.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
@ -0,0 +1,56 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
||||
using System;
|
||||
using System.Linq;
|
||||
using Xunit;
|
||||
|
||||
namespace EventBus.Tests
|
||||
{
|
||||
public class InMemory_SubscriptionManager_Tests
|
||||
{
|
||||
[Fact]
|
||||
public void After_Creation_Should_Be_Empty()
|
||||
{
|
||||
var manager = new InMemoryEventBusSubscriptionsManager();
|
||||
Assert.True(manager.IsEmpty);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void After_One_Event_Subscription_Should_Contain_The_Event()
|
||||
{
|
||||
var manager = new InMemoryEventBusSubscriptionsManager();
|
||||
manager.AddSubscription<TestIntegrationEvent,TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
|
||||
Assert.True(manager.HasSubscriptionsForEvent<TestIntegrationEvent>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void After_All_Subscriptions_Are_Deleted_Event_Should_No_Longer_Exists()
|
||||
{
|
||||
var manager = new InMemoryEventBusSubscriptionsManager();
|
||||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
|
||||
manager.RemoveSubscription<TestIntegrationEvent, TestIntegrationEventHandler>();
|
||||
Assert.False(manager.HasSubscriptionsForEvent<TestIntegrationEvent>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Deleting_Last_Subscription_Should_Raise_On_Deleted_Event()
|
||||
{
|
||||
bool raised = false;
|
||||
var manager = new InMemoryEventBusSubscriptionsManager();
|
||||
manager.OnEventRemoved += (o, e) => raised = true;
|
||||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
|
||||
manager.RemoveSubscription<TestIntegrationEvent, TestIntegrationEventHandler>();
|
||||
Assert.True(raised);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Get_Handlers_For_Event_Should_Return_All_Handlers()
|
||||
{
|
||||
var manager = new InMemoryEventBusSubscriptionsManager();
|
||||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
|
||||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationOtherEventHandler>(() => new TestIntegrationOtherEventHandler());
|
||||
var handlers = manager.GetHandlersForEvent<TestIntegrationEvent>();
|
||||
Assert.Equal(2, handlers.Count());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace EventBus.Tests
|
||||
{
|
||||
public class TestIntegrationEvent : IntegrationEvent
|
||||
{
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace EventBus.Tests
|
||||
{
|
||||
public class TestIntegrationOtherEventHandler : IIntegrationEventHandler<TestIntegrationEvent>
|
||||
{
|
||||
public bool Handled { get; private set; }
|
||||
|
||||
public TestIntegrationOtherEventHandler()
|
||||
{
|
||||
Handled = false;
|
||||
}
|
||||
|
||||
public async Task Handle(TestIntegrationEvent @event)
|
||||
{
|
||||
Handled = true;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace EventBus.Tests
|
||||
{
|
||||
public class TestIntegrationEventHandler : IIntegrationEventHandler<TestIntegrationEvent>
|
||||
{
|
||||
public bool Handled { get; private set; }
|
||||
|
||||
public TestIntegrationEventHandler()
|
||||
{
|
||||
Handled = false;
|
||||
}
|
||||
|
||||
public async Task Handle(TestIntegrationEvent @event)
|
||||
{
|
||||
Handled = true;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,11 +1,17 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||
using System;
|
||||
|
||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
|
||||
{
|
||||
public interface IEventBus
|
||||
{
|
||||
void Subscribe<T>(IIntegrationEventHandler<T> handler) where T: IntegrationEvent;
|
||||
void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent;
|
||||
void Subscribe<T, TH>(Func<TH> handler)
|
||||
where T : IntegrationEvent
|
||||
where TH : IIntegrationEventHandler<T>;
|
||||
void Unsubscribe<T, TH>()
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
where T : IntegrationEvent;
|
||||
|
||||
void Publish(IntegrationEvent @event);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,26 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
|
||||
{
|
||||
public interface IEventBusSubscriptionsManager
|
||||
{
|
||||
bool IsEmpty { get; }
|
||||
event EventHandler<string> OnEventRemoved;
|
||||
void AddSubscription<T, TH>(Func<TH> handler)
|
||||
where T : IntegrationEvent
|
||||
where TH : IIntegrationEventHandler<T>;
|
||||
|
||||
void RemoveSubscription<T, TH>()
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
where T : IntegrationEvent;
|
||||
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
|
||||
bool HasSubscriptionsForEvent(string eventName);
|
||||
Type GetEventTypeByName(string eventName);
|
||||
void Clear();
|
||||
IEnumerable<Delegate> GetHandlersForEvent<T>() where T : IntegrationEvent;
|
||||
IEnumerable<Delegate> GetHandlersForEvent(string eventName);
|
||||
}
|
||||
}
|
@ -0,0 +1,115 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
|
||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
|
||||
{
|
||||
public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
|
||||
{
|
||||
private readonly Dictionary<string, List<Delegate>> _handlers;
|
||||
private readonly List<Type> _eventTypes;
|
||||
|
||||
public event EventHandler<string> OnEventRemoved;
|
||||
|
||||
public InMemoryEventBusSubscriptionsManager()
|
||||
{
|
||||
_handlers = new Dictionary<string, List<Delegate>>();
|
||||
_eventTypes = new List<Type>();
|
||||
}
|
||||
|
||||
public bool IsEmpty => !_handlers.Keys.Any();
|
||||
public void Clear() => _handlers.Clear();
|
||||
|
||||
public void AddSubscription<T, TH>(Func<TH> handler)
|
||||
where T : IntegrationEvent
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
{
|
||||
var key = GetEventKey<T>();
|
||||
if (!HasSubscriptionsForEvent<T>())
|
||||
{
|
||||
_handlers.Add(key, new List<Delegate>());
|
||||
}
|
||||
_handlers[key].Add(handler);
|
||||
_eventTypes.Add(typeof(T));
|
||||
}
|
||||
|
||||
public void RemoveSubscription<T, TH>()
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
where T : IntegrationEvent
|
||||
{
|
||||
var handlerToRemove = FindHandlerToRemove<T, TH>();
|
||||
if (handlerToRemove != null)
|
||||
{
|
||||
var key = GetEventKey<T>();
|
||||
_handlers[key].Remove(handlerToRemove);
|
||||
if (!_handlers[key].Any())
|
||||
{
|
||||
_handlers.Remove(key);
|
||||
var eventType = _eventTypes.SingleOrDefault(e => e.Name == key);
|
||||
if (eventType != null)
|
||||
{
|
||||
_eventTypes.Remove(eventType);
|
||||
RaiseOnEventRemoved(eventType.Name);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public IEnumerable<Delegate> GetHandlersForEvent<T>() where T : IntegrationEvent
|
||||
{
|
||||
var key = GetEventKey<T>();
|
||||
return GetHandlersForEvent(key);
|
||||
}
|
||||
public IEnumerable<Delegate> GetHandlersForEvent(string eventName) => _handlers[eventName];
|
||||
|
||||
private void RaiseOnEventRemoved(string eventName)
|
||||
{
|
||||
var handler = OnEventRemoved;
|
||||
if (handler != null)
|
||||
{
|
||||
OnEventRemoved(this, eventName);
|
||||
}
|
||||
}
|
||||
|
||||
private Delegate FindHandlerToRemove<T, TH>()
|
||||
where T : IntegrationEvent
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
{
|
||||
if (!HasSubscriptionsForEvent<T>())
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var key = GetEventKey<T>();
|
||||
foreach (var func in _handlers[key])
|
||||
{
|
||||
var genericArgs = func.GetType().GetGenericArguments();
|
||||
if (genericArgs.SingleOrDefault() == typeof(TH))
|
||||
{
|
||||
return func;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
|
||||
{
|
||||
var key = GetEventKey<T>();
|
||||
return HasSubscriptionsForEvent(key);
|
||||
}
|
||||
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
|
||||
|
||||
public Type GetEventTypeByName(string eventName) => _eventTypes.Single(t => t.Name == eventName);
|
||||
|
||||
private string GetEventKey<T>()
|
||||
{
|
||||
return typeof(T).Name;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
@ -23,22 +24,41 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||
|
||||
private readonly IRabbitMQPersistentConnection _persistentConnection;
|
||||
private readonly ILogger<EventBusRabbitMQ> _logger;
|
||||
|
||||
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers
|
||||
= new Dictionary<string, List<IIntegrationEventHandler>>();
|
||||
|
||||
private readonly List<Type> _eventTypes
|
||||
= new List<Type>();
|
||||
private readonly IEventBusSubscriptionsManager _subsManager;
|
||||
|
||||
|
||||
private IModel _consumerChannel;
|
||||
private string _queueName;
|
||||
|
||||
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger)
|
||||
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, IEventBusSubscriptionsManager subsManager)
|
||||
{
|
||||
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
|
||||
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
|
||||
_consumerChannel = CreateConsumerChannel();
|
||||
|
||||
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
|
||||
}
|
||||
|
||||
private void SubsManager_OnEventRemoved(object sender, string eventName)
|
||||
{
|
||||
if (!_persistentConnection.IsConnected)
|
||||
{
|
||||
_persistentConnection.TryConnect();
|
||||
}
|
||||
|
||||
using (var channel = _persistentConnection.CreateModel())
|
||||
{
|
||||
channel.QueueUnbind(queue: _queueName,
|
||||
exchange: BROKER_NAME,
|
||||
routingKey: eventName);
|
||||
|
||||
if (_subsManager.IsEmpty)
|
||||
{
|
||||
_queueName = string.Empty;
|
||||
_consumerChannel.Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Publish(IntegrationEvent @event)
|
||||
@ -76,15 +96,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||
}
|
||||
}
|
||||
|
||||
public void Subscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent
|
||||
public void Subscribe<T, TH>(Func<TH> handler)
|
||||
where T : IntegrationEvent
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
{
|
||||
var eventName = typeof(T).Name;
|
||||
|
||||
if (_handlers.ContainsKey(eventName))
|
||||
{
|
||||
_handlers[eventName].Add(handler);
|
||||
}
|
||||
else
|
||||
var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
|
||||
if (!containsKey)
|
||||
{
|
||||
if (!_persistentConnection.IsConnected)
|
||||
{
|
||||
@ -96,55 +114,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||
channel.QueueBind(queue: _queueName,
|
||||
exchange: BROKER_NAME,
|
||||
routingKey: eventName);
|
||||
|
||||
_handlers.Add(eventName, new List<IIntegrationEventHandler>());
|
||||
_handlers[eventName].Add(handler);
|
||||
_eventTypes.Add(typeof(T));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
_subsManager.AddSubscription<T, TH>(handler);
|
||||
|
||||
}
|
||||
|
||||
public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent
|
||||
public void Unsubscribe<T, TH>()
|
||||
where TH : IIntegrationEventHandler<T>
|
||||
where T : IntegrationEvent
|
||||
{
|
||||
var eventName = typeof(T).Name;
|
||||
_subsManager.RemoveSubscription<T, TH>();
|
||||
}
|
||||
|
||||
if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler))
|
||||
private static Func<IIntegrationEventHandler> FindHandlerByType(Type handlerType, IEnumerable<Func<IIntegrationEventHandler>> handlers)
|
||||
{
|
||||
foreach (var func in handlers)
|
||||
{
|
||||
_handlers[eventName].Remove(handler);
|
||||
|
||||
if (_handlers[eventName].Count == 0)
|
||||
if (func.GetMethodInfo().ReturnType == handlerType)
|
||||
{
|
||||
_handlers.Remove(eventName);
|
||||
|
||||
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
|
||||
|
||||
if (eventType != null)
|
||||
{
|
||||
_eventTypes.Remove(eventType);
|
||||
|
||||
if (!_persistentConnection.IsConnected)
|
||||
{
|
||||
_persistentConnection.TryConnect();
|
||||
}
|
||||
|
||||
using (var channel = _persistentConnection.CreateModel())
|
||||
{
|
||||
channel.QueueUnbind(queue: _queueName,
|
||||
exchange: BROKER_NAME,
|
||||
routingKey: eventName);
|
||||
|
||||
if (_handlers.Keys.Count == 0)
|
||||
{
|
||||
_queueName = string.Empty;
|
||||
|
||||
_consumerChannel.Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
return func;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
@ -153,8 +147,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||
{
|
||||
_consumerChannel.Dispose();
|
||||
}
|
||||
|
||||
_handlers.Clear();
|
||||
|
||||
_subsManager.Clear();
|
||||
}
|
||||
|
||||
private IModel CreateConsumerChannel()
|
||||
@ -195,15 +189,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||
|
||||
private async Task ProcessEvent(string eventName, string message)
|
||||
{
|
||||
if (_handlers.ContainsKey(eventName))
|
||||
{
|
||||
Type eventType = _eventTypes.Single(t => t.Name == eventName);
|
||||
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
|
||||
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
|
||||
var handlers = _handlers[eventName];
|
||||
|
||||
foreach (var handler in handlers)
|
||||
if (_subsManager.HasSubscriptionsForEvent(eventName))
|
||||
{
|
||||
var eventType = _subsManager.GetEventTypeByName(eventName);
|
||||
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
|
||||
var handlers = _subsManager.GetHandlersForEvent(eventName);
|
||||
|
||||
foreach (var handlerfactory in handlers)
|
||||
{
|
||||
var handler = handlerfactory.DynamicInvoke();
|
||||
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
|
||||
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ using Basket.API.IntegrationEvents.EventHandling;
|
||||
using Basket.API.IntegrationEvents.Events;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
|
||||
using Microsoft.eShopOnContainers.Services.Basket.API.Auth.Server;
|
||||
@ -19,6 +20,7 @@ using StackExchange.Redis;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Threading.Tasks;
|
||||
using System;
|
||||
|
||||
namespace Microsoft.eShopOnContainers.Services.Basket.API
|
||||
{
|
||||
@ -80,8 +82,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
||||
return new DefaultRabbitMQPersistentConnection(factory, logger);
|
||||
});
|
||||
|
||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||
|
||||
services.AddSwaggerGen();
|
||||
|
||||
services.ConfigureSwaggerGen(options =>
|
||||
@ -108,9 +108,16 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
||||
});
|
||||
|
||||
services.AddTransient<IBasketRepository, RedisBasketRepository>();
|
||||
services.AddTransient<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>, ProductPriceChangedIntegrationEventHandler>();
|
||||
services.AddTransient<IIntegrationEventHandler<OrderStartedIntegrationEvent>, OrderStartedIntegrationEventHandler>();
|
||||
RegisterServiceBus(services);
|
||||
}
|
||||
|
||||
private void RegisterServiceBus(IServiceCollection services)
|
||||
{
|
||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
|
||||
|
||||
services.AddTransient<ProductPriceChangedIntegrationEventHandler>();
|
||||
services.AddTransient<OrderStartedIntegrationEventHandler>();
|
||||
|
||||
}
|
||||
|
||||
@ -155,11 +162,13 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
||||
var orderStartedHandler = app.ApplicationServices
|
||||
.GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();
|
||||
|
||||
var eventBus = app.ApplicationServices
|
||||
.GetRequiredService<IEventBus>();
|
||||
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
|
||||
|
||||
eventBus.Subscribe<ProductPriceChangedIntegrationEvent>(catalogPriceHandler);
|
||||
eventBus.Subscribe<OrderStartedIntegrationEvent>(orderStartedHandler);
|
||||
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
|
||||
(() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());
|
||||
|
||||
eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
|
||||
(() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||
@ -115,6 +116,7 @@
|
||||
return new DefaultRabbitMQPersistentConnection(factory, logger);
|
||||
});
|
||||
|
||||
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
|
||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
|
||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||
@ -119,6 +120,7 @@
|
||||
return new DefaultRabbitMQPersistentConnection(factory, logger);
|
||||
});
|
||||
|
||||
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
|
||||
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
|
||||
|
||||
services.AddOptions();
|
||||
|
Loading…
x
Reference in New Issue
Block a user