using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using ASC.Common.Logging;
using ASC.Common.Utils;

using Confluent.Kafka;

using Google.Protobuf;

namespace ASC.Common.Caching
{
    /// <summary>
    /// Cache notifier that publishes protobuf messages to Kafka topics and
    /// consumes them on background tasks. When the "kafka" setting has no
    /// bootstrap servers, every call is delegated to an in-process
    /// <see cref="MemoryCacheNotify{T}"/> instead.
    /// </summary>
    /// <typeparam name="T">Protobuf message type carried by the notifications.</typeparam>
    public class KafkaCache<T> : ICacheNotify<T> where T : IMessage<T>, new()
    {
        // Null means "Kafka is not configured": MemoryCacheNotify is used instead.
        private ClientConfig ClientConfig { get; set; }
        private ILog Log { get; set; }

        // One cancellation source per action, shared by every consumer loop
        // subscribed to that action. Concurrent because Subscribe/Unsubscribe
        // may be called from different threads.
        private ConcurrentDictionary<CacheNotifyAction, CancellationTokenSource> Cts { get; set; }
        private MemoryCacheNotify<T> MemoryCacheNotify { get; set; }

        /// <summary>
        /// Reads the "kafka" settings section and selects either the Kafka
        /// transport or the in-memory fallback.
        /// </summary>
        public KafkaCache()
        {
            Log = LogManager.GetLogger("ASC");
            Cts = new ConcurrentDictionary<CacheNotifyAction, CancellationTokenSource>();

            var settings = ConfigurationManager.GetSetting<KafkaSettings>("kafka");
            if (settings != null && !string.IsNullOrEmpty(settings.BootstrapServers))
            {
                ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
            }
            else
            {
                MemoryCacheNotify = new MemoryCacheNotify<T>();
            }
        }

        /// <summary>
        /// Publishes <paramref name="obj"/> to the channel of the given action.
        /// In fallback mode the in-memory subscribers are invoked synchronously;
        /// in Kafka mode the message is produced in the background and any
        /// failure is logged rather than thrown.
        /// </summary>
        /// <param name="obj">Message to publish.</param>
        /// <param name="cacheNotifyAction">Action that selects the channel.</param>
        public void Publish(T obj, CacheNotifyAction cacheNotifyAction)
        {
            if (ClientConfig == null)
            {
                MemoryCacheNotify.Publish(obj, cacheNotifyAction);
                return;
            }

            // Fire-and-forget: ProduceAsync handles and logs its own exceptions,
            // so the returned task never faults.
            _ = ProduceAsync(obj, cacheNotifyAction);
        }

        /// <summary>
        /// Builds a producer, sends a single message and logs any error.
        /// </summary>
        private async Task ProduceAsync(T obj, CacheNotifyAction cacheNotifyAction)
        {
            try
            {
                var config = new ProducerConfig(ClientConfig);

                using var producer = new ProducerBuilder<Null, T>(config)
                    .SetErrorHandler((_, e) => Log.Error(e))
                    .SetValueSerializer(new ProtobufSerializer<T>())
                    .Build();

                await producer
                    .ProduceAsync(GetChannelName(cacheNotifyAction), new Message<Null, T> { Value = obj })
                    .ConfigureAwait(false);
            }
            catch (Exception e)
            {
                Log.Error(e);
            }
        }

