using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

using ASC.Common.Logging;
using ASC.Common.Utils;

using Confluent.Kafka;
using Confluent.Kafka.Admin;

using Google.Protobuf;

using Microsoft.Extensions.Options;

namespace ASC.Common.Caching
{
    /// <summary>
    /// <see cref="ICacheNotify{T}"/> implementation that publishes protobuf messages to Kafka
    /// topics and dispatches consumed messages to the handlers registered via
    /// <see cref="Subscribe"/>. Every produced message is keyed with this instance's id so that
    /// the instance can skip the messages it produced itself when they come back from the broker.
    /// </summary>
    /// <typeparam name="T">Protobuf message type carried on the channel.</typeparam>
    [Singletone]
    public class KafkaCache<T> : IDisposable, ICacheNotify<T> where T : IMessage<T>, new()
    {
        private ClientConfig ClientConfig { get; set; }
        private AdminClientConfig AdminClientConfig { get; set; }
        private ILog Log { get; set; }

        // One cancellation source per subscribed channel; cancelling it stops that channel's consumer loop.
        private ConcurrentDictionary<string, CancellationTokenSource> Cts { get; set; }

        // Handler registered for each subscribed channel.
        private ConcurrentDictionary<string, Action<T>> Actions { get; set; }

        private ProtobufSerializer<T> ValueSerializer { get; } = new ProtobufSerializer<T>();
        private ProtobufDeserializer<T> ValueDeserializer { get; } = new ProtobufDeserializer<T>();
        private ProtobufSerializer<AscCacheItem> KeySerializer { get; } = new ProtobufSerializer<AscCacheItem>();
        private ProtobufDeserializer<AscCacheItem> KeyDeserializer { get; } = new ProtobufDeserializer<AscCacheItem>();

        // Created lazily on first Publish; guarded by producerGate.
        private IProducer<AscCacheItem, T> Producer { get; set; }
        private readonly object producerGate = new object();

        // Identifies this instance as the sender of the messages it produces.
        private Guid Key { get; set; }

        /// <summary>
        /// Reads the "kafka" settings section and prepares the client configurations.
        /// No connection is opened here; the producer and consumers are created on demand.
        /// </summary>
        /// <param name="configuration">Configuration source providing the "kafka" section.</param>
        /// <param name="options">Provides the logger used by this instance.</param>
        public KafkaCache(ConfigurationExtension configuration, IOptionsMonitor<ILog> options)
        {
            Log = options.CurrentValue;
            Cts = new ConcurrentDictionary<string, CancellationTokenSource>();
            Actions = new ConcurrentDictionary<string, Action<T>>();
            Key = Guid.NewGuid();

            var settings = configuration.GetSetting<KafkaSettings>("kafka");

            ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
            AdminClientConfig = new AdminClientConfig { BootstrapServers = settings.BootstrapServers };
        }

        /// <summary>
        /// Invokes the locally registered handler (if any) for the channel and then sends
        /// <paramref name="obj"/> to the channel's Kafka topic. Failures are logged, never thrown.
        /// </summary>
        /// <param name="obj">Message to publish.</param>
        /// <param name="cacheNotifyAction">Action that selects the channel.</param>
        public void Publish(T obj, CacheNotifyAction cacheNotifyAction)
        {
            try
            {
                var producer = GetOrCreateProducer();
                var channelName = GetChannelName(cacheNotifyAction);

                if (Actions.TryGetValue(channelName, out var onchange))
                {
                    onchange(obj);
                }

                var message = new Message<AscCacheItem, T>
                {
                    Value = obj,
                    Key = new AscCacheItem { Id = Key.ToString() }
                };

                // The send is fire-and-forget; observe a faulted task so delivery errors are
                // logged instead of being lost as unobserved task exceptions.
                producer.ProduceAsync(channelName, message)
                    .ContinueWith(
                        t => Log.Error(t.Exception),
                        CancellationToken.None,
                        TaskContinuationOptions.OnlyOnFaulted,
                        TaskScheduler.Default);
            }
            catch (Exception e)
            {
                Log.Error(e);
            }
        }

        /// <summary>
        /// Registers <paramref name="onchange"/> for the channel and starts a long-running
        /// background consumer for the channel's topic. A previous subscription to the same
        /// channel is cancelled and replaced.
        /// </summary>
        /// <param name="onchange">Handler invoked for every message not produced by this instance.</param>
        /// <param name="cacheNotifyAction">Action that selects the channel.</param>
        public void Subscribe(Action<T> onchange, CacheNotifyAction cacheNotifyAction)
        {
            var channelName = GetChannelName(cacheNotifyAction);
            var cts = new CancellationTokenSource();

            // Stop the consumer of an earlier subscription so the channel never has two live loops.
            Cts.AddOrUpdate(channelName, cts, (_, previous) =>
            {
                previous.Cancel();
                return cts;
            });
            Actions[channelName] = onchange;

            // Capture the token once: the loop must keep observing its own source even after
            // the dictionary entry is replaced or removed.
            var token = cts.Token;

            var task = new Task(() => RunConsumer(channelName, token), TaskCreationOptions.LongRunning);
            task.Start();
        }

