Browse Source

Refactor transaction and publishing in integration events

pull/150/head
Ramón Tomás 7 years ago
parent
commit
3c909ff392
9 changed files with 203 additions and 82 deletions
  1. +37
    -0
      src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs
  2. +20
    -46
      src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs
  3. +48
    -0
      src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs
  4. +14
    -0
      src/Services/Catalog/Catalog.API/IntegrationEvents/ICatalogIntegrationEventService.cs
  5. +3
    -2
      src/Services/Catalog/Catalog.API/Startup.cs
  6. +15
    -32
      src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/BuyerAndPaymentMethodVerified/UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler.cs
  7. +14
    -0
      src/Services/Ordering/Ordering.API/IntegrationEvents/IOrderingIntegrationEventService.cs
  8. +49
    -0
      src/Services/Ordering/Ordering.API/IntegrationEvents/OrderingIntegrationEventService.cs
  9. +3
    -2
      src/Services/Ordering/Ordering.API/Startup.cs

+ 37
- 0
src/BuildingBlocks/EventBus/IntegrationEventLogEF/Utilities/ResilientTransaction.cs View File

@ -0,0 +1,37 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using System.Threading.Tasks;
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities
{
public class ResilientTransaction
{
private DbContext _context;
private ResilientTransaction(DbContext context) =>
_context = context ?? throw new ArgumentNullException(nameof(context));
public static ResilientTransaction New (DbContext context) =>
new ResilientTransaction(context);
public async Task ExecuteAsync(Func<Task> action)
{
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
var strategy = _context.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(async () =>
{
using (var transaction = _context.Database.BeginTransaction())
{
await action();
transaction.Commit();
}
});
}
}
}

+ 20
- 46
src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs View File

@ -15,6 +15,8 @@ using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
using Catalog.API.IntegrationEvents;
namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers
{
@ -23,15 +25,13 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers
{
private readonly CatalogContext _catalogContext;
private readonly IOptionsSnapshot<Settings> _settings;
private readonly IEventBus _eventBus;
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly ICatalogIntegrationEventService _catalogIntegrationEventService;
public CatalogController(CatalogContext Context, IOptionsSnapshot<Settings> settings, IEventBus eventBus, Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
public CatalogController(CatalogContext Context, IOptionsSnapshot<Settings> settings, ICatalogIntegrationEventService catalogIntegrationEventService)
{
_catalogContext = Context;
_catalogIntegrationEventService = catalogIntegrationEventService;
_settings = settings;
_eventBus = eventBus;
_integrationEventLogServiceFactory = integrationEventLogServiceFactory;
((DbContext)Context).ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
}
@ -145,51 +145,25 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers
{
var catalogItem = await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id == productToUpdate.Id);
if (catalogItem == null) return NotFound();
bool raiseProductPriceChangedEvent = false;
IntegrationEvent priceChangedEvent = null;
if (catalogItem.Price != productToUpdate.Price) raiseProductPriceChangedEvent = true;
if (raiseProductPriceChangedEvent) // Create event if price has changed
{
var oldPrice = catalogItem.Price;
priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id, productToUpdate.Price, oldPrice);
}
//Update current product
var raiseProductPriceChangedEvent = catalogItem.Price != productToUpdate.Price;
var oldPrice = catalogItem.Price;
// Update current product
catalogItem = productToUpdate;
_catalogContext.CatalogItems.Update(catalogItem);
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
var strategy = _catalogContext.Database.CreateExecutionStrategy();
var eventLogService = _integrationEventLogServiceFactory(_catalogContext.Database.GetDbConnection());
await strategy.ExecuteAsync(async () =>
{
if (raiseProductPriceChangedEvent) // Save and publish event if price has changed
{
var priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id, productToUpdate.Price, oldPrice);
// Achieving atomicity between original Catalog database operation and the IntegrationEventLog thanks to a local transaction
using (var transaction = _catalogContext.Database.BeginTransaction())
{
_catalogContext.CatalogItems.Update(catalogItem);
await _catalogContext.SaveChangesAsync();
//Save to EventLog only if product price changed
if (raiseProductPriceChangedEvent)
{
await eventLogService.SaveEventAsync(priceChangedEvent, _catalogContext.Database.CurrentTransaction.GetDbTransaction());
}
transaction.Commit();
}
});
//Publish to Event Bus only if product price changed
if (raiseProductPriceChangedEvent)
{
_eventBus.Publish(priceChangedEvent);
await eventLogService.MarkEventAsPublishedAsync(priceChangedEvent);
await _catalogIntegrationEventService.SaveEventAsync(priceChangedEvent);
// Publish to Event Bus only if product price changed
await _catalogIntegrationEventService.PublishAsync(priceChangedEvent);
}
else // Save updated product
{
await _catalogContext.SaveChangesAsync();
}
return Ok();
}


+ 48
- 0
src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs View File

@ -0,0 +1,48 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure;
using System;
using System.Data.Common;
using System.Threading.Tasks;
namespace Catalog.API.IntegrationEvents
{
public class CatalogIntegrationEventService : ICatalogIntegrationEventService
{
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus;
private readonly CatalogContext _catalogContext;
private readonly IIntegrationEventLogService _eventLogService;
public CatalogIntegrationEventService(IEventBus eventBus, CatalogContext catalogContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
{
_catalogContext = catalogContext ?? throw new ArgumentNullException(nameof(catalogContext));
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory));
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
_eventLogService = _integrationEventLogServiceFactory(_catalogContext.Database.GetDbConnection());
}
public async Task PublishAsync(IntegrationEvent evt)
{
_eventBus.Publish(evt);
await _eventLogService.MarkEventAsPublishedAsync(evt);
}
public async Task SaveEventAsync(IntegrationEvent evt)
{
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
await ResilientTransaction.New(_catalogContext)
.ExecuteAsync(async () => {
// Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction
await _catalogContext.SaveChangesAsync();
await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction.GetDbTransaction());
});
}
}
}

