Merge pull request #558 from ONLYOFFICE/feature/asc-notify-refactor

Feature/asc notify refactor
This commit is contained in:
Alexey Bannov 2022-02-25 18:19:06 +03:00 committed by GitHub
commit bdb7672aaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 690 additions and 876 deletions

View File

@ -23,11 +23,17 @@
*
*/
using ASC.Common.Mapping.PrimitiveTypeConverters;
namespace ASC.Common.Mapping;
public class MappingProfile : Profile
{
public MappingProfile() => Array.ForEach(AppDomain.CurrentDomain.GetAssemblies(), a => ApplyMappingsFromAssembly(a));
public MappingProfile()
{
Array.ForEach(AppDomain.CurrentDomain.GetAssemblies(), a => ApplyMappingsFromAssembly(a));
ApplyPrimitiveMappers();
}
private void ApplyMappingsFromAssembly(Assembly assembly)
{
@ -50,4 +56,10 @@ public class MappingProfile : Profile
methodInfo?.Invoke(instance, new object[] { this });
}
}
private void ApplyPrimitiveMappers()
{
CreateMap<long, DateTime>().ReverseMap()
.ConvertUsing<TimeConverter>();
}
}

View File

@ -0,0 +1,14 @@
namespace ASC.Common.Mapping.PrimitiveTypeConverters;
public class TimeConverter : ITypeConverter<long, DateTime>, ITypeConverter<DateTime, long>
{
public DateTime Convert(long source, DateTime destination, ResolutionContext context)
{
return new DateTime(source);
}
public long Convert(DateTime source, long destination, ResolutionContext context)
{
return source.Ticks;
}
}

View File

@ -1,6 +1,6 @@
namespace ASC.Core.Common.EF.Model
{
public class NotifyQueue
public class NotifyQueue : IMapFrom<NotifyMessage>
{
public int NotifyId { get; set; }
public int TenantId { get; set; }

View File

@ -39,6 +39,7 @@ global using ASC.Collections;
global using ASC.Common;
global using ASC.Common.Caching;
global using ASC.Common.Logging;
global using ASC.Common.Mapping;
global using ASC.Common.Module;
global using ASC.Common.Notify.Engine;
global using ASC.Common.Notify.Patterns;
@ -84,6 +85,8 @@ global using ASC.Notify.Sinks;
global using ASC.Security.Cryptography;
global using ASC.Web.Studio.Utility;
global using AutoMapper;
global using Autofac;
global using MailKit.Security;

View File

@ -77,7 +77,7 @@ namespace ASC.Core.Notify
Subject = message.Subject.Trim(' ', '\t', '\n', '\r'),
ContentType = message.ContentType,
Content = message.Body,
Sender = senderName,
SenderType = senderName,
CreationDate = DateTime.UtcNow.Ticks,
};
@ -87,7 +87,7 @@ namespace ASC.Core.Notify
var (tenantManager, configuration, options) = scopeClass;
var tenant = tenantManager.GetCurrentTenant(false);
m.Tenant = tenant == null ? Tenant.DEFAULT_TENANT : tenant.TenantId;
m.TenantId = tenant == null ? Tenant.DEFAULT_TENANT : tenant.TenantId;
var from = MailAddressUtils.Create(configuration.SmtpSettings.SenderAddress, configuration.SmtpSettings.SenderDisplayName);
var fromTag = message.Arguments.FirstOrDefault(x => x.Tag.Equals("MessageFrom"));
@ -100,14 +100,14 @@ namespace ASC.Core.Notify
}
catch { }
}
m.From = from.ToString();
m.Sender = from.ToString();
var to = new List<string>();
foreach (var address in message.Recipient.Addresses)
{
to.Add(MailAddressUtils.Create(address, message.Recipient.Name).ToString());
}
m.To = string.Join("|", to.ToArray());
m.Reciever = string.Join("|", to.ToArray());
var replyTag = message.Arguments.FirstOrDefault(x => x.Tag == "replyto");
if (replyTag != null && replyTag.Value is string value)
@ -131,7 +131,7 @@ namespace ASC.Core.Notify
var attachmentTag = message.Arguments.FirstOrDefault(x => x.Tag == "EmbeddedAttachments");
if (attachmentTag != null && attachmentTag.Value != null)
{
m.EmbeddedAttachments.AddRange(attachmentTag.Value as NotifyMessageAttachment[]);
m.Attachments.AddRange(attachmentTag.Value as NotifyMessageAttachment[]);
}
var autoSubmittedTag = message.Arguments.FirstOrDefault(x => x.Tag == "AutoSubmitted");

View File

@ -56,16 +56,16 @@ namespace ASC.Core.Notify
{
var m = new NotifyMessage
{
To = username,
Reciever = username,
Subject = message.Subject,
ContentType = message.ContentType,
Content = message.Body,
Sender = senderName,
SenderType = senderName,
CreationDate = DateTime.UtcNow.Ticks,
};
var tenant = tenantManager.GetCurrentTenant(false);
m.Tenant = tenant == null ? Tenant.DEFAULT_TENANT : tenant.TenantId;
m.TenantId = tenant == null ? Tenant.DEFAULT_TENANT : tenant.TenantId;
sender.Send(m);
}

View File

@ -0,0 +1,10 @@
namespace ASC.Notify.Messages;
public partial class NotifyMessage : IMapFrom<NotifyQueue>
{
public void Mapping(Profile profile)
{
profile.CreateMap<NotifyQueue, NotifyMessage>()
.ForMember(dest => dest.Attachments, opt => opt.Ignore());
}
}

View File

