Kafka: added key to message

This commit is contained in:
pavelbannov 2019-08-28 14:14:09 +03:00
parent ae8bf2bc20
commit 795dc8665a
3 changed files with 45 additions and 19 deletions

View File

@ -28,8 +28,9 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Caching;
using System.Text.RegularExpressions;
using System.Text.RegularExpressions;
using Google.Protobuf;
namespace ASC.Common.Caching
{
public class AscCache : ICache
@ -143,7 +144,7 @@ namespace ASC.Common.Caching
public void ClearCache()
{
KafkaNotify.Publish(new AscCacheItem() { Id = Guid.NewGuid().ToString() }, CacheNotifyAction.Any);
KafkaNotify.Publish(new AscCacheItem() { Id = ByteString.CopyFrom(Guid.NewGuid().ToByteArray()) }, CacheNotifyAction.Any);
}
private MemoryCache GetCache()

View File

@ -17,25 +17,32 @@ namespace ASC.Common.Caching
private ClientConfig ClientConfig { get; set; }
private ILog Log { get; set; }
private ConcurrentDictionary<string, CancellationTokenSource> Cts { get; set; }
private ConcurrentDictionary<string, Action<T>> Actions { get; set; }
private MemoryCacheNotify<T> MemoryCacheNotify { get; set; }
private string ChannelName { get; } = $"ascchannel{typeof(T).Name}";
private ProtobufSerializer<T> Serializer { get; } = new ProtobufSerializer<T>();
private ProtobufDeserializer<T> Deserializer { get; } = new ProtobufDeserializer<T>();
private IProducer<Null, T> Producer { get; }
private ProtobufSerializer<T> ValueSerializer { get; } = new ProtobufSerializer<T>();
private ProtobufDeserializer<T> ValueDeserializer { get; } = new ProtobufDeserializer<T>();
private ProtobufSerializer<AscCacheItem> KeySerializer { get; } = new ProtobufSerializer<AscCacheItem>();
private ProtobufDeserializer<AscCacheItem> KeyDeserializer { get; } = new ProtobufDeserializer<AscCacheItem>();
private IProducer<AscCacheItem, T> Producer { get; }
private Guid Key { get; set; }
public KafkaCache()
{
Log = LogManager.GetLogger("ASC");
Cts = new ConcurrentDictionary<string, CancellationTokenSource>();
Actions = new ConcurrentDictionary<string, Action<T>>();
Key = Guid.NewGuid();
var settings = ConfigurationManager.GetSetting<KafkaSettings>("kafka");
if (settings != null && !string.IsNullOrEmpty(settings.BootstrapServers))
{
ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
var config = new ProducerConfig(ClientConfig);
Producer = new ProducerBuilder<Null, T>(config)
Producer = new ProducerBuilder<AscCacheItem, T>(config)
.SetErrorHandler((_, e) => Log.Error(e))
.SetValueSerializer(Serializer)
.SetKeySerializer(KeySerializer)
.SetValueSerializer(ValueSerializer)
.Build();
}
else
@ -55,7 +62,23 @@ namespace ASC.Common.Caching
try
{
Producer.ProduceAsync(GetChannelName(cacheNotifyAction), new Message<Null, T>() { Value = obj });
var channelName = GetChannelName(cacheNotifyAction);
if (Actions.TryGetValue(channelName, out var onchange))
{
onchange(obj);
}
var message = new Message<AscCacheItem, T>
{
Value = obj,
Key = new AscCacheItem
{
Id = ByteString.CopyFrom(Key.ToByteArray())
}
};
Producer.ProduceAsync(channelName, message);
}
catch (ProduceException<Null, string> e)
{
@ -74,8 +97,9 @@ namespace ASC.Common.Caching
MemoryCacheNotify.Subscribe(onchange, cacheNotifyAction);
return;
}
Cts[GetChannelName(cacheNotifyAction)] = new CancellationTokenSource();
var channelName = GetChannelName(cacheNotifyAction);
Cts[channelName] = new CancellationTokenSource();
Actions[channelName] = onchange;
void action()
{
@ -84,12 +108,13 @@ namespace ASC.Common.Caching
GroupId = Guid.NewGuid().ToString()
};
using var c = new ConsumerBuilder<Ignore, T>(conf)
using var c = new ConsumerBuilder<AscCacheItem, T>(conf)
.SetErrorHandler((_, e) => Log.Error(e))
.SetValueDeserializer(Deserializer)
.SetKeyDeserializer(KeyDeserializer)
.SetValueDeserializer(ValueDeserializer)
.Build();
c.Assign(new TopicPartition(GetChannelName(cacheNotifyAction), new Partition()));
c.Assign(new TopicPartition(channelName, new Partition()));
try
{
@ -97,10 +122,10 @@ namespace ASC.Common.Caching
{
try
{
var cr = c.Consume(Cts[GetChannelName(cacheNotifyAction)].Token);
if (cr != null && cr.Value != null)
var cr = c.Consume(Cts[channelName].Token);
if (cr != null && cr.Value != null && !(new Guid(cr.Key.Id.ToByteArray())).Equals(Key) && Actions.TryGetValue(channelName, out var act))
{
onchange(cr.Value);
act(cr.Value);
}
}
catch (ConsumeException e)

View File

@ -3,5 +3,5 @@
package ASC.Common;
message AscCacheItem {
string Id = 1;
bytes Id = 1;
}