Fix Rabbitmq bug with multiple service instances
This commit is contained in:
parent
1545eb80bd
commit
804620330d
@ -32,11 +32,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
private string _queueName;
|
private string _queueName;
|
||||||
|
|
||||||
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
|
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
|
||||||
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, int retryCount = 5)
|
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
|
||||||
{
|
{
|
||||||
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
||||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
|
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
|
||||||
|
_queueName = queueName;
|
||||||
_consumerChannel = CreateConsumerChannel();
|
_consumerChannel = CreateConsumerChannel();
|
||||||
_autofac = autofac;
|
_autofac = autofac;
|
||||||
_retryCount = retryCount;
|
_retryCount = retryCount;
|
||||||
@ -170,7 +171,12 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
channel.ExchangeDeclare(exchange: BROKER_NAME,
|
||||||
type: "direct");
|
type: "direct");
|
||||||
|
|
||||||
_queueName = channel.QueueDeclare().QueueName;
|
channel.QueueDeclare(queue: _queueName,
|
||||||
|
durable: true,
|
||||||
|
exclusive: false,
|
||||||
|
autoDelete: false,
|
||||||
|
arguments: null);
|
||||||
|
|
||||||
|
|
||||||
var consumer = new EventingBasicConsumer(channel);
|
var consumer = new EventingBasicConsumer(channel);
|
||||||
consumer.Received += async (model, ea) =>
|
consumer.Received += async (model, ea) =>
|
||||||
|
@ -257,6 +257,8 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
|||||||
|
|
||||||
private void RegisterEventBus(IServiceCollection services)
|
private void RegisterEventBus(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
||||||
|
|
||||||
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
||||||
{
|
{
|
||||||
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
||||||
@ -264,8 +266,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
|||||||
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
||||||
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
||||||
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
||||||
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
||||||
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
|
||||||
|
|
||||||
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
||||||
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
||||||
@ -286,7 +287,7 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
|
|||||||
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, retryCount);
|
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,6 +224,8 @@
|
|||||||
|
|
||||||
private void RegisterEventBus(IServiceCollection services)
|
private void RegisterEventBus(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
||||||
|
|
||||||
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
||||||
{
|
{
|
||||||
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
||||||
@ -232,7 +234,6 @@
|
|||||||
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
||||||
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
||||||
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
||||||
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
|
||||||
|
|
||||||
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
||||||
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
||||||
@ -254,7 +255,7 @@
|
|||||||
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, retryCount);
|
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,6 +227,8 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
|
|||||||
|
|
||||||
private void RegisterEventBus(IServiceCollection services)
|
private void RegisterEventBus(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
||||||
|
|
||||||
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
||||||
{
|
{
|
||||||
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
||||||
@ -235,7 +237,6 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
|
|||||||
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
||||||
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
||||||
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
||||||
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
|
||||||
|
|
||||||
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
||||||
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
||||||
@ -256,7 +257,7 @@ namespace Microsoft.eShopOnContainers.Services.Locations.API
|
|||||||
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, retryCount);
|
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,8 +246,10 @@
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void RegisterEventBus(IServiceCollection services)
|
private void RegisterEventBus(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
||||||
|
|
||||||
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
||||||
{
|
{
|
||||||
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
||||||
@ -255,8 +257,7 @@
|
|||||||
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
||||||
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
||||||
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
||||||
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
||||||
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
|
||||||
|
|
||||||
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
||||||
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
||||||
@ -277,7 +278,7 @@
|
|||||||
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, retryCount);
|
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,6 +294,8 @@
|
|||||||
|
|
||||||
private void RegisterEventBus(IServiceCollection services)
|
private void RegisterEventBus(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
||||||
|
|
||||||
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
||||||
{
|
{
|
||||||
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
||||||
@ -301,8 +303,7 @@
|
|||||||
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
||||||
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
||||||
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
||||||
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
||||||
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
|
||||||
|
|
||||||
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
||||||
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
||||||
@ -323,7 +324,7 @@
|
|||||||
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, retryCount);
|
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,6 +127,8 @@ namespace Payment.API
|
|||||||
|
|
||||||
private void RegisterEventBus(IServiceCollection services)
|
private void RegisterEventBus(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
||||||
|
|
||||||
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
|
||||||
{
|
{
|
||||||
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
|
||||||
@ -134,8 +136,7 @@ namespace Payment.API
|
|||||||
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
var serviceBusPersisterConnection = sp.GetRequiredService<IServiceBusPersisterConnection>();
|
||||||
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
|
||||||
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
|
||||||
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
|
||||||
var subscriptionClientName = Configuration["SubscriptionClientName"];
|
|
||||||
|
|
||||||
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
return new EventBusServiceBus(serviceBusPersisterConnection, logger,
|
||||||
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
|
||||||
@ -156,7 +157,7 @@ namespace Payment.API
|
|||||||
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, retryCount);
|
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,7 +3,10 @@
|
|||||||
<Scenarios>
|
<Scenarios>
|
||||||
<Scenario Name="OrderProductsLoadTest" DelayBetweenIterations="0" PercentNewUsers="0" IPSwitching="false" TestMixType="PercentageOfTestsStarted" ApplyDistributionToPacingDelay="true" MaxTestIterations="0" DisableDuringWarmup="false" DelayStartTime="0" AllowedAgents="">
|
<Scenario Name="OrderProductsLoadTest" DelayBetweenIterations="0" PercentNewUsers="0" IPSwitching="false" TestMixType="PercentageOfTestsStarted" ApplyDistributionToPacingDelay="true" MaxTestIterations="0" DisableDuringWarmup="false" DelayStartTime="0" AllowedAgents="">
|
||||||
<ThinkProfile Value="0.2" Pattern="NormalDistribution" />
|
<ThinkProfile Value="0.2" Pattern="NormalDistribution" />
|
||||||
<LoadProfile Pattern="Step" InitialUsers="1" MaxUsers="50" StepUsers="2" StepDuration="10" StepRampTime="10" />
|
<LoadProfile Pattern="Step" InitialUsers="1" MaxUsers="1000" StepUsers="50" StepDuration="10" StepRampTime="10" />
|
||||||
|
<InitializeTest>
|
||||||
|
<TestProfile Name="AddProducts" Path="webmvc\addproducts.webtest" Id="2c9d53ae-0237-47bd-a5d2-6500ef5d8fcb" Percentage="0.0" Type="Microsoft.VisualStudio.TestTools.WebStress.DeclarativeWebTestElement, Microsoft.VisualStudio.QualityTools.LoadTest, Version=15.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a" />
|
||||||
|
</InitializeTest>
|
||||||
<TestMix>
|
<TestMix>
|
||||||
<TestProfile Name="GetItems" Path="catalog.api\getitems.webtest" Id="e527de7e-beff-4824-af52-dda763fd5e6c" Percentage="19" Type="Microsoft.VisualStudio.TestTools.WebStress.DeclarativeWebTestElement, Microsoft.VisualStudio.QualityTools.LoadTest, Version=15.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a" />
|
<TestProfile Name="GetItems" Path="catalog.api\getitems.webtest" Id="e527de7e-beff-4824-af52-dda763fd5e6c" Percentage="19" Type="Microsoft.VisualStudio.TestTools.WebStress.DeclarativeWebTestElement, Microsoft.VisualStudio.QualityTools.LoadTest, Version=15.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a" />
|
||||||
<TestProfile Name="GetCatalogTypes" Path="catalog.api\getcatalogtypes.webtest" Id="7df20b29-d5c3-447b-b73d-95c63e9c4061" Percentage="15" Type="Microsoft.VisualStudio.TestTools.WebStress.DeclarativeWebTestElement, Microsoft.VisualStudio.QualityTools.LoadTest, Version=15.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a" />
|
<TestProfile Name="GetCatalogTypes" Path="catalog.api\getcatalogtypes.webtest" Id="7df20b29-d5c3-447b-b73d-95c63e9c4061" Percentage="15" Type="Microsoft.VisualStudio.TestTools.WebStress.DeclarativeWebTestElement, Microsoft.VisualStudio.QualityTools.LoadTest, Version=15.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a" />
|
||||||
|
Loading…
x
Reference in New Issue
Block a user