Files
hcs/Hcs.Broker/Api/Request/RequestBase.cs

506 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Hcs.Broker.Api.Request.Adapter;
using Hcs.Broker.Api.Request.Exception;
using Hcs.Broker.Internal;
using System.Security.Cryptography.X509Certificates;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;
using System.ServiceModel.Security;
using System.Text;
namespace Hcs.Broker.Api.Request
{
internal abstract class RequestBase<TResult, TAsyncClient, TChannel, TRequestHeader, TAck, TErrorMessage, TGetStateRequest>
where TResult : IGetStateResult
where TAsyncClient : ClientBase<TChannel>, TChannel, IAsyncClient<TRequestHeader>
where TChannel : class
where TRequestHeader : class
where TAck : IAck
where TErrorMessage : IErrorMessage
where TGetStateRequest : IGetStateRequest, new()
{
private const int RESPONSE_WAIT_DELAY_MIN = 2;
private const int RESPONSE_WAIT_DELAY_MAX = 5;
// "[EXP001000] Произошла ошибка при передаче данных. Попробуйте осуществить передачу данных повторно".
// Видимо, эту ошибку нельзя включать здесь. Предположительно это маркер DDOS защиты и если отправлять
// точно такой же пакет повторно, то ошибка входит в бесконечный цикл - необходимо заново
// собирать пакет с новыми кодами и временем и новой подписью. Такую ошибку надо обнаруживать
// на более высоком уровне и заново отправлять запрос новым пакетом.
private static readonly string[] ignorableSystemErrorMarkers = [
"Истекло время ожидания шлюза",
"Базовое соединение закрыто: Соединение, которое должно было работать, было разорвано сервером",
"Попробуйте осуществить передачу данных повторно",
"(502) Недопустимый шлюз",
"(503) Сервер не доступен"
];
protected Client client;
protected CustomBinding binding;
protected abstract EndPoint EndPoint { get; }
/// <summary>
/// Для запросов, возвращающих мало данных, можно попробовать сократить
/// начальный период ожидания подготовки ответа
/// </summary>
protected abstract bool EnableMinimalResponseWaitDelay { get; }
/// <summary>
/// Указывает на то, что можно ли этот метод перезапускать в случае зависания
/// ожидания или в случае сбоя на сервере
/// </summary>
protected abstract bool CanBeRestarted { get; }
/// <summary>
/// Для противодействия зависанию ожидания вводится предел ожидания в минутах
/// для запросов, которые можно перезапустить заново с теми же параметрами
/// </summary>
protected abstract int RestartTimeoutMinutes { get; }
private EndpointAddress RemoteAddress => new(client.ComposeEndpointUri(EndPointLocator.GetPath(EndPoint)));
private string ThreadIdText => $"(Thread #{ThreadId})";
/// <summary>
/// Возвращает идентификатор текущего исполняемого потока
/// </summary>
private int ThreadId => Environment.CurrentManagedThreadId;
public RequestBase(Client client)
{
this.client = client;
ConfigureBinding();
}
private void ConfigureBinding()
{
binding = new CustomBinding
{
CloseTimeout = TimeSpan.FromSeconds(180),
OpenTimeout = TimeSpan.FromSeconds(180),
ReceiveTimeout = TimeSpan.FromSeconds(180),
SendTimeout = TimeSpan.FromSeconds(180)
};
binding.Elements.Add(new TextMessageEncodingBindingElement
{
MessageVersion = MessageVersion.Soap11,
WriteEncoding = Encoding.UTF8
});
if (client.UseTunnel)
{
if (!System.Diagnostics.Process.GetProcessesByName("stunnel").Any())
{
throw new System.Exception("stunnel is not running");
}
binding.Elements.Add(new HttpTransportBindingElement
{
AuthenticationScheme = (client.IsPPAK ? System.Net.AuthenticationSchemes.Digest : System.Net.AuthenticationSchemes.Basic),
MaxReceivedMessageSize = int.MaxValue,
UseDefaultWebProxy = false
});
}
else
{
binding.Elements.Add(new HttpsTransportBindingElement
{
AuthenticationScheme = (client.IsPPAK ? System.Net.AuthenticationSchemes.Digest : System.Net.AuthenticationSchemes.Basic),
MaxReceivedMessageSize = int.MaxValue,
RequireClientCertificate = true,
UseDefaultWebProxy = false
});
}
}
protected async Task<TResult> SendAndWaitResultAsync(
object request,
Func<TAsyncClient, Task<TAck>> sender,
CancellationToken token)
{
token.ThrowIfCancellationRequested();
while (true)
{
try
{
if (CanBeRestarted)
{
return await RunRepeatableTaskInsistentlyAsync(
async () => await ExecuteSendAndWaitResultAsync(request, sender, token), token);
}
else
{
return await ExecuteSendAndWaitResultAsync(request, sender, token);
}
}
catch (RestartTimeoutException e)
{
if (!CanBeRestarted)
{
throw new System.Exception("Cannot restart request execution on timeout", e);
}
client.TryLog($"Restarting {request.GetType().Name} request execution...");
}
}
}
protected async Task<string> SendAsync(
object request,
Func<TAsyncClient, Task<TAck>> sender,
CancellationToken token)
{
token.ThrowIfCancellationRequested();
while (true)
{
try
{
if (CanBeRestarted)
{
return await RunRepeatableTaskInsistentlyAsync(
async () => await ExecuteSendAsync(request, sender), token);
}
else
{
return await ExecuteSendAsync(request, sender);
}
}
catch (RestartTimeoutException e)
{
if (!CanBeRestarted)
{
throw new System.Exception("Cannot restart request execution on timeout", e);
}
client.TryLog($"Restarting {request.GetType().Name} request execution...");
}
}
}
/// <summary>
/// Для запросов к серверу которые можно направлять несколько раз, разрешаем
/// серверу аномально отказаться. Предполагается, что здесь мы игнорируем
/// только жесткие отказы серверной инфраструктуры, которые указывают
/// что запрос даже не был принят в обработку. Также все запросы на
/// чтение можно повторять в случае их серверных системных ошибок.
/// </summary>
protected async Task<TRepeatableResult> RunRepeatableTaskInsistentlyAsync<TRepeatableResult>(
Func<Task<TRepeatableResult>> func, CancellationToken token)
{
var afterErrorDelaySec = 120;
for (var attempt = 1; ; attempt++)
{
try
{
return await func();
}
catch (System.Exception e)
{
if (CanIgnoreSuchException(e, out string marker))
{
client.TryLog($"Ignoring error of attempt #{attempt} with type [{marker}]");
client.TryLog($"Waiting {afterErrorDelaySec} sec until next attempt...");
await Task.Delay(afterErrorDelaySec * 1000, token);
continue;
}
if (e is RestartTimeoutException)
{
throw e;
}
if (e is RemoteException)
{
throw RemoteException.CreateNew(e as RemoteException);
}
throw new System.Exception("Cannot ignore this exception", e);
}
}
}
private bool CanIgnoreSuchException(System.Exception e, out string resultMarker)
{
foreach (var marker in ignorableSystemErrorMarkers)
{
var found = Util.EnumerateInnerExceptions(e).Find(
x => x.Message != null && x.Message.Contains(marker));
if (found != null)
{
resultMarker = marker;
return true;
}
}
resultMarker = null;
return false;
}
private async Task<TResult> ExecuteSendAndWaitResultAsync(
object request,
Func<TAsyncClient, Task<TAck>> sender,
CancellationToken token)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request));
}
var version = RequestHelper.GetRequestVersionString(request);
client.TryLog($"Executing request {RemoteAddress.Uri}/{request.GetType().Name} of version {version}...");
TAck ack;
var stopWatch = System.Diagnostics.Stopwatch.StartNew();
using (var asyncClient = CreateAsyncClient())
{
ack = await sender(asyncClient);
}
stopWatch.Stop();
client.TryLog($"Request executed in {stopWatch.ElapsedMilliseconds} ms, result GUID is {ack.MessageGUID}");
var result = await WaitForResultAsync(ack, true, token);
if (result is IQueryable queryableResult)
{
queryableResult.OfType<TErrorMessage>().ToList().ForEach(x =>
{
throw RemoteException.CreateNew(x.ErrorCode, x.Description);
});
}
else if (result is TErrorMessage x)
{
throw RemoteException.CreateNew(x.ErrorCode, x.Description);
}
return result;
}
private async Task<string> ExecuteSendAsync(
object request,
Func<TAsyncClient, Task<TAck>> sender)
{
ArgumentNullException.ThrowIfNull(request);
var version = RequestHelper.GetRequestVersionString(request);
client.TryLog($"Executing request {RemoteAddress.Uri}/{request.GetType().Name} of version {version}...");
TAck ack;
var stopWatch = System.Diagnostics.Stopwatch.StartNew();
using (var asyncClient = CreateAsyncClient())
{
ack = await sender(asyncClient);
}
stopWatch.Stop();
client.TryLog($"Request executed in {stopWatch.ElapsedMilliseconds} ms, result GUID is {ack.MessageGUID}");
return ack.MessageGUID;
}
protected async Task<TResult?> ExecuteGetResultAsync(string messageGuid)
{
using var asyncClient = CreateAsyncClient();
var requestHeader = RequestHelper.CreateHeader<TRequestHeader>(client);
var requestBody = new TGetStateRequest
{
MessageGUID = messageGuid
};
var response = await asyncClient.GetStateAsync(requestHeader, requestBody);
var result = response.GetStateResult;
if (result.RequestState == (int)AsyncRequestStateType.Ready)
{
if (result is IQueryable queryableResult)
{
queryableResult.OfType<TErrorMessage>().ToList().ForEach(x =>
{
throw RemoteException.CreateNew(x.ErrorCode, x.Description);
});
}
else if (result is TErrorMessage x)
{
throw RemoteException.CreateNew(x.ErrorCode, x.Description);
}
return (TResult)result;
}
return default;
}
private TAsyncClient CreateAsyncClient()
{
var asyncClient = (TAsyncClient)Activator.CreateInstance(typeof(TAsyncClient), binding, RemoteAddress);
ConfigureEndpointCredentials(asyncClient.Endpoint, asyncClient.ClientCredentials);
return asyncClient;
}
private void ConfigureEndpointCredentials(
ServiceEndpoint serviceEndpoint, ClientCredentials clientCredentials)
{
serviceEndpoint.EndpointBehaviors.Add(new GostSigningEndpointBehavior(client));
if (!client.IsPPAK)
{
clientCredentials.UserName.UserName = Constants.NAME_SIT;
clientCredentials.UserName.Password = Constants.PASSWORD_SIT;
}
clientCredentials.ServiceCertificate.SslCertificateAuthentication =
new X509ServiceCertificateAuthentication()
{
CertificateValidationMode = X509CertificateValidationMode.None,
RevocationMode = X509RevocationMode.NoCheck
};
if (!client.UseTunnel)
{
clientCredentials.ClientCertificate.SetCertificate(
StoreLocation.CurrentUser,
StoreName.My,
X509FindType.FindByThumbprint,
client.Certificate.Thumbprint);
}
}
/// <summary>
/// Основной алгоритм ожидания ответа на асинхронный запрос.
/// Из документации ГИС ЖКХ:
/// Также рекомендуем придерживаться следующего алгоритма отправки запросов на получение статуса обработки пакета в случае использования асинхронных сервисов ГИС ЖКХ (в рамках одного MessageGUID):
/// - первый запрос getState направлять не ранее чем через 10 секунд, после получения квитанции о приеме пакета с бизнес-данными от сервиса ГИС КЖХ;
/// - в случае, если на первый запрос getState получен результат с RequestState равным "1" или "2", то следующий запрос getState необходимо направлять не ранее чем через 60 секунд после отправки предыдущего запроса;
/// - в случае, если на второй запрос getState получен результат с RequestState равным "1" или "2", то следующий запрос getState необходимо направлять не ранее чем через 300 секунд после отправки предыдущего запроса;
/// - в случае, если на третий запрос getState получен результат с RequestState равным "1" или "2", то следующий запрос getState необходимо направлять не ранее чем через 900 секунд после отправки предыдущего запроса;
/// - в случае, если на четвертый(и все последующие запросы) getState получен результат с RequestState равным "1" или "2", то следующий запрос getState необходимо направлять не ранее чем через 1800 секунд после отправки предыдущего запроса.
/// </summary>
private async Task<TResult> WaitForResultAsync(
TAck ack, bool withInitialDelay, CancellationToken token)
{
TResult result;
var startTime = DateTime.Now;
for (var attempts = 1; ; attempts++)
{
token.ThrowIfCancellationRequested();
var delaySec = EnableMinimalResponseWaitDelay ? RESPONSE_WAIT_DELAY_MIN : RESPONSE_WAIT_DELAY_MAX;
if (attempts >= 2)
{
delaySec = RESPONSE_WAIT_DELAY_MAX;
}
if (attempts >= 3)
{
delaySec = RESPONSE_WAIT_DELAY_MAX * 2;
}
if (attempts >= 5)
{
delaySec = RESPONSE_WAIT_DELAY_MAX * 4;
}
if (attempts >= 7)
{
delaySec = RESPONSE_WAIT_DELAY_MAX * 8;
}
if (attempts >= 9)
{
delaySec = RESPONSE_WAIT_DELAY_MAX * 16;
}
if (attempts >= 12)
{
delaySec = RESPONSE_WAIT_DELAY_MAX * 60;
}
if (attempts > 1 || withInitialDelay)
{
var minutesElapsed = (int)(DateTime.Now - startTime).TotalMinutes;
if (CanBeRestarted && minutesElapsed > RestartTimeoutMinutes)
{
throw new RestartTimeoutException($"{RestartTimeoutMinutes} minute(s) wait time exceeded");
}
client.TryLog($"Waiting {delaySec} sec for attempt #{attempts}" +
$" to get response ({minutesElapsed} minute(s) elapsed)...");
await Task.Delay(delaySec * 1000, token);
}
client.TryLog($"Requesting response, attempt #{attempts} in {ThreadIdText}...");
result = await TryGetResultAsync(ack);
if (result != null)
{
break;
}
}
client.TryLog($"Response received!");
return result;
}
/// <summary>
/// Выполняет однократную проверку наличия результата.
/// Возвращает default если результата еще нет.
/// </summary>
private async Task<TResult> TryGetResultAsync(TAck ack)
{
using var asyncClient = CreateAsyncClient();
var requestHeader = RequestHelper.CreateHeader<TRequestHeader>(client);
var requestBody = new TGetStateRequest
{
MessageGUID = ack.MessageGUID
};
var response = await asyncClient.GetStateAsync(requestHeader, requestBody);
var result = response.GetStateResult;
if (result.RequestState == (int)AsyncRequestStateType.Ready)
{
return (TResult)result;
}
return default;
}
protected TRequestHeader CreateRequestHeader()
{
return RequestHelper.CreateHeader<TRequestHeader>(client);
}
/// <summary>
/// Исполнение повторяемой операции некоторое допустимое число ошибок
/// </summary>
protected async Task<TRepeatableResult> RunRepeatableTaskAsync<TRepeatableResult>(
Func<Task<TRepeatableResult>> taskFunc, Func<System.Exception, bool> canIgnoreFunc, int maxAttempts)
{
for (var attempts = 1; ; attempts++)
{
try
{
return await taskFunc();
}
catch (System.Exception e)
{
if (canIgnoreFunc(e))
{
if (attempts < maxAttempts)
{
client.TryLog($"Ignoring error of attempt #{attempts} of {maxAttempts} attempts");
continue;
}
throw new System.Exception("Too much attempts with error");
}
throw e;
}
}
}
}
}