        /// <summary>
        /// Registers <paramref name="onchange"/> for messages of the given action.
        /// In Kafka mode a long-running background task is started that consumes
        /// the channel until <see cref="Unsubscribe"/> is called for that action.
        /// </summary>
        /// <param name="onchange">Callback invoked with each received message.</param>
        /// <param name="cacheNotifyAction">Action that selects the channel.</param>
        public void Subscribe(Action<T> onchange, CacheNotifyAction cacheNotifyAction)
        {
            if (ClientConfig == null)
            {
                MemoryCacheNotify.Subscribe(onchange, cacheNotifyAction);
                return;
            }

            // Register the cancellation source before the task starts so that an
            // early Unsubscribe always finds it.
            var token = Cts.GetOrAdd(cacheNotifyAction, _ => new CancellationTokenSource()).Token;
            var channel = GetChannelName(cacheNotifyAction);

            void ConsumeLoop()
            {
                try
                {
                    var conf = new ConsumerConfig(ClientConfig)
                    {
                        // A unique group per consumer: consumers sharing a group id
                        // split the messages between them, while every subscriber
                        // here must see every notification. (new Guid() is always
                        // the all-zero GUID, so it must not be used.)
                        GroupId = Guid.NewGuid().ToString(),
                        EnableAutoCommit = true
                    };

                    using var consumer = new ConsumerBuilder<Ignore, T>(conf)
                        .SetErrorHandler((_, e) => Log.Error(e))
                        .SetValueDeserializer(new ProtobufDeserializer<T>())
                        .Build();

                    consumer.Subscribe(channel);

                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var cr = consumer.Consume(token);
                                if (cr != null && cr.Value != null)
                                {
                                    onchange(cr.Value);
                                }
                            }
                            catch (ConsumeException e)
                            {
                                // A failed consume is logged and the loop keeps polling.
                                Log.Error(e);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // Raised by Consume once Unsubscribe cancels the token.
                        consumer.Close();
                    }
                }
                catch (Exception e)
                {
                    // Anything else (setup failure, exception thrown by onchange)
                    // ends this subscription; log it so it is not lost with the task.
                    Log.Error(e);
                }
            }

            Task.Factory.StartNew(ConsumeLoop, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Builds the channel (topic) name for the message type and action.
        /// </summary>
        private string GetChannelName(CacheNotifyAction cacheNotifyAction)
        {
            return $"ascchannel{typeof(T).Name}{cacheNotifyAction}";
        }

        /// <summary>
        /// Stops delivery for the given action. Does nothing when there is no
        /// active subscription for it.
        /// </summary>
        /// <param name="action">Action whose subscribers should be removed.</param>
        public void Unsubscribe(CacheNotifyAction action)
        {
            if (ClientConfig == null)
            {
                MemoryCacheNotify.Unsubscribe(action);
                return;
            }

            if (Cts.TryRemove(action, out var source))
            {
                source.Cancel();
            }
        }
    }

    /// <summary>
    /// Settings object bound to the "kafka" configuration section.
    /// </summary>
    public class KafkaSettings
    {
        public string BootstrapServers { get; set; }
    }

    /// <summary>
    /// In-process notifier: subscribers are stored in a dictionary and invoked
    /// synchronously by <see cref="Publish"/>.
    /// </summary>
    /// <typeparam name="T">Protobuf message type carried by the notifications.</typeparam>
    public class MemoryCacheNotify<T> : ICacheNotify<T> where T : IMessage<T>, new()
    {
        // Keyed by message type name + action (see GetKey).
        private readonly Dictionary<string, List<Action<T>>> actions = new Dictionary<string, List<Action<T>>>();

        /// <summary>
        /// Invokes every callback registered for <paramref name="action"/>, in
        /// registration order.
        /// </summary>
        public void Publish(T obj, CacheNotifyAction action)
        {
            if (actions.TryGetValue(GetKey(action), out var onchange) && onchange != null)
            {
                foreach (var a in onchange)
                {
                    a(obj);
                }
            }
        }

        /// <summary>
        /// Registers a callback for the given action. A null callback is ignored.
        /// </summary>
        public void Subscribe(Action<T> onchange, CacheNotifyAction notifyAction)
        {
            if (onchange == null)
            {
                return;
            }

            var key = GetKey(notifyAction);
            if (!actions.TryGetValue(key, out var handlers))
            {
                handlers = new List<Action<T>>();
                actions.Add(key, handlers);
            }

            handlers.Add(onchange);
        }

        /// <summary>
        /// Removes all callbacks registered for the given action.
        /// </summary>
        public void Unsubscribe(CacheNotifyAction action)
        {
            actions.Remove(GetKey(action));
        }

        /// <summary>
        /// Builds the dictionary key for the message type and action.
        /// </summary>
        private string GetKey(CacheNotifyAction cacheNotifyAction)
        {
            return $"{typeof(T).Name}{cacheNotifyAction}";
        }
    }
}