Browse Source

EventBus refactor.

Instead to register EventHandlers we register Func<EventHandlers> which solves scope problems (having transient/scoped objects owned by singletons)
pull/181/head
Eduard Tomas 7 years ago
parent
commit
79f8f1b949
13 changed files with 414 additions and 72 deletions
  1. +51
    -0
      eShopOnContainers-ServicesAndWebApps.sln
  2. +22
    -0
      src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj
  3. +56
    -0
      src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs
  4. +11
    -0
      src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs
  5. +23
    -0
      src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs
  6. +23
    -0
      src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs
  7. +8
    -2
      src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
  8. +26
    -0
      src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
  9. +115
    -0
      src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
  10. +58
    -62
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
  11. +17
    -8
      src/Services/Basket/Basket.API/Startup.cs
  12. +2
    -0
      src/Services/Catalog/Catalog.API/Startup.cs
  13. +2
    -0
      src/Services/Ordering/Ordering.API/Startup.cs

+ 51
- 0
eShopOnContainers-ServicesAndWebApps.sln View File

@ -74,6 +74,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Health
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Extensions.HealthChecks.SqlServer", "src\BuildingBlocks\HealthChecks\src\Microsoft.Extensions.HealthChecks.SqlServer\Microsoft.Extensions.HealthChecks.SqlServer.csproj", "{4BD76717-3102-4969-8C2C-BAAA3F0263B6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBus.Tests", "src\BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Ad-Hoc|Any CPU = Ad-Hoc|Any CPU
@ -952,6 +954,54 @@ Global
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x64.Build.0 = Release|Any CPU
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.ActiveCfg = Release|Any CPU
{4BD76717-3102-4969-8C2C-BAAA3F0263B6}.Release|x86.Build.0 = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x64.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Ad-Hoc|x86.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|Any CPU.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|ARM.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhone.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x64.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.AppStore|x86.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|ARM.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhone.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x64.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.ActiveCfg = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Debug|x86.Build.0 = Debug|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|Any CPU.Build.0 = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.ActiveCfg = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|ARM.Build.0 = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.ActiveCfg = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhone.Build.0 = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|iPhoneSimulator.Build.0 = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.ActiveCfg = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.Build.0 = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.ActiveCfg = Release|Any CPU
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -987,5 +1037,6 @@ Global
{D1C47FF1-91F1-4CAF-9ABB-AD642B821502} = {FBF43D93-F2E7-4FF8-B4AB-186895949B88}
{22A0F9C1-2D4A-4107-95B7-8459E6688BC5} = {A81ECBC2-6B00-4DCD-8388-469174033379}
{4BD76717-3102-4969-8C2C-BAAA3F0263B6} = {A81ECBC2-6B00-4DCD-8388-469174033379}
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F}
EndGlobalSection
EndGlobal

+ 22
- 0
src/BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventBusRabbitMQ\EventBusRabbitMQ.csproj" />
<ProjectReference Include="..\EventBus\EventBus.csproj" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
</Project>

+ 56
- 0
src/BuildingBlocks/EventBus/EventBus.Tests/InMemory_SubscriptionManager_Tests.cs View File

@ -0,0 +1,56 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using System;
using System.Linq;
using Xunit;
namespace EventBus.Tests
{
public class InMemory_SubscriptionManager_Tests
{
[Fact]
public void After_Creation_Should_Be_Empty()
{
var manager = new InMemoryEventBusSubscriptionsManager();
Assert.True(manager.IsEmpty);
}
[Fact]
public void After_One_Event_Subscription_Should_Contain_The_Event()
{
var manager = new InMemoryEventBusSubscriptionsManager();
manager.AddSubscription<TestIntegrationEvent,TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
Assert.True(manager.HasSubscriptionsForEvent<TestIntegrationEvent>());
}
[Fact]
public void After_All_Subscriptions_Are_Deleted_Event_Should_No_Longer_Exists()
{
var manager = new InMemoryEventBusSubscriptionsManager();
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
manager.RemoveSubscription<TestIntegrationEvent, TestIntegrationEventHandler>();
Assert.False(manager.HasSubscriptionsForEvent<TestIntegrationEvent>());
}
[Fact]
public void Deleting_Last_Subscription_Should_Raise_On_Deleted_Event()
{
bool raised = false;
var manager = new InMemoryEventBusSubscriptionsManager();
manager.OnEventRemoved += (o, e) => raised = true;
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
manager.RemoveSubscription<TestIntegrationEvent, TestIntegrationEventHandler>();
Assert.True(raised);
}
[Fact]
public void Get_Handlers_For_Event_Should_Return_All_Handlers()
{
var manager = new InMemoryEventBusSubscriptionsManager();
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(() => new TestIntegrationEventHandler());
manager.AddSubscription<TestIntegrationEvent, TestIntegrationOtherEventHandler>(() => new TestIntegrationOtherEventHandler());
var handlers = manager.GetHandlersForEvent<TestIntegrationEvent>();
Assert.Equal(2, handlers.Count());
}
}
}

