2017-05-09 20:27:00 +02:00

198 lines
7.2 KiB
C#

using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Polly;
using Polly.Wrap;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
namespace Microsoft.eShopOnContainers.BuildingBlocks.Resilience.Http
{
/// <summary>
/// HttpClient wrapper that integrates Retry and Circuit
/// breaker policies when invoking HTTP services.
/// Based on Polly library: https://github.com/App-vNext/Polly
/// </summary>
public class ResilientHttpClient : IHttpClient
{
private readonly HttpClient _client;
private readonly ILogger<ResilientHttpClient> _logger;
private ConcurrentDictionary<string, PolicyWrap> _policyWrappers;
public ResilientHttpClient(ILogger<ResilientHttpClient> logger)
{
_client = new HttpClient();
_logger = logger;
_policyWrappers = new ConcurrentDictionary<string, PolicyWrap>();
}
public Task<HttpResponseMessage> PostAsync<T>(string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{
return DoPostPutAsync(HttpMethod.Post, uri, item, authorizationToken, requestId, authorizationMethod);
}
public Task<HttpResponseMessage> PutAsync<T>(string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{
return DoPostPutAsync(HttpMethod.Put, uri, item, authorizationToken, requestId, authorizationMethod);
}
public Task<HttpResponseMessage> DeleteAsync(string uri, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{
var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, async () =>
{
var requestMessage = new HttpRequestMessage(HttpMethod.Delete, uri);
if (authorizationToken != null)
{
requestMessage.Headers.Authorization = new AuthenticationHeaderValue(authorizationMethod, authorizationToken);
}
if (requestId != null)
{
requestMessage.Headers.Add("x-requestid", requestId);
}
return await _client.SendAsync(requestMessage);
});
}
public Task<string> GetStringAsync(string uri, string authorizationToken = null, string authorizationMethod = "Bearer")
{
var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, async () =>
{
var requestMessage = new HttpRequestMessage(HttpMethod.Get, uri);
if (authorizationToken != null)
{
requestMessage.Headers.Authorization = new AuthenticationHeaderValue(authorizationMethod, authorizationToken);
}
var response = await _client.SendAsync(requestMessage);
return await response.Content.ReadAsStringAsync();
});
}
private Task<HttpResponseMessage> DoPostPutAsync<T>(HttpMethod method, string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{
if (method != HttpMethod.Post && method != HttpMethod.Put)
{
throw new ArgumentException("Value must be either post or put.", nameof(method));
}
// a new StringContent must be created for each retry
// as it is disposed after each call
var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, () =>
{
var requestMessage = new HttpRequestMessage(method, uri);
requestMessage.Content = new StringContent(JsonConvert.SerializeObject(item), System.Text.Encoding.UTF8, "application/json");
if (authorizationToken != null)
{
requestMessage.Headers.Authorization = new AuthenticationHeaderValue(authorizationMethod, authorizationToken);
}
if (requestId != null)
{
requestMessage.Headers.Add("x-requestid", requestId);
}
var response = _client.SendAsync(requestMessage).Result;
// raise exception if HttpResponseCode 500
// needed for circuit breaker to track fails
if (response.StatusCode == HttpStatusCode.InternalServerError)
{
throw new HttpRequestException();
}
return Task.FromResult(response);
});
}
private async Task<T> HttpInvoker<T>(string origin, Func<Task<T>> action)
{
var normalizedOrigin = NormalizeOrigin(origin);
if (!_policyWrappers.TryGetValue(normalizedOrigin, out PolicyWrap policyWrap))
{
policyWrap = Policy.Wrap(CreatePolicies());
_policyWrappers.TryAdd(normalizedOrigin, policyWrap);
}
// Executes the action applying all
// the policies defined in the wrapper
return await policyWrap.Execute(action, new Context(normalizedOrigin));
}
private static string NormalizeOrigin(string origin)
{
return origin?.Trim()?.ToLower();
}
private static string GetOriginFromUri(string uri)
{
var url = new Uri(uri);
var origin = $"{url.Scheme}://{url.DnsSafeHost}:{url.Port}";
return origin;
}
private Policy[] CreatePolicies()
{
return new Policy[]
{
Policy.Handle<HttpRequestException>()
.WaitAndRetry(
// number of retries
6,
// exponential backofff
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
// on retry
(exception, timeSpan, retryCount, context) =>
{
var msg = $"Retry {retryCount} implemented with Polly's RetryPolicy " +
$"of {context.PolicyKey} " +
$"at {context.ExecutionKey}, " +
$"due to: {exception}.";
_logger.LogWarning(msg);
_logger.LogDebug(msg);
}),
Policy.Handle<HttpRequestException>()
.CircuitBreaker(
// number of exceptions before breaking circuit
5,
// time circuit opened before retry
TimeSpan.FromMinutes(1),
(exception, duration) =>
{
// on circuit opened
_logger.LogTrace("Circuit breaker opened");
},
() =>
{
// on circuit closed
_logger.LogTrace("Circuit breaker reset");
})
};
}
}
}