From a12e9bdbb5081c7011ff8c7c5b9772b70d928e50 Mon Sep 17 00:00:00 2001 From: Philipp Theyssen Date: Mon, 13 Feb 2023 11:23:51 +0100 Subject: [PATCH 1/8] Add EventBusKafka project skeleton --- .../DefaultKafkaPersistentConnection.cs | 6 +++ .../EventBus/EventBusKafka/EventBusKafka.cs | 6 +++ .../EventBusKafka/EventBusKafka.csproj | 13 +++++ .../EventBus/EventBusKafka/GlobalUsings.cs | 6 +++ .../IKafkaPersistentConnection.cs | 6 +++ src/eShopOnContainers-ServicesAndWebApps.sln | 51 +++++++++++++++++++ 6 files changed, 88 insertions(+) create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs new file mode 100644 index 000000000..17278bf02 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -0,0 +1,6 @@ +namespace EventBusKafka; + +public class DefaultKafkaPersistentConnection +{ + +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs new file mode 100644 index 000000000..70e0ff1b1 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -0,0 +1,6 @@ +namespace EventBusKafka; + +public class EventBusKafka +{ + +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj new file mode 100644 index 000000000..28689f59d --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj @@ -0,0 +1,13 @@ + + + + net7.0 + enable + enable + + + + + + + diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs new file mode 100644 index 000000000..fd2a37b20 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs @@ -0,0 +1,6 @@ +namespace EventBusKafka; + +public class GlobalUsings +{ + +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs new file mode 100644 index 000000000..512c94be9 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -0,0 +1,6 @@ +namespace EventBusKafka; + +public class IKafkaPersistentConnection +{ + +} \ No newline at end of file diff --git a/src/eShopOnContainers-ServicesAndWebApps.sln b/src/eShopOnContainers-ServicesAndWebApps.sln index c45bfe0bf..edf7ea518 100644 --- a/src/eShopOnContainers-ServicesAndWebApps.sln +++ b/src/eShopOnContainers-ServicesAndWebApps.sln @@ -124,6 +124,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{373D8AA1 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventBus.Tests", "BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{95D735BE-2899-4495-BE3F-2600E93B4E3C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBusKafka", "BuildingBlocks\EventBus\EventBusKafka\EventBusKafka.csproj", "{230B3FC1-6971-43C4-A75E-867F5F24B607}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Ad-Hoc|Any CPU = Ad-Hoc|Any CPU @@ -1530,6 +1532,54 @@ Global {95D735BE-2899-4495-BE3F-2600E93B4E3C}.Release|x64.Build.0 = Release|Any CPU {95D735BE-2899-4495-BE3F-2600E93B4E3C}.Release|x86.ActiveCfg = Release|Any CPU {95D735BE-2899-4495-BE3F-2600E93B4E3C}.Release|x86.Build.0 = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|x64.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Ad-Hoc|x86.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|Any CPU.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|ARM.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|ARM.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|iPhone.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|iPhone.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|x64.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|x64.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|x86.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.AppStore|x86.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|Any CPU.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|ARM.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|ARM.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|iPhone.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|iPhone.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|x64.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|x64.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|x86.ActiveCfg = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Debug|x86.Build.0 = Debug|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|Any CPU.ActiveCfg = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|Any CPU.Build.0 = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|ARM.ActiveCfg = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|ARM.Build.0 = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|iPhone.ActiveCfg = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|iPhone.Build.0 = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|iPhoneSimulator.Build.0 = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|x64.ActiveCfg = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|x64.Build.0 = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|x86.ActiveCfg = Release|Any CPU + {230B3FC1-6971-43C4-A75E-867F5F24B607}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1588,6 +1638,7 @@ Global {B62E859F-825E-4C8B-93EC-5966EACFD026} = {798BFC44-2CCD-45FA-B37A-5173B03C2B30} {373D8AA1-36BE-49EC-89F0-6CB736666285} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F} {95D735BE-2899-4495-BE3F-2600E93B4E3C} = {373D8AA1-36BE-49EC-89F0-6CB736666285} + {230B3FC1-6971-43C4-A75E-867F5F24B607} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {25728519-5F0F-4973-8A64-0A81EB4EA8D9} From 30eb3a075e7ebf2ebff22e5e3abbccacc8b4976b Mon Sep 17 00:00:00 2001 From: Philipp Theyssen Date: Mon, 13 Feb 2023 12:42:56 +0100 Subject: [PATCH 2/8] Add further details to Kafka eventbus skeleton. So far only copied details from other eventbus implementations. Key next steps are to implement a persistent connection abstraction (class) for the Kafka eventbus and the publish and subscribe functions. For this we need knowledge about how Kafka works, for example how one publishes events topics etc. --- .../DefaultKafkaPersistentConnection.cs | 6 ++- .../EventBus/EventBusKafka/EventBusKafka.cs | 53 ++++++++++++++++++- .../EventBusKafka/EventBusKafka.csproj | 11 ++++ .../EventBus/EventBusKafka/GlobalUsings.cs | 19 ++++--- .../IKafkaPersistentConnection.cs | 2 +- 5 files changed, 82 insertions(+), 9 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 17278bf02..6a4e38df8 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,6 +1,10 @@ namespace EventBusKafka; +// Abstracts how to connect to Kafka client public class DefaultKafkaPersistentConnection + : IKafkaPersistentConnection { - + public void Dispose() + { + } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index 70e0ff1b1..d9a26fb3d 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -1,6 +1,57 @@ namespace EventBusKafka; -public class EventBusKafka +public class EventBusKafka : IEventBus, IDisposable { + const string BROKER_NAME = "eshop_event_bus"; + + private readonly IKafkaPersistentConnection _persistentConnection; + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly int _retryCount; + + // Object that will be registered as singleton to each service on startup, + // which will be used to publish and subscribe to events. + public EventBusKafka(IKafkaPersistentConnection persistentConnection, + ILogger logger, IEventBusSubscriptionsManager subsManager, int retryCount = 5) + { + _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); + _retryCount = retryCount; + } + + public void Publish(IntegrationEvent @event) + { + throw new NotImplementedException(); + } + + public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + throw new NotImplementedException(); + } + + public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler + { + throw new NotImplementedException(); + } + + // Taken directly from rabbitMQ implementation + public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler + { + _subsManager.RemoveDynamicSubscription(eventName); + } + + // Taken directly from rabbitMQ implementation + public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + var eventName = _subsManager.GetEventKey(); + _logger.LogInformation("Unsubscribing from event {EventName}", eventName); + _subsManager.RemoveSubscription(); + } + + public void Dispose() + { + _subsManager.Clear(); + } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj index 28689f59d..c36791642 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj @@ -10,4 +10,15 @@ + + + + + + + + ..\..\..\..\..\..\.nuget\packages\microsoft.extensions.logging.abstractions\7.0.0\lib\net7.0\Microsoft.Extensions.Logging.Abstractions.dll + + + diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs index fd2a37b20..2928d3520 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs @@ -1,6 +1,13 @@ -namespace EventBusKafka; - -public class GlobalUsings -{ - -} \ No newline at end of file +global using Polly; +global using Polly.Retry; +global using System; +global using System.IO; +global using System.Net.Sockets; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +global using Microsoft.Extensions.Logging; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; +global using System.Text; +global using System.Threading.Tasks; +global using System.Text.Json; \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 512c94be9..94254c699 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -1,6 +1,6 @@ namespace EventBusKafka; -public class IKafkaPersistentConnection +public interface IKafkaPersistentConnection : IDisposable { } \ No newline at end of file From e1eac82289150b7c5134903941872c212010b6b4 Mon Sep 17 00:00:00 2001 From: Philipp Theyssen Date: Mon, 13 Feb 2023 17:27:24 +0100 Subject: [PATCH 3/8] Add logic for producing events to kafka eventbus. For now we have a single topic for all events. Since we use the eventname as the key for the kafka message, we have the property that they all get assigned to the same parition inside kafka and therefore are in-order. Alternatively one could have multiple topics. The code for the kafka connection is based on: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs --- .../DefaultKafkaPersistentConnection.cs | 28 ++++++++++++++++++- .../EventBus/EventBusKafka/EventBusKafka.cs | 17 +++++++++-- .../EventBus/EventBusKafka/GlobalUsings.cs | 1 + .../IKafkaPersistentConnection.cs | 2 +- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 6a4e38df8..64c241d81 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,10 +1,36 @@ namespace EventBusKafka; -// Abstracts how to connect to Kafka client + +/// +/// Class for making sure we do not open new producer context (expensive) +/// everytime a service publishes an event. +/// On startup each service creates an singleton instance of this class, +/// which is then used when publishing any events. +/// +/// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs +/// +/// public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + + IProducer _kafkaProducer; + + public DefaultKafkaPersistentConnection(String brokerList) + { + // TODO: fix configuration passing for producer + // for now just assume we give "localhost:9092" as argument + var conf = new ProducerConfig { BootstrapServers = brokerList }; + _kafkaProducer = new ProducerBuilder(conf).Build(); + } + + public Handle Handle => _kafkaProducer.Handle; + public void Dispose() { + // Block until all outstanding produce requests have completed (with or + // without error). + _kafkaProducer.Flush(); + _kafkaProducer.Dispose(); } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index d9a26fb3d..e99aee831 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -2,12 +2,17 @@ namespace EventBusKafka; public class EventBusKafka : IEventBus, IDisposable { - const string BROKER_NAME = "eshop_event_bus"; + // for now use single topic and event names as keys for messages + // such that they always land in same partition and we have ordering guarantee + // then the consumers have to ignore events they are not subscribed to + // alternatively could have multiple topics (associated with event name) + private readonly string _topicName = "eshop_event_bus"; private readonly IKafkaPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly int _retryCount; + private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; // Object that will be registered as singleton to each service on startup, @@ -23,7 +28,15 @@ public class EventBusKafka : IEventBus, IDisposable public void Publish(IntegrationEvent @event) { - throw new NotImplementedException(); + var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType()); + + // map Integration event to kafka message + // event name something like OrderPaymentSucceededIntegrationEvent + var message = new Message { Key = eventName, Value = jsonMessage }; + IProducer kafkaHandle = + new DependentProducerBuilder(_persistentConnection.Handle).Build(); + kafkaHandle.ProduceAsync(_topicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs index 2928d3520..6dbe6ba33 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs @@ -7,6 +7,7 @@ global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.Extensions.Logging; +global using Confluent.Kafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; global using System.Text; global using System.Threading.Tasks; diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 94254c699..008c46211 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -2,5 +2,5 @@ namespace EventBusKafka; public interface IKafkaPersistentConnection : IDisposable { - + Handle Handle { get; } } \ No newline at end of file From 08b1404dd4eb702117472ab0cf20355f4a426e1e Mon Sep 17 00:00:00 2001 From: Philipp Theyssen Date: Wed, 15 Feb 2023 11:38:34 +0100 Subject: [PATCH 4/8] Start working subscribing with Kakfa and using it in Basket service. --- .../DefaultKafkaPersistentConnection.cs | 7 +++++- .../EventBus/EventBusKafka/EventBusKafka.cs | 9 +++++-- .../Basket/Basket.API/Basket.API.csproj | 1 + src/Services/Basket/Basket.API/Startup.cs | 25 +++++++++++++++++++ 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 64c241d81..2b5a9b965 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -14,13 +14,18 @@ public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + private readonly ILogger _logger; IProducer _kafkaProducer; - public DefaultKafkaPersistentConnection(String brokerList) + public DefaultKafkaPersistentConnection(String brokerList, + ILogger logger) { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); // TODO: fix configuration passing for producer // for now just assume we give "localhost:9092" as argument var conf = new ProducerConfig { BootstrapServers = brokerList }; + + // TODO maybe we need to retry this? -> as it could fail _kafkaProducer = new ProducerBuilder(conf).Build(); } diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index e99aee831..199e82a74 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -34,14 +34,19 @@ public class EventBusKafka : IEventBus, IDisposable // map Integration event to kafka message // event name something like OrderPaymentSucceededIntegrationEvent var message = new Message { Key = eventName, Value = jsonMessage }; - IProducer kafkaHandle = + var kafkaHandle = new DependentProducerBuilder(_persistentConnection.Handle).Build(); kafkaHandle.ProduceAsync(_topicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { - throw new NotImplementedException(); + var eventName = _subsManager.GetEventKey(); + // DoInternalSubscription(eventName); + + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); + + _subsManager.AddSubscription(); } public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler diff --git a/src/Services/Basket/Basket.API/Basket.API.csproj b/src/Services/Basket/Basket.API/Basket.API.csproj index 1cf1b7dfc..4d1cc625d 100644 --- a/src/Services/Basket/Basket.API/Basket.API.csproj +++ b/src/Services/Basket/Basket.API/Basket.API.csproj @@ -54,6 +54,7 @@ + diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 3ef2aa33f..70c7b533a 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -1,3 +1,4 @@ +using EventBusKafka; using Microsoft.AspNetCore.Authentication.Cookies; using Microsoft.AspNetCore.Authentication.OpenIdConnect; @@ -90,6 +91,16 @@ public class Startup return new DefaultServiceBusPersisterConnection(serviceBusConnectionString); }); } + else if (Configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(sp => + { + var logger = sp.GetRequiredService>(); + + // TODO add retry, better config passing here + return new DefaultKafkaPersistentConnection("localhost:9092",logger); + }); + } else { services.AddSingleton(sp => @@ -259,6 +270,20 @@ public class Startup eventBusSubscriptionsManager, iLifetimeScope, subscriptionName); }); } + else if (Configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(sp => + { + var kafkaPersistentConnection = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubscriptionsManager = sp.GetRequiredService(); + string subscriptionName = Configuration["SubscriptionClientName"]; + + // TODO fix namespace -> global using + return new global::EventBusKafka.EventBusKafka(kafkaPersistentConnection, logger, + eventBusSubscriptionsManager, 5); + }); + } else { services.AddSingleton(sp => From 72f2968bea9dc8c92d3716156b7dd1631e26d528 Mon Sep 17 00:00:00 2001 From: kct949 Date: Wed, 15 Feb 2023 15:03:26 +0100 Subject: [PATCH 5/8] implement subscribe for kafka + minor fixes --- .../DefaultKafkaPersistentConnection.cs | 20 ++- .../EventBus/EventBusKafka/EventBusKafka.cs | 16 ++- .../EventBusKafka/EventBusKafka.csproj | 2 + .../IKafkaPersistentConnection.cs | 3 +- .../KafkaConsumerBackgroundService.cs | 117 ++++++++++++++++++ .../Basket/Basket.API/GlobalUsings.cs | 1 + src/Services/Basket/Basket.API/Startup.cs | 27 +--- .../Basket.API/appsettings.Development.json | 11 +- .../Catalog.API/appsettings.Development.json | 11 +- .../appsettings.Development.json | 9 ++ .../Payment.API/appsettings.Development.json | 9 ++ 11 files changed, 178 insertions(+), 48 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 2b5a9b965..d57959252 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,5 +1,6 @@ -namespace EventBusKafka; +using Microsoft.Extensions.Configuration; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; /// /// Class for making sure we do not open new producer context (expensive) @@ -13,20 +14,13 @@ namespace EventBusKafka; public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + private readonly IProducer _kafkaProducer; - private readonly ILogger _logger; - IProducer _kafkaProducer; - - public DefaultKafkaPersistentConnection(String brokerList, - ILogger logger) + public DefaultKafkaPersistentConnection(IConfiguration configuration) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - // TODO: fix configuration passing for producer - // for now just assume we give "localhost:9092" as argument - var conf = new ProducerConfig { BootstrapServers = brokerList }; - - // TODO maybe we need to retry this? -> as it could fail - _kafkaProducer = new ProducerBuilder(conf).Build(); + var producerConfig = new ProducerConfig(); + configuration.GetSection("Kafka:ProducerSettings").Bind(producerConfig); + _kafkaProducer = new ProducerBuilder(producerConfig).Build(); } public Handle Handle => _kafkaProducer.Handle; diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index 199e82a74..026a19c54 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -1,4 +1,4 @@ -namespace EventBusKafka; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; public class EventBusKafka : IEventBus, IDisposable { @@ -6,29 +6,27 @@ public class EventBusKafka : IEventBus, IDisposable // such that they always land in same partition and we have ordering guarantee // then the consumers have to ignore events they are not subscribed to // alternatively could have multiple topics (associated with event name) - private readonly string _topicName = "eshop_event_bus"; + private const string TopicName = "eshop_event_bus"; private readonly IKafkaPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; - private readonly int _retryCount; - private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; + private const string IntegrationEventSuffix = "IntegrationEvent"; // Object that will be registered as singleton to each service on startup, // which will be used to publish and subscribe to events. public EventBusKafka(IKafkaPersistentConnection persistentConnection, - ILogger logger, IEventBusSubscriptionsManager subsManager, int retryCount = 5) + ILogger logger, IEventBusSubscriptionsManager subsManager) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - _retryCount = retryCount; + _subsManager = subsManager; } public void Publish(IntegrationEvent @event) { - var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, ""); var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType()); // map Integration event to kafka message @@ -36,7 +34,7 @@ public class EventBusKafka : IEventBus, IDisposable var message = new Message { Key = eventName, Value = jsonMessage }; var kafkaHandle = new DependentProducerBuilder(_persistentConnection.Handle).Build(); - kafkaHandle.ProduceAsync(_topicName, message); + kafkaHandle.ProduceAsync(TopicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj index c36791642..74961ceff 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj @@ -11,7 +11,9 @@ + + diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 008c46211..25514a50e 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -1,4 +1,5 @@ -namespace EventBusKafka; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + public interface IKafkaPersistentConnection : IDisposable { diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs new file mode 100644 index 000000000..e6f7bd3ea --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs @@ -0,0 +1,117 @@ +using Autofac; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + +/* Inspired by https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/RequestTimeConsumer.cs */ +public class KafkaConsumerBackgroundService : BackgroundService +{ + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly ILifetimeScope _autofac; + private readonly ILogger _logger; + private readonly IConsumer _kafkaConsumer; + private const string AutofacScopeName = "eshop_event_bus"; + private const string TopicName = "eshop_event_bus"; + + + public KafkaConsumerBackgroundService(IConfiguration config, + IEventBusSubscriptionsManager subsManager, + ILifetimeScope autofac, + ILogger logger) + { + var consumerConfig = new ConsumerConfig(); + config.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig); + + _kafkaConsumer = new ConsumerBuilder(consumerConfig).Build(); + _subsManager = subsManager; + _autofac = autofac; + _logger = logger; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken); + } + + private async Task StartConsumerLoop(CancellationToken cancellationToken) + { + _kafkaConsumer.Subscribe(TopicName); + + while (!cancellationToken.IsCancellationRequested) + { + try + { + var consumeResult = _kafkaConsumer.Consume(cancellationToken); + + var eventName = consumeResult.Message.Key; + + if (!_subsManager.HasSubscriptionsForEvent(eventName)) + { + _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName); + continue; + } + + await using var scope = _autofac.BeginLifetimeScope(AutofacScopeName); + var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) + { + #region dynamic subscription + /* We do not support dynamic subscription at the moment*/ + // if (subscription.IsDynamic) + // { + // if (scope.ResolveOptional(subscription.HandlerType) is not IDynamicIntegrationEventHandler handler) continue; + // using dynamic eventData = JsonDocument.Parse(message); + // await Task.Yield(); + // await handler.Handle(eventData); + // } + #endregion + + /* The code below applies to non-dynamic subscriptions only */ + var handler = scope.ResolveOptional(subscription.HandlerType); + if (handler == null) continue; + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonSerializer.Deserialize(consumeResult.Message.Value, + eventType, + new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + + await Task.Yield(); + await (Task)concreteType.GetMethod("Handle") + .Invoke(handler, new object[] { integrationEvent }); + } + } + catch (OperationCanceledException) + { + break; + } + catch (ConsumeException e) + { + // Consumer errors should generally be ignored (or logged) unless fatal. + Console.WriteLine($"Consume error: {e.Error.Reason}"); + + if (e.Error.IsFatal) + { + // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors + break; + } + } + catch (Exception e) + { + Console.WriteLine($"Unexpected error: {e}"); + break; + } + } + } + + public override void Dispose() + { + _kafkaConsumer.Close(); // Commit offsets and leave the group cleanly. + _kafkaConsumer.Dispose(); + + base.Dispose(); + } +} \ No newline at end of file diff --git a/src/Services/Basket/Basket.API/GlobalUsings.cs b/src/Services/Basket/Basket.API/GlobalUsings.cs index b2e13ab17..22426e3db 100644 --- a/src/Services/Basket/Basket.API/GlobalUsings.cs +++ b/src/Services/Basket/Basket.API/GlobalUsings.cs @@ -28,6 +28,7 @@ global using Azure.Messaging.ServiceBus; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; global using Microsoft.eShopOnContainers.Services.Basket.API.Controllers; diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 70c7b533a..0682f4b77 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -1,7 +1,3 @@ -using EventBusKafka; -using Microsoft.AspNetCore.Authentication.Cookies; -using Microsoft.AspNetCore.Authentication.OpenIdConnect; - namespace Microsoft.eShopOnContainers.Services.Basket.API; public class Startup { @@ -93,13 +89,7 @@ public class Startup } else if (Configuration.GetValue("KafkaEnabled")) { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - // TODO add retry, better config passing here - return new DefaultKafkaPersistentConnection("localhost:9092",logger); - }); + services.AddSingleton(); } else { @@ -264,7 +254,7 @@ public class Startup var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); var eventBusSubscriptionsManager = sp.GetRequiredService(); - string subscriptionName = Configuration["SubscriptionClientName"]; + var subscriptionName = Configuration["SubscriptionClientName"]; return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubscriptionsManager, iLifetimeScope, subscriptionName); @@ -272,17 +262,8 @@ public class Startup } else if (Configuration.GetValue("KafkaEnabled")) { - services.AddSingleton(sp => - { - var kafkaPersistentConnection = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubscriptionsManager = sp.GetRequiredService(); - string subscriptionName = Configuration["SubscriptionClientName"]; - - // TODO fix namespace -> global using - return new global::EventBusKafka.EventBusKafka(kafkaPersistentConnection, logger, - eventBusSubscriptionsManager, 5); - }); + services.AddHostedService(); + services.AddSingleton(); } else { diff --git a/src/Services/Basket/Basket.API/appsettings.Development.json b/src/Services/Basket/Basket.API/appsettings.Development.json index f4a3b9407..5655c0e0f 100644 --- a/src/Services/Basket/Basket.API/appsettings.Development.json +++ b/src/Services/Basket/Basket.API/appsettings.Development.json @@ -13,5 +13,14 @@ "IdentityUrl": "http://localhost:5105", "ConnectionString": "127.0.0.1", "AzureServiceBusEnabled": false, - "EventBusConnection": "localhost" + "EventBusConnection": "localhost", + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "basket-group-id" + } + } } \ No newline at end of file diff --git a/src/Services/Catalog/Catalog.API/appsettings.Development.json b/src/Services/Catalog/Catalog.API/appsettings.Development.json index 1d5574f63..6154a50e6 100644 --- a/src/Services/Catalog/Catalog.API/appsettings.Development.json +++ b/src/Services/Catalog/Catalog.API/appsettings.Development.json @@ -11,5 +11,14 @@ } } }, - "EventBusConnection": "localhost" + "EventBusConnection": "localhost", + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "catalog-group-id" + } + } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json index e203e9407..6fdbdfcc1 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json +++ b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json @@ -5,5 +5,14 @@ "System": "Information", "Microsoft": "Information" } + }, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "ordering-group-id" + } } } diff --git a/src/Services/Payment/Payment.API/appsettings.Development.json b/src/Services/Payment/Payment.API/appsettings.Development.json index fa8ce71a9..f8d627ace 100644 --- a/src/Services/Payment/Payment.API/appsettings.Development.json +++ b/src/Services/Payment/Payment.API/appsettings.Development.json @@ -6,5 +6,14 @@ "System": "Information", "Microsoft": "Information" } + }, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "payment-group-id" + } } } From 6d6a9c09d2adca09447eb9afbccdb5f8b2ef237a Mon Sep 17 00:00:00 2001 From: kct949 Date: Wed, 15 Feb 2023 15:18:05 +0100 Subject: [PATCH 6/8] add kafka to docker compose --- src/docker-compose.override.yml | 19 +++++++++++++++++++ src/docker-compose.yml | 8 ++++++++ 2 files changed, 27 insertions(+) diff --git a/src/docker-compose.override.yml b/src/docker-compose.override.yml index 89b3e9925..bb51fe74e 100644 --- a/src/docker-compose.override.yml +++ b/src/docker-compose.override.yml @@ -37,7 +37,26 @@ services: ports: - "15672:15672" - "5672:5672" + + zookeeper: + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + broker: + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + ports: + # To learn about configuring Kafka for access across networks see + # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ + - "9092:9092" + identity-api: environment: - ASPNETCORE_ENVIRONMENT=Development diff --git a/src/docker-compose.yml b/src/docker-compose.yml index 6821bad81..dba6bd74e 100644 --- a/src/docker-compose.yml +++ b/src/docker-compose.yml @@ -16,6 +16,14 @@ services: rabbitmq: image: rabbitmq:3-management-alpine + + zookeeper: + image: confluentinc/cp-zookeeper:7.3.0 + + broker: + image: confluentinc/cp-kafka:7.3.0 + depends_on: + - zookeeper identity-api: image: ${REGISTRY:-eshop}/identity.api:${PLATFORM:-linux}-${TAG:-latest} From 2af8c1ba25e3f34ef3cc7a7e380eb0fe8120402d Mon Sep 17 00:00:00 2001 From: kct949 Date: Wed, 15 Feb 2023 16:34:05 +0100 Subject: [PATCH 7/8] add 'copy kafka project' to all dockerfiles --- .../Mobile.Bff.Shopping/aggregator/Dockerfile | 1 + .../Web.Bff.Shopping/aggregator/Dockerfile | 1 + src/DockerfileSolutionRestore.txt | Bin 7956 -> 8212 bytes src/Services/Basket/Basket.API/Dockerfile | 1 + .../Basket/Basket.API/Dockerfile.develop | 1 + src/Services/Catalog/Catalog.API/Dockerfile | 1 + src/Services/Identity/Identity.API/Dockerfile | 1 + src/Services/Ordering/Ordering.API/Dockerfile | 1 + .../Ordering.BackgroundTasks/Dockerfile | 1 + .../Ordering/Ordering.SignalrHub/Dockerfile | 1 + src/Services/Payment/Payment.API/Dockerfile | 1 + src/Services/Webhooks/Webhooks.API/Dockerfile | 1 + src/Web/WebMVC/Dockerfile | 1 + src/Web/WebSPA/Dockerfile | 1 + src/Web/WebStatus/Dockerfile | 1 + src/Web/WebhookClient/Dockerfile | 1 + src/docker-compose.yml | 1 + src/eShopOnContainers-ServicesAndWebApps.sln | 4 ++-- 18 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/ApiGateways/Mobile.Bff.Shopping/aggregator/Dockerfile b/src/ApiGateways/Mobile.Bff.Shopping/aggregator/Dockerfile index 294e04375..693c03fdb 100644 --- a/src/ApiGateways/Mobile.Bff.Shopping/aggregator/Dockerfile +++ b/src/ApiGateways/Mobile.Bff.Shopping/aggregator/Dockerfile @@ -12,6 +12,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/ApiGateways/Web.Bff.Shopping/aggregator/Dockerfile b/src/ApiGateways/Web.Bff.Shopping/aggregator/Dockerfile index 5cf1c7332..532986893 100644 --- a/src/ApiGateways/Web.Bff.Shopping/aggregator/Dockerfile +++ b/src/ApiGateways/Web.Bff.Shopping/aggregator/Dockerfile @@ -12,6 +12,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/DockerfileSolutionRestore.txt b/src/DockerfileSolutionRestore.txt index 040bf761ec79bfb95f559c13baa31f518c2a399f..f99fde1aa53e1838da622138df448174f5267d36 100644 GIT binary patch delta 88 zcmbPYH^pJYGWN*}SOhq|84?-N7_u1>Cx2uSM&c#%iB1;aFu<$Ek=cH-9!JIG11u(+ J9XPIt0RS~w7##or delta 20 ccmbQ@FvV`eGWN+1oN1E}u$XLi;JhLR09CUGw*UYD diff --git a/src/Services/Basket/Basket.API/Dockerfile b/src/Services/Basket/Basket.API/Dockerfile index 078257cc0..61aa777a0 100644 --- a/src/Services/Basket/Basket.API/Dockerfile +++ b/src/Services/Basket/Basket.API/Dockerfile @@ -16,6 +16,7 @@ COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" COPY "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" COPY "Services/Basket/Basket.API/Basket.API.csproj" "Services/Basket/Basket.API/Basket.API.csproj" diff --git a/src/Services/Basket/Basket.API/Dockerfile.develop b/src/Services/Basket/Basket.API/Dockerfile.develop index b29a1aa8a..5fc9900be 100644 --- a/src/Services/Basket/Basket.API/Dockerfile.develop +++ b/src/Services/Basket/Basket.API/Dockerfile.develop @@ -8,6 +8,7 @@ WORKDIR /src COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"] COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"] +COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EvenBusKafka/"] COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"] COPY ["Services/Basket/Basket.API/Basket.API.csproj", "Services/Basket/Basket.API/"] COPY ["NuGet.config", "NuGet.config"] diff --git a/src/Services/Catalog/Catalog.API/Dockerfile b/src/Services/Catalog/Catalog.API/Dockerfile index f7ab4232c..b55554187 100644 --- a/src/Services/Catalog/Catalog.API/Dockerfile +++ b/src/Services/Catalog/Catalog.API/Dockerfile @@ -16,6 +16,7 @@ COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" COPY "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" COPY "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" diff --git a/src/Services/Identity/Identity.API/Dockerfile b/src/Services/Identity/Identity.API/Dockerfile index aca2e1e81..78c55535e 100644 --- a/src/Services/Identity/Identity.API/Dockerfile +++ b/src/Services/Identity/Identity.API/Dockerfile @@ -12,6 +12,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/Services/Ordering/Ordering.API/Dockerfile b/src/Services/Ordering/Ordering.API/Dockerfile index 650e54cc6..cefe66395 100644 --- a/src/Services/Ordering/Ordering.API/Dockerfile +++ b/src/Services/Ordering/Ordering.API/Dockerfile @@ -12,6 +12,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Dockerfile b/src/Services/Ordering/Ordering.BackgroundTasks/Dockerfile index 43348d40c..7d640ab27 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Dockerfile +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Dockerfile @@ -13,6 +13,7 @@ COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator. COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" diff --git a/src/Services/Ordering/Ordering.SignalrHub/Dockerfile b/src/Services/Ordering/Ordering.SignalrHub/Dockerfile index 82cb625a7..9d0c15a14 100644 --- a/src/Services/Ordering/Ordering.SignalrHub/Dockerfile +++ b/src/Services/Ordering/Ordering.SignalrHub/Dockerfile @@ -14,6 +14,7 @@ COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" COPY "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" diff --git a/src/Services/Payment/Payment.API/Dockerfile b/src/Services/Payment/Payment.API/Dockerfile index 4b17cb3bd..58ec2f491 100644 --- a/src/Services/Payment/Payment.API/Dockerfile +++ b/src/Services/Payment/Payment.API/Dockerfile @@ -15,6 +15,7 @@ COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" COPY "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" COPY "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" diff --git a/src/Services/Webhooks/Webhooks.API/Dockerfile b/src/Services/Webhooks/Webhooks.API/Dockerfile index 5db5f3f90..f7cb45fb0 100644 --- a/src/Services/Webhooks/Webhooks.API/Dockerfile +++ b/src/Services/Webhooks/Webhooks.API/Dockerfile @@ -14,6 +14,7 @@ COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" COPY "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" diff --git a/src/Web/WebMVC/Dockerfile b/src/Web/WebMVC/Dockerfile index 9170a2c41..f1e93adcc 100644 --- a/src/Web/WebMVC/Dockerfile +++ b/src/Web/WebMVC/Dockerfile @@ -12,6 +12,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/Web/WebSPA/Dockerfile b/src/Web/WebSPA/Dockerfile index f3e23d919..2b3255f6b 100644 --- a/src/Web/WebSPA/Dockerfile +++ b/src/Web/WebSPA/Dockerfile @@ -23,6 +23,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/Web/WebStatus/Dockerfile b/src/Web/WebStatus/Dockerfile index 325c7e7cd..b3ac9e55c 100644 --- a/src/Web/WebStatus/Dockerfile +++ b/src/Web/WebStatus/Dockerfile @@ -12,6 +12,7 @@ COPY "eShopOnContainers-ServicesAndWebApps.sln" "eShopOnContainers-ServicesAndWe COPY "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" "ApiGateways/Mobile.Bff.Shopping/aggregator/Mobile.Shopping.HttpAggregator.csproj" COPY "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" "ApiGateways/Web.Bff.Shopping/aggregator/Web.Shopping.HttpAggregator.csproj" COPY "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" "BuildingBlocks/Devspaces.Support/Devspaces.Support.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/EventBus/EventBus/EventBus.csproj" "BuildingBlocks/EventBus/EventBus/EventBus.csproj" COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" diff --git a/src/Web/WebhookClient/Dockerfile b/src/Web/WebhookClient/Dockerfile index 82871380e..a75f822d2 100644 --- a/src/Web/WebhookClient/Dockerfile +++ b/src/Web/WebhookClient/Dockerfile @@ -18,6 +18,7 @@ COPY "BuildingBlocks/EventBus/EventBus.Tests/EventBus.Tests.csproj" "BuildingBlo COPY "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" "BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj" COPY "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" "BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj" COPY "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" "BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj" +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" "BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj" COPY "Services/Basket/Basket.API/Basket.API.csproj" "Services/Basket/Basket.API/Basket.API.csproj" COPY "Services/Basket/Basket.FunctionalTests/Basket.FunctionalTests.csproj" "Services/Basket/Basket.FunctionalTests/Basket.FunctionalTests.csproj" diff --git a/src/docker-compose.yml b/src/docker-compose.yml index dba6bd74e..b689a095a 100644 --- a/src/docker-compose.yml +++ b/src/docker-compose.yml @@ -42,6 +42,7 @@ services: - basketdata - identity-api - rabbitmq + - broker catalog-api: image: ${REGISTRY:-eshop}/catalog.api:${PLATFORM:-linux}-${TAG:-latest} diff --git a/src/eShopOnContainers-ServicesAndWebApps.sln b/src/eShopOnContainers-ServicesAndWebApps.sln index edf7ea518..43258d5ba 100644 --- a/src/eShopOnContainers-ServicesAndWebApps.sln +++ b/src/eShopOnContainers-ServicesAndWebApps.sln @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventBus", "BuildingBlocks\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventBusRabbitMQ", "BuildingBlocks\EventBus\EventBusRabbitMQ\EventBusRabbitMQ.csproj", "{8088F3FC-6787-45FA-A924-816EC81CBFAC}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBusKafka", "BuildingBlocks\EventBus\EventBusKafka\EventBusKafka.csproj", "{230B3FC1-6971-43C4-A75E-867F5F24B607}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IntegrationEventLogEF", "BuildingBlocks\EventBus\IntegrationEventLogEF\IntegrationEventLogEF.csproj", "{9EE28E45-1533-472B-8267-56C48855BA0E}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebStatus", "Web\WebStatus\WebStatus.csproj", "{C0A7918D-B4F2-4E7F-8DE2-1E5279EF079F}" @@ -124,8 +126,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{373D8AA1 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventBus.Tests", "BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{95D735BE-2899-4495-BE3F-2600E93B4E3C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBusKafka", "BuildingBlocks\EventBus\EventBusKafka\EventBusKafka.csproj", "{230B3FC1-6971-43C4-A75E-867F5F24B607}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Ad-Hoc|Any CPU = Ad-Hoc|Any CPU From 4b537d2aadddc77bc65bfa443b5579191a775d66 Mon Sep 17 00:00:00 2001 From: kct949 Date: Wed, 15 Feb 2023 22:59:10 +0100 Subject: [PATCH 8/8] working version --- src/BuildingBlocks/EventBus/EventBus/Utils.cs | 15 +++++++++++++++ .../EventBus/EventBusKafka/EventBusKafka.cs | 3 +++ .../KafkaConsumerBackgroundService.cs | 13 ++++++++----- .../Basket.API/appsettings.Development.json | 5 +++-- src/Services/Basket/Basket.API/appsettings.json | 10 ++++++++++ src/docker-compose.override.yml | 5 +++-- 6 files changed, 42 insertions(+), 9 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/EventBus/Utils.cs diff --git a/src/BuildingBlocks/EventBus/EventBus/Utils.cs b/src/BuildingBlocks/EventBus/EventBus/Utils.cs new file mode 100644 index 000000000..23d6d7539 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus/Utils.cs @@ -0,0 +1,15 @@ +using System.Linq; +using System.Text; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus; + +public static class Utils +{ + public static string CalculateMd5Hash(string input) + { + using var md5 = System.Security.Cryptography.MD5.Create(); + var inputBytes = Encoding.ASCII.GetBytes(input); + var hashBytes = md5.ComputeHash(inputBytes); + return Convert.ToHexString(hashBytes); + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index 026a19c54..f83f6d92b 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -34,6 +34,9 @@ public class EventBusKafka : IEventBus, IDisposable var message = new Message { Key = eventName, Value = jsonMessage }; var kafkaHandle = new DependentProducerBuilder(_persistentConnection.Handle).Build(); + + Console.WriteLine($"Publishing event: {eventName}\n Content: {Utils.CalculateMd5Hash(jsonMessage)}"); + kafkaHandle.ProduceAsync(TopicName, message); } diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs index e6f7bd3ea..85744e0b9 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs @@ -1,3 +1,4 @@ +using System.Security.Cryptography; using Autofac; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; @@ -14,16 +15,15 @@ public class KafkaConsumerBackgroundService : BackgroundService private const string AutofacScopeName = "eshop_event_bus"; private const string TopicName = "eshop_event_bus"; - - public KafkaConsumerBackgroundService(IConfiguration config, + public KafkaConsumerBackgroundService(IConfiguration configuration, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac, ILogger logger) { var consumerConfig = new ConsumerConfig(); - config.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig); - + configuration.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig); _kafkaConsumer = new ConsumerBuilder(consumerConfig).Build(); + _subsManager = subsManager; _autofac = autofac; _logger = logger; @@ -45,7 +45,10 @@ public class KafkaConsumerBackgroundService : BackgroundService var consumeResult = _kafkaConsumer.Consume(cancellationToken); var eventName = consumeResult.Message.Key; + var messageContent = consumeResult.Message.Value; + Console.WriteLine($"Consumed event: {eventName}\n Content: {Utils.CalculateMd5Hash(messageContent)}"); + if (!_subsManager.HasSubscriptionsForEvent(eventName)) { _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName); @@ -71,7 +74,7 @@ public class KafkaConsumerBackgroundService : BackgroundService var handler = scope.ResolveOptional(subscription.HandlerType); if (handler == null) continue; var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonSerializer.Deserialize(consumeResult.Message.Value, + var integrationEvent = JsonSerializer.Deserialize(messageContent, eventType, new JsonSerializerOptions { diff --git a/src/Services/Basket/Basket.API/appsettings.Development.json b/src/Services/Basket/Basket.API/appsettings.Development.json index 5655c0e0f..9cdcf69b7 100644 --- a/src/Services/Basket/Basket.API/appsettings.Development.json +++ b/src/Services/Basket/Basket.API/appsettings.Development.json @@ -14,12 +14,13 @@ "ConnectionString": "127.0.0.1", "AzureServiceBusEnabled": false, "EventBusConnection": "localhost", + "KafkaEnabled": true, "Kafka": { "ProducerSettings": { - "BootstrapServers": "localhost:9092" + "BootstrapServers": "broker:9092" }, "ConsumerSettings": { - "BootstrapServers": "localhost:9092", + "BootstrapServers": "broker:9092", "GroupId": "basket-group-id" } } diff --git a/src/Services/Basket/Basket.API/appsettings.json b/src/Services/Basket/Basket.API/appsettings.json index 295294308..848e51028 100644 --- a/src/Services/Basket/Basket.API/appsettings.json +++ b/src/Services/Basket/Basket.API/appsettings.json @@ -26,5 +26,15 @@ "Name": "eshop", "ClientId": "your-client-id", "ClientSecret": "your-client-secret" + }, + "KafkaEnabled": true, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "broker:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "broker:9092", + "GroupId": "basket-group-id" + } } } diff --git a/src/docker-compose.override.yml b/src/docker-compose.override.yml index bb51fe74e..4d0262c37 100644 --- a/src/docker-compose.override.yml +++ b/src/docker-compose.override.yml @@ -33,6 +33,7 @@ services: - "6379:6379" volumes: - eshop-basketdata:/data + rabbitmq: ports: - "15672:15672" @@ -47,8 +48,8 @@ services: environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1