Merge pull request #528 from ONLYOFFICE/feature/refactoring-webhooks.service
Feature/refactoring webhooks.service
This commit is contained in:
commit
97eb650a74
@ -1,30 +0,0 @@
|
||||
namespace ASC.Webhooks.Service
|
||||
{
|
||||
[Singletone]
|
||||
public class BuildQueueService
|
||||
{
|
||||
internal ConcurrentQueue<WebhookRequest> Queue { get; }
|
||||
private ICacheNotify<WebhookRequest> WebhookNotify { get; }
|
||||
|
||||
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify)
|
||||
{
|
||||
WebhookNotify = webhookNotify;
|
||||
Queue = new ConcurrentQueue<WebhookRequest>();
|
||||
}
|
||||
|
||||
public void Start()
|
||||
{
|
||||
WebhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update);
|
||||
}
|
||||
|
||||
public void Stop()
|
||||
{
|
||||
WebhookNotify.Unsubscribe(CacheNotifyAction.Update);
|
||||
}
|
||||
|
||||
public void BuildWebhooksQueue(WebhookRequest request)
|
||||
{
|
||||
Queue.Enqueue(request);
|
||||
}
|
||||
}
|
||||
}
|
@ -4,6 +4,7 @@ global using System.Collections.Generic;
|
||||
global using System.IO;
|
||||
global using System.Linq;
|
||||
global using System.Net.Http;
|
||||
global using System.Runtime.InteropServices;
|
||||
global using System.Security.Cryptography;
|
||||
global using System.Text;
|
||||
global using System.Text.Json;
|
||||
@ -18,14 +19,17 @@ global using ASC.Common.Logging;
|
||||
global using ASC.Common.Utils;
|
||||
global using ASC.Web.Webhooks;
|
||||
global using ASC.Webhooks.Core;
|
||||
global using ASC.Webhooks.Service.Services;
|
||||
|
||||
global using Autofac;
|
||||
global using Autofac.Extensions.DependencyInjection;
|
||||
|
||||
global using Microsoft.AspNetCore.Builder;
|
||||
global using Microsoft.AspNetCore.Hosting;
|
||||
global using Microsoft.Extensions.Configuration;
|
||||
global using Microsoft.Extensions.DependencyInjection;
|
||||
global using Microsoft.Extensions.Hosting;
|
||||
global using Microsoft.Extensions.Hosting.WindowsServices;
|
||||
global using Microsoft.Extensions.Options;
|
||||
|
||||
global using StackExchange.Redis.Extensions.Core.Configuration;
|
||||
|
@ -1,81 +1,110 @@
|
||||
namespace ASC.Webhooks.Service
|
||||
var options = new WebApplicationOptions
|
||||
{
|
||||
public static class Program
|
||||
Args = args,
|
||||
ContentRootPath = WindowsServiceHelpers.IsWindowsService() ? AppContext.BaseDirectory : default
|
||||
};
|
||||
|
||||
var builder = WebApplication.CreateBuilder(options);
|
||||
|
||||
builder.Host.UseWindowsService();
|
||||
builder.Host.UseSystemd();
|
||||
builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory());
|
||||
|
||||
builder.WebHost.ConfigureKestrel((hostingContext, serverOptions) =>
|
||||
{
|
||||
var kestrelConfig = hostingContext.Configuration.GetSection("Kestrel");
|
||||
|
||||
if (!kestrelConfig.Exists()) return;
|
||||
|
||||
var unixSocket = kestrelConfig.GetValue<string>("ListenUnixSocket");
|
||||
|
||||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
|
||||
{
|
||||
public async static Task Main(string[] args)
|
||||
if (!string.IsNullOrWhiteSpace(unixSocket))
|
||||
{
|
||||
var host = CreateHostBuilder(args).Build();
|
||||
unixSocket = string.Format(unixSocket, hostingContext.HostingEnvironment.ApplicationName.Replace("ASC.", "").Replace(".", ""));
|
||||
|
||||
await host.RunAsync();
|
||||
serverOptions.ListenUnixSocket(unixSocket);
|
||||
}
|
||||
|
||||
public static IHostBuilder CreateHostBuilder(string[] args) =>
|
||||
Host.CreateDefaultBuilder(args)
|
||||
.UseSystemd()
|
||||
.UseWindowsService()
|
||||
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
|
||||
.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<BaseWorkerStartup>())
|
||||
.ConfigureAppConfiguration((hostContext, config) =>
|
||||
{
|
||||
var buided = config.Build();
|
||||
var path = buided["pathToConf"];
|
||||
if (!Path.IsPathRooted(path))
|
||||
{
|
||||
path = Path.GetFullPath(CrossPlatform.PathCombine(hostContext.HostingEnvironment.ContentRootPath, path));
|
||||
}
|
||||
config.SetBasePath(path);
|
||||
var env = hostContext.Configuration.GetValue("ENVIRONMENT", "Production");
|
||||
config
|
||||
.AddInMemoryCollection(new Dictionary<string, string>
|
||||
{
|
||||
{"pathToConf", path }
|
||||
}
|
||||
)
|
||||
.AddJsonFile("appsettings.json")
|
||||
.AddJsonFile($"appsettings.{env}.json", true)
|
||||
.AddJsonFile($"appsettings.services.json", true)
|
||||
.AddJsonFile("storage.json")
|
||||
.AddJsonFile("kafka.json")
|
||||
.AddJsonFile($"kafka.{env}.json", true)
|
||||
.AddJsonFile("redis.json")
|
||||
.AddJsonFile($"redis.{env}.json", true)
|
||||
.AddEnvironmentVariables()
|
||||
.AddCommandLine(args);
|
||||
})
|
||||
.ConfigureServices((hostContext, services) =>
|
||||
{
|
||||
services.AddMemoryCache();
|
||||
|
||||
var diHelper = new DIHelper(services);
|
||||
|
||||
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
|
||||
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
|
||||
|
||||
if (kafkaConfiguration != null)
|
||||
{
|
||||
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
|
||||
}
|
||||
else if (redisConfiguration != null)
|
||||
{
|
||||
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
|
||||
|
||||
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
|
||||
}
|
||||
else
|
||||
{
|
||||
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
|
||||
}
|
||||
|
||||
diHelper.TryAdd<DbWorker>();
|
||||
|
||||
services.AddHostedService<WebhookHostedService>();
|
||||
diHelper.TryAdd<WebhookHostedService>();
|
||||
|
||||
})
|
||||
.ConfigureContainer<ContainerBuilder>((context, builder) =>
|
||||
{
|
||||
builder.Register(context.Configuration, false, false);
|
||||
})
|
||||
.ConfigureNLogLogging();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
builder.Host.ConfigureContainer<ContainerBuilder>((context, builder) =>
|
||||
{
|
||||
builder.Register(context.Configuration, false, false);
|
||||
});
|
||||
|
||||
builder.Host.ConfigureAppConfiguration((hostContext, config) =>
|
||||
{
|
||||
var buided = config.Build();
|
||||
var path = buided["pathToConf"];
|
||||
if (!Path.IsPathRooted(path))
|
||||
{
|
||||
path = Path.GetFullPath(CrossPlatform.PathCombine(hostContext.HostingEnvironment.ContentRootPath, path));
|
||||
}
|
||||
config.SetBasePath(path);
|
||||
var env = hostContext.Configuration.GetValue("ENVIRONMENT", "Production");
|
||||
config
|
||||
.AddInMemoryCollection(new Dictionary<string, string>
|
||||
{
|
||||
{"pathToConf", path }
|
||||
}
|
||||
)
|
||||
.AddJsonFile("appsettings.json")
|
||||
.AddJsonFile($"appsettings.{env}.json", true)
|
||||
.AddJsonFile($"appsettings.services.json", true)
|
||||
.AddJsonFile("storage.json")
|
||||
.AddJsonFile("kafka.json")
|
||||
.AddJsonFile($"kafka.{env}.json", true)
|
||||
.AddJsonFile("redis.json")
|
||||
.AddJsonFile($"redis.{env}.json", true)
|
||||
.AddEnvironmentVariables()
|
||||
.AddCommandLine(args);
|
||||
});
|
||||
|
||||
builder.WebHost.ConfigureServices((hostContext, services) =>
|
||||
{
|
||||
services.AddMemoryCache();
|
||||
|
||||
var diHelper = new DIHelper(services);
|
||||
|
||||
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
|
||||
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
|
||||
|
||||
if (kafkaConfiguration != null)
|
||||
{
|
||||
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
|
||||
}
|
||||
else if (redisConfiguration != null)
|
||||
{
|
||||
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
|
||||
|
||||
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
|
||||
}
|
||||
else
|
||||
{
|
||||
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
|
||||
}
|
||||
|
||||
services.AddHttpClient();
|
||||
|
||||
diHelper.TryAdd<DbWorker>();
|
||||
|
||||
services.AddHostedService<BuildQueueService>();
|
||||
diHelper.TryAdd<BuildQueueService>();
|
||||
|
||||
services.AddHostedService<WorkerService>();
|
||||
diHelper.TryAdd<WorkerService>();
|
||||
});
|
||||
|
||||
builder.Host.ConfigureNLogLogging();
|
||||
|
||||
var startup = new BaseWorkerStartup(builder.Configuration);
|
||||
|
||||
startup.ConfigureServices(builder.Services);
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
startup.Configure(app);
|
||||
|
||||
app.Run();
|
@ -0,0 +1,30 @@
|
||||
namespace ASC.Webhooks.Service.Services;
|
||||
|
||||
[Singletone]
|
||||
public class BuildQueueService : BackgroundService
|
||||
{
|
||||
internal readonly ConcurrentQueue<WebhookRequest> Queue;
|
||||
private readonly ICacheNotify<WebhookRequest> _webhookNotify;
|
||||
|
||||
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify)
|
||||
{
|
||||
_webhookNotify = webhookNotify;
|
||||
Queue = new ConcurrentQueue<WebhookRequest>();
|
||||
}
|
||||
public void BuildWebhooksQueue(WebhookRequest request)
|
||||
{
|
||||
Queue.Enqueue(request);
|
||||
}
|
||||
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_webhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update);
|
||||
|
||||
stoppingToken.Register(() =>
|
||||
{
|
||||
_webhookNotify.Unsubscribe(CacheNotifyAction.Update);
|
||||
});
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
namespace ASC.Webhooks.Service.Services;
|
||||
|
||||
[Singletone]
|
||||
public class WorkerService : BackgroundService
|
||||
{
|
||||
private readonly ILog _logger;
|
||||
private readonly ConcurrentQueue<WebhookRequest> _queue;
|
||||
private readonly int? _threadCount = 10;
|
||||
private readonly WebhookSender _webhookSender;
|
||||
private readonly TimeSpan _waitingPeriod;
|
||||
|
||||
public WorkerService(WebhookSender webhookSender,
|
||||
ILog logger,
|
||||
BuildQueueService buildQueueService,
|
||||
Settings settings)
|
||||
{
|
||||
_logger = logger;
|
||||
_webhookSender = webhookSender;
|
||||
_queue = buildQueueService.Queue;
|
||||
_threadCount = settings.ThreadCount;
|
||||
_waitingPeriod = TimeSpan.FromSeconds(5);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
var queueSize = _queue.Count;
|
||||
|
||||
if (queueSize == 0) // change to "<= threadCount"
|
||||
{
|
||||
_logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", _waitingPeriod);
|
||||
|
||||
await Task.Delay(_waitingPeriod, stoppingToken);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
var tasks = new List<Task>();
|
||||
var counter = 0;
|
||||
|
||||
for (int i = 0; i < queueSize; i++)
|
||||
{
|
||||
if (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_queue.TryDequeue(out var entry))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
tasks.Add(_webhookSender.Send(entry, stoppingToken));
|
||||
counter++;
|
||||
|
||||
if (counter >= _threadCount)
|
||||
{
|
||||
Task.WaitAll(tasks.ToArray());
|
||||
tasks.Clear();
|
||||
counter = 0;
|
||||
}
|
||||
}
|
||||
|
||||
Task.WaitAll(tasks.ToArray());
|
||||
_logger.Debug("Procedure: Finish.");
|
||||
}
|
||||
}
|
||||
}
|
@ -1,21 +1,18 @@
|
||||
namespace ASC.Webhooks.Service
|
||||
namespace ASC.Webhooks.Service;
|
||||
|
||||
[Singletone]
|
||||
public class Settings
|
||||
{
|
||||
[Singletone]
|
||||
public class Settings
|
||||
public Settings()
|
||||
{
|
||||
public Settings()
|
||||
{
|
||||
|
||||
}
|
||||
public Settings(ConfigurationExtension configuration)
|
||||
{
|
||||
var cfg = configuration.GetSetting<Settings>("webhooks");
|
||||
RepeatCount = cfg.RepeatCount ?? 5;
|
||||
ThreadCount = cfg.ThreadCount ?? 1;
|
||||
}
|
||||
|
||||
public int? RepeatCount { get; }
|
||||
public int? ThreadCount { get; }
|
||||
|
||||
}
|
||||
|
||||
public Settings(ConfigurationExtension configuration)
|
||||
{
|
||||
var cfg = configuration.GetSetting<Settings>("webhooks");
|
||||
RepeatCount = cfg.RepeatCount ?? 5;
|
||||
ThreadCount = cfg.ThreadCount ?? 1;
|
||||
}
|
||||
public int? RepeatCount { get; }
|
||||
public int? ThreadCount { get; }
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
namespace ASC.Webhooks.Service
|
||||
{
|
||||
[Singletone]
|
||||
public class WebhookHostedService : IHostedService
|
||||
{
|
||||
private WorkerService workerService;
|
||||
private BuildQueueService buildQueueService;
|
||||
|
||||
public WebhookHostedService(WorkerService workerService,
|
||||
BuildQueueService buildQueueService)
|
||||
{
|
||||
this.workerService = workerService;
|
||||
this.buildQueueService = buildQueueService;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
workerService.Start(cancellationToken);
|
||||
buildQueueService.Start();
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
if (workerService != null)
|
||||
{
|
||||
workerService.Stop();
|
||||
workerService = null;
|
||||
}
|
||||
|
||||
if (buildQueueService != null)
|
||||
{
|
||||
buildQueueService.Stop();
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,101 +1,102 @@
|
||||
namespace ASC.Webhooks.Service
|
||||
namespace ASC.Webhooks.Service;
|
||||
|
||||
[Singletone]
|
||||
public class WebhookSender
|
||||
{
|
||||
[Singletone]
|
||||
public class WebhookSender
|
||||
private readonly IHttpClientFactory _clientFactory;
|
||||
private readonly ILog _log;
|
||||
private readonly int? _repeatCount;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
|
||||
public WebhookSender(IOptionsMonitor<ILog> options, IServiceScopeFactory scopeFactory, Settings settings, IHttpClientFactory clientFactory)
|
||||
{
|
||||
public int? RepeatCount { get; }
|
||||
private static readonly HttpClient httpClient = new HttpClient();
|
||||
private IServiceProvider ServiceProvider { get; }
|
||||
private ILog Log { get; }
|
||||
_log = options.Get("ASC.Webhooks.Core");
|
||||
_scopeFactory = scopeFactory;
|
||||
_repeatCount = settings.RepeatCount;
|
||||
_clientFactory = clientFactory;
|
||||
}
|
||||
|
||||
public WebhookSender(IOptionsMonitor<ILog> options, IServiceProvider serviceProvider, Settings settings)
|
||||
public async Task Send(WebhookRequest webhookRequest, CancellationToken cancellationToken)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var dbWorker = scope.ServiceProvider.GetService<DbWorker>();
|
||||
|
||||
var entry = dbWorker.ReadFromJournal(webhookRequest.Id);
|
||||
var id = entry.Id;
|
||||
var requestURI = entry.Uri;
|
||||
var secretKey = entry.SecretKey;
|
||||
var data = entry.Payload;
|
||||
|
||||
HttpResponseMessage response = new HttpResponseMessage();
|
||||
HttpRequestMessage request = new HttpRequestMessage();
|
||||
|
||||
for (int i = 0; i < _repeatCount; i++)
|
||||
{
|
||||
Log = options.Get("ASC.Webhooks.Core");
|
||||
ServiceProvider = serviceProvider;
|
||||
RepeatCount = settings.RepeatCount;
|
||||
}
|
||||
|
||||
public async Task Send(WebhookRequest webhookRequest, CancellationToken cancellationToken)
|
||||
{
|
||||
using var scope = ServiceProvider.CreateScope();
|
||||
var dbWorker = scope.ServiceProvider.GetService<DbWorker>();
|
||||
|
||||
var entry = dbWorker.ReadFromJournal(webhookRequest.Id);
|
||||
var id = entry.Id;
|
||||
var requestURI = entry.Uri;
|
||||
var secretKey = entry.SecretKey;
|
||||
var data = entry.Payload;
|
||||
|
||||
HttpResponseMessage response = new HttpResponseMessage();
|
||||
HttpRequestMessage request = new HttpRequestMessage();
|
||||
|
||||
for (int i = 0; i < RepeatCount; i++)
|
||||
try
|
||||
{
|
||||
try
|
||||
request = new HttpRequestMessage(HttpMethod.Post, requestURI);
|
||||
request.Headers.Add("Accept", "*/*");
|
||||
request.Headers.Add("Secret", "SHA256=" + GetSecretHash(secretKey, data));
|
||||
|
||||
request.Content = new StringContent(
|
||||
data,
|
||||
Encoding.UTF8,
|
||||
"application/json");
|
||||
|
||||
var httpClient = _clientFactory.CreateClient();
|
||||
response = await httpClient.SendAsync(request, cancellationToken);
|
||||
|
||||
if (response.IsSuccessStatusCode)
|
||||
{
|
||||
request = new HttpRequestMessage(HttpMethod.Post, requestURI);
|
||||
request.Headers.Add("Accept", "*/*");
|
||||
request.Headers.Add("Secret", "SHA256=" + GetSecretHash(secretKey, data));
|
||||
|
||||
request.Content = new StringContent(
|
||||
data,
|
||||
Encoding.UTF8,
|
||||
"application/json");
|
||||
|
||||
response = await httpClient.SendAsync(request, cancellationToken);
|
||||
|
||||
if (response.IsSuccessStatusCode)
|
||||
{
|
||||
UpdateDb(dbWorker, id, response, request, ProcessStatus.Success);
|
||||
Log.Debug("Response: " + response);
|
||||
break;
|
||||
}
|
||||
else if (i == RepeatCount - 1)
|
||||
{
|
||||
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
|
||||
Log.Debug("Response: " + response);
|
||||
}
|
||||
UpdateDb(dbWorker, id, response, request, ProcessStatus.Success);
|
||||
_log.Debug("Response: " + response);
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
else if (i == _repeatCount - 1)
|
||||
{
|
||||
if (i == RepeatCount - 1)
|
||||
{
|
||||
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
|
||||
}
|
||||
|
||||
Log.Error(ex.Message);
|
||||
continue;
|
||||
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
|
||||
_log.Debug("Response: " + response);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private string GetSecretHash(string secretKey, string body)
|
||||
{
|
||||
string computedSignature;
|
||||
var secretBytes = Encoding.UTF8.GetBytes(secretKey);
|
||||
|
||||
using (var hasher = new HMACSHA256(secretBytes))
|
||||
catch (Exception ex)
|
||||
{
|
||||
var data = Encoding.UTF8.GetBytes(body);
|
||||
computedSignature = BitConverter.ToString(hasher.ComputeHash(data));
|
||||
if (i == _repeatCount - 1)
|
||||
{
|
||||
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
|
||||
}
|
||||
|
||||
_log.Error(ex.Message);
|
||||
continue;
|
||||
}
|
||||
|
||||
return computedSignature;
|
||||
}
|
||||
|
||||
private void UpdateDb(DbWorker dbWorker, int id, HttpResponseMessage response, HttpRequestMessage request, ProcessStatus status)
|
||||
{
|
||||
var responseHeaders = JsonSerializer.Serialize(response.Headers.ToDictionary(r => r.Key, v => v.Value));
|
||||
var requestHeaders = JsonSerializer.Serialize(request.Headers.ToDictionary(r => r.Key , v => v.Value));
|
||||
string responsePayload;
|
||||
|
||||
using (var streamReader = new StreamReader(response.Content.ReadAsStream()))
|
||||
{
|
||||
var responseContent = streamReader.ReadToEnd();
|
||||
responsePayload = JsonSerializer.Serialize(responseContent);
|
||||
}
|
||||
|
||||
dbWorker.UpdateWebhookJournal(id, status, responsePayload, responseHeaders, requestHeaders);
|
||||
}
|
||||
}
|
||||
|
||||
private string GetSecretHash(string secretKey, string body)
|
||||
{
|
||||
string computedSignature;
|
||||
var secretBytes = Encoding.UTF8.GetBytes(secretKey);
|
||||
|
||||
using (var hasher = new HMACSHA256(secretBytes))
|
||||
{
|
||||
var data = Encoding.UTF8.GetBytes(body);
|
||||
computedSignature = BitConverter.ToString(hasher.ComputeHash(data));
|
||||
}
|
||||
|
||||
return computedSignature;
|
||||
}
|
||||
|
||||
private void UpdateDb(DbWorker dbWorker, int id, HttpResponseMessage response, HttpRequestMessage request, ProcessStatus status)
|
||||
{
|
||||
var responseHeaders = JsonSerializer.Serialize(response.Headers.ToDictionary(r => r.Key, v => v.Value));
|
||||
var requestHeaders = JsonSerializer.Serialize(request.Headers.ToDictionary(r => r.Key, v => v.Value));
|
||||
string responsePayload;
|
||||
|
||||
using (var streamReader = new StreamReader(response.Content.ReadAsStream()))
|
||||
{
|
||||
var responseContent = streamReader.ReadToEnd();
|
||||
responsePayload = JsonSerializer.Serialize(responseContent);
|
||||
}
|
||||
|
||||
dbWorker.UpdateWebhookJournal(id, status, responsePayload, responseHeaders, requestHeaders);
|
||||
}
|
||||
}
|
@ -1,89 +0,0 @@
|
||||
namespace ASC.Webhooks.Service
|
||||
{
|
||||
[Singletone]
|
||||
public class WorkerService
|
||||
{
|
||||
private readonly int? threadCount;
|
||||
private readonly WebhookSender webhookSender;
|
||||
private readonly ConcurrentQueue<WebhookRequest> queue;
|
||||
private CancellationToken cancellationToken;
|
||||
private ILog logger;
|
||||
private Timer timer;
|
||||
|
||||
public WorkerService(WebhookSender webhookSender,
|
||||
ILog logger,
|
||||
BuildQueueService buildQueueService,
|
||||
Settings settings)
|
||||
{
|
||||
this.logger = logger;
|
||||
this.webhookSender = webhookSender;
|
||||
queue = buildQueueService.Queue;
|
||||
threadCount = settings.ThreadCount;
|
||||
}
|
||||
|
||||
public void Start(CancellationToken cancellationToken)
|
||||
{
|
||||
this.cancellationToken = cancellationToken;
|
||||
|
||||
timer = new Timer(Procedure, null, 0, Timeout.Infinite);
|
||||
}
|
||||
|
||||
public void Stop()
|
||||
{
|
||||
if (timer != null)
|
||||
{
|
||||
timer.Change(Timeout.Infinite, Timeout.Infinite);
|
||||
timer.Dispose();
|
||||
timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void Procedure(object _)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Stop();
|
||||
return;
|
||||
}
|
||||
|
||||
timer.Change(Timeout.Infinite, Timeout.Infinite);
|
||||
logger.Debug("Procedure: Start.");
|
||||
var queueSize = queue.Count;
|
||||
|
||||
if (queueSize == 0)// change to "<= threadCount"
|
||||
{
|
||||
logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5);
|
||||
timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
|
||||
return;
|
||||
}
|
||||
|
||||
var tasks = new List<Task>();
|
||||
var counter = 0;
|
||||
for (int i = 0; i < queueSize; i++)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Stop();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!queue.TryDequeue(out var entry))
|
||||
break;
|
||||
|
||||
tasks.Add(webhookSender.Send(entry, cancellationToken));
|
||||
counter++;
|
||||
|
||||
if (counter >= threadCount)
|
||||
{
|
||||
Task.WaitAll(tasks.ToArray());
|
||||
tasks.Clear();
|
||||
counter = 0;
|
||||
}
|
||||
}
|
||||
|
||||
Task.WaitAll(tasks.ToArray());
|
||||
logger.Debug("Procedure: Finish.");
|
||||
timer.Change(0, Timeout.Infinite);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user