NotifyDbContext- async

This commit is contained in:
Anton Suhorukov 2023-03-17 11:32:55 +03:00
parent 603fc8cc0f
commit 7d6feae7cd
16 changed files with 84 additions and 106 deletions

View File

@ -50,7 +50,7 @@ public class EmailSenderSink : Sink
{
await using var scope = _serviceProvider.CreateAsyncScope();
var m = scope.ServiceProvider.GetRequiredService<EmailSenderSinkMessageCreator>().CreateNotifyMessage(message, _senderName);
var result = await _sender.Send(m);
var result = await _sender.SendAsync(m);
responce.Result = result switch
{

View File

@ -53,7 +53,7 @@ class JabberSenderSink : Sink
}
else
{
await _sender.Send(m);
await _sender.SendAsync(m);
}
return new SendResponse(message, _senderName, result);

View File

@ -104,7 +104,7 @@ public class FirebaseHelper
Body = msg.Content
}
};
FirebaseAdminMessaging.FirebaseMessaging.DefaultInstance.SendAsync(m);
await FirebaseAdminMessaging.FirebaseMessaging.DefaultInstance.SendAsync(m);
}
}
}

View File

@ -58,7 +58,7 @@ class PushSenderSink : Sink
}
else
{
await _sender.Send(m);
await _sender.SendAsync(m);
}
return new SendResponse(message, Constants.NotifyPushSenderSysName, result);

View File

