From 3c909ff392227954a533febb97306175f76f27c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ram=C3=B3n=20Tom=C3=A1s?= Date: Mon, 3 Apr 2017 13:13:40 +0200 Subject: [PATCH] Refactor transaction and publishing in integration events --- .../Utilities/ResilientTransaction.cs | 37 +++++++++++ .../Controllers/CatalogController.cs | 66 ++++++------------- .../CatalogIntegrationEventService.cs | 48 ++++++++++++++ .../ICatalogIntegrationEventService.cs | 14 ++++ src/Services/Catalog/Catalog.API/Startup.cs | 5 +- ...PaymentMethodVerifiedDomainEventHandler.cs | 47 +++++-------- .../IOrderingIntegrationEventService.cs | 14 ++++ .../OrderingIntegrationEventService.cs | 49 ++++++++++++++ src/Services/Ordering/Ordering.API/Startup.cs | 5 +- 9 files changed, 203 insertions(+), 82 deletions(-) create mode 100644 src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs create mode 100644 src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs create mode 100644 src/Services/Catalog/Catalog.API/IntegrationEvents/ICatalogIntegrationEventService.cs create mode 100644 src/Services/Ordering/Ordering.API/IntegrationEvents/IOrderingIntegrationEventService.cs create mode 100644 src/Services/Ordering/Ordering.API/IntegrationEvents/OrderingIntegrationEventService.cs diff --git a/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs new file mode 100644 index 000000000..f8227882b --- /dev/null +++ b/src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs @@ -0,0 +1,37 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities +{ + public class ResilientTransaction + { + private DbContext _context; + private ResilientTransaction(DbContext context) => + _context = context ?? throw new ArgumentNullException(nameof(context)); + + public static ResilientTransaction New (DbContext context) => + new ResilientTransaction(context); + + public async Task ExecuteAsync(Func action) + { + //Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): + //See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency + var strategy = _context.Database.CreateExecutionStrategy(); + await strategy.ExecuteAsync(async () => + { + using (var transaction = _context.Database.BeginTransaction()) + { + await action(); + transaction.Commit(); + } + }); + } + } +} diff --git a/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs b/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs index f4864f2cd..e556376d9 100644 --- a/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs +++ b/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs @@ -15,6 +15,8 @@ using System.Data.Common; using System.Linq; using System.Threading.Tasks; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; +using Catalog.API.IntegrationEvents; namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers { @@ -23,15 +25,13 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers { private readonly CatalogContext _catalogContext; private readonly IOptionsSnapshot _settings; - private readonly IEventBus _eventBus; - private readonly Func _integrationEventLogServiceFactory; + private readonly ICatalogIntegrationEventService _catalogIntegrationEventService; - public CatalogController(CatalogContext Context, IOptionsSnapshot settings, IEventBus eventBus, Func integrationEventLogServiceFactory) + public CatalogController(CatalogContext Context, IOptionsSnapshot settings, ICatalogIntegrationEventService catalogIntegrationEventService) { _catalogContext = Context; + _catalogIntegrationEventService = catalogIntegrationEventService; _settings = settings; - _eventBus = eventBus; - _integrationEventLogServiceFactory = integrationEventLogServiceFactory; ((DbContext)Context).ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; } @@ -145,51 +145,25 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers { var catalogItem = await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id == productToUpdate.Id); if (catalogItem == null) return NotFound(); - - bool raiseProductPriceChangedEvent = false; - IntegrationEvent priceChangedEvent = null; - - if (catalogItem.Price != productToUpdate.Price) raiseProductPriceChangedEvent = true; - - if (raiseProductPriceChangedEvent) // Create event if price has changed - { - var oldPrice = catalogItem.Price; - priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id, productToUpdate.Price, oldPrice); - } - - //Update current product + var raiseProductPriceChangedEvent = catalogItem.Price != productToUpdate.Price; + var oldPrice = catalogItem.Price; + + // Update current product catalogItem = productToUpdate; + _catalogContext.CatalogItems.Update(catalogItem); - //Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): - //See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency - var strategy = _catalogContext.Database.CreateExecutionStrategy(); - var eventLogService = _integrationEventLogServiceFactory(_catalogContext.Database.GetDbConnection()); - await strategy.ExecuteAsync(async () => - { + if (raiseProductPriceChangedEvent) // Save and publish event if price has changed + { + var priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id, productToUpdate.Price, oldPrice); // Achieving atomicity between original Catalog database operation and the IntegrationEventLog thanks to a local transaction - using (var transaction = _catalogContext.Database.BeginTransaction()) - { - _catalogContext.CatalogItems.Update(catalogItem); - await _catalogContext.SaveChangesAsync(); - - //Save to EventLog only if product price changed - if (raiseProductPriceChangedEvent) - { - - await eventLogService.SaveEventAsync(priceChangedEvent, _catalogContext.Database.CurrentTransaction.GetDbTransaction()); - } - - transaction.Commit(); - } - }); - - - //Publish to Event Bus only if product price changed - if (raiseProductPriceChangedEvent) - { - _eventBus.Publish(priceChangedEvent); - await eventLogService.MarkEventAsPublishedAsync(priceChangedEvent); + await _catalogIntegrationEventService.SaveEventAsync(priceChangedEvent); + // Publish to Event Bus only if product price changed + await _catalogIntegrationEventService.PublishAsync(priceChangedEvent); } + else // Save updated product + { + await _catalogContext.SaveChangesAsync(); + } return Ok(); } diff --git a/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs b/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs new file mode 100644 index 000000000..d751d155b --- /dev/null +++ b/src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs @@ -0,0 +1,48 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; +using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; +using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure; +using System; +using System.Data.Common; +using System.Threading.Tasks; + +namespace Catalog.API.IntegrationEvents +{ + public class CatalogIntegrationEventService : ICatalogIntegrationEventService + { + private readonly Func _integrationEventLogServiceFactory; + private readonly IEventBus _eventBus; + private readonly CatalogContext _catalogContext; + private readonly IIntegrationEventLogService _eventLogService; + + public CatalogIntegrationEventService(IEventBus eventBus, CatalogContext catalogContext, + Func integrationEventLogServiceFactory) + { + _catalogContext = catalogContext ?? throw new ArgumentNullException(nameof(catalogContext)); + _integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory)); + _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus)); + _eventLogService = _integrationEventLogServiceFactory(_catalogContext.Database.GetDbConnection()); + } + + public async Task PublishAsync(IntegrationEvent evt) + { + _eventBus.Publish(evt); + await _eventLogService.MarkEventAsPublishedAsync(evt); + } + + public async Task SaveEventAsync(IntegrationEvent evt) + { + //Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): + //See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency + await ResilientTransaction.New(_catalogContext) + .ExecuteAsync(async () => { + // Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction + await _catalogContext.SaveChangesAsync(); + await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction.GetDbTransaction()); + }); + } + } +} diff --git a/src/Services/Catalog/Catalog.API/IntegrationEvents/ICatalogIntegrationEventService.cs b/src/Services/Catalog/Catalog.API/IntegrationEvents/ICatalogIntegrationEventService.cs new file mode 100644 index 000000000..1e695e692 --- /dev/null +++ b/src/Services/Catalog/Catalog.API/IntegrationEvents/ICatalogIntegrationEventService.cs @@ -0,0 +1,14 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Catalog.API.IntegrationEvents +{ + public interface ICatalogIntegrationEventService + { + Task SaveEventAsync(IntegrationEvent evt); + Task PublishAsync(IntegrationEvent evt); + } +} diff --git a/src/Services/Catalog/Catalog.API/Startup.cs b/src/Services/Catalog/Catalog.API/Startup.cs index d49cc20be..716376e67 100644 --- a/src/Services/Catalog/Catalog.API/Startup.cs +++ b/src/Services/Catalog/Catalog.API/Startup.cs @@ -20,6 +20,7 @@ using System.IO; using System.Data.Common; using System.Reflection; + using global::Catalog.API.IntegrationEvents; public class Startup { @@ -97,10 +98,10 @@ }); services.AddTransient>( - sp => (DbConnection c) => new IntegrationEventLogService(c)); - + sp => (DbConnection c) => new IntegrationEventLogService(c)); var serviceProvider = services.BuildServiceProvider(); var configuration = serviceProvider.GetRequiredService>().Value; + services.AddTransient(); services.AddSingleton(new EventBusRabbitMQ(configuration.EventBusConnection)); } diff --git a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/BuyerAndPaymentMethodVerified/UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler.cs b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/BuyerAndPaymentMethodVerified/UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler.cs index 550d7055e..dd469ae78 100644 --- a/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/BuyerAndPaymentMethodVerified/UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler.cs +++ b/src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/BuyerAndPaymentMethodVerified/UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler.cs @@ -1,15 +1,10 @@ using MediatR; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Storage; -using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; -using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate; -using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure; using Microsoft.Extensions.Logging; +using Ordering.API.IntegrationEvents; using Ordering.API.IntegrationEvents.Events; using Ordering.Domain.Events; using System; -using System.Data.Common; using System.Threading.Tasks; namespace Ordering.API.Application.DomainEventHandlers.BuyerAndPaymentMethodVerified @@ -18,17 +13,15 @@ namespace Ordering.API.Application.DomainEventHandlers.BuyerAndPaymentMethodVeri : IAsyncNotificationHandler { private readonly IOrderRepository _orderRepository; - private readonly ILoggerFactory _logger; - private readonly Func _integrationEventLogServiceFactory; - private readonly IEventBus _eventBus; + private readonly IOrderingIntegrationEventService _orderingIntegrationEventService; + private readonly ILoggerFactory _logger; public UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler( - IOrderRepository orderRepository, ILoggerFactory logger, IEventBus eventBus, - Func integrationEventLogServiceFactory) + IOrderRepository orderRepository, ILoggerFactory logger, + IOrderingIntegrationEventService orderingIntegrationEventService) { _orderRepository = orderRepository ?? throw new ArgumentNullException(nameof(orderRepository)); - _integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory)); - _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus)); + _orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentNullException(nameof(orderingIntegrationEventService)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } @@ -42,28 +35,18 @@ namespace Ordering.API.Application.DomainEventHandlers.BuyerAndPaymentMethodVeri orderToUpdate.SetPaymentId(buyerPaymentMethodVerifiedEvent.Payment.Id); var orderStartedIntegrationEvent = new OrderStartedIntegrationEvent(buyerPaymentMethodVerifiedEvent.Buyer.IdentityGuid); - //Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): - //See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency - var orderingContext = _orderRepository.UnitOfWork as OrderingContext; - var strategy = orderingContext.Database.CreateExecutionStrategy(); - var eventLogService = _integrationEventLogServiceFactory(orderingContext.Database.GetDbConnection()); - await strategy.ExecuteAsync(async () => - { - // Achieving atomicity between original Catalog database operation and the IntegrationEventLog thanks to a local transaction - using (var transaction = orderingContext.Database.BeginTransaction()) - { - await _orderRepository.UnitOfWork.SaveEntitiesAsync(); - await eventLogService.SaveEventAsync(orderStartedIntegrationEvent, orderingContext.Database.CurrentTransaction.GetDbTransaction()); - transaction.Commit(); - } - }); + // Using a local transaction to achieve atomicity between original Ordering database operation and + // the IntegrationEventLog. Only saving event if order has been successfully persisted to db + await _orderingIntegrationEventService + .SaveEventAsync(orderStartedIntegrationEvent); - _logger.CreateLogger(nameof(UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler)) - .LogTrace($"Order with Id: {buyerPaymentMethodVerifiedEvent.OrderId} has been successfully updated with a payment method id: { buyerPaymentMethodVerifiedEvent.Payment.Id }"); + // Publish ordering integration event and mark it as published + await _orderingIntegrationEventService + .PublishAsync(orderStartedIntegrationEvent); - _eventBus.Publish(orderStartedIntegrationEvent); - await eventLogService.MarkEventAsPublishedAsync(orderStartedIntegrationEvent); + _logger.CreateLogger(nameof(UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler)) + .LogTrace($"Order with Id: {buyerPaymentMethodVerifiedEvent.OrderId} has been successfully updated with a payment method id: { buyerPaymentMethodVerifiedEvent.Payment.Id }"); } } } diff --git a/src/Services/Ordering/Ordering.API/IntegrationEvents/IOrderingIntegrationEventService.cs b/src/Services/Ordering/Ordering.API/IntegrationEvents/IOrderingIntegrationEventService.cs new file mode 100644 index 000000000..93ea518ba --- /dev/null +++ b/src/Services/Ordering/Ordering.API/IntegrationEvents/IOrderingIntegrationEventService.cs @@ -0,0 +1,14 @@ +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Ordering.API.IntegrationEvents +{ + public interface IOrderingIntegrationEventService + { + Task SaveEventAsync(IntegrationEvent evt); + Task PublishAsync(IntegrationEvent evt); + } +} diff --git a/src/Services/Ordering/Ordering.API/IntegrationEvents/OrderingIntegrationEventService.cs b/src/Services/Ordering/Ordering.API/IntegrationEvents/OrderingIntegrationEventService.cs new file mode 100644 index 000000000..b05ea44d0 --- /dev/null +++ b/src/Services/Ordering/Ordering.API/IntegrationEvents/OrderingIntegrationEventService.cs @@ -0,0 +1,49 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; +using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; +using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; +using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; +using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate; +using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure; +using System; +using System.Data.Common; +using System.Threading.Tasks; + +namespace Ordering.API.IntegrationEvents +{ + public class OrderingIntegrationEventService : IOrderingIntegrationEventService + { + private readonly Func _integrationEventLogServiceFactory; + private readonly IEventBus _eventBus; + private readonly OrderingContext _orderingContext; + private readonly IIntegrationEventLogService _eventLogService; + + public OrderingIntegrationEventService (IEventBus eventBus, OrderingContext orderingContext, + Func integrationEventLogServiceFactory) + { + _orderingContext = orderingContext ?? throw new ArgumentNullException(nameof(orderingContext)); + _integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory)); + _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus)); + _eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection()); + } + + public async Task PublishAsync(IntegrationEvent evt) + { + _eventBus.Publish(evt); + await _eventLogService.MarkEventAsPublishedAsync(evt); + } + + public async Task SaveEventAsync(IntegrationEvent evt) + { + //Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): + //See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency + await ResilientTransaction.New(_orderingContext) + .ExecuteAsync(async () => { + // Achieving atomicity between original ordering database operation and the IntegrationEventLog thanks to a local transaction + await _orderingContext.SaveChangesAsync(); + await _eventLogService.SaveEventAsync(evt, _orderingContext.Database.CurrentTransaction.GetDbTransaction()); + }); + } + } +} diff --git a/src/Services/Ordering/Ordering.API/Startup.cs b/src/Services/Ordering/Ordering.API/Startup.cs index b7d90fa81..0c7f9ac73 100644 --- a/src/Services/Ordering/Ordering.API/Startup.cs +++ b/src/Services/Ordering/Ordering.API/Startup.cs @@ -4,6 +4,7 @@ using Autofac; using Autofac.Extensions.DependencyInjection; using global::Ordering.API.Infrastructure.Middlewares; + using global::Ordering.API.IntegrationEvents; using Infrastructure; using Infrastructure.Auth; using Infrastructure.AutofacModules; @@ -101,9 +102,9 @@ services.AddSingleton(); services.AddTransient(); services.AddTransient>( - sp => (DbConnection c) => new IntegrationEventLogService(c)); - + sp => (DbConnection c) => new IntegrationEventLogService(c)); var serviceProvider = services.BuildServiceProvider(); + services.AddTransient(); services.AddSingleton(new EventBusRabbitMQ(Configuration["EventBusConnection"])); services.AddOptions();