diff --git a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs index a140bfc75..119d58dff 100644 --- a/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs +++ b/src/BuildingBlocks/EventBus/EventBus/IEventBusSubscriptionsManager.cs @@ -4,6 +4,7 @@ public interface IEventBusSubscriptionsManager { bool IsEmpty { get; } event EventHandler OnEventRemoved; + void AddDynamicSubscription(string eventName) where TH : IDynamicIntegrationEventHandler; diff --git a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs index 5de35b292..43b7ab81b 100644 --- a/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs +++ b/src/BuildingBlocks/EventBus/EventBus/InMemoryEventBusSubscriptionsManager.cs @@ -2,8 +2,6 @@ public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager { - - private readonly Dictionary> _handlers; private readonly List _eventTypes; @@ -17,7 +15,7 @@ public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptio public bool IsEmpty => _handlers is { Count: 0 }; public void Clear() => _handlers.Clear(); - + public void AddDynamicSubscription(string eventName) where TH : IDynamicIntegrationEventHandler { @@ -30,12 +28,17 @@ public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptio { var eventName = GetEventKey(); + Console.WriteLine($"Adding Handler: {typeof(TH)} for Event: {eventName}"); DoAddSubscription(typeof(TH), eventName, isDynamic: false); if (!_eventTypes.Contains(typeof(T))) { _eventTypes.Add(typeof(T)); } + + foreach (var h in _handlers) { + Console.WriteLine($"Handler has Subscription for: {h.Key}"); + } } private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic) diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs index f83f6d92b..881c0defa 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.cs @@ -26,7 +26,7 @@ public class EventBusKafka : IEventBus, IDisposable public void Publish(IntegrationEvent @event) { - var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, ""); + var eventName = @event.GetType().Name; var jsonMessage = JsonSerializer.Serialize(@event, @event.GetType()); // map Integration event to kafka message @@ -47,7 +47,12 @@ public class EventBusKafka : IEventBus, IDisposable _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); - _subsManager.AddSubscription(); + try { + _subsManager.AddSubscription(); + } catch (Exception e) + { + Console.WriteLine($"Failed to add subscription {eventName}, because: {e.Message}"); + } } public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler @@ -73,4 +78,4 @@ public class EventBusKafka : IEventBus, IDisposable { _subsManager.Clear(); } -} \ No newline at end of file +} diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs new file mode 100644 index 000000000..68d499e0f --- /dev/null +++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaAdminBackgroundService.cs @@ -0,0 +1,43 @@ +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + +public class KafkaAdminBackgroundService : BackgroundService +{ + private const string TopicName = "eshop_event_bus"; + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + + public KafkaAdminBackgroundService( + IConfiguration configuration, + ILogger logger) + { + _configuration = configuration; + _logger = logger; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return Task.Run(CreateTopics, stoppingToken); + } + + private async Task CreateTopics() + { + var adminConfig = new AdminClientConfig(); + _configuration.GetSection("Kafka:AdminSettings").Bind(adminConfig); + using (var adminClient = new AdminClientBuilder(adminConfig).Build()) + { + try + { + await adminClient.CreateTopicsAsync(new TopicSpecification[] { + new TopicSpecification { Name = TopicName, ReplicationFactor = 1, NumPartitions = 1 } }); + } + catch (CreateTopicsException e) + { + Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + } + } +} diff --git a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs index 85744e0b9..473c3a0a2 100644 --- a/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs +++ b/src/BuildingBlocks/EventBus/EventBusKafka/KafkaConsumerBackgroundService.cs @@ -1,4 +1,3 @@ -using System.Security.Cryptography; using Autofac; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; @@ -46,8 +45,6 @@ public class KafkaConsumerBackgroundService : BackgroundService var eventName = consumeResult.Message.Key; var messageContent = consumeResult.Message.Value; - - Console.WriteLine($"Consumed event: {eventName}\n Content: {Utils.CalculateMd5Hash(messageContent)}"); if (!_subsManager.HasSubscriptionsForEvent(eventName)) { diff --git a/src/Services/Basket/Basket.API/Basket.API.csproj b/src/Services/Basket/Basket.API/Basket.API.csproj index 4d1cc625d..21fead841 100644 --- a/src/Services/Basket/Basket.API/Basket.API.csproj +++ b/src/Services/Basket/Basket.API/Basket.API.csproj @@ -16,7 +16,6 @@ - diff --git a/src/Services/Basket/Basket.API/Dockerfile.develop b/src/Services/Basket/Basket.API/Dockerfile.develop index 5fc9900be..cf5811bae 100644 --- a/src/Services/Basket/Basket.API/Dockerfile.develop +++ b/src/Services/Basket/Basket.API/Dockerfile.develop @@ -8,7 +8,7 @@ WORKDIR /src COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"] COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"] -COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EvenBusKafka/"] +COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/"] COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"] COPY ["Services/Basket/Basket.API/Basket.API.csproj", "Services/Basket/Basket.API/"] COPY ["NuGet.config", "NuGet.config"] diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs index 0682f4b77..2e8b11c9c 100644 --- a/src/Services/Basket/Basket.API/Startup.cs +++ b/src/Services/Basket/Basket.API/Startup.cs @@ -262,6 +262,7 @@ public class Startup } else if (Configuration.GetValue("KafkaEnabled")) { + services.AddHostedService(); services.AddHostedService(); services.AddSingleton(); } diff --git a/src/Services/Basket/Basket.API/appsettings.Development.json b/src/Services/Basket/Basket.API/appsettings.Development.json index 9cdcf69b7..baacbecbe 100644 --- a/src/Services/Basket/Basket.API/appsettings.Development.json +++ b/src/Services/Basket/Basket.API/appsettings.Development.json @@ -16,12 +16,17 @@ "EventBusConnection": "localhost", "KafkaEnabled": true, "Kafka": { - "ProducerSettings": { + "AdminSettings": { "BootstrapServers": "broker:9092" }, + "ProducerSettings": { + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true + }, "ConsumerSettings": { "BootstrapServers": "broker:9092", - "GroupId": "basket-group-id" + "GroupId": "basket-group-id", + "AllowAutoCreateTopics": true } } } \ No newline at end of file diff --git a/src/Services/Basket/Basket.API/appsettings.json b/src/Services/Basket/Basket.API/appsettings.json index 848e51028..bc2b0bc8c 100644 --- a/src/Services/Basket/Basket.API/appsettings.json +++ b/src/Services/Basket/Basket.API/appsettings.json @@ -29,12 +29,17 @@ }, "KafkaEnabled": true, "Kafka": { - "ProducerSettings": { + "AdminSettings": { "BootstrapServers": "broker:9092" }, + "ProducerSettings": { + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true + }, "ConsumerSettings": { "BootstrapServers": "broker:9092", - "GroupId": "basket-group-id" + "GroupId": "basket-group-id", + "AllowAutoCreateTopics": true } } } diff --git a/src/Services/Catalog/Catalog.API/Catalog.API.csproj b/src/Services/Catalog/Catalog.API/Catalog.API.csproj index 362401a4d..fd5fc299f 100644 --- a/src/Services/Catalog/Catalog.API/Catalog.API.csproj +++ b/src/Services/Catalog/Catalog.API/Catalog.API.csproj @@ -78,6 +78,7 @@ + diff --git a/src/Services/Catalog/Catalog.API/Dockerfile.develop b/src/Services/Catalog/Catalog.API/Dockerfile.develop index 9aefa6eff..b7f911dd7 100644 --- a/src/Services/Catalog/Catalog.API/Dockerfile.develop +++ b/src/Services/Catalog/Catalog.API/Dockerfile.develop @@ -8,6 +8,7 @@ WORKDIR /src COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"] COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"] +COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/"] COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"] COPY ["BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj", "BuildingBlocks/EventBus/IntegrationEventLogEF/"] COPY ["BuildingBlocks/WebHostCustomization/WebHost.Customization/WebHost.Customization.csproj", "BuildingBlocks/WebHostCustomization/WebHost.Customization/"] diff --git a/src/Services/Catalog/Catalog.API/GlobalUsings.cs b/src/Services/Catalog/Catalog.API/GlobalUsings.cs index 48641cc80..dc6664f0f 100644 --- a/src/Services/Catalog/Catalog.API/GlobalUsings.cs +++ b/src/Services/Catalog/Catalog.API/GlobalUsings.cs @@ -19,6 +19,7 @@ global using Microsoft.EntityFrameworkCore.Design; global using Microsoft.EntityFrameworkCore.Metadata.Builders; global using Microsoft.EntityFrameworkCore; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; global using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index f7b46cb6f..6632d2695 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -1,3 +1,5 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + namespace Microsoft.eShopOnContainers.Services.Catalog.API; public class Startup @@ -154,6 +156,10 @@ public static class CustomExtensionMethods name: "catalog-servicebus-check", tags: new string[] { "servicebus" }); } + else if (configuration.GetValue("KafkaEnabled")) + { + // TODO: might want to add health check + } else { hcBuilder @@ -175,7 +181,7 @@ public static class CustomExtensionMethods sqlServerOptionsAction: sqlOptions => { sqlOptions.MigrationsAssembly(typeof(Startup).GetTypeInfo().Assembly.GetName().Name); - //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency + //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency sqlOptions.EnableRetryOnFailure(maxRetryCount: 15, maxRetryDelay: TimeSpan.FromSeconds(30), errorNumbersToAdd: null); }); }); @@ -186,7 +192,7 @@ public static class CustomExtensionMethods sqlServerOptionsAction: sqlOptions => { sqlOptions.MigrationsAssembly(typeof(Startup).GetTypeInfo().Assembly.GetName().Name); - //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency + //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency sqlOptions.EnableRetryOnFailure(maxRetryCount: 15, maxRetryDelay: TimeSpan.FromSeconds(30), errorNumbersToAdd: null); }); }); @@ -221,7 +227,7 @@ public static class CustomExtensionMethods public static IServiceCollection AddSwagger(this IServiceCollection services, IConfiguration configuration) { services.AddSwaggerGen(options => - { + { options.SwaggerDoc("v1", new OpenApiInfo { Title = "eShopOnContainers - Catalog HTTP API", @@ -251,6 +257,10 @@ public static class CustomExtensionMethods return new DefaultServiceBusPersisterConnection(serviceBusConnection); }); } + else if (configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(); + } else { services.AddSingleton(sp => @@ -304,6 +314,11 @@ public static class CustomExtensionMethods }); } + else if (configuration.GetValue("KafkaEnabled")) + { + services.AddHostedService(); + services.AddSingleton(); + } else { services.AddSingleton(sp => @@ -330,4 +345,4 @@ public static class CustomExtensionMethods return services; } -} \ No newline at end of file +} diff --git a/src/Services/Catalog/Catalog.API/appsettings.Development.json b/src/Services/Catalog/Catalog.API/appsettings.Development.json index 6154a50e6..eaf1ae0b9 100644 --- a/src/Services/Catalog/Catalog.API/appsettings.Development.json +++ b/src/Services/Catalog/Catalog.API/appsettings.Development.json @@ -12,13 +12,16 @@ } }, "EventBusConnection": "localhost", + "KafkaEnabled": true, "Kafka": { "ProducerSettings": { - "BootstrapServers": "localhost:9092" + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true }, "ConsumerSettings": { - "BootstrapServers": "localhost:9092", - "GroupId": "catalog-group-id" + "BootstrapServers": "broker:9092", + "GroupId": "catalog-group-id", + "AllowAutoCreateTopics": true } } } \ No newline at end of file diff --git a/src/Services/Catalog/Catalog.API/appsettings.json b/src/Services/Catalog/Catalog.API/appsettings.json index f8342fe8d..14288ccc4 100644 --- a/src/Services/Catalog/Catalog.API/appsettings.json +++ b/src/Services/Catalog/Catalog.API/appsettings.json @@ -24,7 +24,18 @@ "Name": "eshop", "ClientId": "your-client-id", "ClientSecret": "your-client-secret" + }, + "KafkaEnabled": true, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true + }, + "ConsumerSettings": { + "BootstrapServers": "broker:9092", + "GroupId": "catalog-group-id", + "AllowAutoCreateTopics": true + } } - } diff --git a/src/Services/Ordering/Ordering.API/Dockerfile.develop b/src/Services/Ordering/Ordering.API/Dockerfile.develop index 718d01939..f0fd43dc7 100644 --- a/src/Services/Ordering/Ordering.API/Dockerfile.develop +++ b/src/Services/Ordering/Ordering.API/Dockerfile.develop @@ -6,6 +6,7 @@ EXPOSE 80 WORKDIR /src COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"] +COPY "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"] COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"] COPY ["BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj", "BuildingBlocks/EventBus/IntegrationEventLogEF/"] diff --git a/src/Services/Ordering/Ordering.API/GlobalUsings.cs b/src/Services/Ordering/Ordering.API/GlobalUsings.cs index 434ef8273..bbe3d804f 100644 --- a/src/Services/Ordering/Ordering.API/GlobalUsings.cs +++ b/src/Services/Ordering/Ordering.API/GlobalUsings.cs @@ -27,6 +27,7 @@ global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; global using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; diff --git a/src/Services/Ordering/Ordering.API/Ordering.API.csproj b/src/Services/Ordering/Ordering.API/Ordering.API.csproj index 7a5813400..b43fbd873 100644 --- a/src/Services/Ordering/Ordering.API/Ordering.API.csproj +++ b/src/Services/Ordering/Ordering.API/Ordering.API.csproj @@ -26,6 +26,7 @@ + diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index 95635e39d..af63eb18d 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -1,5 +1,6 @@ using Microsoft.AspNetCore.Authentication.Cookies; using Microsoft.AspNetCore.Authentication.OpenIdConnect; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; namespace Microsoft.eShopOnContainers.Services.Ordering.API; @@ -174,6 +175,10 @@ static class CustomExtensionsMethods name: "ordering-servicebus-check", tags: new string[] { "servicebus" }); } + else if (configuration.GetValue("KafkaEnabled")) + { + // TODO: might want to add health check + } else { hcBuilder @@ -206,7 +211,7 @@ static class CustomExtensionsMethods sqlServerOptionsAction: sqlOptions => { sqlOptions.MigrationsAssembly(typeof(Startup).GetTypeInfo().Assembly.GetName().Name); - //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency + //Configuring Connection Resiliency: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency sqlOptions.EnableRetryOnFailure(maxRetryCount: 15, maxRetryDelay: TimeSpan.FromSeconds(30), errorNumbersToAdd: null); }); }); @@ -217,7 +222,7 @@ static class CustomExtensionsMethods public static IServiceCollection AddCustomSwagger(this IServiceCollection services, IConfiguration configuration) { services.AddSwaggerGen(options => - { + { options.SwaggerDoc("v1", new OpenApiInfo { Title = "eShopOnContainers - Ordering HTTP API", @@ -251,7 +256,7 @@ static class CustomExtensionsMethods public static IServiceCollection AddCustomIntegrations(this IServiceCollection services, IConfiguration configuration) { services.AddSingleton(); - // HACK: no auth + // HACK: no auth // services.AddTransient(); services.AddTransient(); services.AddTransient>( @@ -270,6 +275,10 @@ static class CustomExtensionsMethods return new DefaultServiceBusPersisterConnection(serviceBusConnectionString); }); } + else if (configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(); + } else { services.AddSingleton(sp => @@ -347,6 +356,11 @@ static class CustomExtensionsMethods eventBusSubcriptionsManager, iLifetimeScope, subscriptionName); }); } + else if (configuration.GetValue("KafkaEnabled")) + { + services.AddHostedService(); + services.AddSingleton(); + } else { services.AddSingleton(sp => @@ -401,14 +415,14 @@ static class CustomExtensionsMethods }); return services; } - + // HACK: no auth private class AddUserIdHeaderFilter : IOperationFilter { public void Apply(OpenApiOperation operation, OperationFilterContext context) { operation.Parameters ??= new List(); - + operation.Parameters.Add(new OpenApiParameter { Name = "user-id", @@ -417,4 +431,4 @@ static class CustomExtensionsMethods }); } } -} \ No newline at end of file +} diff --git a/src/Services/Ordering/Ordering.API/appsettings.json b/src/Services/Ordering/Ordering.API/appsettings.json index 9c7f07ec1..d57f795e6 100644 --- a/src/Services/Ordering/Ordering.API/appsettings.json +++ b/src/Services/Ordering/Ordering.API/appsettings.json @@ -23,6 +23,18 @@ }, "EventBusRetryCount": 5, "EventBusConnection": "localhost", + "KafkaEnabled": true, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true + }, + "ConsumerSettings": { + "BootstrapServers": "broker:9092", + "GroupId": "ordering-group-id", + "AllowAutoCreateTopics": true + } + }, "UseVault": false, "Vault": { "Name": "eshop", diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs b/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs index 7010bbae8..f8cd9f236 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Extensions/CustomExtensionMethods.cs @@ -2,6 +2,7 @@ using Azure.Messaging.ServiceBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; using Microsoft.Extensions.Configuration; @@ -34,6 +35,10 @@ namespace Ordering.BackgroundTasks.Extensions name: "orderingtask-servicebus-check", tags: new string[] { "servicebus" }); } + else if (configuration.GetValue("KafkaEnabled")) + { + // TODO: might want to add health check + } else { hcBuilder.AddRabbitMQ( @@ -69,6 +74,12 @@ namespace Ordering.BackgroundTasks.Extensions return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, iLifetimeScope, subscriptionName); }); } + else if (configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(); + services.AddHostedService(); + services.AddSingleton(); + } else { services.AddSingleton(sp => diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj b/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj index fe7da8680..ba89b27a4 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj +++ b/src/Services/Ordering/Ordering.BackgroundTasks/Ordering.BackgroundTasks.csproj @@ -16,7 +16,7 @@ - + @@ -28,6 +28,7 @@ + diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json index 6fdbdfcc1..3085e5372 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json +++ b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.Development.json @@ -6,13 +6,16 @@ "Microsoft": "Information" } }, + "KafkaEnabled": true, "Kafka": { "ProducerSettings": { - "BootstrapServers": "localhost:9092" + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true }, "ConsumerSettings": { - "BootstrapServers": "localhost:9092", - "GroupId": "ordering-group-id" + "BootstrapServers": "broker:9092", + "GroupId": "ordering-background-group-id", + "AllowAutoCreateTopics": true } } } diff --git a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json index 88e5d6858..f156b527a 100644 --- a/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json +++ b/src/Services/Ordering/Ordering.BackgroundTasks/appsettings.json @@ -22,5 +22,17 @@ "EventBusRetryCount": 5, "EventBusConnection": "", "EventBusUserName": "", - "EventBusPassword": "" + "EventBusPassword": "", + "KafkaEnabled": true, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true + }, + "ConsumerSettings": { + "BootstrapServers": "broker:9092", + "GroupId": "ordering-background-group-id", + "AllowAutoCreateTopics": true + } + } } \ No newline at end of file diff --git a/src/Services/Payment/Payment.API/Dockerfile.develop b/src/Services/Payment/Payment.API/Dockerfile.develop index 59ec117f4..904f7357b 100644 --- a/src/Services/Payment/Payment.API/Dockerfile.develop +++ b/src/Services/Payment/Payment.API/Dockerfile.develop @@ -7,6 +7,7 @@ EXPOSE 80 WORKDIR /src COPY ["BuildingBlocks/EventBus/EventBus/EventBus.csproj", "BuildingBlocks/EventBus/EventBus/"] COPY ["BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj", "BuildingBlocks/EventBus/EventBusRabbitMQ/"] +COPY ["BuildingBlocks/EventBus/EventBusKafka/EventBusKafka.csproj" "BuildingBlocks/EventBus/EventBusKafka/"] COPY ["BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.csproj", "BuildingBlocks/EventBus/EventBusServiceBus/"] COPY ["BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEF.csproj", "BuildingBlocks/EventBus/IntegrationEventLogEF/"] COPY ["Services/Payment/Payment.API/Payment.API.csproj", "Services/Payment/Payment.API/"] diff --git a/src/Services/Payment/Payment.API/GlobalUsings.cs b/src/Services/Payment/Payment.API/GlobalUsings.cs index e87eb53bb..729530ce3 100644 --- a/src/Services/Payment/Payment.API/GlobalUsings.cs +++ b/src/Services/Payment/Payment.API/GlobalUsings.cs @@ -10,6 +10,7 @@ global using Azure.Messaging.ServiceBus; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; +global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ; global using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus; global using Microsoft.eShopOnContainers.Payment.API.IntegrationEvents.Events; diff --git a/src/Services/Payment/Payment.API/Payment.API.csproj b/src/Services/Payment/Payment.API/Payment.API.csproj index 02c255fa6..d3e2abab0 100644 --- a/src/Services/Payment/Payment.API/Payment.API.csproj +++ b/src/Services/Payment/Payment.API/Payment.API.csproj @@ -29,6 +29,7 @@ + diff --git a/src/Services/Payment/Payment.API/Startup.cs b/src/Services/Payment/Payment.API/Startup.cs index c880f2a75..7ac4ba5c7 100644 --- a/src/Services/Payment/Payment.API/Startup.cs +++ b/src/Services/Payment/Payment.API/Startup.cs @@ -1,3 +1,5 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka; + namespace Microsoft.eShopOnContainers.Payment.API; public class Startup @@ -27,6 +29,10 @@ public class Startup return new DefaultServiceBusPersisterConnection(serviceBusConnectionString); }); } + else if (Configuration.GetValue("KafkaEnabled")) + { + services.AddSingleton(); + } else { services.AddSingleton(sp => @@ -116,6 +122,11 @@ public class Startup eventBusSubcriptionsManager, iLifetimeScope, subscriptionName); }); } + else if (Configuration.GetValue("KafkaEnabled")) + { + services.AddHostedService(); + services.AddSingleton(); + } else { services.AddSingleton(sp => @@ -164,6 +175,10 @@ public static class CustomExtensionMethods name: "payment-servicebus-check", tags: new string[] { "servicebus" }); } + else if (configuration.GetValue("KafkaEnabled")) + { + // TODO: might want to add healthcheck topic + } else { hcBuilder diff --git a/src/Services/Payment/Payment.API/appsettings.Development.json b/src/Services/Payment/Payment.API/appsettings.Development.json index f8d627ace..c05c7149b 100644 --- a/src/Services/Payment/Payment.API/appsettings.Development.json +++ b/src/Services/Payment/Payment.API/appsettings.Development.json @@ -7,13 +7,16 @@ "Microsoft": "Information" } }, + "KafkaEnabled": true, "Kafka": { "ProducerSettings": { - "BootstrapServers": "localhost:9092" + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true }, "ConsumerSettings": { - "BootstrapServers": "localhost:9092", - "GroupId": "payment-group-id" + "BootstrapServers": "broker:9092", + "GroupId": "payment-group-id", + "AllowAutoCreateTopics": true } } } diff --git a/src/Services/Payment/Payment.API/appsettings.json b/src/Services/Payment/Payment.API/appsettings.json index 9964a8bd2..6e6b0abd6 100644 --- a/src/Services/Payment/Payment.API/appsettings.json +++ b/src/Services/Payment/Payment.API/appsettings.json @@ -17,5 +17,17 @@ "ApplicationInsights": { "InstrumentationKey": "" }, - "EventBusRetryCount": 5 + "EventBusRetryCount": 5, + "KafkaEnabled": true, + "Kafka": { + "ProducerSettings": { + "BootstrapServers": "broker:9092", + "AllowAutoCreateTopics": true + }, + "ConsumerSettings": { + "BootstrapServers": "broker:9092", + "GroupId": "payment-group-id", + "AllowAutoCreateTopics": true + } + } } diff --git a/src/docker-compose.yml b/src/docker-compose.yml index b689a095a..28754d609 100644 --- a/src/docker-compose.yml +++ b/src/docker-compose.yml @@ -52,6 +52,7 @@ services: depends_on: - sqldata - rabbitmq + - broker ordering-api: image: ${REGISTRY:-eshop}/ordering.api:${PLATFORM:-linux}-${TAG:-latest} @@ -61,6 +62,7 @@ services: depends_on: - sqldata - rabbitmq + - broker ordering-backgroundtasks: image: ${REGISTRY:-eshop}/ordering.backgroundtasks:${PLATFORM:-linux}-${TAG:-latest} @@ -70,6 +72,7 @@ services: depends_on: - sqldata - rabbitmq + - broker payment-api: image: ${REGISTRY:-eshop}/payment.api:${PLATFORM:-linux}-${TAG:-latest} @@ -78,6 +81,7 @@ services: dockerfile: Services/Payment/Payment.API/Dockerfile depends_on: - rabbitmq + - broker # webhooks-api: # image: ${REGISTRY:-eshop}/webhooks.api:${PLATFORM:-linux}-${TAG:-latest}