|
@ -12,10 +12,14 @@ using RabbitMQ.Client; |
|
|
using RabbitMQ.Client.Events; |
|
|
using RabbitMQ.Client.Events; |
|
|
using RabbitMQ.Client.Exceptions; |
|
|
using RabbitMQ.Client.Exceptions; |
|
|
using System; |
|
|
using System; |
|
|
|
|
|
using System.Collections.Generic; |
|
|
using System.Diagnostics; |
|
|
using System.Diagnostics; |
|
|
|
|
|
using System.Net; |
|
|
|
|
|
using System.Net.Http; |
|
|
using System.Net.Sockets; |
|
|
using System.Net.Sockets; |
|
|
using System.Text; |
|
|
using System.Text; |
|
|
using System.Threading.Tasks; |
|
|
using System.Threading.Tasks; |
|
|
|
|
|
using System.Web; |
|
|
|
|
|
|
|
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
{ |
|
|
{ |
|
@ -28,15 +32,20 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
private readonly IEventBusSubscriptionsManager _subsManager; |
|
|
private readonly IEventBusSubscriptionsManager _subsManager; |
|
|
private readonly ILifetimeScope _autofac; |
|
|
private readonly ILifetimeScope _autofac; |
|
|
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; |
|
|
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; |
|
|
|
|
|
private static readonly String tenantACustomisationUrl = @"http://tenantacustomisation/"; |
|
|
|
|
|
private static readonly String tenantManagerUrl = @"http://tenantmanager/"; |
|
|
private readonly int _retryCount; |
|
|
private readonly int _retryCount; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private IModel _consumerChannel; |
|
|
private IModel _consumerChannel; |
|
|
private string _queueName; |
|
|
private string _queueName; |
|
|
|
|
|
|
|
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, |
|
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, |
|
|
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) |
|
|
|
|
|
|
|
|
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, |
|
|
|
|
|
int retryCount = 5) |
|
|
{ |
|
|
{ |
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); |
|
|
|
|
|
|
|
|
_persistentConnection = |
|
|
|
|
|
persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); |
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); |
|
|
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); |
|
|
_queueName = queueName; |
|
|
_queueName = queueName; |
|
@ -76,18 +85,21 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
|
|
|
|
|
var policy = RetryPolicy.Handle<BrokerUnreachableException>() |
|
|
var policy = RetryPolicy.Handle<BrokerUnreachableException>() |
|
|
.Or<SocketException>() |
|
|
.Or<SocketException>() |
|
|
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), |
|
|
|
|
|
(ex, time) => |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogWarning(ex, |
|
|
|
|
|
"Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, |
|
|
|
|
|
$"{time.TotalSeconds:n1}", ex.Message); |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
var eventName = @event.GetType().Name; |
|
|
var eventName = @event.GetType().Name; |
|
|
|
|
|
|
|
|
_logger.LogWarning("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); |
|
|
|
|
|
|
|
|
_logger.LogWarning("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, |
|
|
|
|
|
eventName); |
|
|
|
|
|
|
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
{ |
|
|
{ |
|
|
|
|
|
|
|
|
_logger.LogWarning("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); |
|
|
_logger.LogWarning("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); |
|
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); |
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); |
|
@ -115,7 +127,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
public void SubscribeDynamic<TH>(string eventName) |
|
|
public void SubscribeDynamic<TH>(string eventName) |
|
|
where TH : IDynamicIntegrationEventHandler |
|
|
where TH : IDynamicIntegrationEventHandler |
|
|
{ |
|
|
{ |
|
|
_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); |
|
|
|
|
|
|
|
|
_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, |
|
|
|
|
|
typeof(TH).GetGenericTypeName()); |
|
|
|
|
|
|
|
|
DoInternalSubscription(eventName); |
|
|
DoInternalSubscription(eventName); |
|
|
_subsManager.AddDynamicSubscription<TH>(eventName); |
|
|
_subsManager.AddDynamicSubscription<TH>(eventName); |
|
@ -129,7 +142,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
var eventName = _subsManager.GetEventKey<T>(); |
|
|
var eventName = _subsManager.GetEventKey<T>(); |
|
|
DoInternalSubscription(eventName); |
|
|
DoInternalSubscription(eventName); |
|
|
|
|
|
|
|
|
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); |
|
|
|
|
|
|
|
|
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, |
|
|
|
|
|
typeof(TH).GetGenericTypeName()); |
|
|
|
|
|
|
|
|
_subsManager.AddSubscription<T, TH>(); |
|
|
_subsManager.AddSubscription<T, TH>(); |
|
|
StartBasicConsume(); |
|
|
StartBasicConsume(); |
|
@ -148,8 +162,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
using (var channel = _persistentConnection.CreateModel()) |
|
|
{ |
|
|
{ |
|
|
channel.QueueBind(queue: _queueName, |
|
|
channel.QueueBind(queue: _queueName, |
|
|
exchange: BROKER_NAME, |
|
|
|
|
|
routingKey: eventName); |
|
|
|
|
|
|
|
|
exchange: BROKER_NAME, |
|
|
|
|
|
routingKey: eventName); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -239,13 +253,13 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
var channel = _persistentConnection.CreateModel(); |
|
|
var channel = _persistentConnection.CreateModel(); |
|
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, |
|
|
channel.ExchangeDeclare(exchange: BROKER_NAME, |
|
|
type: "direct"); |
|
|
|
|
|
|
|
|
type: "direct"); |
|
|
|
|
|
|
|
|
channel.QueueDeclare(queue: _queueName, |
|
|
channel.QueueDeclare(queue: _queueName, |
|
|
durable: true, |
|
|
|
|
|
exclusive: false, |
|
|
|
|
|
autoDelete: false, |
|
|
|
|
|
arguments: null); |
|
|
|
|
|
|
|
|
durable: true, |
|
|
|
|
|
exclusive: false, |
|
|
|
|
|
autoDelete: false, |
|
|
|
|
|
arguments: null); |
|
|
|
|
|
|
|
|
channel.CallbackException += (sender, ea) => |
|
|
channel.CallbackException += (sender, ea) => |
|
|
{ |
|
|
{ |
|
@ -258,6 +272,60 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
|
|
|
|
|
|
return channel; |
|
|
return channel; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private async void SendEventToTenant(Object @event) |
|
|
|
|
|
{ |
|
|
|
|
|
string myJson = JsonConvert.SerializeObject(@event); |
|
|
|
|
|
using (var client = new HttpClient()) |
|
|
|
|
|
{ |
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
//TODO replace URL with response from tenantmanager
|
|
|
|
|
|
var response = await client.PostAsync( |
|
|
|
|
|
tenantACustomisationUrl + "api/OrderStatusChangedToSubmittedIntegrationEvents", |
|
|
|
|
|
new StringContent(myJson, Encoding.UTF8, "application/json")); |
|
|
|
|
|
response.EnsureSuccessStatusCode(); |
|
|
|
|
|
_logger.LogInformation("----- Event sent to tenant{@event} -----", @event); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
catch (Exception e) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogInformation("----- Exception{@e} -- Event{@event} -----", e, @event); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private async Task<Boolean> IsEventCustomised(String eventName, String tenantId) |
|
|
|
|
|
{ |
|
|
|
|
|
Boolean isCustomised = false; |
|
|
|
|
|
|
|
|
|
|
|
var builder = new UriBuilder(tenantManagerUrl + "api/Customisations"); |
|
|
|
|
|
builder.Port = -1; |
|
|
|
|
|
var query = HttpUtility.ParseQueryString(builder.Query); |
|
|
|
|
|
query["eventName"] = eventName; |
|
|
|
|
|
query["tenantId"] = tenantId; |
|
|
|
|
|
builder.Query = query.ToString(); |
|
|
|
|
|
string url = builder.ToString(); |
|
|
|
|
|
|
|
|
|
|
|
using (var client = new HttpClient()) |
|
|
|
|
|
{ |
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
var response = await client.GetAsync( |
|
|
|
|
|
url); |
|
|
|
|
|
response.EnsureSuccessStatusCode(); |
|
|
|
|
|
isCustomised = |
|
|
|
|
|
JsonConvert.DeserializeObject<Boolean>(response.Content.ReadAsStringAsync().Result); |
|
|
|
|
|
} |
|
|
|
|
|
catch (Exception e) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogInformation("----- Exception{@e}", e); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return isCustomised; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
private async Task ProcessEvent(string eventName, string message) |
|
|
private async Task ProcessEvent(string eventName, string message) |
|
|
{ |
|
|
{ |
|
|
_logger.LogWarning("Processing RabbitMQ event: {EventName}", eventName); |
|
|
_logger.LogWarning("Processing RabbitMQ event: {EventName}", eventName); |
|
@ -271,7 +339,9 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
{ |
|
|
{ |
|
|
if (subscription.IsDynamic) |
|
|
if (subscription.IsDynamic) |
|
|
{ |
|
|
{ |
|
|
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; |
|
|
|
|
|
|
|
|
//TODO check if it is required here aswell
|
|
|
|
|
|
var handler = |
|
|
|
|
|
scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; |
|
|
if (handler == null) continue; |
|
|
if (handler == null) continue; |
|
|
dynamic eventData = JObject.Parse(message); |
|
|
dynamic eventData = JObject.Parse(message); |
|
|
|
|
|
|
|
@ -281,17 +351,32 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
else |
|
|
else |
|
|
{ |
|
|
{ |
|
|
var handler = scope.ResolveOptional(subscription.HandlerType); |
|
|
var handler = scope.ResolveOptional(subscription.HandlerType); |
|
|
if (eventName.Equals("OrderStatusChangedToSubmittedIntegrationEvent") || eventName.Equals("UserCheckoutAcceptedIntegrationEvent")) |
|
|
|
|
|
{ |
|
|
|
|
|
Debug.WriteLine("Here"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (handler == null) continue; |
|
|
if (handler == null) continue; |
|
|
var eventType = _subsManager.GetEventTypeByName(eventName); |
|
|
var eventType = _subsManager.GetEventTypeByName(eventName); |
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); |
|
|
|
|
|
//IsEventCustomised(eventName, integrationEvent.TenantId);
|
|
|
|
|
|
if (eventName.Equals("OrderStatusChangedToSubmittedIntegrationEvent") && |
|
|
|
|
|
integrationEvent is IntegrationEvent) //TODO replace with tenantmanager
|
|
|
|
|
|
{ |
|
|
|
|
|
//Casting
|
|
|
|
|
|
IntegrationEvent evt = (IntegrationEvent) integrationEvent; |
|
|
|
|
|
//Checking if event should be sent to tenant, or handled normally
|
|
|
|
|
|
//Can instead create an endpoint in the tenant manager that also handles all the events that a tenant wants to postpone
|
|
|
|
|
|
//Additionally, an endpoint in the tenant manager is required, where the tenant
|
|
|
|
|
|
//Issue with the tenant knowing the id of the event
|
|
|
|
|
|
if (evt.CheckForCustomisation) |
|
|
|
|
|
{ |
|
|
|
|
|
SendEventToTenant(integrationEvent); |
|
|
|
|
|
break; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); |
|
|
|
|
|
|
|
|
await Task.Yield(); |
|
|
await Task.Yield(); |
|
|
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); |
|
|
|
|
|
|
|
|
await (Task) concreteType.GetMethod("Handle") |
|
|
|
|
|
.Invoke(handler, new object[] {integrationEvent}); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -302,4 +387,4 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |