From 24bed0aa33a9c9cd6c6de16726200b299e954cbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ram=C3=B3n=20Tom=C3=A1s?= Date: Thu, 11 Oct 2018 17:16:31 +0200 Subject: [PATCH] Send IntegrationEvents after committing transactions --- .../EventBusRabbitMQ/EventBusRabbitMQ.cs | 4 +- .../EventBusServiceBus/EventBusServiceBus.cs | 6 ++- .../IntegrationEventLogEF/EventStateEnum.cs | 5 +- .../IntegrationEventLogEntry.cs | 13 ++++- .../Services/IIntegrationEventLogService.cs | 5 +- .../Services/IntegrationEventLogService.cs | 47 +++++++++++++++++-- .../CatalogIntegrationEventService.cs | 13 +++-- src/Services/Catalog/Catalog.API/Startup.cs | 6 ++- .../Behaviors/TransactionBehaviour.cs | 9 +++- .../Commands/CreateOrderCommandHandler.cs | 13 ++++- .../OrderCancelledDomainEventHandler.cs | 2 +- ...dToAwaitingValidationDomainEventHandler.cs | 2 +- ...erStatusChangedToPaidDomainEventHandler.cs | 2 +- .../OrderShippedDomainEventHandler.cs | 2 +- ...egateWhenOrderStartedDomainEventHandler.cs | 2 +- ...angedToStockConfirmedDomainEventHandler.cs | 2 +- ...CheckoutAcceptedIntegrationEventHandler.cs | 14 ++---- .../IOrderingIntegrationEventService.cs | 3 +- .../OrderingIntegrationEventService.cs | 30 +++++++----- src/Services/Ordering/Ordering.API/Startup.cs | 13 ++++- 20 files changed, 144 insertions(+), 49 deletions(-) diff --git a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs index 49a417635..a3b6437ef 100644 --- a/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs +++ b/src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs @@ -217,14 +217,16 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; + if (handler == null) continue; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { + var handler = scope.ResolveOptional(subscription.HandlerType); + if (handler == null) continue; var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); - var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } diff --git a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs index 2cd86669b..d16eb4625 100644 --- a/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs +++ b/src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs @@ -163,14 +163,16 @@ if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; + if (handler == null) continue; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { - var eventType = _subsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); + if (handler == null) continue; + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } diff --git a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/EventStateEnum.cs b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/EventStateEnum.cs index 3efb78e74..079cf7d7e 100644 --- a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/EventStateEnum.cs +++ b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/EventStateEnum.cs @@ -7,7 +7,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF public enum EventStateEnum { NotPublished = 0, - Published = 1, - PublishedFailed = 2 + InProgress = 1, + Published = 2, + PublishedFailed = 3 } } diff --git a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEntry.cs b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEntry.cs index 3cab9e500..b570f4eb8 100644 --- a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEntry.cs +++ b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEntry.cs @@ -3,6 +3,8 @@ using System.Collections.Generic; using System.Text; using Newtonsoft.Json; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System.Linq; +using System.ComponentModel.DataAnnotations.Schema; namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF { @@ -11,7 +13,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF private IntegrationEventLogEntry() { } public IntegrationEventLogEntry(IntegrationEvent @event) { - EventId = @event.Id; + EventId = @event.Id; CreationTime = @event.CreationDate; EventTypeName = @event.GetType().FullName; Content = JsonConvert.SerializeObject(@event); @@ -20,9 +22,18 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF } public Guid EventId { get; private set; } public string EventTypeName { get; private set; } + [NotMapped] + public string EventTypeShortName => EventTypeName.Split('.')?.Last(); + [NotMapped] + public IntegrationEvent IntegrationEvent { get; private set; } public EventStateEnum State { get; set; } public int TimesSent { get; set; } public DateTime CreationTime { get; private set; } public string Content { get; private set; } + + public void DeserializeJsonContent(Type type) + { + IntegrationEvent = JsonConvert.DeserializeObject(Content, type) as IntegrationEvent; + } } } diff --git a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IIntegrationEventLogService.cs b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IIntegrationEventLogService.cs index ed1f74616..6167d8ae8 100644 --- a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IIntegrationEventLogService.cs +++ b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IIntegrationEventLogService.cs @@ -9,7 +9,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi { public interface IIntegrationEventLogService { + Task> RetrieveEventLogsPendingToPublishAsync(); Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction); - Task MarkEventAsPublishedAsync(IntegrationEvent @event); + Task MarkEventAsPublishedAsync(Guid eventId); + Task MarkEventAsInProgressAsync(Guid eventId); + Task MarkEventAsFailedAsync(Guid eventId); } } diff --git a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs index a12309482..0063374c3 100644 --- a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs +++ b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs @@ -1,7 +1,11 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using Newtonsoft.Json; using System; +using System.Collections; +using System.Collections.Generic; using System.Data.Common; using System.Linq; using System.Threading.Tasks; @@ -10,12 +14,15 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi { public class IntegrationEventLogService : IIntegrationEventLogService { + private readonly IEventBusSubscriptionsManager _subsManager; private readonly IntegrationEventLogContext _integrationEventLogContext; private readonly DbConnection _dbConnection; - public IntegrationEventLogService(DbConnection dbConnection) + public IntegrationEventLogService(IEventBusSubscriptionsManager subsManager, + DbConnection dbConnection) { _dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection)); + _subsManager = subsManager ?? throw new ArgumentNullException(nameof(subsManager)); _integrationEventLogContext = new IntegrationEventLogContext( new DbContextOptionsBuilder() .UseSqlServer(_dbConnection) @@ -23,6 +30,19 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi .Options); } + public async Task> RetrieveEventLogsPendingToPublishAsync() + { + var eventLogsPendingToPublish = await _integrationEventLogContext.IntegrationEventLogs + .Where(e => e.State == EventStateEnum.NotPublished) + .OrderBy(o => o.CreationTime) + .ToListAsync(); + + eventLogsPendingToPublish.ForEach(evtLog => + evtLog.DeserializeJsonContent(_subsManager.GetEventTypeByName(evtLog.EventTypeShortName))); + + return eventLogsPendingToPublish; + } + public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction) { if (transaction == null) @@ -38,11 +58,28 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi return _integrationEventLogContext.SaveChangesAsync(); } - public Task MarkEventAsPublishedAsync(IntegrationEvent @event) + public Task MarkEventAsPublishedAsync(Guid eventId) + { + return UpdateEventStatus(eventId, EventStateEnum.Published); + } + + public Task MarkEventAsInProgressAsync(Guid eventId) { - var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == @event.Id); - eventLogEntry.TimesSent++; - eventLogEntry.State = EventStateEnum.Published; + return UpdateEventStatus(eventId, EventStateEnum.InProgress); + } + + public Task MarkEventAsFailedAsync(Guid eventId) + { + return UpdateEventStatus(eventId, EventStateEnum.PublishedFailed); + } + + private Task UpdateEventStatus(Guid eventId, EventStateEnum status) + { + var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == eventId); + eventLogEntry.State = status; + + if(status == EventStateEnum.InProgress) + eventLogEntry.TimesSent++; _integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry); diff --git a/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs b/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs index 1b82251e3..8c550bf27 100644 --- a/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs +++ b/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs @@ -29,9 +29,16 @@ namespace Catalog.API.IntegrationEvents public async Task PublishThroughEventBusAsync(IntegrationEvent evt) { - _eventBus.Publish(evt); - - await _eventLogService.MarkEventAsPublishedAsync(evt); + try + { + await _eventLogService.MarkEventAsInProgressAsync(evt.Id); + _eventBus.Publish(evt); + await _eventLogService.MarkEventAsPublishedAsync(evt.Id); + } + catch (Exception) + { + await _eventLogService.MarkEventAsFailedAsync(evt.Id); + } } public async Task SaveEventAndCatalogContextChangesAsync(IntegrationEvent evt) diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index 408f870af..9a8d33720 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -232,7 +232,11 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API public static IServiceCollection AddIntegrationServices(this IServiceCollection services, IConfiguration configuration) { services.AddTransient>( - sp => (DbConnection c) => new IntegrationEventLogService(c)); + sp => + { + var busMgr = sp.GetRequiredService(); + return (DbConnection c) => new IntegrationEventLogService(busMgr, c); + }); services.AddTransient(); diff --git a/src/Services/Ordering/Ordering.API/Application/Behaviors/TransactionBehaviour.cs b/src/Services/Ordering/Ordering.API/Application/Behaviors/TransactionBehaviour.cs index d728d83cf..6f9aed9e5 100644 --- a/src/Services/Ordering/Ordering.API/Application/Behaviors/TransactionBehaviour.cs +++ b/src/Services/Ordering/Ordering.API/Application/Behaviors/TransactionBehaviour.cs @@ -2,6 +2,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure; using Microsoft.Extensions.Logging; +using Ordering.API.Application.IntegrationEvents; using System; using System.Collections.Generic; using System.Linq; @@ -14,10 +15,14 @@ namespace Ordering.API.Application.Behaviors { private readonly ILogger> _logger; private readonly OrderingContext _dbContext; + private readonly IOrderingIntegrationEventService _orderingIntegrationEventService; - public TransactionBehaviour(OrderingContext dbContext, ILogger> logger) + public TransactionBehaviour(OrderingContext dbContext, + IOrderingIntegrationEventService orderingIntegrationEventService, + ILogger> logger) { _dbContext = dbContext ?? throw new ArgumentException(nameof(OrderingContext)); + _orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentException(nameof(orderingIntegrationEventService)); _logger = logger ?? throw new ArgumentException(nameof(ILogger)); } @@ -39,6 +44,8 @@ namespace Ordering.API.Application.Behaviors await _dbContext.CommitTransactionAsync(); _logger.LogInformation($"Committed transaction {typeof(TRequest).Name}"); + + await _orderingIntegrationEventService.PublishEventsThroughEventBusAsync(); }); return response; diff --git a/src/Services/Ordering/Ordering.API/Application/Commands/CreateOrderCommandHandler.cs b/src/Services/Ordering/Ordering.API/Application/Commands/CreateOrderCommandHandler.cs index e5f154c0c..9a3035d5c 100644 --- a/src/Services/Ordering/Ordering.API/Application/Commands/CreateOrderCommandHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/Commands/CreateOrderCommandHandler.cs @@ -1,6 +1,8 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.API.Application.Commands { using Domain.AggregatesModel.OrderAggregate; + using global::Ordering.API.Application.IntegrationEvents; + using global::Ordering.API.Application.IntegrationEvents.Events; using MediatR; using Microsoft.eShopOnContainers.Services.Ordering.API.Infrastructure.Services; using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure.Idempotency; @@ -15,17 +17,26 @@ private readonly IOrderRepository _orderRepository; private readonly IIdentityService _identityService; private readonly IMediator _mediator; + private readonly IOrderingIntegrationEventService _orderingIntegrationEventService; // Using DI to inject infrastructure persistence Repositories - public CreateOrderCommandHandler(IMediator mediator, IOrderRepository orderRepository, IIdentityService identityService) + public CreateOrderCommandHandler(IMediator mediator, + IOrderingIntegrationEventService orderingIntegrationEventService, + IOrderRepository orderRepository, + IIdentityService identityService) { _orderRepository = orderRepository ?? throw new ArgumentNullException(nameof(orderRepository)); _identityService = identityService ?? throw new ArgumentNullException(nameof(identityService)); _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator)); + _orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentNullException(nameof(orderingIntegrationEventService)); } public async Task Handle(CreateOrderCommand message, CancellationToken cancellationToken) { + // Add Integration event to clean the basket + var orderStartedIntegrationEvent = new OrderStartedIntegrationEvent(message.UserId); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStartedIntegrationEvent); + // Add/Update the Buyer AggregateRoot // DDD patterns comment: Add child entities and value-objects through the Order Aggregate-Root // methods and constructor so validations, invariants and business logic diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderCancelled/OrderCancelledDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderCancelled/OrderCancelledDomainEventHandler.cs index f8a7b06e5..32967f6a7 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderCancelled/OrderCancelledDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderCancelled/OrderCancelledDomainEventHandler.cs @@ -41,7 +41,7 @@ namespace Ordering.API.Application.DomainEventHandlers.OrderCancelled var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString()); var orderStatusChangedToCancelledIntegrationEvent = new OrderStatusChangedToCancelledIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStatusChangedToCancelledIntegrationEvent); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToCancelledIntegrationEvent); } } } diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderGracePeriodConfirmed/OrderStatusChangedToAwaitingValidationDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderGracePeriodConfirmed/OrderStatusChangedToAwaitingValidationDomainEventHandler.cs index 60efead1b..e1c54af4f 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderGracePeriodConfirmed/OrderStatusChangedToAwaitingValidationDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderGracePeriodConfirmed/OrderStatusChangedToAwaitingValidationDomainEventHandler.cs @@ -46,7 +46,7 @@ var orderStatusChangedToAwaitingValidationIntegrationEvent = new OrderStatusChangedToAwaitingValidationIntegrationEvent( order.Id, order.OrderStatus.Name, buyer.Name, orderStockList); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStatusChangedToAwaitingValidationIntegrationEvent); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToAwaitingValidationIntegrationEvent); } } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderPaid/OrderStatusChangedToPaidDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderPaid/OrderStatusChangedToPaidDomainEventHandler.cs index 59c1e4708..d3dca202f 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderPaid/OrderStatusChangedToPaidDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderPaid/OrderStatusChangedToPaidDomainEventHandler.cs @@ -51,7 +51,7 @@ buyer.Name, orderStockList); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStatusChangedToPaidIntegrationEvent); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToPaidIntegrationEvent); } } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderShipped/OrderShippedDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderShipped/OrderShippedDomainEventHandler.cs index 0bf4cabcd..3be83a2ae 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderShipped/OrderShippedDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderShipped/OrderShippedDomainEventHandler.cs @@ -41,7 +41,7 @@ namespace Ordering.API.Application.DomainEventHandlers.OrderShipped var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString()); var orderStatusChangedToShippedIntegrationEvent = new OrderStatusChangedToShippedIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStatusChangedToShippedIntegrationEvent); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToShippedIntegrationEvent); } } } diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStartedEvent/ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStartedEvent/ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler.cs index 0a8366893..99b2a21a0 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStartedEvent/ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStartedEvent/ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler.cs @@ -59,7 +59,7 @@ namespace Ordering.API.Application.DomainEventHandlers.OrderStartedEvent .SaveEntitiesAsync(); var orderStatusChangedTosubmittedIntegrationEvent = new OrderStatusChangedToSubmittedIntegrationEvent(orderStartedEvent.Order.Id, orderStartedEvent.Order.OrderStatus.Name, buyer.Name); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStatusChangedTosubmittedIntegrationEvent); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedTosubmittedIntegrationEvent); _logger.CreateLogger(nameof(ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler)).LogTrace($"Buyer {buyerUpdated.Id} and related payment method were validated or updated for orderId: {orderStartedEvent.Order.Id}."); } diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs index 910d764cf..e910964e8 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/OrderStockConfirmed/OrderStatusChangedToStockConfirmedDomainEventHandler.cs @@ -41,7 +41,7 @@ var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString()); var orderStatusChangedToStockConfirmedIntegrationEvent = new OrderStatusChangedToStockConfirmedIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStatusChangedToStockConfirmedIntegrationEvent); + await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToStockConfirmedIntegrationEvent); } } } \ No newline at end of file diff --git a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs index f46c5683c..33f327c6b 100644 --- a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/EventHandling/UserCheckoutAcceptedIntegrationEventHandler.cs @@ -11,15 +11,13 @@ namespace Ordering.API.Application.IntegrationEvents.EventHandling public class UserCheckoutAcceptedIntegrationEventHandler : IIntegrationEventHandler { private readonly IMediator _mediator; - private readonly ILoggerFactory _logger; - private readonly IOrderingIntegrationEventService _orderingIntegrationEventService; + private readonly ILoggerFactory _logger; public UserCheckoutAcceptedIntegrationEventHandler(IMediator mediator, - ILoggerFactory logger, IOrderingIntegrationEventService orderingIntegrationEventService) + ILoggerFactory logger) { _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentNullException(nameof(orderingIntegrationEventService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// @@ -34,11 +32,7 @@ namespace Ordering.API.Application.IntegrationEvents.EventHandling public async Task Handle(UserCheckoutAcceptedIntegrationEvent eventMsg) { var result = false; - - // Send Integration event to clean basket once basket is converted to Order and before starting with the order creation process - var orderStartedIntegrationEvent = new OrderStartedIntegrationEvent(eventMsg.UserId); - await _orderingIntegrationEventService.PublishThroughEventBusAsync(orderStartedIntegrationEvent); - + if (eventMsg.RequestId != Guid.Empty) { var createOrderCommand = new CreateOrderCommand(eventMsg.Basket.Items, eventMsg.UserId, eventMsg.UserName, eventMsg.City, eventMsg.Street, diff --git a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/IOrderingIntegrationEventService.cs b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/IOrderingIntegrationEventService.cs index 373bafaa5..05e8f0e4f 100644 --- a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/IOrderingIntegrationEventService.cs +++ b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/IOrderingIntegrationEventService.cs @@ -5,6 +5,7 @@ namespace Ordering.API.Application.IntegrationEvents { public interface IOrderingIntegrationEventService { - Task PublishThroughEventBusAsync(IntegrationEvent evt); + Task PublishEventsThroughEventBusAsync(); + Task AddAndSaveEventAsync(IntegrationEvent evt); } } diff --git a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs index 537dfa971..9c1bd4e1b 100644 --- a/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs +++ b/src/Services/Ordering/Ordering.API/Application/IntegrationEvents/OrderingIntegrationEventService.cs @@ -34,23 +34,27 @@ namespace Ordering.API.Application.IntegrationEvents _eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection()); } - public async Task PublishThroughEventBusAsync(IntegrationEvent evt) + public async Task PublishEventsThroughEventBusAsync() { - await SaveEventAsync(evt); - _eventBus.Publish(evt); - await _eventLogService.MarkEventAsPublishedAsync(evt); + var pendindLogEvents = await _eventLogService.RetrieveEventLogsPendingToPublishAsync(); + foreach (var logEvt in pendindLogEvents) + { + try + { + await _eventLogService.MarkEventAsInProgressAsync(logEvt.EventId); + _eventBus.Publish(logEvt.IntegrationEvent); + await _eventLogService.MarkEventAsPublishedAsync(logEvt.EventId); + } + catch (Exception) + { + await _eventLogService.MarkEventAsFailedAsync(logEvt.EventId); + } + } } - private async Task SaveEventAsync(IntegrationEvent evt) + public async Task AddAndSaveEventAsync(IntegrationEvent evt) { - var strategy = _orderingContext.Database.CreateExecutionStrategy(); - await strategy.ExecuteAsync(async () => - { - await _orderingContext.BeginTransactionAsync(); - await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction.GetDbTransaction()); - await _orderingContext.CommitTransactionAsync(); - }); - + await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction.GetDbTransaction()); } } } diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index 42567641a..362df5a47 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -114,6 +114,13 @@ eventBus.Subscribe>(); eventBus.Subscribe>(); eventBus.Subscribe>(); + eventBus.Subscribe>(); + eventBus.Subscribe>(); + eventBus.Subscribe>(); + eventBus.Subscribe>(); + eventBus.Subscribe>(); + eventBus.Subscribe>(); + eventBus.Subscribe>(); } @@ -251,7 +258,11 @@ services.AddSingleton(); services.AddTransient(); services.AddTransient>( - sp => (DbConnection c) => new IntegrationEventLogService(c)); + sp => + { + var busMgr = sp.GetRequiredService(); + return (DbConnection c) => new IntegrationEventLogService(busMgr, c); + }); services.AddTransient();