@ -56,7 +56,7 @@ public class AWSSender : SmtpSender, IDisposable
_lastRefresh = DateTime.UtcNow - _refreshTimeout; //set to refresh on first send
}
public override async Task<NoticeSendResult> Send(NotifyMessage m)
public override async Task<NoticeSendResult> SendAsync(NotifyMessage m)
{
NoticeSendResult result;
try
@ -71,7 +71,7 @@ public class AWSSender : SmtpSender, IDisposable
var configuration = scope.ServiceProvider.GetService<CoreConfiguration>();
if (!configuration.SmtpSettings.IsDefaultSettings)
{
result = await base.Send(m);
result = await base.SendAsync(m);
}
else
{
@ -106,7 +106,7 @@ public class AWSSender : SmtpSender, IDisposable
if (result == NoticeSendResult.MessageIncorrect || result == NoticeSendResult.SendingImpossible)
{
_logger.DebugAmazonSendingFailed(result);
result = await base.Send(m);
result = await base.SendAsync(m);
}
return result;

View File

@ -29,5 +29,5 @@ namespace ASC.Core.Notify.Senders;
public interface INotifySender
{
void Init(IDictionary<string, string> properties);
Task<NoticeSendResult> Send(NotifyMessage m);
Task<NoticeSendResult> SendAsync(NotifyMessage m);
}

View File

@ -40,7 +40,7 @@ public class JabberSender : INotifySender
public void Init(IDictionary<string, string> properties) { }
public Task<NoticeSendResult> Send(NotifyMessage m)
public Task<NoticeSendResult> SendAsync(NotifyMessage m)
{
var text = m.Content;
if (!string.IsNullOrEmpty(text))

View File

@ -38,7 +38,7 @@ public class NotifyServiceSender : INotifySender
public void Init(IDictionary<string, string> properties) { }
public Task<NoticeSendResult> Send(NotifyMessage m)
public Task<NoticeSendResult> SendAsync(NotifyMessage m)
{
_notifyServiceClient.SendNotifyMessage(m);

View File

@ -41,7 +41,7 @@ public class PushSender : INotifySender
public void Init(IDictionary<string, string> properties) { }
public async Task<NoticeSendResult> Send(NotifyMessage m)
public async Task<NoticeSendResult> SendAsync(NotifyMessage m)
{
if (!string.IsNullOrEmpty(m.Content))
{

View File

@ -57,7 +57,7 @@ public class SmtpSender : INotifySender
_initProperties = properties;
}
public virtual Task<NoticeSendResult> Send(NotifyMessage m)
public virtual Task<NoticeSendResult> SendAsync(NotifyMessage m)
{
using var scope = _serviceProvider.CreateScope();
var tenantManager = scope.ServiceProvider.GetService<TenantManager>();

View File

@ -41,7 +41,7 @@ public class TelegramSender : INotifySender
public void Init(IDictionary<string, string> properties) { }
public async Task<NoticeSendResult> Send(NotifyMessage m)
public async Task<NoticeSendResult> SendAsync(NotifyMessage m)
{
if (!string.IsNullOrEmpty(m.Content))
{

View File

@ -47,7 +47,7 @@ class TelegramSenderSink : Sink
await using var scope = _serviceProvider.CreateAsyncScope();
var m = scope.ServiceProvider.GetRequiredService<TelegramSenderSinkMessageCreator>().CreateNotifyMessage(message, _senderName);
await _sender.Send(m);
await _sender.SendAsync(m);
return new SendResponse(message, _senderName, result);
}

View File

@ -27,9 +27,9 @@
namespace ASC.Notify;
[Singletone]
public class DbWorker
public class DbWorker : IDisposable
{
private readonly object _syncRoot = new object();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly NotifyServiceCfg _notifyServiceCfg;
@ -40,24 +40,24 @@ public class DbWorker
_notifyServiceCfg = notifyServiceCfg.Value;
}
public int SaveMessage(NotifyMessage m)
public async Task SaveMessageAsync(NotifyMessage m)
{
using var scope = _serviceScopeFactory.CreateScope();
var _mapper = scope.ServiceProvider.GetRequiredService<IMapper>();
using var dbContext = scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContext();
using var dbContext = await scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContextAsync();
var strategy = dbContext.Database.CreateExecutionStrategy();
strategy.Execute(() =>
await strategy.ExecuteAsync(async () =>
{
using var tx = dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
using var tx = await dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var notifyQueue = _mapper.Map<NotifyMessage, NotifyQueue>(m);
notifyQueue = dbContext.NotifyQueue.Add(notifyQueue).Entity;
dbContext.SaveChanges();
notifyQueue = (await dbContext.NotifyQueue.AddAsync(notifyQueue)).Entity;
await dbContext.SaveChangesAsync();
var id = notifyQueue.NotifyId;
@ -70,24 +70,23 @@ public class DbWorker
Priority = m.Priority
};
dbContext.NotifyInfo.Add(info);
dbContext.SaveChanges();
await dbContext.NotifyInfo.AddAsync(info);
await dbContext.SaveChangesAsync();
tx.Commit();
await tx.CommitAsync();
});
return 1;
}
public IDictionary<int, NotifyMessage> GetMessages(int count)
public async Task<IDictionary<int, NotifyMessage>> GetMessagesAsync(int count)
{
lock (_syncRoot)
try
{
await _semaphore.WaitAsync();
using var scope = _serviceScopeFactory.CreateScope();
var _mapper = scope.ServiceProvider.GetRequiredService<IMapper>();
using var dbContext = scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContext();
using var dbContext = await scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContextAsync();
var q = dbContext.NotifyQueue
.Join(dbContext.NotifyInfo, r => r.NotifyId, r => r.NotifyId, (queue, info) => new { queue, info })
@ -97,8 +96,8 @@ public class DbWorker
.Take(count);
var messages = q
.ToDictionary(
var messages = await q
.ToDictionaryAsync(
r => r.queue.NotifyId,
r =>
{
@ -116,92 +115,71 @@ public class DbWorker
return res;
});
var strategy = dbContext.Database.CreateExecutionStrategy();
strategy.Execute(() =>
{
using var tx = dbContext.Database.BeginTransaction();
var info = dbContext.NotifyInfo.Where(r => messages.Keys.Any(a => a == r.NotifyId)).ToList();
foreach (var i in info)
{
i.State = (int)MailSendingState.Sending;
}
dbContext.SaveChanges();
tx.Commit();
});
await dbContext.NotifyInfo.Where(r => messages.Keys.Any(a => a == r.NotifyId)).ExecuteUpdateAsync(q=> q.SetProperty(p => p.State, (int)MailSendingState.Sending));
return messages;
}
}
public void ResetStates()
{
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContext();
var strategy = dbContext.Database.CreateExecutionStrategy();
strategy.Execute(() =>
catch
{
var tr = dbContext.Database.BeginTransaction();
var info = dbContext.NotifyInfo.Where(r => r.State == 1).ToList();
foreach (var i in info)
{
i.State = 0;
}
dbContext.SaveChanges();
tr.Commit();
});
throw;
}
finally
{
_semaphore.Release();
}
}
public void SetState(int id, MailSendingState result)
public async Task ResetStatesAsync()
{
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContext();
using var dbContext = await scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContextAsync();
await dbContext.NotifyInfo.Where(r => r.State == 1).ExecuteUpdateAsync(q => q.SetProperty(p => p.State, 0));
}
public async Task SetStateAsync(int id, MailSendingState result)
{
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = await scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContextAsync();
var strategy = dbContext.Database.CreateExecutionStrategy();
strategy.Execute(() =>
await strategy.ExecuteAsync(async () =>
{
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContext();
using var tx = dbContext.Database.BeginTransaction();
using var tx = await dbContext.Database.BeginTransactionAsync();
if (result == MailSendingState.Sended)
{
var d = dbContext.NotifyInfo.Where(r => r.NotifyId == id).FirstOrDefault();
var d = await dbContext.NotifyInfo.Where(r => r.NotifyId == id).FirstOrDefaultAsync();
dbContext.NotifyInfo.Remove(d);
dbContext.SaveChanges();
await dbContext.SaveChangesAsync();
}
else
{
if (result == MailSendingState.Error)
{
var attempts = dbContext.NotifyInfo.Where(r => r.NotifyId == id).Select(r => r.Attempts).FirstOrDefault();
var attempts = await dbContext.NotifyInfo.Where(r => r.NotifyId == id).Select(r => r.Attempts).FirstOrDefaultAsync();
if (_notifyServiceCfg.Process.MaxAttempts <= attempts + 1)
{
result = MailSendingState.FatalError;
}
}
var info = dbContext.NotifyInfo
.Where(r => r.NotifyId == id)
.ToList();
await dbContext.NotifyInfo.Where(r => r.NotifyId == id)
.ExecuteUpdateAsync(q =>
q.SetProperty(p => p.State, (int)result)
.SetProperty(p => p.Attempts, p => p.Attempts + 1)
.SetProperty(p => p.ModifyDate, DateTime.UtcNow));
foreach (var i in info)
{
i.State = (int)result;
i.Attempts += 1;
i.ModifyDate = DateTime.UtcNow;
}
dbContext.SaveChanges();
await dbContext.SaveChangesAsync();
}
tx.Commit();
await tx.CommitAsync();
});
}
public void Dispose()
{
_semaphore.Dispose();
}
}

View File

@ -40,11 +40,11 @@ public class NotifySendMessageRequestedIntegrationEventHandler : IIntegrationEve
_db = db;
}
private void SendNotifyMessage(NotifyMessage notifyMessage)
private async Task SendNotifyMessageAsync(NotifyMessage notifyMessage)
{
try
{
_db.SaveMessage(notifyMessage);
await _db.SaveMessageAsync(notifyMessage);
}
catch (Exception e)
{
@ -58,7 +58,7 @@ public class NotifySendMessageRequestedIntegrationEventHandler : IIntegrationEve
{
_logger.InformationHandlingIntegrationEvent(@event.Id, Program.AppName, @event);
SendNotifyMessage(@event.NotifyMessage);
await SendNotifyMessageAsync(@event.NotifyMessage);
await Task.CompletedTask;
}

View File

@ -62,7 +62,7 @@ public class NotifyCleanerService : BackgroundService
continue;
}
Clear();
await ClearAsync();
await Task.Delay(_waitingPeriod, stoppingToken);
}
@ -70,24 +70,24 @@ public class NotifyCleanerService : BackgroundService
_logger.InformationNotifyCleanerStopping();
}
private void Clear()
private async Task ClearAsync()
{
try
{
var date = DateTime.UtcNow.AddDays(-_notifyServiceCfg.StoreMessagesDays);
using var scope = _scopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContext();
using var dbContext = await scope.ServiceProvider.GetService<IDbContextFactory<NotifyDbContext>>().CreateDbContextAsync();
var strategy = dbContext.Database.CreateExecutionStrategy();
strategy.Execute(() =>
await strategy.ExecuteAsync(async () =>
{
using var tx = dbContext.Database.BeginTransaction();
using var tx = await dbContext.Database.BeginTransactionAsync();
var infoCount = dbContext.NotifyInfo.Where(r => r.ModifyDate < date && r.State == 4).ExecuteDelete();
var queueCount = dbContext.NotifyQueue.Where(r => r.CreationDate < date).ExecuteDelete();
tx.Commit();
var infoCount = await dbContext.NotifyInfo.Where(r => r.ModifyDate < date && r.State == 4).ExecuteDeleteAsync();
var queueCount = await dbContext.NotifyQueue.Where(r => r.CreationDate < date).ExecuteDeleteAsync();
await tx.CommitAsync();
_logger.InformationClearNotifyMessages(infoCount, queueCount);
});

View File

@ -75,7 +75,7 @@ public class NotifySenderService : BackgroundService
continue;
}
await ThreadManagerWork(stoppingToken);
await ThreadManagerWorkAsync(stoppingToken);
}
_logger.InformationNotifySenderStopping();
@ -92,7 +92,7 @@ public class NotifySenderService : BackgroundService
}
}
private async Task ThreadManagerWork(CancellationToken stoppingToken)
private async Task ThreadManagerWorkAsync(CancellationToken stoppingToken)
{
var tasks = new List<Task>(_notifyServiceCfg.Process.MaxThreads);
@ -100,10 +100,10 @@ public class NotifySenderService : BackgroundService
{
if (tasks.Count < _notifyServiceCfg.Process.MaxThreads)
{
var messages = _db.GetMessages(_notifyServiceCfg.Process.BufferSize);
var messages = await _db.GetMessagesAsync(_notifyServiceCfg.Process.BufferSize);
if (messages.Count > 0)
{
var t = new Task(() => SendMessages(messages, stoppingToken), stoppingToken, TaskCreationOptions.LongRunning);
var t = new Task(async () => await SendMessagesAsync(messages, stoppingToken), stoppingToken, TaskCreationOptions.LongRunning);
tasks.Add(t);
t.Start(TaskScheduler.Default);
}
@ -127,7 +127,7 @@ public class NotifySenderService : BackgroundService
}
}
private void SendMessages(object messages, CancellationToken stoppingToken)
private async Task SendMessagesAsync(object messages, CancellationToken stoppingToken)
{
try
{
@ -144,7 +144,7 @@ public class NotifySenderService : BackgroundService
var sender = _notifyServiceCfg.Senders.FirstOrDefault(r => r.Name == m.Value.SenderType);
if (sender != null)
{
sender.NotifySender.Send(m.Value);
await sender.NotifySender.SendAsync(m.Value);
}
else
{
@ -159,7 +159,7 @@ public class NotifySenderService : BackgroundService
_logger.ErrorWithException(e);
}
_db.SetState(m.Key, result);
await _db.SetStateAsync(m.Key, result);
}
}
catch (ThreadAbortException)