        /// <summary>
        /// Removes the handler registered for the channel and cancels its consumer loop.
        /// Does nothing when the channel has no subscription.
        /// </summary>
        /// <param name="action">Action that selects the channel.</param>
        public void Unsubscribe(CacheNotifyAction action)
        {
            var channelName = GetChannelName(action);

            // Drop the handler too, otherwise Publish would keep invoking it locally.
            Actions.TryRemove(channelName, out _);

            if (Cts.TryRemove(channelName, out var source))
            {
                source.Cancel();
            }
        }

        // Returns the shared producer, creating it on first use. The lock prevents two
        // concurrent first calls from each building (and leaking) a producer.
        private IProducer<AscCacheItem, T> GetOrCreateProducer()
        {
            lock (producerGate)
            {
                if (Producer == null)
                {
                    Producer = new ProducerBuilder<AscCacheItem, T>(new ProducerConfig(ClientConfig))
                        .SetErrorHandler((_, e) => Log.Error(e))
                        .SetKeySerializer(KeySerializer)
                        .SetValueSerializer(ValueSerializer)
                        .Build();
                }

                return Producer;
            }
        }

        // Body of the background consumer task: ensures the topic exists, then consumes until
        // the token is cancelled, dispatching foreign messages to the registered handler.
        private void RunConsumer(string channelName, CancellationToken token)
        {
            try
            {
                EnsureTopicExists(channelName);

                // A fresh random group id is used for every subscription.
                var conf = new ConsumerConfig(ClientConfig)
                {
                    GroupId = Guid.NewGuid().ToString()
                };

                using var consumer = new ConsumerBuilder<AscCacheItem, T>(conf)
                    .SetErrorHandler((_, e) => Log.Error(e))
                    .SetKeyDeserializer(KeyDeserializer)
                    .SetValueDeserializer(ValueDeserializer)
                    .Build();

                // Default-constructed Partition has value 0; the topic is created with one partition.
                consumer.Assign(new TopicPartition(channelName, new Partition()));

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = consumer.Consume(token);

                            if (cr == null || cr.Message == null || cr.Message.Value == null)
                            {
                                continue;
                            }

                            if (IsOwnMessage(cr.Message.Key))
                            {
                                continue;
                            }

                            if (!Actions.TryGetValue(channelName, out var handler))
                            {
                                continue;
                            }

                            try
                            {
                                handler(cr.Message.Value);
                            }
                            catch (Exception e)
                            {
                                Log.Error("Kafka onmessage", e);
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Log.Error(e);
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    consumer.Close();
                }
            }
            catch (Exception e)
            {
                // Anything unexpected ends this consumer; log it so the loss is visible.
                Log.Error(e);
            }
        }

        // True when the message key carries this instance's id. A missing or unparseable key
        // cannot have been written by this instance, so such messages count as foreign.
        private bool IsOwnMessage(AscCacheItem key)
        {
            return key != null && Guid.TryParse(key.Id, out var sender) && sender.Equals(Key);
        }

        // Best-effort creation of the channel topic (1 partition, replication factor 1).
        // "Already exists" is the expected outcome on every start but the first and is ignored;
        // any other failure is logged and the consumer is still started.
        private void EnsureTopicExists(string channelName)
        {
            using var adminClient = new AdminClientBuilder(AdminClientConfig)
                .SetErrorHandler((_, e) => Log.Error(e))
                .Build();

            try
            {
                adminClient.CreateTopicsAsync(
                    new[]
                    {
                        new TopicSpecification
                        {
                            Name = channelName,
                            NumPartitions = 1,
                            ReplicationFactor = 1
                        }
                    }).Wait();
            }
            catch (AggregateException e) when (IsTopicAlreadyExists(e))
            {
                // Expected: the topic was created earlier.
            }
            catch (AggregateException e)
            {
                Log.Error(e);
            }
        }

        // True when the failure consists solely of "topic already exists" reports.
        private static bool IsTopicAlreadyExists(AggregateException exception)
        {
            if (!(exception.InnerException is CreateTopicsException createTopics))
            {
                return false;
            }

            foreach (var report in createTopics.Results)
            {
                if (report.Error.Code != ErrorCode.TopicAlreadyExists)
                {
                    return false;
                }
            }

            return true;
        }

        // Builds the topic name from the action and the message type. Invariant lower-casing
        // keeps the name identical on hosts running under different cultures.
        private string GetChannelName(CacheNotifyAction cacheNotifyAction)
        {
            return $"ascchannel{cacheNotifyAction}{typeof(T).FullName}".ToLowerInvariant();
        }

        private bool disposedValue = false; // To detect redundant calls

        /// <summary>
        /// Cancels every running consumer loop and disposes the producer.
        /// </summary>
        /// <param name="disposing">
        /// True when called from <see cref="Dispose()"/>; false when called from the finalizer,
        /// in which case managed objects are left untouched.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    // Stop the background consumers; each loop closes its own consumer on cancellation.
                    foreach (var source in Cts.Values)
                    {
                        source.Cancel();
                    }

                    lock (producerGate)
                    {
                        if (Producer != null)
                        {
                            Producer.Dispose();
                        }
                    }
                }

                disposedValue = true;
            }
        }

        ~KafkaCache()
        {
            Dispose(false);
        }

        /// <summary>
        /// Releases the producer and stops all consumers started by this instance.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Settings bound from the "kafka" configuration section.
    /// </summary>
    public class KafkaSettings
    {
        /// <summary>Value assigned to the BootstrapServers property of the Kafka client configs.</summary>
        public string BootstrapServers { get; set; }
    }
}