Features/#146 clean basket with integrate eventpull/173/head
@ -0,0 +1,37 @@ | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Storage; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Data.Common; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities | |||||
{ | |||||
public class ResilientTransaction | |||||
{ | |||||
private DbContext _context; | |||||
private ResilientTransaction(DbContext context) => | |||||
_context = context ?? throw new ArgumentNullException(nameof(context)); | |||||
public static ResilientTransaction New (DbContext context) => | |||||
new ResilientTransaction(context); | |||||
public async Task ExecuteAsync(Func<Task> action) | |||||
{ | |||||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): | |||||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency | |||||
var strategy = _context.Database.CreateExecutionStrategy(); | |||||
await strategy.ExecuteAsync(async () => | |||||
{ | |||||
using (var transaction = _context.Database.BeginTransaction()) | |||||
{ | |||||
await action(); | |||||
transaction.Commit(); | |||||
} | |||||
}); | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,27 @@ | |||||
using Basket.API.IntegrationEvents.Events; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||||
using Microsoft.eShopOnContainers.Services.Basket.API.Model; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
namespace Basket.API.IntegrationEvents.EventHandling | |||||
{ | |||||
public class OrderStartedIntegrationEventHandler : IIntegrationEventHandler<OrderStartedIntegrationEvent> | |||||
{ | |||||
private readonly IBasketRepository _repository; | |||||
public OrderStartedIntegrationEventHandler(IBasketRepository repository) | |||||
{ | |||||
_repository = repository; | |||||
} | |||||
public async Task Handle(OrderStartedIntegrationEvent @event) | |||||
{ | |||||
await _repository.DeleteBasketAsync(@event.UserId.ToString()); | |||||
} | |||||
} | |||||
} | |||||
@ -0,0 +1,19 @@ | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
namespace Basket.API.IntegrationEvents.Events | |||||
{ | |||||
// Integration Events notes: | |||||
// An Event is “something that has happened in the past”, therefore its name has to be | |||||
// An Integration Event is an event that can cause side effects to other microsrvices, Bounded-Contexts or external systems. | |||||
public class OrderStartedIntegrationEvent : IntegrationEvent | |||||
{ | |||||
public string UserId { get; } | |||||
public OrderStartedIntegrationEvent(string userId) => | |||||
UserId = userId; | |||||
} | |||||
} |
@ -0,0 +1,48 @@ | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Storage; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; | |||||
using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure; | |||||
using System; | |||||
using System.Data.Common; | |||||
using System.Threading.Tasks; | |||||
namespace Catalog.API.IntegrationEvents | |||||
{ | |||||
public class CatalogIntegrationEventService : ICatalogIntegrationEventService | |||||
{ | |||||
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory; | |||||
private readonly IEventBus _eventBus; | |||||
private readonly CatalogContext _catalogContext; | |||||
private readonly IIntegrationEventLogService _eventLogService; | |||||
public CatalogIntegrationEventService(IEventBus eventBus, CatalogContext catalogContext, | |||||
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory) | |||||
{ | |||||
_catalogContext = catalogContext ?? throw new ArgumentNullException(nameof(catalogContext)); | |||||
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory)); | |||||
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus)); | |||||
_eventLogService = _integrationEventLogServiceFactory(_catalogContext.Database.GetDbConnection()); | |||||
} | |||||
public async Task PublishThroughEventBusAsync(IntegrationEvent evt) | |||||
{ | |||||
_eventBus.Publish(evt); | |||||
await _eventLogService.MarkEventAsPublishedAsync(evt); | |||||
} | |||||
public async Task SaveEventAndCatalogContextChangesAsync(IntegrationEvent evt) | |||||
{ | |||||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): | |||||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency | |||||
await ResilientTransaction.New(_catalogContext) | |||||
.ExecuteAsync(async () => { | |||||
// Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction | |||||
await _catalogContext.SaveChangesAsync(); | |||||
await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction.GetDbTransaction()); | |||||
}); | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,14 @@ | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
namespace Catalog.API.IntegrationEvents | |||||
{ | |||||
public interface ICatalogIntegrationEventService | |||||
{ | |||||
Task SaveEventAndCatalogContextChangesAsync(IntegrationEvent evt); | |||||
Task PublishThroughEventBusAsync(IntegrationEvent evt); | |||||
} | |||||
} |
@ -0,0 +1,43 @@ | |||||
using System; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||||
using Microsoft.EntityFrameworkCore.Metadata; | |||||
using Microsoft.EntityFrameworkCore.Migrations; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; | |||||
namespace Ordering.API.Infrastructure.IntegrationEventMigrations | |||||
{ | |||||
[DbContext(typeof(IntegrationEventLogContext))] | |||||
[Migration("20170330131634_IntegrationEventInitial")] | |||||
partial class IntegrationEventInitial | |||||
{ | |||||
protected override void BuildTargetModel(ModelBuilder modelBuilder) | |||||
{ | |||||
modelBuilder | |||||
.HasAnnotation("ProductVersion", "1.1.1") | |||||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||||
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b => | |||||
{ | |||||
b.Property<Guid>("EventId") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<string>("Content") | |||||
.IsRequired(); | |||||
b.Property<DateTime>("CreationTime"); | |||||
b.Property<string>("EventTypeName") | |||||
.IsRequired(); | |||||
b.Property<int>("State"); | |||||
b.Property<int>("TimesSent"); | |||||
b.HasKey("EventId"); | |||||
b.ToTable("IntegrationEventLog"); | |||||
}); | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,34 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using Microsoft.EntityFrameworkCore.Migrations; | |||||
namespace Ordering.API.Infrastructure.IntegrationEventMigrations | |||||
{ | |||||
public partial class IntegrationEventInitial : Migration | |||||
{ | |||||
protected override void Up(MigrationBuilder migrationBuilder) | |||||
{ | |||||
migrationBuilder.CreateTable( | |||||
name: "IntegrationEventLog", | |||||
columns: table => new | |||||
{ | |||||
EventId = table.Column<Guid>(nullable: false), | |||||
Content = table.Column<string>(nullable: false), | |||||
CreationTime = table.Column<DateTime>(nullable: false), | |||||
EventTypeName = table.Column<string>(nullable: false), | |||||
State = table.Column<int>(nullable: false), | |||||
TimesSent = table.Column<int>(nullable: false) | |||||
}, | |||||
constraints: table => | |||||
{ | |||||
table.PrimaryKey("PK_IntegrationEventLog", x => x.EventId); | |||||
}); | |||||
} | |||||
protected override void Down(MigrationBuilder migrationBuilder) | |||||
{ | |||||
migrationBuilder.DropTable( | |||||
name: "IntegrationEventLog"); | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,42 @@ | |||||
using System; | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Infrastructure; | |||||
using Microsoft.EntityFrameworkCore.Metadata; | |||||
using Microsoft.EntityFrameworkCore.Migrations; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; | |||||
namespace Ordering.API.Infrastructure.IntegrationEventMigrations | |||||
{ | |||||
[DbContext(typeof(IntegrationEventLogContext))] | |||||
partial class IntegrationEventLogContextModelSnapshot : ModelSnapshot | |||||
{ | |||||
protected override void BuildModel(ModelBuilder modelBuilder) | |||||
{ | |||||
modelBuilder | |||||
.HasAnnotation("ProductVersion", "1.1.1") | |||||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); | |||||
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b => | |||||
{ | |||||
b.Property<Guid>("EventId") | |||||
.ValueGeneratedOnAdd(); | |||||
b.Property<string>("Content") | |||||
.IsRequired(); | |||||
b.Property<DateTime>("CreationTime"); | |||||
b.Property<string>("EventTypeName") | |||||
.IsRequired(); | |||||
b.Property<int>("State"); | |||||
b.Property<int>("TimesSent"); | |||||
b.HasKey("EventId"); | |||||
b.ToTable("IntegrationEventLog"); | |||||
}); | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,19 @@ | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
namespace Ordering.API.IntegrationEvents.Events | |||||
{ | |||||
// Integration Events notes: | |||||
// An Event is “something that has happened in the past”, therefore its name has to be | |||||
// An Integration Event is an event that can cause side effects to other microsrvices, Bounded-Contexts or external systems. | |||||
public class OrderStartedIntegrationEvent : IntegrationEvent | |||||
{ | |||||
public string UserId { get; } | |||||
public OrderStartedIntegrationEvent(string userId) => | |||||
UserId = userId; | |||||
} | |||||
} |
@ -0,0 +1,14 @@ | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Threading.Tasks; | |||||
namespace Ordering.API.IntegrationEvents | |||||
{ | |||||
public interface IOrderingIntegrationEventService | |||||
{ | |||||
Task SaveEventAndOrderingContextChangesAsync(IntegrationEvent evt); | |||||
Task PublishThroughEventBusAsync(IntegrationEvent evt); | |||||
} | |||||
} |
@ -0,0 +1,49 @@ | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Storage; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; | |||||
using Microsoft.eShopOnContainers.Services.Ordering.Domain.AggregatesModel.OrderAggregate; | |||||
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure; | |||||
using System; | |||||
using System.Data.Common; | |||||
using System.Threading.Tasks; | |||||
namespace Ordering.API.IntegrationEvents | |||||
{ | |||||
public class OrderingIntegrationEventService : IOrderingIntegrationEventService | |||||
{ | |||||
private readonly Func<DbConnection, IIntegrationEventLogService> _integrationEventLogServiceFactory; | |||||
private readonly IEventBus _eventBus; | |||||
private readonly OrderingContext _orderingContext; | |||||
private readonly IIntegrationEventLogService _eventLogService; | |||||
public OrderingIntegrationEventService (IEventBus eventBus, OrderingContext orderingContext, | |||||
Func<DbConnection, IIntegrationEventLogService> integrationEventLogServiceFactory) | |||||
{ | |||||
_orderingContext = orderingContext ?? throw new ArgumentNullException(nameof(orderingContext)); | |||||
_integrationEventLogServiceFactory = integrationEventLogServiceFactory ?? throw new ArgumentNullException(nameof(integrationEventLogServiceFactory)); | |||||
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus)); | |||||
_eventLogService = _integrationEventLogServiceFactory(_orderingContext.Database.GetDbConnection()); | |||||
} | |||||
public async Task PublishThroughEventBusAsync(IntegrationEvent evt) | |||||
{ | |||||
_eventBus.Publish(evt); | |||||
await _eventLogService.MarkEventAsPublishedAsync(evt); | |||||
} | |||||
public async Task SaveEventAndOrderingContextChangesAsync(IntegrationEvent evt) | |||||
{ | |||||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): | |||||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency | |||||
await ResilientTransaction.New(_orderingContext) | |||||
.ExecuteAsync(async () => { | |||||
// Achieving atomicity between original ordering database operation and the IntegrationEventLog thanks to a local transaction | |||||
await _orderingContext.SaveChangesAsync(); | |||||
await _eventLogService.SaveEventAsync(evt, _orderingContext.Database.CurrentTransaction.GetDbTransaction()); | |||||
}); | |||||
} | |||||
} | |||||
} |