Browse Source

Add logic for producing events to kafka eventbus.

For now we have a single topic for all events. Since
we use the eventname as the key for the kafka message,
we have the property that they all get assigned to the same
parition inside kafka and therefore are in-order.

Alternatively one could have multiple topics.

The code for the kafka connection is based on:

https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs
pull/2068/head
Philipp Theyssen 2 years ago
parent
commit
e1eac82289
4 changed files with 44 additions and 4 deletions
  1. +27
    -1
      src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs
  2. +15
    -2
      src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
  3. +1
    -0
      src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs
  4. +1
    -1
      src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs

+ 27
- 1
src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs View File

@ -1,10 +1,36 @@
namespace EventBusKafka; namespace EventBusKafka;
// Abstracts how to connect to Kafka client
/// <summary>
/// Class for making sure we do not open new producer context (expensive)
/// everytime a service publishes an event.
/// On startup each service creates an singleton instance of this class,
/// which is then used when publishing any events.
///
/// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Web/KafkaClientHandle.cs
///
/// </summary>
public class DefaultKafkaPersistentConnection public class DefaultKafkaPersistentConnection
: IKafkaPersistentConnection : IKafkaPersistentConnection
{ {
IProducer<byte[], byte[]> _kafkaProducer;
public DefaultKafkaPersistentConnection(String brokerList)
{
// TODO: fix configuration passing for producer
// for now just assume we give "localhost:9092" as argument
var conf = new ProducerConfig { BootstrapServers = brokerList };
_kafkaProducer = new ProducerBuilder<byte[], byte[]>(conf).Build();
}
public Handle Handle => _kafkaProducer.Handle;
public void Dispose() public void Dispose()
{ {
// Block until all outstanding produce requests have completed (with or
// without error).
_kafkaProducer.Flush();
_kafkaProducer.Dispose();
} }
} }

+ 15
- 2
src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs View File

@ -2,12 +2,17 @@ namespace EventBusKafka;
public class EventBusKafka : IEventBus, IDisposable public class EventBusKafka : IEventBus, IDisposable
{ {
const string BROKER_NAME = "eshop_event_bus";
// for now use single topic and event names as keys for messages
// such that they always land in same partition and we have ordering guarantee
// then the consumers have to ignore events they are not subscribed to
// alternatively could have multiple topics (associated with event name)
private readonly string _topicName = "eshop_event_bus";
private readonly IKafkaPersistentConnection _persistentConnection; private readonly IKafkaPersistentConnection _persistentConnection;
private readonly ILogger<EventBusKafka> _logger; private readonly ILogger<EventBusKafka> _logger;
private readonly IEventBusSubscriptionsManager _subsManager; private readonly IEventBusSubscriptionsManager _subsManager;
private readonly int _retryCount; private readonly int _retryCount;
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
// Object that will be registered as singleton to each service on startup, // Object that will be registered as singleton to each service on startup,
@ -23,7 +28,15 @@ public class EventBusKafka : IEventBus, IDisposable
public void Publish(IntegrationEvent @event) public void Publish(IntegrationEvent @event)
{ {
throw new NotImplementedException();
var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, "");
var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType());
// map Integration event to kafka message
// event name something like OrderPaymentSucceededIntegrationEvent
var message = new Message<string, string> { Key = eventName, Value = jsonMessage };
IProducer<string, string> kafkaHandle =
new DependentProducerBuilder<string, string>(_persistentConnection.Handle).Build();
kafkaHandle.ProduceAsync(_topicName, message);
} }
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>


+ 1
- 0
src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs View File

@ -7,6 +7,7 @@ global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Logging;
global using Confluent.Kafka;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions;
global using System.Text; global using System.Text;
global using System.Threading.Tasks; global using System.Threading.Tasks;

+ 1
- 1
src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs View File

@ -2,5 +2,5 @@ namespace EventBusKafka;
public interface IKafkaPersistentConnection : IDisposable public interface IKafkaPersistentConnection : IDisposable
{ {
Handle Handle { get; }
} }

Loading…
Cancel
Save