Backup: DistributedTaskQueue

This commit is contained in:
pavelbannov 2020-08-06 21:07:27 +03:00
parent b067991256
commit ffc756f32b
9 changed files with 141 additions and 68 deletions

View File

@ -66,7 +66,7 @@ namespace ASC.Common.Threading
{
return Enum.Parse<DistributedTaskStatus>(DistributedTaskCache.Status);
}
internal set
set
{
DistributedTaskCache.Status = value.ToString();
}

View File

@ -34,6 +34,7 @@ using System.Threading.Tasks;
using ASC.Common.Caching;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace ASC.Common.Threading
@ -100,15 +101,18 @@ namespace ASC.Common.Threading
public class ConfigureDistributedTaskQueue : IConfigureNamedOptions<DistributedTaskQueue>
{
public DistributedTaskCacheNotify DistributedTaskCacheNotify { get; }
public ConfigureDistributedTaskQueue(DistributedTaskCacheNotify distributedTaskCacheNotify)
public IServiceProvider ServiceProvider { get; }
public ConfigureDistributedTaskQueue(DistributedTaskCacheNotify distributedTaskCacheNotify, IServiceProvider serviceProvider)
{
DistributedTaskCacheNotify = distributedTaskCacheNotify;
ServiceProvider = serviceProvider;
}
public void Configure(DistributedTaskQueue queue)
{
queue.DistributedTaskCacheNotify = DistributedTaskCacheNotify;
queue.DistributedTaskCacheNotify = DistributedTaskCacheNotify;
queue.ServiceProvider = ServiceProvider;
}
public void Configure(string name, DistributedTaskQueue options)
@ -125,6 +129,7 @@ namespace ASC.Common.Threading
private string key;
private TaskScheduler Scheduler { get; set; } = TaskScheduler.Default;
public IServiceProvider ServiceProvider { get; set; }
public string Name { set { key = value + GetType().Name; } }
private ICache Cache { get => DistributedTaskCacheNotify.Cache; }
private ConcurrentDictionary<string, CancellationTokenSource> Cancelations { get => DistributedTaskCacheNotify.Cancelations; }
@ -180,6 +185,25 @@ namespace ASC.Common.Threading
public IEnumerable<DistributedTask> GetTasks()
{
var tasks = Cache.HashGetAll<DistributedTaskCache>(key).Values.Select(r => new DistributedTask(r)).ToList();
tasks.ForEach(t =>
{
if (t.Publication == null)
{
t.Publication = GetPublication();
}
});
return tasks;
}
public IEnumerable<T> GetTasks<T>() where T : DistributedTask
{
var tasks = Cache.HashGetAll<DistributedTaskCache>(key).Values.Select(r =>
{
var result = ServiceProvider.GetService<T>();
result.DistributedTaskCache = r;
return result;
}).ToList();
tasks.ForEach(t =>
{
if (t.Publication == null)

View File

@ -30,8 +30,8 @@ namespace ASC.Common.Threading.Progress
{
public interface IProgressItem : ICloneable
{
object Id { get; set; }
object Status { get; set; }
string Id { get; }
DistributedTaskStatus Status { get; set; }
object Error { get; set; }
double Percentage { get; set; }
bool IsCompleted { get; set; }

View File

@ -35,13 +35,13 @@ namespace ASC.Common.Threading.Progress
protected int StepCount { get; set; }
public object Id { get; set; }
public object Status { get; set; }
public object Error { get; set; }
public string Id { get; set; }
public DistributedTaskStatus Status { get; set; }
public object Error { get; set; }
public double Percentage
{
get { return Math.Min(100.0, Math.Max(0, _percentage)); }
@ -89,7 +89,7 @@ namespace ASC.Common.Threading.Progress
protected ProgressBase()
{
Id = Guid.NewGuid(); // random id
Id = Guid.NewGuid().ToString(); // random id
}
protected void ProgressAdd(double value)

View File

@ -32,6 +32,7 @@ using System.Text;
using ASC.Common;
using ASC.Common.Logging;
using ASC.Common.Threading;
using ASC.Common.Threading.Progress;
using ASC.Core;
using ASC.Core.Users;
@ -61,8 +62,8 @@ namespace ASC.Data.Reassigns
//private readonly IFileStorageService _docService;
//private readonly MailGarbageEngine _mailEraser;
public object Id { get; set; }
public object Status { get; set; }
public string Id { get; }
public DistributedTaskStatus Status { get; set; }
public object Error { get; set; }
public double Percentage { get; set; }
public bool IsCompleted { get; set; }
@ -88,7 +89,7 @@ namespace ASC.Data.Reassigns
//_mailEraser = new MailGarbageEngine();
Id = queueWorkerRemove.GetProgressItemId(tenantId, FromUser);
Status = ProgressStatus.Queued;
Status = DistributedTaskStatus.Created;
Error = null;
Percentage = 0;
IsCompleted = false;
@ -114,7 +115,7 @@ namespace ASC.Data.Reassigns
try
{
Percentage = 0;
Status = ProgressStatus.Started;
Status = DistributedTaskStatus.Running;
securityContext.AuthenticateMe(_currentUserId);
@ -158,12 +159,12 @@ namespace ASC.Data.Reassigns
SendSuccessNotify(studioNotifyService, messageService, messageTarget, userName, docsSpace, crmSpace, mailSpace, talkSpace);
Percentage = 100;
Status = ProgressStatus.Done;
Status = DistributedTaskStatus.Completed;
}
catch (Exception ex)
{
logger.Error(ex);
Status = ProgressStatus.Failed;
Status = DistributedTaskStatus.Failted;
Error = ex.Message;
SendErrorNotify(studioNotifyService, ex.Message, userName);
}

View File

@ -171,7 +171,7 @@ namespace ASC.Data.Storage
string[] files;
foreach (var domain in domains)
{
Status = module + domain;
//Status = module + domain;
Log.DebugFormat("Domain: {0}", domain);
files = oldStore.ListFilesRelative(domain, "\\", "*.*", true);

View File

@ -33,6 +33,7 @@ using System.Threading;
using ASC.Common;
using ASC.Common.Caching;
using ASC.Common.Logging;
using ASC.Common.Threading;
using ASC.Common.Threading.Progress;
using ASC.Core;
using ASC.Core.Billing;
@ -55,7 +56,7 @@ namespace ASC.Data.Backup.Service
public class BackupWorker
{
private ILog Log { get; set; }
private ProgressQueue<BaseBackupProgressItem> ProgressQueue { get; set; }
private DistributedTaskQueue ProgressQueue { get; set; }
internal string TempFolder { get; set; }
private string CurrentRegion { get; set; }
private Dictionary<string, string> ConfigPaths { get; set; }
@ -64,14 +65,16 @@ namespace ASC.Data.Backup.Service
private ICacheNotify<BackupProgress> CacheBackupProgress { get; }
private FactoryProgressItem FactoryProgressItem { get; set; }
private readonly object SynchRoot = new object();
public BackupWorker(
IOptionsMonitor<ILog> options,
ICacheNotify<BackupProgress> cacheBackupProgress,
ProgressQueueOptionsManager<BaseBackupProgressItem> progressQueue,
DistributedTaskQueueOptionsManager progressQueue,
FactoryProgressItem factoryProgressItem)
{
Log = options.CurrentValue;
ProgressQueue = progressQueue.Value;
ProgressQueue = progressQueue.Get("backup");
CacheBackupProgress = cacheBackupProgress;
FactoryProgressItem = factoryProgressItem;
}
@ -101,25 +104,30 @@ namespace ASC.Data.Backup.Service
{
if (ProgressQueue != null)
{
ProgressQueue.Terminate();
var tasks = ProgressQueue.GetTasks();
foreach (var t in tasks)
{
ProgressQueue.CancelTask(t.Id);
}
ProgressQueue = null;
}
}
public BackupProgress StartBackup(StartBackupRequest request)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
var item = ProgressQueue.GetItems().OfType<BackupProgressItem>().FirstOrDefault(t => t.TenantId == request.TenantId);
var item = ProgressQueue.GetTasks<BackupProgressItem>().FirstOrDefault(t => t.TenantId == request.TenantId);
if (item != null && item.IsCompleted)
{
ProgressQueue.Remove(item);
ProgressQueue.RemoveTask(item.Id);
item = null;
}
if (item == null)
{
item = FactoryProgressItem.CreateBackupProgressItem(request, false, TempFolder, Limit, CurrentRegion, ConfigPaths);
ProgressQueue.Add(item);
ProgressQueue.QueueTask((a, b) => item.RunJob(), item);
}
var progress = ToBackupProgress(item);
@ -132,35 +140,35 @@ namespace ASC.Data.Backup.Service
public void StartScheduledBackup(BackupSchedule schedule)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
var item = ProgressQueue.GetItems().OfType<BackupProgressItem>().FirstOrDefault(t => t.TenantId == schedule.TenantId);
var item = ProgressQueue.GetTasks<BackupProgressItem>().FirstOrDefault(t => t.TenantId == schedule.TenantId);
if (item != null && item.IsCompleted)
{
ProgressQueue.Remove(item);
ProgressQueue.RemoveTask(item.Id);
item = null;
}
if (item == null)
{
item = FactoryProgressItem.CreateBackupProgressItem(schedule, false, TempFolder, Limit, CurrentRegion, ConfigPaths);
ProgressQueue.Add(item);
ProgressQueue.QueueTask((a, b) => item.RunJob(), item);
}
}
}
public BackupProgress GetBackupProgress(int tenantId)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
return ToBackupProgress(ProgressQueue.GetItems().OfType<BackupProgressItem>().FirstOrDefault(t => t.TenantId == tenantId));
return ToBackupProgress(ProgressQueue.GetTasks<BackupProgressItem>().FirstOrDefault(t => t.TenantId == tenantId));
}
}
public void ResetBackupError(int tenantId)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
var progress = ProgressQueue.GetItems().OfType<BackupProgressItem>().FirstOrDefault(t => t.TenantId == tenantId);
var progress = ProgressQueue.GetTasks<BackupProgressItem>().FirstOrDefault(t => t.TenantId == tenantId);
if (progress != null)
{
progress.Error = null;
@ -170,9 +178,9 @@ namespace ASC.Data.Backup.Service
public void ResetRestoreError(int tenantId)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
var progress = ProgressQueue.GetItems().OfType<RestoreProgressItem>().FirstOrDefault(t => t.TenantId == tenantId);
var progress = ProgressQueue.GetTasks<RestoreProgressItem>().FirstOrDefault(t => t.TenantId == tenantId);
if (progress != null)
{
progress.Error = null;
@ -182,18 +190,18 @@ namespace ASC.Data.Backup.Service
public BackupProgress StartRestore(StartRestoreRequest request)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
var item = ProgressQueue.GetItems().OfType<RestoreProgressItem>().FirstOrDefault(t => t.TenantId == request.TenantId);
var item = ProgressQueue.GetTasks<RestoreProgressItem>().FirstOrDefault(t => t.TenantId == request.TenantId);
if (item != null && item.IsCompleted)
{
ProgressQueue.Remove(item);
ProgressQueue.RemoveTask(item.Id);
item = null;
}
if (item == null)
{
item = FactoryProgressItem.CreateRestoreProgressItem(request, TempFolder, UpgradesPath, CurrentRegion, ConfigPaths);
ProgressQueue.Add(item);
ProgressQueue.QueueTask((a, b) => item.RunJob(), item);
}
return ToBackupProgress(item);
}
@ -201,19 +209,19 @@ namespace ASC.Data.Backup.Service
public BackupProgress StartTransfer(int tenantId, string targetRegion, bool transferMail, bool notify)
{
lock (ProgressQueue.SynchRoot)
lock (SynchRoot)
{
var item = ProgressQueue.GetItems().OfType<TransferProgressItem>().FirstOrDefault(t => t.TenantId == tenantId);
var item = ProgressQueue.GetTasks<TransferProgressItem>().FirstOrDefault(t => t.TenantId == tenantId);
if (item != null && item.IsCompleted)
{
ProgressQueue.Remove(item);
ProgressQueue.RemoveTask(item.Id);
item = null;
}
if (item == null)
{
item = FactoryProgressItem.CreateTransferProgressItem(targetRegion, transferMail, tenantId, TempFolder, Limit, notify, CurrentRegion, ConfigPaths);
ProgressQueue.Add(item);
ProgressQueue.QueueTask((a, b) => item.RunJob(), item);
}
return ToBackupProgress(item);
@ -253,6 +261,7 @@ namespace ASC.Data.Backup.Service
internal void PublishProgress(BaseBackupProgressItem progress)
{
progress.PublishChanges();
PublishProgress(ToBackupProgress(progress));
}
@ -281,19 +290,63 @@ namespace ASC.Data.Backup.Service
};
}
public abstract class BaseBackupProgressItem : IProgressItem
public abstract class BaseBackupProgressItem : DistributedTask, IProgressItem
{
public object Id { get; set; }
public object error;
public object Error
{
get
{
return error ?? GetProperty<object>(nameof(error));
}
set
{
error = value;
SetProperty(nameof(error), value);
}
}
public object Status { get; set; }
public double? percentage;
public double Percentage
{
get
{
return percentage ?? GetProperty<double>(nameof(percentage));
}
set
{
percentage = value;
SetProperty(nameof(percentage), value);
}
}
public object Error { get; set; }
public bool? isCompleted;
public bool IsCompleted
{
get
{
return isCompleted ?? GetProperty<bool>(nameof(isCompleted));
}
set
{
isCompleted = value;
SetProperty(nameof(isCompleted), value);
}
}
public double Percentage { get; set; }
public bool IsCompleted { get; set; }
public int TenantId { get; set; }
private int? tenantId;
public int TenantId
{
get
{
return tenantId ?? GetProperty<int>(nameof(tenantId));
}
set
{
tenantId = value;
SetProperty(nameof(tenantId), value);
}
}
public abstract BackupProgressItemEnum BackupProgressItemEnum { get; }
@ -324,11 +377,8 @@ namespace ASC.Data.Backup.Service
public BackupProgressItem(IServiceProvider serviceProvider, IOptionsMonitor<ILog> options)
{
Id = Guid.NewGuid();
Log = options.CurrentValue;
ServiceProvider = serviceProvider;
}
public void Init(BackupSchedule schedule, bool isScheduled, string tempFolder, int limit, string currentRegion, Dictionary<string, string> configPaths)
@ -409,7 +459,7 @@ namespace ASC.Data.Backup.Service
repo.SaveBackupRecord(
new BackupRecord
{
Id = (Guid)Id,
Id = Guid.Parse(Id),
TenantId = TenantId,
IsScheduled = IsScheduled,
Name = Path.GetFileName(tempFile),
@ -491,7 +541,6 @@ namespace ASC.Data.Backup.Service
}
public void Init(StartRestoreRequest request, string tempFolder, string upgradesPath, string currentRegion, Dictionary<string, string> configPaths)
{
Id = Guid.NewGuid();
TenantId = request.TenantId;
Notify = request.NotifyAfterCompletion;
StoragePath = request.FilePathOrId;
@ -525,7 +574,7 @@ namespace ASC.Data.Backup.Service
tenantManager.SaveTenant(tenant);
var columnMapper = new ColumnMapper();
columnMapper.SetMapping("tenants_tenants", "alias", tenant.TenantAlias, ((Guid)Id).ToString("N"));
columnMapper.SetMapping("tenants_tenants", "alias", tenant.TenantAlias, (Guid.Parse(Id)).ToString("N"));
columnMapper.Commit();
var restoreTask = scope.ServiceProvider.GetService<RestorePortalTask>();
@ -652,7 +701,6 @@ namespace ASC.Data.Backup.Service
string currentRegion,
Dictionary<string, string> configPaths)
{
Id = Guid.NewGuid();
TenantId = tenantId;
TargetRegion = targetRegion;
TransferMail = transferMail;

View File

@ -1,7 +1,8 @@
using System;
using ASC.Api.Core;
using ASC.Common;
using ASC.Common;
using ASC.Common.Threading.Workers;
using ASC.Data.Backup.Controllers;
using ASC.Data.Backup.Service;

View File

@ -2,10 +2,10 @@
using System;
using ASC.Api.Core;
using ASC.Common;
using ASC.Common;
using ASC.Common.Threading.Workers;
using ASC.Data.Reassigns;
using ASC.Employee.Core.Controllers;
using ASC.Web.Core.Users;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@ -27,8 +27,7 @@ namespace ASC.People
var diHelper = new DIHelper(services);
diHelper.AddProgressQueue<RemoveProgressItem>(1, (int)TimeSpan.FromMinutes(5).TotalMilliseconds, true, false, 0);
diHelper.AddProgressQueue<ReassignProgressItem>(1, (int)TimeSpan.FromMinutes(5).TotalMilliseconds, true, false, 0);
diHelper.AddWorkerQueue<ResizeWorkerItem>(2, (int)TimeSpan.FromMinutes(30).TotalMilliseconds, true, 1);
diHelper.AddProgressQueue<ReassignProgressItem>(1, (int)TimeSpan.FromMinutes(5).TotalMilliseconds, true, false, 0);
diHelper
.AddPeopleController()