diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 6a4e38df8..64c241d81 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,10 +1,36 @@ namespace EventBusKafka; -// Abstracts how to connect to Kafka client + +/// +/// Class for making sure we do not open new producer context (expensive) +/// everytime a service publishes an event. +/// On startup each service creates an singleton instance of this class, +/// which is then used when publishing any events. +/// +/// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs +/// +/// public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + + IProducer _kafkaProducer; + + public DefaultKafkaPersistentConnection(String brokerList) + { + // TODO: fix configuration passing for producer + // for now just assume we give "localhost:9092" as argument + var conf = new ProducerConfig { BootstrapServers = brokerList }; + _kafkaProducer = new ProducerBuilder(conf).Build(); + } + + public Handle Handle => _kafkaProducer.Handle; + public void Dispose() { + // Block until all outstanding produce requests have completed (with or + // without error). + _kafkaProducer.Flush(); + _kafkaProducer.Dispose(); } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index d9a26fb3d..e99aee831 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -2,12 +2,17 @@ namespace EventBusKafka; public class EventBusKafka : IEventBus, IDisposable { - const string BROKER_NAME = "eshop_event_bus"; + // for now use single topic and event names as keys for messages + // such that they always land in same partition and we have ordering guarantee + // then the consumers have to ignore events they are not subscribed to + // alternatively could have multiple topics (associated with event name) + private readonly string _topicName = "eshop_event_bus"; private readonly IKafkaPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly int _retryCount; + private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; // Object that will be registered as singleton to each service on startup, @@ -23,7 +28,15 @@ public class EventBusKafka : IEventBus, IDisposable public void Publish(IntegrationEvent @event) { - throw new NotImplementedException(); + var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType()); + + // map Integration event to kafka message + // event name something like OrderPaymentSucceededIntegrationEvent + var message = new Message { Key = eventName, Value = jsonMessage }; + IProducer kafkaHandle = + new DependentProducerBuilder(_persistentConnection.Handle).Build(); + kafkaHandle.ProduceAsync(_topicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs index 2928d3520..6dbe6ba33 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs @@ -7,6 +7,7 @@ global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.Extensions.Logging; +global using Confluent.Kafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; global using System.Text; global using System.Threading.Tasks; diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 94254c699..008c46211 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -2,5 +2,5 @@ namespace EventBusKafka; public interface IKafkaPersistentConnection : IDisposable { - + Handle Handle { get; } } \ No newline at end of file