diff --git a/common/ASC.Common/Caching/AscCache.cs b/common/ASC.Common/Caching/AscCache.cs index 90baae5bc8..b7be7dbaf8 100644 --- a/common/ASC.Common/Caching/AscCache.cs +++ b/common/ASC.Common/Caching/AscCache.cs @@ -28,8 +28,9 @@ using System; using System.Collections.Generic; using System.Linq; using System.Runtime.Caching; -using System.Text.RegularExpressions; - +using System.Text.RegularExpressions; +using Google.Protobuf; + namespace ASC.Common.Caching { public class AscCache : ICache @@ -143,7 +144,7 @@ namespace ASC.Common.Caching public void ClearCache() { - KafkaNotify.Publish(new AscCacheItem() { Id = Guid.NewGuid().ToString() }, CacheNotifyAction.Any); + KafkaNotify.Publish(new AscCacheItem() { Id = ByteString.CopyFrom(Guid.NewGuid().ToByteArray()) }, CacheNotifyAction.Any); } private MemoryCache GetCache() diff --git a/common/ASC.Common/Caching/KafkaCache.cs b/common/ASC.Common/Caching/KafkaCache.cs index c58f6a8439..d0c6645d47 100644 --- a/common/ASC.Common/Caching/KafkaCache.cs +++ b/common/ASC.Common/Caching/KafkaCache.cs @@ -17,25 +17,32 @@ namespace ASC.Common.Caching private ClientConfig ClientConfig { get; set; } private ILog Log { get; set; } private ConcurrentDictionary Cts { get; set; } + private ConcurrentDictionary> Actions { get; set; } private MemoryCacheNotify MemoryCacheNotify { get; set; } private string ChannelName { get; } = $"ascchannel{typeof(T).Name}"; - private ProtobufSerializer Serializer { get; } = new ProtobufSerializer(); - private ProtobufDeserializer Deserializer { get; } = new ProtobufDeserializer(); - - private IProducer Producer { get; } + private ProtobufSerializer ValueSerializer { get; } = new ProtobufSerializer(); + private ProtobufDeserializer ValueDeserializer { get; } = new ProtobufDeserializer(); + private ProtobufSerializer KeySerializer { get; } = new ProtobufSerializer(); + private ProtobufDeserializer KeyDeserializer { get; } = new ProtobufDeserializer(); + private IProducer Producer { get; } + private Guid Key { get; set; } public KafkaCache() { Log = LogManager.GetLogger("ASC"); Cts = new ConcurrentDictionary(); + Actions = new ConcurrentDictionary>(); + Key = Guid.NewGuid(); var settings = ConfigurationManager.GetSetting("kafka"); if (settings != null && !string.IsNullOrEmpty(settings.BootstrapServers)) { ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers }; + var config = new ProducerConfig(ClientConfig); - Producer = new ProducerBuilder(config) + Producer = new ProducerBuilder(config) .SetErrorHandler((_, e) => Log.Error(e)) - .SetValueSerializer(Serializer) + .SetKeySerializer(KeySerializer) + .SetValueSerializer(ValueSerializer) .Build(); } else @@ -55,7 +62,23 @@ namespace ASC.Common.Caching try { - Producer.ProduceAsync(GetChannelName(cacheNotifyAction), new Message() { Value = obj }); + var channelName = GetChannelName(cacheNotifyAction); + + if (Actions.TryGetValue(channelName, out var onchange)) + { + onchange(obj); + } + + var message = new Message + { + Value = obj, + Key = new AscCacheItem + { + Id = ByteString.CopyFrom(Key.ToByteArray()) + } + }; + + Producer.ProduceAsync(channelName, message); } catch (ProduceException e) { @@ -74,8 +97,9 @@ namespace ASC.Common.Caching MemoryCacheNotify.Subscribe(onchange, cacheNotifyAction); return; } - - Cts[GetChannelName(cacheNotifyAction)] = new CancellationTokenSource(); + var channelName = GetChannelName(cacheNotifyAction); + Cts[channelName] = new CancellationTokenSource(); + Actions[channelName] = onchange; void action() { @@ -84,12 +108,13 @@ namespace ASC.Common.Caching GroupId = Guid.NewGuid().ToString() }; - using var c = new ConsumerBuilder(conf) + using var c = new ConsumerBuilder(conf) .SetErrorHandler((_, e) => Log.Error(e)) - .SetValueDeserializer(Deserializer) + .SetKeyDeserializer(KeyDeserializer) + .SetValueDeserializer(ValueDeserializer) .Build(); - c.Assign(new TopicPartition(GetChannelName(cacheNotifyAction), new Partition())); + c.Assign(new TopicPartition(channelName, new Partition())); try { @@ -97,10 +122,10 @@ namespace ASC.Common.Caching { try { - var cr = c.Consume(Cts[GetChannelName(cacheNotifyAction)].Token); - if (cr != null && cr.Value != null) + var cr = c.Consume(Cts[channelName].Token); + if (cr != null && cr.Value != null && !(new Guid(cr.Key.Id.ToByteArray())).Equals(Key) && Actions.TryGetValue(channelName, out var act)) { - onchange(cr.Value); + act(cr.Value); } } catch (ConsumeException e) diff --git a/common/ASC.Common/protos/AscCacheItem.proto b/common/ASC.Common/protos/AscCacheItem.proto index 74b59401b2..8fbd2e7cfb 100644 --- a/common/ASC.Common/protos/AscCacheItem.proto +++ b/common/ASC.Common/protos/AscCacheItem.proto @@ -3,5 +3,5 @@ package ASC.Common; message AscCacheItem { - string Id = 1; + bytes Id = 1; } \ No newline at end of file