+ 14
- 0
src/Services/Catalog/Catalog.API/IntegrationEvents/ICatalogIntegrationEventService.cs View File

@ -0,0 +1,14 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Catalog.API.IntegrationEvents
{
public interface ICatalogIntegrationEventService
{
Task SaveEventAsync(IntegrationEvent evt);
Task PublishAsync(IntegrationEvent evt);
}
}

+ 3
- 2
src/Services/Catalog/Catalog.API/Startup.cs View File

@ -20,6 +20,7 @@
using System.IO;
using System.Data.Common;
using System.Reflection;
using global::Catalog.API.IntegrationEvents;
public class Startup
{
@ -97,10 +98,10 @@
});
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
sp => (DbConnection c) => new IntegrationEventLogService(c));
sp => (DbConnection c) => new IntegrationEventLogService(c));
var serviceProvider = services.BuildServiceProvider();
var configuration = serviceProvider.GetRequiredService<IOptionsSnapshot<Settings>>().Value;
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
services.AddSingleton<IEventBus>(new EventBusRabbitMQ(configuration.EventBusConnection));
}


+ 15
- 32
src/Services/Ordering/Ordering.API/Application/DomainEventHandlers/BuyerAndPaymentMethodVerified/UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler.cs View File

@ -1,15 +1,10 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate;
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure;
using Microsoft.Extensions.Logging;
using Ordering.API.IntegrationEvents;
using Ordering.API.IntegrationEvents.Events;
using Ordering.Domain.Events;
using System;
using System.Data.Common;
using System.Threading.Tasks;
namespace Ordering.API.Application.DomainEventHandlers.BuyerAndPaymentMethodVerified
@ -18,17 +13,15 @@ namespace Ordering.API.Application.DomainEventHandlers.BuyerAndPaymentMethodVeri
: IAsyncNotificationHandler<BuyerAndPaymentMethodVerifiedDomainEvent>
{
private readonly IOrderRepository _orderRepository;
private readonly ILoggerFactory _logger;
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus;
private readonly IOrderingIntegrationEventService _orderingIntegrationEventService;
private readonly ILoggerFactory _logger;
public UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler(
IOrderRepository orderRepository, ILoggerFactory logger, IEventBus eventBus,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
IOrderRepository orderRepository, ILoggerFactory logger,
IOrderingIntegrationEventService orderingIntegrationEventService)
{
_orderRepository = orderRepository ?? throw new ArgumentNullException(nameof(orderRepository));
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory));
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
_orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentNullException(nameof(orderingIntegrationEventService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@ -42,28 +35,18 @@ namespace Ordering.API.Application.DomainEventHandlers.BuyerAndPaymentMethodVeri
orderToUpdate.SetPaymentId(buyerPaymentMethodVerifiedEvent.Payment.Id);
var orderStartedIntegrationEvent = new OrderStartedIntegrationEvent(buyerPaymentMethodVerifiedEvent.Buyer.IdentityGuid);
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
var orderingContext = _orderRepository.UnitOfWork as OrderingContext;
var strategy = orderingContext.Database.CreateExecutionStrategy();
var eventLogService = _integrationEventLogServiceFactory(orderingContext.Database.GetDbConnection());
await strategy.ExecuteAsync(async () =>
{
// Achieving atomicity between original Catalog database operation and the IntegrationEventLog thanks to a local transaction
using (var transaction = orderingContext.Database.BeginTransaction())
{
await _orderRepository.UnitOfWork.SaveEntitiesAsync();
await eventLogService.SaveEventAsync(orderStartedIntegrationEvent, orderingContext.Database.CurrentTransaction.GetDbTransaction());
transaction.Commit();
}
});
// Using a local transaction to achieve atomicity between original Ordering database operation and
// the IntegrationEventLog. Only saving event if order has been successfully persisted to db
await _orderingIntegrationEventService
.SaveEventAsync(orderStartedIntegrationEvent);
_logger.CreateLogger(nameof(UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler))
.LogTrace($"Order with Id: {buyerPaymentMethodVerifiedEvent.OrderId} has been successfully updated with a payment method id: { buyerPaymentMethodVerifiedEvent.Payment.Id }");
// Publish ordering integration event and mark it as published
await _orderingIntegrationEventService
.PublishAsync(orderStartedIntegrationEvent);
_eventBus.Publish(orderStartedIntegrationEvent);
await eventLogService.MarkEventAsPublishedAsync(orderStartedIntegrationEvent);
_logger.CreateLogger(nameof(UpdateOrderWhenBuyerAndPaymentMethodVerifiedDomainEventHandler))
.LogTrace($"Order with Id: {buyerPaymentMethodVerifiedEvent.OrderId} has been successfully updated with a payment method id: { buyerPaymentMethodVerifiedEvent.Payment.Id }");
}
}
}

