refactoring: webhooks.service: file-scope namespace

This commit is contained in:
Anton Suhorukov 2022-02-03 16:41:09 +03:00
parent 2ff2e64f4d
commit a04f7824fc
5 changed files with 270 additions and 275 deletions

View File

@ -21,87 +21,86 @@ using Microsoft.Extensions.Hosting;
using StackExchange.Redis.Extensions.Core.Configuration; using StackExchange.Redis.Extensions.Core.Configuration;
using StackExchange.Redis.Extensions.Newtonsoft; using StackExchange.Redis.Extensions.Newtonsoft;
namespace ASC.Webhooks.Service namespace ASC.Webhooks.Service;
public class Program
{ {
public class Program public async static Task Main(string[] args)
{ {
public async static Task Main(string[] args) var host = CreateHostBuilder(args).Build();
{
var host = CreateHostBuilder(args).Build();
await host.RunAsync(); await host.RunAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseSystemd()
.UseWindowsService()
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<BaseWorkerStartup>())
.ConfigureAppConfiguration((hostContext, config) =>
{
var buided = config.Build();
var path = buided["pathToConf"];
if (!Path.IsPathRooted(path))
{
path = Path.GetFullPath(CrossPlatform.PathCombine(hostContext.HostingEnvironment.ContentRootPath, path));
}
config.SetBasePath(path);
var env = hostContext.Configuration.GetValue("ENVIRONMENT", "Production");
config
.AddInMemoryCollection(new Dictionary<string, string>
{
{"pathToConf", path }
}
)
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{env}.json", true)
.AddJsonFile($"appsettings.services.json", true)
.AddJsonFile("storage.json")
.AddJsonFile("kafka.json")
.AddJsonFile($"kafka.{env}.json", true)
.AddJsonFile("redis.json")
.AddJsonFile($"redis.{env}.json", true)
.AddEnvironmentVariables()
.AddCommandLine(args);
})
.ConfigureServices((hostContext, services) =>
{
services.AddMemoryCache();
var diHelper = new DIHelper(services);
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
if (kafkaConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCache<>));
}
else if (redisConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCache<>));
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
}
else
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
diHelper.TryAdd<DbWorker>();
services.AddHostedService<BuildQueueService>();
diHelper.TryAdd<BuildQueueService>();
services.AddHostedService<WorkerService>();
diHelper.TryAdd<WorkerService>();
})
.ConfigureContainer<ContainerBuilder>((context, builder) =>
{
builder.Register(context.Configuration, false, false);
})
.ConfigureNLogLogging();
} }
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseSystemd()
.UseWindowsService()
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<BaseWorkerStartup>())
.ConfigureAppConfiguration((hostContext, config) =>
{
var buided = config.Build();
var path = buided["pathToConf"];
if (!Path.IsPathRooted(path))
{
path = Path.GetFullPath(CrossPlatform.PathCombine(hostContext.HostingEnvironment.ContentRootPath, path));
}
config.SetBasePath(path);
var env = hostContext.Configuration.GetValue("ENVIRONMENT", "Production");
config
.AddInMemoryCollection(new Dictionary<string, string>
{
{"pathToConf", path }
}
)
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{env}.json", true)
.AddJsonFile($"appsettings.services.json", true)
.AddJsonFile("storage.json")
.AddJsonFile("kafka.json")
.AddJsonFile($"kafka.{env}.json", true)
.AddJsonFile("redis.json")
.AddJsonFile($"redis.{env}.json", true)
.AddEnvironmentVariables()
.AddCommandLine(args);
})
.ConfigureServices((hostContext, services) =>
{
services.AddMemoryCache();
var diHelper = new DIHelper(services);
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
if (kafkaConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCache<>));
}
else if (redisConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCache<>));
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
}
else
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
diHelper.TryAdd<DbWorker>();
services.AddHostedService<BuildQueueService>();
diHelper.TryAdd<BuildQueueService>();
services.AddHostedService<WorkerService>();
diHelper.TryAdd<WorkerService>();
})
.ConfigureContainer<ContainerBuilder>((context, builder) =>
{
builder.Register(context.Configuration, false, false);
})
.ConfigureNLogLogging();
} }

