@ -1,4 +1,5 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus ;
using Autofac ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events ;
using Microsoft.Extensions.Logging ;
using Microsoft.Extensions.Logging ;
@ -26,17 +27,20 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private readonly IRabbitMQPersistentConnection _persistentConnection ;
private readonly IRabbitMQPersistentConnection _persistentConnection ;
private readonly ILogger < EventBusRabbitMQ > _logger ;
private readonly ILogger < EventBusRabbitMQ > _logger ;
private readonly IEventBusSubscriptionsManager _subsManager ;
private readonly IEventBusSubscriptionsManager _subsManager ;
private readonly ILifetimeScope _autofac ;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus" ;
private IModel _consumerChannel ;
private IModel _consumerChannel ;
private string _queueName ;
private string _queueName ;
public EventBusRabbitMQ ( IRabbitMQPersistentConnection persistentConnection , ILogger < EventBusRabbitMQ > logger , IEventBusSubscriptionsManager subsManager )
public EventBusRabbitMQ ( IRabbitMQPersistentConnection persistentConnection , ILogger < EventBusRabbitMQ > logger ,
ILifetimeScope autofac , IEventBusSubscriptionsManager subsManager )
{
{
_persistentConnection = persistentConnection ? ? throw new ArgumentNullException ( nameof ( persistentConnection ) ) ;
_persistentConnection = persistentConnection ? ? throw new ArgumentNullException ( nameof ( persistentConnection ) ) ;
_logger = logger ? ? throw new ArgumentNullException ( nameof ( logger ) ) ;
_logger = logger ? ? throw new ArgumentNullException ( nameof ( logger ) ) ;
_subsManager = subsManager ? ? new InMemoryEventBusSubscriptionsManager ( ) ;
_subsManager = subsManager ? ? new InMemoryEventBusSubscriptionsManager ( ) ;
_consumerChannel = CreateConsumerChannel ( ) ;
_consumerChannel = CreateConsumerChannel ( ) ;
_autofac = autofac ;
_subsManager . OnEventRemoved + = SubsManager_OnEventRemoved ;
_subsManager . OnEventRemoved + = SubsManager_OnEventRemoved ;
}
}
@ -97,20 +101,20 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
}
}
}
public void SubscribeDynamic < TH > ( string eventName , Func < TH > handler )
where TH : IDynamicIntegrationEventHandler
public void SubscribeDynamic < TH > ( string eventName )
where TH : IDynamicIntegrationEventHandler
{
{
DoInternalSubscription ( eventName ) ;
DoInternalSubscription ( eventName ) ;
_subsManager . AddDynamicSubscription < TH > ( eventName , handler ) ;
_subsManager . AddDynamicSubscription < TH > ( eventName ) ;
}
}
public void Subscribe < T , TH > ( Func < TH > handler )
public void Subscribe < T , TH > ( )
where T : IntegrationEvent
where T : IntegrationEvent
where TH : IIntegrationEventHandler < T >
where TH : IIntegrationEventHandler < T >
{
{
var eventName = _subsManager . GetEventKey < T > ( ) ;
var eventName = _subsManager . GetEventKey < T > ( ) ;
DoInternalSubscription ( eventName ) ;
DoInternalSubscription ( eventName ) ;
_subsManager . AddSubscription < T , TH > ( handler ) ;
_subsManager . AddSubscription < T , TH > ( ) ;
}
}
private void DoInternalSubscription ( string eventName )
private void DoInternalSubscription ( string eventName )
@ -140,7 +144,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
}
public void UnsubscribeDynamic < TH > ( string eventName )
public void UnsubscribeDynamic < TH > ( string eventName )
where TH : IDynamicIntegrationEventHandler
where TH : IDynamicIntegrationEventHandler
{
{
_subsManager . RemoveDynamicSubscription < TH > ( eventName ) ;
_subsManager . RemoveDynamicSubscription < TH > ( eventName ) ;
}
}
@ -195,25 +199,28 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent ( string eventName , string message )
private async Task ProcessEvent ( string eventName , string message )
{
{
if ( _subsManager . HasSubscriptionsForEvent ( eventName ) )
if ( _subsManager . HasSubscriptionsForEvent ( eventName ) )
{
{
var subscriptions = _subsManager . GetHandlersForEvent ( eventName ) ;
foreach ( var subscription in subscriptions )
using ( var scope = _autofac . BeginLifetimeScope ( AUTOFAC_SCOPE_NAME ) )
{
{
if ( subscription . IsDynamic )
{
var handler = subscription . Factory . DynamicInvoke ( ) as IDynamicIntegrationEventHandler ;
dynamic eventData = JObject . Parse ( message ) ;
await handler . Handle ( eventData ) ;
}
else
var subscriptions = _subsManager . GetHandlersForEvent ( eventName ) ;
foreach ( var subscription in subscriptions )
{
{
var eventType = _subsManager . GetEventTypeByName ( eventName ) ;
var integrationEvent = JsonConvert . DeserializeObject ( message , eventType ) ;
var handler = subscription . Factory . DynamicInvoke ( ) ;
var concreteType = typeof ( IIntegrationEventHandler < > ) . MakeGenericType ( eventType ) ;
await ( Task ) concreteType . GetMethod ( "Handle" ) . Invoke ( handler , new object [ ] { integrationEvent } ) ;
if ( subscription . IsDynamic )
{
var handler = scope . ResolveOptional ( subscription . HandlerType ) as IDynamicIntegrationEventHandler ;
dynamic eventData = JObject . Parse ( message ) ;
await handler . Handle ( eventData ) ;
}
else
{
var eventType = _subsManager . GetEventTypeByName ( eventName ) ;
var integrationEvent = JsonConvert . DeserializeObject ( message , eventType ) ;
var handler = scope . ResolveOptional ( subscription . HandlerType ) ;
var concreteType = typeof ( IIntegrationEventHandler < > ) . MakeGenericType ( eventType ) ;
await ( Task ) concreteType . GetMethod ( "Handle" ) . Invoke ( handler , new object [ ] { integrationEvent } ) ;
}
}
}
}
}
}
}