Merge branch 'dev' of https://github.com/dotnet/eShopOnContainers into dev
This commit is contained in:
commit
dee6ea7342
@ -11,6 +11,6 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events
|
|||||||
Id = Guid.NewGuid();
|
Id = Guid.NewGuid();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Guid Id { get; private set; }
|
public Guid Id { get; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers;
|
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers;
|
||||||
private readonly List<Type> _eventTypes;
|
private readonly List<Type> _eventTypes;
|
||||||
|
|
||||||
private Tuple<IModel, IConnection> _connection;
|
private IModel _model;
|
||||||
|
private IConnection _connection;
|
||||||
private string _queueName;
|
private string _queueName;
|
||||||
|
|
||||||
|
|
||||||
@ -86,15 +87,15 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
_handlers.Remove(eventName);
|
_handlers.Remove(eventName);
|
||||||
var eventType = _eventTypes.Single(e => e.Name == eventName);
|
var eventType = _eventTypes.Single(e => e.Name == eventName);
|
||||||
_eventTypes.Remove(eventType);
|
_eventTypes.Remove(eventType);
|
||||||
_connection.Item1.QueueUnbind(queue: _queueName,
|
_model.QueueUnbind(queue: _queueName,
|
||||||
exchange: _brokerName,
|
exchange: _brokerName,
|
||||||
routingKey: eventName);
|
routingKey: eventName);
|
||||||
|
|
||||||
if (_handlers.Keys.Count == 0)
|
if (_handlers.Keys.Count == 0)
|
||||||
{
|
{
|
||||||
_queueName = string.Empty;
|
_queueName = string.Empty;
|
||||||
_connection.Item1.Dispose();
|
_model.Dispose();
|
||||||
_connection.Item2.Dispose();
|
_connection.Dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -103,50 +104,53 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
|
|||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
if (_connection != null)
|
_handlers.Clear();
|
||||||
{
|
_model?.Dispose();
|
||||||
_handlers.Clear();
|
_connection?.Dispose();
|
||||||
_connection.Item1.Dispose();
|
|
||||||
_connection.Item2.Dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private IModel GetChannel()
|
private IModel GetChannel()
|
||||||
{
|
{
|
||||||
if (_connection != null)
|
if (_model != null)
|
||||||
{
|
{
|
||||||
return _connection.Item1;
|
return _model;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var factory = new ConnectionFactory() { HostName = _connectionString };
|
(_model, _connection) = CreateConnection();
|
||||||
var connection = factory.CreateConnection();
|
return _model;
|
||||||
var channel = connection.CreateModel();
|
|
||||||
|
|
||||||
channel.ExchangeDeclare(exchange: _brokerName,
|
|
||||||
type: "direct");
|
|
||||||
if (string.IsNullOrEmpty(_queueName))
|
|
||||||
{
|
|
||||||
_queueName = channel.QueueDeclare().QueueName;
|
|
||||||
}
|
|
||||||
|
|
||||||
var consumer = new EventingBasicConsumer(channel);
|
|
||||||
consumer.Received += async (model, ea) =>
|
|
||||||
{
|
|
||||||
var eventName = ea.RoutingKey;
|
|
||||||
var message = Encoding.UTF8.GetString(ea.Body);
|
|
||||||
|
|
||||||
await ProcessEvent(eventName, message);
|
|
||||||
};
|
|
||||||
channel.BasicConsume(queue: _queueName,
|
|
||||||
noAck: true,
|
|
||||||
consumer: consumer);
|
|
||||||
_connection = new Tuple<IModel, IConnection>(channel, connection);
|
|
||||||
|
|
||||||
return _connection.Item1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private (IModel model, IConnection connection) CreateConnection()
|
||||||
|
{
|
||||||
|
var factory = new ConnectionFactory() { HostName = _connectionString };
|
||||||
|
var con = factory.CreateConnection();
|
||||||
|
var channel = con.CreateModel();
|
||||||
|
|
||||||
|
channel.ExchangeDeclare(exchange: _brokerName,
|
||||||
|
type: "direct");
|
||||||
|
if (string.IsNullOrEmpty(_queueName))
|
||||||
|
{
|
||||||
|
_queueName = channel.QueueDeclare().QueueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
var consumer = new EventingBasicConsumer(channel);
|
||||||
|
consumer.Received += async (model, ea) =>
|
||||||
|
{
|
||||||
|
var eventName = ea.RoutingKey;
|
||||||
|
var message = Encoding.UTF8.GetString(ea.Body);
|
||||||
|
|
||||||
|
await ProcessEvent(eventName, message);
|
||||||
|
};
|
||||||
|
channel.BasicConsume(queue: _queueName,
|
||||||
|
noAck: true,
|
||||||
|
consumer: consumer);
|
||||||
|
|
||||||
|
return (channel, con);
|
||||||
|
}
|
||||||
|
|
||||||
private async Task ProcessEvent(string eventName, string message)
|
private async Task ProcessEvent(string eventName, string message)
|
||||||
{
|
{
|
||||||
if (_handlers.ContainsKey(eventName))
|
if (_handlers.ContainsKey(eventName))
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
|
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
|
||||||
<PackageReference Include="RabbitMQ.Client" Version="4.1.1" />
|
<PackageReference Include="RabbitMQ.Client" Version="4.1.1" />
|
||||||
|
<PackageReference Include="System.ValueTuple" Version="4.3.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
@ -8,6 +8,12 @@ using System.Threading.Tasks;
|
|||||||
|
|
||||||
namespace WebMVC.Services.Utilities
|
namespace WebMVC.Services.Utilities
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// HttpClient wrapper that integrates Retry and Circuit
|
||||||
|
/// breaker policies when calling to Api services.
|
||||||
|
/// Currently is ONLY implemented for the ASP MVC
|
||||||
|
/// and Xamarin App
|
||||||
|
/// </summary>
|
||||||
public class HttpApiClientWrapper : IHttpClient
|
public class HttpApiClientWrapper : IHttpClient
|
||||||
{
|
{
|
||||||
private HttpClient _client;
|
private HttpClient _client;
|
||||||
@ -30,7 +36,7 @@ namespace WebMVC.Services.Utilities
|
|||||||
Policy.Handle<HttpRequestException>()
|
Policy.Handle<HttpRequestException>()
|
||||||
.CircuitBreakerAsync(
|
.CircuitBreakerAsync(
|
||||||
// number of exceptions before breaking circuit
|
// number of exceptions before breaking circuit
|
||||||
3,
|
5,
|
||||||
// time circuit opened before retry
|
// time circuit opened before retry
|
||||||
TimeSpan.FromMinutes(1),
|
TimeSpan.FromMinutes(1),
|
||||||
(exception, duration) =>
|
(exception, duration) =>
|
||||||
@ -49,7 +55,7 @@ namespace WebMVC.Services.Utilities
|
|||||||
Policy.Handle<HttpRequestException>()
|
Policy.Handle<HttpRequestException>()
|
||||||
.WaitAndRetryAsync(
|
.WaitAndRetryAsync(
|
||||||
// number of retries
|
// number of retries
|
||||||
3,
|
5,
|
||||||
// exponential backofff
|
// exponential backofff
|
||||||
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
|
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
|
||||||
// on retry
|
// on retry
|
||||||
@ -62,27 +68,21 @@ namespace WebMVC.Services.Utilities
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// Notice that these (and other methods below) are Task
|
|
||||||
// returning asynchronous methods. But, they do not
|
|
||||||
// have the 'async' modifier, and do not contain
|
|
||||||
// any 'await statements. In each of these methods,
|
|
||||||
// the only asynchronous call is the last (or only)
|
|
||||||
// statement of the method. In those instances,
|
|
||||||
// a Task returning method that does not use the
|
|
||||||
// async modifier is preferred. The compiler generates
|
|
||||||
// synchronous code for this method, but returns the
|
|
||||||
// task from the underlying asynchronous method. The
|
|
||||||
// generated code does not contain the state machine
|
|
||||||
// generated for asynchronous methods.
|
|
||||||
public Task<string> GetStringAsync(string uri) =>
|
public Task<string> GetStringAsync(string uri) =>
|
||||||
HttpInvoker(() => _client.GetStringAsync(uri));
|
HttpInvoker(() =>
|
||||||
|
_client.GetStringAsync(uri));
|
||||||
|
|
||||||
public Task<HttpResponseMessage> PostAsync<T>(string uri, T item) =>
|
public Task<HttpResponseMessage> PostAsync<T>(string uri, T item) =>
|
||||||
// a new StringContent must be created for each retry
|
// a new StringContent must be created for each retry
|
||||||
// as it is disposed after each call
|
// as it is disposed after each call
|
||||||
HttpInvoker(() =>_client.PostAsync(uri,
|
HttpInvoker(() =>
|
||||||
new StringContent(JsonConvert.SerializeObject(item),
|
{
|
||||||
System.Text.Encoding.UTF8, "application/json")));
|
var response = _client.PostAsync(uri, new StringContent(JsonConvert.SerializeObject(item), System.Text.Encoding.UTF8, "application/json"));
|
||||||
|
// raise exception if not success response
|
||||||
|
// needed for circuit breaker to track fails
|
||||||
|
response.Result.EnsureSuccessStatusCode();
|
||||||
|
return response;
|
||||||
|
});
|
||||||
|
|
||||||
public Task<HttpResponseMessage> DeleteAsync(string uri) =>
|
public Task<HttpResponseMessage> DeleteAsync(string uri) =>
|
||||||
HttpInvoker(() => _client.DeleteAsync(uri));
|
HttpInvoker(() => _client.DeleteAsync(uri));
|
||||||
|
@ -54,11 +54,11 @@ namespace Microsoft.eShopOnContainers.WebMVC
|
|||||||
|
|
||||||
if(Configuration.GetValue<string>("ActivateCircuitBreaker") == bool.TrueString)
|
if(Configuration.GetValue<string>("ActivateCircuitBreaker") == bool.TrueString)
|
||||||
{
|
{
|
||||||
services.AddSingleton<IHttpClient, HttpApiClientWrapper>();
|
services.AddTransient<IHttpClient, HttpApiClientWrapper>();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
services.AddSingleton<IHttpClient, HttpApiClient>();
|
services.AddTransient<IHttpClient, HttpApiClient>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,8 @@ import 'rxjs/add/operator/catch';
|
|||||||
import { SecurityService } from './security.service';
|
import { SecurityService } from './security.service';
|
||||||
import { Guid } from '../../../guid';
|
import { Guid } from '../../../guid';
|
||||||
|
|
||||||
|
// Implementing a Retry-Circuit breaker policy
|
||||||
|
// is pending to do for the SPA app
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class DataService {
|
export class DataService {
|
||||||
constructor(private http: Http, private securityService: SecurityService) { }
|
constructor(private http: Http, private securityService: SecurityService) { }
|
||||||
|
Loading…
x
Reference in New Issue
Block a user