|
|
@ -6,6 +6,7 @@ using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; |
|
|
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; |
|
|
|
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure; |
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
using System; |
|
|
|
using System.Data.Common; |
|
|
|
using System.Diagnostics; |
|
|
@ -21,24 +22,30 @@ namespace Ordering.API.Application.IntegrationEvents |
|
|
|
private readonly OrderingContext _orderingContext; |
|
|
|
private readonly IntegrationEventLogContext _eventLogContext; |
|
|
|
private readonly IIntegrationEventLogService _eventLogService; |
|
|
|
private readonly ILogger<OrderingIntegrationEventService> _logger; |
|
|
|
|
|
|
|
public OrderingIntegrationEventService(IEventBus eventBus, |
|
|
|
OrderingContext orderingContext, |
|
|
|
IntegrationEventLogContext eventLogContext, |
|
|
|
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory) |
|
|
|
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory, |
|
|
|
ILogger<OrderingIntegrationEventService> logger) |
|
|
|
{ |
|
|
|
_orderingContext = orderingContext ?? throw new ArgumentNullException(nameof(orderingContext)); |
|
|
|
_eventLogContext = eventLogContext ?? throw new ArgumentNullException(nameof(eventLogContext)); |
|
|
|
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory)); |
|
|
|
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus)); |
|
|
|
_eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection()); |
|
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|
|
|
} |
|
|
|
|
|
|
|
public async Task PublishEventsThroughEventBusAsync() |
|
|
|
{ |
|
|
|
var pendindLogEvents = await _eventLogService.RetrieveEventLogsPendingToPublishAsync(); |
|
|
|
|
|
|
|
foreach (var logEvt in pendindLogEvents) |
|
|
|
{ |
|
|
|
_logger.LogInformation("----- Publishing integration event {IntegrationEventId} ({@IntegrationEvent})", logEvt.EventId, logEvt); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
await _eventLogService.MarkEventAsInProgressAsync(logEvt.EventId); |
|
|
@ -54,6 +61,8 @@ namespace Ordering.API.Application.IntegrationEvents |
|
|
|
|
|
|
|
public async Task AddAndSaveEventAsync(IntegrationEvent evt) |
|
|
|
{ |
|
|
|
_logger.LogInformation("----- Saving integration event {IntegrationEventId} to repository ({@IntegrationEvent})", evt.Id, evt); |
|
|
|
|
|
|
|
await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction.GetDbTransaction()); |
|
|
|
} |
|
|
|
} |
|
|
|