From aa7556a1bdbfce4cff7f22c5185423c2cfd4acb4 Mon Sep 17 00:00:00 2001 From: Christian Arenas Date: Wed, 24 May 2017 15:34:55 +0200 Subject: [PATCH] Create EventBusServiceBus project and add EventBusServiceBus/ServiceBusPersisterConnection to project --- eShopOnContainers-ServicesAndWebApps.sln | 53 ++++++- .../DefaultServiceBusPersisterConnection.cs | 49 +++++++ .../EventBusServiceBus/EventBusServiceBus.cs | 134 ++++++++++++++++++ .../EventBusServiceBus.csproj | 17 +++ .../IServiceBusPersisterConnection.cs | 14 ++ 5 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj create mode 100644 src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs diff --git a/eShopOnContainers-ServicesAndWebApps.sln b/eShopOnContainers-ServicesAndWebApps.sln index eb0e83e02..6aa891a2e 100644 --- a/eShopOnContainers-ServicesAndWebApps.sln +++ b/eShopOnContainers-ServicesAndWebApps.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26403.3 +VisualStudioVersion = 15.0.26430.6 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{932D8224-11F6-4D07-B109-DA28AD288A63}" EndProject @@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Extensions.Health EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBus.Tests", "src\BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBusServiceBus", "src\BuildingBlocks\EventBus\EventBusServiceBus\EventBusServiceBus.csproj", "{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Ad-Hoc|Any CPU = Ad-Hoc|Any CPU @@ -1002,6 +1004,54 @@ Global {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.Build.0 = Release|Any CPU {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.ActiveCfg = Release|Any CPU {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.Build.0 = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x64.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x86.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|Any CPU.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|ARM.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|ARM.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhone.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhone.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x64.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x64.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x86.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x86.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|ARM.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|ARM.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhone.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhone.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x64.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x64.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x86.ActiveCfg = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x86.Build.0 = Debug|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|Any CPU.Build.0 = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|ARM.ActiveCfg = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|ARM.Build.0 = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhone.ActiveCfg = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhone.Build.0 = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhoneSimulator.Build.0 = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x64.ActiveCfg = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x64.Build.0 = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x86.ActiveCfg = Release|Any CPU + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1038,5 +1088,6 @@ Global {22A0F9C1-2D4A-4107-95B7-8459E6688BC5} = {A81ECBC2-6B00-4DCD-8388-469174033379} {4BD76717-3102-4969-8C2C-BAAA3F0263B6} = {A81ECBC2-6B00-4DCD-8388-469174033379} {89D80DF1-32E1-4AAF-970F-DA0AA6881F9D} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F} + {69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F} EndGlobalSection EndGlobal diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs new file mode 100644 index 000000000..3b9d9bda4 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/DefaultServiceBusPersisterConnection.cs @@ -0,0 +1,49 @@ +using Microsoft.Azure.ServiceBus; +using Microsoft.Extensions.Logging; +using System; +using System.IO; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +{ + public class DefaultServiceBusPersisterConnection : ServiceBusConnection, IServiceBusPersisterConnection + { + private readonly ILogger _logger; + private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; + private ITopicClient _topicClient; + + bool _disposed; + object sync_root = new object(); + + public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, + TimeSpan operationTimeout, RetryPolicy retryPolicy, ILogger logger) + : base(operationTimeout, retryPolicy) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + InitializeConnection(serviceBusConnectionStringBuilder); + _serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? + throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); + } + + public bool IsConnected => _topicClient.IsClosedOrClosing; + + public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder; + + public ITopicClient CreateModel() + { + if(_topicClient.IsClosedOrClosing) + { + _topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy); + } + + return _topicClient; + } + + public void Dispose() + { + if (_disposed) return; + + _disposed = true; + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs new file mode 100644 index 000000000..1433a9f65 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -0,0 +1,134 @@ +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +{ + using System; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; + using Microsoft.Extensions.Logging; + using Microsoft.Azure.ServiceBus; + using Newtonsoft.Json; + using System.Text; + using System.Threading.Tasks; + using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; + using System.Reflection; + using Microsoft.Azure.ServiceBus.Filters; + + public class EventBusServiceBus : IEventBus + { + private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; + private ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly SubscriptionClient _subscriptionClient; + + public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, + ILogger logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName) + { + _serviceBusPersisterConnection = serviceBusPersisterConnection; + _logger = logger; + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); + + _subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder, + subscriptionClientName); + } + + public void Publish(IntegrationEvent @event) + { + var eventName = @event.GetType().Name; + var jsonMessage = JsonConvert.SerializeObject(@event); + var body = Encoding.UTF8.GetBytes(jsonMessage); + + var message = new Message + { + MessageId = new Guid().ToString(), + Body = Encoding.UTF8.GetBytes(jsonMessage), + Label = eventName, + }; + + var topicClient = _serviceBusPersisterConnection.CreateModel(); + + topicClient.SendAsync(message) + .GetAwaiter() + .GetResult(); + } + + public void Subscribe(Func handler) + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = typeof(T).Name; + var containsKey = _subsManager.HasSubscriptionsForEvent(); + if (!containsKey) + { + try + { + _subscriptionClient.AddRuleAsync(new RuleDescription + { + Filter = new CorrelationFilter { Label = eventName }, + Name = eventName + }).GetAwaiter().GetResult(); + } + catch(ServiceBusException) + { + _logger.LogWarning($"The messaging entity {eventName} already exists."); + } + } + + _subsManager.AddSubscription(handler); + } + + public void Unsubscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = typeof(T).Name; + + try + { + _subscriptionClient + .RemoveRuleAsync(eventName) + .GetAwaiter() + .GetResult(); + } + catch (MessagingEntityNotFoundException) + { + _logger.LogWarning($"The messaging entity {eventName} Could not be found."); + } + + _subsManager.RemoveSubscription(); + } + + public void Dispose() + { + _subsManager.Clear(); + } + + //private async Task CreateConsumerChannel() + //{ + // _subscriptionClient.RegisterMessageHandler( + // async (message, token) => + // { + // var eventName = message.Label; + // var messageData = Encoding.UTF8.GetString(message.Body); + // await ProcessEvent(eventName, messageData); + // }, + // new MessageHandlerOptions() { MaxConcurrentCalls = 10, AutoComplete = true }); + //} + + private async Task ProcessEvent(string eventName, string message) + { + if (_subsManager.HasSubscriptionsForEvent(eventName)) + { + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonConvert.DeserializeObject(message, eventType); + var handlers = _subsManager.GetHandlersForEvent(eventName); + + foreach (var handlerfactory in handlers) + { + var handler = handlerfactory.DynamicInvoke(); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); + } + } + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj new file mode 100644 index 000000000..5be30800e --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj @@ -0,0 +1,17 @@ + + + + netcoreapp1.1 + Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus + + + + + + + + + + + + \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs new file mode 100644 index 000000000..e995db27e --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/IServiceBusPersisterConnection.cs @@ -0,0 +1,14 @@ +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus +{ + using System; + using Microsoft.Azure.ServiceBus; + + public interface IServiceBusPersisterConnection : IDisposable + { + ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; } + + bool IsConnected { get; } + + ITopicClient CreateModel(); + } +} \ No newline at end of file