+ 11
- 0
src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEvent.cs View File

@ -0,0 +1,11 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace EventBus.Tests
{
public class TestIntegrationEvent : IntegrationEvent
{
}
}

+ 23
- 0
src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationEventHandler.cs View File

@ -0,0 +1,23 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace EventBus.Tests
{
public class TestIntegrationOtherEventHandler : IIntegrationEventHandler<TestIntegrationEvent>
{
public bool Handled { get; private set; }
public TestIntegrationOtherEventHandler()
{
Handled = false;
}
public async Task Handle(TestIntegrationEvent @event)
{
Handled = true;
}
}
}

+ 23
- 0
src/BuildingBlocks/EventBus/EventBus.Tests/TestIntegrationOtherEventHandler.cs View File

@ -0,0 +1,23 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace EventBus.Tests
{
public class TestIntegrationEventHandler : IIntegrationEventHandler<TestIntegrationEvent>
{
public bool Handled { get; private set; }
public TestIntegrationEventHandler()
{
Handled = false;
}
public async Task Handle(TestIntegrationEvent @event)
{
Handled = true;
}
}
}

+ 8
- 2
src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs View File

@ -1,11 +1,17 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
{
public interface IEventBus
{
void Subscribe<T>(IIntegrationEventHandler<T> handler) where T: IntegrationEvent;
void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent;
void Subscribe<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
void Publish(IntegrationEvent @event);
}
}

+ 26
- 0
src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs View File

@ -0,0 +1,26 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
using System.Collections.Generic;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
{
public interface IEventBusSubscriptionsManager
{
bool IsEmpty { get; }
event EventHandler<string> OnEventRemoved;
void AddSubscription<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void RemoveSubscription<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName);
Type GetEventTypeByName(string eventName);
void Clear();
IEnumerable<Delegate> GetHandlersForEvent<T>() where T : IntegrationEvent;
IEnumerable<Delegate> GetHandlersForEvent(string eventName);
}
}

+ 115
- 0
src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs View File

@ -0,0 +1,115 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus
{
public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{
private readonly Dictionary<string, List<Delegate>> _handlers;
private readonly List<Type> _eventTypes;
public event EventHandler<string> OnEventRemoved;
public InMemoryEventBusSubscriptionsManager()
{
_handlers = new Dictionary<string, List<Delegate>>();
_eventTypes = new List<Type>();
}
public bool IsEmpty => !_handlers.Keys.Any();
public void Clear() => _handlers.Clear();
public void AddSubscription<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var key = GetEventKey<T>();
if (!HasSubscriptionsForEvent<T>())
{
_handlers.Add(key, new List<Delegate>());
}
_handlers[key].Add(handler);
_eventTypes.Add(typeof(T));
}
public void RemoveSubscription<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent
{
var handlerToRemove = FindHandlerToRemove<T, TH>();
if (handlerToRemove != null)
{
var key = GetEventKey<T>();
_handlers[key].Remove(handlerToRemove);
if (!_handlers[key].Any())
{
_handlers.Remove(key);
var eventType = _eventTypes.SingleOrDefault(e => e.Name == key);
if (eventType != null)
{
_eventTypes.Remove(eventType);
RaiseOnEventRemoved(eventType.Name);
}
}
}
}
public IEnumerable<Delegate> GetHandlersForEvent<T>() where T : IntegrationEvent
{
var key = GetEventKey<T>();
return GetHandlersForEvent(key);
}
public IEnumerable<Delegate> GetHandlersForEvent(string eventName) => _handlers[eventName];
private void RaiseOnEventRemoved(string eventName)
{
var handler = OnEventRemoved;
if (handler != null)
{
OnEventRemoved(this, eventName);
}
}
private Delegate FindHandlerToRemove<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
if (!HasSubscriptionsForEvent<T>())
{
return null;
}
var key = GetEventKey<T>();
foreach (var func in _handlers[key])
{
var genericArgs = func.GetType().GetGenericArguments();
if (genericArgs.SingleOrDefault() == typeof(TH))
{
return func;
}
}
return null;
}
public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
{
var key = GetEventKey<T>();
return HasSubscriptionsForEvent(key);
}
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
public Type GetEventTypeByName(string eventName) => _eventTypes.Single(t => t.Name == eventName);
private string GetEventKey<T>()
{
return typeof(T).Name;
}
}
}

