Common: added distributed task cache

AscCache: removed Default
This commit is contained in:
pavelbannov 2019-07-23 16:12:16 +03:00
parent f673795eb9
commit 0a0cfbbf88
5 changed files with 114 additions and 73 deletions

View File

@ -57,6 +57,7 @@
</ItemGroup>
<ItemGroup>
<Protobuf Include="protos/AscCacheItem.proto" />
<Protobuf Include="protos\DistributedTaskCache.proto" />
<Protobuf Include="protos\DistributedTaskCancelation.proto" />
</ItemGroup>
</Project>

View File

@ -34,8 +34,6 @@ namespace ASC.Common.Caching
{
public class AscCache : ICache
{
public static readonly ICache Default;
public static readonly ICache Memory;
public readonly ICacheNotify<AscCacheItem> KafkaNotify;
@ -43,7 +41,6 @@ namespace ASC.Common.Caching
static AscCache()
{
Memory = new AscCache();
Default = Memory;
}
private AscCache()

View File

@ -26,80 +26,93 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Linq;
namespace ASC.Common.Threading
{
public class DistributedTask : ISerializable
public class DistributedTask
{
[JsonProperty]
private readonly Dictionary<string, string> props = new Dictionary<string, string>();
internal Action<DistributedTask> Publication { get; set; }
public DistributedTaskCache DistributedTaskCache { get; internal set; }
public string InstanseId
{
get
{
return DistributedTaskCache.InstanseId;
}
set
{
DistributedTaskCache.InstanseId = value;
}
}
public string Id
{
get
{
return DistributedTaskCache.Id;
}
private set
{
DistributedTaskCache.Id = value;
}
}
internal Action<DistributedTask> Publication { get; set; }
[JsonProperty]
public string InstanseId { get; internal set; }
[JsonProperty]
public string Id { get; private set; }
[JsonProperty]
public DistributedTaskStatus Status { get; internal set; }
[JsonProperty]
public AggregateException Exception { get; internal set; }
public DistributedTaskStatus Status
{
get
{
return Enum.Parse<DistributedTaskStatus>(DistributedTaskCache.Status);
}
internal set
{
DistributedTaskCache.Status = value.ToString();
}
}
public AggregateException Exception
{
get
{
return new AggregateException(DistributedTaskCache.Exception);
}
internal set
{
DistributedTaskCache.Exception = value.ToString();
}
}
public DistributedTask()
{
Id = Guid.NewGuid().ToString();
}
protected DistributedTask(SerializationInfo info, StreamingContext context)
{
InstanseId = info.GetValue("InstanseId", typeof(object)).ToString();
Id = info.GetValue("Id", typeof(object)).ToString();
Status = (DistributedTaskStatus)info.GetValue("Status", typeof(DistributedTaskStatus));
Exception = (AggregateException)info.GetValue("Exception", typeof(AggregateException));
foreach (var p in info)
{
if (p.Name.StartsWith("_"))
{
props[p.Name.TrimStart('_')] = p.Value.ToString();
}
}
}
public void GetObjectData(SerializationInfo info, StreamingContext context)
{
info.AddValue("InstanseId", InstanseId);
info.AddValue("Id", Id);
info.AddValue("Status", Status);
info.AddValue("Exception", Exception);
foreach (var p in props)
{
info.AddValue("_" + p.Key, p.Value);
}
{
DistributedTaskCache = new DistributedTaskCache
{
Id = Guid.NewGuid().ToString()
};
}
public T GetProperty<T>(string name)
{
return props.ContainsKey(name) ? JsonConvert.DeserializeObject<T>(props[name]) : default(T);
return DistributedTaskCache.Props.Any(r=> r.Key == name) ?
JsonConvert.DeserializeObject<T>(DistributedTaskCache.Props.Single(r => r.Key == name).Value) :
default;
}
public void SetProperty(string name, object value)
{
{
var prop = new DistributedTaskCache.Types.DistributedTaskCacheProp() {
Key = name,
Value = JsonConvert.SerializeObject(value)
};
if (value != null)
{
props[name] = JsonConvert.SerializeObject(value);
DistributedTaskCache.Props.Add(prop);
}
else
{
props.Remove(name);
DistributedTaskCache.Props.Remove(prop);
}
}

View File

@ -39,8 +39,9 @@ namespace ASC.Common.Threading
public static readonly string InstanseId;
private readonly string key;
private readonly ICache cache;
private static readonly ICacheNotify<DistributedTaskCancelation> notify;
private static readonly ICache cache;
private readonly ICacheNotify<DistributedTaskCancelation> notify;
private readonly ICacheNotify<DistributedTaskCache> notifyCache;
private readonly TaskScheduler scheduler;
private static readonly ConcurrentDictionary<string, CancellationTokenSource> cancelations = new ConcurrentDictionary<string, CancellationTokenSource>();
@ -48,14 +49,8 @@ namespace ASC.Common.Threading
static DistributedTaskQueue()
{
InstanseId = Process.GetCurrentProcess().Id.ToString();
notify = new KafkaCache<DistributedTaskCancelation>();
notify.Subscribe((c) =>
{
if (cancelations.TryGetValue(c.Id, out var s))
{
s.Cancel();
}
}, CacheNotifyAction.Remove);
cache = AscCache.Memory;
}
@ -72,10 +67,29 @@ namespace ASC.Common.Threading
}
key = name + GetType().Name;
cache = AscCache.Default;
scheduler = maxThreadsCount <= 0
? TaskScheduler.Default
: new LimitedConcurrencyLevelTaskScheduler(maxThreadsCount);
: new LimitedConcurrencyLevelTaskScheduler(maxThreadsCount);
notify = new KafkaCache<DistributedTaskCancelation>();
notify.Subscribe((c) =>
{
if (cancelations.TryGetValue(c.Id, out var s))
{
s.Cancel();
}
}, CacheNotifyAction.Remove);
notifyCache = new KafkaCache<DistributedTaskCache>();
notifyCache.Subscribe((c) =>
{
cache.HashSet(key, c.Id, (DistributedTaskCache)null);
}, CacheNotifyAction.Remove);
notifyCache.Subscribe((c) =>
{
cache.HashSet(key, c.Id, c);
}, CacheNotifyAction.InsertOrUpdate);
}
@ -138,13 +152,13 @@ namespace ASC.Common.Threading
}
public void SetTask(DistributedTask task)
{
cache.HashSet(key, task.Id, task);
{
notifyCache.Publish(task.DistributedTaskCache, CacheNotifyAction.InsertOrUpdate);
}
public void RemoveTask(string id)
{
cache.HashSet(key, id, (DistributedTask)null);
{
notifyCache.Publish(new DistributedTaskCache() { Id = id }, CacheNotifyAction.Remove);
}
@ -163,8 +177,7 @@ namespace ASC.Common.Threading
{
distributedTask.Status = DistributedTaskStatus.Canceled;
}
CancellationTokenSource s;
cancelations.TryRemove(id, out s);
cancelations.TryRemove(id, out var s);
distributedTask.PublishChanges();
}

View File

@ -0,0 +1,17 @@
syntax = "proto3";
package ASC.Common.Threading;
message DistributedTaskCache {
string Id = 1;
string InstanseId = 2;
string Status = 3;
string Exception = 4;
repeated DistributedTaskCacheProp Props = 5;
message DistributedTaskCacheProp
{
string Key = 1;
string Value = 2;
}
}