kct949 90b3c0a407
Pt/finish kafka (#5)
* Add kafka eventbus to Catalog, Ordering and Payment.

Only catalog is confirmed to work. I think the ordering background
service and payment are still subscribing to RabbitMQ for some reason
(needs more investigation).

* Fix kafka consumer group ids.

* Fix build problems (duplicate package + lower version)

All services seem to use Kafka now, but it seems like
Kafka takes too long to start up and the services
fail to connect to the brokers. Either we have to do
retries (similar to RabbitMQ), or we are going to
enforce the execution order in docker compose by
using something like healthchecks.

* Add kafka broker dependency for other services.

There is still a bug in the eventBus subscription: for
example, the ordering service should handle a
UserCheckoutAccepted event, but it has no subscription
for it when such an event is published to the Kafka topic:

```
src-ordering-api-1              | [15:04:42 WRN] No subscription for Kafka event: UserCheckoutAccepted
src-ordering-api-1              | Consumed event: UserCheckoutAccepted
src-ordering-api-1              |  Content: 632B63DB0CE145D499FE01904F76A475
```

* Add logging for subscription manager problem.

Seems like the subscription manager is not used
correctly in the kafka eventbus (two different objects?).

* add printing handlers

* actually trigger printing handlers

* add kafkapersistentconnection registration

* Revert "add kafkapersistentconnection registration"

This reverts commit 704ee3e36f09f3f3ad48057de31996654a8e3894.

* add allowAutoCreateTopics in consumers (different than default) and in producers (just to be explicit)

* register DefaultKafkaPersistentConnection in ordering.backgroundtasks

* remove noise in logs

* Make eventNames in kafka eventbus consistent.

Do not remove IntegrationEventSuffix. Before this
change, the subscription handlers and eventNames did not match.

* Create kafka admin background service to create empty topic.

We have to create the eshop_event_bus kafka topic on startup,
because otherwise the consumer of the microservices would fail.

---------

Co-authored-by: Philipp Theyssen <p.theyssen@gmail.com>
Co-authored-by: Philipp Theyssen <34607877+PTheyssen@users.noreply.github.com>
2023-03-15 12:00:20 +01:00

157 lines
6.8 KiB
C#

using System.Globalization;
using Autofac;
using Azure.Messaging.ServiceBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusKafka;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using Serilog;
namespace Ordering.BackgroundTasks.Extensions
{
/// <summary>
/// Extension methods that register health checks, the event bus and Serilog
/// for the Ordering background-tasks host.
/// </summary>
public static class CustomExtensionMethods
{
    // Retry count handed to the RabbitMQ connection and event bus when
    // "EventBusRetryCount" is missing or empty in configuration.
    private const int DefaultEventBusRetryCount = 5;

    /// <summary>
    /// Registers the health checks: a "self" check that always reports healthy, a SQL Server
    /// check against <c>ConnectionString</c>, and a check for the selected event-bus transport.
    /// </summary>
    /// <param name="services">The service collection to add the health checks to.</param>
    /// <param name="configuration">
    /// Configuration providing <c>ConnectionString</c>, <c>EventBusConnection</c> and the
    /// <c>AzureServiceBusEnabled</c> / <c>KafkaEnabled</c> switches.
    /// </param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddCustomHealthCheck(this IServiceCollection services, IConfiguration configuration)
    {
        var hcBuilder = services.AddHealthChecks();

        hcBuilder.AddCheck("self", () => HealthCheckResult.Healthy());

        hcBuilder.AddSqlServer(
            configuration["ConnectionString"],
            name: "OrderingTaskDB-check",
            tags: new[] { "orderingtaskdb" });

        // Transport precedence: Azure Service Bus, then Kafka, otherwise RabbitMQ.
        if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
        {
            hcBuilder.AddAzureServiceBusTopic(
                configuration["EventBusConnection"],
                topicName: "eshop_event_bus",
                name: "orderingtask-servicebus-check",
                tags: new[] { "servicebus" });
        }
        else if (configuration.GetValue<bool>("KafkaEnabled"))
        {
            // TODO: might want to add health check (none is registered for Kafka yet)
        }
        else
        {
            hcBuilder.AddRabbitMQ(
                $"amqp://{configuration["EventBusConnection"]}",
                name: "orderingtask-rabbitmqbus-check",
                tags: new[] { "rabbitmqbus" });
        }

        return services;
    }

    /// <summary>
    /// Registers the <see cref="IEventBus"/> implementation selected by configuration
    /// (Azure Service Bus, then Kafka, otherwise RabbitMQ) together with the in-memory
    /// <see cref="IEventBusSubscriptionsManager"/>.
    /// </summary>
    /// <param name="services">The service collection to add the registrations to.</param>
    /// <param name="configuration">
    /// Configuration providing <c>SubscriptionClientName</c>, <c>EventBusConnection</c>,
    /// the transport switches and, for RabbitMQ, <c>EventBusUserName</c>,
    /// <c>EventBusPassword</c> and <c>EventBusRetryCount</c>.
    /// </param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
    {
        // Read once and shared by the Service Bus and RabbitMQ registrations.
        var subscriptionClientName = configuration["SubscriptionClientName"];

        if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
        {
            services.AddSingleton<IServiceBusPersisterConnection>(sp =>
            {
                var serviceBusConnectionString = configuration["EventBusConnection"];
                return new DefaultServiceBusPersisterConnection(serviceBusConnectionString);
            });

            services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
            {
                var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
                var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
                var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
                var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

                return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, iLifetimeScope, subscriptionClientName);
            });
        }
        else if (configuration.GetValue<bool>("KafkaEnabled"))
        {
            services.AddSingleton<IKafkaPersistentConnection, DefaultKafkaPersistentConnection>();
            services.AddHostedService<KafkaConsumerBackgroundService>();
            services.AddSingleton<IEventBus, EventBusKafka>();
        }
        else
        {
            services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
            {
                var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();

                var factory = new ConnectionFactory()
                {
                    HostName = configuration["EventBusConnection"],
                    DispatchConsumersAsync = true
                };

                // Credentials are only assigned when configured; otherwise the factory keeps its own values.
                var userName = configuration["EventBusUserName"];
                if (!string.IsNullOrEmpty(userName))
                {
                    factory.UserName = userName;
                }

                var password = configuration["EventBusPassword"];
                if (!string.IsNullOrEmpty(password))
                {
                    factory.Password = password;
                }

                var retryCount = GetEventBusRetryCount(configuration);

                return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
            });

            services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
            {
                var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
                var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
                var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
                var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
                var retryCount = GetEventBusRetryCount(configuration);

                return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
            });
        }

        // Registered for every transport.
        services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();

        return services;
    }

    /// <summary>
    /// Builds the Serilog logger (console, Seq and HTTP sinks, plus settings read from
    /// <paramref name="configuration"/>) and assigns it to the static <c>Log.Logger</c>.
    /// </summary>
    /// <param name="builder">The logging builder; it is returned as-is and not modified here.</param>
    /// <param name="configuration">
    /// Configuration providing <c>Serilog:SeqServerUrl</c> and <c>Serilog:LogstashgUrl</c>.
    /// </param>
    /// <returns>The same <paramref name="builder"/> instance, for chaining.</returns>
    public static ILoggingBuilder UseSerilog(this ILoggingBuilder builder, IConfiguration configuration)
    {
        var seqServerUrl = configuration["Serilog:SeqServerUrl"];
        // NOTE(review): the key is spelled "LogstashgUrl"; presumably it matches the existing
        // settings files, so confirm against them before renaming it.
        var logstashUrl = configuration["Serilog:LogstashgUrl"];

        // Fall back to the default endpoints when the URLs are not configured.
        var seqEndpoint = string.IsNullOrWhiteSpace(seqServerUrl) ? "http://seq" : seqServerUrl;
        var logstashEndpoint = string.IsNullOrWhiteSpace(logstashUrl) ? "http://logstash:8080" : logstashUrl;

        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .Enrich.WithProperty("ApplicationContext", Program.AppName)
            .Enrich.FromLogContext()
            .WriteTo.Console()
            .WriteTo.Seq(seqEndpoint)
            .WriteTo.Http(logstashEndpoint, null)
            .ReadFrom.Configuration(configuration)
            .CreateLogger();

        return builder;
    }

    /// <summary>
    /// Reads <c>EventBusRetryCount</c> from configuration, falling back to
    /// <see cref="DefaultEventBusRetryCount"/> when the value is missing or empty.
    /// </summary>
    /// <param name="configuration">Configuration to read the retry count from.</param>
    /// <returns>The configured retry count, or the default.</returns>
    /// <exception cref="System.FormatException">The configured value is not a valid integer.</exception>
    /// <exception cref="System.OverflowException">The configured value does not fit in an <see cref="int"/>.</exception>
    private static int GetEventBusRetryCount(IConfiguration configuration)
    {
        var configured = configuration["EventBusRetryCount"];

        // Configuration values are machine-readable, so parse them culture-invariantly.
        return string.IsNullOrEmpty(configured)
            ? DefaultEventBusRetryCount
            : int.Parse(configured, CultureInfo.InvariantCulture);
    }
}
}