From e1eac82289150b7c5134903941872c212010b6b4 Mon Sep 17 00:00:00 2001 From: Philipp Theyssen Date: Mon, 13 Feb 2023 17:27:24 +0100 Subject: [PATCH] Add logic for producing events to kafka eventbus. For now we have a single topic for all events. Since we use the eventname as the key for the kafka message, we have the property that they all get assigned to the same parition inside kafka and therefore are in-order. Alternatively one could have multiple topics. The code for the kafka connection is based on: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs --- .../DefaultKafkaPersistentConnection.cs | 28 ++++++++++++++++++- .../EventBus/EventBusKafka/EventBusKafka.cs | 17 +++++++++-- .../EventBus/EventBusKafka/GlobalUsings.cs | 1 + .../IKafkaPersistentConnection.cs | 2 +- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 6a4e38df8..64c241d81 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,10 +1,36 @@ namespace EventBusKafka; -// Abstracts how to connect to Kafka client + +/// +/// Class for making sure we do not open new producer context (expensive) +/// everytime a service publishes an event. +/// On startup each service creates an singleton instance of this class, +/// which is then used when publishing any events. +/// +/// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs +/// +/// public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + + IProducer _kafkaProducer; + + public DefaultKafkaPersistentConnection(String brokerList) + { + // TODO: fix configuration passing for producer + // for now just assume we give "localhost:9092" as argument + var conf = new ProducerConfig { BootstrapServers = brokerList }; + _kafkaProducer = new ProducerBuilder(conf).Build(); + } + + public Handle Handle => _kafkaProducer.Handle; + public void Dispose() { + // Block until all outstanding produce requests have completed (with or + // without error). + _kafkaProducer.Flush(); + _kafkaProducer.Dispose(); } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index d9a26fb3d..e99aee831 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -2,12 +2,17 @@ namespace EventBusKafka; public class EventBusKafka : IEventBus, IDisposable { - const string BROKER_NAME = "eshop_event_bus"; + // for now use single topic and event names as keys for messages + // such that they always land in same partition and we have ordering guarantee + // then the consumers have to ignore events they are not subscribed to + // alternatively could have multiple topics (associated with event name) + private readonly string _topicName = "eshop_event_bus"; private readonly IKafkaPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly int _retryCount; + private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; // Object that will be registered as singleton to each service on startup, @@ -23,7 +28,15 @@ public class EventBusKafka : IEventBus, IDisposable public void Publish(IntegrationEvent @event) { - throw new NotImplementedException(); + var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType()); + + // map Integration event to kafka message + // event name something like OrderPaymentSucceededIntegrationEvent + var message = new Message { Key = eventName, Value = jsonMessage }; + IProducer kafkaHandle = + new DependentProducerBuilder(_persistentConnection.Handle).Build(); + kafkaHandle.ProduceAsync(_topicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs index 2928d3520..6dbe6ba33 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs @@ -7,6 +7,7 @@ global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.Extensions.Logging; +global using Confluent.Kafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; global using System.Text; global using System.Threading.Tasks; diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 94254c699..008c46211 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -2,5 +2,5 @@ namespace EventBusKafka; public interface IKafkaPersistentConnection : IDisposable { - + Handle Handle { get; } } \ No newline at end of file