Start working subscribing with Kakfa and using it in Basket service.

This commit is contained in:
Philipp Theyssen 2023-02-15 11:38:34 +01:00
parent e1eac82289
commit 08b1404dd4
4 changed files with 39 additions and 3 deletions

View File

@ -14,13 +14,18 @@ public class DefaultKafkaPersistentConnection
: IKafkaPersistentConnection : IKafkaPersistentConnection
{ {
private readonly ILogger<DefaultKafkaPersistentConnection> _logger;
IProducer<byte[], byte[]> _kafkaProducer; IProducer<byte[], byte[]> _kafkaProducer;
public DefaultKafkaPersistentConnection(String brokerList) public DefaultKafkaPersistentConnection(String brokerList,
ILogger<DefaultKafkaPersistentConnection> logger)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
// TODO: fix configuration passing for producer // TODO: fix configuration passing for producer
// for now just assume we give "localhost:9092" as argument // for now just assume we give "localhost:9092" as argument
var conf = new ProducerConfig { BootstrapServers = brokerList }; var conf = new ProducerConfig { BootstrapServers = brokerList };
// TODO maybe we need to retry this? -> as it could fail
_kafkaProducer = new ProducerBuilder<byte[], byte[]>(conf).Build(); _kafkaProducer = new ProducerBuilder<byte[], byte[]>(conf).Build();
} }

View File

@ -34,14 +34,19 @@ public class EventBusKafka : IEventBus, IDisposable
// map Integration event to kafka message // map Integration event to kafka message
// event name something like OrderPaymentSucceededIntegrationEvent // event name something like OrderPaymentSucceededIntegrationEvent
var message = new Message<string, string> { Key = eventName, Value = jsonMessage }; var message = new Message<string, string> { Key = eventName, Value = jsonMessage };
IProducer<string, string> kafkaHandle = var kafkaHandle =
new DependentProducerBuilder<string, string>(_persistentConnection.Handle).Build(); new DependentProducerBuilder<string, string>(_persistentConnection.Handle).Build();
kafkaHandle.ProduceAsync(_topicName, message); kafkaHandle.ProduceAsync(_topicName, message);
} }
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{ {
throw new NotImplementedException(); var eventName = _subsManager.GetEventKey<T>();
// DoInternalSubscription(eventName);
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
_subsManager.AddSubscription<T, TH>();
} }
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler

View File

@ -54,6 +54,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBusKafka\EventBusKafka.csproj" />
<ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBusRabbitMQ\EventBusRabbitMQ.csproj" /> <ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBusRabbitMQ\EventBusRabbitMQ.csproj" />
<ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBusServiceBus\EventBusServiceBus.csproj" /> <ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBusServiceBus\EventBusServiceBus.csproj" />
<ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBus\EventBus.csproj" /> <ProjectReference Include="..\..\..\BuildingBlocks\EventBus\EventBus\EventBus.csproj" />

View File

@ -1,3 +1,4 @@
using EventBusKafka;
using Microsoft.AspNetCore.Authentication.Cookies; using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authentication.OpenIdConnect; using Microsoft.AspNetCore.Authentication.OpenIdConnect;
@ -90,6 +91,16 @@ public class Startup
return new DefaultServiceBusPersisterConnection(serviceBusConnectionString); return new DefaultServiceBusPersisterConnection(serviceBusConnectionString);
}); });
} }
else if (Configuration.GetValue<bool>("KafkaEnabled"))
{
services.AddSingleton<IKafkaPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultKafkaPersistentConnection>>();
// TODO add retry, better config passing here
return new DefaultKafkaPersistentConnection("localhost:9092",logger);
});
}
else else
{ {
services.AddSingleton<IRabbitMQPersistentConnection>(sp => services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
@ -259,6 +270,20 @@ public class Startup
eventBusSubscriptionsManager, iLifetimeScope, subscriptionName); eventBusSubscriptionsManager, iLifetimeScope, subscriptionName);
}); });
} }
else if (Configuration.GetValue<bool>("KafkaEnabled"))
{
services.AddSingleton<IEventBus, EventBusKafka.EventBusKafka>(sp =>
{
var kafkaPersistentConnection = sp.GetRequiredService<IKafkaPersistentConnection>();
var logger = sp.GetRequiredService<ILogger<EventBusKafka.EventBusKafka>>();
var eventBusSubscriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
string subscriptionName = Configuration["SubscriptionClientName"];
// TODO fix namespace -> global using
return new global::EventBusKafka.EventBusKafka(kafkaPersistentConnection, logger,
eventBusSubscriptionsManager, 5);
});
}
else else
{ {
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>