Browse Source

Add further details to Kafka eventbus skeleton.

So far only copied details from other eventbus implementations.
Key next steps are to implement a persistent connection abstraction (class)
for the Kafka eventbus and the publish and subscribe functions. For this
we need knowledge about how Kafka works, for example how one publishes
events topics etc.
pull/2068/head
Philipp Theyssen 2 years ago
parent
commit
30eb3a075e
5 changed files with 82 additions and 9 deletions
  1. +5
    -1
      src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs
  2. +52
    -1
      src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
  3. +11
    -0
      src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj
  4. +13
    -6
      src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs
  5. +1
    -1
      src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs

+ 5
- 1
src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs View File

@ -1,6 +1,10 @@
namespace EventBusKafka; namespace EventBusKafka;
// Abstracts how to connect to Kafka client
public class DefaultKafkaPersistentConnection public class DefaultKafkaPersistentConnection
: IKafkaPersistentConnection
{ {
public void Dispose()
{
}
} }

+ 52
- 1
src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs View File

@ -1,6 +1,57 @@
namespace EventBusKafka; namespace EventBusKafka;
public class EventBusKafka
public class EventBusKafka : IEventBus, IDisposable
{ {
const string BROKER_NAME = "eshop_event_bus";
private readonly IKafkaPersistentConnection _persistentConnection;
private readonly ILogger<EventBusKafka> _logger;
private readonly IEventBusSubscriptionsManager _subsManager;
private readonly int _retryCount;
// Object that will be registered as singleton to each service on startup,
// which will be used to publish and subscribe to events.
public EventBusKafka(IKafkaPersistentConnection persistentConnection,
ILogger<EventBusKafka> logger, IEventBusSubscriptionsManager subsManager, int retryCount = 5)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_retryCount = retryCount;
}
public void Publish(IntegrationEvent @event)
{
throw new NotImplementedException();
}
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
throw new NotImplementedException();
}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
// Taken directly from rabbitMQ implementation
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
_subsManager.RemoveDynamicSubscription<TH>(eventName);
}
// Taken directly from rabbitMQ implementation
public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
var eventName = _subsManager.GetEventKey<T>();
_logger.LogInformation("Unsubscribing from event {EventName}", eventName);
_subsManager.RemoveSubscription<T, TH>();
}
public void Dispose()
{
_subsManager.Clear();
}
} }

+ 11
- 0
src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj View File

@ -10,4 +10,15 @@
<ProjectReference Include="..\EventBus\EventBus.csproj" /> <ProjectReference Include="..\EventBus\EventBus.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.0.2" />
<PackageReference Include="Polly" Version="7.2.3" />
</ItemGroup>
<ItemGroup>
<Reference Include="Microsoft.Extensions.Logging.Abstractions">
<HintPath>..\..\..\..\..\..\.nuget\packages\microsoft.extensions.logging.abstractions\7.0.0\lib\net7.0\Microsoft.Extensions.Logging.Abstractions.dll</HintPath>
</Reference>
</ItemGroup>
</Project> </Project>

+ 13
- 6
src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs View File

@ -1,6 +1,13 @@
namespace EventBusKafka;
public class GlobalUsings
{
}
global using Polly;
global using Polly.Retry;
global using System;
global using System.IO;
global using System.Net.Sockets;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
global using Microsoft.Extensions.Logging;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions;
global using System.Text;
global using System.Threading.Tasks;
global using System.Text.Json;

+ 1
- 1
src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs View File

@ -1,6 +1,6 @@
namespace EventBusKafka; namespace EventBusKafka;
public class IKafkaPersistentConnection
public interface IKafkaPersistentConnection : IDisposable
{ {
} }

Loading…
Cancel
Save