Add a TransactionId to IntegrationEventLogEntry to ensure that chained integration events are only published once from the correct transaction scope
This commit is contained in:
parent
ec0c15ac84
commit
61ecfba052
@ -110,6 +110,7 @@ services:
|
|||||||
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
|
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
|
||||||
- OrchestratorType=${ORCHESTRATOR_TYPE}
|
- OrchestratorType=${ORCHESTRATOR_TYPE}
|
||||||
- UseLoadTest=${USE_LOADTEST:-False}
|
- UseLoadTest=${USE_LOADTEST:-False}
|
||||||
|
- Serilog__MinimumLevel__Override__Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ=Verbose
|
||||||
ports:
|
ports:
|
||||||
- "5102:80" # Important: In a production environment your should remove the external port (5102) kept here for microservice debugging purposes.
|
- "5102:80" # Important: In a production environment your should remove the external port (5102) kept here for microservice debugging purposes.
|
||||||
# The API Gateway redirects and access through the internal port (80).
|
# The API Gateway redirects and access through the internal port (80).
|
||||||
@ -130,6 +131,7 @@ services:
|
|||||||
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
|
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
|
||||||
- OrchestratorType=${ORCHESTRATOR_TYPE}
|
- OrchestratorType=${ORCHESTRATOR_TYPE}
|
||||||
- UseLoadTest=${USE_LOADTEST:-False}
|
- UseLoadTest=${USE_LOADTEST:-False}
|
||||||
|
- Serilog__MinimumLevel__Override__Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ=Verbose
|
||||||
ports:
|
ports:
|
||||||
- "5111:80"
|
- "5111:80"
|
||||||
|
|
||||||
@ -168,6 +170,8 @@ services:
|
|||||||
- AzureServiceBusEnabled=False
|
- AzureServiceBusEnabled=False
|
||||||
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
|
- ApplicationInsights__InstrumentationKey=${INSTRUMENTATION_KEY}
|
||||||
- OrchestratorType=${ORCHESTRATOR_TYPE}
|
- OrchestratorType=${ORCHESTRATOR_TYPE}
|
||||||
|
- Serilog__MinimumLevel__Override__Payment.API.IntegrationEvents.EventHandling=Verbose
|
||||||
|
- Serilog__MinimumLevel__Override__Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ=Verbose
|
||||||
ports:
|
ports:
|
||||||
- "5108:80" # Important: In a production environment your should remove the external port (5108) kept here for microservice debugging purposes.
|
- "5108:80" # Important: In a production environment your should remove the external port (5108) kept here for microservice debugging purposes.
|
||||||
# The API Gateway redirects and access through the internal port (80).
|
# The API Gateway redirects and access through the internal port (80).
|
||||||
|
@ -1 +1 @@
|
|||||||
echo RESTORING ALL PACKAGES...; for f in /src/csproj-files/*.csproj; do dotnet restore $f; done
|
echo RESTORING ALL PACKAGES...; for f in /src/csproj-files/*.csproj; do dotnet restore $f; done
|
@ -80,13 +80,16 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
_logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
|
_logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
var eventName = @event.GetType().Name;
|
||||||
|
|
||||||
|
_logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);
|
||||||
|
|
||||||
using (var channel = _persistentConnection.CreateModel())
|
using (var channel = _persistentConnection.CreateModel())
|
||||||
{
|
{
|
||||||
var eventName = @event.GetType()
|
|
||||||
.Name;
|
|
||||||
|
|
||||||
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
_logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
|
||||||
type: "direct");
|
|
||||||
|
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
|
||||||
|
|
||||||
var message = JsonConvert.SerializeObject(@event);
|
var message = JsonConvert.SerializeObject(@event);
|
||||||
var body = Encoding.UTF8.GetBytes(message);
|
var body = Encoding.UTF8.GetBytes(message);
|
||||||
@ -96,11 +99,14 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
var properties = channel.CreateBasicProperties();
|
var properties = channel.CreateBasicProperties();
|
||||||
properties.DeliveryMode = 2; // persistent
|
properties.DeliveryMode = 2; // persistent
|
||||||
|
|
||||||
channel.BasicPublish(exchange: BROKER_NAME,
|
_logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);
|
||||||
routingKey: eventName,
|
|
||||||
mandatory: true,
|
channel.BasicPublish(
|
||||||
basicProperties: properties,
|
exchange: BROKER_NAME,
|
||||||
body: body);
|
routingKey: eventName,
|
||||||
|
mandatory: true,
|
||||||
|
basicProperties: properties,
|
||||||
|
body: body);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -176,6 +182,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
|
|
||||||
private void StartBasicConsume()
|
private void StartBasicConsume()
|
||||||
{
|
{
|
||||||
|
_logger.LogTrace("Starting RabbitMQ basic consume");
|
||||||
|
|
||||||
if (_consumerChannel != null)
|
if (_consumerChannel != null)
|
||||||
{
|
{
|
||||||
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
|
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
|
||||||
@ -225,6 +233,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
_persistentConnection.TryConnect();
|
_persistentConnection.TryConnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_logger.LogTrace("Creating RabbitMQ consumer channel");
|
||||||
|
|
||||||
var channel = _persistentConnection.CreateModel();
|
var channel = _persistentConnection.CreateModel();
|
||||||
|
|
||||||
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
||||||
@ -238,6 +248,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
|
|
||||||
channel.CallbackException += (sender, ea) =>
|
channel.CallbackException += (sender, ea) =>
|
||||||
{
|
{
|
||||||
|
_logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");
|
||||||
|
|
||||||
_consumerChannel.Dispose();
|
_consumerChannel.Dispose();
|
||||||
_consumerChannel = CreateConsumerChannel();
|
_consumerChannel = CreateConsumerChannel();
|
||||||
StartBasicConsume();
|
StartBasicConsume();
|
||||||
@ -248,6 +260,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
|
|
||||||
private async Task ProcessEvent(string eventName, string message)
|
private async Task ProcessEvent(string eventName, string message)
|
||||||
{
|
{
|
||||||
|
_logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);
|
||||||
|
|
||||||
if (_subsManager.HasSubscriptionsForEvent(eventName))
|
if (_subsManager.HasSubscriptionsForEvent(eventName))
|
||||||
{
|
{
|
||||||
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
|
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
|
||||||
@ -274,6 +288,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
|
|||||||
public class IntegrationEventLogEntry
|
public class IntegrationEventLogEntry
|
||||||
{
|
{
|
||||||
private IntegrationEventLogEntry() { }
|
private IntegrationEventLogEntry() { }
|
||||||
public IntegrationEventLogEntry(IntegrationEvent @event)
|
public IntegrationEventLogEntry(IntegrationEvent @event, Guid transactionId)
|
||||||
{
|
{
|
||||||
EventId = @event.Id;
|
EventId = @event.Id;
|
||||||
CreationTime = @event.CreationDate;
|
CreationTime = @event.CreationDate;
|
||||||
@ -20,7 +20,9 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
|
|||||||
Content = JsonConvert.SerializeObject(@event);
|
Content = JsonConvert.SerializeObject(@event);
|
||||||
State = EventStateEnum.NotPublished;
|
State = EventStateEnum.NotPublished;
|
||||||
TimesSent = 0;
|
TimesSent = 0;
|
||||||
|
TransactionId = transactionId.ToString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Guid EventId { get; private set; }
|
public Guid EventId { get; private set; }
|
||||||
public string EventTypeName { get; private set; }
|
public string EventTypeName { get; private set; }
|
||||||
[NotMapped]
|
[NotMapped]
|
||||||
@ -31,6 +33,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
|
|||||||
public int TimesSent { get; set; }
|
public int TimesSent { get; set; }
|
||||||
public DateTime CreationTime { get; private set; }
|
public DateTime CreationTime { get; private set; }
|
||||||
public string Content { get; private set; }
|
public string Content { get; private set; }
|
||||||
|
public string TransactionId { get; private set; }
|
||||||
|
|
||||||
public IntegrationEventLogEntry DeserializeJsonContent(Type type)
|
public IntegrationEventLogEntry DeserializeJsonContent(Type type)
|
||||||
{
|
{
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Data.Common;
|
using System.Data.Common;
|
||||||
@ -9,8 +10,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi
|
|||||||
{
|
{
|
||||||
public interface IIntegrationEventLogService
|
public interface IIntegrationEventLogService
|
||||||
{
|
{
|
||||||
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync();
|
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId);
|
||||||
Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction);
|
Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction);
|
||||||
Task MarkEventAsPublishedAsync(Guid eventId);
|
Task MarkEventAsPublishedAsync(Guid eventId);
|
||||||
Task MarkEventAsInProgressAsync(Guid eventId);
|
Task MarkEventAsInProgressAsync(Guid eventId);
|
||||||
Task MarkEventAsFailedAsync(Guid eventId);
|
Task MarkEventAsFailedAsync(Guid eventId);
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
@ -34,25 +35,24 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi
|
|||||||
.ToList();
|
.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync()
|
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId)
|
||||||
{
|
{
|
||||||
|
var tid = transactionId.ToString();
|
||||||
|
|
||||||
return await _integrationEventLogContext.IntegrationEventLogs
|
return await _integrationEventLogContext.IntegrationEventLogs
|
||||||
.Where(e => e.State == EventStateEnum.NotPublished)
|
.Where(e => e.TransactionId == tid && e.State == EventStateEnum.NotPublished)
|
||||||
.OrderBy(o => o.CreationTime)
|
.OrderBy(o => o.CreationTime)
|
||||||
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t=> t.Name == e.EventTypeShortName)))
|
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t=> t.Name == e.EventTypeShortName)))
|
||||||
.ToListAsync();
|
.ToListAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction)
|
public Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction)
|
||||||
{
|
{
|
||||||
if (transaction == null)
|
if (transaction == null) throw new ArgumentNullException(nameof(transaction));
|
||||||
{
|
|
||||||
throw new ArgumentNullException(nameof(transaction), $"A {typeof(DbTransaction).FullName} is required as a pre-requisite to save the event.");
|
|
||||||
}
|
|
||||||
|
|
||||||
var eventLogEntry = new IntegrationEventLogEntry(@event);
|
var eventLogEntry = new IntegrationEventLogEntry(@event, transaction.TransactionId);
|
||||||
|
|
||||||
_integrationEventLogContext.Database.UseTransaction(transaction);
|
_integrationEventLogContext.Database.UseTransaction(transaction.GetDbTransaction());
|
||||||
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry);
|
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry);
|
||||||
|
|
||||||
return _integrationEventLogContext.SaveChangesAsync();
|
return _integrationEventLogContext.SaveChangesAsync();
|
||||||
|
@ -0,0 +1,50 @@
|
|||||||
|
// <auto-generated />
|
||||||
|
using System;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||||
|
using Microsoft.EntityFrameworkCore.Metadata;
|
||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
|
|
||||||
|
namespace Catalog.API.Infrastructure.IntegrationEventMigrations
|
||||||
|
{
|
||||||
|
[DbContext(typeof(IntegrationEventLogContext))]
|
||||||
|
[Migration("20190507184807_AddTransactionId")]
|
||||||
|
partial class AddTransactionId
|
||||||
|
{
|
||||||
|
protected override void BuildTargetModel(ModelBuilder modelBuilder)
|
||||||
|
{
|
||||||
|
#pragma warning disable 612, 618
|
||||||
|
modelBuilder
|
||||||
|
.HasAnnotation("ProductVersion", "2.2.3-servicing-35854")
|
||||||
|
.HasAnnotation("Relational:MaxIdentifierLength", 128)
|
||||||
|
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
|
||||||
|
|
||||||
|
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
|
||||||
|
{
|
||||||
|
b.Property<Guid>("EventId")
|
||||||
|
.ValueGeneratedOnAdd();
|
||||||
|
|
||||||
|
b.Property<string>("Content")
|
||||||
|
.IsRequired();
|
||||||
|
|
||||||
|
b.Property<DateTime>("CreationTime");
|
||||||
|
|
||||||
|
b.Property<string>("EventTypeName")
|
||||||
|
.IsRequired();
|
||||||
|
|
||||||
|
b.Property<int>("State");
|
||||||
|
|
||||||
|
b.Property<int>("TimesSent");
|
||||||
|
|
||||||
|
b.Property<string>("TransactionId");
|
||||||
|
|
||||||
|
b.HasKey("EventId");
|
||||||
|
|
||||||
|
b.ToTable("IntegrationEventLog");
|
||||||
|
});
|
||||||
|
#pragma warning restore 612, 618
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
|
||||||
|
namespace Catalog.API.Infrastructure.IntegrationEventMigrations
|
||||||
|
{
|
||||||
|
public partial class AddTransactionId : Migration
|
||||||
|
{
|
||||||
|
protected override void Up(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.AddColumn<string>(
|
||||||
|
name: "TransactionId",
|
||||||
|
table: "IntegrationEventLog",
|
||||||
|
nullable: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void Down(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.DropColumn(
|
||||||
|
name: "TransactionId",
|
||||||
|
table: "IntegrationEventLog");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Design;
|
||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
|
|
||||||
|
namespace Catalog.API.Infrastructure.IntegrationEventMigrations
|
||||||
|
{
|
||||||
|
public class IntegrationEventLogContextDesignTimeFactory : IDesignTimeDbContextFactory<IntegrationEventLogContext>
|
||||||
|
{
|
||||||
|
public IntegrationEventLogContext CreateDbContext(string[] args)
|
||||||
|
{
|
||||||
|
var optionsBuilder = new DbContextOptionsBuilder<IntegrationEventLogContext>();
|
||||||
|
|
||||||
|
optionsBuilder.UseSqlServer(".", options => options.MigrationsAssembly(GetType().Assembly.GetName().Name));
|
||||||
|
|
||||||
|
return new IntegrationEventLogContext(optionsBuilder.Options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,8 +1,9 @@
|
|||||||
using System;
|
// <auto-generated />
|
||||||
|
using System;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||||
using Microsoft.EntityFrameworkCore.Metadata;
|
using Microsoft.EntityFrameworkCore.Metadata;
|
||||||
using Microsoft.EntityFrameworkCore.Migrations;
|
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
|
|
||||||
namespace Catalog.API.Migrations
|
namespace Catalog.API.Migrations
|
||||||
@ -12,8 +13,10 @@ namespace Catalog.API.Migrations
|
|||||||
{
|
{
|
||||||
protected override void BuildModel(ModelBuilder modelBuilder)
|
protected override void BuildModel(ModelBuilder modelBuilder)
|
||||||
{
|
{
|
||||||
|
#pragma warning disable 612, 618
|
||||||
modelBuilder
|
modelBuilder
|
||||||
.HasAnnotation("ProductVersion", "1.1.1")
|
.HasAnnotation("ProductVersion", "2.2.3-servicing-35854")
|
||||||
|
.HasAnnotation("Relational:MaxIdentifierLength", 128)
|
||||||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
|
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
|
||||||
|
|
||||||
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
|
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
|
||||||
@ -33,10 +36,13 @@ namespace Catalog.API.Migrations
|
|||||||
|
|
||||||
b.Property<int>("TimesSent");
|
b.Property<int>("TimesSent");
|
||||||
|
|
||||||
|
b.Property<string>("TransactionId");
|
||||||
|
|
||||||
b.HasKey("EventId");
|
b.HasKey("EventId");
|
||||||
|
|
||||||
b.ToTable("IntegrationEventLog");
|
b.ToTable("IntegrationEventLog");
|
||||||
});
|
});
|
||||||
|
#pragma warning restore 612, 618
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ namespace Catalog.API.IntegrationEvents
|
|||||||
{
|
{
|
||||||
// Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction
|
// Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction
|
||||||
await _catalogContext.SaveChangesAsync();
|
await _catalogContext.SaveChangesAsync();
|
||||||
await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction.GetDbTransaction());
|
await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,8 @@ namespace Ordering.API.Application.Behaviors
|
|||||||
|
|
||||||
await strategy.ExecuteAsync(async () =>
|
await strategy.ExecuteAsync(async () =>
|
||||||
{
|
{
|
||||||
|
Guid transactionId;
|
||||||
|
|
||||||
using (var transaction = await _dbContext.BeginTransactionAsync())
|
using (var transaction = await _dbContext.BeginTransactionAsync())
|
||||||
using (LogContext.PushProperty("TransactionContext", transaction.TransactionId))
|
using (LogContext.PushProperty("TransactionContext", transaction.TransactionId))
|
||||||
{
|
{
|
||||||
@ -52,9 +54,11 @@ namespace Ordering.API.Application.Behaviors
|
|||||||
_logger.LogInformation("----- Commit transaction {TransactionId} for {CommandName}", transaction.TransactionId, typeName);
|
_logger.LogInformation("----- Commit transaction {TransactionId} for {CommandName}", transaction.TransactionId, typeName);
|
||||||
|
|
||||||
await _dbContext.CommitTransactionAsync(transaction);
|
await _dbContext.CommitTransactionAsync(transaction);
|
||||||
|
|
||||||
|
transactionId = transaction.TransactionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
await _orderingIntegrationEventService.PublishEventsThroughEventBusAsync();
|
await _orderingIntegrationEventService.PublishEventsThroughEventBusAsync(transactionId);
|
||||||
});
|
});
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||||
|
using System;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace Ordering.API.Application.IntegrationEvents
|
namespace Ordering.API.Application.IntegrationEvents
|
||||||
{
|
{
|
||||||
public interface IOrderingIntegrationEventService
|
public interface IOrderingIntegrationEventService
|
||||||
{
|
{
|
||||||
Task PublishEventsThroughEventBusAsync();
|
Task PublishEventsThroughEventBusAsync(Guid transactionId);
|
||||||
Task AddAndSaveEventAsync(IntegrationEvent evt);
|
Task AddAndSaveEventAsync(IntegrationEvent evt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,9 +39,9 @@ namespace Ordering.API.Application.IntegrationEvents
|
|||||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task PublishEventsThroughEventBusAsync()
|
public async Task PublishEventsThroughEventBusAsync(Guid transactionId)
|
||||||
{
|
{
|
||||||
var pendingLogEvents = await _eventLogService.RetrieveEventLogsPendingToPublishAsync();
|
var pendingLogEvents = await _eventLogService.RetrieveEventLogsPendingToPublishAsync(transactionId);
|
||||||
|
|
||||||
foreach (var logEvt in pendingLogEvents)
|
foreach (var logEvt in pendingLogEvents)
|
||||||
{
|
{
|
||||||
@ -66,7 +66,7 @@ namespace Ordering.API.Application.IntegrationEvents
|
|||||||
{
|
{
|
||||||
_logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})", evt.Id, evt);
|
_logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})", evt.Id, evt);
|
||||||
|
|
||||||
await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction.GetDbTransaction());
|
await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,50 @@
|
|||||||
|
// <auto-generated />
|
||||||
|
using System;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||||
|
using Microsoft.EntityFrameworkCore.Metadata;
|
||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
|
|
||||||
|
namespace Ordering.API.Infrastructure.IntegrationEventMigrations
|
||||||
|
{
|
||||||
|
[DbContext(typeof(IntegrationEventLogContext))]
|
||||||
|
[Migration("20190507185219_AddTransactionId")]
|
||||||
|
partial class AddTransactionId
|
||||||
|
{
|
||||||
|
protected override void BuildTargetModel(ModelBuilder modelBuilder)
|
||||||
|
{
|
||||||
|
#pragma warning disable 612, 618
|
||||||
|
modelBuilder
|
||||||
|
.HasAnnotation("ProductVersion", "2.2.3-servicing-35854")
|
||||||
|
.HasAnnotation("Relational:MaxIdentifierLength", 128)
|
||||||
|
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
|
||||||
|
|
||||||
|
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
|
||||||
|
{
|
||||||
|
b.Property<Guid>("EventId")
|
||||||
|
.ValueGeneratedOnAdd();
|
||||||
|
|
||||||
|
b.Property<string>("Content")
|
||||||
|
.IsRequired();
|
||||||
|
|
||||||
|
b.Property<DateTime>("CreationTime");
|
||||||
|
|
||||||
|
b.Property<string>("EventTypeName")
|
||||||
|
.IsRequired();
|
||||||
|
|
||||||
|
b.Property<int>("State");
|
||||||
|
|
||||||
|
b.Property<int>("TimesSent");
|
||||||
|
|
||||||
|
b.Property<string>("TransactionId");
|
||||||
|
|
||||||
|
b.HasKey("EventId");
|
||||||
|
|
||||||
|
b.ToTable("IntegrationEventLog");
|
||||||
|
});
|
||||||
|
#pragma warning restore 612, 618
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
|
||||||
|
namespace Ordering.API.Infrastructure.IntegrationEventMigrations
|
||||||
|
{
|
||||||
|
public partial class AddTransactionId : Migration
|
||||||
|
{
|
||||||
|
protected override void Up(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.AddColumn<string>(
|
||||||
|
name: "TransactionId",
|
||||||
|
table: "IntegrationEventLog",
|
||||||
|
nullable: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void Down(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.DropColumn(
|
||||||
|
name: "TransactionId",
|
||||||
|
table: "IntegrationEventLog");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Design;
|
||||||
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
|
|
||||||
|
namespace Catalog.API.Infrastructure.IntegrationEventMigrations
|
||||||
|
{
|
||||||
|
public class IntegrationEventLogContextDesignTimeFactory : IDesignTimeDbContextFactory<IntegrationEventLogContext>
|
||||||
|
{
|
||||||
|
public IntegrationEventLogContext CreateDbContext(string[] args)
|
||||||
|
{
|
||||||
|
var optionsBuilder = new DbContextOptionsBuilder<IntegrationEventLogContext>();
|
||||||
|
|
||||||
|
optionsBuilder.UseSqlServer(".", options => options.MigrationsAssembly(GetType().Assembly.GetName().Name));
|
||||||
|
|
||||||
|
return new IntegrationEventLogContext(optionsBuilder.Options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,8 +1,9 @@
|
|||||||
using System;
|
// <auto-generated />
|
||||||
|
using System;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||||
using Microsoft.EntityFrameworkCore.Metadata;
|
using Microsoft.EntityFrameworkCore.Metadata;
|
||||||
using Microsoft.EntityFrameworkCore.Migrations;
|
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||||
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
using Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF;
|
||||||
|
|
||||||
namespace Ordering.API.Infrastructure.IntegrationEventMigrations
|
namespace Ordering.API.Infrastructure.IntegrationEventMigrations
|
||||||
@ -12,8 +13,10 @@ namespace Ordering.API.Infrastructure.IntegrationEventMigrations
|
|||||||
{
|
{
|
||||||
protected override void BuildModel(ModelBuilder modelBuilder)
|
protected override void BuildModel(ModelBuilder modelBuilder)
|
||||||
{
|
{
|
||||||
|
#pragma warning disable 612, 618
|
||||||
modelBuilder
|
modelBuilder
|
||||||
.HasAnnotation("ProductVersion", "1.1.1")
|
.HasAnnotation("ProductVersion", "2.2.3-servicing-35854")
|
||||||
|
.HasAnnotation("Relational:MaxIdentifierLength", 128)
|
||||||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
|
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
|
||||||
|
|
||||||
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
|
modelBuilder.Entity("Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.IntegrationEventLogEntry", b =>
|
||||||
@ -33,10 +36,13 @@ namespace Ordering.API.Infrastructure.IntegrationEventMigrations
|
|||||||
|
|
||||||
b.Property<int>("TimesSent");
|
b.Property<int>("TimesSent");
|
||||||
|
|
||||||
|
b.Property<string>("TransactionId");
|
||||||
|
|
||||||
b.HasKey("EventId");
|
b.HasKey("EventId");
|
||||||
|
|
||||||
b.ToTable("IntegrationEventLog");
|
b.ToTable("IntegrationEventLog");
|
||||||
});
|
});
|
||||||
|
#pragma warning restore 612, 618
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ namespace Microsoft.eShopOnContainers.Services.Ordering.Infrastructure
|
|||||||
|
|
||||||
private OrderingContext(DbContextOptions<OrderingContext> options) : base(options) { }
|
private OrderingContext(DbContextOptions<OrderingContext> options) : base(options) { }
|
||||||
|
|
||||||
public IDbContextTransaction GetCurrentTransaction => _currentTransaction;
|
public IDbContextTransaction GetCurrentTransaction() => _currentTransaction;
|
||||||
|
|
||||||
public bool HasActiveTransaction => _currentTransaction != null;
|
public bool HasActiveTransaction => _currentTransaction != null;
|
||||||
|
|
||||||
|
@ -23,6 +23,8 @@
|
|||||||
_eventBus = eventBus;
|
_eventBus = eventBus;
|
||||||
_settings = settings.Value;
|
_settings = settings.Value;
|
||||||
_logger = logger ?? throw new System.ArgumentNullException(nameof(logger));
|
_logger = logger ?? throw new System.ArgumentNullException(nameof(logger));
|
||||||
|
|
||||||
|
_logger.LogTrace("PaymentSettings: {@PaymentSettings}", _settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Handle(OrderStatusChangedToStockConfirmedIntegrationEvent @event)
|
public async Task Handle(OrderStatusChangedToStockConfirmedIntegrationEvent @event)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user