View File

@ -8,35 +8,34 @@ using ASC.Web.Webhooks;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
namespace ASC.Webhooks.Service.Services namespace ASC.Webhooks.Service.Services;
[Singletone]
public class BuildQueueService : IHostedService
{ {
[Singletone] internal ConcurrentQueue<WebhookRequest> Queue { get; }
public class BuildQueueService : IHostedService private readonly ICacheNotify<WebhookRequest> _webhookNotify;
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify)
{ {
internal ConcurrentQueue<WebhookRequest> Queue { get; } _webhookNotify = webhookNotify;
private readonly ICacheNotify<WebhookRequest> _webhookNotify; Queue = new ConcurrentQueue<WebhookRequest>();
}
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify) public Task StartAsync(CancellationToken cancellationToken)
{ {
_webhookNotify = webhookNotify; _webhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update);
Queue = new ConcurrentQueue<WebhookRequest>(); return Task.CompletedTask;
} }
public Task StartAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
_webhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update); _webhookNotify.Unsubscribe(CacheNotifyAction.Update);
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task StopAsync(CancellationToken cancellationToken) public void BuildWebhooksQueue(WebhookRequest request)
{ {
_webhookNotify.Unsubscribe(CacheNotifyAction.Update); Queue.Enqueue(request);
return Task.CompletedTask;
}
public void BuildWebhooksQueue(WebhookRequest request)
{
Queue.Enqueue(request);
}
} }
} }

View File

@ -10,49 +10,70 @@ using ASC.Web.Webhooks;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
namespace ASC.Webhooks.Service.Services namespace ASC.Webhooks.Service.Services;
[Singletone]
public class WorkerService: IHostedService, IDisposable
{ {
[Singletone] private readonly int? _threadCount = 10;
public class WorkerService: IHostedService, IDisposable private readonly WebhookSender _webhookSender;
private readonly ConcurrentQueue<WebhookRequest> _queue;
private CancellationToken _cancellationToken;
private readonly ILog _logger;
private Timer _timer;
public WorkerService(WebhookSender webhookSender,
ILog logger,
BuildQueueService buildQueueService,
Settings settings)
{ {
private readonly int? _threadCount = 10; _logger = logger;
private readonly WebhookSender _webhookSender; _webhookSender = webhookSender;
private readonly ConcurrentQueue<WebhookRequest> _queue; _queue = buildQueueService.Queue;
private CancellationToken _cancellationToken; _threadCount = settings.ThreadCount;
private readonly ILog _logger; }
private Timer _timer;
public WorkerService(WebhookSender webhookSender, public Task StartAsync(CancellationToken cancellationToken)
ILog logger, {
BuildQueueService buildQueueService, _cancellationToken = cancellationToken;
Settings settings)
{
_logger = logger;
_webhookSender = webhookSender;
_queue = buildQueueService.Queue;
_threadCount = settings.ThreadCount;
}
public Task StartAsync(CancellationToken cancellationToken) _timer = new Timer(DoWork, null, 0, Timeout.Infinite);
{ return Task.CompletedTask;
_cancellationToken = cancellationToken; }
_timer = new Timer(DoWork, null, 0, Timeout.Infinite); public Task StopAsync(CancellationToken cancellationToken)
return Task.CompletedTask; {
} Stop();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken) private void Stop()
{
_timer?.Change(Timeout.Infinite, 0);
}
private void DoWork(object _)
{
if (_cancellationToken.IsCancellationRequested)
{ {
Stop(); Stop();
return Task.CompletedTask; return;
} }
private void Stop() _timer.Change(Timeout.Infinite, Timeout.Infinite);
_logger.Debug("Procedure: Start.");
var queueSize = _queue.Count;
if (queueSize == 0)// change to "<= threadCount"
{ {
_timer?.Change(Timeout.Infinite, 0); _logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5);
_timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
return;
} }
private void DoWork(object _) var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < queueSize; i++)
{ {
if (_cancellationToken.IsCancellationRequested) if (_cancellationToken.IsCancellationRequested)
{ {
@ -60,49 +81,27 @@ namespace ASC.Webhooks.Service.Services
return; return;
} }
_timer.Change(Timeout.Infinite, Timeout.Infinite); if (!_queue.TryDequeue(out var entry))
_logger.Debug("Procedure: Start."); break;
var queueSize = _queue.Count;
if (queueSize == 0)// change to "<= threadCount" tasks.Add(_webhookSender.Send(entry, _cancellationToken));
counter++;
if (counter >= _threadCount)
{ {
_logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5); Task.WaitAll(tasks.ToArray());
_timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1)); tasks.Clear();
return; counter = 0;
} }
var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < queueSize; i++)
{
if (_cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
if (!_queue.TryDequeue(out var entry))
break;
tasks.Add(_webhookSender.Send(entry, _cancellationToken));
counter++;
if (counter >= _threadCount)
{
Task.WaitAll(tasks.ToArray());
tasks.Clear();
counter = 0;
}
}
Task.WaitAll(tasks.ToArray());
_logger.Debug("Procedure: Finish.");
_timer.Change(0, Timeout.Infinite);
} }
public void Dispose() Task.WaitAll(tasks.ToArray());
{ _logger.Debug("Procedure: Finish.");
_timer?.Dispose(); _timer.Change(0, Timeout.Infinite);
} }
public void Dispose()
{
_timer?.Dispose();
} }
} }

