diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 64c241d81..2b5a9b965 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -14,13 +14,18 @@ public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + private readonly ILogger _logger; IProducer _kafkaProducer; - public DefaultKafkaPersistentConnection(String brokerList) + public DefaultKafkaPersistentConnection(String brokerList, + ILogger logger) { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); // TODO: fix configuration passing for producer // for now just assume we give "localhost:9092" as argument var conf = new ProducerConfig { BootstrapServers = brokerList }; + + // TODO maybe we need to retry this? -> as it could fail _kafkaProducer = new ProducerBuilder(conf).Build(); } diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index e99aee831..199e82a74 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -34,14 +34,19 @@ public class EventBusKafka : IEventBus, IDisposable // map Integration event to kafka message // event name something like OrderPaymentSucceededIntegrationEvent var message = new Message { Key = eventName, Value = jsonMessage }; - IProducer kafkaHandle = + var kafkaHandle = new DependentProducerBuilder(_persistentConnection.Handle).Build(); kafkaHandle.ProduceAsync(_topicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { - throw new NotImplementedException(); + var eventName = _subsManager.GetEventKey(); + // DoInternalSubscription(eventName); + + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); + + _subsManager.AddSubscription(); } public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler diff --git a/src/Services/Basket/Basket.API/Basket.API.csproj b/src/Services/Basket/Basket.API/Basket.API.csproj index 1cf1b7dfc..4d1cc625d 100644 --- a/src/Services/Basket/Basket.API/Basket.API.csproj +++ b/src/Services/Basket/Basket.API/Basket.API.csproj @@ -54,6 +54,7 @@ + diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 3ef2aa33f..70c7b533a 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -1,3 +1,4 @@ +using EventBusKafka; using Microsoft.AspNetCore.Authentication.Cookies; using Microsoft.AspNetCore.Authentication.OpenIdConnect; @@ -90,6 +91,16 @@ public class Startup return new DefaultServiceBusPersisterConnection(serviceBusConnectionString); }); } + else if (Configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(sp => + { + var logger = sp.GetRequiredService>(); + + // TODO add retry, better config passing here + return new DefaultKafkaPersistentConnection("localhost:9092",logger); + }); + } else { services.AddSingleton(sp => @@ -259,6 +270,20 @@ public class Startup eventBusSubscriptionsManager, iLifetimeScope, subscriptionName); }); } + else if (Configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(sp => + { + var kafkaPersistentConnection = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubscriptionsManager = sp.GetRequiredService(); + string subscriptionName = Configuration["SubscriptionClientName"]; + + // TODO fix namespace -> global using + return new global::EventBusKafka.EventBusKafka(kafkaPersistentConnection, logger, + eventBusSubscriptionsManager, 5); + }); + } else { services.AddSingleton(sp =>