refactoring: asc.webhooks.service

This commit is contained in:
Anton Suhorukov 2022-02-02 17:23:15 +03:00
parent 8428ef730c
commit 2ff2e64f4d
7 changed files with 170 additions and 199 deletions

View File

@ -1,36 +0,0 @@
using System.Collections.Concurrent;
using ASC.Common;
using ASC.Common.Caching;
using ASC.Web.Webhooks;
namespace ASC.Webhooks.Service
{
[Singletone]
public class BuildQueueService
{
internal ConcurrentQueue<WebhookRequest> Queue { get; }
private ICacheNotify<WebhookRequest> WebhookNotify { get; }
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify)
{
WebhookNotify = webhookNotify;
Queue = new ConcurrentQueue<WebhookRequest>();
}
public void Start()
{
WebhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update);
}
public void Stop()
{
WebhookNotify.Unsubscribe(CacheNotifyAction.Update);
}
public void BuildWebhooksQueue(WebhookRequest request)
{
Queue.Enqueue(request);
}
}
}

View File

@ -8,6 +8,7 @@ using ASC.Common.Caching;
using ASC.Common.DependencyInjection;
using ASC.Common.Utils;
using ASC.Webhooks.Core;
using ASC.Webhooks.Service.Services;
using Autofac;
using Autofac.Extensions.DependencyInjection;
@ -90,8 +91,11 @@ namespace ASC.Webhooks.Service
diHelper.TryAdd<DbWorker>();
services.AddHostedService<WebhookHostedService>();
diHelper.TryAdd<WebhookHostedService>();
services.AddHostedService<BuildQueueService>();
diHelper.TryAdd<BuildQueueService>();
services.AddHostedService<WorkerService>();
diHelper.TryAdd<WorkerService>();
})
.ConfigureContainer<ContainerBuilder>((context, builder) =>

View File

@ -0,0 +1,42 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using ASC.Common;
using ASC.Common.Caching;
using ASC.Web.Webhooks;
using Microsoft.Extensions.Hosting;
namespace ASC.Webhooks.Service.Services
{
[Singletone]
public class BuildQueueService : IHostedService
{
internal ConcurrentQueue<WebhookRequest> Queue { get; }
private readonly ICacheNotify<WebhookRequest> _webhookNotify;
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify)
{
_webhookNotify = webhookNotify;
Queue = new ConcurrentQueue<WebhookRequest>();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_webhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_webhookNotify.Unsubscribe(CacheNotifyAction.Update);
return Task.CompletedTask;
}
public void BuildWebhooksQueue(WebhookRequest request)
{
Queue.Enqueue(request);
}
}
}

View File

@ -0,0 +1,108 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ASC.Common;
using ASC.Common.Logging;
using ASC.Web.Webhooks;
using Microsoft.Extensions.Hosting;
namespace ASC.Webhooks.Service.Services
{
[Singletone]
public class WorkerService: IHostedService, IDisposable
{
private readonly int? _threadCount = 10;
private readonly WebhookSender _webhookSender;
private readonly ConcurrentQueue<WebhookRequest> _queue;
private CancellationToken _cancellationToken;
private readonly ILog _logger;
private Timer _timer;
public WorkerService(WebhookSender webhookSender,
ILog logger,
BuildQueueService buildQueueService,
Settings settings)
{
_logger = logger;
_webhookSender = webhookSender;
_queue = buildQueueService.Queue;
_threadCount = settings.ThreadCount;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
_timer = new Timer(DoWork, null, 0, Timeout.Infinite);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
Stop();
return Task.CompletedTask;
}
private void Stop()
{
_timer?.Change(Timeout.Infinite, 0);
}
private void DoWork(object _)
{
if (_cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
_timer.Change(Timeout.Infinite, Timeout.Infinite);
_logger.Debug("Procedure: Start.");
var queueSize = _queue.Count;
if (queueSize == 0)// change to "<= threadCount"
{
_logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5);
_timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
return;
}
var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < queueSize; i++)
{
if (_cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
if (!_queue.TryDequeue(out var entry))
break;
tasks.Add(_webhookSender.Send(entry, _cancellationToken));
counter++;
if (counter >= _threadCount)
{
Task.WaitAll(tasks.ToArray());
tasks.Clear();
counter = 0;
}
}
Task.WaitAll(tasks.ToArray());
_logger.Debug("Procedure: Finish.");
_timer.Change(0, Timeout.Infinite);
}
public void Dispose()
{
_timer?.Dispose();
}
}
}

View File

