Update ServiceBus to the new SDKs.

This commit is contained in:
zedy 2021-06-11 17:51:21 +08:00
parent 1cbd218dc4
commit 804cbcf2a8
4 changed files with 53 additions and 52 deletions

View File

@ -1,58 +1,50 @@
using Microsoft.Azure.ServiceBus; using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using System; using System;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{ {
public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection
{ {
private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder; private readonly string _serviceBusConnectionString;
private readonly string _subscriptionClientName; private ServiceBusAdministrationClient _subscriptionClient;
private SubscriptionClient _subscriptionClient; private ServiceBusClient _topicClient;
private ITopicClient _topicClient;
bool _disposed; bool _disposed;
public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder, public DefaultServiceBusPersisterConnection(string serviceBusConnectionString)
string subscriptionClientName)
{ {
_serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ?? _serviceBusConnectionString = serviceBusConnectionString;
throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder)); _subscriptionClient = new ServiceBusAdministrationClient(_serviceBusConnectionString);
_subscriptionClientName = subscriptionClientName; _topicClient = new ServiceBusClient(_serviceBusConnectionString);
_subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, subscriptionClientName);
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default);
} }
public ITopicClient TopicClient public ServiceBusClient TopicClient
{ {
get get
{ {
if (_topicClient.IsClosedOrClosing) if (_topicClient.IsClosed)
{ {
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); _topicClient = new ServiceBusClient(_serviceBusConnectionString);
} }
return _topicClient; return _topicClient;
} }
} }
public ISubscriptionClient SubscriptionClient public ServiceBusAdministrationClient SubscriptionClient
{ {
get get
{ {
if (_subscriptionClient.IsClosedOrClosing)
{
_subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, _subscriptionClientName);
}
return _subscriptionClient; return _subscriptionClient;
} }
} }
public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder;
public ITopicClient CreateModel() public ServiceBusClient CreateModel()
{ {
if (_topicClient.IsClosedOrClosing) if (_topicClient.IsClosed)
{ {
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default); _topicClient = new ServiceBusClient(_serviceBusConnectionString);
} }
return _topicClient; return _topicClient;

View File

@ -1,7 +1,8 @@
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{ {
using Autofac; using Autofac;
using Microsoft.Azure.ServiceBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
@ -17,16 +18,20 @@
private readonly ILogger<EventBusServiceBus> _logger; private readonly ILogger<EventBusServiceBus> _logger;
private readonly IEventBusSubscriptionsManager _subsManager; private readonly IEventBusSubscriptionsManager _subsManager;
private readonly ILifetimeScope _autofac; private readonly ILifetimeScope _autofac;
private readonly string _topicName;
private readonly string _subscriptionName;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection, public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac) ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac, string topicName, string subscriptionName)
{ {
_serviceBusPersisterConnection = serviceBusPersisterConnection; _serviceBusPersisterConnection = serviceBusPersisterConnection;
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_autofac = autofac; _autofac = autofac;
_topicName = topicName;
_subscriptionName = subscriptionName;
RemoveDefaultRule(); RemoveDefaultRule();
RegisterSubscriptionClientMessageHandler(); RegisterSubscriptionClientMessageHandler();
@ -38,14 +43,14 @@
var jsonMessage = JsonSerializer.Serialize(@event); var jsonMessage = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(jsonMessage); var body = Encoding.UTF8.GetBytes(jsonMessage);
var message = new Message var message = new ServiceBusMessage
{ {
MessageId = Guid.NewGuid().ToString(), MessageId = Guid.NewGuid().ToString(),
Body = body, Body = new BinaryData(body),
Label = eventName, Subject = eventName,
}; };
_serviceBusPersisterConnection.TopicClient.SendAsync(message) _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName).SendMessageAsync(message)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();
} }
@ -69,9 +74,9 @@
{ {
try try
{ {
_serviceBusPersisterConnection.SubscriptionClient.AddRuleAsync(new RuleDescription _serviceBusPersisterConnection.SubscriptionClient.CreateRuleAsync(_topicName, _subscriptionName, new CreateRuleOptions
{ {
Filter = new CorrelationFilter { Label = eventName }, Filter = new CorrelationRuleFilter() { Subject = eventName },
Name = eventName Name = eventName
}).GetAwaiter().GetResult(); }).GetAwaiter().GetResult();
} }
@ -96,11 +101,11 @@
{ {
_serviceBusPersisterConnection _serviceBusPersisterConnection
.SubscriptionClient .SubscriptionClient
.RemoveRuleAsync(eventName) .DeleteRuleAsync(_topicName, _subscriptionName, eventName)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();
} }
catch (MessagingEntityNotFoundException) catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
{ {
_logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName); _logger.LogWarning("The messaging entity {eventName} Could not be found.", eventName);
} }
@ -125,25 +130,28 @@
private void RegisterSubscriptionClientMessageHandler() private void RegisterSubscriptionClientMessageHandler()
{ {
_serviceBusPersisterConnection.SubscriptionClient.RegisterMessageHandler( ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false };
async (message, token) => ServiceBusProcessor processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options);
processor.ProcessMessageAsync +=
async (args) =>
{ {
var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}"; var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}";
var messageData = Encoding.UTF8.GetString(message.Body); var messageData = Encoding.UTF8.GetString(args.Message.Body);
// Complete the message so that it is not received again. // Complete the message so that it is not received again.
if (await ProcessEvent(eventName, messageData)) if (await ProcessEvent(eventName, messageData))
{ {
await _serviceBusPersisterConnection.SubscriptionClient.CompleteAsync(message.SystemProperties.LockToken); await args.CompleteMessageAsync(args.Message);
} }
}, };
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false });
processor.ProcessErrorAsync += ErrorHandler;
} }
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) private Task ErrorHandler(ProcessErrorEventArgs exceptionReceivedEventArgs)
{ {
var ex = exceptionReceivedEventArgs.Exception; var ex = exceptionReceivedEventArgs.Exception;
var context = exceptionReceivedEventArgs.ExceptionReceivedContext; var context = exceptionReceivedEventArgs.ErrorSource;
_logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context); _logger.LogError(ex, "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}", ex.Message, context);
@ -190,13 +198,13 @@
{ {
_serviceBusPersisterConnection _serviceBusPersisterConnection
.SubscriptionClient .SubscriptionClient
.RemoveRuleAsync(RuleDescription.DefaultRuleName) .DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();
} }
catch (MessagingEntityNotFoundException) catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
{ {
_logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleDescription.DefaultRuleName); _logger.LogWarning("The messaging entity {DefaultRuleName} Could not be found.", RuleProperties.DefaultRuleName);
} }
} }
} }

View File

@ -7,7 +7,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Autofac" Version="6.1.0" /> <PackageReference Include="Autofac" Version="6.1.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.1.0" /> <PackageReference Include="Azure.Messaging.ServiceBus" Version="7.1.2" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" /> <PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
</ItemGroup> </ItemGroup>

View File

@ -1,11 +1,12 @@
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{ {
using Microsoft.Azure.ServiceBus;
using System; using System;
public interface IServiceBusPersisterConnection : IDisposable public interface IServiceBusPersisterConnection : IDisposable
{ {
ITopicClient TopicClient { get; } ServiceBusClient TopicClient { get; }
ISubscriptionClient SubscriptionClient { get; } ServiceBusAdministrationClient SubscriptionClient { get; }
} }
} }