From 8a51dee296c32ebbecee189c6faf666749aaa176 Mon Sep 17 00:00:00 2001 From: Vashchuk Nikita Date: Mon, 26 Jul 2021 20:03:55 +0300 Subject: [PATCH] modified DbWorker(Lazy), Sender. Added BuildQueueService and WorkerService.cs. --- common/ASC.Webhooks/BuildQueueService.cs | 31 +++++++ common/ASC.Webhooks/DbWorker.cs | 45 +++------- .../Properties/launchSettings.json | 30 +++---- common/ASC.Webhooks/WebhookHostedService.cs | 66 +++++++------- common/ASC.Webhooks/WebhookPublisher.cs | 6 -- common/ASC.Webhooks/WebhookSender.cs | 34 +++++-- common/ASC.Webhooks/WorkerService.cs | 88 +++++++++++++++++++ products/ASC.Files/Server/Startup.cs | 4 + web/ASC.Web.Api/ASC.Web.Api.csproj | 1 + 9 files changed, 210 insertions(+), 95 deletions(-) create mode 100644 common/ASC.Webhooks/BuildQueueService.cs create mode 100644 common/ASC.Webhooks/WorkerService.cs diff --git a/common/ASC.Webhooks/BuildQueueService.cs b/common/ASC.Webhooks/BuildQueueService.cs new file mode 100644 index 0000000000..4f7973e8c4 --- /dev/null +++ b/common/ASC.Webhooks/BuildQueueService.cs @@ -0,0 +1,31 @@ +using ASC.Common; +using ASC.Common.Caching; +using ASC.Web.Webhooks; + +namespace ASC.Webhooks +{ + [Singletone] + public class BuildQueueService + { + private ICacheNotify WebhookNotify { get; } + public BuildQueueService(ICacheNotify webhookNotify) + { + WebhookNotify = webhookNotify; + } + + public void Start() + { + WebhookNotify.Subscribe(BuildWebhooksQueue, CacheNotifyAction.Update); + } + + public void Stop() + { + WebhookNotify.Unsubscribe(CacheNotifyAction.Update); + } + + public void BuildWebhooksQueue(WebhookRequest request) + { + WebhookHostedService.Queue.Enqueue(request); + } + } +} diff --git a/common/ASC.Webhooks/DbWorker.cs b/common/ASC.Webhooks/DbWorker.cs index b588ea93bf..b1316e19e7 100644 --- a/common/ASC.Webhooks/DbWorker.cs +++ b/common/ASC.Webhooks/DbWorker.cs @@ -12,61 +12,42 @@ namespace ASC.Webhooks [Scope] public class DbWorker { - public WebhooksDbContext webhooksContext { get; } - public DbWorker(DbContextManager dbContext) + private Lazy LazyWebhooksDbContext { get; } + private WebhooksDbContext webhooksDbContext { get => LazyWebhooksDbContext.Value; } + public DbWorker(DbContextManager webhooksDbContext) { - webhooksContext = dbContext.Get("webhooks"); + LazyWebhooksDbContext = new Lazy(() => webhooksDbContext.Value); } public int WriteToJournal(WebhooksPayload webhook) { - var entity = webhooksContext.WebhooksPayloads.Add(webhook); - webhooksContext.SaveChanges(); + var entity = webhooksDbContext.WebhooksPayloads.Add(webhook); + webhooksDbContext.SaveChanges(); return entity.Entity.Id; } public void AddWebhookConfig(WebhooksConfig webhooksConfig) { - webhooksContext.WebhooksConfigs.Add(webhooksConfig); - webhooksContext.SaveChanges(); + webhooksDbContext.WebhooksConfigs.Add(webhooksConfig); + webhooksDbContext.SaveChanges(); } public List GetWebhookUri(int tenant) { - return webhooksContext.WebhooksConfigs.Where(t => t.TenantId == tenant).Select(it => it.Uri).ToList(); + return webhooksDbContext.WebhooksConfigs.Where(t => t.TenantId == tenant).Select(it => it.Uri).ToList(); } public List GetWebhookConfigs(int tenant) { - return webhooksContext.WebhooksConfigs.Where(t => t.TenantId == tenant).ToList(); + return webhooksDbContext.WebhooksConfigs.Where(t => t.TenantId == tenant).ToList(); } public void UpdateStatus(int id, ProcessStatus status) { - var webhook = webhooksContext.WebhooksPayloads.Where(t => t.Id == id).FirstOrDefault(); + var webhook = webhooksDbContext.WebhooksPayloads.Where(t => t.Id == id).FirstOrDefault(); webhook.Status = status; - webhooksContext.WebhooksPayloads.Update(webhook); - webhooksContext.SaveChanges(); - } - - public List GetWebhookQueue() - { - return webhooksContext.WebhooksPayloads - .Where(t => t.Status == ProcessStatus.InProcess) - .Join(webhooksContext.WebhooksConfigs, t => t.TenantId, t => t.TenantId, (payload, config) => new { payload, config }) - .Select(t => new WebhooksQueueEntry { Id = t.payload.Id, Data = t.payload.Data, SecretKey = t.config.SecretKey, Uri = t.config.Uri }) - .OrderBy(t => t.Id) - .ToList(); - } - - public List GetTenantWebhooks(EventName eventName) - { - return webhooksContext.WebhooksPayloads - .Where(t => t.Status == ProcessStatus.InProcess && t.Event == eventName) - .Join(webhooksContext.WebhooksConfigs, t => t.TenantId, t => t.TenantId, (payload, config) => new { payload, config }) - .Select(t => new WebhooksQueueEntry { Id = t.payload.Id, Data = t.payload.Data, SecretKey = t.config.SecretKey, Uri = t.config.Uri }) - .OrderBy(t => t.Id) - .ToList(); + webhooksDbContext.WebhooksPayloads.Update(webhook); + webhooksDbContext.SaveChanges(); } } } diff --git a/common/ASC.Webhooks/Properties/launchSettings.json b/common/ASC.Webhooks/Properties/launchSettings.json index 2ddd5acc1a..81153ebeeb 100644 --- a/common/ASC.Webhooks/Properties/launchSettings.json +++ b/common/ASC.Webhooks/Properties/launchSettings.json @@ -1,27 +1,27 @@ { - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:63599/", - "sslPort": 44357 - } - }, "profiles": { - "IIS Express": { - "commandName": "IISExpress", - "launchBrowser": true, + "Kestrel WebServer": { + "commandName": "Project", + "launchBrowser": false, "environmentVariables": { + "$STORAGE_ROOT": "../../Data", + "log__name": "webhooks", + "log__dir": "../../Logs", + "ASPNETCORE_URLS": "http://localhost:5056", "ASPNETCORE_ENVIRONMENT": "Development" } }, - "ASC.Webhooks": { - "commandName": "Project", - "launchBrowser": true, + "WSL 2 : Ubuntu 20.04": { + "commandName": "WSL2", + "launchBrowser": false, "environmentVariables": { + "$STORAGE_ROOT": "../../Data", + "log__name": "webhooks", + "log__dir": "../../Logs", + "ASPNETCORE_URLS": "http://localhost:5056", "ASPNETCORE_ENVIRONMENT": "Development" }, - "applicationUrl": "https://localhost:5001;http://localhost:5000" + "distributionName": "Ubuntu-20.04" } } } \ No newline at end of file diff --git a/common/ASC.Webhooks/WebhookHostedService.cs b/common/ASC.Webhooks/WebhookHostedService.cs index 83f51ca4e2..c5af7531c3 100644 --- a/common/ASC.Webhooks/WebhookHostedService.cs +++ b/common/ASC.Webhooks/WebhookHostedService.cs @@ -1,60 +1,58 @@ -using System; -using System.Collections.Generic; -using System.Linq; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; -using ASC.Common.Caching; +using ASC.Common; using ASC.Common.Logging; -using ASC.Core; using ASC.Web.Webhooks; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace ASC.Webhooks { - public class WebhookHostedService : BackgroundService + [Singletone] + public class WebhookHostedService : IHostedService { - private ILog Log { get; } - private DbWorker DbWorker { get; } - private TenantManager TenantManager { get; } + private WorkerService workerService; + internal static readonly ConcurrentQueue Queue = new ConcurrentQueue(); + private BuildQueueService BuildQueueService { get; } private WebhookSender WebhookSender { get; } + private ILog Logger { get; } - private ICacheNotify CacheNotify { get; } - - public WebhookHostedService(IOptionsMonitor option, - DbWorker dbWorker, - TenantManager tenantManager, + public WebhookHostedService(BuildQueueService buildQueueService, WebhookSender webhookSender, - ICacheNotify cacheNotify) + IOptionsMonitor options) { - Log = option.Get("ASC.Webhooks"); - DbWorker = dbWorker; - TenantManager = tenantManager; + BuildQueueService = buildQueueService; WebhookSender = webhookSender; - CacheNotify = cacheNotify; - } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - Log.Debug( - $"WebhookHostedService is starting."); - - stoppingToken.Register(() => - Log.Debug($"WebhookHostedService is stopping.")); - - CacheNotify.Subscribe(BackgroundProcessing, CacheNotifyAction.Update); + Logger = options.Get("ASC.Webhooks"); } - public void Stop() + public Task StartAsync(CancellationToken cancellationToken) { - CacheNotify.Unsubscribe(CacheNotifyAction.Update); + workerService = new WorkerService(cancellationToken, WebhookSender, Logger); + workerService.Start(); + BuildQueueService.Start(); + + return Task.CompletedTask; } - private void BackgroundProcessing(WebhookRequest request)// горизонтальная, вертикальная кластеризация, добавление в очередь из паблишера + public Task StopAsync(CancellationToken cancellationToken) { - WebhookSender.Send(request); + + if (workerService != null) + { + workerService.Stop(); + workerService = null; + } + + if (BuildQueueService != null) + { + BuildQueueService.Stop(); + } + + return Task.CompletedTask; } } } diff --git a/common/ASC.Webhooks/WebhookPublisher.cs b/common/ASC.Webhooks/WebhookPublisher.cs index f8cd22c672..7a1a9bd247 100644 --- a/common/ASC.Webhooks/WebhookPublisher.cs +++ b/common/ASC.Webhooks/WebhookPublisher.cs @@ -1,19 +1,13 @@ using System; -using System.Collections.Generic; using System.Text.Json; -using System.Threading.Tasks; using ASC.Common; using ASC.Common.Caching; using ASC.Common.Logging; -using ASC.Common.Utils; using ASC.Core; using ASC.Web.Webhooks; using ASC.Webhooks.Dao.Models; -using Confluent.Kafka; - -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Options; namespace ASC.Webhooks diff --git a/common/ASC.Webhooks/WebhookSender.cs b/common/ASC.Webhooks/WebhookSender.cs index 6bc6c529d5..6f458cac57 100644 --- a/common/ASC.Webhooks/WebhookSender.cs +++ b/common/ASC.Webhooks/WebhookSender.cs @@ -13,35 +13,52 @@ using ASC.Core; using ASC.Web.Webhooks; using ASC.Webhooks.Dao.Models; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; namespace ASC.Webhooks { - [Scope] + [Singletone] public class WebhookSender { private const int repeatCount = 5; private static readonly HttpClient httpClient = new HttpClient(); + private IServiceProvider ServiceProvider { get; } private ILog Log { get; } - private DbWorker DbWorker { get; } - public WebhookSender(IOptionsMonitor options, DbWorker dbWorker) + public WebhookSender(IOptionsMonitor options, IServiceProvider serviceProvider) { - DbWorker = dbWorker; Log = options.Get("ASC.Webhooks"); + ServiceProvider = serviceProvider; } public async Task Send(WebhookRequest webhookRequest) { var URI = webhookRequest.URI; var secretKey = webhookRequest.SecretKey; + using var scope = ServiceProvider.CreateScope(); + var dbWorker = scope.ServiceProvider.GetService(); + for (int i = 0; i < repeatCount; i++) { try { + var testRequest = new HttpRequestMessage(HttpMethod.Post, "http://localhost:8092/api/2.0/authentication.json"); + string httpContent = @"{ ""userName"":""www.vna-97@mail.ru"", ""password"":""265676-333"" }"; + testRequest.Content = new StringContent( + httpContent, + Encoding.UTF8, + "application/json"); + + var testResponse = httpClient.Send(testRequest); + + //HttpResponseMessage testResponseCalendar =httpClient.GetAsync("http://localhost:8092/api/2.0/calendar/info"); + //var token = "1FR2TsR3kXu2zor7fModuf/3nBJRPI4I7LG5x3ODzTVVgFmUd3NguHEmVqDNMJkNc7MRJjeacv+UaAOlRmLcUyCBtEt54Hzd6TCADQtzUEVvl2M20tX0uYd8sftTdIn/faWV415KXFsY3E16StTZ5A=="; + var request = new HttpRequestMessage(HttpMethod.Post, URI); request.Headers.Add("Accept", "*/*"); - request.Headers.Add("Secret","SHA256=" + GetSecretHash(secretKey, webhookRequest.Data));//*retry + request.Headers.Add("Secret","SHA256=" + GetSecretHash(secretKey, webhookRequest.Data)); + //request.Headers.Add("Authorization", token); request.Content = new StringContent( webhookRequest.Data, @@ -52,7 +69,8 @@ namespace ASC.Webhooks if (response.IsSuccessStatusCode) { - DbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Success); + dbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Success); + Log.Debug("Response: " + response); break; } } @@ -60,10 +78,10 @@ namespace ASC.Webhooks { if(i == repeatCount) { - DbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Failed); + dbWorker.UpdateStatus(webhookRequest.Id, ProcessStatus.Failed); } - Log.Error("ERROR: " + ex.Message); + Log.Error(ex.Message); continue; } } diff --git a/common/ASC.Webhooks/WorkerService.cs b/common/ASC.Webhooks/WorkerService.cs new file mode 100644 index 0000000000..dfaa7041f2 --- /dev/null +++ b/common/ASC.Webhooks/WorkerService.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using System.Threading; +using ASC.Common.Logging; + +namespace ASC.Webhooks +{ + internal class WorkerService + { + private readonly int threadCount = 10; + private readonly CancellationToken cancellationToken; + private readonly WebhookSender webhookSender; + private ILog logger; + private Timer timer; + + public WorkerService(CancellationToken cancellationToken, + WebhookSender webhookSender, + ILog logger) + { + this.cancellationToken = cancellationToken; + this.logger = logger; + this.webhookSender = webhookSender; + } + + public void Start() + { + timer = new Timer(Procedure, null, 0, Timeout.Infinite); + } + + public void Stop() + { + if (timer != null) + { + timer.Change(Timeout.Infinite, Timeout.Infinite); + timer.Dispose(); + timer = null; + } + } + + private void Procedure(object _) + { + if (cancellationToken.IsCancellationRequested) + { + Stop(); + return; + } + + timer.Change(Timeout.Infinite, Timeout.Infinite); + logger.Debug("Procedure: Start."); + var queueSize = WebhookHostedService.Queue.Count; + + if (queueSize == 0)// change to "<= threadCount" + { + logger.TraceFormat("Procedure: Waiting for data. Sleep {0}.", 5); + timer.Change(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1)); + return; + } + + + //var queueEntrys = new List(); + //for (int i = 0; i < queueSize; i++) + //{ + // WebhookHostedService.Queue.TryDequeue(out var entry); + // queueEntrys.Add(entry); + //} + + var tasks = new List(); + var counter = 0; + for (int i = 0; i < queueSize; i++) + { + WebhookHostedService.Queue.TryDequeue(out var entry); + tasks.Add(webhookSender.Send(entry)); + counter++; + + if (counter >= threadCount) + { + Task.WaitAll(tasks.ToArray()); + tasks.Clear(); + counter = 0; + } + } + + logger.Debug("Procedure: Finish."); + timer.Change(0, Timeout.Infinite); + } + } +} diff --git a/products/ASC.Files/Server/Startup.cs b/products/ASC.Files/Server/Startup.cs index bc6d8c0c92..b4526ea0da 100644 --- a/products/ASC.Files/Server/Startup.cs +++ b/products/ASC.Files/Server/Startup.cs @@ -6,6 +6,7 @@ using ASC.Api.Documents; using ASC.Web.Files; using ASC.Web.Files.HttpHandlers; using ASC.Web.Studio.Core.Notify; +using ASC.Webhooks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; @@ -41,6 +42,9 @@ namespace ASC.Files DIHelper.TryAdd(); DIHelper.TryAdd(); + services.AddHostedService(); + DIHelper.TryAdd(); + NotifyConfigurationExtension.Register(DIHelper); } diff --git a/web/ASC.Web.Api/ASC.Web.Api.csproj b/web/ASC.Web.Api/ASC.Web.Api.csproj index 20a7ec0af5..22423cbf54 100644 --- a/web/ASC.Web.Api/ASC.Web.Api.csproj +++ b/web/ASC.Web.Api/ASC.Web.Api.csproj @@ -27,6 +27,7 @@ +