Browse Source

Add integration event traces

pull/952/head
Miguel Veloso 6 years ago
parent
commit
7cd717cf06
1 changed files with 32 additions and 14 deletions
  1. +32
    -14
      src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs

+ 32
- 14
src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs View File

@ -5,6 +5,8 @@ using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure;
using Microsoft.Extensions.Logging;
using Serilog.Context;
using System;
using System.Data.Common;
using System.Threading.Tasks;
@ -17,10 +19,15 @@ namespace Catalog.API.IntegrationEvents
private readonly IEventBus _eventBus;
private readonly CatalogContext _catalogContext;
private readonly IIntegrationEventLogService _eventLogService;
private readonly ILogger<CatalogIntegrationEventService> _logger;
public CatalogIntegrationEventService(IEventBus eventBus, CatalogContext catalogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
public CatalogIntegrationEventService(
ILogger<CatalogIntegrationEventService> logger,
IEventBus eventBus,
CatalogContext catalogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_catalogContext = catalogContext ?? throw new ArgumentNullException(nameof(catalogContext));
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory));
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
@ -29,28 +36,39 @@ namespace Catalog.API.IntegrationEvents
public async Task PublishThroughEventBusAsync(IntegrationEvent evt)
{
try
using (LogContext.PushProperty("IntegrationEventId", evt.Id))
{
await _eventLogService.MarkEventAsInProgressAsync(evt.Id);
_eventBus.Publish(evt);
await _eventLogService.MarkEventAsPublishedAsync(evt.Id);
try
{
_logger.LogInformation("----- CatalogIntegrationEventService - Publishing integration event: {IntegrationEventId} ({@IntegrationEvent})", evt.Id, evt);
await _eventLogService.MarkEventAsInProgressAsync(evt.Id);
_eventBus.Publish(evt);
await _eventLogService.MarkEventAsPublishedAsync(evt.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "----- CatalogIntegrationEventService - ERROR Publishing integration event");
await _eventLogService.MarkEventAsFailedAsync(evt.Id);
}
}
catch (Exception)
{
await _eventLogService.MarkEventAsFailedAsync(evt.Id);
}
}
public async Task SaveEventAndCatalogContextChangesAsync(IntegrationEvent evt)
{
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
await ResilientTransaction.New(_catalogContext)
.ExecuteAsync(async () => {
using (LogContext.PushProperty("IntegrationEventId", evt.Id))
{
_logger.LogInformation("----- CatalogIntegrationEventService - Saving changes and integrationEvent: {IntegrationEvent}", evt.Id);
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
await ResilientTransaction.New(_catalogContext).ExecuteAsync(async () =>
{
// Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction
await _catalogContext.SaveChangesAsync();
await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction.GetDbTransaction());
});
}
}
}
}

Loading…
Cancel
Save