@ -60,11 +60,11 @@ namespace ASC.Core.Notify.Senders
{
try
{
Log.DebugFormat("Tenant: {0}, To: {1}", m.Tenant, m.To);
Log.DebugFormat("Tenant: {0}, To: {1}", m.TenantId, m.Reciever);
using var scope = ServiceProvider.CreateScope();
var scopeClass = scope.ServiceProvider.GetService<AWSSenderScope>();
var (tenantManager, configuration) = scopeClass;
tenantManager.SetCurrentTenant(m.Tenant);
tenantManager.SetCurrentTenant(m.TenantId);
if (!configuration.SmtpSettings.IsDefaultSettings)
{
@ -81,7 +81,7 @@ namespace ASC.Core.Notify.Senders
}
catch (Exception e)
{
Log.ErrorFormat("Tenant: {0}, To: {1} - {2}", m.Tenant, m.To, e);
Log.ErrorFormat("Tenant: {0}, To: {1} - {2}", m.TenantId, m.Reciever, e);
throw;
}
}
@ -130,7 +130,7 @@ namespace ASC.Core.Notify.Senders
var dest = new Destination
{
ToAddresses = m.To.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries).Select(a => MailAddressUtils.Create(a).Address).ToList(),
ToAddresses = m.Reciever.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries).Select(a => MailAddressUtils.Create(a).Address).ToList(),
};
var subject = new Content(MimeHeaderUtils.EncodeMime(m.Subject)) { Charset = Encoding.UTF8.WebName, };
@ -148,7 +148,7 @@ namespace ASC.Core.Notify.Senders
body = new Body(new Content(m.Content) { Charset = Encoding.UTF8.WebName });
}
var from = MailAddressUtils.Create(m.From).ToEncodedString();
var from = MailAddressUtils.Create(m.Sender).ToEncodedString();
var request = new SendEmailRequest { Source = from, Destination = dest, Message = new Message(subject, body) };
if (!string.IsNullOrEmpty(m.ReplyTo))
{

View File

@ -54,7 +54,7 @@ namespace ASC.Core.Notify.Senders
{
using var scope = ServiceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<JabberServiceClient>();
service.SendMessage(m.Tenant, null, m.To, text, m.Subject);
service.SendMessage(m.TenantId, null, m.Reciever, text, m.Subject);
}
catch (Exception e)
{

View File

@ -84,7 +84,7 @@ namespace ASC.Core.Notify.Senders
using var scope = ServiceProvider.CreateScope();
var scopeClass = scope.ServiceProvider.GetService<SmtpSenderScope>();
var (tenantManager, configuration) = scopeClass;
tenantManager.SetCurrentTenant(m.Tenant);
tenantManager.SetCurrentTenant(m.TenantId);
var smtpClient = GetSmtpClient();
var result = NoticeSendResult.TryOnceAgain;
@ -112,7 +112,7 @@ namespace ASC.Core.Notify.Senders
}
catch (Exception e)
{
Log.ErrorFormat("Tenant: {0}, To: {1} - {2}", m.Tenant, m.To, e);
Log.ErrorFormat("Tenant: {0}, To: {1} - {2}", m.TenantId, m.Reciever, e);
throw;
}
}
@ -177,11 +177,11 @@ namespace ASC.Core.Notify.Senders
Subject = m.Subject
};
var fromAddress = MailboxAddress.Parse(ParserOptions.Default, m.From);
var fromAddress = MailboxAddress.Parse(ParserOptions.Default, m.Sender);
mimeMessage.From.Add(fromAddress);
foreach (var to in m.To.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries))
foreach (var to in m.Reciever.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries))
{
mimeMessage.To.Add(MailboxAddress.Parse(ParserOptions.Default, to));
}
@ -202,14 +202,14 @@ namespace ASC.Core.Notify.Senders
ContentTransferEncoding = ContentEncoding.QuotedPrintable
};
if (m.EmbeddedAttachments != null && m.EmbeddedAttachments.Count > 0)
if (m.Attachments != null && m.Attachments.Count > 0)
{
var multipartRelated = new MultipartRelated
{
Root = htmlPart
};
foreach (var attachment in m.EmbeddedAttachments)
foreach (var attachment in m.Attachments)
{
var mimeEntity = ConvertAttachmentToMimePart(attachment);
if (mimeEntity != null)

View File

@ -45,11 +45,11 @@ namespace ASC.Core.Notify
const SendResult result = SendResult.OK;
var m = new NotifyMessage
{
To = message.Recipient.ID,
Reciever = message.Recipient.ID,
Subject = message.Subject,
ContentType = message.ContentType,
Content = message.Body,
Sender = senderName,
SenderType = senderName,
CreationDate = DateTime.UtcNow.Ticks,
};
@ -57,7 +57,7 @@ namespace ASC.Core.Notify
var tenantManager = scope.ServiceProvider.GetService<TenantManager>();
var tenant = tenantManager.GetCurrentTenant(false);
m.Tenant = tenant == null ? Tenant.DEFAULT_TENANT : tenant.TenantId;
m.TenantId = tenant == null ? Tenant.DEFAULT_TENANT : tenant.TenantId;
sender.Send(m);

View File

@ -3,17 +3,17 @@
package ASC.Notify.Messages;
message NotifyMessage {
int32 tenant = 1;
string sender = 2;
string from = 3;
string to = 4;
int32 tenant_id = 1;
string sender_type = 2;
string sender = 3;
string reciever = 4;
string reply_to = 5;
string subject = 6;
string content_type = 7;
string content = 8;
int64 creation_date = 9;
int32 priority = 10;
repeated NotifyMessageAttachment embedded_attachments = 11;
repeated NotifyMessageAttachment attachments = 11;
string auto_submitted = 12;
}
message NotifyMessageAttachment {

View File

@ -7,6 +7,7 @@
<RazorCompileOnBuild>false</RazorCompileOnBuild>
<GenerateMvcApplicationPartsAssemblyAttributes>false</GenerateMvcApplicationPartsAssemblyAttributes>
<AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">

View File

@ -23,108 +23,107 @@
*
*/
namespace ASC.Notify.Config
namespace ASC.Notify.Config;
[Singletone]
public class ConfigureNotifyServiceCfg : IConfigureOptions<NotifyServiceCfg>
{
[Singletone]
public class ConfigureNotifyServiceCfg : IConfigureOptions<NotifyServiceCfg>
public ConfigureNotifyServiceCfg(IServiceProvider serviceProvider)
{
public ConfigureNotifyServiceCfg(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}
private IServiceProvider ServiceProvider { get; }
public void Configure(NotifyServiceCfg options)
{
options.Init(ServiceProvider);
}
_serviceProvider = serviceProvider;
}
[Singletone(typeof(ConfigureNotifyServiceCfg))]
public class NotifyServiceCfg
private readonly IServiceProvider _serviceProvider;
public void Configure(NotifyServiceCfg options)
{
public string ConnectionStringName { get; set; }
public int StoreMessagesDays { get; set; }
public string ServerRoot { get; set; }
public NotifyServiceCfgProcess Process { get; set; }
public List<NotifyServiceCfgSender> Senders { get; set; }
public List<NotifyServiceCfgScheduler> Schedulers { get; set; }
options.Init(_serviceProvider);
}
}
public void Init(IServiceProvider serviceProvider)
[Singletone(typeof(ConfigureNotifyServiceCfg))]
public class NotifyServiceCfg
{
public string ConnectionStringName { get; set; }
public int StoreMessagesDays { get; set; }
public string ServerRoot { get; set; }
public NotifyServiceCfgProcess Process { get; set; }
public List<NotifyServiceCfgSender> Senders { get; set; }
public List<NotifyServiceCfgScheduler> Schedulers { get; set; }
public void Init(IServiceProvider serviceProvider)
{
ServerRoot = string.IsNullOrEmpty(ServerRoot) ? "http://*/" : ServerRoot;
Process.Init();
foreach (var s in Senders)
{
ServerRoot = string.IsNullOrEmpty(ServerRoot) ? "http://*/" : ServerRoot;
Process.Init();
foreach (var s in Senders)
try
{
try
{
s.Init(serviceProvider);
}
catch (Exception)
{
}
s.Init(serviceProvider);
}
foreach (var s in Schedulers)
catch (Exception)
{
try
{
s.Init();
}
catch (Exception)
{
}
}
}
}
public class NotifyServiceCfgProcess
{
public int MaxThreads { get; set; }
public int BufferSize { get; set; }
public int MaxAttempts { get; set; }
public string AttemptsInterval { get; set; }
public void Init()
foreach (var s in Schedulers)
{
if (MaxThreads == 0)
try
{
MaxThreads = Environment.ProcessorCount;
s.Init();
}
}
}
catch (Exception)
{
public class NotifyServiceCfgSender
{
public string Name { get; set; }
public string Type { get; set; }
public Dictionary<string, string> Properties { get; set; }
public INotifySender NotifySender { get; set; }
public void Init(IServiceProvider serviceProvider)
{
var sender = (INotifySender)serviceProvider.GetService(System.Type.GetType(Type, true));
sender.Init(Properties);
NotifySender = sender;
}
}
public class NotifyServiceCfgScheduler
{
public string Name { get; set; }
public string Register { get; set; }
public MethodInfo MethodInfo { get; set; }
public void Init()
{
var typeName = Register.Substring(0, Register.IndexOf(','));
var assemblyName = Register.Substring(Register.IndexOf(','));
var type = Type.GetType(typeName.Substring(0, typeName.LastIndexOf('.')) + assemblyName, true);
MethodInfo = type.GetMethod(typeName.Substring(typeName.LastIndexOf('.') + 1), BindingFlags.Static | BindingFlags.NonPublic | BindingFlags.Public);
}
}
}
}
public class NotifyServiceCfgProcess
{
public int MaxThreads { get; set; }
public int BufferSize { get; set; }
public int MaxAttempts { get; set; }
public string AttemptsInterval { get; set; }
public void Init()
{
if (MaxThreads == 0)
{
MaxThreads = Environment.ProcessorCount;
}
}
}
public class NotifyServiceCfgSender
{
public string Name { get; set; }
public string Type { get; set; }
public Dictionary<string, string> Properties { get; set; }
public INotifySender NotifySender { get; set; }
public void Init(IServiceProvider serviceProvider)
{
var sender = (INotifySender)serviceProvider.GetService(System.Type.GetType(Type, true));
sender.Init(Properties);
NotifySender = sender;
}
}
public class NotifyServiceCfgScheduler
{
public string Name { get; set; }
public string Register { get; set; }
public MethodInfo MethodInfo { get; set; }
public void Init()
{
var typeName = Register.Substring(0, Register.IndexOf(','));
var assemblyName = Register.Substring(Register.IndexOf(','));
var type = Type.GetType(string.Concat(typeName.AsSpan(0, typeName.LastIndexOf('.')), assemblyName), true);
MethodInfo = type.GetMethod(typeName.Substring(typeName.LastIndexOf('.') + 1), BindingFlags.Static | BindingFlags.NonPublic | BindingFlags.Public);
}
}

View File

@ -23,191 +23,167 @@
*
*/
namespace ASC.Notify
namespace ASC.Notify;
[Singletone(Additional = typeof(DbWorkerExtension))]
public class DbWorker
{
[Singletone(Additional = typeof(DbWorkerExtension))]
public class DbWorker
private readonly string _dbid;
private readonly object _syncRoot = new object();
private readonly IMapper _mapper;
private readonly IServiceScopeFactory _serviceScopeFactory;
public NotifyServiceCfg NotifyServiceCfg { get; }
public DbWorker(IServiceScopeFactory serviceScopeFactory, IOptions<NotifyServiceCfg> notifyServiceCfg, IMapper mapper)
{
private readonly string dbid;
private readonly object syncRoot = new object();
_serviceScopeFactory = serviceScopeFactory;
NotifyServiceCfg = notifyServiceCfg.Value;
_dbid = NotifyServiceCfg.ConnectionStringName;
_mapper = mapper;
}
private IServiceProvider ServiceProvider { get; }
public NotifyServiceCfg NotifyServiceCfg { get; }
public int SaveMessage(NotifyMessage m)
{
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(_dbid);
using var tx = dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
public DbWorker(IServiceProvider serviceProvider, IOptions<NotifyServiceCfg> notifyServiceCfg)
var notifyQueue = _mapper.Map<NotifyMessage, NotifyQueue>(m);
notifyQueue = dbContext.NotifyQueue.Add(notifyQueue).Entity;
dbContext.SaveChanges();
var id = notifyQueue.NotifyId;
var info = new NotifyInfo
{
ServiceProvider = serviceProvider;
NotifyServiceCfg = notifyServiceCfg.Value;
dbid = NotifyServiceCfg.ConnectionStringName;
}
NotifyId = id,
State = 0,
Attempts = 0,
ModifyDate = DateTime.UtcNow,
Priority = m.Priority
};
public int SaveMessage(NotifyMessage m)
dbContext.NotifyInfo.Add(info);
dbContext.SaveChanges();
tx.Commit();
return 1;
}
public IDictionary<int, NotifyMessage> GetMessages(int count)
{
lock (_syncRoot)
{
using var scope = ServiceProvider.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(dbid);
using var tx = dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(_dbid);
using var tx = dbContext.Database.BeginTransaction();
var notifyQueue = new NotifyQueue
{
NotifyId = 0,
TenantId = m.Tenant,
Sender = m.From,
Reciever = m.To,
Subject = m.Subject,
ContentType = m.ContentType,
Content = m.Content,
SenderType = m.Sender,
CreationDate = new DateTime(m.CreationDate),
ReplyTo = m.ReplyTo,
Attachments = m.EmbeddedAttachments.ToString(),
AutoSubmitted = m.AutoSubmitted
};
notifyQueue = dbContext.NotifyQueue.Add(notifyQueue).Entity;
dbContext.SaveChanges();
var id = notifyQueue.NotifyId;
var info = new NotifyInfo
{
NotifyId = id,
State = 0,
Attempts = 0,
ModifyDate = DateTime.UtcNow,
Priority = m.Priority
};
dbContext.NotifyInfo.Add(info);
dbContext.SaveChanges();
tx.Commit();
return 1;
}
public IDictionary<int, NotifyMessage> GetMessages(int count)
{
lock (syncRoot)
{
using var scope = ServiceProvider.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(dbid);
using var tx = dbContext.Database.BeginTransaction();
var q = dbContext.NotifyQueue
.Join(dbContext.NotifyInfo, r => r.NotifyId, r => r.NotifyId, (queue, info) => new { queue, info })
.Where(r => r.info.State == (int)MailSendingState.NotSended || r.info.State == (int)MailSendingState.Error && r.info.ModifyDate < DateTime.UtcNow - TimeSpan.Parse(NotifyServiceCfg.Process.AttemptsInterval))
.OrderBy(i => i.info.Priority)
.ThenBy(i => i.info.NotifyId)
.Take(count);
var q = dbContext.NotifyQueue
.Join(dbContext.NotifyInfo, r => r.NotifyId, r => r.NotifyId, (queue, info) => new { queue, info })
.Where(r => r.info.State == (int)MailSendingState.NotSended || r.info.State == (int)MailSendingState.Error && r.info.ModifyDate < DateTime.UtcNow - TimeSpan.Parse(NotifyServiceCfg.Process.AttemptsInterval))
.OrderBy(i => i.info.Priority)
.ThenBy(i => i.info.NotifyId)
.Take(count);
var messages = q
.ToDictionary(
r => r.queue.NotifyId,
r =>
var messages = q
.ToDictionary(
r => r.queue.NotifyId,
r =>
{
var res = _mapper.Map<NotifyQueue, NotifyMessage>(r.queue);
try
{
res.Attachments.AddRange(JsonConvert.DeserializeObject<RepeatedField<NotifyMessageAttachment>>(r.queue.Attachments));
}
catch (Exception)
{
var res = new NotifyMessage
{
Tenant = r.queue.TenantId,
From = r.queue.Sender,
To = r.queue.Reciever,
Subject = r.queue.Subject,
ContentType = r.queue.ContentType,
Content = r.queue.Content,
Sender = r.queue.SenderType,
CreationDate = r.queue.CreationDate.Ticks,
ReplyTo = r.queue.ReplyTo,
AutoSubmitted = r.queue.AutoSubmitted
};
try
{
res.EmbeddedAttachments.AddRange(JsonConvert.DeserializeObject<RepeatedField<NotifyMessageAttachment>>(r.queue.Attachments));
}
catch (Exception)
{
}
return res;
});
}
var info = dbContext.NotifyInfo.Where(r => messages.Keys.Any(a => a == r.NotifyId)).ToList();
return res;
});
foreach (var i in info)
{
i.State = (int)MailSendingState.Sending;
}
dbContext.SaveChanges();
tx.Commit();
return messages;
}
}
public void ResetStates()
{
using var scope = ServiceProvider.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(dbid);
var tr = dbContext.Database.BeginTransaction();
var info = dbContext.NotifyInfo.Where(r => r.State == 1).ToList();
var info = dbContext.NotifyInfo.Where(r => messages.Keys.Any(a => a == r.NotifyId)).ToList();
foreach (var i in info)
{
i.State = 0;
i.State = (int)MailSendingState.Sending;
}
dbContext.SaveChanges();
tr.Commit();
}
public void SetState(int id, MailSendingState result)
{
using var scope = ServiceProvider.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(dbid);
using var tx = dbContext.Database.BeginTransaction();
if (result == MailSendingState.Sended)
{
var d = dbContext.NotifyInfo.Where(r => r.NotifyId == id).FirstOrDefault();
dbContext.NotifyInfo.Remove(d);
dbContext.SaveChanges();
}
else
{
if (result == MailSendingState.Error)
{
var attempts = dbContext.NotifyInfo.Where(r => r.NotifyId == id).Select(r => r.Attempts).FirstOrDefault();
if (NotifyServiceCfg.Process.MaxAttempts <= attempts + 1)
{
result = MailSendingState.FatalError;
}
}
var info = dbContext.NotifyInfo
.Where(r => r.NotifyId == id)
.ToList();
foreach (var i in info)
{
i.State = (int)result;
i.Attempts += 1;
i.ModifyDate = DateTime.UtcNow;
}
dbContext.SaveChanges();
}
tx.Commit();
return messages;
}
}
public static class DbWorkerExtension
public void ResetStates()
{
public static void Register(DIHelper services)
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(_dbid);
var tr = dbContext.Database.BeginTransaction();
var info = dbContext.NotifyInfo.Where(r => r.State == 1).ToList();
foreach (var i in info)
{
services.TryAdd<DbContextManager<NotifyDbContext>>();
i.State = 0;
}
dbContext.SaveChanges();
tr.Commit();
}
public void SetState(int id, MailSendingState result)
{
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(_dbid);
using var tx = dbContext.Database.BeginTransaction();
if (result == MailSendingState.Sended)
{
var d = dbContext.NotifyInfo.Where(r => r.NotifyId == id).FirstOrDefault();
dbContext.NotifyInfo.Remove(d);
dbContext.SaveChanges();
}
else
{
if (result == MailSendingState.Error)
{
var attempts = dbContext.NotifyInfo.Where(r => r.NotifyId == id).Select(r => r.Attempts).FirstOrDefault();
if (NotifyServiceCfg.Process.MaxAttempts <= attempts + 1)
{
result = MailSendingState.FatalError;
}
}
var info = dbContext.NotifyInfo
.Where(r => r.NotifyId == id)
.ToList();
foreach (var i in info)
{
i.State = (int)result;
i.Attempts += 1;
i.ModifyDate = DateTime.UtcNow;
}
dbContext.SaveChanges();
}
tx.Commit();
}
}
public static class DbWorkerExtension
{
public static void Register(DIHelper services)
{
services.TryAdd<DbContextManager<NotifyDbContext>>();
}
}

View File

@ -1,17 +1,12 @@
global using System;
global using System.Collections.Generic;
global using System.Reflection;
global using System.Data;
global using System.IO;
global using System.Linq;
global using System.Reflection;
global using System.Threading;
global using System.Threading.Tasks;
global using ASC.Api.Core;
global using ASC.Common;
global using ASC.Common.Caching;
global using ASC.Common.DependencyInjection;
global using ASC.Common.Logging;
global using ASC.Common.Mapping;
global using ASC.Common.Utils;
global using ASC.Core;
global using ASC.Core.Common.EF;
@ -19,23 +14,22 @@ global using ASC.Core.Common.EF.Context;
global using ASC.Core.Common.EF.Model;
global using ASC.Core.Common.Settings;
global using ASC.Core.Notify.Senders;
global using ASC.Notify.Services;
global using ASC.Notify.Config;
global using ASC.Notify.Messages;
global using ASC.Web.Core;
global using ASC.Web.Core.WhiteLabel;
global using ASC.Web.Studio.Core.Notify;
global using AutoMapper;
global using Autofac;
global using Autofac.Extensions.DependencyInjection;
global using Google.Protobuf.Collections;
global using Microsoft.AspNetCore.Hosting;
global using Microsoft.EntityFrameworkCore;
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Hosting;
global using Microsoft.Extensions.Options;
global using Microsoft.EntityFrameworkCore;
global using Microsoft.Extensions.Hosting.WindowsServices;
global using Newtonsoft.Json;

View File

@ -24,14 +24,13 @@
*/
namespace ASC.Notify
namespace ASC.Notify;
public enum MailSendingState
{
public enum MailSendingState
{
NotSended,
Sending,
Sended,
Error,
FatalError,
};
}
NotSended,
Sending,
Sended,
Error,
FatalError,
};

