eventbus: added support ProtobufSerializer

This commit is contained in:
Alexey Bannov 2022-03-05 19:46:20 +03:00
parent a29cefb574
commit 47d43831ea
13 changed files with 196 additions and 63 deletions

View File

@ -71,6 +71,8 @@ namespace ASC.Api.Core.Extensions
var logger = sp.GetRequiredService<IOptionsMonitor<ILog>>(); var logger = sp.GetRequiredService<IOptionsMonitor<ILog>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var serializer = new ASC.EventBus.Serializers.ProtobufSerializer();
var subscriptionClientName = "asc_event_bus_default_queue"; var subscriptionClientName = "asc_event_bus_default_queue";
if (!string.IsNullOrEmpty(cfg["core:eventBus:subscriptionClientName"])) if (!string.IsNullOrEmpty(cfg["core:eventBus:subscriptionClientName"]))
@ -85,7 +87,7 @@ namespace ASC.Api.Core.Extensions
retryCount = int.Parse(cfg["core:eventBus:connectRetryCount"]); retryCount = int.Parse(cfg["core:eventBus:connectRetryCount"]);
} }
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, serializer, subscriptionClientName, retryCount);
}); });
} }
else else

View File

@ -22,6 +22,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="MySql.Data" Version="8.0.28" /> <PackageReference Include="MySql.Data" Version="8.0.28" />
<PackageReference Include="protobuf-net" Version="3.0.101" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -25,6 +25,7 @@
namespace ASC.Data.Backup.Contracts; namespace ASC.Data.Backup.Contracts;
public enum BackupStorageType public enum BackupStorageType
{ {
Documents = 0, Documents = 0,

View File

@ -83,4 +83,5 @@ global using MySql.Data.MySqlClient;
global using Newtonsoft.Json; global using Newtonsoft.Json;
global using Newtonsoft.Json.Linq; global using Newtonsoft.Json.Linq;
global using ASC.EventBus.Exceptions; global using ASC.EventBus.Exceptions;
global using ProtoBuf;

View File

@ -1,7 +1,13 @@
namespace ASC.Data.Backup.Core.IntegrationEvents.Events; namespace ASC.Data.Backup.Core.IntegrationEvents.Events;
[ProtoContract]
public record BackupRequestIntegrationEvent : IntegrationEvent public record BackupRequestIntegrationEvent : IntegrationEvent
{ {
private BackupRequestIntegrationEvent() :base()
{
}
public BackupRequestIntegrationEvent(BackupStorageType storageType, public BackupRequestIntegrationEvent(BackupStorageType storageType,
int tenantId, int tenantId,
Guid createBy, Guid createBy,
@ -19,11 +25,22 @@ public record BackupRequestIntegrationEvent : IntegrationEvent
StorageBasePath = storageBasePath; StorageBasePath = storageBasePath;
} }
[ProtoMember(1)]
public BackupStorageType StorageType { get; private init; } public BackupStorageType StorageType { get; private init; }
[ProtoMember(2)]
public Dictionary<string, string> StorageParams { get; private init; } public Dictionary<string, string> StorageParams { get; private init; }
[ProtoMember(3)]
public bool BackupMail { get; private init; } public bool BackupMail { get; private init; }
[ProtoMember(4)]
public bool IsScheduled { get; private init; } public bool IsScheduled { get; private init; }
[ProtoMember(5)]
public int BackupsStored { get; private init; } public int BackupsStored { get; private init; }
[ProtoMember(6)]
public string StorageBasePath { get; private init; } public string StorageBasePath { get; private init; }
} }

View File

@ -1,5 +1,6 @@
namespace ASC.Data.Backup.Core.IntegrationEvents.Events; namespace ASC.Data.Backup.Core.IntegrationEvents.Events;
[ProtoContract]
public record BackupRestoreRequestIntegrationEvent : IntegrationEvent public record BackupRestoreRequestIntegrationEvent : IntegrationEvent
{ {
public BackupRestoreRequestIntegrationEvent(BackupStorageType storageType, public BackupRestoreRequestIntegrationEvent(BackupStorageType storageType,
@ -16,10 +17,16 @@ public record BackupRestoreRequestIntegrationEvent : IntegrationEvent
BackupId = backupId; BackupId = backupId;
} }
[ProtoMember(1)]
public bool Notify { get; set; } public bool Notify { get; set; }
[ProtoMember(2)]
public string BackupId { get; set; } public string BackupId { get; set; }
[ProtoMember(3)]
public BackupStorageType StorageType { get; private init; } public BackupStorageType StorageType { get; private init; }
public Dictionary<string, string> StorageParams { get; private init; }
[ProtoMember(4)]
public Dictionary<string, string> StorageParams { get; private init; }
} }

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>net6.0</TargetFramework> <TargetFramework>net6.0</TargetFramework>

View File

@ -1,6 +1,7 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using ASC.EventBus.Exceptions; using ASC.EventBus.Exceptions;
using ASC.EventBus.Serializers;
namespace ASC.EventBus.RabbitMQ; namespace ASC.EventBus.RabbitMQ;
@ -15,6 +16,7 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
private readonly IEventBusSubscriptionsManager _subsManager; private readonly IEventBusSubscriptionsManager _subsManager;
private readonly ILifetimeScope _autofac; private readonly ILifetimeScope _autofac;
private readonly int _retryCount; private readonly int _retryCount;
private IIntegrationEventSerializer _serializer;
private string _consumerTag; private string _consumerTag;
private IModel _consumerChannel; private IModel _consumerChannel;
@ -23,8 +25,13 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
private static ConcurrentQueue<Guid> _rejectedEvents; private static ConcurrentQueue<Guid> _rejectedEvents;
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, IOptionsMonitor<ILog> options, public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) IOptionsMonitor<ILog> options,
ILifetimeScope autofac,
IEventBusSubscriptionsManager subsManager,
IIntegrationEventSerializer serializer,
string queueName = null,
int retryCount = 5)
{ {
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = options.CurrentValue ?? throw new ArgumentNullException(nameof(options.CurrentValue)); _logger = options.CurrentValue ?? throw new ArgumentNullException(nameof(options.CurrentValue));
@ -35,7 +42,7 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
_autofac = autofac; _autofac = autofac;
_retryCount = retryCount; _retryCount = retryCount;
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
_serializer = serializer;
_rejectedEvents = new ConcurrentQueue<Guid>(); _rejectedEvents = new ConcurrentQueue<Guid>();
} }
@ -84,12 +91,9 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
_logger.TraceFormat("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); _logger.TraceFormat("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: "direct"); channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: "direct");
var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions var body = _serializer.Serialize(@event);
{
WriteIndented = true
});
policy.Execute(() => policy.Execute(() =>
{ {
var properties = channel.CreateBasicProperties(); var properties = channel.CreateBasicProperties();
@ -185,7 +189,7 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
{ {
if (!String.IsNullOrEmpty(_consumerTag)) if (!String.IsNullOrEmpty(_consumerTag))
{ {
_logger.TraceFormat("Consumer tag {ConsumerTag} already exist. Cancelled BasicConsume again", _consumerTag ); _logger.TraceFormat("Consumer tag {ConsumerTag} already exist. Cancelled BasicConsume again", _consumerTag);
return; return;
} }
@ -208,8 +212,10 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{ {
var eventName = eventArgs.RoutingKey; var eventName = eventArgs.RoutingKey;
var message = Encoding.UTF8.GetString(eventArgs.Body.Span);
var @event = GetEvent(eventName, eventArgs.Body.Span.ToArray());
var message = @event.ToString();
try try
{ {
if (message.ToLowerInvariant().Contains("throw-fake-exception")) if (message.ToLowerInvariant().Contains("throw-fake-exception"))
@ -217,9 +223,9 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
throw new InvalidOperationException($"Fake exception requested: \"{message}\""); throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
} }
await ProcessEvent(eventName, message); await ProcessEvent(eventName, @event);
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
} }
catch (IntegrationEventRejectExeption ex) catch (IntegrationEventRejectExeption ex)
{ {
@ -264,13 +270,13 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, channel.ExchangeDeclare(exchange: EXCHANGE_NAME,
type: "direct"); type: "direct");
channel.ExchangeDeclare(exchange: DEAD_LETTER_EXCHANGE_NAME, channel.ExchangeDeclare(exchange: DEAD_LETTER_EXCHANGE_NAME,
type: "direct"); type: "direct");
channel.QueueDeclare(queue: _deadLetterQueueName, channel.QueueDeclare(queue: _deadLetterQueueName,
durable: true, durable: true,
exclusive: false, exclusive: false,
autoDelete: false, autoDelete: false,
arguments: null); arguments: null);
@ -291,54 +297,37 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
_consumerChannel.Dispose(); _consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel(); _consumerChannel = CreateConsumerChannel();
_consumerTag = String.Empty; _consumerTag = String.Empty;
StartBasicConsume(); StartBasicConsume();
}; };
return channel; return channel;
} }
private IntegrationEvent GetEvent(string eventName, byte[] serializedMessage)
public void PreProcessEvent(string eventName, ref string message)
{
if (_rejectedEvents.Count == 0) return;
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = (IntegrationEvent)JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
if (_rejectedEvents.TryPeek(out Guid result) && result.Equals(integrationEvent.Id))
{
integrationEvent.Redelivered = true;
}
message = Encoding.UTF8.GetString(JsonSerializer.SerializeToUtf8Bytes(integrationEvent, eventType, new JsonSerializerOptions
{
WriteIndented = true
}));
}
private IntegrationEvent PreProcessEvent(string eventName, string message)
{ {
var eventType = _subsManager.GetEventTypeByName(eventName); var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = (IntegrationEvent)JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true }); var integrationEvent = (IntegrationEvent)_serializer.Deserialize(serializedMessage, eventType);
if (_rejectedEvents.Count == 0) return integrationEvent;
if (_rejectedEvents.TryPeek(out Guid result) && result.Equals(integrationEvent.Id))
{
integrationEvent.Redelivered = true;
}
return integrationEvent; return integrationEvent;
} }
private async Task ProcessEvent(string eventName, string message) private void PreProcessEvent(IntegrationEvent @event)
{
if (_rejectedEvents.Count == 0) return;
if (_rejectedEvents.TryPeek(out Guid result) && result.Equals(@event.Id))
{
@event.Redelivered = true;
}
}
private async Task ProcessEvent(string eventName, IntegrationEvent @event)
{ {
_logger.TraceFormat("Processing RabbitMQ event: {EventName}", eventName); _logger.TraceFormat("Processing RabbitMQ event: {EventName}", eventName);
var @event = PreProcessEvent(eventName, message); PreProcessEvent(@event);
if (_subsManager.HasSubscriptionsForEvent(eventName)) if (_subsManager.HasSubscriptionsForEvent(eventName))
{ {

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>net6.0</TargetFramework> <TargetFramework>net6.0</TargetFramework>
@ -7,4 +7,9 @@
<ImplicitUsings>enable</ImplicitUsings> <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<PackageReference Include="protobuf-net" Version="3.0.101" />
<PackageReference Include="System.ServiceModel.Primitives" Version="4.9.0" />
</ItemGroup>
</Project> </Project>

View File

@ -1,13 +1,13 @@
namespace ASC.EventBus.Events; namespace ASC.EventBus.Events;
[ProtoContract]
public record IntegrationEvent public record IntegrationEvent
{ {
private IntegrationEvent() protected IntegrationEvent()
{ {
} }
[JsonConstructor]
public IntegrationEvent(Guid createBy, int tenantId) public IntegrationEvent(Guid createBy, int tenantId)
{ {
Id = Guid.NewGuid(); Id = Guid.NewGuid();
@ -16,18 +16,19 @@ public record IntegrationEvent
TenantId = tenantId; TenantId = tenantId;
} }
[JsonInclude] [ProtoMember(1)]
public Guid Id { get; private init; } public Guid Id { get; private init; }
[JsonInclude] [ProtoMember(2)]
public DateTime CreateOn { get; private init; } public DateTime CreateOn { get; private init; }
[JsonInclude] [ProtoMember(3)]
public Guid CreateBy { get; private init; } public Guid CreateBy { get; private init; }
[JsonInclude] [ProtoMember(4)]
public bool Redelivered { get; set; }
[ProtoMember(5)]
public int TenantId { get; private init; } public int TenantId { get; private init; }
[JsonInclude]
public bool Redelivered { get; set; }
} }

View File

@ -8,3 +8,6 @@ global using ASC.EventBus.Abstractions;
global using ASC.EventBus.Events; global using ASC.EventBus.Events;
global using static ASC.EventBus.InMemoryEventBusSubscriptionsManager; global using static ASC.EventBus.InMemoryEventBusSubscriptionsManager;
global using ProtoBuf;
global using System.Security.Cryptography;
global using ProtoBuf.Meta;

View File

@ -0,0 +1,34 @@
namespace ASC.EventBus.Serializers;
/// <summary>
/// Contract for Serializer implementation
/// </summary>
public interface IIntegrationEventSerializer
{
/// <summary>
/// Serializes the specified item.
/// </summary>
/// <param name="item">The item.</param>
/// <returns>Return the serialized object</returns>
byte[] Serialize<T>(T? item);
/// <summary>
/// Deserializes the specified bytes.
/// </summary>
/// <typeparam name="T">The type of the expected object.</typeparam>
/// <param name="serializedObject">The serialized object.</param>
/// <returns>
/// The instance of the specified Item
/// </returns>
T Deserialize<T>(byte[] serializedObject);
/// <summary>
/// Deserializes the specified bytes.
/// </summary>
/// <param name="serializedObject">The serialized object.</param>
/// <param name="returnType">The return type.</param>
/// <returns>
/// The instance of the specified Item
/// </returns>
object Deserialize(byte[] serializedObject, Type returnType);
}

View File

@ -0,0 +1,72 @@
namespace ASC.EventBus.Serializers;
public class ProtobufSerializer : IIntegrationEventSerializer
{
private SynchronizedCollection<string> _processedProtoTypes;
private readonly int _baseFieldNumber;
public ProtobufSerializer()
{
_processedProtoTypes = new SynchronizedCollection<string>();
_baseFieldNumber = 100;
}
/// <inheritdoc/>
public byte[] Serialize<T>(T? item)
{
if (item == null)
return Array.Empty<byte>();
ProcessProtoType(item.GetType());
using var ms = new MemoryStream();
Serializer.Serialize(ms, item);
return ms.ToArray();
}
/// <inheritdoc/>
public T Deserialize<T>(byte[] serializedObject)
{
// ProcessProtoType(returnType);
using var ms = new MemoryStream(serializedObject);
return Serializer.Deserialize<T>(ms);
}
/// <inheritdoc/>
public object Deserialize(byte[] serializedObject, Type returnType)
{
ProcessProtoType(returnType);
using var ms = new MemoryStream(serializedObject);
return Serializer.Deserialize(returnType, ms);
}
private void ProcessProtoType(Type protoType)
{
if (_processedProtoTypes.Contains(protoType.FullName)) return;
if (protoType.BaseType == null && protoType.BaseType == typeof(object)) return;
var itemType = RuntimeTypeModel.Default[protoType];
var baseType = RuntimeTypeModel.Default[protoType.BaseType];
if (!baseType.GetSubtypes().Any(s => s.DerivedType == itemType))
{
baseType.AddSubType(_baseFieldNumber, protoType);
//foreach (var field in baseType.GetFields())
//{
// myType.Add(field.FieldNumber + _baseTypeIncrement, field.Name);
//}
}
_processedProtoTypes.Add(protoType.FullName);
}
}