Merge branch 'master' of github.com:ONLYOFFICE/CommunityServer-AspNetCore
This commit is contained in:
commit
1e28a0c013
@ -57,6 +57,7 @@
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Protobuf Include="protos/AscCacheItem.proto" />
|
||||
<Protobuf Include="protos\DistributedTaskCache.proto" />
|
||||
<Protobuf Include="protos\DistributedTaskCancelation.proto" />
|
||||
</ItemGroup>
|
||||
</Project>
|
@ -34,8 +34,6 @@ namespace ASC.Common.Caching
|
||||
{
|
||||
public class AscCache : ICache
|
||||
{
|
||||
public static readonly ICache Default;
|
||||
|
||||
public static readonly ICache Memory;
|
||||
|
||||
public readonly ICacheNotify<AscCacheItem> KafkaNotify;
|
||||
@ -43,7 +41,6 @@ namespace ASC.Common.Caching
|
||||
static AscCache()
|
||||
{
|
||||
Memory = new AscCache();
|
||||
Default = Memory;
|
||||
}
|
||||
|
||||
private AscCache()
|
||||
|
@ -26,80 +26,93 @@
|
||||
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Linq;
|
||||
|
||||
namespace ASC.Common.Threading
|
||||
{
|
||||
public class DistributedTask : ISerializable
|
||||
public class DistributedTask
|
||||
{
|
||||
[JsonProperty]
|
||||
private readonly Dictionary<string, string> props = new Dictionary<string, string>();
|
||||
internal Action<DistributedTask> Publication { get; set; }
|
||||
|
||||
public DistributedTaskCache DistributedTaskCache { get; internal set; }
|
||||
|
||||
public string InstanseId
|
||||
{
|
||||
get
|
||||
{
|
||||
return DistributedTaskCache.InstanseId;
|
||||
}
|
||||
set
|
||||
{
|
||||
DistributedTaskCache.InstanseId = value;
|
||||
}
|
||||
}
|
||||
public string Id
|
||||
{
|
||||
get
|
||||
{
|
||||
return DistributedTaskCache.Id;
|
||||
}
|
||||
private set
|
||||
{
|
||||
DistributedTaskCache.Id = value;
|
||||
}
|
||||
}
|
||||
|
||||
internal Action<DistributedTask> Publication { get; set; }
|
||||
|
||||
[JsonProperty]
|
||||
public string InstanseId { get; internal set; }
|
||||
|
||||
[JsonProperty]
|
||||
public string Id { get; private set; }
|
||||
|
||||
[JsonProperty]
|
||||
public DistributedTaskStatus Status { get; internal set; }
|
||||
|
||||
[JsonProperty]
|
||||
public AggregateException Exception { get; internal set; }
|
||||
|
||||
public DistributedTaskStatus Status
|
||||
{
|
||||
get
|
||||
{
|
||||
return Enum.Parse<DistributedTaskStatus>(DistributedTaskCache.Status);
|
||||
}
|
||||
internal set
|
||||
{
|
||||
DistributedTaskCache.Status = value.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
public AggregateException Exception
|
||||
{
|
||||
get
|
||||
{
|
||||
return new AggregateException(DistributedTaskCache.Exception);
|
||||
}
|
||||
internal set
|
||||
{
|
||||
DistributedTaskCache.Exception = value.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
public DistributedTask()
|
||||
{
|
||||
Id = Guid.NewGuid().ToString();
|
||||
}
|
||||
|
||||
protected DistributedTask(SerializationInfo info, StreamingContext context)
|
||||
{
|
||||
InstanseId = info.GetValue("InstanseId", typeof(object)).ToString();
|
||||
Id = info.GetValue("Id", typeof(object)).ToString();
|
||||
Status = (DistributedTaskStatus)info.GetValue("Status", typeof(DistributedTaskStatus));
|
||||
Exception = (AggregateException)info.GetValue("Exception", typeof(AggregateException));
|
||||
foreach (var p in info)
|
||||
{
|
||||
if (p.Name.StartsWith("_"))
|
||||
{
|
||||
props[p.Name.TrimStart('_')] = p.Value.ToString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void GetObjectData(SerializationInfo info, StreamingContext context)
|
||||
{
|
||||
info.AddValue("InstanseId", InstanseId);
|
||||
info.AddValue("Id", Id);
|
||||
info.AddValue("Status", Status);
|
||||
info.AddValue("Exception", Exception);
|
||||
foreach (var p in props)
|
||||
{
|
||||
info.AddValue("_" + p.Key, p.Value);
|
||||
}
|
||||
{
|
||||
DistributedTaskCache = new DistributedTaskCache
|
||||
{
|
||||
Id = Guid.NewGuid().ToString()
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
public T GetProperty<T>(string name)
|
||||
{
|
||||
return props.ContainsKey(name) ? JsonConvert.DeserializeObject<T>(props[name]) : default(T);
|
||||
return DistributedTaskCache.Props.Any(r=> r.Key == name) ?
|
||||
JsonConvert.DeserializeObject<T>(DistributedTaskCache.Props.Single(r => r.Key == name).Value) :
|
||||
default;
|
||||
}
|
||||
|
||||
public void SetProperty(string name, object value)
|
||||
{
|
||||
{
|
||||
var prop = new DistributedTaskCache.Types.DistributedTaskCacheProp() {
|
||||
Key = name,
|
||||
Value = JsonConvert.SerializeObject(value)
|
||||
};
|
||||
|
||||
if (value != null)
|
||||
{
|
||||
props[name] = JsonConvert.SerializeObject(value);
|
||||
DistributedTaskCache.Props.Add(prop);
|
||||
}
|
||||
else
|
||||
{
|
||||
props.Remove(name);
|
||||
DistributedTaskCache.Props.Remove(prop);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,8 +39,9 @@ namespace ASC.Common.Threading
|
||||
public static readonly string InstanseId;
|
||||
|
||||
private readonly string key;
|
||||
private readonly ICache cache;
|
||||
private static readonly ICacheNotify<DistributedTaskCancelation> notify;
|
||||
private static readonly ICache cache;
|
||||
private readonly ICacheNotify<DistributedTaskCancelation> notify;
|
||||
private readonly ICacheNotify<DistributedTaskCache> notifyCache;
|
||||
private readonly TaskScheduler scheduler;
|
||||
private static readonly ConcurrentDictionary<string, CancellationTokenSource> cancelations = new ConcurrentDictionary<string, CancellationTokenSource>();
|
||||
|
||||
@ -48,14 +49,8 @@ namespace ASC.Common.Threading
|
||||
static DistributedTaskQueue()
|
||||
{
|
||||
InstanseId = Process.GetCurrentProcess().Id.ToString();
|
||||
notify = new KafkaCache<DistributedTaskCancelation>();
|
||||
notify.Subscribe((c) =>
|
||||
{
|
||||
if (cancelations.TryGetValue(c.Id, out var s))
|
||||
{
|
||||
s.Cancel();
|
||||
}
|
||||
}, CacheNotifyAction.Remove);
|
||||
|
||||
cache = AscCache.Memory;
|
||||
}
|
||||
|
||||
|
||||
@ -72,10 +67,29 @@ namespace ASC.Common.Threading
|
||||
}
|
||||
|
||||
key = name + GetType().Name;
|
||||
cache = AscCache.Default;
|
||||
scheduler = maxThreadsCount <= 0
|
||||
? TaskScheduler.Default
|
||||
: new LimitedConcurrencyLevelTaskScheduler(maxThreadsCount);
|
||||
: new LimitedConcurrencyLevelTaskScheduler(maxThreadsCount);
|
||||
|
||||
notify = new KafkaCache<DistributedTaskCancelation>();
|
||||
notify.Subscribe((c) =>
|
||||
{
|
||||
if (cancelations.TryGetValue(c.Id, out var s))
|
||||
{
|
||||
s.Cancel();
|
||||
}
|
||||
}, CacheNotifyAction.Remove);
|
||||
|
||||
notifyCache = new KafkaCache<DistributedTaskCache>();
|
||||
notifyCache.Subscribe((c) =>
|
||||
{
|
||||
cache.HashSet(key, c.Id, (DistributedTaskCache)null);
|
||||
}, CacheNotifyAction.Remove);
|
||||
|
||||
notifyCache.Subscribe((c) =>
|
||||
{
|
||||
cache.HashSet(key, c.Id, c);
|
||||
}, CacheNotifyAction.InsertOrUpdate);
|
||||
}
|
||||
|
||||
|
||||
@ -138,13 +152,13 @@ namespace ASC.Common.Threading
|
||||
}
|
||||
|
||||
public void SetTask(DistributedTask task)
|
||||
{
|
||||
cache.HashSet(key, task.Id, task);
|
||||
{
|
||||
notifyCache.Publish(task.DistributedTaskCache, CacheNotifyAction.InsertOrUpdate);
|
||||
}
|
||||
|
||||
public void RemoveTask(string id)
|
||||
{
|
||||
cache.HashSet(key, id, (DistributedTask)null);
|
||||
{
|
||||
notifyCache.Publish(new DistributedTaskCache() { Id = id }, CacheNotifyAction.Remove);
|
||||
}
|
||||
|
||||
|
||||
@ -163,8 +177,7 @@ namespace ASC.Common.Threading
|
||||
{
|
||||
distributedTask.Status = DistributedTaskStatus.Canceled;
|
||||
}
|
||||
CancellationTokenSource s;
|
||||
cancelations.TryRemove(id, out s);
|
||||
cancelations.TryRemove(id, out var s);
|
||||
|
||||
distributedTask.PublishChanges();
|
||||
}
|
||||
|
17
common/ASC.Common/protos/DistributedTaskCache.proto
Normal file
17
common/ASC.Common/protos/DistributedTaskCache.proto
Normal file
@ -0,0 +1,17 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package ASC.Common.Threading;
|
||||
|
||||
message DistributedTaskCache {
|
||||
string Id = 1;
|
||||
string InstanseId = 2;
|
||||
string Status = 3;
|
||||
string Exception = 4;
|
||||
repeated DistributedTaskCacheProp Props = 5;
|
||||
|
||||
message DistributedTaskCacheProp
|
||||
{
|
||||
string Key = 1;
|
||||
string Value = 2;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user