From 0a0cfbbf881037d9e2796edd6897f6b3a95b94e5 Mon Sep 17 00:00:00 2001 From: pavelbannov Date: Tue, 23 Jul 2019 16:12:16 +0300 Subject: [PATCH] Common: added distributed task cache AscCache: removed Default --- common/ASC.Common/ASC.Common.csproj | 1 + common/ASC.Common/Caching/AscCache.cs | 3 - .../ASC.Common/Threading/DistributedTask.cs | 117 ++++++++++-------- .../Threading/DistributedTaskQueue.cs | 49 +++++--- .../protos/DistributedTaskCache.proto | 17 +++ 5 files changed, 114 insertions(+), 73 deletions(-) create mode 100644 common/ASC.Common/protos/DistributedTaskCache.proto diff --git a/common/ASC.Common/ASC.Common.csproj b/common/ASC.Common/ASC.Common.csproj index 88145e867f..30f8fa6924 100644 --- a/common/ASC.Common/ASC.Common.csproj +++ b/common/ASC.Common/ASC.Common.csproj @@ -57,6 +57,7 @@ + \ No newline at end of file diff --git a/common/ASC.Common/Caching/AscCache.cs b/common/ASC.Common/Caching/AscCache.cs index cf6dc4a936..3daa841688 100644 --- a/common/ASC.Common/Caching/AscCache.cs +++ b/common/ASC.Common/Caching/AscCache.cs @@ -34,8 +34,6 @@ namespace ASC.Common.Caching { public class AscCache : ICache { - public static readonly ICache Default; - public static readonly ICache Memory; public readonly ICacheNotify KafkaNotify; @@ -43,7 +41,6 @@ namespace ASC.Common.Caching static AscCache() { Memory = new AscCache(); - Default = Memory; } private AscCache() diff --git a/common/ASC.Common/Threading/DistributedTask.cs b/common/ASC.Common/Threading/DistributedTask.cs index 85153e3b08..4bf17dd21c 100644 --- a/common/ASC.Common/Threading/DistributedTask.cs +++ b/common/ASC.Common/Threading/DistributedTask.cs @@ -26,80 +26,93 @@ using Newtonsoft.Json; using System; -using System.Collections.Generic; -using System.Runtime.Serialization; +using System.Linq; namespace ASC.Common.Threading { - public class DistributedTask : ISerializable + public class DistributedTask { - [JsonProperty] - private readonly Dictionary props = new Dictionary(); + internal Action Publication { get; set; } + + public DistributedTaskCache DistributedTaskCache { get; internal set; } + public string InstanseId + { + get + { + return DistributedTaskCache.InstanseId; + } + set + { + DistributedTaskCache.InstanseId = value; + } + } + public string Id + { + get + { + return DistributedTaskCache.Id; + } + private set + { + DistributedTaskCache.Id = value; + } + } - internal Action Publication { get; set; } - - [JsonProperty] - public string InstanseId { get; internal set; } - - [JsonProperty] - public string Id { get; private set; } - - [JsonProperty] - public DistributedTaskStatus Status { get; internal set; } - - [JsonProperty] - public AggregateException Exception { get; internal set; } - + public DistributedTaskStatus Status + { + get + { + return Enum.Parse(DistributedTaskCache.Status); + } + internal set + { + DistributedTaskCache.Status = value.ToString(); + } + } + public AggregateException Exception + { + get + { + return new AggregateException(DistributedTaskCache.Exception); + } + internal set + { + DistributedTaskCache.Exception = value.ToString(); + } + } public DistributedTask() - { - Id = Guid.NewGuid().ToString(); - } - - protected DistributedTask(SerializationInfo info, StreamingContext context) - { - InstanseId = info.GetValue("InstanseId", typeof(object)).ToString(); - Id = info.GetValue("Id", typeof(object)).ToString(); - Status = (DistributedTaskStatus)info.GetValue("Status", typeof(DistributedTaskStatus)); - Exception = (AggregateException)info.GetValue("Exception", typeof(AggregateException)); - foreach (var p in info) - { - if (p.Name.StartsWith("_")) - { - props[p.Name.TrimStart('_')] = p.Value.ToString(); - } - } - } - - public void GetObjectData(SerializationInfo info, StreamingContext context) - { - info.AddValue("InstanseId", InstanseId); - info.AddValue("Id", Id); - info.AddValue("Status", Status); - info.AddValue("Exception", Exception); - foreach (var p in props) - { - info.AddValue("_" + p.Key, p.Value); - } + { + DistributedTaskCache = new DistributedTaskCache + { + Id = Guid.NewGuid().ToString() + }; } public T GetProperty(string name) { - return props.ContainsKey(name) ? JsonConvert.DeserializeObject(props[name]) : default(T); + return DistributedTaskCache.Props.Any(r=> r.Key == name) ? + JsonConvert.DeserializeObject(DistributedTaskCache.Props.Single(r => r.Key == name).Value) : + default; } public void SetProperty(string name, object value) - { + { + var prop = new DistributedTaskCache.Types.DistributedTaskCacheProp() { + Key = name, + Value = JsonConvert.SerializeObject(value) + }; + if (value != null) { - props[name] = JsonConvert.SerializeObject(value); + DistributedTaskCache.Props.Add(prop); } else { - props.Remove(name); + DistributedTaskCache.Props.Remove(prop); } } diff --git a/common/ASC.Common/Threading/DistributedTaskQueue.cs b/common/ASC.Common/Threading/DistributedTaskQueue.cs index 96fde99ba8..69af35ff76 100644 --- a/common/ASC.Common/Threading/DistributedTaskQueue.cs +++ b/common/ASC.Common/Threading/DistributedTaskQueue.cs @@ -39,8 +39,9 @@ namespace ASC.Common.Threading public static readonly string InstanseId; private readonly string key; - private readonly ICache cache; - private static readonly ICacheNotify notify; + private static readonly ICache cache; + private readonly ICacheNotify notify; + private readonly ICacheNotify notifyCache; private readonly TaskScheduler scheduler; private static readonly ConcurrentDictionary cancelations = new ConcurrentDictionary(); @@ -48,14 +49,8 @@ namespace ASC.Common.Threading static DistributedTaskQueue() { InstanseId = Process.GetCurrentProcess().Id.ToString(); - notify = new KafkaCache(); - notify.Subscribe((c) => - { - if (cancelations.TryGetValue(c.Id, out var s)) - { - s.Cancel(); - } - }, CacheNotifyAction.Remove); + + cache = AscCache.Memory; } @@ -72,10 +67,29 @@ namespace ASC.Common.Threading } key = name + GetType().Name; - cache = AscCache.Default; scheduler = maxThreadsCount <= 0 ? TaskScheduler.Default - : new LimitedConcurrencyLevelTaskScheduler(maxThreadsCount); + : new LimitedConcurrencyLevelTaskScheduler(maxThreadsCount); + + notify = new KafkaCache(); + notify.Subscribe((c) => + { + if (cancelations.TryGetValue(c.Id, out var s)) + { + s.Cancel(); + } + }, CacheNotifyAction.Remove); + + notifyCache = new KafkaCache(); + notifyCache.Subscribe((c) => + { + cache.HashSet(key, c.Id, (DistributedTaskCache)null); + }, CacheNotifyAction.Remove); + + notifyCache.Subscribe((c) => + { + cache.HashSet(key, c.Id, c); + }, CacheNotifyAction.InsertOrUpdate); } @@ -138,13 +152,13 @@ namespace ASC.Common.Threading } public void SetTask(DistributedTask task) - { - cache.HashSet(key, task.Id, task); + { + notifyCache.Publish(task.DistributedTaskCache, CacheNotifyAction.InsertOrUpdate); } public void RemoveTask(string id) - { - cache.HashSet(key, id, (DistributedTask)null); + { + notifyCache.Publish(new DistributedTaskCache() { Id = id }, CacheNotifyAction.Remove); } @@ -163,8 +177,7 @@ namespace ASC.Common.Threading { distributedTask.Status = DistributedTaskStatus.Canceled; } - CancellationTokenSource s; - cancelations.TryRemove(id, out s); + cancelations.TryRemove(id, out var s); distributedTask.PublishChanges(); } diff --git a/common/ASC.Common/protos/DistributedTaskCache.proto b/common/ASC.Common/protos/DistributedTaskCache.proto new file mode 100644 index 0000000000..9d8268948c --- /dev/null +++ b/common/ASC.Common/protos/DistributedTaskCache.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package ASC.Common.Threading; + +message DistributedTaskCache { + string Id = 1; + string InstanseId = 2; + string Status = 3; + string Exception = 4; + repeated DistributedTaskCacheProp Props = 5; + + message DistributedTaskCacheProp + { + string Key = 1; + string Value = 2; + } +} \ No newline at end of file