From 4b537d2aadddc77bc65bfa443b5579191a775d66 Mon Sep 17 00:00:00 2001 From: kct949 Date: Wed, 15 Feb 2023 22:59:10 +0100 Subject: [PATCH] working version --- src/BuildingBlocks/EventBus/EventBus/Utils.cs | 15 +++++++++++++++ .../EventBus/EventBusKafka/EventBusKafka.cs | 3 +++ .../KafkaConsumerBackgroundService.cs | 13 ++++++++----- .../Basket.API/appsettings.Development.json | 5 +++-- src/Services/Basket/Basket.API/appsettings.json | 10 ++++++++++ src/docker-compose.override.yml | 5 +++-- 6 files changed, 42 insertions(+), 9 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/EventBus/Utils.cs diff --git a/src/BuildingBlocks/EventBus/EventBus/Utils.cs b/src/BuildingBlocks/EventBus/EventBus/Utils.cs new file mode 100644 index 000000000..23d6d7539 --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBus/Utils.cs @@ -0,0 +1,15 @@ +using System.Linq; +using System.Text; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus; + +public static class Utils +{ + public static string CalculateMd5Hash(string input) + { + using var md5 = System.Security.Cryptography.MD5.Create(); + var inputBytes = Encoding.ASCII.GetBytes(input); + var hashBytes = md5.ComputeHash(inputBytes); + return Convert.ToHexString(hashBytes); + } +} \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index 026a19c54..f83f6d92b 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -34,6 +34,9 @@ public class EventBusKafka : IEventBus, IDisposable var message = new Message { Key = eventName, Value = jsonMessage }; var kafkaHandle = new DependentProducerBuilder(_persistentConnection.Handle).Build(); + + Console.WriteLine($"Publishing event: {eventName}\n Content: {Utils.CalculateMd5Hash(jsonMessage)}"); + kafkaHandle.ProduceAsync(TopicName, message); } diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs index e6f7bd3ea..85744e0b9 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs @@ -1,3 +1,4 @@ +using System.Security.Cryptography; using Autofac; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; @@ -14,16 +15,15 @@ public class KafkaConsumerBackgroundService : BackgroundService private const string AutofacScopeName = "eshop_event_bus"; private const string TopicName = "eshop_event_bus"; - - public KafkaConsumerBackgroundService(IConfiguration config, + public KafkaConsumerBackgroundService(IConfiguration configuration, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac, ILogger logger) { var consumerConfig = new ConsumerConfig(); - config.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig); - + configuration.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig); _kafkaConsumer = new ConsumerBuilder(consumerConfig).Build(); + _subsManager = subsManager; _autofac = autofac; _logger = logger; @@ -45,7 +45,10 @@ public class KafkaConsumerBackgroundService : BackgroundService var consumeResult = _kafkaConsumer.Consume(cancellationToken); var eventName = consumeResult.Message.Key; + var messageContent = consumeResult.Message.Value; + Console.WriteLine($"Consumed event: {eventName}\n Content: {Utils.CalculateMd5Hash(messageContent)}"); + if (!_subsManager.HasSubscriptionsForEvent(eventName)) { _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName); @@ -71,7 +74,7 @@ public class KafkaConsumerBackgroundService : BackgroundService var handler = scope.ResolveOptional(subscription.HandlerType); if (handler == null) continue; var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonSerializer.Deserialize(consumeResult.Message.Value, + var integrationEvent = JsonSerializer.Deserialize(messageContent, eventType, new JsonSerializerOptions { diff --git a/src/Services/Basket/Basket.API/appsettings.Development.json b/src/Services/Basket/Basket.API/appsettings.Development.json index 5655c0e0f..9cdcf69b7 100644 --- a/src/Services/Basket/Basket.API/appsettings.Development.json +++ b/src/Services/Basket/Basket.API/appsettings.Development.json @@ -14,12 +14,13 @@ "ConnectionString": "127.0.0.1", "AzureServiceBusEnabled": false, "EventBusConnection": "localhost", + "KafkaEnabled": true, "Kafka": { "ProducerSettings": { - "BootstrapServers": "localhost:9092" + "BootstrapServers": "broker:9092" }, "ConsumerSettings": { - "BootstrapServers": "localhost:9092", + "BootstrapServers": "broker:9092", "GroupId": "basket-group-id" } } diff --git a/src/Services/Basket/Basket.API/appsettings.json b/src/Services/Basket/Basket.API/appsettings.json index 295294308..848e51028 100644 --- a/src/Services/Basket/Basket.API/appsettings.json +++ b/src/Services/Basket/Basket.API/appsettings.json @@ -26,5 +26,15 @@ "Name": "eshop", "ClientId": "your-client-id", "ClientSecret": "your-client-secret" + }, + "KafkaEnabled": true, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "broker:9092" + }, + "ConsumerSettings": { + "BootstrapServers": "broker:9092", + "GroupId": "basket-group-id" + } } } diff --git a/src/docker-compose.override.yml b/src/docker-compose.override.yml index bb51fe74e..4d0262c37 100644 --- a/src/docker-compose.override.yml +++ b/src/docker-compose.override.yml @@ -33,6 +33,7 @@ services: - "6379:6379" volumes: - eshop-basketdata:/data + rabbitmq: ports: - "15672:15672" @@ -47,8 +48,8 @@ services: environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1