EventBus: refactoring

This commit is contained in:
Alexey Bannov 2022-04-01 19:12:53 +03:00
parent abce571695
commit 2f06b0a013
9 changed files with 53 additions and 251 deletions

View File

@ -1,11 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
<<<<<<< HEAD
VisualStudioVersion = 17.0.32112.339
=======
VisualStudioVersion = 17.0.31903.59
>>>>>>> feature/backend-refactor
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ASC.Web.Studio", "web\ASC.Web.Studio\ASC.Web.Studio.csproj", "{90183112-BCD6-4E16-9CA2-12231930DAB4}"
EndProject
@ -107,8 +103,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ASC.EventBus.RabbitMQ", "co
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ASC.EventBus.Extensions.Logger", "common\ASC.EventBus.Extensions.Logger\ASC.EventBus.Extensions.Logger.csproj", "{ED8CEB38-7C95-43A8-B208-9C9828654AC1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ASC.EventBus.MemoryCache", "common\ASC.EventBus.MemoryCache\ASC.EventBus.MemoryCache.csproj", "{CC352535-5418-437E-9081-5FF47FD140F7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -299,10 +293,6 @@ Global
{ED8CEB38-7C95-43A8-B208-9C9828654AC1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ED8CEB38-7C95-43A8-B208-9C9828654AC1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{ED8CEB38-7C95-43A8-B208-9C9828654AC1}.Release|Any CPU.Build.0 = Release|Any CPU
{CC352535-5418-437E-9081-5FF47FD140F7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CC352535-5418-437E-9081-5FF47FD140F7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CC352535-5418-437E-9081-5FF47FD140F7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CC352535-5418-437E-9081-5FF47FD140F7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -9,7 +9,6 @@
"common\\ASC.Data.Reassigns\\ASC.Data.Reassigns.csproj",
"common\\ASC.Data.Storage\\ASC.Data.Storage.csproj",
"common\\ASC.EventBus.Extensions.Logger\\ASC.EventBus.Extensions.Logger.csproj",
"common\\ASC.EventBus.MemoryCache\\ASC.EventBus.MemoryCache.csproj",
"common\\ASC.EventBus.RabbitMQ\\ASC.EventBus.RabbitMQ.csproj",
"common\\ASC.EventBus\\ASC.EventBus.csproj",
"common\\ASC.FederatedLogin\\ASC.FederatedLogin.csproj",

View File

@ -29,7 +29,6 @@
<ItemGroup>
<ProjectReference Include="..\..\web\ASC.Web.Core\ASC.Web.Core.csproj" />
<ProjectReference Include="..\ASC.EventBus.Extensions.Logger\ASC.EventBus.Extensions.Logger.csproj" />
<ProjectReference Include="..\ASC.EventBus.MemoryCache\ASC.EventBus.MemoryCache.csproj" />
<ProjectReference Include="..\ASC.EventBus.RabbitMQ\ASC.EventBus.RabbitMQ.csproj" />
<ProjectReference Include="..\ASC.EventBus\ASC.EventBus.csproj" />
<ProjectReference Include="..\ASC.Webhooks.Core\ASC.Webhooks.Core.csproj" />

View File

@ -98,38 +98,10 @@ public abstract class BaseStartup
DIHelper.TryAdd<CookieAuthHandler>();
DIHelper.TryAdd<WebhooksGlobalFilterAttribute>();
var redisConfiguration = Configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = Configuration.GetSection("kafka").Get<KafkaSettings>();
var rabbitMQConfiguration = Configuration.GetSection("RabbitMQ").Get<RabbitMQSettings>();
services.AddDistributedCache(Configuration);
services.AddEventBus(Configuration);
services.AddStackExchangeRedisCache(config =>
{
config.ConfigurationOptions = redisConfiguration.ConfigurationOptions;
});
services.AddDistributedTaskQueue();
if (kafkaConfiguration != null)
{
DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
}
else if (rabbitMQConfiguration != null)
{
DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(RabbitMQCache<>));
}
else if (redisConfiguration != null)
{
DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
}
else
{
DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
services.AddCacheNotify(Configuration);
DIHelper.TryAdd(typeof(IWebhookPublisher), typeof(WebhookPublisher));

View File

@ -61,30 +61,13 @@ public static class HostBuilderExtension
{
services.AddMemoryCache();
services.AddDistributedCache(hostContext.Configuration);
services.AddEventBus(hostContext.Configuration);
services.AddDistributedTaskQueue();
services.AddCacheNotify(hostContext.Configuration);
var diHelper = new DIHelper(services);
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
var rabbitMQConfiguration = hostContext.Configuration.GetSection("RabbitMQ").Get<RabbitMQSettings>();
if (kafkaConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
}
else if (rabbitMQConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RabbitMQCache<>));
}
else if (redisConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
}
else
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
configureDelegate?.Invoke(hostContext, services, diHelper);
});

View File

@ -1,6 +1,49 @@
namespace ASC.Api.Core.Extensions;
public static class ServiceCollectionExtension
{
public static void AddCacheNotify(this IServiceCollection services, IConfiguration configuration)
{
var redisConfiguration = configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = configuration.GetSection("kafka").Get<KafkaSettings>();
var rabbitMQConfiguration = configuration.GetSection("RabbitMQ").Get<RabbitMQSettings>();
if (redisConfiguration != null)
{
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
services.AddSingleton(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
}
else if (rabbitMQConfiguration != null)
{
services.AddSingleton(typeof(ICacheNotify<>), typeof(RabbitMQCache<>));
}
else if (kafkaConfiguration != null)
{
services.AddSingleton(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
}
else
{
services.AddSingleton(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
}
public static void AddDistributedCache(this IServiceCollection services, IConfiguration configuration)
{
var redisConfiguration = configuration.GetSection("Redis").Get<RedisConfiguration>();
if (redisConfiguration != null)
{
services.AddStackExchangeRedisCache(config =>
{
config.ConfigurationOptions = redisConfiguration.ConfigurationOptions;
});
}
else
{
services.AddDistributedMemoryCache();
}
}
public static void AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
@ -72,18 +115,8 @@ public static class ServiceCollectionExtension
}
else
{
services.AddSingleton<IEventBus, EventBusMemoryCache>(sp =>
{
var cfg = sp.GetRequiredService<IConfiguration>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<IOptionsMonitor<ILog>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
return new EventBusMemoryCache(logger, iLifetimeScope, eventBusSubcriptionsManager);
});
throw new NotImplementedException("EventBus: Provider not found.");
}
}
/// <remarks>

View File

@ -1,12 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>disable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\ASC.Common\ASC.Common.csproj" />
<ProjectReference Include="..\ASC.EventBus\ASC.EventBus.csproj" />
</ItemGroup>
</Project>

View File

@ -1,158 +0,0 @@
using System.Collections.Concurrent;
using ASC.Common.Logging;
using ASC.EventBus.Exceptions;
using ASC.EventBus.Extensions;
using Microsoft.Extensions.Options;
namespace ASC.EventBus.MemoryCache;
public class EventBusMemoryCache : IEventBus, IDisposable
{
public IEventBusSubscriptionsManager _subsManager;
private readonly ILifetimeScope _autofac;
private readonly ILog _logger;
const string AUTOFAC_SCOPE_NAME = "asc_event_bus";
private static ConcurrentQueue<Guid> _rejectedEvents;
public EventBusMemoryCache(IOptionsMonitor<ILog> options, ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager)
{
_logger = options.CurrentValue ?? throw new ArgumentNullException(nameof(options.CurrentValue));
_autofac = autofac ?? throw new ArgumentNullException(nameof(autofac));
_subsManager = subsManager ?? throw new ArgumentNullException(nameof(subsManager));
_rejectedEvents = new ConcurrentQueue<Guid>();
}
public void Publish(IntegrationEvent @event)
{
var eventName = @event.GetType().Name;
var message = JsonSerializer.Serialize(@event, @event.GetType());
try
{
ProcessEvent(eventName, message)
.GetAwaiter()
.GetResult();
}
catch (IntegrationEventRejectExeption ex)
{
_logger.Warn(String.Format("----- ERROR Processing message \"{0}\"", message), ex);
if (_rejectedEvents.TryPeek(out Guid result) && result.Equals(ex.EventId))
{
_rejectedEvents.TryDequeue(out Guid _);
}
else
{
_rejectedEvents.Enqueue(ex.EventId);
}
}
catch (Exception ex)
{
_logger.Warn(String.Format("----- ERROR Processing message \"{0}\"", message), ex);
}
}
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = _subsManager.GetEventKey<T>();
_logger.InfoFormat("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
_subsManager.AddSubscription<T, TH>();
}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
_logger.InfoFormat("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).Name);
_subsManager.AddDynamicSubscription<TH>(eventName);
}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = _subsManager.GetEventKey<T>();
_logger.InfoFormat("Unsubscribing from event {EventName}", eventName);
_subsManager.RemoveSubscription<T, TH>();
}
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
_logger.InfoFormat("Unsubscribing from dynamic event {EventName}", eventName);
_subsManager.RemoveDynamicSubscription<TH>(eventName);
}
private IntegrationEvent PreProcessEvent(string eventName, string message)
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = (IntegrationEvent)JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
if (_rejectedEvents.Count == 0) return integrationEvent;
if (_rejectedEvents.TryPeek(out Guid result) && result.Equals(integrationEvent.Id))
{
integrationEvent.Redelivered = true;
}
return integrationEvent;
}
private async Task ProcessEvent(string eventName, string message)
{
_logger.TraceFormat("Processing MemotyCache event: {EventName}", eventName);
var @event = PreProcessEvent(eventName, message);
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
if (subscription.IsDynamic)
{
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
if (handler == null) continue;
using dynamic eventData = @event;
await Task.Yield();
await handler.Handle(eventData);
}
else
{
var handler = scope.ResolveOptional(subscription.HandlerType);
if (handler == null) continue;
var eventType = _subsManager.GetEventTypeByName(eventName);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await Task.Yield();
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { @event });
}
}
}
}
else
{
_logger.WarnFormat("No subscription for MemotyCache event: {EventName}", eventName);
}
}
public void Dispose()
{
_subsManager.Clear();
}
}

View File

@ -1,4 +0,0 @@
global using ASC.EventBus.Abstractions;
global using ASC.EventBus.Events;
global using System.Text.Json;
global using Autofac;