From 8dba1d66acee85e1ad5521a6621eb8ad3887b0e5 Mon Sep 17 00:00:00 2001 From: Eduard Tomas Date: Tue, 2 May 2017 19:43:54 +0200 Subject: [PATCH] WIP --- .../EventBus/CommandBus/CommandBus.csproj | 8 + .../EventBus/CommandBus/ICommandBus.cs | 16 ++ .../CommandBus/IIntegrationCommandHandler.cs | 16 ++ .../EventBus/CommandBus/IntegrationCommand.cs | 35 +++++ .../EventBusRabbitMQ/CommandBusRabbitMQ.cs | 143 ++++++++++++++++++ 5 files changed, 218 insertions(+) create mode 100644 src/BuildingBlocks/EventBus/CommandBus/CommandBus.csproj create mode 100644 src/BuildingBlocks/EventBus/CommandBus/ICommandBus.cs create mode 100644 src/BuildingBlocks/EventBus/CommandBus/IIntegrationCommandHandler.cs create mode 100644 src/BuildingBlocks/EventBus/CommandBus/IntegrationCommand.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusRabbitMQ/CommandBusRabbitMQ.cs diff --git a/src/BuildingBlocks/EventBus/CommandBus/CommandBus.csproj b/src/BuildingBlocks/EventBus/CommandBus/CommandBus.csproj new file mode 100644 index 000000000..7c3327057 --- /dev/null +++ b/src/BuildingBlocks/EventBus/CommandBus/CommandBus.csproj @@ -0,0 +1,8 @@ + + + + netstandard1.0 + Microsoft.eShopOnContainers.BuildingBlocks.CommandBus + + + \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/CommandBus/ICommandBus.cs b/src/BuildingBlocks/EventBus/CommandBus/ICommandBus.cs new file mode 100644 index 000000000..813d9406c --- /dev/null +++ b/src/BuildingBlocks/EventBus/CommandBus/ICommandBus.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus +{ + public interface ICommandBus + { + void Send(string name, T data); + void Handle(string name, IIntegrationCommandHandler handler); + void Handle(string name, IIntegrationCommandHandler handler); + void Handle(TI handler) + where TI : IIntegrationCommandHandler; + } +} diff --git a/src/BuildingBlocks/EventBus/CommandBus/IIntegrationCommandHandler.cs b/src/BuildingBlocks/EventBus/CommandBus/IIntegrationCommandHandler.cs new file mode 100644 index 000000000..07f0c1eea --- /dev/null +++ b/src/BuildingBlocks/EventBus/CommandBus/IIntegrationCommandHandler.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus +{ + public interface IIntegrationCommandHandler + { + void Handle(IntegrationCommand command); + } + + public interface IIntegrationCommandHandler : IIntegrationCommandHandler + { + void Handle(IntegrationCommand command); + } +} diff --git a/src/BuildingBlocks/EventBus/CommandBus/IntegrationCommand.cs b/src/BuildingBlocks/EventBus/CommandBus/IntegrationCommand.cs new file mode 100644 index 000000000..8df6e5279 --- /dev/null +++ b/src/BuildingBlocks/EventBus/CommandBus/IntegrationCommand.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus +{ + public abstract class IntegrationCommand + { + public Guid Id { get; } + public DateTime Sent { get; } + + public abstract object GetDataAsObject(); + + protected IntegrationCommand() + { + Id = Guid.NewGuid(); + Sent = DateTime.UtcNow; + } + + } + + public class IntegrationCommand : IntegrationCommand + { + public T Data { get; } + public string Name { get; } + public override object GetDataAsObject() => Data; + + public IntegrationCommand(string name, T data) : base() + { + Data = data; + Name = name; + } + } + +} diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/CommandBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/CommandBusRabbitMQ.cs new file mode 100644 index 000000000..699bf3772 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/CommandBusRabbitMQ.cs @@ -0,0 +1,143 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.CommandBus; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using System; +using System.Collections.Generic; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ +{ + public class CommandBusRabbitMQ : ICommandBus, IDisposable + { + const string BROKER_NAME = "eshop_command_bus"; + + private readonly IRabbitMQPersistentConnection _persistentConnection; + private readonly ILogger _logger; + + private IModel _consumerChannel; + private string _queueName; + + private readonly Dictionary _handlers; + private readonly Dictionary _typeMappings; + + public CommandBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, + ILogger logger) + { + _logger = logger; + _persistentConnection = persistentConnection; + _handlers = new Dictionary(); + _typeMappings = new Dictionary(); + } + + public void Send(string name, T data) + { + Send(new IntegrationCommand(name, data)); + } + + public void Handle(string name, IIntegrationCommandHandler handler) + { + _handlers.Add(name, handler); + _typeMappings.Add(name, typeof(TC)); + } + + public void Handle(string name, IIntegrationCommandHandler handler) + { + _handlers.Add(name, handler); + } + public void Handle(TI handler) where TI : IIntegrationCommandHandler + { + var name = typeof(TI).Name; + _handlers.Add(name, handler); + _typeMappings.Add(name, typeof(TC)); + } + + private void Send(IntegrationCommand command) + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + var policy = RetryPolicy.Handle() + .Or() + .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => + { + _logger.LogWarning(ex.ToString()); + }); + + using (var channel = _persistentConnection.CreateModel()) + { + var commandName = command.Name; + channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); + var message = JsonConvert.SerializeObject(command); + var body = Encoding.UTF8.GetBytes(message); + policy.Execute(() => + { + channel.BasicPublish(exchange: BROKER_NAME, + routingKey: commandName, + basicProperties: null, + body: body); + }); + } + } + + private IModel CreateConsumerChannel() + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + var channel = _persistentConnection.CreateModel(); + + channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); + _queueName = channel.QueueDeclare().QueueName; + var consumer = new EventingBasicConsumer(channel); + consumer.Received += async (model, ea) => + { + var commandName = ea.RoutingKey; + var message = Encoding.UTF8.GetString(ea.Body); + await InvokeHandler(commandName, message); + }; + + channel.BasicConsume(queue: _queueName, + noAck: true, + consumer: consumer); + + channel.CallbackException += (sender, ea) => + { + _consumerChannel.Dispose(); + _consumerChannel = CreateConsumerChannel(); + }; + + return channel; + } + + private Task InvokeHandler(string commandName, string message) + { + if (_handlers.ContainsKey(commandName)) + { + + } + + } + + public void Dispose() + { + if (_consumerChannel != null) + { + _consumerChannel.Dispose(); + } + } + + + + } +}