2019-07-19 13:10:15 +00:00
|
|
|
|
using System;
|
2019-07-23 07:45:54 +00:00
|
|
|
|
using System.Collections.Concurrent;
|
2019-07-19 13:10:15 +00:00
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
using System.Threading;
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
|
|
|
|
using ASC.Common.Logging;
|
|
|
|
|
using ASC.Common.Utils;
|
|
|
|
|
using Confluent.Kafka;
|
|
|
|
|
|
|
|
|
|
using Google.Protobuf;
|
|
|
|
|
|
|
|
|
|
namespace ASC.Common.Caching
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
public class KafkaCache<T> : IDisposable, ICacheNotify<T> where T : IMessage<T>, new()
|
2019-07-19 13:10:15 +00:00
|
|
|
|
{
|
|
|
|
|
private ClientConfig ClientConfig { get; set; }
|
|
|
|
|
private ILog Log { get; set; }
|
2019-08-27 14:57:06 +00:00
|
|
|
|
private ConcurrentDictionary<string, CancellationTokenSource> Cts { get; set; }
|
2019-07-19 13:10:15 +00:00
|
|
|
|
private MemoryCacheNotify<T> MemoryCacheNotify { get; set; }
|
2019-08-27 14:57:06 +00:00
|
|
|
|
private string ChannelName { get; } = $"ascchannel{typeof(T).Name}";
|
|
|
|
|
private ProtobufSerializer<T> Serializer { get; } = new ProtobufSerializer<T>();
|
|
|
|
|
private ProtobufDeserializer<T> Deserializer { get; } = new ProtobufDeserializer<T>();
|
|
|
|
|
|
|
|
|
|
private IProducer<Null, T> Producer { get; }
|
2019-07-19 13:10:15 +00:00
|
|
|
|
public KafkaCache()
|
|
|
|
|
{
|
|
|
|
|
Log = LogManager.GetLogger("ASC");
|
2019-08-27 14:57:06 +00:00
|
|
|
|
Cts = new ConcurrentDictionary<string, CancellationTokenSource>();
|
2019-07-19 13:10:15 +00:00
|
|
|
|
|
|
|
|
|
var settings = ConfigurationManager.GetSetting<KafkaSettings>("kafka");
|
|
|
|
|
if (settings != null && !string.IsNullOrEmpty(settings.BootstrapServers))
|
|
|
|
|
{
|
|
|
|
|
ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
|
2019-08-27 14:57:06 +00:00
|
|
|
|
var config = new ProducerConfig(ClientConfig);
|
|
|
|
|
Producer = new ProducerBuilder<Null, T>(config)
|
|
|
|
|
.SetErrorHandler((_, e) => Log.Error(e))
|
|
|
|
|
.SetValueSerializer(Serializer)
|
|
|
|
|
.Build();
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
MemoryCacheNotify = new MemoryCacheNotify<T>();
|
|
|
|
|
}
|
2019-08-27 14:57:06 +00:00
|
|
|
|
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
|
2019-08-27 14:57:06 +00:00
|
|
|
|
public void Publish(T obj, CacheNotifyAction cacheNotifyAction)
|
2019-07-19 13:10:15 +00:00
|
|
|
|
{
|
2019-08-15 12:04:42 +00:00
|
|
|
|
if (ClientConfig == null)
|
2019-07-19 13:10:15 +00:00
|
|
|
|
{
|
|
|
|
|
MemoryCacheNotify.Publish(obj, cacheNotifyAction);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
Producer.ProduceAsync(GetChannelName(cacheNotifyAction), new Message<Null, T>() { Value = obj });
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
catch (ProduceException<Null, string> e)
|
|
|
|
|
{
|
|
|
|
|
Log.Error(e);
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e)
|
|
|
|
|
{
|
|
|
|
|
Log.Error(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Subscribe(Action<T> onchange, CacheNotifyAction cacheNotifyAction)
|
|
|
|
|
{
|
|
|
|
|
if (ClientConfig == null)
|
|
|
|
|
{
|
|
|
|
|
MemoryCacheNotify.Subscribe(onchange, cacheNotifyAction);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2019-08-27 14:57:06 +00:00
|
|
|
|
Cts[GetChannelName(cacheNotifyAction)] = new CancellationTokenSource();
|
2019-07-23 07:45:54 +00:00
|
|
|
|
|
2019-07-19 13:10:15 +00:00
|
|
|
|
void action()
|
|
|
|
|
{
|
|
|
|
|
var conf = new ConsumerConfig(ClientConfig)
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
GroupId = Guid.NewGuid().ToString()
|
2019-07-19 13:10:15 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
using var c = new ConsumerBuilder<Ignore, T>(conf)
|
|
|
|
|
.SetErrorHandler((_, e) => Log.Error(e))
|
2019-08-27 14:57:06 +00:00
|
|
|
|
.SetValueDeserializer(Deserializer)
|
2019-07-19 13:10:15 +00:00
|
|
|
|
.Build();
|
|
|
|
|
|
2019-07-23 07:45:54 +00:00
|
|
|
|
c.Assign(new TopicPartition(GetChannelName(cacheNotifyAction), new Partition()));
|
2019-07-19 13:10:15 +00:00
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
while (true)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
var cr = c.Consume(Cts[GetChannelName(cacheNotifyAction)].Token);
|
2019-07-19 13:10:15 +00:00
|
|
|
|
if (cr != null && cr.Value != null)
|
|
|
|
|
{
|
|
|
|
|
onchange(cr.Value);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (ConsumeException e)
|
|
|
|
|
{
|
|
|
|
|
Log.Error(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (OperationCanceledException)
|
|
|
|
|
{
|
|
|
|
|
c.Close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var task = new Task(action, TaskCreationOptions.LongRunning);
|
|
|
|
|
task.Start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private string GetChannelName(CacheNotifyAction cacheNotifyAction)
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
return $"{ChannelName}{cacheNotifyAction}";
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Unsubscribe(CacheNotifyAction action)
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
Cts.TryGetValue(GetChannelName(action), out var source);
|
2019-08-15 12:04:42 +00:00
|
|
|
|
if (source != null)
|
2019-07-23 07:45:54 +00:00
|
|
|
|
{
|
|
|
|
|
source.Cancel();
|
|
|
|
|
}
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
2019-08-27 14:57:06 +00:00
|
|
|
|
|
|
|
|
|
private bool disposedValue = false; // To detect redundant calls
|
|
|
|
|
|
|
|
|
|
protected virtual void Dispose(bool disposing)
|
|
|
|
|
{
|
|
|
|
|
if (!disposedValue)
|
|
|
|
|
{
|
|
|
|
|
if (disposing)
|
|
|
|
|
{
|
|
|
|
|
Producer.Dispose();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
disposedValue = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
~KafkaCache()
|
|
|
|
|
{
|
|
|
|
|
Dispose(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Dispose()
|
|
|
|
|
{
|
|
|
|
|
Dispose(true);
|
|
|
|
|
GC.SuppressFinalize(this);
|
|
|
|
|
}
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public class KafkaSettings
|
|
|
|
|
{
|
|
|
|
|
public string BootstrapServers { get; set; }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public class MemoryCacheNotify<T> : ICacheNotify<T> where T : IMessage<T>, new()
|
|
|
|
|
{
|
|
|
|
|
private readonly Dictionary<string, List<Action<T>>> actions = new Dictionary<string, List<Action<T>>>();
|
|
|
|
|
|
|
|
|
|
public void Publish(T obj, CacheNotifyAction action)
|
2019-08-27 14:57:06 +00:00
|
|
|
|
{
|
|
|
|
|
if (actions.TryGetValue(GetKey(action), out var onchange) && onchange != null)
|
|
|
|
|
{
|
2019-08-15 12:04:42 +00:00
|
|
|
|
foreach (var a in onchange)
|
2019-07-19 13:10:15 +00:00
|
|
|
|
{
|
|
|
|
|
a(obj);
|
2019-08-27 14:57:06 +00:00
|
|
|
|
}
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Subscribe(Action<T> onchange, CacheNotifyAction notifyAction)
|
|
|
|
|
{
|
2019-08-27 14:57:06 +00:00
|
|
|
|
if (onchange != null)
|
2019-07-19 13:10:15 +00:00
|
|
|
|
{
|
|
|
|
|
var key = GetKey(notifyAction);
|
2019-08-27 14:57:06 +00:00
|
|
|
|
actions.TryAdd(key, new List<Action<T>>());
|
|
|
|
|
actions[key].Add(onchange);
|
2019-07-19 13:10:15 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Unsubscribe(CacheNotifyAction action)
|
|
|
|
|
{
|
|
|
|
|
actions.Remove(GetKey(action));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private string GetKey(CacheNotifyAction cacheNotifyAction)
|
|
|
|
|
{
|
|
|
|
|
return $"{typeof(T).Name}{cacheNotifyAction}";
|
|
|
|
|
}
|
|
|
|
|
}
|
2019-08-27 14:57:06 +00:00
|
|
|
|
}
|