modified DbWorker(Lazy), Sender. Added BuildQueueService and WorkerService.cs.

This commit is contained in:
Vashchuk Nikita 2021-07-26 20:03:55 +03:00
parent faabf5aae7
commit 8a51dee296
9 changed files with 210 additions and 95 deletions

View File

@ -0,0 +1,31 @@
using ASC.Common;
using ASC.Common.Caching;
using ASC.Web.Webhooks;
namespace ASC.Webhooks
{
[Singletone]
public class BuildQueueService
{
private ICacheNotify<WebhookRequest> WebhookNotify { get; }
public BuildQueueService(ICacheNotify<WebhookRequest> webhookNotify)
{
WebhookNotify = webhookNotify;
}
public void Start()
{
WebhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update);
}
public void Stop()
{
WebhookNotify.Unsubscribe(CacheNotifyAction.Update);
}
public void BuildWebhooksQueue(WebhookRequest request)
{
WebhookHostedService.Queue.Enqueue(request);
}
}
}

View File

@ -12,61 +12,42 @@ namespace ASC.Webhooks
[Scope]
public class DbWorker
{
public WebhooksDbContext webhooksContext { get; }
public DbWorker(DbContextManager<WebhooksDbContext> dbContext)
private Lazy<WebhooksDbContext> LazyWebhooksDbContext { get; }
private WebhooksDbContext webhooksDbContext { get => LazyWebhooksDbContext.Value; }
public DbWorker(DbContextManager<WebhooksDbContext> webhooksDbContext)
{
webhooksContext = dbContext.Get("webhooks");
LazyWebhooksDbContext = new Lazy<WebhooksDbContext>(() => webhooksDbContext.Value);
}
public int WriteToJournal(WebhooksPayload webhook)
{
var entity = webhooksContext.WebhooksPayloads.Add(webhook);
webhooksContext.SaveChanges();
var entity = webhooksDbContext.WebhooksPayloads.Add(webhook);
webhooksDbContext.SaveChanges();
return entity.Entity.Id;
}
public void AddWebhookConfig(WebhooksConfig webhooksConfig)
{
webhooksContext.WebhooksConfigs.Add(webhooksConfig);
webhooksContext.SaveChanges();
webhooksDbContext.WebhooksConfigs.Add(webhooksConfig);
webhooksDbContext.SaveChanges();
}
public List<string> GetWebhookUri(int tenant)
{
return webhooksContext.WebhooksConfigs.Where(t => t.TenantId == tenant).Select(it => it.Uri).ToList();
return webhooksDbContext.WebhooksConfigs.Where(t => t.TenantId == tenant).Select(it => it.Uri).ToList();
}
public List<WebhooksConfig> GetWebhookConfigs(int tenant)
{
return webhooksContext.WebhooksConfigs.Where(t => t.TenantId == tenant).ToList();
return webhooksDbContext.WebhooksConfigs.Where(t => t.TenantId == tenant).ToList();
}
public void UpdateStatus(int id, ProcessStatus status)
{
var webhook = webhooksContext.WebhooksPayloads.Where(t => t.Id == id).FirstOrDefault();
var webhook = webhooksDbContext.WebhooksPayloads.Where(t => t.Id == id).FirstOrDefault();
webhook.Status = status;
webhooksContext.WebhooksPayloads.Update(webhook);
webhooksContext.SaveChanges();
}
public List<WebhooksQueueEntry> GetWebhookQueue()
{
return webhooksContext.WebhooksPayloads
.Where(t => t.Status == ProcessStatus.InProcess)
.Join(webhooksContext.WebhooksConfigs, t => t.TenantId, t => t.TenantId, (payload, config) => new { payload, config })
.Select(t => new WebhooksQueueEntry { Id = t.payload.Id, Data = t.payload.Data, SecretKey = t.config.SecretKey, Uri = t.config.Uri })
.OrderBy(t => t.Id)
.ToList();
}
public List<WebhooksQueueEntry> GetTenantWebhooks(EventName eventName)
{
return webhooksContext.WebhooksPayloads
.Where(t => t.Status == ProcessStatus.InProcess && t.Event == eventName)
.Join(webhooksContext.WebhooksConfigs, t => t.TenantId, t => t.TenantId, (payload, config) => new { payload, config })
.Select(t => new WebhooksQueueEntry { Id = t.payload.Id, Data = t.payload.Data, SecretKey = t.config.SecretKey, Uri = t.config.Uri })
.OrderBy(t => t.Id)
.ToList();
webhooksDbContext.WebhooksPayloads.Update(webhook);
webhooksDbContext.SaveChanges();
}
}
}

