WIP
This commit is contained in:
parent
5c64be3b77
commit
e593b8d568
8
src/BuildingBlocks/EventBus/CommandBus/CommandBus.csproj
Normal file
8
src/BuildingBlocks/EventBus/CommandBus/CommandBus.csproj
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>netstandard1.0</TargetFramework>
|
||||||
|
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.CommandBus</RootNamespace>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
</Project>
|
16
src/BuildingBlocks/EventBus/CommandBus/ICommandBus.cs
Normal file
16
src/BuildingBlocks/EventBus/CommandBus/ICommandBus.cs
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus
|
||||||
|
{
|
||||||
|
public interface ICommandBus
|
||||||
|
{
|
||||||
|
void Send<T>(string name, T data);
|
||||||
|
void Handle<TC>(string name, IIntegrationCommandHandler<TC> handler);
|
||||||
|
void Handle(string name, IIntegrationCommandHandler handler);
|
||||||
|
void Handle<TI, TC>(TI handler)
|
||||||
|
where TI : IIntegrationCommandHandler<TC>;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,16 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus
|
||||||
|
{
|
||||||
|
public interface IIntegrationCommandHandler
|
||||||
|
{
|
||||||
|
void Handle(IntegrationCommand command);
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface IIntegrationCommandHandler<T> : IIntegrationCommandHandler
|
||||||
|
{
|
||||||
|
void Handle(IntegrationCommand<T> command);
|
||||||
|
}
|
||||||
|
}
|
35
src/BuildingBlocks/EventBus/CommandBus/IntegrationCommand.cs
Normal file
35
src/BuildingBlocks/EventBus/CommandBus/IntegrationCommand.cs
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.CommandBus
|
||||||
|
{
|
||||||
|
public abstract class IntegrationCommand
|
||||||
|
{
|
||||||
|
public Guid Id { get; }
|
||||||
|
public DateTime Sent { get; }
|
||||||
|
|
||||||
|
public abstract object GetDataAsObject();
|
||||||
|
|
||||||
|
protected IntegrationCommand()
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid();
|
||||||
|
Sent = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class IntegrationCommand<T> : IntegrationCommand
|
||||||
|
{
|
||||||
|
public T Data { get; }
|
||||||
|
public string Name { get; }
|
||||||
|
public override object GetDataAsObject() => Data;
|
||||||
|
|
||||||
|
public IntegrationCommand(string name, T data) : base()
|
||||||
|
{
|
||||||
|
Data = data;
|
||||||
|
Name = name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,143 @@
|
|||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.CommandBus;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
using Polly;
|
||||||
|
using Polly.Retry;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using RabbitMQ.Client.Events;
|
||||||
|
using RabbitMQ.Client.Exceptions;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Net.Sockets;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
||||||
|
{
|
||||||
|
public class CommandBusRabbitMQ : ICommandBus, IDisposable
|
||||||
|
{
|
||||||
|
const string BROKER_NAME = "eshop_command_bus";
|
||||||
|
|
||||||
|
private readonly IRabbitMQPersistentConnection _persistentConnection;
|
||||||
|
private readonly ILogger<CommandBusRabbitMQ> _logger;
|
||||||
|
|
||||||
|
private IModel _consumerChannel;
|
||||||
|
private string _queueName;
|
||||||
|
|
||||||
|
private readonly Dictionary<string, IIntegrationCommandHandler> _handlers;
|
||||||
|
private readonly Dictionary<string, Type> _typeMappings;
|
||||||
|
|
||||||
|
public CommandBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,
|
||||||
|
ILogger<CommandBusRabbitMQ> logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_persistentConnection = persistentConnection;
|
||||||
|
_handlers = new Dictionary<string, IIntegrationCommandHandler>();
|
||||||
|
_typeMappings = new Dictionary<string, Type>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Send<T>(string name, T data)
|
||||||
|
{
|
||||||
|
Send(new IntegrationCommand<T>(name, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Handle<TC>(string name, IIntegrationCommandHandler<TC> handler)
|
||||||
|
{
|
||||||
|
_handlers.Add(name, handler);
|
||||||
|
_typeMappings.Add(name, typeof(TC));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Handle(string name, IIntegrationCommandHandler handler)
|
||||||
|
{
|
||||||
|
_handlers.Add(name, handler);
|
||||||
|
}
|
||||||
|
public void Handle<TI, TC>(TI handler) where TI : IIntegrationCommandHandler<TC>
|
||||||
|
{
|
||||||
|
var name = typeof(TI).Name;
|
||||||
|
_handlers.Add(name, handler);
|
||||||
|
_typeMappings.Add(name, typeof(TC));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void Send<T>(IntegrationCommand<T> command)
|
||||||
|
{
|
||||||
|
if (!_persistentConnection.IsConnected)
|
||||||
|
{
|
||||||
|
_persistentConnection.TryConnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
var policy = RetryPolicy.Handle<BrokerUnreachableException>()
|
||||||
|
.Or<SocketException>()
|
||||||
|
.WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex.ToString());
|
||||||
|
});
|
||||||
|
|
||||||
|
using (var channel = _persistentConnection.CreateModel())
|
||||||
|
{
|
||||||
|
var commandName = command.Name;
|
||||||
|
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
|
||||||
|
var message = JsonConvert.SerializeObject(command);
|
||||||
|
var body = Encoding.UTF8.GetBytes(message);
|
||||||
|
policy.Execute(() =>
|
||||||
|
{
|
||||||
|
channel.BasicPublish(exchange: BROKER_NAME,
|
||||||
|
routingKey: commandName,
|
||||||
|
basicProperties: null,
|
||||||
|
body: body);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private IModel CreateConsumerChannel()
|
||||||
|
{
|
||||||
|
if (!_persistentConnection.IsConnected)
|
||||||
|
{
|
||||||
|
_persistentConnection.TryConnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
var channel = _persistentConnection.CreateModel();
|
||||||
|
|
||||||
|
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
|
||||||
|
_queueName = channel.QueueDeclare().QueueName;
|
||||||
|
var consumer = new EventingBasicConsumer(channel);
|
||||||
|
consumer.Received += async (model, ea) =>
|
||||||
|
{
|
||||||
|
var commandName = ea.RoutingKey;
|
||||||
|
var message = Encoding.UTF8.GetString(ea.Body);
|
||||||
|
await InvokeHandler(commandName, message);
|
||||||
|
};
|
||||||
|
|
||||||
|
channel.BasicConsume(queue: _queueName,
|
||||||
|
noAck: true,
|
||||||
|
consumer: consumer);
|
||||||
|
|
||||||
|
channel.CallbackException += (sender, ea) =>
|
||||||
|
{
|
||||||
|
_consumerChannel.Dispose();
|
||||||
|
_consumerChannel = CreateConsumerChannel();
|
||||||
|
};
|
||||||
|
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task InvokeHandler(string commandName, string message)
|
||||||
|
{
|
||||||
|
if (_handlers.ContainsKey(commandName))
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_consumerChannel != null)
|
||||||
|
{
|
||||||
|
_consumerChannel.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user