Add shared scope transaction between updating catalog product priceand store ProductPriceChangedIntegrationEvent. Added service to encapsulate logic for storage of integration event logs.

This commit is contained in:
dsanz 2017-03-23 13:24:17 +01:00
parent dee6ea7342
commit 6e4d9461de
5 changed files with 96 additions and 23 deletions

View File

@ -8,6 +8,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
{ {
public class IntegrationEventLogEntry public class IntegrationEventLogEntry
{ {
private IntegrationEventLogEntry() { }
public IntegrationEventLogEntry(IntegrationEvent @event) public IntegrationEventLogEntry(IntegrationEvent @event)
{ {
EventId = @event.Id; EventId = @event.Id;

View File

@ -1,5 +1,7 @@
using Microsoft.AspNetCore.Mvc; using Catalog.API.IntegrationEvents;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure; using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure;
@ -17,18 +19,18 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers
public class CatalogController : ControllerBase public class CatalogController : ControllerBase
{ {
private readonly CatalogContext _catalogContext; private readonly CatalogContext _catalogContext;
private readonly IntegrationEventLogContext _integrationEventLogContext;
private readonly IOptionsSnapshot<Settings> _settings; private readonly IOptionsSnapshot<Settings> _settings;
private readonly IEventBus _eventBus; private readonly IEventBus _eventBus;
private readonly IIntegrationEventLogService _integrationEventLogService;
public CatalogController(CatalogContext catalogContext, IntegrationEventLogContext integrationEventLogContext, IOptionsSnapshot<Settings> settings, IEventBus eventBus) public CatalogController(CatalogContext Context, IOptionsSnapshot<Settings> settings, IEventBus eventBus, IIntegrationEventLogService integrationEventLogService)
{ {
_catalogContext = catalogContext; _catalogContext = Context;
_integrationEventLogContext = integrationEventLogContext;
_settings = settings; _settings = settings;
_eventBus = eventBus; _eventBus = eventBus;
_integrationEventLogService = integrationEventLogService;
((DbContext)catalogContext).ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; ((DbContext)Context).ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
} }
// GET api/v1/[controller]/items[?pageSize=3&pageIndex=10] // GET api/v1/[controller]/items[?pageSize=3&pageIndex=10]
@ -149,21 +151,21 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers
{ {
var oldPrice = item.Price; var oldPrice = item.Price;
item.Price = product.Price; item.Price = product.Price;
_catalogContext.CatalogItems.Update(item);
var @event = new ProductPriceChangedIntegrationEvent(item.Id, item.Price, oldPrice); var @event = new ProductPriceChangedIntegrationEvent(item.Id, item.Price, oldPrice);
var eventLogEntry = new IntegrationEventLogEntry(@event);
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry);
await _integrationEventLogContext.SaveChangesAsync(); using (var transaction = _catalogContext.Database.BeginTransaction())
await _catalogContext.SaveChangesAsync(); {
_catalogContext.CatalogItems.Update(item);
await _catalogContext.SaveChangesAsync();
await _integrationEventLogService.SaveEventAsync(@event);
transaction.Commit();
}
_eventBus.Publish(@event); _eventBus.Publish(@event);
eventLogEntry.TimesSent++; await _integrationEventLogService.MarkEventAsPublishedAsync(@event);
eventLogEntry.State = EventStateEnum.Published;
_integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry);
await _integrationEventLogContext.SaveChangesAsync();
} }
return Ok(); return Ok();

View File

@ -0,0 +1,14 @@
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Catalog.API.IntegrationEvents
{
public interface IIntegrationEventLogService
{
Task SaveEventAsync(IntegrationEvent @event);
Task MarkEventAsPublishedAsync(IntegrationEvent @event);
}
}

View File

@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.Services.Catalog.API.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
using Microsoft.EntityFrameworkCore.Infrastructure;
namespace Catalog.API.IntegrationEvents
{
public class IntegrationEventLogService : IIntegrationEventLogService
{
private readonly IntegrationEventLogContext _integrationEventLogContext;
private readonly CatalogContext _catalogContext;
public IntegrationEventLogService(CatalogContext catalogContext)
{
_catalogContext = catalogContext;
_integrationEventLogContext = new IntegrationEventLogContext(
new DbContextOptionsBuilder<IntegrationEventLogContext>()
.UseSqlServer(catalogContext.Database.GetDbConnection())
.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning))
.Options);
}
public Task SaveEventAsync(IntegrationEvent @event)
{
var eventLogEntry = new IntegrationEventLogEntry(@event);
// as a constraint this transaction has to be done together with a catalogContext transaction
_integrationEventLogContext.Database.UseTransaction(_catalogContext.Database.CurrentTransaction.GetDbTransaction());
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry);
return _integrationEventLogContext.SaveChangesAsync();
}
public Task MarkEventAsPublishedAsync(IntegrationEvent @event)
{
var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == @event.Id);
eventLogEntry.TimesSent++;
eventLogEntry.State = EventStateEnum.Published;
_integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry);
return _integrationEventLogContext.SaveChangesAsync();
}
}
}

View File

@ -1,5 +1,6 @@
namespace Microsoft.eShopOnContainers.Services.Catalog.API namespace Microsoft.eShopOnContainers.Services.Catalog.API
{ {
using global::Catalog.API.IntegrationEvents;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
@ -12,6 +13,7 @@
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Data.SqlClient;
using System.Reflection; using System.Reflection;
public class Startup public class Startup
@ -37,9 +39,11 @@
public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
var sqlConnection = new SqlConnection(Configuration["ConnectionString"]);
services.AddDbContext<CatalogContext>(c => services.AddDbContext<CatalogContext>(c =>
{ {
c.UseSqlServer(Configuration["ConnectionString"]); c.UseSqlServer(sqlConnection);
// Changing default behavior when client evaluation occurs to throw. // Changing default behavior when client evaluation occurs to throw.
// Default in EF Core would be to log a warning when client evaluation is performed. // Default in EF Core would be to log a warning when client evaluation is performed.
c.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning)); c.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning));
@ -48,7 +52,7 @@
services.AddDbContext<IntegrationEventLogContext>(c => services.AddDbContext<IntegrationEventLogContext>(c =>
{ {
c.UseSqlServer(Configuration["ConnectionString"], b => b.MigrationsAssembly("Catalog.API")); c.UseSqlServer(sqlConnection, b => b.MigrationsAssembly("Catalog.API"));
c.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning)); c.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning));
}); });
@ -77,6 +81,8 @@
.AllowCredentials()); .AllowCredentials());
}); });
services.AddTransient<IIntegrationEventLogService, IntegrationEventLogService>();
var serviceProvider = services.BuildServiceProvider(); var serviceProvider = services.BuildServiceProvider();
var configuration = serviceProvider.GetRequiredService<IOptionsSnapshot<Settings>>().Value; var configuration = serviceProvider.GetRequiredService<IOptionsSnapshot<Settings>>().Value;
services.AddSingleton<IEventBus>(new EventBusRabbitMQ(configuration.EventBusConnection)); services.AddSingleton<IEventBus>(new EventBusRabbitMQ(configuration.EventBusConnection));
@ -106,9 +112,8 @@
//Seed Data //Seed Data
CatalogContextSeed.SeedAsync(app, loggerFactory) CatalogContextSeed.SeedAsync(app, loggerFactory)
.Wait(); .Wait();
// TODO: move this creation to a db initializer integrationEventLogContext.Database.Migrate();
integrationEventLogContext.Database.Migrate();
} }
} }