@ -1,10 +1,10 @@ | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; | |||||
public enum EventStateEnum | |||||
{ | { | ||||
public enum EventStateEnum | |||||
{ | |||||
NotPublished = 0, | |||||
InProgress = 1, | |||||
Published = 2, | |||||
PublishedFailed = 3 | |||||
} | |||||
NotPublished = 0, | |||||
InProgress = 1, | |||||
Published = 2, | |||||
PublishedFailed = 3 | |||||
} | } | ||||
@ -1,45 +1,41 @@ | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Metadata.Builders; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF | |||||
public class IntegrationEventLogContext : DbContext | |||||
{ | { | ||||
public class IntegrationEventLogContext : DbContext | |||||
public IntegrationEventLogContext(DbContextOptions<IntegrationEventLogContext> options) : base(options) | |||||
{ | { | ||||
public IntegrationEventLogContext(DbContextOptions<IntegrationEventLogContext> options) : base(options) | |||||
{ | |||||
} | |||||
} | |||||
public DbSet<IntegrationEventLogEntry> IntegrationEventLogs { get; set; } | |||||
public DbSet<IntegrationEventLogEntry> IntegrationEventLogs { get; set; } | |||||
protected override void OnModelCreating(ModelBuilder builder) | |||||
{ | |||||
builder.Entity<IntegrationEventLogEntry>(ConfigureIntegrationEventLogEntry); | |||||
} | |||||
protected override void OnModelCreating(ModelBuilder builder) | |||||
{ | |||||
builder.Entity<IntegrationEventLogEntry>(ConfigureIntegrationEventLogEntry); | |||||
} | |||||
void ConfigureIntegrationEventLogEntry(EntityTypeBuilder<IntegrationEventLogEntry> builder) | |||||
{ | |||||
builder.ToTable("IntegrationEventLog"); | |||||
void ConfigureIntegrationEventLogEntry(EntityTypeBuilder<IntegrationEventLogEntry> builder) | |||||
{ | |||||
builder.ToTable("IntegrationEventLog"); | |||||
builder.HasKey(e => e.EventId); | |||||
builder.HasKey(e => e.EventId); | |||||
builder.Property(e => e.EventId) | |||||
.IsRequired(); | |||||
builder.Property(e => e.EventId) | |||||
.IsRequired(); | |||||
builder.Property(e => e.Content) | |||||
.IsRequired(); | |||||
builder.Property(e => e.Content) | |||||
.IsRequired(); | |||||
builder.Property(e => e.CreationTime) | |||||
.IsRequired(); | |||||
builder.Property(e => e.CreationTime) | |||||
.IsRequired(); | |||||
builder.Property(e => e.State) | |||||
.IsRequired(); | |||||
builder.Property(e => e.State) | |||||
.IsRequired(); | |||||
builder.Property(e => e.TimesSent) | |||||
.IsRequired(); | |||||
builder.Property(e => e.TimesSent) | |||||
.IsRequired(); | |||||
builder.Property(e => e.EventTypeName) | |||||
.IsRequired(); | |||||
builder.Property(e => e.EventTypeName) | |||||
.IsRequired(); | |||||
} | |||||
} | } | ||||
} | } |
@ -1,43 +1,36 @@ | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Text.Json; | |||||
using System.ComponentModel.DataAnnotations.Schema; | |||||
using System.Linq; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF | |||||
public class IntegrationEventLogEntry | |||||
{ | { | ||||
public class IntegrationEventLogEntry | |||||
private IntegrationEventLogEntry() { } | |||||
public IntegrationEventLogEntry(IntegrationEvent @event, Guid transactionId) | |||||
{ | { | ||||
private IntegrationEventLogEntry() { } | |||||
public IntegrationEventLogEntry(IntegrationEvent @event, Guid transactionId) | |||||
EventId = @event.Id; | |||||
CreationTime = @event.CreationDate; | |||||
EventTypeName = @event.GetType().FullName; | |||||
Content = JsonSerializer.Serialize(@event, @event.GetType(), new JsonSerializerOptions | |||||
{ | { | ||||
EventId = @event.Id; | |||||
CreationTime = @event.CreationDate; | |||||
EventTypeName = @event.GetType().FullName; | |||||
Content = JsonSerializer.Serialize(@event, @event.GetType(), new JsonSerializerOptions | |||||
{ | |||||
WriteIndented = true | |||||
}); | |||||
State = EventStateEnum.NotPublished; | |||||
TimesSent = 0; | |||||
TransactionId = transactionId.ToString(); | |||||
} | |||||
public Guid EventId { get; private set; } | |||||
public string EventTypeName { get; private set; } | |||||
[NotMapped] | |||||
public string EventTypeShortName => EventTypeName.Split('.')?.Last(); | |||||
[NotMapped] | |||||
public IntegrationEvent IntegrationEvent { get; private set; } | |||||
public EventStateEnum State { get; set; } | |||||
public int TimesSent { get; set; } | |||||
public DateTime CreationTime { get; private set; } | |||||
public string Content { get; private set; } | |||||
public string TransactionId { get; private set; } | |||||
WriteIndented = true | |||||
}); | |||||
State = EventStateEnum.NotPublished; | |||||
TimesSent = 0; | |||||
TransactionId = transactionId.ToString(); | |||||
} | |||||
public Guid EventId { get; private set; } | |||||
public string EventTypeName { get; private set; } | |||||
[NotMapped] | |||||
public string EventTypeShortName => EventTypeName.Split('.')?.Last(); | |||||
[NotMapped] | |||||
public IntegrationEvent IntegrationEvent { get; private set; } | |||||
public EventStateEnum State { get; set; } | |||||
public int TimesSent { get; set; } | |||||
public DateTime CreationTime { get; private set; } | |||||
public string Content { get; private set; } | |||||
public string TransactionId { get; private set; } | |||||
public IntegrationEventLogEntry DeserializeJsonContent(Type type) | |||||
{ | |||||
IntegrationEvent = JsonSerializer.Deserialize(Content, type, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true }) as IntegrationEvent; | |||||
return this; | |||||
} | |||||
public IntegrationEventLogEntry DeserializeJsonContent(Type type) | |||||
{ | |||||
IntegrationEvent = JsonSerializer.Deserialize(Content, type, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true }) as IntegrationEvent; | |||||
return this; | |||||
} | } | ||||
} | } |
@ -1,17 +1,10 @@ | |||||
using Microsoft.EntityFrameworkCore.Storage; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading.Tasks; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services | |||||
public interface IIntegrationEventLogService | |||||
{ | { | ||||
public interface IIntegrationEventLogService | |||||
{ | |||||
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId); | |||||
Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction); | |||||
Task MarkEventAsPublishedAsync(Guid eventId); | |||||
Task MarkEventAsInProgressAsync(Guid eventId); | |||||
Task MarkEventAsFailedAsync(Guid eventId); | |||||
} | |||||
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId); | |||||
Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction); | |||||
Task MarkEventAsPublishedAsync(Guid eventId); | |||||
Task MarkEventAsInProgressAsync(Guid eventId); | |||||
Task MarkEventAsFailedAsync(Guid eventId); | |||||
} | } |
@ -1,110 +1,99 @@ | |||||
using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.EntityFrameworkCore.Storage; | |||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Data.Common; | |||||
using System.Linq; | |||||
using System.Reflection; | |||||
using System.Threading.Tasks; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services | |||||
{ | |||||
public class IntegrationEventLogService : IIntegrationEventLogService, IDisposable | |||||
{ | |||||
private readonly IntegrationEventLogContext _integrationEventLogContext; | |||||
private readonly DbConnection _dbConnection; | |||||
private readonly List<Type> _eventTypes; | |||||
private volatile bool disposedValue; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services; | |||||
public IntegrationEventLogService(DbConnection dbConnection) | |||||
{ | |||||
_dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection)); | |||||
_integrationEventLogContext = new IntegrationEventLogContext( | |||||
new DbContextOptionsBuilder<IntegrationEventLogContext>() | |||||
.UseSqlServer(_dbConnection) | |||||
.Options); | |||||
_eventTypes = Assembly.Load(Assembly.GetEntryAssembly().FullName) | |||||
.GetTypes() | |||||
.Where(t => t.Name.EndsWith(nameof(IntegrationEvent))) | |||||
.ToList(); | |||||
} | |||||
public class IntegrationEventLogService : IIntegrationEventLogService, IDisposable | |||||
{ | |||||
private readonly IntegrationEventLogContext _integrationEventLogContext; | |||||
private readonly DbConnection _dbConnection; | |||||
private readonly List<Type> _eventTypes; | |||||
private volatile bool disposedValue; | |||||
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId) | |||||
{ | |||||
var tid = transactionId.ToString(); | |||||
public IntegrationEventLogService(DbConnection dbConnection) | |||||
{ | |||||
_dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection)); | |||||
_integrationEventLogContext = new IntegrationEventLogContext( | |||||
new DbContextOptionsBuilder<IntegrationEventLogContext>() | |||||
.UseSqlServer(_dbConnection) | |||||
.Options); | |||||
_eventTypes = Assembly.Load(Assembly.GetEntryAssembly().FullName) | |||||
.GetTypes() | |||||
.Where(t => t.Name.EndsWith(nameof(IntegrationEvent))) | |||||
.ToList(); | |||||
} | |||||
var result = await _integrationEventLogContext.IntegrationEventLogs | |||||
.Where(e => e.TransactionId == tid && e.State == EventStateEnum.NotPublished).ToListAsync(); | |||||
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId) | |||||
{ | |||||
var tid = transactionId.ToString(); | |||||
if (result != null && result.Any()) | |||||
{ | |||||
return result.OrderBy(o => o.CreationTime) | |||||
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t => t.Name == e.EventTypeShortName))); | |||||
} | |||||
var result = await _integrationEventLogContext.IntegrationEventLogs | |||||
.Where(e => e.TransactionId == tid && e.State == EventStateEnum.NotPublished).ToListAsync(); | |||||
return new List<IntegrationEventLogEntry>(); | |||||
if (result != null && result.Any()) | |||||
{ | |||||
return result.OrderBy(o => o.CreationTime) | |||||
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t => t.Name == e.EventTypeShortName))); | |||||
} | } | ||||
public Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction) | |||||
{ | |||||
if (transaction == null) throw new ArgumentNullException(nameof(transaction)); | |||||
return new List<IntegrationEventLogEntry>(); | |||||
} | |||||
var eventLogEntry = new IntegrationEventLogEntry(@event, transaction.TransactionId); | |||||
public Task SaveEventAsync(IntegrationEvent @event, IDbContextTransaction transaction) | |||||
{ | |||||
if (transaction == null) throw new ArgumentNullException(nameof(transaction)); | |||||
_integrationEventLogContext.Database.UseTransaction(transaction.GetDbTransaction()); | |||||
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry); | |||||
var eventLogEntry = new IntegrationEventLogEntry(@event, transaction.TransactionId); | |||||
return _integrationEventLogContext.SaveChangesAsync(); | |||||
} | |||||
_integrationEventLogContext.Database.UseTransaction(transaction.GetDbTransaction()); | |||||
_integrationEventLogContext.IntegrationEventLogs.Add(eventLogEntry); | |||||
public Task MarkEventAsPublishedAsync(Guid eventId) | |||||
{ | |||||
return UpdateEventStatus(eventId, EventStateEnum.Published); | |||||
} | |||||
return _integrationEventLogContext.SaveChangesAsync(); | |||||
} | |||||
public Task MarkEventAsInProgressAsync(Guid eventId) | |||||
{ | |||||
return UpdateEventStatus(eventId, EventStateEnum.InProgress); | |||||
} | |||||
public Task MarkEventAsPublishedAsync(Guid eventId) | |||||
{ | |||||
return UpdateEventStatus(eventId, EventStateEnum.Published); | |||||
} | |||||
public Task MarkEventAsFailedAsync(Guid eventId) | |||||
{ | |||||
return UpdateEventStatus(eventId, EventStateEnum.PublishedFailed); | |||||
} | |||||
public Task MarkEventAsInProgressAsync(Guid eventId) | |||||
{ | |||||
return UpdateEventStatus(eventId, EventStateEnum.InProgress); | |||||
} | |||||
private Task UpdateEventStatus(Guid eventId, EventStateEnum status) | |||||
{ | |||||
var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == eventId); | |||||
eventLogEntry.State = status; | |||||
public Task MarkEventAsFailedAsync(Guid eventId) | |||||
{ | |||||
return UpdateEventStatus(eventId, EventStateEnum.PublishedFailed); | |||||
} | |||||
private Task UpdateEventStatus(Guid eventId, EventStateEnum status) | |||||
{ | |||||
var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == eventId); | |||||
eventLogEntry.State = status; | |||||
if (status == EventStateEnum.InProgress) | |||||
eventLogEntry.TimesSent++; | |||||
if (status == EventStateEnum.InProgress) | |||||
eventLogEntry.TimesSent++; | |||||
_integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry); | |||||
_integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry); | |||||
return _integrationEventLogContext.SaveChangesAsync(); | |||||
} | |||||
return _integrationEventLogContext.SaveChangesAsync(); | |||||
} | |||||
protected virtual void Dispose(bool disposing) | |||||
protected virtual void Dispose(bool disposing) | |||||
{ | |||||
if (!disposedValue) | |||||
{ | { | ||||
if (!disposedValue) | |||||
if (disposing) | |||||
{ | { | ||||
if (disposing) | |||||
{ | |||||
_integrationEventLogContext?.Dispose(); | |||||
} | |||||
_integrationEventLogContext?.Dispose(); | |||||
} | |||||
disposedValue = true; | |||||
} | |||||
disposedValue = true; | |||||
} | } | ||||
} | |||||
public void Dispose() | |||||
{ | |||||
Dispose(disposing: true); | |||||
GC.SuppressFinalize(this); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Dispose(disposing: true); | |||||
GC.SuppressFinalize(this); | |||||
} | } | ||||
} | } |
@ -1,31 +1,26 @@ | |||||
using Microsoft.EntityFrameworkCore; | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities; | |||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Utilities | |||||
public class ResilientTransaction | |||||
{ | { | ||||
public class ResilientTransaction | |||||
{ | |||||
private DbContext _context; | |||||
private ResilientTransaction(DbContext context) => | |||||
_context = context ?? throw new ArgumentNullException(nameof(context)); | |||||
private DbContext _context; | |||||
private ResilientTransaction(DbContext context) => | |||||
_context = context ?? throw new ArgumentNullException(nameof(context)); | |||||
public static ResilientTransaction New(DbContext context) => | |||||
new ResilientTransaction(context); | |||||
public static ResilientTransaction New(DbContext context) => | |||||
new ResilientTransaction(context); | |||||
public async Task ExecuteAsync(Func<Task> action) | |||||
public async Task ExecuteAsync(Func<Task> action) | |||||
{ | |||||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): | |||||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency | |||||
var strategy = _context.Database.CreateExecutionStrategy(); | |||||
await strategy.ExecuteAsync(async () => | |||||
{ | { | ||||
//Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction(): | |||||
//See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency | |||||
var strategy = _context.Database.CreateExecutionStrategy(); | |||||
await strategy.ExecuteAsync(async () => | |||||
using (var transaction = _context.Database.BeginTransaction()) | |||||
{ | { | ||||
using (var transaction = _context.Database.BeginTransaction()) | |||||
{ | |||||
await action(); | |||||
transaction.Commit(); | |||||
} | |||||
}); | |||||
} | |||||
await action(); | |||||
transaction.Commit(); | |||||
} | |||||
}); | |||||
} | } | ||||
} | } |