|
@ -27,7 +27,7 @@ |
|
|
ILifetimeScope autofac) |
|
|
ILifetimeScope autofac) |
|
|
{ |
|
|
{ |
|
|
_serviceBusPersisterConnection = serviceBusPersisterConnection; |
|
|
_serviceBusPersisterConnection = serviceBusPersisterConnection; |
|
|
_logger = logger; |
|
|
|
|
|
|
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); |
|
|
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); |
|
|
|
|
|
|
|
|
_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, |
|
|
_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, |
|
@ -61,6 +61,8 @@ |
|
|
public void SubscribeDynamic<TH>(string eventName) |
|
|
public void SubscribeDynamic<TH>(string eventName) |
|
|
where TH : IDynamicIntegrationEventHandler |
|
|
where TH : IDynamicIntegrationEventHandler |
|
|
{ |
|
|
{ |
|
|
|
|
|
_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, nameof(TH)); |
|
|
|
|
|
|
|
|
_subsManager.AddDynamicSubscription<TH>(eventName); |
|
|
_subsManager.AddDynamicSubscription<TH>(eventName); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -87,6 +89,8 @@ |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, nameof(TH)); |
|
|
|
|
|
|
|
|
_subsManager.AddSubscription<T, TH>(); |
|
|
_subsManager.AddSubscription<T, TH>(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -108,12 +112,16 @@ |
|
|
_logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName); |
|
|
_logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation("Unsubscribing from event {EventName}", eventName); |
|
|
|
|
|
|
|
|
_subsManager.RemoveSubscription<T, TH>(); |
|
|
_subsManager.RemoveSubscription<T, TH>(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void UnsubscribeDynamic<TH>(string eventName) |
|
|
public void UnsubscribeDynamic<TH>(string eventName) |
|
|
where TH : IDynamicIntegrationEventHandler |
|
|
where TH : IDynamicIntegrationEventHandler |
|
|
{ |
|
|
{ |
|
|
|
|
|
_logger.LogInformation("Unsubscribing from dynamic event {EventName}", eventName); |
|
|
|
|
|
|
|
|
_subsManager.RemoveDynamicSubscription<TH>(eventName); |
|
|
_subsManager.RemoveDynamicSubscription<TH>(eventName); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -136,17 +144,16 @@ |
|
|
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); |
|
|
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); |
|
|
} |
|
|
} |
|
|
}, |
|
|
}, |
|
|
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); |
|
|
|
|
|
|
|
|
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) |
|
|
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) |
|
|
{ |
|
|
{ |
|
|
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}."); |
|
|
|
|
|
|
|
|
var ex = exceptionReceivedEventArgs.Exception; |
|
|
var context = exceptionReceivedEventArgs.ExceptionReceivedContext; |
|
|
var context = exceptionReceivedEventArgs.ExceptionReceivedContext; |
|
|
Console.WriteLine("Exception context for troubleshooting:"); |
|
|
|
|
|
Console.WriteLine($"- Endpoint: {context.Endpoint}"); |
|
|
|
|
|
Console.WriteLine($"- Entity Path: {context.EntityPath}"); |
|
|
|
|
|
Console.WriteLine($"- Executing Action: {context.Action}"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context); |
|
|
|
|
|
|
|
|
return Task.CompletedTask; |
|
|
return Task.CompletedTask; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -172,7 +179,7 @@ |
|
|
var handler = scope.ResolveOptional(subscription.HandlerType); |
|
|
var handler = scope.ResolveOptional(subscription.HandlerType); |
|
|
if (handler == null) continue; |
|
|
if (handler == null) continue; |
|
|
var eventType = _subsManager.GetEventTypeByName(eventName); |
|
|
var eventType = _subsManager.GetEventTypeByName(eventName); |
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
|
|
|
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); |
|
|
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); |
|
|
} |
|
|
} |
|
|