Initial approach
This commit is contained in:
parent
0bf9a91ac4
commit
db35a5c369
@ -0,0 +1,55 @@
|
|||||||
|
using MediatR;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Ordering.API.Application.Behaviors
|
||||||
|
{
|
||||||
|
public class TransactionBehaviour<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
|
||||||
|
{
|
||||||
|
private readonly ILogger<TransactionBehaviour<TRequest, TResponse>> _logger;
|
||||||
|
private readonly OrderingContext _dbContext;
|
||||||
|
|
||||||
|
public TransactionBehaviour(OrderingContext dbContext, ILogger<TransactionBehaviour<TRequest, TResponse>> logger)
|
||||||
|
{
|
||||||
|
_dbContext = dbContext ?? throw new ArgumentException(nameof(OrderingContext));
|
||||||
|
_logger = logger ?? throw new ArgumentException(nameof(ILogger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
|
||||||
|
{
|
||||||
|
TResponse response = default(TResponse);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var strategy = _dbContext.Database.CreateExecutionStrategy();
|
||||||
|
await strategy.ExecuteAsync(async () =>
|
||||||
|
{
|
||||||
|
_logger.LogInformation($"Begin transaction {typeof(TRequest).Name}");
|
||||||
|
|
||||||
|
await _dbContext.BeginTransactionAsync();
|
||||||
|
|
||||||
|
response = await next();
|
||||||
|
|
||||||
|
await _dbContext.CommitTransactionAsync();
|
||||||
|
|
||||||
|
_logger.LogInformation($"Committed transaction {typeof(TRequest).Name}");
|
||||||
|
});
|
||||||
|
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
_logger.LogInformation($"Rollback transaction executed {typeof(TRequest).Name}");
|
||||||
|
|
||||||
|
_dbContext.RollbackTransaction();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,7 @@
|
|||||||
using Microsoft.EntityFrameworkCore.Storage;
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities;
|
||||||
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure;
|
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure;
|
||||||
@ -17,12 +18,16 @@ namespace Ordering.API.Application.IntegrationEvents
|
|||||||
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
|
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory;
|
||||||
private readonly IEventBus _eventBus;
|
private readonly IEventBus _eventBus;
|
||||||
private readonly OrderingContext _orderingContext;
|
private readonly OrderingContext _orderingContext;
|
||||||
|
private readonly IntegrationEventLogContext _eventLogContext;
|
||||||
private readonly IIntegrationEventLogService _eventLogService;
|
private readonly IIntegrationEventLogService _eventLogService;
|
||||||
|
|
||||||
public OrderingIntegrationEventService(IEventBus eventBus, OrderingContext orderingContext,
|
public OrderingIntegrationEventService(IEventBus eventBus,
|
||||||
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
|
OrderingContext orderingContext,
|
||||||
|
IntegrationEventLogContext eventLogContext,
|
||||||
|
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory)
|
||||||
{
|
{
|
||||||
_orderingContext = orderingContext ?? throw new ArgumentNullException(nameof(orderingContext));
|
_orderingContext = orderingContext ?? throw new ArgumentNullException(nameof(orderingContext));
|
||||||
|
_eventLogContext = eventLogContext ?? throw new ArgumentNullException(nameof(eventLogContext));
|
||||||
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory));
|
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory));
|
||||||
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
|
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
|
||||||
_eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection());
|
_eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection());
|
||||||
@ -37,14 +42,8 @@ namespace Ordering.API.Application.IntegrationEvents
|
|||||||
|
|
||||||
private async Task SaveEventAndOrderingContextChangesAsync(IntegrationEvent evt)
|
private async Task SaveEventAndOrderingContextChangesAsync(IntegrationEvent evt)
|
||||||
{
|
{
|
||||||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
|
await _orderingContext.SaveChangesAsync();
|
||||||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency
|
await _eventLogContext.SaveChangesAsync();
|
||||||
await ResilientTransaction.New(_orderingContext)
|
|
||||||
.ExecuteAsync(async () => {
|
|
||||||
// Achieving atomicity between original ordering database operation and the IntegrationEventLog thanks to a local transaction
|
|
||||||
await _orderingContext.SaveChangesAsync();
|
|
||||||
await _eventLogService.SaveEventAsync(evt, _orderingContext.Database.CurrentTransaction.GetDbTransaction());
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ using Autofac.Core;
|
|||||||
using FluentValidation;
|
using FluentValidation;
|
||||||
using MediatR;
|
using MediatR;
|
||||||
using Microsoft.eShopOnContainers.Services.Ordering.API.Application.Commands;
|
using Microsoft.eShopOnContainers.Services.Ordering.API.Application.Commands;
|
||||||
|
using Ordering.API.Application.Behaviors;
|
||||||
using Ordering.API.Application.DomainEventHandlers.OrderStartedEvent;
|
using Ordering.API.Application.DomainEventHandlers.OrderStartedEvent;
|
||||||
using Ordering.API.Application.Validations;
|
using Ordering.API.Application.Validations;
|
||||||
using Ordering.API.Infrastructure.Behaviors;
|
using Ordering.API.Infrastructure.Behaviors;
|
||||||
@ -53,6 +54,7 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.API.Infrastructure.Autof
|
|||||||
|
|
||||||
builder.RegisterGeneric(typeof(LoggingBehavior<,>)).As(typeof(IPipelineBehavior<,>));
|
builder.RegisterGeneric(typeof(LoggingBehavior<,>)).As(typeof(IPipelineBehavior<,>));
|
||||||
builder.RegisterGeneric(typeof(ValidatorBehavior<,>)).As(typeof(IPipelineBehavior<,>));
|
builder.RegisterGeneric(typeof(ValidatorBehavior<,>)).As(typeof(IPipelineBehavior<,>));
|
||||||
|
builder.RegisterGeneric(typeof(TransactionBehaviour<,>)).As(typeof(IPipelineBehavior<,>));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
using MediatR;
|
using MediatR;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Design;
|
using Microsoft.EntityFrameworkCore.Design;
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.BuyerAggregate;
|
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.BuyerAggregate;
|
||||||
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate;
|
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate;
|
||||||
using Microsoft.eShopOnContainers.Services.Ordering.Domain.Seedwork;
|
using Microsoft.eShopOnContainers.Services.Ordering.Domain.Seedwork;
|
||||||
using Ordering.Infrastructure;
|
using Ordering.Infrastructure;
|
||||||
using Ordering.Infrastructure.EntityConfigurations;
|
using Ordering.Infrastructure.EntityConfigurations;
|
||||||
using System;
|
using System;
|
||||||
|
using System.Data;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
@ -23,6 +25,7 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.Infrastructure
|
|||||||
public DbSet<OrderStatus> OrderStatus { get; set; }
|
public DbSet<OrderStatus> OrderStatus { get; set; }
|
||||||
|
|
||||||
private readonly IMediator _mediator;
|
private readonly IMediator _mediator;
|
||||||
|
private IDbContextTransaction _currentTransaction;
|
||||||
|
|
||||||
private OrderingContext(DbContextOptions<OrderingContext> options) : base (options) { }
|
private OrderingContext(DbContextOptions<OrderingContext> options) : base (options) { }
|
||||||
|
|
||||||
@ -60,7 +63,50 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.Infrastructure
|
|||||||
var result = await base.SaveChangesAsync();
|
var result = await base.SaveChangesAsync();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task BeginTransactionAsync()
|
||||||
|
{
|
||||||
|
_currentTransaction = _currentTransaction ?? await Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task CommitTransactionAsync()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await SaveChangesAsync();
|
||||||
|
_currentTransaction?.Commit();
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
RollbackTransaction();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (_currentTransaction != null)
|
||||||
|
{
|
||||||
|
_currentTransaction.Dispose();
|
||||||
|
_currentTransaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RollbackTransaction()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_currentTransaction?.Rollback();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (_currentTransaction != null)
|
||||||
|
{
|
||||||
|
_currentTransaction.Dispose();
|
||||||
|
_currentTransaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class OrderingContextDesignFactory : IDesignTimeDbContextFactory<OrderingContext>
|
public class OrderingContextDesignFactory : IDesignTimeDbContextFactory<OrderingContext>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user