View File

@ -1,104 +0,0 @@
/*
*
* (c) Copyright Ascensio System Limited 2010-2018
*
* This program is freeware. You can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) version 3 as published by the Free Software Foundation (https://www.gnu.org/copyleft/gpl.html).
* In accordance with Section 7(a) of the GNU GPL its Section 15 shall be amended to the effect that
* Ascensio System SIA expressly excludes the warranty of non-infringement of any third-party rights.
*
* THIS PROGRAM IS DISTRIBUTED WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF MERCHANTABILITY OR
* FITNESS FOR A PARTICULAR PURPOSE. For more details, see GNU GPL at https://www.gnu.org/copyleft/gpl.html
*
* You can contact Ascensio System SIA by email at sales@onlyoffice.com
*
* The interactive user interfaces in modified source and object code versions of ONLYOFFICE must display
* Appropriate Legal Notices, as required under Section 5 of the GNU GPL version 3.
*
* Pursuant to Section 7 § 3(b) of the GNU GPL you must retain the original ONLYOFFICE logo which contains
* relevant author attributions when distributing the software. If the display of the logo in its graphic
* form is not reasonably feasible for technical reasons, you must include the words "Powered by ONLYOFFICE"
* in every copy of the program you distribute.
* Pursuant to Section 7 § 3(e) we decline to grant you any rights under trademark law for use of our trademarks.
*
*/
namespace ASC.Notify
{
[Singletone]
public class NotifyCleaner : IDisposable
{
private readonly ILog log;
private readonly ManualResetEvent stop = new ManualResetEvent(false);
public NotifyServiceCfg NotifyServiceCfg { get; }
private IServiceProvider ServiceProvider { get; }
public CancellationTokenSource CancellationTokenSource { get; }
public NotifyCleaner(IOptions<NotifyServiceCfg> notifyServiceCfg, IServiceProvider serviceProvider, IOptionsMonitor<ILog> options)
{
log = options.Get("ASC.Notify");
NotifyServiceCfg = notifyServiceCfg.Value;
ServiceProvider = serviceProvider;
CancellationTokenSource = new CancellationTokenSource();
}
public void Start()
{
var t = new Task(Clear, CancellationTokenSource.Token, TaskCreationOptions.LongRunning);
t.Start(TaskScheduler.Default);
}
public void Stop()
{
stop.Set();
CancellationTokenSource.Cancel();
}
private void Clear()
{
while (true)
{
try
{
var date = DateTime.UtcNow.AddDays(-NotifyServiceCfg.StoreMessagesDays);
using var scope = ServiceProvider.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(NotifyServiceCfg.ConnectionStringName);
using var tx = dbContext.Database.BeginTransaction();
var info = dbContext.NotifyInfo.Where(r => r.ModifyDate < date && r.State == 4).ToList();
var queue = dbContext.NotifyQueue.Where(r => r.CreationDate < date).ToList();
dbContext.NotifyInfo.RemoveRange(info);
dbContext.NotifyQueue.RemoveRange(queue);
dbContext.SaveChanges();
tx.Commit();
log.InfoFormat("Clear notify messages: notify_info({0}), notify_queue ({1})", info.Count, queue.Count);
}
catch (ThreadAbortException)
{
// ignore
}
catch (Exception err)
{
log.Error(err);
}
if (stop.WaitOne(TimeSpan.FromHours(8)))
{
break;
}
}
}
public void Dispose()
{
if (CancellationTokenSource != null)
{
CancellationTokenSource.Dispose();
}
}
}
}

