DocSpace-client/common/services/ASC.Webhooks.Service/WorkerService.cs

100 lines
2.8 KiB
C#
Raw Normal View History

using System;
2021-07-31 17:12:41 +00:00
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
2021-07-31 17:12:41 +00:00
using System.Threading.Tasks;
2021-08-26 18:43:41 +00:00
using ASC.Common;
using ASC.Common.Logging;
2021-07-29 11:04:42 +00:00
using ASC.Web.Webhooks;
2021-08-26 18:43:41 +00:00
namespace ASC.Webhooks.Service
{
2021-08-26 18:43:41 +00:00
[Singletone]
public class WorkerService
{
2022-01-24 10:01:19 +00:00
private readonly int? threadCount;
private readonly WebhookSender webhookSender;
2021-07-29 11:04:42 +00:00
private readonly ConcurrentQueue<WebhookRequest> queue;
2021-08-26 18:43:41 +00:00
private CancellationToken cancellationToken;
private ILog logger;
private Timer timer;
2021-08-26 18:43:41 +00:00
public WorkerService(WebhookSender webhookSender,
ILog logger,
BuildQueueService buildQueueService,
Settings settings)
{
this.logger = logger;
this.webhookSender = webhookSender;
2021-08-26 18:43:41 +00:00
queue = buildQueueService.Queue;
threadCount = settings.ThreadCount;
}
2021-08-26 18:43:41 +00:00
public void Start(CancellationToken cancellationToken)
{
2021-08-26 18:43:41 +00:00
this.cancellationToken = cancellationToken;
timer = new Timer(Procedure, null, 0, Timeout.Infinite);
}
public void Stop()
{
if (timer != null)
{
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
timer = null;
}
}
private void Procedure(object _)
{
if (cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
timer.Change(Timeout.Infinite, Timeout.Infinite);
logger.Debug("Procedure: Start.");
2021-07-29 11:04:42 +00:00
var queueSize = queue.Count;
if (queueSize == 0)// change to "<= threadCount"
{
logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5);
timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
return;
}
var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < queueSize; i++)
{
2021-08-26 18:43:41 +00:00
if (cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
if (!queue.TryDequeue(out var entry))
break;
tasks.Add(webhookSender.Send(entry, cancellationToken));
counter++;
if (counter >= threadCount)
{
Task.WaitAll(tasks.ToArray());
tasks.Clear();
counter = 0;
}
}
2021-08-26 18:43:41 +00:00
Task.WaitAll(tasks.ToArray());
logger.Debug("Procedure: Finish.");
timer.Change(0, Timeout.Infinite);
}
}
}