+ 14
- 0
src/Services/Ordering/Ordering.API/IntegrationEvents/IOrderingIntegrationEventService.cs View File

@ -0,0 +1,14 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ordering.API.IntegrationEvents
{
public interface IOrderingIntegrationEventService
{
Task SaveEventAsync(IntegrationEvent evt);
Task PublishAsync(IntegrationEvent evt);
}
}

+ 49
- 0
src/Services/Ordering/Ordering.API/IntegrationEvents/OrderingIntegrationEventService.cs View File

@ -0,0 +1,49 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate;
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure;
using System;
using System.Data.Common;
using System.Threading.Tasks;
namespace Ordering.API.IntegrationEvents
{
public class OrderingIntegrationEventService : IOrderingIntegrationEventService
{
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
private readonly IEventBus _eventBus;
private readonly OrderingContext _orderingContext;
private readonly IIntegrationEventLogService _eventLogService;
public OrderingIntegrationEventService (IEventBus eventBus, OrderingContext orderingContext,
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
{
_orderingContext = orderingContext ?? throw new ArgumentNullException(nameof(orderingContext));
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory));
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
_eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection());
}
public async Task PublishAsync(IntegrationEvent evt)
{
_eventBus.Publish(evt);
await _eventLogService.MarkEventAsPublishedAsync(evt);
}
public async Task SaveEventAsync(IntegrationEvent evt)
{
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
await ResilientTransaction.New(_orderingContext)
.ExecuteAsync(async () => {
// Achieving atomicity between original ordering database operation and the IntegrationEventLog thanks to a local transaction
await _orderingContext.SaveChangesAsync();
await _eventLogService.SaveEventAsync(evt, _orderingContext.Database.CurrentTransaction.GetDbTransaction());
});
}
}
}

+ 3
- 2
src/Services/Ordering/Ordering.API/Startup.cs View File

@ -4,6 +4,7 @@
using Autofac;
using Autofac.Extensions.DependencyInjection;
using global::Ordering.API.Infrastructure.Middlewares;
using global::Ordering.API.IntegrationEvents;
using Infrastructure;
using Infrastructure.Auth;
using Infrastructure.AutofacModules;
@ -101,9 +102,9 @@
services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
services.AddTransient<IIdentityService, IdentityService>();
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
sp => (DbConnection c) => new IntegrationEventLogService(c));
sp => (DbConnection c) => new IntegrationEventLogService(c));
var serviceProvider = services.BuildServiceProvider();
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
services.AddSingleton<IEventBus>(new EventBusRabbitMQ(Configuration["EventBusConnection"]));
services.AddOptions();


Loading…
Cancel
Save