Browse Source

add Dispose() menthod in interface

pull/1698/head
zedy 3 years ago
parent
commit
0709a3ce5b
4 changed files with 20 additions and 7 deletions
  1. +2
    -0
      src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs
  2. +4
    -0
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
  3. +2
    -0
      src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj
  4. +12
    -7
      src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs

+ 2
- 0
src/BuildingBlocks/EventBus/EventBus/Abstractions/IEventBus.cs View File

@ -19,5 +19,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
void Dispose();
}
}

+ 4
- 0
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs View File

@ -4,6 +4,7 @@ using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Extensions;
using Microsoft.Extensions.Logging;
using Azure.Messaging.ServiceBus;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
@ -27,6 +28,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private readonly IEventBusSubscriptionsManager _subsManager;
private readonly ILifetimeScope _autofac;
private readonly int _retryCount;
private readonly ServiceBusProcessor _processor;
private IModel _consumerChannel;
private string _queueName;
@ -42,6 +44,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
_autofac = autofac;
_retryCount = retryCount;
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
this._processor = EventBusServiceBus.EventBusServiceBus.DeliverProcessor();
}
private void SubsManager_OnEventRemoved(object sender, string eventName)
@ -178,6 +181,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
}
_subsManager.Clear();
_processor.CloseAsync().GetAwaiter().GetResult();
}
private void StartBasicConsume()


+ 2
- 0
src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.csproj View File

@ -7,6 +7,7 @@
<ItemGroup>
<PackageReference Include="Autofac" Version="6.1.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.2.0" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
<PackageReference Include="Polly" Version="7.2.1" />
@ -14,6 +15,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventBusServiceBus\EventBusServiceBus.csproj" />
<ProjectReference Include="..\EventBus\EventBus.csproj" />
</ItemGroup>

+ 12
- 7
src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs View File

@ -20,7 +20,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
private readonly string _topicName;
private readonly string _subscriptionName;
private ServiceBusSender _sender;
private ServiceBusProcessor _processor;
private static ServiceBusProcessor _processor;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
@ -34,6 +34,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
_topicName = topicName;
_subscriptionName = subscriptionName;
_sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName);
ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false };
_processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options);
RemoveDefaultRule();
RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult();
@ -126,14 +128,17 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
public void Dispose()
{
_subsManager.Clear();
this._processor.CloseAsync().GetAwaiter().GetResult();
_processor.CloseAsync().GetAwaiter().GetResult();
}
public static ServiceBusProcessor DeliverProcessor()
{
return _processor;
}
private async Task RegisterSubscriptionClientMessageHandlerAsync()
{
ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false };
this._processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options);
this._processor.ProcessMessageAsync +=
_processor.ProcessMessageAsync +=
async (args) =>
{
var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}";
@ -146,8 +151,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
}
};
this._processor.ProcessErrorAsync += ErrorHandler;
await this._processor.StartProcessingAsync();
_processor.ProcessErrorAsync += ErrorHandler;
await _processor.StartProcessingAsync();
}
private Task ErrorHandler(ProcessErrorEventArgs args)


Loading…
Cancel
Save