2019-07-19 13:10:15 +00:00
|
|
|
|
using System;
|
2019-07-23 07:45:54 +00:00
|
|
|
|
using System.Collections.Concurrent;
|
2019-07-19 13:10:15 +00:00
|
|
|
|
using System.Threading;
|
|
|
|
|
using System.Threading.Tasks;
|
2020-02-12 13:50:38 +00:00
|
|
|
|
|
2019-07-19 13:10:15 +00:00
|
|
|
|
using ASC.Common.Logging;
|
|
|
|
|
using ASC.Common.Utils;
|
2020-02-12 13:50:38 +00:00
|
|
|
|
|
2019-07-19 13:10:15 +00:00
|
|
|
|
using Confluent.Kafka;
|
2021-08-11 18:15:37 +00:00
|
|
|
|
using Confluent.Kafka.Admin;
|
2019-07-19 13:10:15 +00:00
|
|
|
|
|
|
|
|
|
using Google.Protobuf;
|
2020-02-12 13:50:38 +00:00
|
|
|
|
|
2019-10-17 15:55:35 +00:00
|
|
|
|
using Microsoft.Extensions.Options;
|
2019-07-19 13:10:15 +00:00
|
|
|
|
|
|
|
|
|
namespace ASC.Common.Caching
|
|
|
|
|
{
|
2020-10-21 11:52:44 +00:00
|
|
|
|
/// <summary>
/// Kafka-backed implementation of <see cref="ICacheNotify{T}"/>.
/// Each (<see cref="CacheNotifyAction"/>, <typeparamref name="T"/>) pair maps to one topic.
/// <see cref="Publish"/> / <see cref="PublishAsync"/> invoke the locally subscribed handler
/// and produce the message to the topic; <see cref="Subscribe"/> starts a background consumer
/// that dispatches messages produced by other instances to the handler.
/// </summary>
/// <typeparam name="T">Protobuf message type carried as the Kafka message value.</typeparam>
[Singletone]
public class KafkaCache<T> : IDisposable, ICacheNotify<T> where T : IMessage<T>, new()
{
    private ClientConfig ClientConfig { get; set; }
    private AdminClientConfig AdminClientConfig { get; set; }
    private ILog Log { get; set; }

    // Cancellation source of the running consumer loop, keyed by channel (topic) name.
    private ConcurrentDictionary<string, CancellationTokenSource> Cts { get; set; }

    // Handler registered through Subscribe, keyed by channel (topic) name.
    private ConcurrentDictionary<string, Action<T>> Actions { get; set; }

    private ProtobufSerializer<T> ValueSerializer { get; } = new ProtobufSerializer<T>();
    private ProtobufDeserializer<T> ValueDeserializer { get; } = new ProtobufDeserializer<T>();
    private ProtobufSerializer<AscCacheItem> KeySerializer { get; } = new ProtobufSerializer<AscCacheItem>();
    private ProtobufDeserializer<AscCacheItem> KeyDeserializer { get; } = new ProtobufDeserializer<AscCacheItem>();

    // Created lazily by GetProducer() on the first publish.
    private IProducer<AscCacheItem, T> Producer { get; set; }

    // Identifies this instance; sent as the message key so the consumer can skip its own messages.
    private Guid Key { get; set; }

    // Guards the lazy creation of Producer so concurrent publishers share a single instance.
    private readonly object producerLock = new object();

    /// <summary>
    /// Reads the "kafka" settings section and prepares the client configurations.
    /// No connection is opened here; the producer and consumers are created on demand.
    /// </summary>
    /// <param name="configuration">Configuration source providing the "kafka" section.</param>
    /// <param name="options">Provides the logger used by this instance.</param>
    public KafkaCache(ConfigurationExtension configuration, IOptionsMonitor<ILog> options)
    {
        Log = options.CurrentValue;
        Cts = new ConcurrentDictionary<string, CancellationTokenSource>();
        Actions = new ConcurrentDictionary<string, Action<T>>();
        Key = Guid.NewGuid();

        var settings = configuration.GetSetting<KafkaSettings>("kafka");

        ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
        AdminClientConfig = new AdminClientConfig { BootstrapServers = settings.BootstrapServers };
    }

    /// <summary>
    /// Invokes the locally subscribed handler (if any) and produces <paramref name="obj"/>
    /// to the channel's topic without waiting for delivery.
    /// Failures are logged and never thrown to the caller.
    /// </summary>
    /// <param name="obj">Message to publish.</param>
    /// <param name="cacheNotifyAction">Action that selects the channel.</param>
    public void Publish(T obj, CacheNotifyAction cacheNotifyAction)
    {
        try
        {
            var producer = GetProducer();
            var channelName = GetChannelName(cacheNotifyAction);

            NotifyLocalSubscriber(channelName, obj);

            // Fire-and-forget, but observe the task so that a failed delivery is logged
            // instead of being lost as an unobserved task exception.
            producer.ProduceAsync(channelName, CreateMessage(obj))
                .ContinueWith(
                    t => Log.Error(t.Exception),
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);
        }
        catch (Exception e)
        {
            // A single catch is enough: the former ProduceException<Null, string> clause could
            // never match this producer's key/value types and logged exactly the same way.
            Log.Error(e);
        }
    }

    /// <summary>
    /// Invokes the locally subscribed handler (if any), produces <paramref name="obj"/>
    /// to the channel's topic and awaits the delivery result.
    /// Failures are logged and never thrown to the caller.
    /// </summary>
    /// <param name="obj">Message to publish.</param>
    /// <param name="cacheNotifyAction">Action that selects the channel.</param>
    public async Task PublishAsync(T obj, CacheNotifyAction cacheNotifyAction)
    {
        try
        {
            var producer = GetProducer();
            var channelName = GetChannelName(cacheNotifyAction);

            NotifyLocalSubscriber(channelName, obj);

            await producer.ProduceAsync(channelName, CreateMessage(obj)).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            Log.Error(e);
        }
    }

    /// <summary>
    /// Registers <paramref name="onchange"/> for the channel and starts a long-running
    /// consumer that forwards messages published by other instances to it.
    /// Subscribing again to the same channel stops the previous consumer first.
    /// </summary>
    /// <param name="onchange">Handler invoked for every received message.</param>
    /// <param name="cacheNotifyAction">Action that selects the channel.</param>
    public void Subscribe(Action<T> onchange, CacheNotifyAction cacheNotifyAction)
    {
        var channelName = GetChannelName(cacheNotifyAction);

        // Stop a consumer left over from an earlier subscription; otherwise both loops
        // would keep running and every message would be dispatched twice.
        CancelConsumer(channelName);

        var source = new CancellationTokenSource();
        Cts[channelName] = source;
        Actions[channelName] = onchange;

        // Capture the token now so the loop is bound to this subscription only.
        var token = source.Token;

        Task.Factory.StartNew(
            () => ConsumeLoop(channelName, token),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Builds the topic name for the given action and the message type <typeparamref name="T"/>.
    /// </summary>
    private string GetChannelName(CacheNotifyAction cacheNotifyAction)
    {
        // Invariant lower-casing: the name must be identical on every node regardless of
        // the current culture (culture-sensitive ToLower maps 'I' differently under tr-TR).
        return $"ascchannel{cacheNotifyAction}{typeof(T).FullName}".ToLowerInvariant();
    }

    /// <summary>
    /// Removes the handler registered for the channel and stops its consumer loop.
    /// Does nothing when the channel has no subscription.
    /// </summary>
    /// <param name="action">Action that selects the channel.</param>
    public void Unsubscribe(CacheNotifyAction action)
    {
        var channelName = GetChannelName(action);

        Actions.TryRemove(channelName, out _);
        CancelConsumer(channelName);
    }

    /// <summary>Returns the shared producer, creating it on first use.</summary>
    private IProducer<AscCacheItem, T> GetProducer()
    {
        lock (producerLock)
        {
            if (Producer == null)
            {
                Producer = new ProducerBuilder<AscCacheItem, T>(new ProducerConfig(ClientConfig))
                    .SetErrorHandler((_, e) => Log.Error(e))
                    .SetKeySerializer(KeySerializer)
                    .SetValueSerializer(ValueSerializer)
                    .Build();
            }

            return Producer;
        }
    }

    /// <summary>Wraps <paramref name="obj"/> in a message keyed with this instance's id.</summary>
    private Message<AscCacheItem, T> CreateMessage(T obj)
    {
        return new Message<AscCacheItem, T>
        {
            Value = obj,
            Key = new AscCacheItem
            {
                Id = Key.ToString()
            }
        };
    }

    /// <summary>Calls the handler subscribed in this process for the channel, if there is one.</summary>
    private void NotifyLocalSubscriber(string channelName, T obj)
    {
        if (Actions.TryGetValue(channelName, out var onchange))
        {
            onchange(obj);
        }
    }

    /// <summary>Cancels and releases the cancellation source registered for the channel, if any.</summary>
    private void CancelConsumer(string channelName)
    {
        if (Cts.TryRemove(channelName, out var source))
        {
            // The consumer loop holds a copy of the token, which stays usable after Dispose.
            source.Cancel();
            source.Dispose();
        }
    }

    /// <summary>
    /// Tells whether a received message key carries this instance's id.
    /// A missing or unparsable key is treated as coming from another instance.
    /// </summary>
    private bool IsOwnMessage(AscCacheItem key)
    {
        return key != null && Guid.TryParse(key.Id, out var sender) && sender.Equals(Key);
    }

    /// <summary>
    /// Tries to create the channel's topic (one partition, replication factor one).
    /// "Topic already exists" is the expected outcome on every subscription but the first
    /// and is ignored; any other failure is logged and the caller proceeds regardless.
    /// </summary>
    private void EnsureTopic(string channelName)
    {
        using (var adminClient = new AdminClientBuilder(AdminClientConfig)
            .SetErrorHandler((_, e) => Log.Error(e))
            .Build())
        {
            try
            {
                adminClient.CreateTopicsAsync(
                    new TopicSpecification[]
                    {
                        new TopicSpecification
                        {
                            Name = channelName,
                            NumPartitions = 1,
                            ReplicationFactor = 1
                        }
                    }).Wait();
            }
            catch (AggregateException e)
            {
                foreach (var inner in e.Flatten().InnerExceptions)
                {
                    if (!IsTopicAlreadyExists(inner))
                    {
                        Log.Error("Kafka create topic", inner);
                    }
                }
            }
        }
    }

    /// <summary>
    /// True when <paramref name="exception"/> is a topic-creation failure whose every
    /// per-topic report says the topic already exists.
    /// </summary>
    private static bool IsTopicAlreadyExists(Exception exception)
    {
        if (!(exception is CreateTopicsException createException) || createException.Results.Count == 0)
        {
            return false;
        }

        foreach (var report in createException.Results)
        {
            if (report.Error.Code != ErrorCode.TopicAlreadyExists)
            {
                return false;
            }
        }

        return true;
    }

    /// <summary>
    /// Body of the background consumer: reads the channel's topic until
    /// <paramref name="token"/> is cancelled and passes foreign messages to the handler.
    /// </summary>
    private void ConsumeLoop(string channelName, CancellationToken token)
    {
        try
        {
            var conf = new ConsumerConfig(ClientConfig)
            {
                // A fresh group id per consumer, as in the original subscription logic.
                GroupId = Guid.NewGuid().ToString()
            };

            EnsureTopic(channelName);

            using var consumer = new ConsumerBuilder<AscCacheItem, T>(conf)
                .SetErrorHandler((_, e) => Log.Error(e))
                .SetKeyDeserializer(KeyDeserializer)
                .SetValueDeserializer(ValueDeserializer)
                .Build();

            consumer.Assign(new TopicPartition(channelName, new Partition()));

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = consumer.Consume(token);

                        if (cr == null || cr.Message == null || cr.Message.Value == null)
                        {
                            continue;
                        }

                        // Own messages were already delivered to the local handler by Publish.
                        if (IsOwnMessage(cr.Message.Key))
                        {
                            continue;
                        }

                        if (Actions.TryGetValue(channelName, out var act))
                        {
                            try
                            {
                                act(cr.Message.Value);
                            }
                            catch (Exception e)
                            {
                                Log.Error("Kafka onmessage", e);
                            }
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Log.Error(e);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                consumer.Close();
            }
        }
        catch (Exception e)
        {
            // Without this the background task would fault silently and the
            // subscription would stop receiving messages with no trace in the log.
            Log.Error("Kafka consumer", e);
        }
    }

    private bool disposedValue = false; // To detect redundant calls

    /// <summary>
    /// Stops all consumer loops and disposes the producer when called from <see cref="Dispose()"/>.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>, false from the finalizer.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                // Keys returns a snapshot, so entries can be removed while iterating.
                foreach (var channelName in Cts.Keys)
                {
                    CancelConsumer(channelName);
                }

                if (Producer != null)
                {
                    Producer.Dispose();
                }
            }

            disposedValue = true;
        }
    }

    ~KafkaCache()
    {
        Dispose(false);
    }

    /// <summary>Releases the producer and stops the background consumers.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Settings object read from the "kafka" configuration section by <c>KafkaCache</c>.
/// </summary>
public class KafkaSettings
{
    /// <summary>
    /// Value copied into the <c>BootstrapServers</c> property of the Kafka client
    /// and admin-client configurations.
    /// </summary>
    public string BootstrapServers { get; set; }
}
|
2019-08-27 14:57:06 +00:00
|
|
|
|
}
|