+ 58
- 62
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -1,4 +1,5 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
@ -23,22 +24,41 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private readonly IRabbitMQPersistentConnection _persistentConnection;
private readonly ILogger<EventBusRabbitMQ> _logger;
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers
= new Dictionary<string, List<IIntegrationEventHandler>>();
private readonly List<Type> _eventTypes
= new List<Type>();
private readonly IEventBusSubscriptionsManager _subsManager;
private IModel _consumerChannel;
private string _queueName;
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger)
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, IEventBusSubscriptionsManager subsManager)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_consumerChannel = CreateConsumerChannel();
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
}
private void SubsManager_OnEventRemoved(object sender, string eventName)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueUnbind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
if (_subsManager.IsEmpty)
{
_queueName = string.Empty;
_consumerChannel.Close();
}
}
}
public void Publish(IntegrationEvent @event)
@ -76,15 +96,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
}
public void Subscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent
public void Subscribe<T, TH>(Func<TH> handler)
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = typeof(T).Name;
if (_handlers.ContainsKey(eventName))
{
_handlers[eventName].Add(handler);
}
else
var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
@ -96,55 +114,31 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
_handlers.Add(eventName, new List<IIntegrationEventHandler>());
_handlers[eventName].Add(handler);
_eventTypes.Add(typeof(T));
}
}
_subsManager.AddSubscription<T, TH>(handler);
}
public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IntegrationEvent
public void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent
{
var eventName = typeof(T).Name;
_subsManager.RemoveSubscription<T, TH>();
}
if (_handlers.ContainsKey(eventName) && _handlers[eventName].Contains(handler))
private static Func<IIntegrationEventHandler> FindHandlerByType(Type handlerType, IEnumerable<Func<IIntegrationEventHandler>> handlers)
{
foreach (var func in handlers)
{
_handlers[eventName].Remove(handler);
if (_handlers[eventName].Count == 0)
if (func.GetMethodInfo().ReturnType == handlerType)
{
_handlers.Remove(eventName);
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
if (eventType != null)
{
_eventTypes.Remove(eventType);
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueUnbind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
if (_handlers.Keys.Count == 0)
{
_queueName = string.Empty;
_consumerChannel.Close();
}
}
}
return func;
}
}
return null;
}
public void Dispose()
@ -153,8 +147,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
{
_consumerChannel.Dispose();
}
_handlers.Clear();
_subsManager.Clear();
}
private IModel CreateConsumerChannel()
@ -195,15 +189,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent(string eventName, string message)
{
if (_handlers.ContainsKey(eventName))
{
Type eventType = _eventTypes.Single(t => t.Name == eventName);
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
var handlers = _handlers[eventName];
var handlers = _subsManager.GetHandlersForEvent(eventName);
foreach (var handler in handlers)
foreach (var handlerfactory in handlers)
{
var handler = handlerfactory.DynamicInvoke();
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}


+ 17
- 8
src/Services/Basket/Basket.API/Startup.cs View File

@ -3,6 +3,7 @@ using Basket.API.IntegrationEvents.EventHandling;
using Basket.API.IntegrationEvents.Events;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.Services.Basket.API.Auth.Server;
@ -19,6 +20,7 @@ using StackExchange.Redis;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System;
namespace Microsoft.eShopOnContainers.Services.Basket.API
{
@ -80,8 +82,6 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
return new DefaultRabbitMQPersistentConnection(factory, logger);
});
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
services.AddSwaggerGen();
services.ConfigureSwaggerGen(options =>
@ -108,9 +108,16 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
});
services.AddTransient<IBasketRepository, RedisBasketRepository>();
services.AddTransient<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>, ProductPriceChangedIntegrationEventHandler>();
services.AddTransient<IIntegrationEventHandler<OrderStartedIntegrationEvent>, OrderStartedIntegrationEventHandler>();
RegisterServiceBus(services);
}
private void RegisterServiceBus(IServiceCollection services)
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<ProductPriceChangedIntegrationEventHandler>();
services.AddTransient<OrderStartedIntegrationEventHandler>();
}
@ -155,11 +162,13 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
var orderStartedHandler = app.ApplicationServices
.GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();
var eventBus = app.ApplicationServices
.GetRequiredService<IEventBus>();
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
(() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());
eventBus.Subscribe<ProductPriceChangedIntegrationEvent>(catalogPriceHandler);
eventBus.Subscribe<OrderStartedIntegrationEvent>(orderStartedHandler);
eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
(() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
}
}
}

+ 2
- 0
src/Services/Catalog/Catalog.API/Startup.cs View File

@ -6,6 +6,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
@ -115,6 +116,7 @@
return new DefaultRabbitMQPersistentConnection(factory, logger);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
}


+ 2
- 0
src/Services/Ordering/Ordering.API/Startup.cs View File

@ -13,6 +13,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
@ -119,6 +120,7 @@
return new DefaultRabbitMQPersistentConnection(factory, logger);
});
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddSingleton<IEventBus, EventBusRabbitMQ>();
services.AddOptions();


Loading…
Cancel
Save