update codes

This commit is contained in:
zedy 2021-06-23 12:29:09 +08:00
parent cf4b786454
commit a9fc93e732
4 changed files with 30 additions and 21 deletions

View File

@ -31,7 +31,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
} }
} }
public ServiceBusAdministrationClient SubscriptionClient public ServiceBusAdministrationClient AdministrationClient
{ {
get get
{ {

View File

@ -1,7 +1,5 @@
using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration; using Azure.Messaging.ServiceBus.Administration;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
using Autofac; using Autofac;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
@ -11,7 +9,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
public class EventBusServiceBus : IEventBus public class EventBusServiceBus : IEventBus
{ {
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection; private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
@ -20,6 +19,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
private readonly ILifetimeScope _autofac; private readonly ILifetimeScope _autofac;
private readonly string _topicName; private readonly string _topicName;
private readonly string _subscriptionName; private readonly string _subscriptionName;
private ServiceBusSender _sender;
private ServiceBusProcessor processor;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent"; private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";
@ -32,6 +33,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
_autofac = autofac; _autofac = autofac;
_topicName = topicName; _topicName = topicName;
_subscriptionName = subscriptionName; _subscriptionName = subscriptionName;
_sender = _serviceBusPersisterConnection.TopicClient.CreateSender(_topicName);
RemoveDefaultRule(); RemoveDefaultRule();
RegisterSubscriptionClientMessageHandler(); RegisterSubscriptionClientMessageHandler();
@ -40,17 +42,15 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
public void Publish(IntegrationEvent @event) public void Publish(IntegrationEvent @event)
{ {
var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, ""); var eventName = @event.GetType().Name.Replace(INTEGRATION_EVENT_SUFFIX, "");
var jsonMessage = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(jsonMessage);
var message = new ServiceBusMessage var message = new ServiceBusMessage
{ {
MessageId = Guid.NewGuid().ToString(), MessageId = Guid.NewGuid().ToString(),
Body = new BinaryData(body), Body = new BinaryData(@event),
Subject = eventName, Subject = eventName,
}; };
_serviceBusPersisterConnection.TopicClient.CreateSender(_topicName).SendMessageAsync(message) _sender.SendMessageAsync(message)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();
} }
@ -74,7 +74,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{ {
try try
{ {
_serviceBusPersisterConnection.SubscriptionClient.CreateRuleAsync(_topicName, _subscriptionName, new CreateRuleOptions _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(_topicName, _subscriptionName, new CreateRuleOptions
{ {
Filter = new CorrelationRuleFilter() { Subject = eventName }, Filter = new CorrelationRuleFilter() { Subject = eventName },
Name = eventName Name = eventName
@ -100,7 +100,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
try try
{ {
_serviceBusPersisterConnection _serviceBusPersisterConnection
.SubscriptionClient .AdministrationClient
.DeleteRuleAsync(_topicName, _subscriptionName, eventName) .DeleteRuleAsync(_topicName, _subscriptionName, eventName)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();
@ -126,17 +126,18 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
public void Dispose() public void Dispose()
{ {
_subsManager.Clear(); _subsManager.Clear();
this.processor.CloseAsync();
} }
private void RegisterSubscriptionClientMessageHandler() private void RegisterSubscriptionClientMessageHandler()
{ {
ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false }; ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10, AutoCompleteMessages = false };
ServiceBusProcessor processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options); this.processor = _serviceBusPersisterConnection.TopicClient.CreateProcessor(_topicName, options);
processor.ProcessMessageAsync += processor.ProcessMessageAsync +=
async (args) => async (args) =>
{ {
var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}"; var eventName = $"{args.Message.Subject}{INTEGRATION_EVENT_SUFFIX}";
var messageData = Encoding.UTF8.GetString(args.Message.Body); string messageData = args.Message.Body.ToString();
// Complete the message so that it is not received again. // Complete the message so that it is not received again.
if (await ProcessEvent(eventName, messageData)) if (await ProcessEvent(eventName, messageData))
@ -146,6 +147,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
}; };
processor.ProcessErrorAsync += ErrorHandler; processor.ProcessErrorAsync += ErrorHandler;
processor.StartProcessingAsync();
} }
private Task ErrorHandler(ProcessErrorEventArgs exceptionReceivedEventArgs) private Task ErrorHandler(ProcessErrorEventArgs exceptionReceivedEventArgs)
@ -197,7 +199,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
try try
{ {
_serviceBusPersisterConnection _serviceBusPersisterConnection
.SubscriptionClient .AdministrationClient
.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName) .DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();

View File

@ -7,6 +7,6 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
public interface IServiceBusPersisterConnection : IDisposable public interface IServiceBusPersisterConnection : IDisposable
{ {
ServiceBusClient TopicClient { get; } ServiceBusClient TopicClient { get; }
ServiceBusAdministrationClient SubscriptionClient { get; } ServiceBusAdministrationClient AdministrationClient { get; }
} }
} }

View File

@ -42,6 +42,7 @@
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="5.0.1" /> <PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="5.0.1" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="5.0.1" /> <PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="5.0.1" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="7.1.0" /> <PackageReference Include="Autofac.Extensions.DependencyInjection" Version="7.1.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.1.2" />
<PackageReference Include="Dapper" Version="2.0.78" /> <PackageReference Include="Dapper" Version="2.0.78" />
<PackageReference Include="FluentValidation.AspNetCore" Version="9.3.0" /> <PackageReference Include="FluentValidation.AspNetCore" Version="9.3.0" />
<PackageReference Include="Google.Protobuf" Version="3.14.0" /> <PackageReference Include="Google.Protobuf" Version="3.14.0" />
@ -68,6 +69,12 @@
<PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" /> <PackageReference Include="Swashbuckle.AspNetCore" Version="5.6.3" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Reference Include="Azure.Messaging.ServiceBus">
<HintPath>C:\Users\v-wenjyu\.nuget\packages\azure.messaging.servicebus\7.1.2\lib\netstandard2.0\Azure.Messaging.ServiceBus.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup> <ItemGroup>
<None Update="Setup\*"> <None Update="Setup\*">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>