From 90b3c0a407481e21ef5a3eead0105d01d3b21070 Mon Sep 17 00:00:00 2001
From: kct949 <123660777+kct949@users.noreply.github.com>
Date: Wed, 15 Mar 2023 12:00:20 +0100
Subject: [PATCH] Pt/finish kafka (#5)
* Add kafka eventbus to Catalog, Ordering and Payment.
Only catalog is confirmed to work, I think the ordering background
service and payment are still subscribing to rabbitMQ for some reason
(needs more investigation)
* Fix kafka consumer group ids.
* Fix build problems (duplicate package + lower version)
All services seem to use Kafka now, but it seems like
kafka takes to long to start up and the services
fail to connect to the borkers (either we have to do
retries similar like rabbitmq) or we are going to
enforce the execution order in docker compose by
using something like healthchecks.
* Add kafka broker dependency for other services.
Still have bug in eventBus subscription because
for example the ordering service should
handle a UserCheckoutAccepted event put it does have
no subscription for it when such an event is
published to the Kafka topic:
```
src-ordering-api-1 | [15:04:42 WRN] No subscription for Kafka event: UserCheckoutAccepted
src-ordering-api-1 | Consumed event: UserCheckoutAccepted
src-ordering-api-1 | Content: 632B63DB0CE145D499FE01904F76A475
```
* Add logging for subscription manager problem.
Seems like the subscription manager is not used
correctly in the kafka eventbus (two different objects?).
* add printing handlers
* actually trigger printing handlers
* add kafkapersistentconnection registration
* Revert "add kafkapersistentconnection registration"
This reverts commit 704ee3e36f09f3f3ad48057de31996654a8e3894.
* add allowAutoCreateTopics in consumers (different than default) and in producers (just to be explicit)
* register DefaultKafkaPersistentConnection in ordering.backgroundtasks
* remove noise in logs
* Make eventNames in kafka eventbus consistent.
Do not remove IntegrationEventSuffix, before this
change the subscription handlers and eventNames did not match.
* Create kafka admin background service to create empty topic.
We have to create the eshop_event_bus kafka topic on startup,
because otherwise the consumer of the microservices would fail.
---------
Co-authored-by: Philipp Theyssen
Co-authored-by: Philipp Theyssen <34607877+PTheyssen@users.noreply.github.com>
---
.../EventBus/IEventBusSubscriptionsManager.cs | 1 +
.../InMemoryEventBusSubscriptionsManager.cs | 9 ++--
.../EventBus/EventBusKafka/EventBusKafka.cs | 11 +++--
.../KafkaAdminBackgroundService.cs | 43 +++++++++++++++++++
.../KafkaConsumerBackgroundService.cs | 3 --
.../Basket/Basket.API/Basket.API.csproj | 1 -
.../Basket/Basket.API/Dockerfile.develop | 2 +-
src/Services/Basket/Basket.API/Startup.cs | 1 +
.../Basket.API/appsettings.Development.json | 9 +++-
.../Basket/Basket.API/appsettings.json | 9 +++-
.../Catalog/Catalog.API/Catalog.API.csproj | 1 +
.../Catalog/Catalog.API/Dockerfile.develop | 1 +
.../Catalog/Catalog.API/GlobalUsings.cs | 1 +
src/Services/Catalog/Catalog.API/Startup.cs | 23 ++++++++--
.../Catalog.API/appsettings.Development.json | 9 ++--
.../Catalog/Catalog.API/appsettings.json | 13 +++++-
.../Ordering/Ordering.API/Dockerfile.develop | 1 +
.../Ordering/Ordering.API/GlobalUsings.cs | 1 +
.../Ordering/Ordering.API/Ordering.API.csproj | 1 +
src/Services/Ordering/Ordering.API/Startup.cs | 26 ++++++++---
.../Ordering/Ordering.API/appsettings.json | 12 ++++++
.../Extensions/CustomExtensionMethods.cs | 11 +++++
.../Ordering.BackgroundTasks.csproj | 3 +-
.../appsettings.Development.json | 9 ++--
.../Ordering.BackgroundTasks/appsettings.json | 14 +++++-
.../Payment/Payment.API/Dockerfile.develop | 1 +
.../Payment/Payment.API/GlobalUsings.cs | 1 +
.../Payment/Payment.API/Payment.API.csproj | 1 +
src/Services/Payment/Payment.API/Startup.cs | 15 +++++++
.../Payment.API/appsettings.Development.json | 9 ++--
.../Payment/Payment.API/appsettings.json | 14 +++++-
src/docker-compose.yml | 4 ++
32 files changed, 222 insertions(+), 38 deletions(-)
create mode 100644 src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs
diff --git a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
index a140bfc75..119d58dff 100644
--- a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
+++ b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs
@@ -4,6 +4,7 @@ public interface IEventBusSubscriptionsManager
{
bool IsEmpty { get; }
event EventHandler OnEventRemoved;
+
void AddDynamicSubscription(string eventName)
where TH : IDynamicIntegrationEventHandler;
diff --git a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
index 5de35b292..43b7ab81b 100644
--- a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
+++ b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs
@@ -2,8 +2,6 @@
public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{
-
-
private readonly Dictionary> _handlers;
private readonly List _eventTypes;
@@ -17,7 +15,7 @@ public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptio
public bool IsEmpty => _handlers is { Count: 0 };
public void Clear() => _handlers.Clear();
-
+
public void AddDynamicSubscription(string eventName)
where TH : IDynamicIntegrationEventHandler
{
@@ -30,12 +28,17 @@ public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptio
{
var eventName = GetEventKey();
+ Console.WriteLine($"Adding Handler: {typeof(TH)} for Event: {eventName}");
DoAddSubscription(typeof(TH), eventName, isDynamic: false);
if (!_eventTypes.Contains(typeof(T)))
{
_eventTypes.Add(typeof(T));
}
+
+ foreach (var h in _handlers) {
+ Console.WriteLine($"Handler has Subscription for: {h.Key}");
+ }
}
private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
index f83f6d92b..881c0defa 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs
@@ -26,7 +26,7 @@ public class EventBusKafka : IEventBus, IDisposable
public void Publish(IntegrationEvent @event)
{
- var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, "");
+ var eventName = @event.GetType().Name;
var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType());
// map Integration event to kafka message
@@ -47,7 +47,12 @@ public class EventBusKafka : IEventBus, IDisposable
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
- _subsManager.AddSubscription();
+ try {
+ _subsManager.AddSubscription();
+ } catch (Exception e)
+ {
+ Console.WriteLine($"Failed to add subscription {eventName}, because: {e.Message}");
+ }
}
public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler
@@ -73,4 +78,4 @@ public class EventBusKafka : IEventBus, IDisposable
{
_subsManager.Clear();
}
-}
\ No newline at end of file
+}
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs
new file mode 100644
index 000000000..68d499e0f
--- /dev/null
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs
@@ -0,0 +1,43 @@
+using Confluent.Kafka.Admin;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Hosting;
+
+namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
+
+public class KafkaAdminBackgroundService : BackgroundService
+{
+ private const string TopicName = "eshop_event_bus";
+ private readonly ILogger _logger;
+ private readonly IConfiguration _configuration;
+
+ public KafkaAdminBackgroundService(
+ IConfiguration configuration,
+ ILogger logger)
+ {
+ _configuration = configuration;
+ _logger = logger;
+ }
+
+ protected override Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ return Task.Run(CreateTopics, stoppingToken);
+ }
+
+ private async Task CreateTopics()
+ {
+ var adminConfig = new AdminClientConfig();
+ _configuration.GetSection("Kafka:AdminSettings").Bind(adminConfig);
+ using (var adminClient = new AdminClientBuilder(adminConfig).Build())
+ {
+ try
+ {
+ await adminClient.CreateTopicsAsync(new TopicSpecification[] {
+ new TopicSpecification { Name = TopicName, ReplicationFactor = 1, NumPartitions = 1 } });
+ }
+ catch (CreateTopicsException e)
+ {
+ Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
+ }
+ }
+ }
+}
diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs
index 85744e0b9..473c3a0a2 100644
--- a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs
+++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs
@@ -1,4 +1,3 @@
-using System.Security.Cryptography;
using Autofac;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
@@ -46,8 +45,6 @@ public class KafkaConsumerBackgroundService : BackgroundService
var eventName = consumeResult.Message.Key;
var messageContent = consumeResult.Message.Value;
-
- Console.WriteLine($"Consumed event: {eventName}\n Content: {Utils.CalculateMd5Hash(messageContent)}");
if (!_subsManager.HasSubscriptionsForEvent(eventName))
{
diff --git a/src/Services/Basket/Basket.API/Basket.API.csproj b/src/Services/Basket/Basket.API/Basket.API.csproj
index 4d1cc625d..21fead841 100644
--- a/src/Services/Basket/Basket.API/Basket.API.csproj
+++ b/src/Services/Basket/Basket.API/Basket.API.csproj
@@ -16,7 +16,6 @@
-
diff --git a/src/Services/Basket/Basket.API/Dockerfile.develop b/src/Services/Basket/Basket.API/Dockerfile.develop
index 5fc9900be..cf5811bae 100644
--- a/src/Services/Basket/Basket.API/Dockerfile.develop
+++ b/src/Services/Basket/Basket.API/Dockerfile.develop
@@ -8,7 +8,7 @@ WORKDIR /src
COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"]
COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"]
-COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EvenBusKafka/"]
+COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/"]
COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"]
COPY ["Services/Basket/Basket.API/Basket.API.csproj", "Services/Basket/Basket.API/"]
COPY ["NuGet.config", "NuGet.config"]
diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs
index 0682f4b77..2e8b11c9c 100644
--- a/src/Services/Basket/Basket.API/Startup.cs
+++ b/src/Services/Basket/Basket.API/Startup.cs
@@ -262,6 +262,7 @@ public class Startup
}
else if (Configuration.GetValue("KafkaEnabled"))
{
+ services.AddHostedService();
services.AddHostedService();
services.AddSingleton();
}
diff --git a/src/Services/Basket/Basket.API/appsettings.Development.json b/src/Services/Basket/Basket.API/appsettings.Development.json
index 9cdcf69b7..baacbecbe 100644
--- a/src/Services/Basket/Basket.API/appsettings.Development.json
+++ b/src/Services/Basket/Basket.API/appsettings.Development.json
@@ -16,12 +16,17 @@
"EventBusConnection": "localhost",
"KafkaEnabled": true,
"Kafka": {
- "ProducerSettings": {
+ "AdminSettings": {
"BootstrapServers": "broker:9092"
},
+ "ProducerSettings": {
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
+ },
"ConsumerSettings": {
"BootstrapServers": "broker:9092",
- "GroupId": "basket-group-id"
+ "GroupId": "basket-group-id",
+ "AllowAutoCreateTopics": true
}
}
}
\ No newline at end of file
diff --git a/src/Services/Basket/Basket.API/appsettings.json b/src/Services/Basket/Basket.API/appsettings.json
index 848e51028..bc2b0bc8c 100644
--- a/src/Services/Basket/Basket.API/appsettings.json
+++ b/src/Services/Basket/Basket.API/appsettings.json
@@ -29,12 +29,17 @@
},
"KafkaEnabled": true,
"Kafka": {
- "ProducerSettings": {
+ "AdminSettings": {
"BootstrapServers": "broker:9092"
},
+ "ProducerSettings": {
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
+ },
"ConsumerSettings": {
"BootstrapServers": "broker:9092",
- "GroupId": "basket-group-id"
+ "GroupId": "basket-group-id",
+ "AllowAutoCreateTopics": true
}
}
}
diff --git a/src/Services/Catalog/Catalog.API/Catalog.API.csproj b/src/Services/Catalog/Catalog.API/Catalog.API.csproj
index 362401a4d..fd5fc299f 100644
--- a/src/Services/Catalog/Catalog.API/Catalog.API.csproj
+++ b/src/Services/Catalog/Catalog.API/Catalog.API.csproj
@@ -78,6 +78,7 @@
+
diff --git a/src/Services/Catalog/Catalog.API/Dockerfile.develop b/src/Services/Catalog/Catalog.API/Dockerfile.develop
index 9aefa6eff..b7f911dd7 100644
--- a/src/Services/Catalog/Catalog.API/Dockerfile.develop
+++ b/src/Services/Catalog/Catalog.API/Dockerfile.develop
@@ -8,6 +8,7 @@ WORKDIR /src
COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"]
COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"]
+COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/"]
COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"]
COPY ["BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj", "BuildingBlocks/EventBus/IntegrationEventLogEF/"]
COPY ["BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj", "BuildingBlocks/WebHostCustomization/WebHost.Customization/"]
diff --git a/src/Services/Catalog/Catalog.API/GlobalUsings.cs b/src/Services/Catalog/Catalog.API/GlobalUsings.cs
index 48641cc80..dc6664f0f 100644
--- a/src/Services/Catalog/Catalog.API/GlobalUsings.cs
+++ b/src/Services/Catalog/Catalog.API/GlobalUsings.cs
@@ -19,6 +19,7 @@ global using Microsoft.EntityFrameworkCore.Design;
global using Microsoft.EntityFrameworkCore.Metadata.Builders;
global using Microsoft.EntityFrameworkCore;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
global using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
global using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs
index f7b46cb6f..6632d2695 100644
--- a/src/Services/Catalog/Catalog.API/Startup.cs
+++ b/src/Services/Catalog/Catalog.API/Startup.cs
@@ -1,3 +1,5 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
+
namespace Microsoft.eShopOnContainers.Services.Catalog.API;
public class Startup
@@ -154,6 +156,10 @@ public static class CustomExtensionMethods
name: "catalog-servicebus-check",
tags: new string[] { "servicebus" });
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ // TODO: might want to add health check
+ }
else
{
hcBuilder
@@ -175,7 +181,7 @@ public static class CustomExtensionMethods
sqlServerOptionsAction: sqlOptions =>
{
sqlOptions.MigrationsAssembly(typeof(Startup).GetTypeInfo().Assembly.GetName().Name);
- //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
+ //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
sqlOptions.EnableRetryOnFailure(maxRetryCount: 15, maxRetryDelay: TimeSpan.FromSeconds(30), errorNumbersToAdd: null);
});
});
@@ -186,7 +192,7 @@ public static class CustomExtensionMethods
sqlServerOptionsAction: sqlOptions =>
{
sqlOptions.MigrationsAssembly(typeof(Startup).GetTypeInfo().Assembly.GetName().Name);
- //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
+ //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
sqlOptions.EnableRetryOnFailure(maxRetryCount: 15, maxRetryDelay: TimeSpan.FromSeconds(30), errorNumbersToAdd: null);
});
});
@@ -221,7 +227,7 @@ public static class CustomExtensionMethods
public static IServiceCollection AddSwagger(this IServiceCollection services, IConfiguration configuration)
{
services.AddSwaggerGen(options =>
- {
+ {
options.SwaggerDoc("v1", new OpenApiInfo
{
Title = "eShopOnContainers - Catalog HTTP API",
@@ -251,6 +257,10 @@ public static class CustomExtensionMethods
return new DefaultServiceBusPersisterConnection(serviceBusConnection);
});
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
@@ -304,6 +314,11 @@ public static class CustomExtensionMethods
});
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddHostedService();
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
@@ -330,4 +345,4 @@ public static class CustomExtensionMethods
return services;
}
-}
\ No newline at end of file
+}
diff --git a/src/Services/Catalog/Catalog.API/appsettings.Development.json b/src/Services/Catalog/Catalog.API/appsettings.Development.json
index 6154a50e6..eaf1ae0b9 100644
--- a/src/Services/Catalog/Catalog.API/appsettings.Development.json
+++ b/src/Services/Catalog/Catalog.API/appsettings.Development.json
@@ -12,13 +12,16 @@
}
},
"EventBusConnection": "localhost",
+ "KafkaEnabled": true,
"Kafka": {
"ProducerSettings": {
- "BootstrapServers": "localhost:9092"
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
},
"ConsumerSettings": {
- "BootstrapServers": "localhost:9092",
- "GroupId": "catalog-group-id"
+ "BootstrapServers": "broker:9092",
+ "GroupId": "catalog-group-id",
+ "AllowAutoCreateTopics": true
}
}
}
\ No newline at end of file
diff --git a/src/Services/Catalog/Catalog.API/appsettings.json b/src/Services/Catalog/Catalog.API/appsettings.json
index f8342fe8d..14288ccc4 100644
--- a/src/Services/Catalog/Catalog.API/appsettings.json
+++ b/src/Services/Catalog/Catalog.API/appsettings.json
@@ -24,7 +24,18 @@
"Name": "eshop",
"ClientId": "your-client-id",
"ClientSecret": "your-client-secret"
+ },
+ "KafkaEnabled": true,
+ "Kafka": {
+ "ProducerSettings": {
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
+ },
+ "ConsumerSettings": {
+ "BootstrapServers": "broker:9092",
+ "GroupId": "catalog-group-id",
+ "AllowAutoCreateTopics": true
+ }
}
-
}
diff --git a/src/Services/Ordering/Ordering.API/Dockerfile.develop b/src/Services/Ordering/Ordering.API/Dockerfile.develop
index 718d01939..f0fd43dc7 100644
--- a/src/Services/Ordering/Ordering.API/Dockerfile.develop
+++ b/src/Services/Ordering/Ordering.API/Dockerfile.develop
@@ -6,6 +6,7 @@ EXPOSE 80
WORKDIR /src
COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"]
+COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj"
COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"]
COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"]
COPY ["BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj", "BuildingBlocks/EventBus/IntegrationEventLogEF/"]
diff --git a/src/Services/Ordering/Ordering.API/GlobalUsings.cs b/src/Services/Ordering/Ordering.API/GlobalUsings.cs
index 434ef8273..bbe3d804f 100644
--- a/src/Services/Ordering/Ordering.API/GlobalUsings.cs
+++ b/src/Services/Ordering/Ordering.API/GlobalUsings.cs
@@ -27,6 +27,7 @@ global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus;
global using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
diff --git a/src/Services/Ordering/Ordering.API/Ordering.API.csproj b/src/Services/Ordering/Ordering.API/Ordering.API.csproj
index 7a5813400..b43fbd873 100644
--- a/src/Services/Ordering/Ordering.API/Ordering.API.csproj
+++ b/src/Services/Ordering/Ordering.API/Ordering.API.csproj
@@ -26,6 +26,7 @@
+
diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs
index 95635e39d..af63eb18d 100644
--- a/src/Services/Ordering/Ordering.API/Startup.cs
+++ b/src/Services/Ordering/Ordering.API/Startup.cs
@@ -1,5 +1,6 @@
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authentication.OpenIdConnect;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
namespace Microsoft.eShopOnContainers.Services.Ordering.API;
@@ -174,6 +175,10 @@ static class CustomExtensionsMethods
name: "ordering-servicebus-check",
tags: new string[] { "servicebus" });
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ // TODO: might want to add health check
+ }
else
{
hcBuilder
@@ -206,7 +211,7 @@ static class CustomExtensionsMethods
sqlServerOptionsAction: sqlOptions =>
{
sqlOptions.MigrationsAssembly(typeof(Startup).GetTypeInfo().Assembly.GetName().Name);
- //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
+ //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
sqlOptions.EnableRetryOnFailure(maxRetryCount: 15, maxRetryDelay: TimeSpan.FromSeconds(30), errorNumbersToAdd: null);
});
});
@@ -217,7 +222,7 @@ static class CustomExtensionsMethods
public static IServiceCollection AddCustomSwagger(this IServiceCollection services, IConfiguration configuration)
{
services.AddSwaggerGen(options =>
- {
+ {
options.SwaggerDoc("v1", new OpenApiInfo
{
Title = "eShopOnContainers - Ordering HTTP API",
@@ -251,7 +256,7 @@ static class CustomExtensionsMethods
public static IServiceCollection AddCustomIntegrations(this IServiceCollection services, IConfiguration configuration)
{
services.AddSingleton();
- // HACK: no auth
+ // HACK: no auth
// services.AddTransient();
services.AddTransient();
services.AddTransient>(
@@ -270,6 +275,10 @@ static class CustomExtensionsMethods
return new DefaultServiceBusPersisterConnection(serviceBusConnectionString);
});
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
@@ -347,6 +356,11 @@ static class CustomExtensionsMethods
eventBusSubcriptionsManager, iLifetimeScope, subscriptionName);
});
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddHostedService();
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
@@ -401,14 +415,14 @@ static class CustomExtensionsMethods
});
return services;
}
-
+
// HACK: no auth
private class AddUserIdHeaderFilter : IOperationFilter
{
public void Apply(OpenApiOperation operation, OperationFilterContext context)
{
operation.Parameters ??= new List();
-
+
operation.Parameters.Add(new OpenApiParameter
{
Name = "user-id",
@@ -417,4 +431,4 @@ static class CustomExtensionsMethods
});
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Services/Ordering/Ordering.API/appsettings.json b/src/Services/Ordering/Ordering.API/appsettings.json
index 9c7f07ec1..d57f795e6 100644
--- a/src/Services/Ordering/Ordering.API/appsettings.json
+++ b/src/Services/Ordering/Ordering.API/appsettings.json
@@ -23,6 +23,18 @@
},
"EventBusRetryCount": 5,
"EventBusConnection": "localhost",
+ "KafkaEnabled": true,
+ "Kafka": {
+ "ProducerSettings": {
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
+ },
+ "ConsumerSettings": {
+ "BootstrapServers": "broker:9092",
+ "GroupId": "ordering-group-id",
+ "AllowAutoCreateTopics": true
+ }
+ },
"UseVault": false,
"Vault": {
"Name": "eshop",
diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs b/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs
index 7010bbae8..f8cd9f236 100644
--- a/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs
+++ b/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs
@@ -2,6 +2,7 @@
using Azure.Messaging.ServiceBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus;
using Microsoft.Extensions.Configuration;
@@ -34,6 +35,10 @@ namespace Ordering.BackgroundTasks.Extensions
name: "orderingtask-servicebus-check",
tags: new string[] { "servicebus" });
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ // TODO: might want to add health check
+ }
else
{
hcBuilder.AddRabbitMQ(
@@ -69,6 +74,12 @@ namespace Ordering.BackgroundTasks.Extensions
return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, iLifetimeScope, subscriptionName);
});
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddSingleton();
+ services.AddHostedService();
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj b/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj
index fe7da8680..ba89b27a4 100644
--- a/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj
+++ b/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj
@@ -16,7 +16,7 @@
-
+
@@ -28,6 +28,7 @@
+
diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json
index 6fdbdfcc1..3085e5372 100644
--- a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json
+++ b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json
@@ -6,13 +6,16 @@
"Microsoft": "Information"
}
},
+ "KafkaEnabled": true,
"Kafka": {
"ProducerSettings": {
- "BootstrapServers": "localhost:9092"
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
},
"ConsumerSettings": {
- "BootstrapServers": "localhost:9092",
- "GroupId": "ordering-group-id"
+ "BootstrapServers": "broker:9092",
+ "GroupId": "ordering-background-group-id",
+ "AllowAutoCreateTopics": true
}
}
}
diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json
index 88e5d6858..f156b527a 100644
--- a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json
+++ b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json
@@ -22,5 +22,17 @@
"EventBusRetryCount": 5,
"EventBusConnection": "",
"EventBusUserName": "",
- "EventBusPassword": ""
+ "EventBusPassword": "",
+ "KafkaEnabled": true,
+ "Kafka": {
+ "ProducerSettings": {
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
+ },
+ "ConsumerSettings": {
+ "BootstrapServers": "broker:9092",
+ "GroupId": "ordering-background-group-id",
+ "AllowAutoCreateTopics": true
+ }
+ }
}
\ No newline at end of file
diff --git a/src/Services/Payment/Payment.API/Dockerfile.develop b/src/Services/Payment/Payment.API/Dockerfile.develop
index 59ec117f4..904f7357b 100644
--- a/src/Services/Payment/Payment.API/Dockerfile.develop
+++ b/src/Services/Payment/Payment.API/Dockerfile.develop
@@ -7,6 +7,7 @@ EXPOSE 80
WORKDIR /src
COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"]
COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"]
+COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/"]
COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"]
COPY ["BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj", "BuildingBlocks/EventBus/IntegrationEventLogEF/"]
COPY ["Services/Payment/Payment.API/Payment.API.csproj", "Services/Payment/Payment.API/"]
diff --git a/src/Services/Payment/Payment.API/GlobalUsings.cs b/src/Services/Payment/Payment.API/GlobalUsings.cs
index e87eb53bb..729530ce3 100644
--- a/src/Services/Payment/Payment.API/GlobalUsings.cs
+++ b/src/Services/Payment/Payment.API/GlobalUsings.cs
@@ -10,6 +10,7 @@ global using Azure.Messaging.ServiceBus;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
+global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus;
global using Microsoft.eShopOnContainers.Payment.API.IntegrationEvents.Events;
diff --git a/src/Services/Payment/Payment.API/Payment.API.csproj b/src/Services/Payment/Payment.API/Payment.API.csproj
index 02c255fa6..d3e2abab0 100644
--- a/src/Services/Payment/Payment.API/Payment.API.csproj
+++ b/src/Services/Payment/Payment.API/Payment.API.csproj
@@ -29,6 +29,7 @@
+
diff --git a/src/Services/Payment/Payment.API/Startup.cs b/src/Services/Payment/Payment.API/Startup.cs
index c880f2a75..7ac4ba5c7 100644
--- a/src/Services/Payment/Payment.API/Startup.cs
+++ b/src/Services/Payment/Payment.API/Startup.cs
@@ -1,3 +1,5 @@
+using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
+
namespace Microsoft.eShopOnContainers.Payment.API;
public class Startup
@@ -27,6 +29,10 @@ public class Startup
return new DefaultServiceBusPersisterConnection(serviceBusConnectionString);
});
}
+ else if (Configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
@@ -116,6 +122,11 @@ public class Startup
eventBusSubcriptionsManager, iLifetimeScope, subscriptionName);
});
}
+ else if (Configuration.GetValue("KafkaEnabled"))
+ {
+ services.AddHostedService();
+ services.AddSingleton();
+ }
else
{
services.AddSingleton(sp =>
@@ -164,6 +175,10 @@ public static class CustomExtensionMethods
name: "payment-servicebus-check",
tags: new string[] { "servicebus" });
}
+ else if (configuration.GetValue("KafkaEnabled"))
+ {
+ // TODO: might want to add healthcheck topic
+ }
else
{
hcBuilder
diff --git a/src/Services/Payment/Payment.API/appsettings.Development.json b/src/Services/Payment/Payment.API/appsettings.Development.json
index f8d627ace..c05c7149b 100644
--- a/src/Services/Payment/Payment.API/appsettings.Development.json
+++ b/src/Services/Payment/Payment.API/appsettings.Development.json
@@ -7,13 +7,16 @@
"Microsoft": "Information"
}
},
+ "KafkaEnabled": true,
"Kafka": {
"ProducerSettings": {
- "BootstrapServers": "localhost:9092"
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
},
"ConsumerSettings": {
- "BootstrapServers": "localhost:9092",
- "GroupId": "payment-group-id"
+ "BootstrapServers": "broker:9092",
+ "GroupId": "payment-group-id",
+ "AllowAutoCreateTopics": true
}
}
}
diff --git a/src/Services/Payment/Payment.API/appsettings.json b/src/Services/Payment/Payment.API/appsettings.json
index 9964a8bd2..6e6b0abd6 100644
--- a/src/Services/Payment/Payment.API/appsettings.json
+++ b/src/Services/Payment/Payment.API/appsettings.json
@@ -17,5 +17,17 @@
"ApplicationInsights": {
"InstrumentationKey": ""
},
- "EventBusRetryCount": 5
+ "EventBusRetryCount": 5,
+ "KafkaEnabled": true,
+ "Kafka": {
+ "ProducerSettings": {
+ "BootstrapServers": "broker:9092",
+ "AllowAutoCreateTopics": true
+ },
+ "ConsumerSettings": {
+ "BootstrapServers": "broker:9092",
+ "GroupId": "payment-group-id",
+ "AllowAutoCreateTopics": true
+ }
+ }
}
diff --git a/src/docker-compose.yml b/src/docker-compose.yml
index b689a095a..28754d609 100644
--- a/src/docker-compose.yml
+++ b/src/docker-compose.yml
@@ -52,6 +52,7 @@ services:
depends_on:
- sqldata
- rabbitmq
+ - broker
ordering-api:
image: ${REGISTRY:-eshop}/ordering.api:${PLATFORM:-linux}-${TAG:-latest}
@@ -61,6 +62,7 @@ services:
depends_on:
- sqldata
- rabbitmq
+ - broker
ordering-backgroundtasks:
image: ${REGISTRY:-eshop}/ordering.backgroundtasks:${PLATFORM:-linux}-${TAG:-latest}
@@ -70,6 +72,7 @@ services:
depends_on:
- sqldata
- rabbitmq
+ - broker
payment-api:
image: ${REGISTRY:-eshop}/payment.api:${PLATFORM:-linux}-${TAG:-latest}
@@ -78,6 +81,7 @@ services:
dockerfile: Services/Payment/Payment.API/Dockerfile
depends_on:
- rabbitmq
+ - broker
# webhooks-api:
# image: ${REGISTRY:-eshop}/webhooks.api:${PLATFORM:-linux}-${TAG:-latest}
| | |