View File

@ -1,27 +1,27 @@
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:63599/",
"sslPort": 44357
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"Kestrel WebServer": {
"commandName": "Project",
"launchBrowser": false,
"environmentVariables": {
"$STORAGE_ROOT": "../../Data",
"log__name": "webhooks",
"log__dir": "../../Logs",
"ASPNETCORE_URLS": "http://localhost:5056",
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"ASC.Webhooks": {
"commandName": "Project",
"launchBrowser": true,
"WSL 2 : Ubuntu 20.04": {
"commandName": "WSL2",
"launchBrowser": false,
"environmentVariables": {
"$STORAGE_ROOT": "../../Data",
"log__name": "webhooks",
"log__dir": "../../Logs",
"ASPNETCORE_URLS": "http://localhost:5056",
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:5001;http://localhost:5000"
"distributionName": "Ubuntu-20.04"
}
}
}

View File

@ -1,60 +1,58 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using ASC.Common.Caching;
using ASC.Common;
using ASC.Common.Logging;
using ASC.Core;
using ASC.Web.Webhooks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ASC.Webhooks
{
public class WebhookHostedService : BackgroundService
[Singletone]
public class WebhookHostedService : IHostedService
{
private ILog Log { get; }
private DbWorker DbWorker { get; }
private TenantManager TenantManager { get; }
private WorkerService workerService;
internal static readonly ConcurrentQueue<WebhookRequest> Queue = new ConcurrentQueue<WebhookRequest>();
private BuildQueueService BuildQueueService { get; }
private WebhookSender WebhookSender { get; }
private ILog Logger { get; }
private ICacheNotify<WebhookRequest> CacheNotify { get; }
public WebhookHostedService(IOptionsMonitor<ILog> option,
DbWorker dbWorker,
TenantManager tenantManager,
public WebhookHostedService(BuildQueueService buildQueueService,
WebhookSender webhookSender,
ICacheNotify<WebhookRequest> cacheNotify)
IOptionsMonitor<ILog> options)
{
Log = option.Get("ASC.Webhooks");
DbWorker = dbWorker;
TenantManager = tenantManager;
BuildQueueService = buildQueueService;
WebhookSender = webhookSender;
CacheNotify = cacheNotify;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Log.Debug(
$"WebhookHostedService is starting.");
stoppingToken.Register(() =>
Log.Debug($"WebhookHostedService is stopping."));
CacheNotify.Subscribe(BackgroundProcessing, CacheNotifyAction.Update);
Logger = options.Get("ASC.Webhooks");
}
public void Stop()
public Task StartAsync(CancellationToken cancellationToken)
{
CacheNotify.Unsubscribe(CacheNotifyAction.Update);
workerService = new WorkerService(cancellationToken, WebhookSender, Logger);
workerService.Start();
BuildQueueService.Start();
return Task.CompletedTask;
}
private void BackgroundProcessing(WebhookRequest request)// горизонтальная, вертикальная кластеризация, добавление в очередь из паблишера
public Task StopAsync(CancellationToken cancellationToken)
{
WebhookSender.Send(request);
if (workerService != null)
{
workerService.Stop();
workerService = null;
}
if (BuildQueueService != null)
{
BuildQueueService.Stop();
}
return Task.CompletedTask;
}
}
}

View File

@ -1,19 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using ASC.Common;
using ASC.Common.Caching;
using ASC.Common.Logging;
using ASC.Common.Utils;
using ASC.Core;
using ASC.Web.Webhooks;
using ASC.Webhooks.Dao.Models;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
namespace ASC.Webhooks

View File

@ -13,35 +13,52 @@ using ASC.Core;
using ASC.Web.Webhooks;
using ASC.Webhooks.Dao.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace ASC.Webhooks
{
[Scope]
[Singletone]
public class WebhookSender
{
private const int repeatCount = 5;
private static readonly HttpClient httpClient = new HttpClient();
private IServiceProvider ServiceProvider { get; }
private ILog Log { get; }
private DbWorker DbWorker { get; }
public WebhookSender(IOptionsMonitor<ILog> options, DbWorker dbWorker)
public WebhookSender(IOptionsMonitor<ILog> options, IServiceProvider serviceProvider)
{
DbWorker = dbWorker;
Log = options.Get("ASC.Webhooks");
ServiceProvider = serviceProvider;
}
public async Task Send(WebhookRequest webhookRequest)
{
var URI = webhookRequest.URI;
var secretKey = webhookRequest.SecretKey;
using var scope = ServiceProvider.CreateScope();
var dbWorker = scope.ServiceProvider.GetService<DbWorker>();
for (int i = 0; i < repeatCount; i++)
{
try
{
var testRequest = new HttpRequestMessage(HttpMethod.Post, "http://localhost:8092/api/2.0/authentication.json");
string httpContent = @"{ ""userName"":""www.vna-97@mail.ru"", ""password"":""265676-333"" }";
testRequest.Content = new StringContent(
httpContent,
Encoding.UTF8,
"application/json");
var testResponse = httpClient.Send(testRequest);
//HttpResponseMessage testResponseCalendar =httpClient.GetAsync("http://localhost:8092/api/2.0/calendar/info");
//var token = "1FR2TsR3kXu2zor7fModuf/3nBJRPI4I7LG5x3ODzTVVgFmUd3NguHEmVqDNMJkNc7MRJjeacv+UaAOlRmLcUyCBtEt54Hzd6TCADQtzUEVvl2M20tX0uYd8sftTdIn/faWV415KXFsY3E16StTZ5A==";
var request = new HttpRequestMessage(HttpMethod.Post, URI);
request.Headers.Add("Accept", "*/*");
request.Headers.Add("Secret","SHA256=" + GetSecretHash(secretKey, webhookRequest.Data));//*retry
request.Headers.Add("Secret","SHA256=" + GetSecretHash(secretKey, webhookRequest.Data));
//request.Headers.Add("Authorization", token);
request.Content = new StringContent(
webhookRequest.Data,
@ -52,7 +69,8 @@ namespace ASC.Webhooks
if (response.IsSuccessStatusCode)
{
DbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Success);
dbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Success);
Log.Debug("Response: " + response);
break;
}
}
@ -60,10 +78,10 @@ namespace ASC.Webhooks
{
if(i == repeatCount)
{
DbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Failed);
dbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Failed);
}
Log.Error("ERROR: " + ex.Message);
Log.Error(ex.Message);
continue;
}
}

View File

@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using ASC.Common.Logging;
namespace ASC.Webhooks
{
internal class WorkerService
{
private readonly int threadCount = 10;
private readonly CancellationToken cancellationToken;
private readonly WebhookSender webhookSender;
private ILog logger;
private Timer timer;
public WorkerService(CancellationToken cancellationToken,
WebhookSender webhookSender,
ILog logger)
{
this.cancellationToken = cancellationToken;
this.logger = logger;
this.webhookSender = webhookSender;
}
public void Start()
{
timer = new Timer(Procedure, null, 0, Timeout.Infinite);
}
public void Stop()
{
if (timer != null)
{
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
timer = null;
}
}
private void Procedure(object _)
{
if (cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
timer.Change(Timeout.Infinite, Timeout.Infinite);
logger.Debug("Procedure: Start.");
var queueSize = WebhookHostedService.Queue.Count;
if (queueSize == 0)// change to "<= threadCount"
{
logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5);
timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));
return;
}
//var queueEntrys = new List<WebhookRequest>();
//for (int i = 0; i < queueSize; i++)
//{
// WebhookHostedService.Queue.TryDequeue(out var entry);
// queueEntrys.Add(entry);
//}
var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < queueSize; i++)
{
WebhookHostedService.Queue.TryDequeue(out var entry);
tasks.Add(webhookSender.Send(entry));
counter++;
if (counter >= threadCount)
{
Task.WaitAll(tasks.ToArray());
tasks.Clear();
counter = 0;
}
}
logger.Debug("Procedure: Finish.");
timer.Change(0, Timeout.Infinite);
}
}
}

View File

@ -6,6 +6,7 @@ using ASC.Api.Documents;
using ASC.Web.Files;
using ASC.Web.Files.HttpHandlers;
using ASC.Web.Studio.Core.Notify;
using ASC.Webhooks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
@ -41,6 +42,9 @@ namespace ASC.Files
DIHelper.TryAdd<DocuSignHandlerService>();
DIHelper.TryAdd<ThirdPartyAppHandlerService>();
services.AddHostedService<WebhookHostedService>();
DIHelper.TryAdd<WebhookHostedService>();
NotifyConfigurationExtension.Register(DIHelper);
}

View File

@ -27,6 +27,7 @@
<ItemGroup>
<ProjectReference Include="..\..\common\ASC.Core.Common\ASC.Core.Common.csproj" />
<ProjectReference Include="..\..\common\ASC.Webhooks\ASC.Webhooks.csproj" />
<ProjectReference Include="..\..\common\services\ASC.AuditTrail\ASC.AuditTrail.csproj" />
<ProjectReference Include="..\..\common\services\ASC.Data.Backup\ASC.Data.Backup.csproj" />
</ItemGroup>