Refactoring IntegrationEventLog service
This commit is contained in:
parent
24bed0aa33
commit
e3f8ac6e84
@ -5,6 +5,7 @@ using Newtonsoft.Json;
|
|||||||
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.ComponentModel.DataAnnotations.Schema;
|
using System.ComponentModel.DataAnnotations.Schema;
|
||||||
|
using System.Reflection;
|
||||||
|
|
||||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
|
||||||
{
|
{
|
||||||
@ -31,9 +32,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
|
|||||||
public DateTime CreationTime { get; private set; }
|
public DateTime CreationTime { get; private set; }
|
||||||
public string Content { get; private set; }
|
public string Content { get; private set; }
|
||||||
|
|
||||||
public void DeserializeJsonContent(Type type)
|
public IntegrationEventLogEntry DeserializeJsonContent(Type type)
|
||||||
{
|
{
|
||||||
IntegrationEvent = JsonConvert.DeserializeObject(Content, type) as IntegrationEvent;
|
IntegrationEvent = JsonConvert.DeserializeObject(Content, type) as IntegrationEvent;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,39 +8,39 @@ using System.Collections;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Data.Common;
|
using System.Data.Common;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Reflection;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services
|
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Services
|
||||||
{
|
{
|
||||||
public class IntegrationEventLogService : IIntegrationEventLogService
|
public class IntegrationEventLogService : IIntegrationEventLogService
|
||||||
{
|
{
|
||||||
private readonly IEventBusSubscriptionsManager _subsManager;
|
|
||||||
private readonly IntegrationEventLogContext _integrationEventLogContext;
|
private readonly IntegrationEventLogContext _integrationEventLogContext;
|
||||||
private readonly DbConnection _dbConnection;
|
private readonly DbConnection _dbConnection;
|
||||||
|
private readonly List<Type> _eventTypes;
|
||||||
|
|
||||||
public IntegrationEventLogService(IEventBusSubscriptionsManager subsManager,
|
public IntegrationEventLogService(DbConnection dbConnection)
|
||||||
DbConnection dbConnection)
|
|
||||||
{
|
{
|
||||||
_dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
|
_dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
|
||||||
_subsManager = subsManager ?? throw new ArgumentNullException(nameof(subsManager));
|
|
||||||
_integrationEventLogContext = new IntegrationEventLogContext(
|
_integrationEventLogContext = new IntegrationEventLogContext(
|
||||||
new DbContextOptionsBuilder<IntegrationEventLogContext>()
|
new DbContextOptionsBuilder<IntegrationEventLogContext>()
|
||||||
.UseSqlServer(_dbConnection)
|
.UseSqlServer(_dbConnection)
|
||||||
.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning))
|
.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning))
|
||||||
.Options);
|
.Options);
|
||||||
|
|
||||||
|
_eventTypes = Assembly.Load(Assembly.GetEntryAssembly().FullName)
|
||||||
|
.GetTypes()
|
||||||
|
.Where(t => t.Name.EndsWith(nameof(IntegrationEvent)))
|
||||||
|
.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync()
|
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync()
|
||||||
{
|
{
|
||||||
var eventLogsPendingToPublish = await _integrationEventLogContext.IntegrationEventLogs
|
return await _integrationEventLogContext.IntegrationEventLogs
|
||||||
.Where(e => e.State == EventStateEnum.NotPublished)
|
.Where(e => e.State == EventStateEnum.NotPublished)
|
||||||
.OrderBy(o => o.CreationTime)
|
.OrderBy(o => o.CreationTime)
|
||||||
.ToListAsync();
|
.Select(e => e.DeserializeJsonContent(_eventTypes.Find(t=> t.Name == e.EventTypeShortName)))
|
||||||
|
.ToListAsync();
|
||||||
eventLogsPendingToPublish.ForEach(evtLog =>
|
|
||||||
evtLog.DeserializeJsonContent(_subsManager.GetEventTypeByName(evtLog.EventTypeShortName)));
|
|
||||||
|
|
||||||
return eventLogsPendingToPublish;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction)
|
public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction)
|
||||||
|
@ -232,11 +232,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API
|
|||||||
public static IServiceCollection AddIntegrationServices(this IServiceCollection services, IConfiguration configuration)
|
public static IServiceCollection AddIntegrationServices(this IServiceCollection services, IConfiguration configuration)
|
||||||
{
|
{
|
||||||
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
|
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
|
||||||
sp =>
|
sp => (DbConnection c) => new IntegrationEventLogService(c));
|
||||||
{
|
|
||||||
var busMgr = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
|
||||||
return (DbConnection c) => new IntegrationEventLogService(busMgr, c);
|
|
||||||
});
|
|
||||||
|
|
||||||
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
|
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
|
||||||
|
|
||||||
|
@ -113,14 +113,7 @@
|
|||||||
eventBus.Subscribe<OrderStockConfirmedIntegrationEvent, IIntegrationEventHandler<OrderStockConfirmedIntegrationEvent>>();
|
eventBus.Subscribe<OrderStockConfirmedIntegrationEvent, IIntegrationEventHandler<OrderStockConfirmedIntegrationEvent>>();
|
||||||
eventBus.Subscribe<OrderStockRejectedIntegrationEvent, IIntegrationEventHandler<OrderStockRejectedIntegrationEvent>>();
|
eventBus.Subscribe<OrderStockRejectedIntegrationEvent, IIntegrationEventHandler<OrderStockRejectedIntegrationEvent>>();
|
||||||
eventBus.Subscribe<OrderPaymentFailedIntegrationEvent, IIntegrationEventHandler<OrderPaymentFailedIntegrationEvent>>();
|
eventBus.Subscribe<OrderPaymentFailedIntegrationEvent, IIntegrationEventHandler<OrderPaymentFailedIntegrationEvent>>();
|
||||||
eventBus.Subscribe<OrderPaymentSuccededIntegrationEvent, IIntegrationEventHandler<OrderPaymentSuccededIntegrationEvent>>();
|
eventBus.Subscribe<OrderPaymentSuccededIntegrationEvent, IIntegrationEventHandler<OrderPaymentSuccededIntegrationEvent>>();
|
||||||
eventBus.Subscribe<OrderStartedIntegrationEvent, IIntegrationEventHandler<OrderStartedIntegrationEvent>>();
|
|
||||||
eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, IIntegrationEventHandler<OrderStatusChangedToAwaitingValidationIntegrationEvent>>();
|
|
||||||
eventBus.Subscribe<OrderStatusChangedToCancelledIntegrationEvent, IIntegrationEventHandler<OrderStatusChangedToCancelledIntegrationEvent>>();
|
|
||||||
eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, IIntegrationEventHandler<OrderStatusChangedToPaidIntegrationEvent>>();
|
|
||||||
eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, IIntegrationEventHandler<OrderStatusChangedToShippedIntegrationEvent>>();
|
|
||||||
eventBus.Subscribe<OrderStatusChangedToSubmittedIntegrationEvent, IIntegrationEventHandler<OrderStatusChangedToSubmittedIntegrationEvent>>();
|
|
||||||
eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, IIntegrationEventHandler<OrderStatusChangedToStockConfirmedIntegrationEvent>>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -258,11 +251,7 @@
|
|||||||
services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
|
services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
|
||||||
services.AddTransient<IIdentityService, IdentityService>();
|
services.AddTransient<IIdentityService, IdentityService>();
|
||||||
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
|
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
|
||||||
sp =>
|
sp => (DbConnection c) => new IntegrationEventLogService(c));
|
||||||
{
|
|
||||||
var busMgr = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
|
||||||
return (DbConnection c) => new IntegrationEventLogService(busMgr, c);
|
|
||||||
});
|
|
||||||
|
|
||||||
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
|
services.AddTransient<IOrderingIntegrationEventService, OrderingIntegrationEventService>();
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user