update codes

This commit is contained in:
zedy 2021-06-23 13:56:39 +08:00
parent a147d7cf05
commit e960be43bb
2 changed files with 11 additions and 16 deletions

View File

@ -20,7 +20,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
private readonly string _topicName; private readonly string _topicName;
private readonly string _subscriptionName; private readonly string _subscriptionName;
private ServiceBusSender _sender; private ServiceBusSender _sender;
private ServiceBusProcessor processor; private ServiceBusProcessor _processor;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
@ -36,7 +36,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
_sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName); _sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName);
RemoveDefaultRule(); RemoveDefaultRule();
RegisterSubscriptionClientMessageHandler(); RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult();
} }
public void Publish(IntegrationEvent @event) public void Publish(IntegrationEvent @event)
@ -126,14 +126,14 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
public void Dispose() public void Dispose()
{ {
_subsManager.Clear(); _subsManager.Clear();
this.processor.CloseAsync(); this._processor.CloseAsync().GetAwaiter().GetResult();
} }
private void RegisterSubscriptionClientMessageHandler() private async Task RegisterSubscriptionClientMessageHandlerAsync()
{ {
ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false }; ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false };
this.processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options); this._processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options);
processor.ProcessMessageAsync += this._processor.ProcessMessageAsync +=
async (args) => async (args) =>
{ {
var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}"; var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}";
@ -146,14 +146,14 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
} }
}; };
processor.ProcessErrorAsync += ErrorHandler; this._processor.ProcessErrorAsync += ErrorHandler;
processor.StartProcessingAsync(); await this._processor.StartProcessingAsync();
} }
private Task ErrorHandler(ProcessErrorEventArgs exceptionReceivedEventArgs) private Task ErrorHandler(ProcessErrorEventArgs args)
{ {
var ex = exceptionReceivedEventArgs.Exception; var ex = args.Exception;
var context = exceptionReceivedEventArgs.ErrorSource; var context = args.ErrorSource;
_logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context); _logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context);

View File

@ -69,11 +69,6 @@
<PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" /> <PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Reference Include="Azure.Messaging.ServiceBus">
<HintPath>C:\Users\v-wenjyu\.nuget\packages\azure.messaging.servicebus\7.1.2\lib\netstandard2.0\Azure.Messaging.ServiceBus.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup> <ItemGroup>
<None Update="Setup\*"> <None Update="Setup\*">