View File

@ -1,145 +0,0 @@
/*
*
* (c) Copyright Ascensio System Limited 2010-2018
*
* This program is freeware. You can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) version 3 as published by the Free Software Foundation (https://www.gnu.org/copyleft/gpl.html).
* In accordance with Section 7(a) of the GNU GPL its Section 15 shall be amended to the effect that
* Ascensio System SIA expressly excludes the warranty of non-infringement of any third-party rights.
*
* THIS PROGRAM IS DISTRIBUTED WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF MERCHANTABILITY OR
* FITNESS FOR A PARTICULAR PURPOSE. For more details, see GNU GPL at https://www.gnu.org/copyleft/gpl.html
*
* You can contact Ascensio System SIA by email at sales@onlyoffice.com
*
* The interactive user interfaces in modified source and object code versions of ONLYOFFICE must display
* Appropriate Legal Notices, as required under Section 5 of the GNU GPL version 3.
*
* Pursuant to Section 7 § 3(b) of the GNU GPL you must retain the original ONLYOFFICE logo which contains
* relevant author attributions when distributing the software. If the display of the logo in its graphic
* form is not reasonably feasible for technical reasons, you must include the words "Powered by ONLYOFFICE"
* in every copy of the program you distribute.
* Pursuant to Section 7 § 3(e) we decline to grant you any rights under trademark law for use of our trademarks.
*
*/
namespace ASC.Notify
{
[Singletone]
public class NotifySender : IDisposable
{
private readonly ILog log;
private readonly DbWorker db;
private CancellationTokenSource cancellationToken;
public NotifyServiceCfg NotifyServiceCfg { get; }
public NotifySender(IOptions<NotifyServiceCfg> notifyServiceCfg, DbWorker dbWorker, IOptionsMonitor<ILog> options)
{
log = options.CurrentValue;
NotifyServiceCfg = notifyServiceCfg.Value;
db = dbWorker;
}
public void StartSending()
{
db.ResetStates();
cancellationToken = new CancellationTokenSource();
var task = new Task(async () => await ThreadManagerWork(), cancellationToken.Token, TaskCreationOptions.LongRunning);
task.Start();
}
public void StopSending()
{
cancellationToken.Cancel();
}
private async Task ThreadManagerWork()
{
var tasks = new List<Task>(NotifyServiceCfg.Process.MaxThreads);
while (!cancellationToken.IsCancellationRequested)
{
try
{
if (tasks.Count < NotifyServiceCfg.Process.MaxThreads)
{
var messages = db.GetMessages(NotifyServiceCfg.Process.BufferSize);
if (messages.Count > 0)
{
var t = new Task(() => SendMessages(messages), cancellationToken.Token, TaskCreationOptions.LongRunning);
tasks.Add(t);
t.Start(TaskScheduler.Default);
}
else
{
await Task.Delay(5000);
}
}
else
{
await Task.WhenAny(tasks.ToArray()).ContinueWith(r => tasks.RemoveAll(a => a.IsCompleted));
}
}
catch (ThreadAbortException)
{
return;
}
catch (Exception e)
{
log.Error(e);
}
}
}
private void SendMessages(object messages)
{
try
{
foreach (var m in (IDictionary<int, NotifyMessage>)messages)
{
if (cancellationToken.IsCancellationRequested) return;
var result = MailSendingState.Sended;
try
{
var sender = NotifyServiceCfg.Senders.FirstOrDefault(r => r.Name == m.Value.Sender);
if (sender != null)
{
sender.NotifySender.Send(m.Value);
}
else
{
result = MailSendingState.FatalError;
}
log.DebugFormat("Notify #{0} has been sent.", m.Key);
}
catch (Exception e)
{
result = MailSendingState.FatalError;
log.Error(e);
}
db.SetState(m.Key, result);
}
}
catch (ThreadAbortException)
{
return;
}
catch (Exception e)
{
log.Error(e);
}
}
public void Dispose()
{
if (cancellationToken != null)
{
cancellationToken.Dispose();
}
}
}
}

