@ -1,8 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard1.0</TargetFramework> | |||
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.CommandBus</RootNamespace> | |||
</PropertyGroup> | |||
</Project> |
@ -1,16 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus | |||
{ | |||
public interface ICommandBus | |||
{ | |||
void Send<T>(string name, T data); | |||
void Handle<TC>(string name, IIntegrationCommandHandler<TC> handler); | |||
void Handle(string name, IIntegrationCommandHandler handler); | |||
void Handle<TI, TC>(TI handler) | |||
where TI : IIntegrationCommandHandler<TC>; | |||
} | |||
} |
@ -1,16 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus | |||
{ | |||
public interface IIntegrationCommandHandler | |||
{ | |||
void Handle(IntegrationCommand command); | |||
} | |||
public interface IIntegrationCommandHandler<T> : IIntegrationCommandHandler | |||
{ | |||
void Handle(IntegrationCommand<T> command); | |||
} | |||
} |
@ -1,35 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus | |||
{ | |||
public abstract class IntegrationCommand | |||
{ | |||
public Guid Id { get; } | |||
public DateTime Sent { get; } | |||
public abstract object GetDataAsObject(); | |||
protected IntegrationCommand() | |||
{ | |||
Id = Guid.NewGuid(); | |||
Sent = DateTime.UtcNow; | |||
} | |||
} | |||
public class IntegrationCommand<T> : IntegrationCommand | |||
{ | |||
public T Data { get; } | |||
public string Name { get; } | |||
public override object GetDataAsObject() => Data; | |||
public IntegrationCommand(string name, T data) : base() | |||
{ | |||
Data = data; | |||
Name = name; | |||
} | |||
} | |||
} |
@ -1,22 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" /> | |||
<PackageReference Include="xunit" Version="2.3.0" /> | |||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\EventBusRabbitMQ\EventBusRabbitMQ.csproj" /> | |||
<ProjectReference Include="..\EventBus\EventBus.csproj" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" /> | |||
</ItemGroup> | |||
</Project> |
@ -1,56 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; | |||
using System; | |||
using System.Linq; | |||
using Xunit; | |||
namespace EventBus.Tests | |||
{ | |||
public class InMemory_SubscriptionManager_Tests | |||
{ | |||
[Fact] | |||
public void After_Creation_Should_Be_Empty() | |||
{ | |||
var manager = new InMemoryEventBusSubscriptionsManager(); | |||
Assert.True(manager.IsEmpty); | |||
} | |||
[Fact] | |||
public void After_One_Event_Subscription_Should_Contain_The_Event() | |||
{ | |||
var manager = new InMemoryEventBusSubscriptionsManager(); | |||
manager.AddSubscription<TestIntegrationEvent,TestIntegrationEventHandler>(); | |||
Assert.True(manager.HasSubscriptionsForEvent<TestIntegrationEvent>()); | |||
} | |||
[Fact] | |||
public void After_All_Subscriptions_Are_Deleted_Event_Should_No_Longer_Exists() | |||
{ | |||
var manager = new InMemoryEventBusSubscriptionsManager(); | |||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(); | |||
manager.RemoveSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(); | |||
Assert.False(manager.HasSubscriptionsForEvent<TestIntegrationEvent>()); | |||
} | |||
[Fact] | |||
public void Deleting_Last_Subscription_Should_Raise_On_Deleted_Event() | |||
{ | |||
bool raised = false; | |||
var manager = new InMemoryEventBusSubscriptionsManager(); | |||
manager.OnEventRemoved += (o, e) => raised = true; | |||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(); | |||
manager.RemoveSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(); | |||
Assert.True(raised); | |||
} | |||
[Fact] | |||
public void Get_Handlers_For_Event_Should_Return_All_Handlers() | |||
{ | |||
var manager = new InMemoryEventBusSubscriptionsManager(); | |||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationEventHandler>(); | |||
manager.AddSubscription<TestIntegrationEvent, TestIntegrationOtherEventHandler>(); | |||
var handlers = manager.GetHandlersForEvent<TestIntegrationEvent>(); | |||
Assert.Equal(2, handlers.Count()); | |||
} | |||
} | |||
} |
@ -1,11 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace EventBus.Tests | |||
{ | |||
public class TestIntegrationEvent : IntegrationEvent | |||
{ | |||
} | |||
} |
@ -1,23 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace EventBus.Tests | |||
{ | |||
public class TestIntegrationEventHandler : IIntegrationEventHandler<TestIntegrationEvent> | |||
{ | |||
public bool Handled { get; private set; } | |||
public TestIntegrationEventHandler() | |||
{ | |||
Handled = false; | |||
} | |||
public async Task Handle(TestIntegrationEvent @event) | |||
{ | |||
Handled = true; | |||
} | |||
} | |||
} |
@ -1,23 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace EventBus.Tests | |||
{ | |||
public class TestIntegrationOtherEventHandler : IIntegrationEventHandler<TestIntegrationEvent> | |||
{ | |||
public bool Handled { get; private set; } | |||
public TestIntegrationOtherEventHandler() | |||
{ | |||
Handled = false; | |||
} | |||
public async Task Handle(TestIntegrationEvent @event) | |||
{ | |||
Handled = true; | |||
} | |||
} | |||
} |
@ -1,13 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions | |||
{ | |||
public interface IDynamicIntegrationEventHandler | |||
{ | |||
Task Handle(dynamic eventData); | |||
} | |||
} |
@ -1,24 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using System; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions | |||
{ | |||
public interface IEventBus | |||
{ | |||
void Publish(IntegrationEvent @event); | |||
void Subscribe<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T>; | |||
void SubscribeDynamic<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler; | |||
void UnsubscribeDynamic<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler; | |||
void Unsubscribe<T, TH>() | |||
where TH : IIntegrationEventHandler<T> | |||
where T : IntegrationEvent; | |||
} | |||
} |
@ -1,15 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions | |||
{ | |||
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler | |||
where TIntegrationEvent: IntegrationEvent | |||
{ | |||
Task Handle(TIntegrationEvent @event); | |||
} | |||
public interface IIntegrationEventHandler | |||
{ | |||
} | |||
} |
@ -1,12 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.EventBus</RootNamespace> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> | |||
</ItemGroup> | |||
</Project> |
@ -1,27 +0,0 @@ | |||
using System; | |||
using Newtonsoft.Json; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events | |||
{ | |||
public class IntegrationEvent | |||
{ | |||
public IntegrationEvent() | |||
{ | |||
Id = Guid.NewGuid(); | |||
CreationDate = DateTime.UtcNow; | |||
} | |||
[JsonConstructor] | |||
public IntegrationEvent(Guid id, DateTime createDate) | |||
{ | |||
Id = id; | |||
CreationDate = createDate; | |||
} | |||
[JsonProperty] | |||
public Guid Id { get; private set; } | |||
[JsonProperty] | |||
public DateTime CreationDate { get; private set; } | |||
} | |||
} |
@ -1,32 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions | |||
{ | |||
public static class GenericTypeExtensions | |||
{ | |||
public static string GetGenericTypeName(this Type type) | |||
{ | |||
var typeName = string.Empty; | |||
if (type.IsGenericType) | |||
{ | |||
var genericTypes = string.Join(",", type.GetGenericArguments().Select(t => t.Name).ToArray()); | |||
typeName = $"{type.Name.Remove(type.Name.IndexOf('`'))}<{genericTypes}>"; | |||
} | |||
else | |||
{ | |||
typeName = type.Name; | |||
} | |||
return typeName; | |||
} | |||
public static string GetGenericTypeName(this object @object) | |||
{ | |||
return @object.GetType().GetGenericTypeName(); | |||
} | |||
} | |||
} |
@ -1,34 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using System; | |||
using System.Collections.Generic; | |||
using static Microsoft.eShopOnContainers.BuildingBlocks.EventBus.InMemoryEventBusSubscriptionsManager; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus | |||
{ | |||
public interface IEventBusSubscriptionsManager | |||
{ | |||
bool IsEmpty { get; } | |||
event EventHandler<string> OnEventRemoved; | |||
void AddDynamicSubscription<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler; | |||
void AddSubscription<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T>; | |||
void RemoveSubscription<T, TH>() | |||
where TH : IIntegrationEventHandler<T> | |||
where T : IntegrationEvent; | |||
void RemoveDynamicSubscription<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler; | |||
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent; | |||
bool HasSubscriptionsForEvent(string eventName); | |||
Type GetEventTypeByName(string eventName); | |||
void Clear(); | |||
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent; | |||
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName); | |||
string GetEventKey<T>(); | |||
} | |||
} |
@ -1,165 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus | |||
{ | |||
public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager | |||
{ | |||
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; | |||
private readonly List<Type> _eventTypes; | |||
public event EventHandler<string> OnEventRemoved; | |||
public InMemoryEventBusSubscriptionsManager() | |||
{ | |||
_handlers = new Dictionary<string, List<SubscriptionInfo>>(); | |||
_eventTypes = new List<Type>(); | |||
} | |||
public bool IsEmpty => !_handlers.Keys.Any(); | |||
public void Clear() => _handlers.Clear(); | |||
public void AddDynamicSubscription<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
DoAddSubscription(typeof(TH), eventName, isDynamic: true); | |||
} | |||
public void AddSubscription<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T> | |||
{ | |||
var eventName = GetEventKey<T>(); | |||
DoAddSubscription(typeof(TH), eventName, isDynamic: false); | |||
if (!_eventTypes.Contains(typeof(T))) | |||
{ | |||
_eventTypes.Add(typeof(T)); | |||
} | |||
} | |||
private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic) | |||
{ | |||
if (!HasSubscriptionsForEvent(eventName)) | |||
{ | |||
_handlers.Add(eventName, new List<SubscriptionInfo>()); | |||
} | |||
if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) | |||
{ | |||
throw new ArgumentException( | |||
$"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); | |||
} | |||
if (isDynamic) | |||
{ | |||
_handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType)); | |||
} | |||
else | |||
{ | |||
_handlers[eventName].Add(SubscriptionInfo.Typed(handlerType)); | |||
} | |||
} | |||
public void RemoveDynamicSubscription<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName); | |||
DoRemoveHandler(eventName, handlerToRemove); | |||
} | |||
public void RemoveSubscription<T, TH>() | |||
where TH : IIntegrationEventHandler<T> | |||
where T : IntegrationEvent | |||
{ | |||
var handlerToRemove = FindSubscriptionToRemove<T, TH>(); | |||
var eventName = GetEventKey<T>(); | |||
DoRemoveHandler(eventName, handlerToRemove); | |||
} | |||
private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove) | |||
{ | |||
if (subsToRemove != null) | |||
{ | |||
_handlers[eventName].Remove(subsToRemove); | |||
if (!_handlers[eventName].Any()) | |||
{ | |||
_handlers.Remove(eventName); | |||
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); | |||
if (eventType != null) | |||
{ | |||
_eventTypes.Remove(eventType); | |||
} | |||
RaiseOnEventRemoved(eventName); | |||
} | |||
} | |||
} | |||
public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent | |||
{ | |||
var key = GetEventKey<T>(); | |||
return GetHandlersForEvent(key); | |||
} | |||
public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName]; | |||
private void RaiseOnEventRemoved(string eventName) | |||
{ | |||
var handler = OnEventRemoved; | |||
if (handler != null) | |||
{ | |||
OnEventRemoved(this, eventName); | |||
} | |||
} | |||
private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
return DoFindSubscriptionToRemove(eventName, typeof(TH)); | |||
} | |||
private SubscriptionInfo FindSubscriptionToRemove<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T> | |||
{ | |||
var eventName = GetEventKey<T>(); | |||
return DoFindSubscriptionToRemove(eventName, typeof(TH)); | |||
} | |||
private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType) | |||
{ | |||
if (!HasSubscriptionsForEvent(eventName)) | |||
{ | |||
return null; | |||
} | |||
return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType); | |||
} | |||
public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent | |||
{ | |||
var key = GetEventKey<T>(); | |||
return HasSubscriptionsForEvent(key); | |||
} | |||
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); | |||
public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName); | |||
public string GetEventKey<T>() | |||
{ | |||
return typeof(T).Name; | |||
} | |||
} | |||
} |
@ -1,28 +0,0 @@ | |||
using System; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus | |||
{ | |||
public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager | |||
{ | |||
public class SubscriptionInfo | |||
{ | |||
public bool IsDynamic { get; } | |||
public Type HandlerType{ get; } | |||
private SubscriptionInfo(bool isDynamic, Type handlerType) | |||
{ | |||
IsDynamic = isDynamic; | |||
HandlerType = handlerType; | |||
} | |||
public static SubscriptionInfo Dynamic(Type handlerType) | |||
{ | |||
return new SubscriptionInfo(true, handlerType); | |||
} | |||
public static SubscriptionInfo Typed(Type handlerType) | |||
{ | |||
return new SubscriptionInfo(false, handlerType); | |||
} | |||
} | |||
} | |||
} |
@ -1,131 +0,0 @@ | |||
using Microsoft.Extensions.Logging; | |||
using Polly; | |||
using Polly.Retry; | |||
using RabbitMQ.Client; | |||
using RabbitMQ.Client.Events; | |||
using RabbitMQ.Client.Exceptions; | |||
using System; | |||
using System.IO; | |||
using System.Net.Sockets; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ | |||
{ | |||
public class DefaultRabbitMQPersistentConnection | |||
: IRabbitMQPersistentConnection | |||
{ | |||
private readonly IConnectionFactory _connectionFactory; | |||
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; | |||
private readonly int _retryCount; | |||
IConnection _connection; | |||
bool _disposed; | |||
object sync_root = new object(); | |||
public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) | |||
{ | |||
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_retryCount = retryCount; | |||
} | |||
public bool IsConnected | |||
{ | |||
get | |||
{ | |||
return _connection != null && _connection.IsOpen && !_disposed; | |||
} | |||
} | |||
public IModel CreateModel() | |||
{ | |||
if (!IsConnected) | |||
{ | |||
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); | |||
} | |||
return _connection.CreateModel(); | |||
} | |||
public void Dispose() | |||
{ | |||
if (_disposed) return; | |||
_disposed = true; | |||
try | |||
{ | |||
_connection.Dispose(); | |||
} | |||
catch (IOException ex) | |||
{ | |||
_logger.LogCritical(ex.ToString()); | |||
} | |||
} | |||
public bool TryConnect() | |||
{ | |||
_logger.LogInformation("RabbitMQ Client is trying to connect"); | |||
lock (sync_root) | |||
{ | |||
var policy = RetryPolicy.Handle<SocketException>() | |||
.Or<BrokerUnreachableException>() | |||
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => | |||
{ | |||
_logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); | |||
} | |||
); | |||
policy.Execute(() => | |||
{ | |||
_connection = _connectionFactory | |||
.CreateConnection(); | |||
}); | |||
if (IsConnected) | |||
{ | |||
_connection.ConnectionShutdown += OnConnectionShutdown; | |||
_connection.CallbackException += OnCallbackException; | |||
_connection.ConnectionBlocked += OnConnectionBlocked; | |||
_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName); | |||
return true; | |||
} | |||
else | |||
{ | |||
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); | |||
return false; | |||
} | |||
} | |||
} | |||
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) | |||
{ | |||
if (_disposed) return; | |||
_logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); | |||
TryConnect(); | |||
} | |||
void OnCallbackException(object sender, CallbackExceptionEventArgs e) | |||
{ | |||
if (_disposed) return; | |||
_logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); | |||
TryConnect(); | |||
} | |||
void OnConnectionShutdown(object sender, ShutdownEventArgs reason) | |||
{ | |||
if (_disposed) return; | |||
_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); | |||
TryConnect(); | |||
} | |||
} | |||
} |
@ -1,248 +0,0 @@ | |||
using Autofac; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; | |||
using Microsoft.Extensions.Logging; | |||
using Newtonsoft.Json; | |||
using Newtonsoft.Json.Linq; | |||
using Polly; | |||
using Polly.Retry; | |||
using RabbitMQ.Client; | |||
using RabbitMQ.Client.Events; | |||
using RabbitMQ.Client.Exceptions; | |||
using System; | |||
using System.Net.Sockets; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ | |||
{ | |||
public class EventBusRabbitMQ : IEventBus, IDisposable | |||
{ | |||
const string BROKER_NAME = "eshop_event_bus"; | |||
private readonly IRabbitMQPersistentConnection _persistentConnection; | |||
private readonly ILogger<EventBusRabbitMQ> _logger; | |||
private readonly IEventBusSubscriptionsManager _subsManager; | |||
private readonly ILifetimeScope _autofac; | |||
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; | |||
private readonly int _retryCount; | |||
private IModel _consumerChannel; | |||
private string _queueName; | |||
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, | |||
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) | |||
{ | |||
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); | |||
_queueName = queueName; | |||
_consumerChannel = CreateConsumerChannel(); | |||
_autofac = autofac; | |||
_retryCount = retryCount; | |||
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved; | |||
} | |||
private void SubsManager_OnEventRemoved(object sender, string eventName) | |||
{ | |||
if (!_persistentConnection.IsConnected) | |||
{ | |||
_persistentConnection.TryConnect(); | |||
} | |||
using (var channel = _persistentConnection.CreateModel()) | |||
{ | |||
channel.QueueUnbind(queue: _queueName, | |||
exchange: BROKER_NAME, | |||
routingKey: eventName); | |||
if (_subsManager.IsEmpty) | |||
{ | |||
_queueName = string.Empty; | |||
_consumerChannel.Close(); | |||
} | |||
} | |||
} | |||
public void Publish(IntegrationEvent @event) | |||
{ | |||
if (!_persistentConnection.IsConnected) | |||
{ | |||
_persistentConnection.TryConnect(); | |||
} | |||
var policy = RetryPolicy.Handle<BrokerUnreachableException>() | |||
.Or<SocketException>() | |||
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => | |||
{ | |||
_logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); | |||
}); | |||
using (var channel = _persistentConnection.CreateModel()) | |||
{ | |||
var eventName = @event.GetType() | |||
.Name; | |||
channel.ExchangeDeclare(exchange: BROKER_NAME, | |||
type: "direct"); | |||
var message = JsonConvert.SerializeObject(@event); | |||
var body = Encoding.UTF8.GetBytes(message); | |||
policy.Execute(() => | |||
{ | |||
var properties = channel.CreateBasicProperties(); | |||
properties.DeliveryMode = 2; // persistent | |||
channel.BasicPublish(exchange: BROKER_NAME, | |||
routingKey: eventName, | |||
mandatory:true, | |||
basicProperties: properties, | |||
body: body); | |||
}); | |||
} | |||
} | |||
public void SubscribeDynamic<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); | |||
DoInternalSubscription(eventName); | |||
_subsManager.AddDynamicSubscription<TH>(eventName); | |||
} | |||
public void Subscribe<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T> | |||
{ | |||
var eventName = _subsManager.GetEventKey<T>(); | |||
DoInternalSubscription(eventName); | |||
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); | |||
_subsManager.AddSubscription<T, TH>(); | |||
} | |||
private void DoInternalSubscription(string eventName) | |||
{ | |||
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); | |||
if (!containsKey) | |||
{ | |||
if (!_persistentConnection.IsConnected) | |||
{ | |||
_persistentConnection.TryConnect(); | |||
} | |||
using (var channel = _persistentConnection.CreateModel()) | |||
{ | |||
channel.QueueBind(queue: _queueName, | |||
exchange: BROKER_NAME, | |||
routingKey: eventName); | |||
} | |||
} | |||
} | |||
public void Unsubscribe<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T> | |||
{ | |||
var eventName = _subsManager.GetEventKey<T>(); | |||
_logger.LogInformation("Unsubscribing from event {EventName}", eventName); | |||
_subsManager.RemoveSubscription<T, TH>(); | |||
} | |||
public void UnsubscribeDynamic<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
_subsManager.RemoveDynamicSubscription<TH>(eventName); | |||
} | |||
public void Dispose() | |||
{ | |||
if (_consumerChannel != null) | |||
{ | |||
_consumerChannel.Dispose(); | |||
} | |||
_subsManager.Clear(); | |||
} | |||
private IModel CreateConsumerChannel() | |||
{ | |||
if (!_persistentConnection.IsConnected) | |||
{ | |||
_persistentConnection.TryConnect(); | |||
} | |||
var channel = _persistentConnection.CreateModel(); | |||
channel.ExchangeDeclare(exchange: BROKER_NAME, | |||
type: "direct"); | |||
channel.QueueDeclare(queue: _queueName, | |||
durable: true, | |||
exclusive: false, | |||
autoDelete: false, | |||
arguments: null); | |||
var consumer = new EventingBasicConsumer(channel); | |||
consumer.Received += async (model, ea) => | |||
{ | |||
var eventName = ea.RoutingKey; | |||
var message = Encoding.UTF8.GetString(ea.Body); | |||
await ProcessEvent(eventName, message); | |||
channel.BasicAck(ea.DeliveryTag,multiple:false); | |||
}; | |||
channel.BasicConsume(queue: _queueName, | |||
autoAck: false, | |||
consumer: consumer); | |||
channel.CallbackException += (sender, ea) => | |||
{ | |||
_consumerChannel.Dispose(); | |||
_consumerChannel = CreateConsumerChannel(); | |||
}; | |||
return channel; | |||
} | |||
private async Task ProcessEvent(string eventName, string message) | |||
{ | |||
if (_subsManager.HasSubscriptionsForEvent(eventName)) | |||
{ | |||
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) | |||
{ | |||
var subscriptions = _subsManager.GetHandlersForEvent(eventName); | |||
foreach (var subscription in subscriptions) | |||
{ | |||
if (subscription.IsDynamic) | |||
{ | |||
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; | |||
if (handler == null) continue; | |||
dynamic eventData = JObject.Parse(message); | |||
await handler.Handle(eventData); | |||
} | |||
else | |||
{ | |||
var handler = scope.ResolveOptional(subscription.HandlerType); | |||
if (handler == null) continue; | |||
var eventType = _subsManager.GetEventTypeByName(eventName); | |||
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); | |||
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); | |||
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
} | |||
} |
@ -1,22 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ</RootNamespace> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Autofac" Version="4.6.2" /> | |||
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" /> | |||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> | |||
<PackageReference Include="Polly" Version="6.0.1" /> | |||
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" /> | |||
<PackageReference Include="System.ValueTuple" Version="4.5.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\EventBus\EventBus.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@ -1,15 +0,0 @@ | |||
using RabbitMQ.Client; | |||
using System; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ | |||
{ | |||
public interface IRabbitMQPersistentConnection | |||
: IDisposable | |||
{ | |||
bool IsConnected { get; } | |||
bool TryConnect(); | |||
IModel CreateModel(); | |||
} | |||
} |
@ -1,44 +0,0 @@ | |||
using Microsoft.Azure.ServiceBus; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus | |||
{ | |||
public class DefaultServiceBusPersisterConnection :IServiceBusPersisterConnection | |||
{ | |||
private readonly ILogger<DefaultServiceBusPersisterConnection> _logger; | |||
private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; | |||
private ITopicClient _topicClient; | |||
bool _disposed; | |||
public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, | |||
ILogger<DefaultServiceBusPersisterConnection> logger) | |||
{ | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? | |||
throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); | |||
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); | |||
} | |||
public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; | |||
public ITopicClient CreateModel() | |||
{ | |||
if(_topicClient.IsClosedOrClosing) | |||
{ | |||
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); | |||
} | |||
return _topicClient; | |||
} | |||
public void Dispose() | |||
{ | |||
if (_disposed) return; | |||
_disposed = true; | |||
} | |||
} | |||
} |
@ -1,208 +0,0 @@ | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus | |||
{ | |||
using Autofac; | |||
using Microsoft.Azure.ServiceBus; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using Microsoft.Extensions.Logging; | |||
using Newtonsoft.Json; | |||
using Newtonsoft.Json.Linq; | |||
using System; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
public class EventBusServiceBus : IEventBus | |||
{ | |||
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; | |||
private readonly ILogger<EventBusServiceBus> _logger; | |||
private readonly IEventBusSubscriptionsManager _subsManager; | |||
private readonly SubscriptionClient _subscriptionClient; | |||
private readonly ILifetimeScope _autofac; | |||
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; | |||
private const string INTEGRATION_EVENT_SUFIX = "IntegrationEvent"; | |||
public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, | |||
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName, | |||
ILifetimeScope autofac) | |||
{ | |||
_serviceBusPersisterConnection = serviceBusPersisterConnection; | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); | |||
_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, | |||
subscriptionClientName); | |||
_autofac = autofac; | |||
RemoveDefaultRule(); | |||
RegisterSubscriptionClientMessageHandler(); | |||
} | |||
public void Publish(IntegrationEvent @event) | |||
{ | |||
var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFIX, ""); | |||
var jsonMessage = JsonConvert.SerializeObject(@event); | |||
var body = Encoding.UTF8.GetBytes(jsonMessage); | |||
var message = new Message | |||
{ | |||
MessageId = Guid.NewGuid().ToString(), | |||
Body = body, | |||
Label = eventName, | |||
}; | |||
var topicClient = _serviceBusPersisterConnection.CreateModel(); | |||
topicClient.SendAsync(message) | |||
.GetAwaiter() | |||
.GetResult(); | |||
} | |||
public void SubscribeDynamic<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, nameof(TH)); | |||
_subsManager.AddDynamicSubscription<TH>(eventName); | |||
} | |||
public void Subscribe<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T> | |||
{ | |||
var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); | |||
var containsKey = _subsManager.HasSubscriptionsForEvent<T>(); | |||
if (!containsKey) | |||
{ | |||
try | |||
{ | |||
_subscriptionClient.AddRuleAsync(new RuleDescription | |||
{ | |||
Filter = new CorrelationFilter { Label = eventName }, | |||
Name = eventName | |||
}).GetAwaiter().GetResult(); | |||
} | |||
catch (ServiceBusException) | |||
{ | |||
_logger.LogWarning("The messaging entity {eventName} already exists.", eventName); | |||
} | |||
} | |||
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, nameof(TH)); | |||
_subsManager.AddSubscription<T, TH>(); | |||
} | |||
public void Unsubscribe<T, TH>() | |||
where T : IntegrationEvent | |||
where TH : IIntegrationEventHandler<T> | |||
{ | |||
var eventName = typeof(T).Name.Replace(INTEGRATION_EVENT_SUFIX, ""); | |||
try | |||
{ | |||
_subscriptionClient | |||
.RemoveRuleAsync(eventName) | |||
.GetAwaiter() | |||
.GetResult(); | |||
} | |||
catch (MessagingEntityNotFoundException) | |||
{ | |||
_logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName); | |||
} | |||
_logger.LogInformation("Unsubscribing from event {EventName}", eventName); | |||
_subsManager.RemoveSubscription<T, TH>(); | |||
} | |||
public void UnsubscribeDynamic<TH>(string eventName) | |||
where TH : IDynamicIntegrationEventHandler | |||
{ | |||
_logger.LogInformation("Unsubscribing from dynamic event {EventName}", eventName); | |||
_subsManager.RemoveDynamicSubscription<TH>(eventName); | |||
} | |||
public void Dispose() | |||
{ | |||
_subsManager.Clear(); | |||
} | |||
private void RegisterSubscriptionClientMessageHandler() | |||
{ | |||
_subscriptionClient.RegisterMessageHandler( | |||
async (message, token) => | |||
{ | |||
var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}"; | |||
var messageData = Encoding.UTF8.GetString(message.Body); | |||
// Complete the message so that it is not received again. | |||
if (await ProcessEvent(eventName, messageData)) | |||
{ | |||
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); | |||
} | |||
}, | |||
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false }); | |||
} | |||
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) | |||
{ | |||
var ex = exceptionReceivedEventArgs.Exception; | |||
var context = exceptionReceivedEventArgs.ExceptionReceivedContext; | |||
_logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context); | |||
return Task.CompletedTask; | |||
} | |||
private async Task<bool> ProcessEvent(string eventName, string message) | |||
{ | |||
var processed = false; | |||
if (_subsManager.HasSubscriptionsForEvent(eventName)) | |||
{ | |||
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) | |||
{ | |||
var subscriptions = _subsManager.GetHandlersForEvent(eventName); | |||
foreach (var subscription in subscriptions) | |||
{ | |||
if (subscription.IsDynamic) | |||
{ | |||
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; | |||
if (handler == null) continue; | |||
dynamic eventData = JObject.Parse(message); | |||
await handler.Handle(eventData); | |||
} | |||
else | |||
{ | |||
var handler = scope.ResolveOptional(subscription.HandlerType); | |||
if (handler == null) continue; | |||
var eventType = _subsManager.GetEventTypeByName(eventName); | |||
var integrationEvent = JsonConvert.DeserializeObject(message, eventType); | |||
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); | |||
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); | |||
} | |||
} | |||
} | |||
processed = true; | |||
} | |||
return processed; | |||
} | |||
private void RemoveDefaultRule() | |||
{ | |||
try | |||
{ | |||
_subscriptionClient | |||
.RemoveRuleAsync(RuleDescription.DefaultRuleName) | |||
.GetAwaiter() | |||
.GetResult(); | |||
} | |||
catch (MessagingEntityNotFoundException) | |||
{ | |||
_logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleDescription.DefaultRuleName); | |||
} | |||
} | |||
} | |||
} |
@ -1,19 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus</RootNamespace> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Autofac" Version="4.6.2" /> | |||
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.0.0" /> | |||
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\EventBus\EventBus.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@ -1,12 +0,0 @@ | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus | |||
{ | |||
using Microsoft.Azure.ServiceBus; | |||
using System; | |||
public interface IServiceBusPersisterConnection : IDisposable | |||
{ | |||
ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; } | |||
ITopicClient CreateModel(); | |||
} | |||
} |
@ -1,14 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF | |||
{ | |||
public enum EventStateEnum | |||
{ | |||
NotPublished = 0, | |||
InProgress = 1, | |||
Published = 2, | |||
PublishedFailed = 3 | |||
} | |||
} |
@ -1,48 +0,0 @@ | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.EntityFrameworkCore.Metadata.Builders; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF | |||
{ | |||
public class IntegrationEventLogContext : DbContext | |||
{ | |||
public IntegrationEventLogContext(DbContextOptions<IntegrationEventLogContext> options) : base(options) | |||
{ | |||
} | |||
public DbSet<IntegrationEventLogEntry> IntegrationEventLogs { get; set; } | |||
protected override void OnModelCreating(ModelBuilder builder) | |||
{ | |||
builder.Entity<IntegrationEventLogEntry>(ConfigureIntegrationEventLogEntry); | |||
} | |||
void ConfigureIntegrationEventLogEntry(EntityTypeBuilder<IntegrationEventLogEntry> builder) | |||
{ | |||
builder.ToTable("IntegrationEventLog"); | |||
builder.HasKey(e => e.EventId); | |||
builder.Property(e => e.EventId) | |||
.IsRequired(); | |||
builder.Property(e => e.Content) | |||
.IsRequired(); | |||
builder.Property(e => e.CreationTime) | |||
.IsRequired(); | |||
builder.Property(e => e.State) | |||
.IsRequired(); | |||
builder.Property(e => e.TimesSent) | |||
.IsRequired(); | |||
builder.Property(e => e.EventTypeName) | |||
.IsRequired(); | |||
} | |||
} | |||
} |
@ -1,19 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF</RootNamespace> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.2" /> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="2.2.2" /> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.2" /> | |||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="2.2.2" /> | |||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\EventBus\EventBus.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@ -1,41 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using Newtonsoft.Json; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using System.Linq; | |||
using System.ComponentModel.DataAnnotations.Schema; | |||
using System.Reflection; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF | |||
{ | |||
public class IntegrationEventLogEntry | |||
{ | |||
private IntegrationEventLogEntry() { } | |||
public IntegrationEventLogEntry(IntegrationEvent @event) | |||
{ | |||
EventId = @event.Id; | |||
CreationTime = @event.CreationDate; | |||
EventTypeName = @event.GetType().FullName; | |||
Content = JsonConvert.SerializeObject(@event); | |||
State = EventStateEnum.NotPublished; | |||
TimesSent = 0; | |||
} | |||
public Guid EventId { get; private set; } | |||
public string EventTypeName { get; private set; } | |||
[NotMapped] | |||
public string EventTypeShortName => EventTypeName.Split('.')?.Last(); | |||
[NotMapped] | |||
public IntegrationEvent IntegrationEvent { get; private set; } | |||
public EventStateEnum State { get; set; } | |||
public int TimesSent { get; set; } | |||
public DateTime CreationTime { get; private set; } | |||
public string Content { get; private set; } | |||
public IntegrationEventLogEntry DeserializeJsonContent(Type type) | |||
{ | |||
IntegrationEvent = JsonConvert.DeserializeObject(Content, type) as IntegrationEvent; | |||
return this; | |||
} | |||
} | |||
} |
@ -1,18 +0,0 @@ | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Data.Common; | |||
using System.Linq; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services | |||
{ | |||
public interface IIntegrationEventLogService | |||
{ | |||
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(); | |||
Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction); | |||
Task MarkEventAsPublishedAsync(Guid eventId); | |||
Task MarkEventAsInProgressAsync(Guid eventId); | |||
Task MarkEventAsFailedAsync(Guid eventId); | |||
} | |||
} |
@ -1,89 +0,0 @@ | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.EntityFrameworkCore.Diagnostics; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Collections; | |||
using System.Collections.Generic; | |||
using System.Data.Common; | |||
using System.Linq; | |||
using System.Reflection; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services | |||
{ | |||
public class IntegrationEventLogService : IIntegrationEventLogService | |||
{ | |||
private readonly IntegrationEventLogContext _integrationEventLogContext; | |||
private readonly DbConnection _dbConnection; | |||
private readonly List<Type> _eventTypes; | |||
public IntegrationEventLogService(DbConnection dbConnection) | |||
{ | |||
_dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection)); | |||
_integrationEventLogContext = new IntegrationEventLogContext( | |||
new DbContextOptionsBuilder<IntegrationEventLogContext>() | |||
.UseSqlServer(_dbConnection) | |||
.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning)) | |||
.Options); | |||
_eventTypes = Assembly.Load(Assembly.GetEntryAssembly().FullName) | |||
.GetTypes() | |||
.Where(t => t.Name.EndsWith(nameof(IntegrationEvent))) | |||
.ToList(); | |||
} | |||
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync() | |||
{ | |||
return await _integrationEventLogContext.IntegrationEventLogs | |||
.Where(e => e.State == EventStateEnum.NotPublished) | |||
.OrderBy(o => o.CreationTime) | |||
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t=> t.Name == e.EventTypeShortName))) | |||
.ToListAsync(); | |||
} | |||
public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction) | |||
{ | |||
if (transaction == null) | |||
{ | |||
throw new ArgumentNullException(nameof(transaction), $"A {typeof(DbTransaction).FullName} is required as a pre-requisite to save the event."); | |||
} | |||
var eventLogEntry = new IntegrationEventLogEntry(@event); | |||
_integrationEventLogContext.Database.UseTransaction(transaction); | |||
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry); | |||
return _integrationEventLogContext.SaveChangesAsync(); | |||
} | |||
public Task MarkEventAsPublishedAsync(Guid eventId) | |||
{ | |||
return UpdateEventStatus(eventId, EventStateEnum.Published); | |||
} | |||
public Task MarkEventAsInProgressAsync(Guid eventId) | |||
{ | |||
return UpdateEventStatus(eventId, EventStateEnum.InProgress); | |||
} | |||
public Task MarkEventAsFailedAsync(Guid eventId) | |||
{ | |||
return UpdateEventStatus(eventId, EventStateEnum.PublishedFailed); | |||
} | |||
private Task UpdateEventStatus(Guid eventId, EventStateEnum status) | |||
{ | |||
var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == eventId); | |||
eventLogEntry.State = status; | |||
if(status == EventStateEnum.InProgress) | |||
eventLogEntry.TimesSent++; | |||
_integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry); | |||
return _integrationEventLogContext.SaveChangesAsync(); | |||
} | |||
} | |||
} |
@ -1,37 +0,0 @@ | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.EntityFrameworkCore.Storage; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Data.Common; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities | |||
{ | |||
public class ResilientTransaction | |||
{ | |||
private DbContext _context; | |||
private ResilientTransaction(DbContext context) => | |||
_context = context ?? throw new ArgumentNullException(nameof(context)); | |||
public static ResilientTransaction New (DbContext context) => | |||
new ResilientTransaction(context); | |||
public async Task ExecuteAsync(Func<Task> action) | |||
{ | |||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): | |||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency | |||
var strategy = _context.Database.CreateExecutionStrategy(); | |||
await strategy.ExecuteAsync(async () => | |||
{ | |||
using (var transaction = _context.Database.BeginTransaction()) | |||
{ | |||
await action(); | |||
transaction.Commit(); | |||
} | |||
}); | |||
} | |||
} | |||
} |