diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs index 17278bf02..6a4e38df8 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/DefaultKafkaPersistentConnection.cs @@ -1,6 +1,10 @@ namespace EventBusKafka; +// Abstracts how to connect to Kafka client public class DefaultKafkaPersistentConnection + : IKafkaPersistentConnection { - + public void Dispose() + { + } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index 70e0ff1b1..d9a26fb3d 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -1,6 +1,57 @@ namespace EventBusKafka; -public class EventBusKafka +public class EventBusKafka : IEventBus, IDisposable { + const string BROKER_NAME = "eshop_event_bus"; + + private readonly IKafkaPersistentConnection _persistentConnection; + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly int _retryCount; + + // Object that will be registered as singleton to each service on startup, + // which will be used to publish and subscribe to events. + public EventBusKafka(IKafkaPersistentConnection persistentConnection, + ILogger logger, IEventBusSubscriptionsManager subsManager, int retryCount = 5) + { + _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); + _retryCount = retryCount; + } + + public void Publish(IntegrationEvent @event) + { + throw new NotImplementedException(); + } + + public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + throw new NotImplementedException(); + } + + public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler + { + throw new NotImplementedException(); + } + + // Taken directly from rabbitMQ implementation + public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler + { + _subsManager.RemoveDynamicSubscription(eventName); + } + + // Taken directly from rabbitMQ implementation + public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + var eventName = _subsManager.GetEventKey(); + _logger.LogInformation("Unsubscribing from event {EventName}", eventName); + _subsManager.RemoveSubscription(); + } + + public void Dispose() + { + _subsManager.Clear(); + } } \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj index 28689f59d..c36791642 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj @@ -10,4 +10,15 @@ + + + + + + + + ..\..\..\..\..\..\.nuget\packages\microsoft.extensions.logging.abstractions\7.0.0\lib\net7.0\Microsoft.Extensions.Logging.Abstractions.dll + + + diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs index fd2a37b20..2928d3520 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/GlobalUsings.cs @@ -1,6 +1,13 @@ -namespace EventBusKafka; - -public class GlobalUsings -{ - -} \ No newline at end of file +global using Polly; +global using Polly.Retry; +global using System; +global using System.IO; +global using System.Net.Sockets; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +global using Microsoft.Extensions.Logging; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; +global using System.Text; +global using System.Threading.Tasks; +global using System.Text.Json; \ No newline at end of file diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs index 512c94be9..94254c699 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/IKafkaPersistentConnection.cs @@ -1,6 +1,6 @@ namespace EventBusKafka; -public class IKafkaPersistentConnection +public interface IKafkaPersistentConnection : IDisposable { } \ No newline at end of file