|
|
@ -15,7 +15,7 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure |
|
|
|
{ |
|
|
|
public class EventBusRabbitMQ : IEventBus |
|
|
|
{ |
|
|
|
private readonly string _brokerName = "event_bus"; |
|
|
|
private readonly string _brokerName = "eshop_event_bus"; |
|
|
|
private readonly string _connectionString; |
|
|
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers; |
|
|
|
private readonly List<Type> _eventTypes; |
|
|
@ -120,24 +120,11 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure |
|
|
|
|
|
|
|
var consumer = new EventingBasicConsumer(channel); |
|
|
|
consumer.Received += async (model, ea) => |
|
|
|
{ |
|
|
|
{ |
|
|
|
var eventName = ea.RoutingKey; |
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
{ |
|
|
|
var message = Encoding.UTF8.GetString(ea.Body); |
|
|
|
Type eventType = _eventTypes.Single(t => t.Name == eventName); |
|
|
|
|
|
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
var handlers = _handlers[eventName]; |
|
|
|
|
|
|
|
|
|
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); |
|
|
|
} |
|
|
|
} |
|
|
|
var message = Encoding.UTF8.GetString(ea.Body); |
|
|
|
|
|
|
|
await ProcessEvent(eventName, message); |
|
|
|
}; |
|
|
|
channel.BasicConsume(queue: _queueName, |
|
|
|
noAck: true, |
|
|
@ -147,5 +134,21 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure |
|
|
|
return _connection.Item1; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ProcessEvent(string eventName, string message) |
|
|
|
{ |
|
|
|
if (_handlers.ContainsKey(eventName)) |
|
|
|
{ |
|
|
|
Type eventType = _eventTypes.Single(t => t.Name == eventName); |
|
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
|
var handlers = _handlers[eventName]; |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |