first version: backup s3 tar

Anton Suhorukov 2023-07-18 16:21:44 +03:00
parent a42d2f8b6c
commit 162b3cf36d
15 changed files with 485 additions and 659 deletions

View File

@@ -1,237 +0,0 @@
// (c) Copyright Ascensio System SIA 2010-2022
//
// This program is a free software product.
// You can redistribute it and/or modify it under the terms
// of the GNU Affero General Public License (AGPL) version 3 as published by the Free Software
// Foundation. In accordance with Section 7(a) of the GNU AGPL its Section 15 shall be amended
// to the effect that Ascensio System SIA expressly excludes the warranty of non-infringement of
// any third-party rights.
//
// This program is distributed WITHOUT ANY WARRANTY, without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For details, see
// the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html
//
// You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, EU, LV-1021.
//
// The interactive user interfaces in modified source and object code versions of the Program must
// display Appropriate Legal Notices, as required under Section 5 of the GNU AGPL version 3.
//
// Pursuant to Section 7(b) of the License you must retain the original Product logo when
// distributing the program. Pursuant to Section 7(e) we decline to grant you any rights under
// trademark law for use of our trademarks.
//
// All the Product's GUI elements, including illustrations and icon sets, as well as technical writing
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
using ConfigurationManager = System.Configuration.ConfigurationManager;
namespace ASC.Data.Backup;
[Scope]
public class DbBackupProvider : IBackupProvider
{
public string Name => "databases";
private readonly List<string> _processedTables = new List<string>();
private readonly DbHelper _dbHelper;
private readonly TempStream _tempStream;
public DbBackupProvider(DbHelper dbHelper, TempStream tempStream)
{
_dbHelper = dbHelper;
_tempStream = tempStream;
}
public event EventHandler<ProgressChangedEventArgs> ProgressChanged;
public async Task<IEnumerable<XElement>> GetElements(int tenant, string[] configs, IDataWriteOperator writer)
{
_processedTables.Clear();
var xml = new List<XElement>();
var connectionKeys = new Dictionary<string, string>();
foreach (var connectionString in GetConnectionStrings(configs))
{
//databases that share the same provider and connection string are not dumped twice:
//the duplicate only gets a "ref" attribute pointing at the first occurrence, so restore can resolve it
var node = new XElement(connectionString.Name);
xml.Add(node);
var connectionKey = connectionString.ProviderName + connectionString.ConnectionString;
if (connectionKeys.TryGetValue(connectionKey, out var value))
{
node.Add(new XAttribute("ref", value));
}
else
{
connectionKeys.Add(connectionKey, connectionString.Name);
node.Add(await BackupDatabase(tenant, connectionString, writer));
}
}
return xml.AsEnumerable();
}
public async Task LoadFromAsync(IEnumerable<XElement> elements, int tenant, string[] configs, IDataReadOperator reader)
{
_processedTables.Clear();
foreach (var connectionString in GetConnectionStrings(configs))
{
await RestoreDatabaseAsync(connectionString, elements, reader);
}
}
public IEnumerable<ConnectionStringSettings> GetConnectionStrings(string[] configs)
{
/* if (configs.Length == 0)
{
configs = new string[] { AppDomain.CurrentDomain.SetupInformation.ConfigurationFile };
}
var connectionStrings = new List<ConnectionStringSettings>();
foreach (var config in configs)
{
connectionStrings.AddRange(GetConnectionStrings(GetConfiguration(config)));
}
return connectionStrings.GroupBy(cs => cs.Name).Select(g => g.First());*/
return null;
}
public IEnumerable<ConnectionStringSettings> GetConnectionStrings(Configuration cfg)
{
var connectionStrings = new List<ConnectionStringSettings>();
foreach (ConnectionStringSettings connectionString in cfg.ConnectionStrings.ConnectionStrings)
{
if (connectionString.Name == "LocalSqlServer" || connectionString.Name == "readonly")
{
continue;
}
connectionStrings.Add(connectionString);
if (connectionString.ConnectionString.Contains("|DataDirectory|"))
{
connectionString.ConnectionString = connectionString.ConnectionString.Replace("|DataDirectory|", Path.GetDirectoryName(cfg.FilePath) + '\\');
}
}
return connectionStrings;
}
private void OnProgressChanged(string status, int progress)
{
ProgressChanged?.Invoke(this, new ProgressChangedEventArgs(status, progress));
}
private Configuration GetConfiguration(string config)
{
if (config.Contains(Path.DirectorySeparatorChar) && !Uri.IsWellFormedUriString(config, UriKind.Relative))
{
var map = new ExeConfigurationFileMap
{
ExeConfigFilename = string.Equals(Path.GetExtension(config), ".config", StringComparison.OrdinalIgnoreCase) ? config : CrossPlatform.PathCombine(config, "Web.config")
};
return ConfigurationManager.OpenMappedExeConfiguration(map, ConfigurationUserLevel.None);
}
return ConfigurationManager.OpenExeConfiguration(config);
}
private async Task<List<XElement>> BackupDatabase(int tenant, ConnectionStringSettings connectionString, IDataWriteOperator writer)
{
var xml = new List<XElement>();
var errors = 0;
var timeout = TimeSpan.FromSeconds(1);
var tables = _dbHelper.GetTables();
for (var i = 0; i < tables.Count; i++)
{
var table = tables[i];
OnProgressChanged(table, (int)(i / (double)tables.Count * 100));
if (_processedTables.Contains(table, StringComparer.InvariantCultureIgnoreCase))
{
continue;
}
xml.Add(new XElement(table));
DataTable dataTable;
while (true)
{
try
{
dataTable = _dbHelper.GetTable(table, tenant);
break;
}
catch
{
errors++;
if (20 < errors)
{
throw;
}
Thread.Sleep(timeout);
}
}
foreach (DataColumn c in dataTable.Columns)
{
if (c.DataType == typeof(DateTime))
{
c.DateTimeMode = DataSetDateTime.Unspecified;
}
}
await using (var file = _tempStream.Create())
{
dataTable.WriteXml(file, XmlWriteMode.WriteSchema);
await writer.WriteEntryAsync($"{Name}\\{connectionString.Name}\\{table}".ToLower(), file);
}
_processedTables.Add(table);
}
return xml;
}
private async Task RestoreDatabaseAsync(ConnectionStringSettings connectionString, IEnumerable<XElement> elements, IDataReadOperator reader)
{
var dbName = connectionString.Name;
var dbElement = elements.SingleOrDefault(e => string.Equals(e.Name.LocalName, connectionString.Name, StringComparison.OrdinalIgnoreCase));
if (dbElement != null && dbElement.Attribute("ref") != null)
{
dbName = dbElement.Attribute("ref").Value;
dbElement = elements.Single(e => string.Equals(e.Name.LocalName, dbElement.Attribute("ref").Value, StringComparison.OrdinalIgnoreCase));
}
if (dbElement == null)
{
return;
}
var tables = _dbHelper.GetTables();
for (var i = 0; i < tables.Count; i++)
{
var table = tables[i];
OnProgressChanged(table, (int)(i / (double)tables.Count * 100));
if (_processedTables.Contains(table, StringComparer.InvariantCultureIgnoreCase))
{
continue;
}
if (dbElement.Element(table) != null)
{
await using (var stream = reader.GetEntry($"{Name}\\{dbName}\\{table}".ToLower()))
{
var data = new DataTable();
data.ReadXml(stream);
await _dbHelper.SetTableAsync(data);
}
_processedTables.Add(table);
}
}
}
}
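The ref deduplication above is easiest to see in the XML that GetElements produces. For a hypothetical portal whose core and mail connection strings share one provider and connection string, only the first is dumped and the second points back at it:

<core>
<blogs_comments />
<blogs_posts />
</core>
<mail ref="core" />

RestoreDatabaseAsync follows the ref attribute to the shared dump, so both names restore from the same table entries.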

View File

@@ -1,310 +0,0 @@
// (c) Copyright Ascensio System SIA 2010-2022
//
// This program is a free software product.
// You can redistribute it and/or modify it under the terms
// of the GNU Affero General Public License (AGPL) version 3 as published by the Free Software
// Foundation. In accordance with Section 7(a) of the GNU AGPL its Section 15 shall be amended
// to the effect that Ascensio System SIA expressly excludes the warranty of non-infringement of
// any third-party rights.
//
// This program is distributed WITHOUT ANY WARRANTY, without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For details, see
// the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html
//
// You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, EU, LV-1021.
//
// The interactive user interfaces in modified source and object code versions of the Program must
// display Appropriate Legal Notices, as required under Section 5 of the GNU AGPL version 3.
//
// Pursuant to Section 7(b) of the License you must retain the original Product logo when
// distributing the program. Pursuant to Section 7(e) we decline to grant you any rights under
// trademark law for use of our trademarks.
//
// All the Product's GUI elements, including illustrations and icon sets, as well as technical writing
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
namespace ASC.Data.Backup;
[Scope]
public class DbHelper : IDisposable
{
private readonly DbProviderFactory _factory;
private readonly DbConnection _connect;
private readonly DbCommandBuilder _builder;
private readonly DataTable _columns;
private readonly bool _mysql;
private readonly ILogger<DbHelper> _logger;
private readonly TenantDbContext _tenantDbContext;
private readonly CoreDbContext _coreDbContext;
private readonly IDictionary<string, string> _whereExceptions
= new Dictionary<string, string>();
public DbHelper(
ILogger<DbHelper> logger,
ConnectionStringSettings connectionString,
IDbContextFactory<TenantDbContext> tenantDbContext,
IDbContextFactory<CoreDbContext> coreDbContext)
{
_logger = logger;
_tenantDbContext = tenantDbContext.CreateDbContext();
_coreDbContext = coreDbContext.CreateDbContext();
var file = connectionString.ElementInformation.Source;
if ("web.connections.config".Equals(Path.GetFileName(file), StringComparison.InvariantCultureIgnoreCase))
{
file = CrossPlatform.PathCombine(Path.GetDirectoryName(file), "Web.config");
}
var xconfig = XDocument.Load(file);
var provider = xconfig.XPathSelectElement("/configuration/system.data/DbProviderFactories/add[@invariant='" + connectionString.ProviderName + "']");
_factory = (DbProviderFactory)Activator.CreateInstance(Type.GetType(provider.Attribute("type").Value, true));
_builder = _factory.CreateCommandBuilder();
_connect = _factory.CreateConnection();
_connect.ConnectionString = connectionString.ConnectionString;
_connect.Open();
_mysql = connectionString.ProviderName.Contains("mysql", StringComparison.OrdinalIgnoreCase);
if (_mysql)
{
CreateCommand("set @@session.sql_mode = concat(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')").ExecuteNonQuery();
}
_columns = _connect.GetSchema("Columns");
_whereExceptions["calendar_calendar_item"] = " where calendar_id in (select id from calendar_calendars where tenant = {0}) ";
_whereExceptions["calendar_calendar_user"] = " where calendar_id in (select id from calendar_calendars where tenant = {0}) ";
_whereExceptions["calendar_event_item"] = " inner join calendar_events on calendar_event_item.event_id = calendar_events.id where calendar_events.tenant = {0} ";
_whereExceptions["calendar_event_user"] = " inner join calendar_events on calendar_event_user.event_id = calendar_events.id where calendar_events.tenant = {0} ";
_whereExceptions["crm_entity_contact"] = " inner join crm_contact on crm_entity_contact.contact_id = crm_contact.id where crm_contact.tenant_id = {0} ";
_whereExceptions["crm_entity_tag"] = " inner join crm_tag on crm_entity_tag.tag_id = crm_tag.id where crm_tag.tenant_id = {0} ";
_whereExceptions["files_folder_tree"] = " inner join files_folder on folder_id = id where tenant_id = {0} ";
_whereExceptions["forum_answer_variant"] = " where answer_id in (select id from forum_answer where tenantid = {0})";
_whereExceptions["forum_topic_tag"] = " where topic_id in (select id from forum_topic where tenantid = {0})";
_whereExceptions["forum_variant"] = " where question_id in (select id from forum_question where tenantid = {0})";
_whereExceptions["projects_project_participant"] = " inner join projects_projects on projects_project_participant.project_id = projects_projects.id where projects_projects.tenant_id = {0} ";
_whereExceptions["projects_following_project_participant"] = " inner join projects_projects on projects_following_project_participant.project_id = projects_projects.id where projects_projects.tenant_id = {0} ";
_whereExceptions["projects_project_tag"] = " inner join projects_projects on projects_project_tag.project_id = projects_projects.id where projects_projects.tenant_id = {0} ";
_whereExceptions["tenants_tenants"] = " where id = {0}";
_whereExceptions["core_acl"] = " where tenant = {0} or tenant = -1";
_whereExceptions["core_subscription"] = " where tenant = {0} or tenant = -1";
_whereExceptions["core_subscriptionmethod"] = " where tenant = {0} or tenant = -1";
}
public List<string> GetTables()
{
var allowTables = new List<string>
{
"blogs_",
"bookmarking_",
"calendar_",
"core_",
"crm_",
"events_",
"files_",
"forum_",
"photo_",
"projects_",
"tenants_",
"webstudio_",
"wiki_",
};
var disallowTables = new List<string>
{
"core_settings",
"webstudio_uservisit",
"webstudio_useractivity",
"tenants_forbiden",
};
IEnumerable<string> tables;
if (_mysql)
{
tables = ExecuteList(CreateCommand("show tables"));
}
else
{
tables = _connect
.GetSchema("Tables")
.Select(@"TABLE_TYPE <> 'SYSTEM_TABLE'")
.Select(row => (string)row["TABLE_NAME"]);
}
return tables
.Where(t => allowTables.Any(a => t.StartsWith(a)) && !disallowTables.Any(d => t.StartsWith(d)))
.ToList();
}
public DataTable GetTable(string table, int tenant)
{
try
{
var dataTable = new DataTable(table);
var adapter = _factory.CreateDataAdapter();
adapter.SelectCommand = CreateCommand("select " + Quote(table) + ".* from " + Quote(table) + GetWhere(table, tenant));
_logger.Debug(adapter.SelectCommand.CommandText);
adapter.Fill(dataTable);
return dataTable;
}
catch (Exception error)
{
_logger.ErrorTableString(table, error);
throw;
}
}
public async Task SetTableAsync(DataTable table)
{
await using var tx = _connect.BeginTransaction();
try
{
if ("tenants_tenants".Equals(table.TableName, StringComparison.InvariantCultureIgnoreCase))
{
// remove last tenant
var tenant = await Queries.LastTenantAsync(_tenantDbContext);
if (tenant != null)
{
_tenantDbContext.Tenants.Remove(tenant);
await _tenantDbContext.SaveChangesAsync();
}
/* var tenantid = CreateCommand("select id from tenants_tenants order by id desc limit 1").ExecuteScalar();
CreateCommand("delete from tenants_tenants where id = " + tenantid).ExecuteNonQuery();*/
if (table.Columns.Contains("mappeddomain"))
{
foreach (var r in table.Rows.Cast<DataRow>())
{
r[table.Columns["mappeddomain"]] = null;
if (table.Columns.Contains("id"))
{
var tariff = await Queries.TariffAsync(_coreDbContext, tenant.Id);
tariff.TenantId = (int)r[table.Columns["id"]];
tariff.CreateOn = DateTime.Now;
// CreateCommand("update tenants_tariff set tenant = " + r[table.Columns["id"]] + " where tenant = " + tenantid).ExecuteNonQuery();
_coreDbContext.Entry(tariff).State = EntityState.Modified;
await _coreDbContext.SaveChangesAsync();
}
}
}
}
var sql = new StringBuilder("replace into " + Quote(table.TableName) + "(");
var tableColumns = GetColumnsFrom(table.TableName)
.Intersect(table.Columns.Cast<DataColumn>().Select(c => c.ColumnName), StringComparer.InvariantCultureIgnoreCase)
.ToList();
tableColumns.ForEach(column => sql.Append($"{Quote(column)}, "));
sql.Replace(", ", ") values (", sql.Length - 2, 2);
var insert = _connect.CreateCommand();
tableColumns.ForEach(column =>
{
sql.Append($"@{column}, ");
var p = insert.CreateParameter();
p.ParameterName = "@" + column;
insert.Parameters.Add(p);
});
sql.Replace(", ", ")", sql.Length - 2, 2);
insert.CommandText = sql.ToString();
foreach (var r in table.Rows.Cast<DataRow>())
{
foreach (var c in tableColumns)
{
((IDbDataParameter)insert.Parameters["@" + c]).Value = r[c];
}
insert.ExecuteNonQuery();
}
tx.Commit();
}
catch (Exception e)
{
_logger.ErrorTable(table, e);
}
}
public void Dispose()
{
_builder.Dispose();
_connect.Dispose();
}
public DbCommand CreateCommand(string sql)
{
var command = _connect.CreateCommand();
command.CommandText = sql;
return command;
}
public List<string> ExecuteList(DbCommand command)
{
var list = new List<string>();
using (var result = command.ExecuteReader())
{
while (result.Read())
{
list.Add(result.GetString(0));
}
}
return list;
}
private string Quote(string identifier)
{
return identifier;
}
private IEnumerable<string> GetColumnsFrom(string table)
{
if (_mysql)
{
return ExecuteList(CreateCommand("show columns from " + Quote(table)));
}
else
{
return _columns.Select($"TABLE_NAME = '{table}'")
.Select(r => r["COLUMN_NAME"].ToString());
}
}
private string GetWhere(string tableName, int tenant)
{
if (tenant == -1)
{
return string.Empty;
}
if (_whereExceptions.TryGetValue(tableName.ToLower(), out var exc))
{
return string.Format(exc, tenant);
}
var tenantColumn = GetColumnsFrom(tableName).FirstOrDefault(c => c.StartsWith("tenant", StringComparison.OrdinalIgnoreCase));
return tenantColumn != null ?
" where " + Quote(tenantColumn) + " = " + tenant :
" where 1 = 0";
}
}
static file class Queries
{
public static readonly Func<TenantDbContext, Task<DbTenant>> LastTenantAsync =
Microsoft.EntityFrameworkCore.EF.CompileAsyncQuery(
(TenantDbContext ctx) =>
ctx.Tenants.LastOrDefault());
public static readonly Func<CoreDbContext, int, Task<DbTariff>> TariffAsync =
Microsoft.EntityFrameworkCore.EF.CompileAsyncQuery(
(CoreDbContext ctx, int tenantId) =>
ctx.Tariffs.FirstOrDefault(t => t.TenantId == tenantId));
}
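GetWhere above resolves the tenant filter for GetTable in three ways; with a hypothetical tenant id of 42 (Quote is a no-op, so identifiers stay unquoted) the generated SQL looks like:

select files_folder_tree.* from files_folder_tree inner join files_folder on folder_id = id where tenant_id = 42
select webstudio_settings.* from webstudio_settings where TenantID = 42
select some_table.* from some_table where 1 = 0

The first comes from the _whereExceptions template, the second from the first column whose name starts with "tenant" (webstudio_settings and its TenantID column are illustrative), and the last is the fallback for tables with no tenant column, which therefore export no rows.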

View File

@@ -1,53 +0,0 @@
// (c) Copyright Ascensio System SIA 2010-2022
//
// This program is a free software product.
// You can redistribute it and/or modify it under the terms
// of the GNU Affero General Public License (AGPL) version 3 as published by the Free Software
// Foundation. In accordance with Section 7(a) of the GNU AGPL its Section 15 shall be amended
// to the effect that Ascensio System SIA expressly excludes the warranty of non-infringement of
// any third-party rights.
//
// This program is distributed WITHOUT ANY WARRANTY, without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For details, see
// the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html
//
// You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, EU, LV-1021.
//
// The interactive user interfaces in modified source and object code versions of the Program must
// display Appropriate Legal Notices, as required under Section 5 of the GNU AGPL version 3.
//
// Pursuant to Section 7(b) of the License you must retain the original Product logo when
// distributing the program. Pursuant to Section 7(e) we decline to grant you any rights under
// trademark law for use of our trademarks.
//
// All the Product's GUI elements, including illustrations and icon sets, as well as technical writing
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
namespace ASC.Data.Backup;
public interface IBackupProvider
{
string Name { get; }
event EventHandler<ProgressChangedEventArgs> ProgressChanged;
Task<IEnumerable<XElement>> GetElements(int tenant, string[] configs, IDataWriteOperator writer);
Task LoadFromAsync(IEnumerable<XElement> elements, int tenant, string[] configs, IDataReadOperator reader);
}
public class ProgressChangedEventArgs : EventArgs
{
public string Status { get; private set; }
public double Progress { get; private set; }
public bool Completed { get; private set; }
public ProgressChangedEventArgs(string status, double progress)
: this(status, progress, false) { }
public ProgressChangedEventArgs(string status, double progress, bool completed)
{
Status = status;
Progress = progress;
Completed = completed;
}
}
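A minimal consumer sketch (dbHelper, tempStream, tenantId, configs and writer are assumed to be in scope; real wiring goes through the [Scope] DI registrations):

IBackupProvider provider = new DbBackupProvider(dbHelper, tempStream);
provider.ProgressChanged += (_, e) => Console.WriteLine($"{e.Status}: {e.Progress}%");
var elements = await provider.GetElements(tenantId, configs, writer);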

View File

@@ -33,7 +33,7 @@ public class BackupProgressItem : BaseBackupProgressItem
public Dictionary<string, string> StorageParams { get; set; }
public string TempFolder { get; set; }
private const string ArchiveFormat = "tar.gz";
private const string ArchiveFormat = "tar";
private bool _isScheduled;
private Guid _userId;
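Dropping the .gz suffix is the point of this commit: gzip members cannot be stitched together by byte-range copies, while an uncompressed tar is just a sequence of 512-byte blocks (one header block per entry, then the payload zero-padded to a block boundary), so entries can be appended to an archive already sitting in S3. A sketch of the block arithmetic the new S3 methods rely on (payload size hypothetical):

const int blockSize = 512; // tar block size
long payload = 100_000; // hypothetical entry payload
long padding = (blockSize - payload % blockSize) % blockSize; // 352 zero bytes here
long onWire = blockSize + payload + padding; // header + payload + padding = 100,864 bytes, exactly 197 blocks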

View File

@@ -710,17 +710,13 @@ public class BackupPortalTask : PortalTaskBase
foreach (var file in group)
{
var storage = await StorageFactory.GetStorageAsync(TenantId, group.Key);
var file1 = file;
Stream fileStream = null;
await ActionInvoker.Try(async state =>
{
var f = (BackupFileInfo)state;
fileStream = await storage.GetReadStreamAsync(f.Domain, f.Path);
}, file, 5, error => _logger.WarningCanNotBackupFile(file1.Module, file1.Path, error));
if (fileStream != null)
{
await writer.WriteEntryAsync(file1.GetZipKey(), fileStream);
fileStream.Dispose();
}
try
{
await writer.WriteEntryAsync(file.GetZipKey(), file.Domain, file.Path, storage);
}
catch (Exception error)
{
_logger.WarningCanNotBackupFile(file.Module, file.Path, error);
}
SetCurrentStepProgress((int)(++filesProcessed * 100 / (double)filesCount));
}
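The retry scaffolding deleted here moved rather than disappeared: IDataWriteOperator gained a WriteEntryAsync(tarKey, domain, path, store) overload, and the implementations shown further down wrap GetReadStreamAsync in ActionInvoker.TryAsync themselves, leaving this loop with a single call in a try/catch.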

View File

@@ -94,7 +94,7 @@ public class DeletePortalTask : PortalTaskBase
var domains = StorageFactoryConfig.GetDomainList(module);
foreach (var domain in domains)
{
await ActionInvoker.Try(async state => await storage.DeleteFilesAsync((string)state, "\\", "*.*", true), domain, 5,
await ActionInvoker.TryAsync(async state => await storage.DeleteFilesAsync((string)state, "\\", "*.*", true), domain, 5,
onFailure: error => _logger.WarningCanNotDeleteFilesForDomain(domain, error));
}
await storage.DeleteFilesAsync("\\", "*.*", true);

View File

@@ -421,7 +421,7 @@ public class RestorePortalTask : PortalTaskBase
foreach (var domain in domains)
{
await ActionInvoker.Try(
await ActionInvoker.TryAsync(
async state =>
{
if (await storage.IsDirectoryAsync((string)state))

View File

@@ -28,6 +28,8 @@ using Amazon.Extensions.S3.Encryption;
using Amazon.Extensions.S3.Encryption.Primitives;
using Amazon.S3.Internal;
using ASC.Data.Storage.Tar;
namespace ASC.Data.Storage.S3;
[Scope]
@@ -57,6 +59,7 @@ public class S3Storage : BaseStorage
private EncryptionMethod _encryptionMethod = EncryptionMethod.None;
private string _encryptionKey;
private readonly IConfiguration _configuration;
private readonly CoreBaseSettings _coreBaseSettings;
public S3Storage(
TempStream tempStream,
@@ -69,10 +72,12 @@ public class S3Storage : BaseStorage
IHttpClientFactory clientFactory,
IConfiguration configuration,
TenantQuotaFeatureStatHelper tenantQuotaFeatureStatHelper,
QuotaSocketManager quotaSocketManager)
QuotaSocketManager quotaSocketManager,
CoreBaseSettings coreBaseSettings)
: base(tempStream, tenantManager, pathUtils, emailValidationKeyProvider, httpContextAccessor, factory, options, clientFactory, tenantQuotaFeatureStatHelper, quotaSocketManager)
{
_configuration = configuration;
_coreBaseSettings = coreBaseSettings;
}
public Uri GetUriInternal(string path)
@@ -387,9 +392,16 @@ public class S3Storage : BaseStorage
public override IDataWriteOperator CreateDataWriteOperator(CommonChunkedUploadSession chunkedUploadSession,
CommonChunkedUploadSessionHolder sessionHolder)
{
if (_coreBaseSettings.Standalone)
{
return new S3ZipWriteOperator(_tempStream, chunkedUploadSession, sessionHolder);
}
else
{
return new S3TarWriteOperator(chunkedUploadSession, sessionHolder);
}
}
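This branch gates the new machinery: standalone installations keep streaming a compressed archive through S3ZipWriteOperator as before, while hosted S3 deployments switch to S3TarWriteOperator, which assembles an uncompressed tar inside the bucket using the concat operations added below instead of pulling every file through the app server.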
#endregion
@@ -1243,7 +1255,7 @@
};
completeRequest.AddPartETags(copyResponses);
var completeUploadResponse = await client.CompleteMultipartUploadAsync(completeRequest);
await client.CompleteMultipartUploadAsync(completeRequest);
}
else
{
@@ -1273,6 +1285,247 @@
}
}
public async Task ConcatFileStreamAsync(Stream stream, string tarKey, string destinationDomain, string destinationKey, string uploadId, List<PartETag> eTags, int partNumber)
{
using var s3 = GetClient();
var destinationPath = MakePath(destinationDomain, destinationKey);
var blockSize = 512;
long prevFileSize = 0;
try
{
var objResult = await s3.GetObjectMetadataAsync(_bucket, destinationPath);
prevFileSize = objResult.ContentLength;
}
catch { /* the archive object may not exist yet; keep prevFileSize = 0 */ }
var header = BuilderHeaders.CreateHeader(tarKey, stream.Length);
var ms = new MemoryStream();
if (prevFileSize % blockSize != 0)
{
var endBlock = new byte[blockSize - prevFileSize % blockSize];
ms.Write(endBlock);
}
ms.Write(header);
stream.Position = 0;
stream.CopyTo(ms);
stream.Dispose();
stream = ms;
stream.Position = 0;
prevFileSize = stream.Length;
var uploadRequest = new UploadPartRequest
{
BucketName = _bucket,
Key = destinationPath,
UploadId = uploadId,
PartNumber = partNumber,
InputStream = stream
};
eTags.Add(new PartETag(partNumber, (await s3.UploadPartAsync(uploadRequest)).ETag));
var completeRequest = new CompleteMultipartUploadRequest
{
BucketName = _bucket,
Key = destinationPath,
UploadId = uploadId,
PartETags = eTags
};
try
{
await s3.CompleteMultipartUploadAsync(completeRequest);
}
catch (Exception)
{
// completion errors are swallowed; a failed CompleteMultipartUpload leaves the entry unwritten
}
}
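Traced with hypothetical sizes: if the archive object currently holds 100,864 bytes it is already 512-aligned and no end block is written; at 100,900 bytes, 476 zero bytes go out first. Then comes the 512-byte header for tarKey and the raw entry bytes, uploaded as one final part before the multipart upload is completed. The new entry itself is deliberately left unpadded; the next Concat call (or AddEndAsync) supplies its padding.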
public async Task ConcatFileAsync(string domain, string path, string tarKey, string destinationDomain, string destinationKey, string uploadId, List<PartETag> eTags, int partNumber)
{
using var s3 = GetClient();
var file = MakePath(domain, path);
var destinationPath = MakePath(destinationDomain, destinationKey);
var blockSize = 512;
long prevFileSize = 0;
try
{
var objResult = await s3.GetObjectMetadataAsync(_bucket, destinationPath);
prevFileSize = objResult.ContentLength;
}
catch { /* the archive object may not exist yet; keep prevFileSize = 0 */ }
var objFile = await s3.GetObjectMetadataAsync(_bucket, file);
var header = BuilderHeaders.CreateHeader(tarKey, objFile.ContentLength);
using var stream = new MemoryStream();
if (prevFileSize % blockSize != 0)
{
var endBlock = new byte[blockSize - prevFileSize % blockSize];
stream.Write(endBlock);
}
stream.Write(header);
stream.Position = 0;
prevFileSize = objFile.ContentLength;
var uploadRequest = new UploadPartRequest
{
BucketName = _bucket,
Key = destinationPath,
UploadId = uploadId,
PartNumber = partNumber,
InputStream = stream
};
eTags.Add(new PartETag(partNumber, (await s3.UploadPartAsync(uploadRequest)).ETag));
var completeRequest = new CompleteMultipartUploadRequest
{
BucketName = _bucket,
Key = destinationPath,
UploadId = uploadId,
PartETags = eTags
};
var completeUploadResponse = await s3.CompleteMultipartUploadAsync(completeRequest);
// append the source object itself to the archive with a server-side copy
(uploadId, eTags, partNumber) = await InitiateConcatAsync(destinationDomain, destinationKey);
var copyRequest = new CopyPartRequest
{
DestinationBucket = _bucket,
DestinationKey = destinationPath,
SourceBucket = _bucket,
SourceKey = file,
UploadId = uploadId,
PartNumber = partNumber
};
eTags.Add(new PartETag(partNumber, (await s3.CopyPartAsync(copyRequest)).ETag));
completeRequest = new CompleteMultipartUploadRequest
{
BucketName = _bucket,
Key = destinationPath,
UploadId = uploadId,
PartETags = eTags
};
completeUploadResponse = await s3.CompleteMultipartUploadAsync(completeRequest);
}
public async Task AddEndAsync(string domain, string key)
{
using var s3 = GetClient();
var path = MakePath(domain, key);
var blockSize = 512;
(var uploadId, var eTags, var partNumber) = await InitiateConcatAsync(domain, key);
var obj = await s3.GetObjectMetadataAsync(_bucket, path);
var buffer = new byte[blockSize - obj.ContentLength % blockSize + blockSize * 2]; // padding to the next 512-byte boundary plus two zero blocks (the tar end-of-archive marker)
var stream = new MemoryStream();
stream.Write(buffer);
stream.Position = 0;
var uploadRequest = new UploadPartRequest
{
BucketName = _bucket,
Key = path,
UploadId = uploadId,
PartNumber = partNumber,
InputStream = stream
};
eTags.Add(new PartETag(partNumber, (await s3.UploadPartAsync(uploadRequest)).ETag));
var completeRequest = new CompleteMultipartUploadRequest
{
BucketName = _bucket,
Key = path,
UploadId = uploadId,
PartETags = eTags
};
await s3.CompleteMultipartUploadAsync(completeRequest);
}
public async Task<(string uploadId, List<PartETag> eTags, int partNumber)> InitiateConcatAsync(string domain, string key, bool removeEmptyHeader = false)
{
using var s3 = GetClient();
key = MakePath(domain, key);
var initiateRequest = new InitiateMultipartUploadRequest
{
BucketName = _bucket,
Key = key
};
var initResponse = await s3.InitiateMultipartUploadAsync(initiateRequest);
try
{
long bytePosition = removeEmptyHeader ? 5 * 1024 * 1024 : 0; // when finalizing, skip the 5 MB placeholder chunk uploaded at session start
var obj = await s3.GetObjectMetadataAsync(_bucket, key);
var eTags = new List<PartETag>();
var partSize = 5 * 1024 * 1024;
if (obj.ContentLength < partSize)
{
var copyRequest = new CopyPartRequest
{
DestinationBucket = _bucket,
DestinationKey = key,
SourceBucket = _bucket,
SourceKey = key,
UploadId = initResponse.UploadId,
PartNumber = 1,
FirstByte = bytePosition,
LastByte = obj.ContentLength - 1
};
eTags.Add(new PartETag(1, (await s3.CopyPartAsync(copyRequest)).ETag));
return (initResponse.UploadId, eTags, 2);
}
else
{
var objectSize = obj.ContentLength;
var partNumber = 1;
for (var i = 1; bytePosition < objectSize; i++)
{
var copyRequest = new CopyPartRequest
{
DestinationBucket = _bucket,
DestinationKey = key,
SourceBucket = _bucket,
SourceKey = key,
UploadId = initResponse.UploadId,
FirstByte = bytePosition,
LastByte = bytePosition + partSize - 1 >= objectSize ? objectSize - 1 : bytePosition + partSize - 1,
PartNumber = i
};
partNumber = i + 1;
bytePosition += partSize;
// S3 parts other than the last must be at least 5 MB, so fold a short tail into this part
if (objectSize - bytePosition < 5 * 1024 * 1024)
{
copyRequest.LastByte = objectSize - 1;
bytePosition += partSize;
}
eTags.Add(new PartETag(i, (await s3.CopyPartAsync(copyRequest)).ETag));
}
return (initResponse.UploadId, eTags, partNumber);
}
}
catch
{
return (initResponse.UploadId, new List<PartETag>(), 1);
}
}
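The 5 MB constants are S3's multipart floor: every part except the last must be at least 5 MB, and UploadPartCopy enforces the same minimum, so copy-based appends need the archive to stay above that size from the start. That is why S3TarWriteOperator (below) seeds the object with a 5 MB placeholder chunk, why removeEmptyHeader skips exactly 5 * 1024 * 1024 bytes at finalization, and why the copy loop above stretches LastByte to fold a tail shorter than 5 MB into the preceding part.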
private IAmazonCloudFront GetCloudFrontClient()
{
var cfg = new AmazonCloudFrontConfig { MaxErrorRetry = 3 };

View File

@@ -24,12 +24,27 @@
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
namespace ASC.Data.Backup.Core.Log;
public static partial class DbHelperLogger
{
[LoggerMessage(Level = LogLevel.Error, Message = "Table {table}")]
public static partial void ErrorTableString(this ILogger<DbHelper> logger, string table, Exception exception);
[LoggerMessage(Level = LogLevel.Error, Message = "Table {table}")]
public static partial void ErrorTable(this ILogger<DbHelper> logger, DataTable table, Exception exception);
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ASC.Data.Storage.Tar;
public static class BuilderHeaders
{
public static byte[] CreateHeader(string name, long size)
{
var blockBuffer = new byte[512];
var tarHeader = new TarHeader()
{
Name = name,
Size = size
};
tarHeader.WriteHeader(blockBuffer, null);
return blockBuffer;
}
}
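BuilderHeaders appears to build on SharpZipLib's TarHeader, the same library behind the TarEntry/TarOutputStream used by the other write operators; a small sketch of what a caller gets (key and size hypothetical):

byte[] header = BuilderHeaders.CreateHeader("databases/core/blogs_posts", 1234);
// header.Length == 512: one tar header block; the 1,234 payload bytes that follow
// are padded with zeros to the next block boundary, i.e. to 1,536 bytes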

View File

@@ -24,7 +24,7 @@
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
namespace ASC.Data.Backup;
namespace ASC.Data.Storage.ZipOperators;
public static class ActionInvoker
{
@@ -77,7 +77,18 @@ public static class ActionInvoker
}
}
public static async Task Try(
public static async Task TryAsync(
Func<Task> action,
int maxAttempts,
Action<Exception> onFailure = null,
Action<Exception> onAttemptFailure = null,
int sleepMs = 1000,
bool isSleepExponential = true)
{
await TryAsync(state => action(), null, maxAttempts, onFailure, onAttemptFailure, sleepMs, isSleepExponential);
}
public static async Task TryAsync(
Func<object, Task> action,
object state,
int maxAttempts,

View File

@@ -63,7 +63,21 @@ public class ChunkZipWriteOperator : IDataWriteOperator
_sha = SHA256.Create();
}
public async Task WriteEntryAsync(string key, Stream stream)
public async Task WriteEntryAsync(string tarKey, string domain, string path, IDataStore store)
{
Stream fileStream = null;
await ActionInvoker.TryAsync(async () =>
{
fileStream = await store.GetReadStreamAsync(domain, path);
}, 5, error => throw error);
if (fileStream != null)
{
await WriteEntryAsync(tarKey, fileStream);
fileStream.Dispose();
}
}
public async Task WriteEntryAsync(string tarKey, Stream stream)
{
if (_fileStream == null)
{
@@ -73,7 +87,7 @@ public class ChunkZipWriteOperator : IDataWriteOperator
await using (var buffered = _tempStream.GetBuffered(stream))
{
var entry = TarEntry.CreateTarEntry(key);
var entry = TarEntry.CreateTarEntry(tarKey);
entry.Size = buffered.Length;
await _tarOutputStream.PutNextEntryAsync(entry, default);
buffered.Position = 0;

View File

@@ -28,7 +28,8 @@ namespace ASC.Data.Storage.ZipOperators;
public interface IDataWriteOperator : IAsyncDisposable
{
Task WriteEntryAsync(string key, Stream stream);
Task WriteEntryAsync(string tarKey, Stream stream);
Task WriteEntryAsync(string tarKey, string domain, string path, IDataStore store);
bool NeedUpload { get; }
string Hash { get; }
string StoragePath { get; }

View File

@@ -0,0 +1,108 @@
// (c) Copyright Ascensio System SIA 2010-2022
//
// This program is a free software product.
// You can redistribute it and/or modify it under the terms
// of the GNU Affero General Public License (AGPL) version 3 as published by the Free Software
// Foundation. In accordance with Section 7(a) of the GNU AGPL its Section 15 shall be amended
// to the effect that Ascensio System SIA expressly excludes the warranty of non-infringement of
// any third-party rights.
//
// This program is distributed WITHOUT ANY WARRANTY, without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For details, see
// the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html
//
// You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, EU, LV-1021.
//
// The interactive user interfaces in modified source and object code versions of the Program must
// display Appropriate Legal Notices, as required under Section 5 of the GNU AGPL version 3.
//
// Pursuant to Section 7(b) of the License you must retain the original Product logo when
// distributing the program. Pursuant to Section 7(e) we decline to grant you any rights under
// trademark law for use of our trademarks.
//
// All the Product's GUI elements, including illustrations and icon sets, as well as technical writing
// content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0
// International. See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
using System.IO;
using Google.Apis.Storage.v1.Data;
namespace ASC.Data.Storage.ZipOperators;
internal class S3TarWriteOperator : IDataWriteOperator
{
private readonly CommonChunkedUploadSession _chunkedUploadSession;
private readonly CommonChunkedUploadSessionHolder _sessionHolder;
private readonly S3Storage _store;
private bool _first = true;
private readonly string _domain = "files_temp";
private readonly string _key;
public string Hash { get; private set; }
public string StoragePath { get; private set; }
public bool NeedUpload => false;
public S3TarWriteOperator(CommonChunkedUploadSession chunkedUploadSession, CommonChunkedUploadSessionHolder sessionHolder)
{
_chunkedUploadSession = chunkedUploadSession;
_sessionHolder = sessionHolder;
_store = _sessionHolder.DataStore as S3Storage;
using var stream = new MemoryStream();
var buffer = new byte[5 * 1024 * 1024]; // 5 MB placeholder: S3's minimum part size for later server-side copies
stream.Write(buffer);
stream.Position = 0;
_key = _chunkedUploadSession.TempPath;
_sessionHolder.UploadChunkAsync(_chunkedUploadSession, stream, stream.Length).Wait(); // sync wait: constructors cannot be async
}
public async Task WriteEntryAsync(string tarKey, string domain, string path, IDataStore store)
{
(var uploadId, var eTags, var chunkNumber) = await GetDataAsync();
await _store.ConcatFileAsync(domain, path, tarKey, _domain, _key, uploadId, eTags, chunkNumber);
}
public async Task WriteEntryAsync(string tarKey, Stream stream)
{
(var uploadId, var eTags, var chunkNumber) = await GetDataAsync();
await _store.ConcatFileStreamAsync(stream, tarKey, _domain, _key, uploadId, eTags, chunkNumber);
}
private async Task<(string uploadId, List<PartETag> eTags, int partNumber)> GetDataAsync()
{
List<PartETag> eTags = null;
var chunkNumber = 0;
string uploadId = null;
if (_first)
{
eTags = _chunkedUploadSession.GetItemOrDefault<Dictionary<int, string>>("ETag").Select(x => new PartETag(x.Key, x.Value)).ToList();
int.TryParse(_chunkedUploadSession.GetItemOrDefault<string>("ChunksUploaded"), out chunkNumber);
chunkNumber++;
uploadId = _chunkedUploadSession.UploadId;
_first = false;
}
else
{
(uploadId, eTags, chunkNumber) = await _store.InitiateConcatAsync(_domain, _key);
}
return (uploadId, eTags, chunkNumber);
}
public async ValueTask DisposeAsync()
{
await _store.AddEndAsync(_domain, _key);
var contentLength = await _store.GetFileSizeAsync(_domain, _key);
(var uploadId, var eTags, var partNumber) = await _store.InitiateConcatAsync(_domain, _key, removeEmptyHeader: true);
_chunkedUploadSession.BytesUploaded = contentLength;
_chunkedUploadSession.BytesTotal = contentLength;
_chunkedUploadSession.UploadId = uploadId;
_chunkedUploadSession.Items["ETag"] = eTags.ToDictionary(e => e.PartNumber, e => e.ETag);
_chunkedUploadSession.Items["ChunksUploaded"] = partNumber.ToString();
StoragePath = await _sessionHolder.FinalizeAsync(_chunkedUploadSession);
Hash = "";
}
}
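A sketch of the operator's lifecycle (session, holder, store and the keys are hypothetical; sessions really come from the chunked-upload holder):

await using (IDataWriteOperator op = new S3TarWriteOperator(session, holder)) // seeds the 5 MB placeholder part
{
await op.WriteEntryAsync("databases/core/blogs_posts", "files", "some/key", store); // server-side copy of a stored object
await op.WriteEntryAsync("databases/core/blogs_comments", xmlStream); // upload of a client-side stream
}
// DisposeAsync writes the tar end blocks, strips the placeholder via removeEmptyHeader,
// and finalizes the chunked session, publishing StoragePath

Note that each entry costs a CompleteMultipartUpload plus a fresh InitiateConcatAsync that server-side-copies the whole archive so far, so per-entry cost grows with archive size.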

View File

@@ -67,7 +67,21 @@ public class S3ZipWriteOperator : IDataWriteOperator
_sha = SHA256.Create();
}
public async Task WriteEntryAsync(string key, Stream stream)
public async Task WriteEntryAsync(string tarKey, string domain, string path, IDataStore store)
{
Stream fileStream = null;
await ActionInvoker.TryAsync(async () =>
{
fileStream = await _sessionHolder.DataStore.GetReadStreamAsync(domain, path);
}, 5, error => throw error);
if (fileStream != null)
{
await WriteEntryAsync(tarKey, fileStream);
fileStream.Dispose();
}
}
public async Task WriteEntryAsync(string tarKey, Stream stream)
{
if (_fileStream == null)
{
@@ -77,7 +91,7 @@ public class S3ZipWriteOperator : IDataWriteOperator
await using (var buffered = _tempStream.GetBuffered(stream))
{
var entry = TarEntry.CreateTarEntry(key);
var entry = TarEntry.CreateTarEntry(tarKey);
entry.Size = buffered.Length;
await _tarOutputStream.PutNextEntryAsync(entry, default);
buffered.Position = 0;

View File

@@ -54,11 +54,25 @@ public class ZipWriteOperator : IDataWriteOperator
_tarOutputStream = new TarOutputStream(_gZipOutputStream, Encoding.UTF8);
}
public async Task WriteEntryAsync(string key, Stream stream)
public async Task WriteEntryAsync(string tarKey, string domain, string path, IDataStore store)
{
Stream fileStream = null;
await ActionInvoker.TryAsync(async () =>
{
fileStream = await store.GetReadStreamAsync(domain, path);
}, 5, error => throw error);
if (fileStream != null)
{
await WriteEntryAsync(tarKey, fileStream);
fileStream.Dispose();
}
}
public async Task WriteEntryAsync(string tarKey, Stream stream)
{
await using (var buffered = _tempStream.GetBuffered(stream))
{
var entry = TarEntry.CreateTarEntry(key);
var entry = TarEntry.CreateTarEntry(tarKey);
entry.Size = buffered.Length;
await _tarOutputStream.PutNextEntryAsync(entry, default);
buffered.Position = 0;