@ -1,48 +0,0 @@
using System.Threading;
using System.Threading.Tasks;
using ASC.Common;
using Microsoft.Extensions.Hosting;
namespace ASC.Webhooks.Service
{
[Singletone]
public class WebhookHostedService : IHostedService
{
private WorkerService workerService;
private BuildQueueService buildQueueService;
public WebhookHostedService(WorkerService workerService,
BuildQueueService buildQueueService)
{
this.workerService = workerService;
this.buildQueueService = buildQueueService;
}
public Task StartAsync(CancellationToken cancellationToken)
{
workerService.Start(cancellationToken);
buildQueueService.Start();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
if (workerService != null)
{
workerService.Stop();
workerService = null;
}
if (buildQueueService != null)
{
buildQueueService.Stop();
}
return Task.CompletedTask;
}
}
}

View File

@ -21,21 +21,21 @@ namespace ASC.Webhooks.Service
[Singletone]
public class WebhookSender
{
public int? RepeatCount { get; }
private readonly int? _repeatCount;
private static readonly HttpClient httpClient = new HttpClient();
private IServiceProvider ServiceProvider { get; }
private ILog Log { get; }
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILog _log;
public WebhookSender(IOptionsMonitor<ILog> options, IServiceProvider serviceProvider, Settings settings)
public WebhookSender(IOptionsMonitor<ILog> options, IServiceScopeFactory scopeFactory, Settings settings)
{
Log = options.Get("ASC.Webhooks.Core");
ServiceProvider = serviceProvider;
RepeatCount = settings.RepeatCount;
_log = options.Get("ASC.Webhooks.Core");
_scopeFactory = scopeFactory;
_repeatCount = settings.RepeatCount;
}
public async Task Send(WebhookRequest webhookRequest, CancellationToken cancellationToken)
{
using var scope = ServiceProvider.CreateScope();
using var scope = _scopeFactory.CreateScope();
var dbWorker = scope.ServiceProvider.GetService<DbWorker>();
var entry = dbWorker.ReadFromJournal(webhookRequest.Id);
@ -47,7 +47,7 @@ namespace ASC.Webhooks.Service
HttpResponseMessage response = new HttpResponseMessage();
HttpRequestMessage request = new HttpRequestMessage();
for (int i = 0; i < RepeatCount; i++)
for (int i = 0; i < _repeatCount; i++)
{
try
{
@ -65,23 +65,23 @@ namespace ASC.Webhooks.Service
if (response.IsSuccessStatusCode)
{
UpdateDb(dbWorker, id, response, request, ProcessStatus.Success);
Log.Debug("Response: " + response);
_log.Debug("Response: " + response);
break;
}
else if (i == RepeatCount - 1)
else if (i == _repeatCount - 1)
{
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
Log.Debug("Response: " + response);
_log.Debug("Response: " + response);
}
}
catch (Exception ex)
{
if (i == RepeatCount - 1)
if (i == _repeatCount - 1)
{
UpdateDb(dbWorker, id, response, request, ProcessStatus.Failed);
}
Log.Error(ex.Message);
_log.Error(ex.Message);
continue;
}
}

View File

@ -1,99 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ASC.Common;
using ASC.Common.Logging;
using ASC.Web.Webhooks;
namespace ASC.Webhooks.Service
{
[Singletone]
public class WorkerService
{
private readonly int? threadCount = 10;
private readonly WebhookSender webhookSender;
private readonly ConcurrentQueue<WebhookRequest> queue;
private CancellationToken cancellationToken;
private ILog logger;
private Timer timer;
public WorkerService(WebhookSender webhookSender,
ILog logger,
BuildQueueService buildQueueService,
Settings settings)
{
this.logger = logger;
this.webhookSender = webhookSender;
queue = buildQueueService.Queue;
threadCount = settings.ThreadCount;
}
public void Start(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
timer = new Timer(Procedure, null, 0, Timeout.Infinite);
}
public void Stop()
{
if (timer != null)
{
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
timer = null;
}
}
private void Procedure(object _)
{
if (cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
timer.Change(Timeout.Infinite, Timeout.Infinite);
logger.Debug("Procedure: Start.");
var queueSize = queue.Count;
if (queueSize == 0)// change to "<= threadCount"
{
logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5);
timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
return;
}
var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < queueSize; i++)
{
if (cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
if (!queue.TryDequeue(out var entry))
break;
tasks.Add(webhookSender.Send(entry, cancellationToken));
counter++;
if (counter >= threadCount)
{
Task.WaitAll(tasks.ToArray());
tasks.Clear();
counter = 0;
}
}
Task.WaitAll(tasks.ToArray());
logger.Debug("Procedure: Finish.");
timer.Change(0, Timeout.Infinite);
}
}
}