View File

@ -1,137 +0,0 @@
/*
*
* (c) Copyright Ascensio System Limited 2010-2018
*
* This program is freeware. You can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) version 3 as published by the Free Software Foundation (https://www.gnu.org/copyleft/gpl.html).
* In accordance with Section 7(a) of the GNU GPL its Section 15 shall be amended to the effect that
* Ascensio System SIA expressly excludes the warranty of non-infringement of any third-party rights.
*
* THIS PROGRAM IS DISTRIBUTED WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF MERCHANTABILITY OR
* FITNESS FOR A PARTICULAR PURPOSE. For more details, see GNU GPL at https://www.gnu.org/copyleft/gpl.html
*
* You can contact Ascensio System SIA by email at sales@onlyoffice.com
*
* The interactive user interfaces in modified source and object code versions of ONLYOFFICE must display
* Appropriate Legal Notices, as required under Section 5 of the GNU GPL version 3.
*
* Pursuant to Section 7 § 3(b) of the GNU GPL you must retain the original ONLYOFFICE logo which contains
* relevant author attributions when distributing the software. If the display of the logo in its graphic
* form is not reasonably feasible for technical reasons, you must include the words "Powered by ONLYOFFICE"
* in every copy of the program you distribute.
* Pursuant to Section 7 § 3(e) we decline to grant you any rights under trademark law for use of our trademarks.
*
*/
namespace ASC.Notify
{
[Singletone(Additional = typeof(NotifyServiceExtension))]
public class NotifyService : INotifyService, IDisposable
{
private ILog Log { get; }
private ICacheNotify<NotifyMessage> CacheNotify { get; }
private ICacheNotify<NotifyInvoke> CacheInvoke { get; }
private DbWorker Db { get; }
private IServiceProvider ServiceProvider { get; }
public NotifyService(DbWorker db, IServiceProvider serviceProvider, ICacheNotify<NotifyMessage> cacheNotify, ICacheNotify<NotifyInvoke> cacheInvoke, IOptionsMonitor<ILog> options)
{
Db = db;
ServiceProvider = serviceProvider;
CacheNotify = cacheNotify;
CacheInvoke = cacheInvoke;
Log = options.CurrentValue;
}
public void Start()
{
CacheNotify.Subscribe((n) => SendNotifyMessage(n), Common.Caching.CacheNotifyAction.InsertOrUpdate);
CacheInvoke.Subscribe((n) => InvokeSendMethod(n), Common.Caching.CacheNotifyAction.InsertOrUpdate);
}
public void Stop()
{
CacheNotify.Unsubscribe(Common.Caching.CacheNotifyAction.InsertOrUpdate);
}
public void SendNotifyMessage(NotifyMessage notifyMessage)
{
try
{
Db.SaveMessage(notifyMessage);
}
catch (Exception e)
{
Log.Error(e);
}
}
public void InvokeSendMethod(NotifyInvoke notifyInvoke)
{
var service = notifyInvoke.Service;
var method = notifyInvoke.Method;
var tenant = notifyInvoke.Tenant;
var parameters = notifyInvoke.Parameters;
var serviceType = Type.GetType(service, true);
using var scope = ServiceProvider.CreateScope();
var instance = scope.ServiceProvider.GetService(serviceType);
if (instance == null)
{
throw new Exception("Service instance not found.");
}
var methodInfo = serviceType.GetMethod(method);
if (methodInfo == null)
{
throw new Exception("Method not found.");
}
var scopeClass = scope.ServiceProvider.GetService<NotifyServiceScope>();
var (tenantManager, tenantWhiteLabelSettingsHelper, settingsManager) = scopeClass;
tenantManager.SetCurrentTenant(tenant);
tenantWhiteLabelSettingsHelper.Apply(settingsManager.Load<TenantWhiteLabelSettings>(), tenant);
methodInfo.Invoke(instance, parameters.ToArray());
}
public void Dispose()
{
CacheNotify.Unsubscribe(Common.Caching.CacheNotifyAction.InsertOrUpdate);
CacheInvoke.Unsubscribe(Common.Caching.CacheNotifyAction.InsertOrUpdate);
}
}
[Scope]
public class NotifyServiceScope
{
private TenantManager TenantManager { get; }
private TenantWhiteLabelSettingsHelper TenantWhiteLabelSettingsHelper { get; }
private SettingsManager SettingsManager { get; }
public NotifyServiceScope(TenantManager tenantManager, TenantWhiteLabelSettingsHelper tenantWhiteLabelSettingsHelper, SettingsManager settingsManager)
{
TenantManager = tenantManager;
TenantWhiteLabelSettingsHelper = tenantWhiteLabelSettingsHelper;
SettingsManager = settingsManager;
}
public void Deconstruct(out TenantManager tenantManager, out TenantWhiteLabelSettingsHelper tenantWhiteLabelSettingsHelper, out SettingsManager settingsManager)
{
tenantManager = TenantManager;
tenantWhiteLabelSettingsHelper = TenantWhiteLabelSettingsHelper;
settingsManager = SettingsManager;
}
}
public static class NotifyServiceExtension
{
public static void Register(DIHelper services)
{
services.TryAdd<NotifyServiceScope>();
}
}
}