View File

@ -2,24 +2,23 @@
using ASC.Common; using ASC.Common;
using ASC.Common.Utils; using ASC.Common.Utils;
namespace ASC.Webhooks.Service namespace ASC.Webhooks.Service;
[Singletone]
public class Settings
{ {
[Singletone] public Settings()
public class Settings
{ {
public Settings()
{
}
public Settings(ConfigurationExtension configuration)
{
var cfg = configuration.GetSetting<Settings>("webhooks");
RepeatCount = cfg.RepeatCount ?? 5;
ThreadCount = cfg.ThreadCount ?? 1;
}
public int? RepeatCount { get; }
public int? ThreadCount { get; }
} }
public Settings(ConfigurationExtension configuration)
{
var cfg = configuration.GetSetting<Settings>("webhooks");
RepeatCount = cfg.RepeatCount ?? 5;
ThreadCount = cfg.ThreadCount ?? 1;
}
public int? RepeatCount { get; }
public int? ThreadCount { get; }
} }

View File

@ -16,104 +16,103 @@ using ASC.Webhooks.Core;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace ASC.Webhooks.Service namespace ASC.Webhooks.Service;
[Singletone]
public class WebhookSender
{ {
[Singletone] private readonly int? _repeatCount;
public class WebhookSender private static readonly HttpClient httpClient = new HttpClient();
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILog _log;
public WebhookSender(IOptionsMonitor<ILog> options, IServiceScopeFactory scopeFactory, Settings settings)
{ {
private readonly int? _repeatCount; _log = options.Get("ASC.Webhooks.Core");
private static readonly HttpClient httpClient = new HttpClient(); _scopeFactory = scopeFactory;
private readonly IServiceScopeFactory _scopeFactory; _repeatCount = settings.RepeatCount;
private readonly ILog _log; }
public WebhookSender(IOptionsMonitor<ILog> options, IServiceScopeFactory scopeFactory, Settings settings) public async Task Send(WebhookRequest webhookRequest, CancellationToken cancellationToken)
{
using var scope = _scopeFactory.CreateScope();
var dbWorker = scope.ServiceProvider.GetService<DbWorker>();
var entry = dbWorker.ReadFromJournal(webhookRequest.Id);
var id = entry.Id;
var requestURI = entry.Uri;
var secretKey = entry.SecretKey;
var data = entry.Payload;
HttpResponseMessage response = new HttpResponseMessage();
HttpRequestMessage request = new HttpRequestMessage();
for (int i = 0; i < _repeatCount; i++)
{ {
_log = options.Get("ASC.Webhooks.Core"); try
_scopeFactory = scopeFactory;
_repeatCount = settings.RepeatCount;
}
public async Task Send(WebhookRequest webhookRequest, CancellationToken cancellationToken)
{
using var scope = _scopeFactory.CreateScope();
var dbWorker = scope.ServiceProvider.GetService<DbWorker>();
var entry = dbWorker.ReadFromJournal(webhookRequest.Id);
var id = entry.Id;
var requestURI = entry.Uri;
var secretKey = entry.SecretKey;
var data = entry.Payload;
HttpResponseMessage response = new HttpResponseMessage();
HttpRequestMessage request = new HttpRequestMessage();
for (int i = 0; i < _repeatCount; i++)
{ {
try request = new HttpRequestMessage(HttpMethod.Post, requestURI);
request.Headers.Add("Accept", "*/*");
request.Headers.Add("Secret", "SHA256=" + GetSecretHash(secretKey, data));
request.Content = new StringContent(
data,
Encoding.UTF8,
"application/json");
response = await httpClient.SendAsync(request, cancellationToken);
if (response.IsSuccessStatusCode)
{ {
request = new HttpRequestMessage(HttpMethod.Post, requestURI); UpdateDb(dbWorker, id, response, request, ProcessStatus.Success);
request.Headers.Add("Accept", "*/*"); _log.Debug("Response: " + response);
request.Headers.Add("Secret", "SHA256=" + GetSecretHash(secretKey, data)); break;
request.Content = new StringContent(
data,
Encoding.UTF8,
"application/json");
response = await httpClient.SendAsync(request, cancellationToken);
if (response.IsSuccessStatusCode)
{
UpdateDb(dbWorker, id, response, request, ProcessStatus.Success);
_log.Debug("Response: " + response);
break;
}
else if (i == _repeatCount - 1)
{
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
_log.Debug("Response: " + response);
}
} }
catch (Exception ex) else if (i == _repeatCount - 1)
{ {
if (i == _repeatCount - 1) UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
{ _log.Debug("Response: " + response);
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
}
_log.Error(ex.Message);
continue;
} }
} }
} catch (Exception ex)
private string GetSecretHash(string secretKey, string body)
{
string computedSignature;
var secretBytes = Encoding.UTF8.GetBytes(secretKey);
using (var hasher = new HMACSHA256(secretBytes))
{ {
var data = Encoding.UTF8.GetBytes(body); if (i == _repeatCount - 1)
computedSignature = BitConverter.ToString(hasher.ComputeHash(data)); {
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
}
_log.Error(ex.Message);
continue;
} }
return computedSignature;
}
private void UpdateDb(DbWorker dbWorker, int id, HttpResponseMessage response, HttpRequestMessage request, ProcessStatus status)
{
var responseHeaders = JsonSerializer.Serialize(response.Headers.ToDictionary(r => r.Key, v => v.Value));
var requestHeaders = JsonSerializer.Serialize(request.Headers.ToDictionary(r => r.Key , v => v.Value));
string responsePayload;
using (var streamReader = new StreamReader(response.Content.ReadAsStream()))
{
var responseContent = streamReader.ReadToEnd();
responsePayload = JsonSerializer.Serialize(responseContent);
};
dbWorker.UpdateWebhookJournal(id, status, responsePayload, responseHeaders, requestHeaders);
} }
} }
private string GetSecretHash(string secretKey, string body)
{
string computedSignature;
var secretBytes = Encoding.UTF8.GetBytes(secretKey);
using (var hasher = new HMACSHA256(secretBytes))
{
var data = Encoding.UTF8.GetBytes(body);
computedSignature = BitConverter.ToString(hasher.ComputeHash(data));
}
return computedSignature;
}
private void UpdateDb(DbWorker dbWorker, int id, HttpResponseMessage response, HttpRequestMessage request, ProcessStatus status)
{
var responseHeaders = JsonSerializer.Serialize(response.Headers.ToDictionary(r => r.Key, v => v.Value));
var requestHeaders = JsonSerializer.Serialize(request.Headers.ToDictionary(r => r.Key , v => v.Value));
string responsePayload;
using (var streamReader = new StreamReader(response.Content.ReadAsStream()))
{
var responseContent = streamReader.ReadToEnd();
responsePayload = JsonSerializer.Serialize(responseContent);
};
dbWorker.UpdateWebhookJournal(id, status, responsePayload, responseHeaders, requestHeaders);
}
} }