From 72f2968bea9dc8c92d3716156b7dd1631e26d528 Mon Sep 17 00:00:00 2001 From: kct949 Date: Wed, 15 Feb 2023 15:03:26 +0100 Subject: [PATCH] implement subscribe for kafka + minor fixes --- .../DefaultKafkaPersistentConnection.cs | 20 ++- .../EventBus/EventBusKafka/EventBusKafka.cs | 16 ++- .../EventBusKafka/EventBusKafka.csproj | 2 + .../IKafkaPersistentConnection.cs | 3 +- .../KafkaConsumerBackgroundService.cs | 117 ++++++++++++++++++ .../Basket/Basket.API/GlobalUsings.cs | 1 + src/Services/Basket/Basket.API/Startup.cs | 27 +--- .../Basket.API/appsettings.Development.json | 11 +- .../Catalog.API/appsettings.Development.json | 11 +- .../appsettings.Development.json | 9 ++ .../Payment.API/appsettings.Development.json | 9 ++ 11 files changed, 178 insertions(+), 48 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 2b5a9b965..d57959252 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,5 +1,6 @@ -namespace EventBusKafka; +using Microsoft.Extensions.Configuration; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; /// /// Class for making sure we do not open new producer context (expensive) @@ -13,20 +14,13 @@ namespace EventBusKafka; public class DefaultKafkaPersistentConnection : IKafkaPersistentConnection { + private readonly IProducer _kafkaProducer; - private readonly ILogger _logger; - IProducer _kafkaProducer; - - public DefaultKafkaPersistentConnection(String brokerList, - ILogger logger) + public DefaultKafkaPersistentConnection(IConfiguration configuration) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - // TODO: fix configuration passing for producer - // for now just assume we give "localhost:9092" as argument - var conf = new ProducerConfig { BootstrapServers = brokerList }; - - // TODO maybe we need to retry this? -> as it could fail - _kafkaProducer = new ProducerBuilder(conf).Build(); + var producerConfig = new ProducerConfig(); + configuration.GetSection("Kafka:ProducerSettings").Bind(producerConfig); + _kafkaProducer = new ProducerBuilder(producerConfig).Build(); } public Handle Handle => _kafkaProducer.Handle; diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index 199e82a74..026a19c54 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -1,4 +1,4 @@ -namespace EventBusKafka; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; public class EventBusKafka : IEventBus, IDisposable { @@ -6,29 +6,27 @@ public class EventBusKafka : IEventBus, IDisposable // such that they always land in same partition and we have ordering guarantee // then the consumers have to ignore events they are not subscribed to // alternatively could have multiple topics (associated with event name) - private readonly string _topicName = "eshop_event_bus"; + private const string TopicName = "eshop_event_bus"; private readonly IKafkaPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; - private readonly int _retryCount; - private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; + private const string IntegrationEventSuffix = "IntegrationEvent"; // Object that will be registered as singleton to each service on startup, // which will be used to publish and subscribe to events. public EventBusKafka(IKafkaPersistentConnection persistentConnection, - ILogger logger, IEventBusSubscriptionsManager subsManager, int retryCount = 5) + ILogger logger, IEventBusSubscriptionsManager subsManager) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); - _retryCount = retryCount; + _subsManager = subsManager; } public void Publish(IntegrationEvent @event) { - var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); + var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, ""); var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType()); // map Integration event to kafka message @@ -36,7 +34,7 @@ public class EventBusKafka : IEventBus, IDisposable var message = new Message { Key = eventName, Value = jsonMessage }; var kafkaHandle = new DependentProducerBuilder(_persistentConnection.Handle).Build(); - kafkaHandle.ProduceAsync(_topicName, message); + kafkaHandle.ProduceAsync(TopicName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj index c36791642..74961ceff 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj @@ -11,7 +11,9 @@ + + diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 008c46211..25514a50e 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -1,4 +1,5 @@ -namespace EventBusKafka; +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + public interface IKafkaPersistentConnection : IDisposable { diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs new file mode 100644 index 000000000..e6f7bd3ea --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs @@ -0,0 +1,117 @@ +using Autofac; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + +/* Inspired by https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/RequestTimeConsumer.cs */ +public class KafkaConsumerBackgroundService : BackgroundService +{ + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly ILifetimeScope _autofac; + private readonly ILogger _logger; + private readonly IConsumer _kafkaConsumer; + private const string AutofacScopeName = "eshop_event_bus"; + private const string TopicName = "eshop_event_bus"; + + + public KafkaConsumerBackgroundService(IConfiguration config, + IEventBusSubscriptionsManager subsManager, + ILifetimeScope autofac, + ILogger logger) + { + var consumerConfig = new ConsumerConfig(); + config.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig); + + _kafkaConsumer = new ConsumerBuilder(consumerConfig).Build(); + _subsManager = subsManager; + _autofac = autofac; + _logger = logger; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken); + } + + private async Task StartConsumerLoop(CancellationToken cancellationToken) + { + _kafkaConsumer.Subscribe(TopicName); + + while (!cancellationToken.IsCancellationRequested) + { + try + { + var consumeResult = _kafkaConsumer.Consume(cancellationToken); + + var eventName = consumeResult.Message.Key; + + if (!_subsManager.HasSubscriptionsForEvent(eventName)) + { + _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName); + continue; + } + + await using var scope = _autofac.BeginLifetimeScope(AutofacScopeName); + var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) + { + #region dynamic subscription + /* We do not support dynamic subscription at the moment*/ + // if (subscription.IsDynamic) + // { + // if (scope.ResolveOptional(subscription.HandlerType) is not IDynamicIntegrationEventHandler handler) continue; + // using dynamic eventData = JsonDocument.Parse(message); + // await Task.Yield(); + // await handler.Handle(eventData); + // } + #endregion + + /* The code below applies to non-dynamic subscriptions only */ + var handler = scope.ResolveOptional(subscription.HandlerType); + if (handler == null) continue; + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonSerializer.Deserialize(consumeResult.Message.Value, + eventType, + new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + + await Task.Yield(); + await (Task)concreteType.GetMethod("Handle") + .Invoke(handler, new object[] { integrationEvent }); + } + } + catch (OperationCanceledException) + { + break; + } + catch (ConsumeException e) + { + // Consumer errors should generally be ignored (or logged) unless fatal. + Console.WriteLine($"Consume error: {e.Error.Reason}"); + + if (e.Error.IsFatal) + { + // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors + break; + } + } + catch (Exception e) + { + Console.WriteLine($"Unexpected error: {e}"); + break; + } + } + } + + public override void Dispose() + { + _kafkaConsumer.Close(); // Commit offsets and leave the group cleanly. + _kafkaConsumer.Dispose(); + + base.Dispose(); + } +} \ No newline at end of file diff --git a/src/Services/Basket/Basket.API/GlobalUsings.cs b/src/Services/Basket/Basket.API/GlobalUsings.cs index b2e13ab17..22426e3db 100644 --- a/src/Services/Basket/Basket.API/GlobalUsings.cs +++ b/src/Services/Basket/Basket.API/GlobalUsings.cs @@ -28,6 +28,7 @@ global using Azure.Messaging.ServiceBus; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; global using Microsoft.eShopOnContainers.Services.Basket.API.Controllers; diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 70c7b533a..0682f4b77 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -1,7 +1,3 @@ -using EventBusKafka; -using Microsoft.AspNetCore.Authentication.Cookies; -using Microsoft.AspNetCore.Authentication.OpenIdConnect; - namespace Microsoft.eShopOnContainers.Services.Basket.API; public class Startup { @@ -93,13 +89,7 @@ public class Startup } else if (Configuration.GetValue("KafkaEnabled")) { - services.AddSingleton(sp => - { - var logger = sp.GetRequiredService>(); - - // TODO add retry, better config passing here - return new DefaultKafkaPersistentConnection("localhost:9092",logger); - }); + services.AddSingleton(); } else { @@ -264,7 +254,7 @@ public class Startup var iLifetimeScope = sp.GetRequiredService(); var logger = sp.GetRequiredService>(); var eventBusSubscriptionsManager = sp.GetRequiredService(); - string subscriptionName = Configuration["SubscriptionClientName"]; + var subscriptionName = Configuration["SubscriptionClientName"]; return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubscriptionsManager, iLifetimeScope, subscriptionName); @@ -272,17 +262,8 @@ public class Startup } else if (Configuration.GetValue("KafkaEnabled")) { - services.AddSingleton(sp => - { - var kafkaPersistentConnection = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubscriptionsManager = sp.GetRequiredService(); - string subscriptionName = Configuration["SubscriptionClientName"]; - - // TODO fix namespace -> global using - return new global::EventBusKafka.EventBusKafka(kafkaPersistentConnection, logger, - eventBusSubscriptionsManager, 5); - }); + services.AddHostedService(); + services.AddSingleton(); } else { diff --git a/src/Services/Basket/Basket.API/appsettings.Development.json b/src/Services/Basket/Basket.API/appsettings.Development.json index f4a3b9407..5655c0e0f 100644 --- a/src/Services/Basket/Basket.API/appsettings.Development.json +++ b/src/Services/Basket/Basket.API/appsettings.Development.json @@ -13,5 +13,14 @@ "IdentityUrl": "http://localhost:5105", "ConnectionString": "127.0.0.1", "AzureServiceBusEnabled": false, - "EventBusConnection": "localhost" + "EventBusConnection": "localhost", + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "basket-group-id" + } + } } \ No newline at end of file diff --git a/src/Services/Catalog/Catalog.API/appsettings.Development.json b/src/Services/Catalog/Catalog.API/appsettings.Development.json index 1d5574f63..6154a50e6 100644 --- a/src/Services/Catalog/Catalog.API/appsettings.Development.json +++ b/src/Services/Catalog/Catalog.API/appsettings.Development.json @@ -11,5 +11,14 @@ } } }, - "EventBusConnection": "localhost" + "EventBusConnection": "localhost", + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "catalog-group-id" + } + } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json index e203e9407..6fdbdfcc1 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json +++ b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json @@ -5,5 +5,14 @@ "System": "Information", "Microsoft": "Information" } + }, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "ordering-group-id" + } } } diff --git a/src/Services/Payment/Payment.API/appsettings.Development.json b/src/Services/Payment/Payment.API/appsettings.Development.json index fa8ce71a9..f8d627ace 100644 --- a/src/Services/Payment/Payment.API/appsettings.Development.json +++ b/src/Services/Payment/Payment.API/appsettings.Development.json @@ -6,5 +6,14 @@ "System": "Information", "Microsoft": "Information" } + }, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "localhost:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "localhost:9092", + "GroupId": "payment-group-id" + } } }