View File

@ -1,102 +0,0 @@
/*
*
* (c) Copyright Ascensio System Limited 2010-2018
*
* This program is freeware. You can redistribute it and/or modify it under the terms of the GNU
* General Public License (GPL) version 3 as published by the Free Software Foundation (https://www.gnu.org/copyleft/gpl.html).
* In accordance with Section 7(a) of the GNU GPL its Section 15 shall be amended to the effect that
* Ascensio System SIA expressly excludes the warranty of non-infringement of any third-party rights.
*
* THIS PROGRAM IS DISTRIBUTED WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF MERCHANTABILITY OR
* FITNESS FOR A PARTICULAR PURPOSE. For more details, see GNU GPL at https://www.gnu.org/copyleft/gpl.html
*
* You can contact Ascensio System SIA by email at sales@onlyoffice.com
*
* The interactive user interfaces in modified source and object code versions of ONLYOFFICE must display
* Appropriate Legal Notices, as required under Section 5 of the GNU GPL version 3.
*
* Pursuant to Section 7 § 3(b) of the GNU GPL you must retain the original ONLYOFFICE logo which contains
* relevant author attributions when distributing the software. If the display of the logo in its graphic
* form is not reasonably feasible for technical reasons, you must include the words "Powered by ONLYOFFICE"
* in every copy of the program you distribute.
* Pursuant to Section 7 § 3(e) we decline to grant you any rights under trademark law for use of our trademarks.
*
*/
namespace ASC.Notify
{
[Singletone]
public class NotifyServiceLauncher : IHostedService
{
private NotifyServiceCfg NotifyServiceCfg { get; }
private NotifyService NotifyService { get; }
private NotifySender NotifySender { get; }
private NotifyCleaner NotifyCleaner { get; }
private WebItemManager WebItemManager { get; }
private IServiceProvider ServiceProvider { get; }
private NotifyConfiguration NotifyConfiguration { get; }
private ILog Log { get; }
public NotifyServiceLauncher(
IOptions<NotifyServiceCfg> notifyServiceCfg,
NotifySender notifySender,
NotifyService notifyService,
NotifyCleaner notifyCleaner,
WebItemManager webItemManager,
IServiceProvider serviceProvider,
NotifyConfiguration notifyConfiguration,
IOptionsMonitor<ILog> options)
{
NotifyServiceCfg = notifyServiceCfg.Value;
NotifyService = notifyService;
NotifySender = notifySender;
NotifyCleaner = notifyCleaner;
WebItemManager = webItemManager;
ServiceProvider = serviceProvider;
NotifyConfiguration = notifyConfiguration;
Log = options.Get("ASC.Notify");
}
public Task StartAsync(CancellationToken cancellationToken)
{
NotifyService.Start();
NotifySender.StartSending();
if (0 < NotifyServiceCfg.Schedulers.Count)
{
InitializeNotifySchedulers();
}
NotifyCleaner.Start();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
NotifyService.Stop();
if (NotifySender != null)
{
NotifySender.StopSending();
}
if (NotifyCleaner != null)
{
NotifyCleaner.Stop();
}
return Task.CompletedTask;
}
private void InitializeNotifySchedulers()
{
NotifyConfiguration.Configure();
foreach (var pair in NotifyServiceCfg.Schedulers.Where(r => r.MethodInfo != null))
{
Log.DebugFormat("Start scheduler {0} ({1})", pair.Name, pair.MethodInfo);
pair.MethodInfo.Invoke(null, null);
}
}
}
}

View File

