Fix bug service bus not receiving messages

Update service bus to 1.0 version
This commit is contained in:
Ramón Tomás 2017-08-17 13:00:48 +02:00
parent 4876dd562d
commit b4dccbf774
2 changed files with 25 additions and 12 deletions

View File

@ -1,18 +1,17 @@
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{ {
using System; using Autofac;
using Microsoft.Azure.ServiceBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events; using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
using System.Reflection;
using Microsoft.Azure.ServiceBus.Filters;
using Autofac;
using Newtonsoft.Json.Linq;
public class EventBusServiceBus : IEventBus public class EventBusServiceBus : IEventBus
{ {
@ -129,11 +128,25 @@
_subscriptionClient.RegisterMessageHandler( _subscriptionClient.RegisterMessageHandler(
async (message, token) => async (message, token) =>
{ {
var eventName = message.Label; var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}";
var messageData = Encoding.UTF8.GetString(message.Body); var messageData = Encoding.UTF8.GetString(message.Body);
await ProcessEvent(eventName, messageData); await ProcessEvent(eventName, messageData);
// Complete the message so that it is not received again.
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}, },
new MessageHandlerOptions() { MaxConcurrentCalls = 10, AutoComplete = true }); new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false });
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
} }
private async Task ProcessEvent(string eventName, string message) private async Task ProcessEvent(string eventName, string message)
@ -169,13 +182,13 @@
try try
{ {
_subscriptionClient _subscriptionClient
.RemoveRuleAsync(SubscriptionClient.DefaultRule) .RemoveRuleAsync(RuleDescription.DefaultRuleName)
.GetAwaiter() .GetAwaiter()
.GetResult(); .GetResult();
} }
catch (MessagingEntityNotFoundException) catch (MessagingEntityNotFoundException)
{ {
_logger.LogInformation($"The messaging entity {SubscriptionClient.DefaultRule} Could not be found."); _logger.LogInformation($"The messaging entity { RuleDescription.DefaultRuleName } Could not be found.");
} }
} }
} }

View File

@ -6,7 +6,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="0.0.5-preview" /> <PackageReference Include="Microsoft.Azure.ServiceBus" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
</ItemGroup> </ItemGroup>