From 30eb3a075e7ebf2ebff22e5e3abbccacc8b4976b Mon Sep 17 00:00:00 2001
From: Philipp Theyssen
Date: Mon, 13 Feb 2023 12:42:56 +0100
Subject: [PATCH] Add further details to Kafka eventbus skeleton.
So far only copied details from other eventbus implementations.
Key next steps are to implement a persistent connection abstraction (class)
for the Kafka eventbus and the publish and subscribe functions. For this
we need knowledge about how Kafka works, for example how one publishes
events topics etc.
---
.../DefaultKafkaPersistentConnection.cs | 6 ++-
.../EventBus/EventBusKafka/EventBusKafka.cs | 53 ++++++++++++++++++-
.../EventBusKafka/EventBusKafka.csproj | 11 ++++
.../EventBus/EventBusKafka/GlobalUsings.cs | 19 ++++---
.../IKafkaPersistentConnection.cs | 2 +-
5 files changed, 82 insertions(+), 9 deletions(-)
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs
index 17278bf02..6a4e38df8 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs
@@ -1,6 +1,10 @@
namespace EventBusKafka;
+// Abstracts how to connect to Kafka client
public class DefaultKafkaPersistentConnection
+ : IKafkaPersistentConnection
{
-
+ public void Dispose()
+ {
+ }
}
\ No newline at end of file
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
index 70e0ff1b1..d9a26fb3d 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
@@ -1,6 +1,57 @@
namespace EventBusKafka;
-public class EventBusKafka
+public class EventBusKafka : IEventBus, IDisposable
{
+ const string BROKER_NAME = "eshop_event_bus";
+
+ private readonly IKafkaPersistentConnection _persistentConnection;
+ private readonly ILogger _logger;
+ private readonly IEventBusSubscriptionsManager _subsManager;
+ private readonly int _retryCount;
+
+ // Object that will be registered as singleton to each service on startup,
+ // which will be used to publish and subscribe to events.
+ public EventBusKafka(IKafkaPersistentConnection persistentConnection,
+ ILogger logger, IEventBusSubscriptionsManager subsManager, int retryCount = 5)
+ {
+ _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
+ _retryCount = retryCount;
+ }
+
+ public void Publish(IntegrationEvent @event)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler
+ {
+ throw new NotImplementedException();
+ }
+
+ public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler
+ {
+ throw new NotImplementedException();
+ }
+
+ // Taken directly from rabbitMQ implementation
+ public void UnsubscribeDynamic | (string eventName) where TH : IDynamicIntegrationEventHandler
+ {
+ _subsManager.RemoveDynamicSubscription | (eventName);
+ }
+
+ // Taken directly from rabbitMQ implementation
+ public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler
+ {
+ var eventName = _subsManager.GetEventKey();
+ _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
+ _subsManager.RemoveSubscription();
+ }
+
+ public void Dispose()
+ {
+ _subsManager.Clear();
+ }
}
\ No newline at end of file
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj
index 28689f59d..c36791642 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj
@@ -10,4 +10,15 @@
+
+
+
+
+
+
+
+ ..\..\..\..\..\..\.nuget\packages\microsoft.extensions.logging.abstractions\7.0.0\lib\net7.0\Microsoft.Extensions.Logging.Abstractions.dll
+
+
+
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs
index fd2a37b20..2928d3520 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs
@@ -1,6 +1,13 @@
-namespace EventBusKafka;
-
-public class GlobalUsings
-{
-
-}
\ No newline at end of file
+global using Polly;
+global using Polly.Retry;
+global using System;
+global using System.IO;
+global using System.Net.Sockets;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
+global using Microsoft.Extensions.Logging;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions;
+global using System.Text;
+global using System.Threading.Tasks;
+global using System.Text.Json;
\ No newline at end of file
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs
index 512c94be9..94254c699 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs
@@ -1,6 +1,6 @@
namespace EventBusKafka;
-public class IKafkaPersistentConnection
+public interface IKafkaPersistentConnection : IDisposable
{
}
\ No newline at end of file
|