@ -1,88 +1,99 @@
namespace ASC.Notify
var options = new WebApplicationOptions
{
public static class Program
Args = args,
ContentRootPath = WindowsServiceHelpers.IsWindowsService() ? AppContext.BaseDirectory : default
};
var builder = WebApplication.CreateBuilder(options);
var startup = new BaseWorkerStartup(builder.Configuration);
builder.Host.UseSystemd();
builder.Host.UseWindowsService();
builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory());
builder.Host.ConfigureAppConfiguration((hostContext, config) =>
{
var buildedConfig = config.Build();
var path = buildedConfig["pathToConf"];
if (!Path.IsPathRooted(path))
{
public async static Task Main(string[] args)
{
var host = CreateHostBuilder(args).Build();
await host.RunAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseSystemd()
.UseWindowsService()
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<BaseWorkerStartup>())
.ConfigureAppConfiguration((hostContext, config) =>
{
var buided = config.Build();
var path = buided["pathToConf"];
if (!Path.IsPathRooted(path))
{
path = Path.GetFullPath(CrossPlatform.PathCombine(hostContext.HostingEnvironment.ContentRootPath, path));
}
config.SetBasePath(path);
var env = hostContext.Configuration.GetValue("ENVIRONMENT", "Production");
config
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{env}.json", true)
.AddJsonFile($"appsettings.services.json", true)
.AddJsonFile("storage.json")
.AddJsonFile("notify.json")
.AddJsonFile($"notify.{env}.json", true)
.AddJsonFile("kafka.json")
.AddJsonFile($"kafka.{env}.json", true)
.AddJsonFile("redis.json")
.AddJsonFile($"redis.{env}.json", true)
.AddEnvironmentVariables()
.AddCommandLine(args)
.AddInMemoryCollection(new Dictionary<string, string>
{
{"pathToConf", path }
}
);
})
.ConfigureServices((hostContext, services) =>
{
services.AddMemoryCache();
var diHelper = new DIHelper(services);
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
if (kafkaConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
}
else if (redisConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
}
else
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
diHelper.RegisterProducts(hostContext.Configuration, hostContext.HostingEnvironment.ContentRootPath);
services.Configure<NotifyServiceCfg>(hostContext.Configuration.GetSection("notify"));
diHelper.TryAdd<NotifyServiceLauncher>();
diHelper.TryAdd<JabberSender>();
diHelper.TryAdd<SmtpSender>();
diHelper.TryAdd<AWSSender>(); // fix private
services.AddHostedService<NotifyServiceLauncher>();
})
.ConfigureContainer<ContainerBuilder>((context, builder) =>
{
builder.Register(context.Configuration);
})
.ConfigureNLogLogging();
path = Path.GetFullPath(CrossPlatform.PathCombine(hostContext.HostingEnvironment.ContentRootPath, path));
}
}
config.SetBasePath(path);
var env = hostContext.Configuration.GetValue("ENVIRONMENT", "Production");
config.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{env}.json", true)
.AddJsonFile($"appsettings.services.json", true)
.AddJsonFile("storage.json")
.AddJsonFile("notify.json")
.AddJsonFile($"notify.{env}.json", true)
.AddJsonFile("kafka.json")
.AddJsonFile($"kafka.{env}.json", true)
.AddJsonFile("redis.json")
.AddJsonFile($"redis.{env}.json", true)
.AddEnvironmentVariables()
.AddCommandLine(args)
.AddInMemoryCollection(new Dictionary<string, string>
{
{"pathToConf", path }
});
});
startup.ConfigureServices(builder.Services);
builder.Host.ConfigureServices((hostContext, services) =>
{
services.AddMemoryCache();
var diHelper = new DIHelper(services);
var redisConfiguration = hostContext.Configuration.GetSection("Redis").Get<RedisConfiguration>();
var kafkaConfiguration = hostContext.Configuration.GetSection("kafka").Get<KafkaSettings>();
if (kafkaConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCacheNotify<>));
}
else if (redisConfiguration != null)
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCacheNotify<>));
services.AddStackExchangeRedisExtensions<NewtonsoftSerializer>(redisConfiguration);
}
else
{
diHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}
diHelper.RegisterProducts(hostContext.Configuration, hostContext.HostingEnvironment.ContentRootPath);
services.Configure<NotifyServiceCfg>(hostContext.Configuration.GetSection("notify"));
diHelper.TryAdd<NotifyService>();
diHelper.TryAdd<NotifySenderService>();
diHelper.TryAdd<NotifyCleanerService>();
diHelper.TryAdd<TenantManager>();
diHelper.TryAdd<TenantWhiteLabelSettingsHelper>();
diHelper.TryAdd<SettingsManager>();
diHelper.TryAdd<JabberSender>();
diHelper.TryAdd<SmtpSender>();
diHelper.TryAdd<AWSSender>(); // fix private
services.AddAutoMapper(Assembly.GetAssembly(typeof(MappingProfile)));
services.AddHostedService<NotifyService>();
services.AddHostedService<NotifySenderService>();
services.AddHostedService<NotifyCleanerService>();
});
builder.Host.ConfigureContainer<ContainerBuilder>((context, builder) =>
{
builder.Register(context.Configuration);
});
builder.Host.ConfigureNLogLogging();
var app = builder.Build();
startup.Configure(app);
await app.RunAsync();

View File

@ -0,0 +1,62 @@
namespace ASC.Notify.Services;
[Singletone]
public class NotifyCleanerService : BackgroundService
{
private readonly ILog _logger;
private readonly NotifyServiceCfg _notifyServiceCfg;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly TimeSpan _waitingPeriod = TimeSpan.FromHours(8);
public NotifyCleanerService(IOptions<NotifyServiceCfg> notifyServiceCfg, IServiceScopeFactory serviceScopeFactory, IOptionsMonitor<ILog> options)
{
_logger = options.Get("ASC.NotifyCleaner");
_notifyServiceCfg = notifyServiceCfg.Value;
_serviceScopeFactory = serviceScopeFactory;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.Info("Notify Cleaner Service running.");
while (!stoppingToken.IsCancellationRequested)
{
Clear();
await Task.Delay(_waitingPeriod, stoppingToken);
}
_logger.Info("Notify Cleaner Service is stopping.");
}
private void Clear()
{
try
{
var date = DateTime.UtcNow.AddDays(-_notifyServiceCfg.StoreMessagesDays);
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<DbContextManager<NotifyDbContext>>().Get(_notifyServiceCfg.ConnectionStringName);
using var tx = dbContext.Database.BeginTransaction();
var info = dbContext.NotifyInfo.Where(r => r.ModifyDate < date && r.State == 4).ToList();
var queue = dbContext.NotifyQueue.Where(r => r.CreationDate < date).ToList();
dbContext.NotifyInfo.RemoveRange(info);
dbContext.NotifyQueue.RemoveRange(queue);
dbContext.SaveChanges();
tx.Commit();
_logger.InfoFormat("Clear notify messages: notify_info({0}), notify_queue ({1})", info.Count, queue.Count);
}
catch (ThreadAbortException)
{
// ignore
}
catch (Exception err)
{
_logger.Error(err);
}
}
}

View File

