espent1004 2a42053817 Implementing MultiEventBusRabbitMQ to allow for multiple connections to multiple virtual hosts in RabbitMQ, where it is intended to have one virtual host per tenant to achieve tenant isolation.
TODO: Implement in all microservices, and fix selection of actualEventBus in the Publish method in MultiEventBusRabbitMQ
2020-02-04 23:25:10 +01:00

209 lines
8.2 KiB
C#

/*namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
using Autofac;
using Microsoft.Azure.ServiceBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Text;
using System.Threading.Tasks;
public class EventBusServiceBus : IEventBus
{
// Connection wrapper; Publish asks it for the client used to send messages.
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
// Logger for subscription changes and message-handling errors.
private readonly ILogger<EventBusServiceBus> _logger;
// Registry of the events and handler types currently subscribed.
private readonly IEventBusSubscriptionsManager _subsManager;
// Client whose rules are edited by Subscribe/Unsubscribe and on which the message handler is registered.
private readonly SubscriptionClient _subscriptionClient;
// Autofac scope from which ProcessEvent opens a child scope to resolve handlers.
private readonly ILifetimeScope _autofac;
// Tag passed to BeginLifetimeScope when that child scope is opened.
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
// Suffix stripped from event type names to build labels/rule names, and re-appended to the label on receipt.
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
/// <summary>
/// Creates the event bus, builds the subscription client, removes the default rule
/// and registers the message handler.
/// </summary>
/// <param name="serviceBusPersisterConnection">Connection wrapper supplying the connection string builder and the sending client; must not be null.</param>
/// <param name="logger">Logger; must not be null.</param>
/// <param name="subsManager">Subscription registry; an <see cref="InMemoryEventBusSubscriptionsManager"/> is created when null.</param>
/// <param name="subscriptionClientName">Name handed to the <see cref="SubscriptionClient"/> constructor.</param>
/// <param name="autofac">Autofac scope used later to resolve event handlers.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="serviceBusPersisterConnection"/> or <paramref name="logger"/> is null.
/// </exception>
public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
    ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName,
    ILifetimeScope autofac)
{
    // The connection is dereferenced a few lines below; guard it the same way as the logger
    // so a missing dependency is reported by name instead of as a NullReferenceException.
    _serviceBusPersisterConnection = serviceBusPersisterConnection
        ?? throw new ArgumentNullException(nameof(serviceBusPersisterConnection));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
    _subscriptionClient = new SubscriptionClient(
        _serviceBusPersisterConnection.ServiceBusConnectionStringBuilder,
        subscriptionClientName);
    _autofac = autofac;

    RemoveDefaultRule();
    RegisterSubscriptionClientMessageHandler();
}
/// <summary>
/// Serializes the event to JSON and sends it as a message, blocking until the send completes.
/// </summary>
/// <param name="event">The integration event to publish.</param>
public void Publish(IntegrationEvent @event)
{
    // The label is the event type name with the "IntegrationEvent" suffix removed.
    var label = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, "");
    var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event));

    var outgoing = new Message
    {
        MessageId = Guid.NewGuid().ToString(),
        Body = payload,
        Label = label,
    };

    var sender = _serviceBusPersisterConnection.CreateModel();
    sender.SendAsync(outgoing).GetAwaiter().GetResult();
}
/// <summary>
/// Registers a dynamic handler type for the named event in the subscriptions manager.
/// </summary>
/// <typeparam name="TH">Dynamic handler type.</typeparam>
/// <param name="eventName">Name of the event to subscribe to.</param>
public void SubscribeDynamic<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    var handlerName = typeof(TH).Name;
    _logger.LogInformation(
        "Subscribing to dynamic event {EventName} with {EventHandler}",
        eventName,
        handlerName);

    _subsManager.AddDynamicSubscription<TH>(eventName);
}
/// <summary>
/// Subscribes handler <typeparamref name="TH"/> to event <typeparamref name="T"/>.
/// When the event has no handlers yet, a rule filtering on the event's label is added
/// to the subscription client first.
/// </summary>
/// <typeparam name="T">Integration event type.</typeparam>
/// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
public void Subscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, "");

    // Only the first handler of an event needs to create the rule.
    if (!_subsManager.HasSubscriptionsForEvent<T>())
    {
        try
        {
            _subscriptionClient.AddRuleAsync(new RuleDescription
            {
                Filter = new CorrelationFilter { Label = eventName },
                Name = eventName
            }).GetAwaiter().GetResult();
        }
        catch (ServiceBusException ex)
        {
            // This catch is broader than "rule already exists", so attach the exception:
            // otherwise an unrelated failure is indistinguishable from the benign case.
            _logger.LogWarning(ex, "The messaging entity {eventName} already exists.", eventName);
        }
    }

    _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).Name);
    _subsManager.AddSubscription<T, TH>();
}
/// <summary>
/// Removes handler <typeparamref name="TH"/> for event <typeparamref name="T"/>.
/// The rule on the subscription client is removed only when no handler for the
/// event remains afterwards.
/// </summary>
/// <typeparam name="T">Integration event type.</typeparam>
/// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
public void Unsubscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFFIX, "");

    _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
    _subsManager.RemoveSubscription<T, TH>();

    // Subscribe creates the rule only for the first handler of an event, so the rule
    // has to stay while any other handler is still registered; dropping it earlier
    // would silently stop delivery to those handlers.
    if (_subsManager.HasSubscriptionsForEvent<T>())
    {
        return;
    }

    try
    {
        _subscriptionClient
            .RemoveRuleAsync(eventName)
            .GetAwaiter()
            .GetResult();
    }
    catch (MessagingEntityNotFoundException)
    {
        _logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName);
    }
}
/// <summary>
/// Removes a dynamic handler type for the named event from the subscriptions manager.
/// </summary>
/// <typeparam name="TH">Dynamic handler type.</typeparam>
/// <param name="eventName">Name of the event to unsubscribe from.</param>
public void UnsubscribeDynamic<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    _logger.LogInformation(
        "Unsubscribing from dynamic event {EventName}",
        eventName);

    _subsManager.RemoveDynamicSubscription<TH>(eventName);
}
/// <summary>
/// Clears all subscriptions and closes the subscription client so that it stops
/// receiving messages and releases its connection.
/// </summary>
public void Dispose()
{
    _subsManager.Clear();

    try
    {
        // The client was created by this class in the constructor, so it is closed here.
        _subscriptionClient.CloseAsync().GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        // Dispose should not throw; closing is best effort.
        _logger.LogWarning(ex, "Failed to close the subscription client while disposing the event bus.");
    }
}
/// <summary>
/// Registers the callback that turns each received message into an event,
/// dispatches it through <c>ProcessEvent</c> and completes the message when it was processed.
/// </summary>
private void RegisterSubscriptionClientMessageHandler()
{
    // AutoComplete is off: the callback completes the message itself.
    var handlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        MaxConcurrentCalls = 10,
        AutoComplete = false
    };

    _subscriptionClient.RegisterMessageHandler(
        async (message, token) =>
        {
            // The label carries the event name without its suffix; put the suffix back.
            var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}";
            var payload = Encoding.UTF8.GetString(message.Body);

            var handled = await ProcessEvent(eventName, payload);
            if (!handled)
            {
                return;
            }

            // Complete the message so that it is not received again.
            await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        },
        handlerOptions);
}
/// <summary>
/// Logs an exception reported by the message handler together with its context.
/// </summary>
/// <param name="exceptionReceivedEventArgs">Carries the exception and the context in which it was received.</param>
/// <returns>An already completed task.</returns>
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    var failure = exceptionReceivedEventArgs.Exception;

    _logger.LogError(
        failure,
        "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}",
        failure.Message,
        exceptionReceivedEventArgs.ExceptionReceivedContext);

    return Task.CompletedTask;
}
/// <summary>
/// Dispatches a received message to every handler subscribed to the event.
/// </summary>
/// <param name="eventName">Event name used to look up subscriptions.</param>
/// <param name="message">JSON body of the message.</param>
/// <returns>
/// <c>false</c> when the event has no subscriptions; otherwise <c>true</c> once all
/// subscriptions have been visited (handlers that cannot be resolved are skipped).
/// </returns>
private async Task<bool> ProcessEvent(string eventName, string message)
{
    if (!_subsManager.HasSubscriptionsForEvent(eventName))
    {
        return false;
    }

    using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
    {
        foreach (var subscription in _subsManager.GetHandlersForEvent(eventName))
        {
            var resolved = scope.ResolveOptional(subscription.HandlerType);

            if (subscription.IsDynamic)
            {
                var dynamicHandler = resolved as IDynamicIntegrationEventHandler;
                if (dynamicHandler == null)
                {
                    continue;
                }

                // Dynamic handlers receive the parsed JSON rather than a typed event.
                dynamic eventData = JObject.Parse(message);
                await dynamicHandler.Handle(eventData);
            }
            else
            {
                if (resolved == null)
                {
                    continue;
                }

                var eventType = _subsManager.GetEventTypeByName(eventName);
                var integrationEvent = JsonConvert.DeserializeObject(message, eventType);

                // The event type is only known at runtime, so Handle is invoked via reflection
                // on the closed IIntegrationEventHandler<> interface.
                var handlerInterface = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                var handleMethod = handlerInterface.GetMethod("Handle");
                await (Task)handleMethod.Invoke(resolved, new object[] { integrationEvent });
            }
        }
    }

    return true;
}
/// <summary>
/// Removes the default rule from the subscription client, blocking until done.
/// A missing rule is logged as a warning and otherwise ignored.
/// </summary>
private void RemoveDefaultRule()
{
    try
    {
        var removal = _subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
        removal.GetAwaiter().GetResult();
    }
    catch (MessagingEntityNotFoundException)
    {
        _logger.LogWarning(
            "The messaging entity {DefaultRuleName} Could not be found.",
            RuleDescription.DefaultRuleName);
    }
}
}
}
*/