First iteration over ResilientHttpClient. In this case ConcurrentDictionary is used and control concurrency to add new origin and policies

This commit is contained in:
Unai Zorrilla Castro 2017-05-08 18:57:58 +02:00
parent f9e60c53a3
commit 86e11f1bfb

View File

@ -3,6 +3,7 @@ using Newtonsoft.Json;
using Polly; using Polly;
using Polly.Wrap; using Polly.Wrap;
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
@ -20,44 +21,56 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.Resilience.Http
public class ResilientHttpClient : IHttpClient public class ResilientHttpClient : IHttpClient
{ {
private HttpClient _client; private HttpClient _client;
private readonly Dictionary<string, PolicyWrap> _policiesPerOrigin; private readonly ConcurrentDictionary<string, PolicyWrap> _policiesPerOrigin;
private ILogger<ResilientHttpClient> _logger; private ILogger<ResilientHttpClient> _logger;
private readonly Func<string, IEnumerable<Policy>> _policyCreator; private readonly Func<string, IEnumerable<Policy>> _policyCreator;
//public HttpClient Inst => _client;
public ResilientHttpClient(Func<string, IEnumerable<Policy>> policyCreator, ILogger<ResilientHttpClient> logger) public ResilientHttpClient(Func<string, IEnumerable<Policy>> policyCreator, ILogger<ResilientHttpClient> logger)
{ {
_client = new HttpClient(); _client = new HttpClient();
_logger = logger; _logger = logger;
_policiesPerOrigin = new Dictionary<string, PolicyWrap>(); _policiesPerOrigin = new ConcurrentDictionary<string, PolicyWrap>();
_policyCreator = policyCreator; _policyCreator = policyCreator;
} }
private Task<T> HttpInvoker<T>(string origin, Func<Task<T>> action)
public Task<HttpResponseMessage> PostAsync<T>(string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{ {
var normalizedOrigin = NormalizeOrigin(origin); return DoPostPutAsync(HttpMethod.Post, uri, item, authorizationToken, requestId, authorizationMethod);
if (!_policiesPerOrigin.ContainsKey(normalizedOrigin))
{
var newWrapper = Policy.WrapAsync(_policyCreator(normalizedOrigin).ToArray());
_policiesPerOrigin.Add(normalizedOrigin, newWrapper);
}
var policyWrapper = _policiesPerOrigin[normalizedOrigin];
// Executes the action applying all
// the policies defined in the wrapper
return policyWrapper.ExecuteAsync(() => action());
} }
private static string NormalizeOrigin(string origin) public Task<HttpResponseMessage> PutAsync<T>(string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{ {
return origin?.Trim()?.ToLower(); return DoPostPutAsync(HttpMethod.Put, uri, item, authorizationToken, requestId, authorizationMethod);
}
public Task<HttpResponseMessage> DeleteAsync(string uri, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{
var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, async () =>
{
var requestMessage = new HttpRequestMessage(HttpMethod.Delete, uri);
if (authorizationToken != null)
{
requestMessage.Headers.Authorization = new AuthenticationHeaderValue(authorizationMethod, authorizationToken);
}
if (requestId != null)
{
requestMessage.Headers.Add("x-requestid", requestId);
}
return await _client.SendAsync(requestMessage);
});
} }
public Task<string> GetStringAsync(string uri, string authorizationToken = null, string authorizationMethod = "Bearer") public Task<string> GetStringAsync(string uri, string authorizationToken = null, string authorizationMethod = "Bearer")
{ {
var origin = GetOriginFromUri(uri); var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, async () => return HttpInvoker(origin, async () =>
{ {
var requestMessage = new HttpRequestMessage(HttpMethod.Get, uri); var requestMessage = new HttpRequestMessage(HttpMethod.Get, uri);
@ -73,13 +86,6 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.Resilience.Http
}); });
} }
private static string GetOriginFromUri(string uri)
{
var url = new Uri(uri);
var origin = $"{url.Scheme}://{url.DnsSafeHost}:{url.Port}";
return origin;
}
private Task<HttpResponseMessage> DoPostPutAsync<T>(HttpMethod method, string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer") private Task<HttpResponseMessage> DoPostPutAsync<T>(HttpMethod method, string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{ {
if (method != HttpMethod.Post && method != HttpMethod.Put) if (method != HttpMethod.Post && method != HttpMethod.Put)
@ -90,6 +96,7 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.Resilience.Http
// a new StringContent must be created for each retry // a new StringContent must be created for each retry
// as it is disposed after each call // as it is disposed after each call
var origin = GetOriginFromUri(uri); var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, async () => return HttpInvoker(origin, async () =>
{ {
var requestMessage = new HttpRequestMessage(method, uri); var requestMessage = new HttpRequestMessage(method, uri);
@ -120,34 +127,49 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.Resilience.Http
}); });
} }
public Task<HttpResponseMessage> PostAsync<T>(string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer") private Task<T> HttpInvoker<T>(string origin, Func<Task<T>> action)
{ {
return DoPostPutAsync(HttpMethod.Post, uri, item, authorizationToken, requestId, authorizationMethod); var policyWrapper = GetPolicyForOrigin(origin);
}
public Task<HttpResponseMessage> PutAsync<T>(string uri, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer") if (policyWrapper != null)
{
return DoPostPutAsync(HttpMethod.Put, uri, item, authorizationToken, requestId, authorizationMethod);
}
public Task<HttpResponseMessage> DeleteAsync(string uri, string authorizationToken = null, string requestId = null, string authorizationMethod = "Bearer")
{
var origin = GetOriginFromUri(uri);
return HttpInvoker(origin, async () =>
{ {
var requestMessage = new HttpRequestMessage(HttpMethod.Delete, uri); // Executes the action applying all
// the policies defined in the wrapper
if (authorizationToken != null) return policyWrapper.ExecuteAsync(() => action());
{ }
requestMessage.Headers.Authorization = new AuthenticationHeaderValue(authorizationMethod, authorizationToken); else
} {
throw new InvalidOperationException($"PolicyWrapper can't be created for origin {origin}");
if (requestId != null) }
{
requestMessage.Headers.Add("x-requestid", requestId);
}
return await _client.SendAsync(requestMessage);
});
} }
private PolicyWrap GetPolicyForOrigin(string origin)
{
var normalizedOrigin = NormalizeOrigin(origin);
if (!_policiesPerOrigin.TryGetValue(normalizedOrigin, out PolicyWrap policyWrapper))
{
policyWrapper = Policy.WrapAsync(_policyCreator(normalizedOrigin)
.ToArray());
_policiesPerOrigin.TryAdd(normalizedOrigin, policyWrapper);
}
return policyWrapper;
}
private static string NormalizeOrigin(string origin)
{
return origin?.Trim()?.ToLower();
}
private static string GetOriginFromUri(string uri)
{
var url = new Uri(uri);
var origin = $"{url.Scheme}://{url.DnsSafeHost}:{url.Port}";
return origin;
}
} }
} }