2022-04-14 19:23:57 +00:00
// (c) Copyright Ascensio System SIA 2010-2022
//
// This program is a free software product.
// You can redistribute it and/or modify it under the terms
// of the GNU Affero General Public License (AGPL) version 3 as published by the Free Software
// Foundation. In accordance with Section 7(a) of the GNU AGPL its Section 15 shall be amended
// to the effect that Ascensio System SIA expressly excludes the warranty of non-infringement of
// any third-party rights.
//
// This program is distributed WITHOUT ANY WARRANTY, without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For details, see
// the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html
//
// You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, EU, LV-1021.
//
// The interactive user interfaces in modified source and object code versions of the Program must
// display Appropriate Legal Notices, as required under Section 5 of the GNU AGPL version 3.
//
// Pursuant to Section 7(b) of the License you must retain the original Product logo when
// distributing the program. Pursuant to Section 7(e) we decline to grant you any rights under
// trademark law for use of our trademarks.
//
// All the Product's GUI elements, including illustrations and icon sets, as well as technical writing
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
using System.Collections.Concurrent ;
2022-03-02 15:25:28 +00:00
using ASC.EventBus.Exceptions ;
2022-03-05 16:46:20 +00:00
using ASC.EventBus.Serializers ;
2022-02-23 12:33:42 +00:00
namespace ASC.EventBus.RabbitMQ ;
2022-02-10 14:42:21 +00:00
public class EventBusRabbitMQ : IEventBus , IDisposable
{
2022-02-23 12:33:42 +00:00
// Exchange that events are published to and that the consumer queue is bound to
// (declared with type "direct" elsewhere in this class).
const string EXCHANGE_NAME = "asc_event_bus" ;
// Exchange set as "x-dead-letter-exchange" on the consumer queue; the dead-letter
// queue is bound to it.
const string DEAD_LETTER_EXCHANGE_NAME = "asc_event_bus_dlx" ;
2022-02-10 14:42:21 +00:00
// Tag passed to BeginLifetimeScope when a scope is opened for handling an event.
const string AUTOFAC_SCOPE_NAME = "asc_event_bus" ;
// Connection wrapper used to (re)connect and to create channels.
private readonly IRabbitMQPersistentConnection _persistentConnection ;
// Logger taken from options.CurrentValue in the constructor.
private readonly ILog _logger ;
// Registry of event-name -> handler subscriptions.
private readonly IEventBusSubscriptionsManager _subsManager ;
// Parent Autofac scope; handlers are resolved from a child scope of it.
private readonly ILifetimeScope _autofac ;
// Number of retries given to the publish retry policy.
private readonly int _retryCount ;
2022-04-14 19:23:57 +00:00
// Converts events to and from the message body.
private readonly IIntegrationEventSerializer _serializer ;
2022-02-10 14:42:21 +00:00
2022-03-04 15:12:48 +00:00
// Value returned by BasicConsume; a non-empty tag makes StartBasicConsume a no-op.
private string _consumerTag ;
2022-02-10 14:42:21 +00:00
// Channel used for queue bindings, consuming and acknowledgements; replaced when
// the channel reports a callback exception.
private IModel _consumerChannel ;
// Name of the consumer queue; set to string.Empty once the last subscription is removed.
private string _queueName ;
2022-04-14 19:23:57 +00:00
// Name of the dead-letter queue, built in the constructor as "{_queueName}_dlx".
private readonly string _deadLetterQueueName ;
2022-02-10 14:42:21 +00:00
2022-03-02 15:25:28 +00:00
// Ids of events rejected on a redelivery; consulted when deciding whether to
// dead-letter a message and when flagging an event as redelivered.
// NOTE(review): the field is static but is reassigned in every constructor call,
// so creating a second instance resets the queue for all instances - confirm intended.
private static ConcurrentQueue < Guid > _rejectedEvents ;
2022-03-05 16:46:20 +00:00
/// <summary>
/// Creates the event bus and immediately opens the consumer channel, declaring
/// the exchanges and queues it needs.
/// </summary>
/// <param name="persistentConnection">RabbitMQ connection wrapper; must not be null.</param>
/// <param name="options">Source of the logger; its CurrentValue must not be null.</param>
/// <param name="autofac">Autofac scope used as parent for per-event handler scopes.</param>
/// <param name="subsManager">Subscription registry; an in-memory one is created when null.</param>
/// <param name="serializer">Serializer for event payloads.</param>
/// <param name="queueName">Name of the consumer queue.</param>
/// <param name="retryCount">Number of publish retries.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="persistentConnection"/> or the logger is null.
/// </exception>
public EventBusRabbitMQ(
    IRabbitMQPersistentConnection persistentConnection,
    IOptionsMonitor<ILog> options,
    ILifetimeScope autofac,
    IEventBusSubscriptionsManager subsManager,
    IIntegrationEventSerializer serializer,
    string queueName = null,
    int retryCount = 5)
{
    if (persistentConnection == null)
    {
        throw new ArgumentNullException(nameof(persistentConnection));
    }

    var log = options.CurrentValue;
    if (log == null)
    {
        throw new ArgumentNullException(nameof(options.CurrentValue));
    }

    _persistentConnection = persistentConnection;
    _logger = log;
    _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
    _autofac = autofac;
    _serializer = serializer;
    _retryCount = retryCount;
    _queueName = queueName;
    _deadLetterQueueName = $"{_queueName}_dlx";

    // The channel setup reads the connection, logger and both queue names,
    // so it has to run after those fields are assigned.
    _consumerChannel = CreateConsumerChannel();

    _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;

    // NOTE(review): _rejectedEvents is static, so this resets the queue shared by
    // every instance each time a bus is constructed - confirm this is intended.
    _rejectedEvents = new ConcurrentQueue<Guid>();
}
2022-02-14 11:18:58 +00:00
/// <summary>
/// Reacts to the subscription manager's event-removed notification: unbinds the
/// consumer queue from the exchange for that routing key and, when no
/// subscriptions remain, clears the queue name and closes the consumer channel.
/// </summary>
/// <param name="sender">Origin of the notification (not used).</param>
/// <param name="eventName">Routing key of the event that was removed.</param>
private void SubsManager_OnEventRemoved(object sender, string eventName)
{
    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }

    // A separate short-lived channel performs the unbind; it is disposed on exit.
    using var unbindChannel = _persistentConnection.CreateModel();

    unbindChannel.QueueUnbind(
        queue: _queueName,
        exchange: EXCHANGE_NAME,
        routingKey: eventName);

    if (!_subsManager.IsEmpty)
    {
        return;
    }

    _queueName = string.Empty;
    _consumerChannel.Close();
}
/// <summary>
/// Publishes an event to the exchange, using the event's type name as routing
/// key. The publish call is retried with exponential back-off when the broker
/// is unreachable or a socket error occurs.
/// </summary>
/// <param name="event">Event to serialize and publish.</param>
/// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
public void Publish(IntegrationEvent @event)
{
    if (@event is null)
    {
        throw new ArgumentNullException(nameof(@event));
    }

    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }

    var policy = Policy.Handle<BrokerUnreachableException>()
        .Or<SocketException>()
        .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
        {
            // The text is built by interpolation: string.Format only accepts
            // numeric format items, so the former "{EventId}"-style template
            // threw FormatException here and aborted the retry sequence.
            _logger.LogWarning(ex, $"Could not publish event: {@event.Id} after {time.TotalSeconds:n1}s ({ex.Message})");
        });

    var eventName = @event.GetType().Name;

    _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);

    using (var channel = _persistentConnection.CreateModel())
    {
        _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);

        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: "direct");

        // Serialize once; the same body is reused by every retry attempt.
        var body = _serializer.Serialize(@event);

        policy.Execute(() =>
        {
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2; // persistent

            _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);

            channel.BasicPublish(
                exchange: EXCHANGE_NAME,
                routingKey: eventName,
                mandatory: true,
                basicProperties: properties,
                body: body);
        });
    }
}
/// <summary>
/// Registers a dynamic handler for the named event: binds the queues when this
/// is the first subscription for the event, records the subscription and makes
/// sure the consumer is running.
/// </summary>
/// <typeparam name="TH">Dynamic handler type.</typeparam>
/// <param name="eventName">Event name, used as routing key.</param>
public void SubscribeDynamic<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    var handlerName = typeof(TH).GetGenericTypeName();
    _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, handlerName);

    // Binding must precede registration: it is skipped once the manager
    // already knows the event.
    DoInternalSubscription(eventName);
    _subsManager.AddDynamicSubscription<TH>(eventName);

    StartBasicConsume();
}
/// <summary>
/// Registers a typed handler for an event type: binds the queues when this is
/// the first subscription for the event, records the subscription and makes
/// sure the consumer is running.
/// </summary>
/// <typeparam name="T">Event type.</typeparam>
/// <typeparam name="TH">Handler type for <typeparamref name="T"/>.</typeparam>
public void Subscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = _subsManager.GetEventKey<T>();

    // Binding must precede registration: it is skipped once the manager
    // already knows the event.
    DoInternalSubscription(eventName);

    var handlerName = typeof(TH).GetGenericTypeName();
    _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, handlerName);

    _subsManager.AddSubscription<T, TH>();

    StartBasicConsume();
}
/// <summary>
/// Binds the dead-letter queue and the consumer queue to their exchanges for
/// the given routing key, unless the event already has subscriptions.
/// </summary>
/// <param name="eventName">Event name, used as routing key for both bindings.</param>
private void DoInternalSubscription(string eventName)
{
    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        // Already known to the manager - nothing to bind.
        return;
    }

    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }

    _consumerChannel.QueueBind(
        queue: _deadLetterQueueName,
        exchange: DEAD_LETTER_EXCHANGE_NAME,
        routingKey: eventName);

    _consumerChannel.QueueBind(
        queue: _queueName,
        exchange: EXCHANGE_NAME,
        routingKey: eventName);
}
/// <summary>
/// Removes a typed handler subscription from the subscription manager.
/// </summary>
/// <typeparam name="T">Event type.</typeparam>
/// <typeparam name="TH">Handler type being removed.</typeparam>
public void Unsubscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var removedEvent = _subsManager.GetEventKey<T>();
    _logger.LogInformation("Unsubscribing from event {EventName}", removedEvent);

    _subsManager.RemoveSubscription<T, TH>();
}
/// <summary>
/// Removes a dynamic handler subscription for the named event from the
/// subscription manager.
/// </summary>
/// <typeparam name="TH">Dynamic handler type being removed.</typeparam>
/// <param name="eventName">Name of the event the handler was subscribed to.</param>
public void UnsubscribeDynamic<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    // Logged the same way as the typed Unsubscribe so both removal paths are traceable.
    _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

    _subsManager.RemoveDynamicSubscription<TH>(eventName);
}
/// <summary>
/// Disposes the consumer channel, if there is one, and clears all subscriptions.
/// </summary>
public void Dispose()
{
    _consumerChannel?.Dispose();
    _subsManager.Clear();
}
/// <summary>
/// Starts consuming from the queue on the consumer channel with manual
/// acknowledgement. Does nothing when a consumer tag is already recorded, and
/// logs an error when there is no consumer channel.
/// </summary>
private void StartBasicConsume()
{
    _logger.LogTrace("Starting RabbitMQ basic consume");

    if (_consumerChannel == null)
    {
        _logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
        return;
    }

    if (!String.IsNullOrEmpty(_consumerTag))
    {
        // A tag is remembered from an earlier BasicConsume call; do not register twice.
        _logger.LogTrace("Consumer tag {ConsumerTag} already exist. Cancelled BasicConsume again", _consumerTag);
        return;
    }

    var asyncConsumer = new AsyncEventingBasicConsumer(_consumerChannel);
    asyncConsumer.Received += Consumer_Received;

    _consumerTag = _consumerChannel.BasicConsume(
        queue: _queueName,
        autoAck: false,
        consumer: asyncConsumer);
}
/// <summary>
/// Handles a delivered message: deserializes it, dispatches it to the
/// subscribed handlers and acknowledges it.
/// </summary>
/// <remarks>
/// Outcome by failure type:
/// <list type="bullet">
/// <item>Handler throws IntegrationEventRejectExeption on first delivery: the message is nacked with requeue.</item>
/// <item>Same on a redelivery whose event id is at the head of the rejected queue: the id is dequeued and the message rejected without requeue.</item>
/// <item>Same on a redelivery whose id is not at the head: the id is enqueued.</item>
/// <item>Any other exception, including a payload that cannot be deserialized: logged and the message acknowledged.</item>
/// </list>
/// </remarks>
/// <param name="sender">Consumer raising the event (not used).</param>
/// <param name="eventArgs">Delivery data: routing key, body, delivery tag, redelivered flag.</param>
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
    var eventName = eventArgs.RoutingKey;

    // Placeholder used in log output when the payload never gets deserialized.
    var message = $"<undeserialized {eventName}>";

    try
    {
        // Deserialization happens inside the try block so that a malformed or
        // unknown payload is handled by the catch clauses below instead of
        // escaping the handler with the message left unacknowledged.
        var @event = GetEvent(eventName, eventArgs.Body.Span.ToArray());
        message = @event.ToString();

        if (message.ToLowerInvariant().Contains("throw-fake-exception"))
        {
            throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
        }

        await ProcessEvent(eventName, @event);

        _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
    }
    catch (IntegrationEventRejectExeption ex)
    {
        _logger.LogWarning(ex, $"----- ERROR Processing message \"{message}\"");

        if (eventArgs.Redelivered)
        {
            if (_rejectedEvents.TryPeek(out var result) && result.Equals(ex.EventId))
            {
                _rejectedEvents.TryDequeue(out var _);
                // requeue: false hands the message over to the dead-letter exchange
                // configured on the queue.
                _consumerChannel.BasicReject(eventArgs.DeliveryTag, requeue: false);
            }
            else
            {
                // NOTE(review): the delivery is neither acked nor nacked on this
                // path, so it looks like it stays unacknowledged on the channel -
                // confirm whether a requeueing nack is missing here.
                _rejectedEvents.Enqueue(ex.EventId);
            }
        }
        else
        {
            _consumerChannel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: true);
        }
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, $"----- ERROR Processing message \"{message}\"");

        // Unexpected failures are acknowledged so the message is not redelivered.
        _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
    }
}
/// <summary>
/// Creates a channel for consuming and declares on it both direct exchanges,
/// the durable dead-letter queue and the durable consumer queue (which names
/// the dead-letter exchange in its arguments). Also installs a
/// callback-exception handler that replaces the consumer channel and restarts
/// consumption.
/// </summary>
/// <returns>The newly created channel.</returns>
private IModel CreateConsumerChannel()
{
    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }

    _logger.LogTrace("Creating RabbitMQ consumer channel");

    var channel = _persistentConnection.CreateModel();

    channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: "direct");
    channel.ExchangeDeclare(exchange: DEAD_LETTER_EXCHANGE_NAME, type: "direct");

    channel.QueueDeclare(
        queue: _deadLetterQueueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    var queueArguments = new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME }
    };

    channel.QueueDeclare(
        queue: _queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: queueArguments);

    channel.CallbackException += (sender, ea) =>
    {
        // Pass the exception as the exception argument; as a format argument to a
        // template without placeholders it would not appear in the log entry.
        _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");

        _consumerChannel.Dispose();
        _consumerChannel = CreateConsumerChannel();

        // Clearing the tag lets StartBasicConsume register a consumer on the new channel.
        _consumerTag = String.Empty;

        StartBasicConsume();
    };

    return channel;
}
2022-03-05 16:46:20 +00:00
/// <summary>
/// Deserializes a message body into the event type that the subscription
/// manager associates with the given event name.
/// </summary>
/// <param name="eventName">Event name used to look up the target type.</param>
/// <param name="serializedMessage">Raw message body.</param>
/// <returns>The deserialized event, cast to <c>IntegrationEvent</c>.</returns>
private IntegrationEvent GetEvent(string eventName, byte[] serializedMessage)
{
    var targetType = _subsManager.GetEventTypeByName(eventName);

    return (IntegrationEvent)_serializer.Deserialize(serializedMessage, targetType);
}
2022-03-05 16:46:20 +00:00
/// <summary>
/// Sets <c>Redelivered</c> on the event when its id is at the head of the
/// rejected-events queue.
/// </summary>
/// <param name="event">Event about to be dispatched to handlers.</param>
private void PreProcessEvent(IntegrationEvent @event)
{
    var isHeadOfRejected = !_rejectedEvents.IsEmpty
        && _rejectedEvents.TryPeek(out var head)
        && head.Equals(@event.Id);

    if (isHeadOfRejected)
    {
        @event.Redelivered = true;
    }
}
2022-03-05 16:46:20 +00:00
/// <summary>
/// Dispatches an event to every handler subscribed under the given name.
/// Handlers are resolved from a child Autofac scope that lives for the duration
/// of the dispatch; handlers that cannot be resolved are skipped. Logs a
/// warning when the event has no subscriptions.
/// </summary>
/// <param name="eventName">Event name the handlers are registered under.</param>
/// <param name="event">Deserialized event to hand to the handlers.</param>
private async Task ProcessEvent(string eventName, IntegrationEvent @event)
{
    _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);

    PreProcessEvent(@event);

    if (!_subsManager.HasSubscriptionsForEvent(eventName))
    {
        _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
        return;
    }

    using var handlerScope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME);

    foreach (var subscription in _subsManager.GetHandlersForEvent(eventName))
    {
        var resolved = handlerScope.ResolveOptional(subscription.HandlerType);

        if (subscription.IsDynamic)
        {
            if (resolved is not IDynamicIntegrationEventHandler dynamicHandler)
            {
                continue;
            }

            // NOTE(review): "using dynamic" makes the runtime treat the event as
            // disposable and dispose it when this block ends - confirm that
            // IntegrationEvent supports that and that disposing it here is wanted.
            using dynamic eventData = @event;

            await Task.Yield();
            await dynamicHandler.Handle(eventData);
        }
        else
        {
            if (resolved == null)
            {
                continue;
            }

            // The typed handler interface is only known at runtime, so Handle is
            // called through reflection on IIntegrationEventHandler<TEvent>.
            var runtimeEventType = _subsManager.GetEventTypeByName(eventName);
            var handlerContract = typeof(IIntegrationEventHandler<>).MakeGenericType(runtimeEventType);

            await Task.Yield();

            var pending = (Task)handlerContract.GetMethod("Handle").Invoke(resolved, new object[] { @event });
            await pending;
        }
    }
}
}