@ -0,0 +1,111 @@
namespace ASC.Notify.Services;
[Singletone]
public class NotifySenderService : BackgroundService
{
private readonly DbWorker _db;
private readonly ILog _logger;
private readonly NotifyServiceCfg _notifyServiceCfg;
public NotifySenderService(
IOptions<NotifyServiceCfg> notifyServiceCfg,
DbWorker dbWorker,
IOptionsMonitor<ILog> options)
{
_logger = options.Get("ASC.NotifySender");
_notifyServiceCfg = notifyServiceCfg.Value;
_db = dbWorker;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.Info("Notify Sender Service running.");
while (!stoppingToken.IsCancellationRequested)
{
await ThreadManagerWork(stoppingToken);
}
_logger.Info("Notify Sender Service is stopping.");
}
private async Task ThreadManagerWork(CancellationToken stoppingToken)
{
var tasks = new List<Task>(_notifyServiceCfg.Process.MaxThreads);
try
{
if (tasks.Count < _notifyServiceCfg.Process.MaxThreads)
{
var messages = _db.GetMessages(_notifyServiceCfg.Process.BufferSize);
if (messages.Count > 0)
{
var t = new Task(() => SendMessages(messages, stoppingToken), stoppingToken, TaskCreationOptions.LongRunning);
tasks.Add(t);
t.Start(TaskScheduler.Default);
}
else
{
await Task.Delay(5000, stoppingToken);
}
}
else
{
await Task.WhenAny(tasks.ToArray()).ContinueWith(r => tasks.RemoveAll(a => a.IsCompleted));
}
}
catch (ThreadAbortException)
{
return;
}
catch (Exception e)
{
_logger.Error(e);
}
}
private void SendMessages(object messages, CancellationToken stoppingToken)
{
try
{
foreach (var m in (IDictionary<int, NotifyMessage>)messages)
{
if (stoppingToken.IsCancellationRequested)
{
return;
}
var result = MailSendingState.Sended;
try
{
var sender = _notifyServiceCfg.Senders.FirstOrDefault(r => r.Name == m.Value.SenderType);
if (sender != null)
{
sender.NotifySender.Send(m.Value);
}
else
{
result = MailSendingState.FatalError;
}
_logger.DebugFormat("Notify #{0} has been sent.", m.Key);
}
catch (Exception e)
{
result = MailSendingState.FatalError;
_logger.Error(e);
}
_db.SetState(m.Key, result);
}
}
catch (ThreadAbortException)
{
return;
}
catch (Exception e)
{
_logger.Error(e);
}
}
}

View File

@ -0,0 +1,110 @@
namespace ASC.Notify.Services;
[Singletone]
public class NotifyService : IHostedService
{
private readonly DbWorker _db;
private readonly ICacheNotify<NotifyInvoke> _cacheInvoke;
private readonly ICacheNotify<NotifyMessage> _cacheNotify;
private readonly ILog _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly NotifyConfiguration _notifyConfiguration;
private readonly NotifyServiceCfg _notifyServiceCfg;
public NotifyService(
IOptions<NotifyServiceCfg> notifyServiceCfg,
DbWorker db,
ICacheNotify<NotifyInvoke> cacheInvoke,
ICacheNotify<NotifyMessage> cacheNotify,
IOptionsMonitor<ILog> options,
IServiceScopeFactory serviceScopeFactory,
NotifyConfiguration notifyConfiguration)
{
_cacheInvoke = cacheInvoke;
_cacheNotify = cacheNotify;
_db = db;
_logger = options.Get("ASC.NotifyService");
_notifyConfiguration = notifyConfiguration;
_notifyServiceCfg = notifyServiceCfg.Value;
_serviceScopeFactory = serviceScopeFactory;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.Info("Notify Service running.");
_cacheNotify.Subscribe((n) => SendNotifyMessage(n), CacheNotifyAction.InsertOrUpdate);
_cacheInvoke.Subscribe((n) => InvokeSendMethod(n), CacheNotifyAction.InsertOrUpdate);
if (0 < _notifyServiceCfg.Schedulers.Count)
{
InitializeNotifySchedulers();
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.Info("Notify Service is stopping.");
_cacheNotify.Unsubscribe(CacheNotifyAction.InsertOrUpdate);
_cacheInvoke.Unsubscribe(CacheNotifyAction.InsertOrUpdate);
return Task.CompletedTask;
}
private void SendNotifyMessage(NotifyMessage notifyMessage)
{
try
{
_db.SaveMessage(notifyMessage);
}
catch (Exception e)
{
_logger.Error(e);
}
}
private void InvokeSendMethod(NotifyInvoke notifyInvoke)
{
var service = notifyInvoke.Service;
var method = notifyInvoke.Method;
var tenant = notifyInvoke.Tenant;
var parameters = notifyInvoke.Parameters;
var serviceType = Type.GetType(service, true);
using var scope = _serviceScopeFactory.CreateScope();
var instance = scope.ServiceProvider.GetService(serviceType);
if (instance == null)
{
throw new Exception("Service instance not found.");
}
var methodInfo = serviceType.GetMethod(method);
if (methodInfo == null)
{
throw new Exception("Method not found.");
}
var tenantManager = scope.ServiceProvider.GetService<TenantManager>();
var tenantWhiteLabelSettingsHelper = scope.ServiceProvider.GetService<TenantWhiteLabelSettingsHelper>();
var settingsManager = scope.ServiceProvider.GetService<SettingsManager>();
tenantManager.SetCurrentTenant(tenant);
tenantWhiteLabelSettingsHelper.Apply(settingsManager.Load<TenantWhiteLabelSettings>(), tenant);
methodInfo.Invoke(instance, parameters.ToArray());
}
private void InitializeNotifySchedulers()
{
_notifyConfiguration.Configure();
foreach (var pair in _notifyServiceCfg.Schedulers.Where(r => r.MethodInfo != null))
{
_logger.DebugFormat("Start scheduler {0} ({1})", pair.Name, pair.MethodInfo);
pair.MethodInfo.Invoke(null, null);
}
}
}

View File

@ -44,8 +44,8 @@ namespace ASC.TelegramService
public Task SendMessage(NotifyMessage msg)
{
if (string.IsNullOrEmpty(msg.To)) return Task.CompletedTask;
if (!Clients.ContainsKey(msg.Tenant)) return Task.CompletedTask;
if (string.IsNullOrEmpty(msg.Reciever)) return Task.CompletedTask;
if (!Clients.ContainsKey(msg.TenantId)) return Task.CompletedTask;
return InternalSendMessage(msg);
}
@ -55,15 +55,15 @@ namespace ASC.TelegramService
var scope = ServiceProvider.CreateScope();
var cachedTelegramDao = scope.ServiceProvider.GetService<IOptionsSnapshot<CachedTelegramDao>>().Value;
var client = Clients[msg.Tenant].Client;
var client = Clients[msg.TenantId].Client;
try
{
var tgUser = cachedTelegramDao.GetUser(Guid.Parse(msg.To), msg.Tenant);
var tgUser = cachedTelegramDao.GetUser(Guid.Parse(msg.Reciever), msg.TenantId);
if (tgUser == null)
{
Log.DebugFormat("Couldn't find telegramId for user '{0}'", msg.To);
Log.DebugFormat("Couldn't find telegramId for user '{0}'", msg.Reciever);
return;
}
@ -72,7 +72,7 @@ namespace ASC.TelegramService
}
catch (Exception e)
{
Log.DebugFormat("Couldn't send message for user '{0}' got an '{1}'", msg.To, e.Message);
Log.DebugFormat("Couldn't send message for user '{0}' got an '{1}'", msg.Reciever, e.Message);
}
}