Browse Source

working version

pull/2068/head
kct949 1 year ago
parent
commit
4b537d2aad
6 changed files with 42 additions and 9 deletions
  1. +15
    -0
      src/BuildingBlocks/EventBus/EventBus/Utils.cs
  2. +3
    -0
      src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
  3. +8
    -5
      src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs
  4. +3
    -2
      src/Services/Basket/Basket.API/appsettings.Development.json
  5. +10
    -0
      src/Services/Basket/Basket.API/appsettings.json
  6. +3
    -2
      src/docker-compose.override.yml

+ 15
- 0
src/BuildingBlocks/EventBus/EventBus/Utils.cs View File

@ -0,0 +1,15 @@
using System.Linq;
using System.Text;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
public static class Utils
{
public static string CalculateMd5Hash(string input)
{
using var md5 = System.Security.Cryptography.MD5.Create();
var inputBytes = Encoding.ASCII.GetBytes(input);
var hashBytes = md5.ComputeHash(inputBytes);
return Convert.ToHexString(hashBytes);
}
}

+ 3
- 0
src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs View File

@ -34,6 +34,9 @@ public class EventBusKafka : IEventBus, IDisposable
var message = new Message<string, string> { Key = eventName, Value = jsonMessage }; var message = new Message<string, string> { Key = eventName, Value = jsonMessage };
var kafkaHandle = var kafkaHandle =
new DependentProducerBuilder<string, string>(_persistentConnection.Handle).Build(); new DependentProducerBuilder<string, string>(_persistentConnection.Handle).Build();
Console.WriteLine($"Publishing event: {eventName}\n Content: {Utils.CalculateMd5Hash(jsonMessage)}");
kafkaHandle.ProduceAsync(TopicName, message); kafkaHandle.ProduceAsync(TopicName, message);
} }


+ 8
- 5
src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs View File

@ -1,3 +1,4 @@
using System.Security.Cryptography;
using Autofac; using Autofac;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
@ -14,16 +15,15 @@ public class KafkaConsumerBackgroundService : BackgroundService
private const string AutofacScopeName = "eshop_event_bus"; private const string AutofacScopeName = "eshop_event_bus";
private const string TopicName = "eshop_event_bus"; private const string TopicName = "eshop_event_bus";
public KafkaConsumerBackgroundService(IConfiguration config,
public KafkaConsumerBackgroundService(IConfiguration configuration,
IEventBusSubscriptionsManager subsManager, IEventBusSubscriptionsManager subsManager,
ILifetimeScope autofac, ILifetimeScope autofac,
ILogger<KafkaConsumerBackgroundService> logger) ILogger<KafkaConsumerBackgroundService> logger)
{ {
var consumerConfig = new ConsumerConfig(); var consumerConfig = new ConsumerConfig();
config.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig);
configuration.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig);
_kafkaConsumer = new ConsumerBuilder<string, string>(consumerConfig).Build(); _kafkaConsumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
_subsManager = subsManager; _subsManager = subsManager;
_autofac = autofac; _autofac = autofac;
_logger = logger; _logger = logger;
@ -45,7 +45,10 @@ public class KafkaConsumerBackgroundService : BackgroundService
var consumeResult = _kafkaConsumer.Consume(cancellationToken); var consumeResult = _kafkaConsumer.Consume(cancellationToken);
var eventName = consumeResult.Message.Key; var eventName = consumeResult.Message.Key;
var messageContent = consumeResult.Message.Value;
Console.WriteLine($"Consumed event: {eventName}\n Content: {Utils.CalculateMd5Hash(messageContent)}");
if (!_subsManager.HasSubscriptionsForEvent(eventName)) if (!_subsManager.HasSubscriptionsForEvent(eventName))
{ {
_logger.LogWarning("No subscription for Kafka event: {EventName}", eventName); _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName);
@ -71,7 +74,7 @@ public class KafkaConsumerBackgroundService : BackgroundService
var handler = scope.ResolveOptional(subscription.HandlerType); var handler = scope.ResolveOptional(subscription.HandlerType);
if (handler == null) continue; if (handler == null) continue;
var eventType = _subsManager.GetEventTypeByName(eventName); var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonSerializer.Deserialize(consumeResult.Message.Value,
var integrationEvent = JsonSerializer.Deserialize(messageContent,
eventType, eventType,
new JsonSerializerOptions new JsonSerializerOptions
{ {


+ 3
- 2
src/Services/Basket/Basket.API/appsettings.Development.json View File

@ -14,12 +14,13 @@
"ConnectionString": "127.0.0.1", "ConnectionString": "127.0.0.1",
"AzureServiceBusEnabled": false, "AzureServiceBusEnabled": false,
"EventBusConnection": "localhost", "EventBusConnection": "localhost",
"KafkaEnabled": true,
"Kafka": { "Kafka": {
"ProducerSettings": { "ProducerSettings": {
"BootstrapServers": "localhost:9092"
"BootstrapServers": "broker:9092"
}, },
"ConsumerSettings": { "ConsumerSettings": {
"BootstrapServers": "localhost:9092",
"BootstrapServers": "broker:9092",
"GroupId": "basket-group-id" "GroupId": "basket-group-id"
} }
} }

+ 10
- 0
src/Services/Basket/Basket.API/appsettings.json View File

@ -26,5 +26,15 @@
"Name": "eshop", "Name": "eshop",
"ClientId": "your-client-id", "ClientId": "your-client-id",
"ClientSecret": "your-client-secret" "ClientSecret": "your-client-secret"
},
"KafkaEnabled": true,
"Kafka": {
"ProducerSettings": {
"BootstrapServers": "broker:9092"
},
"ConsumerSettings": {
"BootstrapServers": "broker:9092",
"GroupId": "basket-group-id"
}
} }
} }

+ 3
- 2
src/docker-compose.override.yml View File

@ -33,6 +33,7 @@ services:
- "6379:6379" - "6379:6379"
volumes: volumes:
- eshop-basketdata:/data - eshop-basketdata:/data
rabbitmq: rabbitmq:
ports: ports:
- "15672:15672" - "15672:15672"
@ -47,8 +48,8 @@ services:
environment: environment:
KAFKA_BROKER_ID: 1 KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1


Loading…
Cancel
Save