// (c) Copyright Ascensio System SIA 2010-2022 // // This program is a free software product. // You can redistribute it and/or modify it under the terms // of the GNU Affero General Public License (AGPL) version 3 as published by the Free Software // Foundation. In accordance with Section 7(a) of the GNU AGPL its Section 15 shall be amended // to the effect that Ascensio System SIA expressly excludes the warranty of non-infringement of // any third-party rights. // // This program is distributed WITHOUT ANY WARRANTY, without even the implied warranty // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For details, see // the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html // // You can contact Ascensio System SIA at Lubanas st. 125a-25, Riga, Latvia, EU, LV-1021. // // The interactive user interfaces in modified source and object code versions of the Program must // display Appropriate Legal Notices, as required under Section 5 of the GNU AGPL version 3. // // Pursuant to Section 7(b) of the License you must retain the original Product logo when // distributing the program. Pursuant to Section 7(e) we decline to grant you any rights under // trademark law for use of our trademarks. // // All the Product's GUI elements, including illustrations and icon sets, as well as technical writing // content are licensed under the terms of the Creative Commons Attribution-ShareAlike 4.0 // International. 
// See the License terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode

namespace ASC.ElasticSearch;

/// <summary>
/// Keeps the latest indexing state (<see cref="Indexing"/> / <see cref="LastIndexed"/>)
/// as delivered through cache notifications.
/// </summary>
[Singletone]
public class FactoryIndexerHelper
{
    public DateTime LastIndexed { get; set; }
    public string Indexing { get; set; }

    // NOTE(review): the notification payload type is assumed to be IndexAction
    // (it must expose LastIndexed ticks and Indexing) - confirm against the project.
    public FactoryIndexerHelper(ICacheNotify<IndexAction> cacheNotify)
    {
        cacheNotify.Subscribe((a) =>
        {
            // A zero tick count leaves the previously known timestamp untouched.
            if (a.LastIndexed != 0)
            {
                LastIndexed = new DateTime(a.LastIndexed);
            }

            Indexing = a.Indexing;
        }, CacheNotifyAction.Any);
    }
}

/// <summary>
/// Non-generic view of an indexer, used to enumerate and rebuild indices.
/// </summary>
public interface IFactoryIndexer
{
    string IndexName { get; }
    string SettingsTitle { get; }
    void IndexAll();
    void ReIndex();
}

/// <summary>
/// Facade over <see cref="BaseIndexer{T}"/> for one searchable type. Every operation first
/// checks <see cref="Support"/>; most failures are logged and reported through the return
/// value instead of being propagated.
/// </summary>
[Scope]
public class FactoryIndexer<T> : IFactoryIndexer where T : class, ISearchItem
{
    // Back-off applied when the search backend answers 429, and how many times we retry.
    private const int TooManyRequestsDelayMs = 60000;
    private const int MaxTooManyRequestsRetries = 5;

    public ILogger Logger { get; }
    public string IndexName { get => _indexer.IndexName; }
    public virtual string SettingsTitle => string.Empty;

    protected readonly TenantManager _tenantManager;
    protected readonly BaseIndexer<T> _indexer;

    private readonly SearchSettingsHelper _searchSettingsHelper;
    private readonly FactoryIndexer _factoryIndexerCommon;
    private readonly IServiceProvider _serviceProvider;
    private readonly ICache _cache;

    // Shared by all closed generic instances of this class: at most 10 queued actions run concurrently.
    private static readonly TaskScheduler _scheduler =
        new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 10).ConcurrentScheduler;

    public FactoryIndexer(
        ILoggerProvider options,
        TenantManager tenantManager,
        SearchSettingsHelper searchSettingsHelper,
        FactoryIndexer factoryIndexer,
        BaseIndexer<T> baseIndexer,
        IServiceProvider serviceProvider,
        ICache cache)
    {
        _cache = cache;
        Logger = options.CreateLogger("ASC.Indexer");
        _tenantManager = tenantManager;
        _searchSettingsHelper = searchSettingsHelper;
        _factoryIndexerCommon = factoryIndexer;
        _indexer = baseIndexer;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Runs a search. Returns <c>false</c> with an empty <paramref name="result"/> when the
    /// index is unsupported, does not exist, or the query throws.
    /// </summary>
    public bool TrySelect(Expression<Func<Selector<T>, Selector<T>>> expression, out IReadOnlyCollection<T> result)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t) || !_indexer.CheckExist(t))
        {
            result = new List<T>();
            return false;
        }

        try
        {
            result = _indexer.Select(expression);
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Select");
            result = new List<T>();
            return false;
        }

        return true;
    }

    /// <summary>
    /// Same as <see cref="TrySelect"/> but returns only the ids of the matching items.
    /// </summary>
    // NOTE(review): the id element type is assumed to be int (ISearchItem.Id) - confirm.
    public bool TrySelectIds(Expression<Func<Selector<T>, Selector<T>>> expression, out List<int> result)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t) || !_indexer.CheckExist(t))
        {
            result = new List<int>();
            return false;
        }

        try
        {
            result = _indexer.Select(expression, true).Select(r => r.Id).ToList();
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Select");
            result = new List<int>();
            return false;
        }

        return true;
    }

    /// <summary>
    /// Same as the two-argument overload, additionally reporting the total hit count
    /// (0 on any failure).
    /// </summary>
    public bool TrySelectIds(Expression<Func<Selector<T>, Selector<T>>> expression, out List<int> result, out long total)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t) || !_indexer.CheckExist(t))
        {
            result = new List<int>();
            total = 0;
            return false;
        }

        try
        {
            result = _indexer.Select(expression, true, out total).Select(r => r.Id).ToList();
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Select");
            total = 0;
            result = new List<int>();
            return false;
        }

        return true;
    }

    public bool CanIndexByContent(T t)
    {
        return Support(t) && _searchSettingsHelper.CanIndexByContent(_tenantManager.GetCurrentTenant().Id);
    }

    /// <summary>
    /// Indexes a single item. Returns <c>false</c> when unsupported or when indexing throws
    /// (the exception is logged).
    /// </summary>
    public bool Index(T data, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return false;
        }

        try
        {
            _indexer.Index(data, immediately);
            return true;
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Index");
        }

        return false;
    }

    /// <summary>
    /// Indexes a batch. On 413/403/408 responses the items are indexed one by one; on 429 the
    /// call waits and retries up to <see cref="MaxTooManyRequestsRetries"/> times, then rethrows.
    /// Other client errors are logged and swallowed.
    /// </summary>
    public void Index(List<T> data, bool immediately = true, int retry = 0)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t) || data.Count == 0)
        {
            return;
        }

        try
        {
            _indexer.Index(data, immediately);
        }
        catch (ElasticsearchClientException e)
        {
            Logger.LogError(e, "Index");

            if (e.Response != null)
            {
                Logger.LogError(e.Response.HttpStatusCode.ToString());

                if (e.Response.HttpStatusCode == 413 || e.Response.HttpStatusCode == 403 || e.Response.HttpStatusCode == 408)
                {
                    data.ForEach(r => Index(r, immediately));
                }
                else if (e.Response.HttpStatusCode == 429)
                {
                    // Check the budget first: no point sleeping a minute only to rethrow.
                    if (retry >= MaxTooManyRequestsRetries)
                    {
                        throw;
                    }

                    Thread.Sleep(TooManyRequestsDelayMs);
                    Index(data, immediately, retry + 1);
                }
            }
        }
        catch (AggregateException e) // may wrap an ElasticsearchClientException
        {
            if (e.InnerExceptions.Count == 0)
            {
                throw;
            }

            var inner = e.InnerExceptions.OfType<ElasticsearchClientException>().FirstOrDefault();
            if (inner == null)
            {
                throw;
            }

            Logger.LogError(inner, "inner");

            // Response may be absent; a null status matches none of the branches below.
            // NOTE(review): unlike the direct handler, 408 is not treated as "index one by one" here - confirm intent.
            var status = inner.Response?.HttpStatusCode;

            if (status == 413 || status == 403)
            {
                Logger.LogError(status.ToString());
                data.ForEach(r => Index(r, immediately));
            }
            else if (status == 429)
            {
                if (retry >= MaxTooManyRequestsRetries)
                {
                    throw;
                }

                Thread.Sleep(TooManyRequestsDelayMs);
                Index(data, immediately, retry + 1);
            }
        }
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Index(List{T}, bool, int)"/>.
    /// Completes immediately when the index is unsupported or <paramref name="data"/> is empty.
    /// </summary>
    public Task IndexAsync(List<T> data, bool immediately = true, int retry = 0)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t) || data.Count == 0)
        {
            return Task.CompletedTask;
        }

        return InternalIndexAsync(data, immediately, retry);
    }

    private async Task InternalIndexAsync(List<T> data, bool immediately, int retry)
    {
        try
        {
            await _indexer.IndexAsync(data, immediately).ConfigureAwait(false);
        }
        catch (ElasticsearchClientException e)
        {
            Logger.LogError(e, "IndexAsync");

            if (e.Response != null)
            {
                Logger.LogError(e.Response.HttpStatusCode.ToString());

                if (e.Response.HttpStatusCode == 413 || e.Response.HttpStatusCode == 403 || e.Response.HttpStatusCode == 408)
                {
                    data.ForEach(r => Index(r, immediately));
                }
                else if (e.Response.HttpStatusCode == 429)
                {
                    if (retry >= MaxTooManyRequestsRetries)
                    {
                        throw;
                    }

                    await Task.Delay(TooManyRequestsDelayMs).ConfigureAwait(false);
                    await IndexAsync(data, immediately, retry + 1).ConfigureAwait(false);
                }
            }
        }
        catch (AggregateException e) // may wrap an ElasticsearchClientException
        {
            if (e.InnerExceptions.Count == 0)
            {
                throw;
            }

            var inner = e.InnerExceptions.OfType<ElasticsearchClientException>().FirstOrDefault();
            if (inner == null)
            {
                throw;
            }

            Logger.LogError(inner, "IndexAsync");

            // Response may be absent; a null status matches none of the branches below.
            var status = inner.Response?.HttpStatusCode;

            if (status == 413 || status == 403)
            {
                Logger.LogError(status.ToString());
                data.ForEach(r => Index(r, immediately));
            }
            else if (status == 429)
            {
                if (retry >= MaxTooManyRequestsRetries)
                {
                    throw;
                }

                await Task.Delay(TooManyRequestsDelayMs).ConfigureAwait(false);
                await IndexAsync(data, immediately, retry + 1).ConfigureAwait(false);
            }
        }
    }

    /// <summary>Updates the given fields of a document; failures are logged and swallowed.</summary>
    public void Update(T data, bool immediately = true, params Expression<Func<T, object>>[] fields)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        try
        {
            _indexer.Update(data, immediately, fields);
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Update");
        }
    }

    /// <summary>Applies <paramref name="action"/> to a list field; failures are logged and swallowed.</summary>
    public void Update(T data, UpdateAction action, Expression<Func<T, IList>> field, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        try
        {
            _indexer.Update(data, action, field, immediately);
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Update");
        }
    }

    /// <summary>Updates fields of the current tenant's documents matching <paramref name="expression"/>.</summary>
    public void Update(T data, Expression<Func<Selector<T>, Selector<T>>> expression, bool immediately = true, params Expression<Func<T, object>>[] fields)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        try
        {
            var tenant = _tenantManager.GetCurrentTenant().Id;
            _indexer.Update(data, expression, tenant, immediately, fields);
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Update");
        }
    }

    /// <summary>Applies <paramref name="action"/> to a list field of the current tenant's matching documents.</summary>
    public void Update(T data, Expression<Func<Selector<T>, Selector<T>>> expression, UpdateAction action, Expression<Func<T, IList>> fields, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        try
        {
            var tenant = _tenantManager.GetCurrentTenant().Id;
            _indexer.Update(data, expression, tenant, action, fields, immediately);
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Update");
        }
    }

    /// <summary>Deletes a single document; failures are logged and swallowed.</summary>
    public void Delete(T data, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        try
        {
            _indexer.Delete(data, immediately);
        }
        catch (Exception e)
        {
            Logger.LogError(e, "Delete");
        }
    }

    /// <summary>Deletes the current tenant's documents matching <paramref name="expression"/>.</summary>
    public void Delete(Expression<Func<Selector<T>, Selector<T>>> expression, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        var tenant = _tenantManager.GetCurrentTenant().Id;

        try
        {
            _indexer.Delete(expression, tenant, immediately);
        }
        catch (Exception e)
        {
            // Previously logged as "Index", which mislabelled delete failures.
            Logger.LogError(e, "Delete");
        }
    }

    /// <summary>Queues indexing of one item on the shared scheduler; <c>false</c> when unsupported.</summary>
    public Task<bool> IndexAsync(T data, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();

        return !Support(t)
            ? Task.FromResult(false)
            : Queue(() => _indexer.Index(data, immediately));
    }

    /// <summary>Queues a field update on the shared scheduler; <c>false</c> when unsupported.</summary>
    public Task<bool> UpdateAsync(T data, bool immediately = true, params Expression<Func<T, object>>[] fields)
    {
        var t = _serviceProvider.GetService<T>();

        return !Support(t)
            ? Task.FromResult(false)
            : Queue(() => _indexer.Update(data, immediately, fields));
    }

    /// <summary>Queues deletion of one item on the shared scheduler; <c>false</c> when unsupported.</summary>
    public Task<bool> DeleteAsync(T data, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();

        return !Support(t)
            ? Task.FromResult(false)
            : Queue(() => _indexer.Delete(data, immediately));
    }

    /// <summary>Queues deletion by query for the current tenant; <c>false</c> when unsupported.</summary>
    public async Task<bool> DeleteAsync(Expression<Func<Selector<T>, Selector<T>>> expression, bool immediately = true)
    {
        var t = _serviceProvider.GetService<T>();
        if (!await SupportAsync(t))
        {
            return false;
        }

        var tenant = _tenantManager.GetCurrentTenant().Id;

        return await Queue(() => _indexer.Delete(expression, tenant, immediately));
    }

    public void Flush()
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        _indexer.Flush();
    }

    public void Refresh()
    {
        var t = _serviceProvider.GetService<T>();
        if (!Support(t))
        {
            return;
        }

        _indexer.Refresh();
    }

    /// <summary>No-op by default; derived indexers override it.</summary>
    public virtual void IndexAll()
    {
        return;
    }

    public void ReIndex()
    {
        _indexer.ReIndex();
    }

    /// <summary>
    /// Tells whether indexing is available for <paramref name="t"/>: the backend must pass
    /// <c>CheckState</c>, and a cached per-index flag (if present) must be true.
    /// </summary>
    public bool Support(T t)
    {
        if (!_factoryIndexerCommon.CheckState())
        {
            return false;
        }

        var cacheTime = DateTime.UtcNow.AddMinutes(15);
        var key = "elasticsearch " + t.IndexName;

        try
        {
            var cacheValue = _cache.Get<string>(key);
            if (!string.IsNullOrEmpty(cacheValue))
            {
                return Convert.ToBoolean(cacheValue);
            }

            //TODO:
            //var service = new Service.Service();
            //var result = service.Support(t.IndexName);
            //Cache.Insert(key, result.ToString(CultureInfo.InvariantCulture).ToLower(), cacheTime);

            return true;
        }
        catch (Exception e)
        {
            // Remember the failure for 15 minutes so the lookup is not retried on every call.
            _cache.Insert(key, "false", cacheTime);
            Logger.LogError(e, "FactoryIndexer CheckState");

            return false;
        }
    }

    /// <summary>
    /// Asynchronous availability check. Only the backend state is consulted;
    /// <paramref name="t"/> is not used.
    /// </summary>
    public Task<bool> SupportAsync(T t)
    {
        return _factoryIndexerCommon.CheckStateAsync();
    }

    /// <summary>
    /// Runs <paramref name="actionData"/> as a long-running task on the shared scheduler.
    /// The task yields <c>true</c> on success and faults if the action throws.
    /// </summary>
    private Task<bool> Queue(Action actionData)
    {
        var task = new Task<bool>(() =>
        {
            try
            {
                actionData();
                return true;
            }
            catch (AggregateException agg)
            {
                foreach (var e in agg.InnerExceptions)
                {
                    Logger.LogError(e, "Queue");
                }

                throw;
            }
        }, TaskCreationOptions.LongRunning);

        task.Start(_scheduler);

        return task;
    }
}

/// <summary>
/// Backend-wide helper: pings the search service (with a 15-minute cached result),
/// reports indexing state and triggers re-indexing.
/// </summary>
[Scope]
public class FactoryIndexer
{
    public ILogger Log { get; }

    private readonly ICache _cache;
    private readonly IServiceProvider _serviceProvider;
    private readonly FactoryIndexerHelper _factoryIndexerHelper;
    private readonly Client _client;
    private readonly CoreBaseSettings _coreBaseSettings;

    public FactoryIndexer(
        IServiceProvider serviceProvider,
        FactoryIndexerHelper factoryIndexerHelper,
        Client client,
        ILoggerProvider options,
        CoreBaseSettings coreBaseSettings,
        ICache cache)
    {
        _cache = cache;
        _serviceProvider = serviceProvider;
        _factoryIndexerHelper = factoryIndexerHelper;
        _client = client;
        _coreBaseSettings = coreBaseSettings;

        // Deliberately not wrapped in try/catch: a handler that logs through Log cannot
        // work when creating Log is the very thing that failed.
        Log = options.CreateLogger("ASC.Indexer");
    }

    /// <summary>
    /// Pings the search backend. With <paramref name="cacheState"/> set, a cached answer is
    /// returned when present and a fresh answer is cached for 15 minutes.
    /// </summary>
    public bool CheckState(bool cacheState = true)
    {
        const string key = "elasticsearch";

        if (cacheState)
        {
            var cacheValue = _cache.Get<string>(key);
            if (!string.IsNullOrEmpty(cacheValue))
            {
                return Convert.ToBoolean(cacheValue);
            }
        }

        var cacheTime = DateTime.UtcNow.AddMinutes(15);

        try
        {
            var isValid = _client.Ping();

            if (cacheState)
            {
                _cache.Insert(key, isValid.ToString(CultureInfo.InvariantCulture).ToLowerInvariant(), cacheTime);
            }

            return isValid;
        }
        catch (Exception e)
        {
            if (cacheState)
            {
                _cache.Insert(key, "false", cacheTime);
            }

            Log.LogError(e, "Ping false");

            return false;
        }
    }

    /// <summary>
    /// Asynchronous variant of <see cref="CheckState"/>; a cached answer is returned
    /// synchronously without touching the backend.
    /// </summary>
    public Task<bool> CheckStateAsync(bool cacheState = true)
    {
        const string key = "elasticsearch";

        if (cacheState)
        {
            var cacheValue = _cache.Get<string>(key);
            if (!string.IsNullOrEmpty(cacheValue))
            {
                return Task.FromResult(Convert.ToBoolean(cacheValue));
            }
        }

        return InternalCheckStateAsync(cacheState, key);
    }

    private async Task<bool> InternalCheckStateAsync(bool cacheState, string key)
    {
        var cacheTime = DateTime.UtcNow.AddMinutes(15);

        try
        {
            var result = await _client.Instance.PingAsync(new PingRequest());
            var isValid = result.IsValid;

            Log.LogDebug("CheckState ping {0}", result.DebugInformation);

            if (cacheState)
            {
                _cache.Insert(key, isValid.ToString(CultureInfo.InvariantCulture).ToLowerInvariant(), cacheTime);
            }

            return isValid;
        }
        catch (Exception e)
        {
            if (cacheState)
            {
                _cache.Insert(key, "false", cacheTime);
            }

            Log.LogError(e, "Ping false");

            return false;
        }
    }

    /// <summary>
    /// Builds an anonymous status object (<c>state</c>, <c>indices</c>, <c>status</c>).
    /// Outside standalone mode only <c>status</c> is populated.
    /// </summary>
    public object GetState(TenantUtil tenantUtil)
    {
        State state = null;
        IEnumerable<object> indices = null;

        // NOTE(review): never populated in this file, so every index currently gets Count = 0
        // and is filtered out below. Value type assumed to be long - confirm.
        Dictionary<string, long> count = null;

        if (!_coreBaseSettings.Standalone)
        {
            return new
            {
                state,
                indices,
                status = CheckState()
            };
        }

        state = new State
        {
            Indexing = _factoryIndexerHelper.Indexing,
            LastIndexed = _factoryIndexerHelper.LastIndexed != DateTime.MinValue ? _factoryIndexerHelper.LastIndexed : default(DateTime?)
        };

        if (state.LastIndexed.HasValue)
        {
            state.LastIndexed = tenantUtil.DateTimeFromUtc(state.LastIndexed.Value);
        }

        // The query is lazy: it runs (including one Count request per index) when the caller enumerates it.
        indices = _client.Instance.Cat.Indices(new CatIndicesRequest { SortByColumns = new[] { "index" } }).Records
            .Select(r => new
            {
                r.Index,
                // Null-safe lookup: dereferencing the unassigned dictionary used to throw on enumeration.
                Count = count != null && count.TryGetValue(r.Index, out var known) ? known : 0,
                DocsCount = _client.Instance.Count(new CountRequest(r.Index)).Count,
                r.StoreSize
            })
            .Where(r => r.Count > 0);

        return new
        {
            state,
            indices,
            status = CheckState()
        };
    }

    /// <summary>
    /// Rebuilds the index called <paramref name="name"/>, or all indices when it is null or
    /// empty. Does nothing outside standalone mode.
    /// </summary>
    public void Reindex(string name)
    {
        if (!_coreBaseSettings.Standalone)
        {
            return;
        }

        // NOTE(review): this closes BaseIndexer<> over the indexer's runtime type and casts the
        // result to IFactoryIndexer; verify that BaseIndexer<> really satisfies both.
        var generic = typeof(BaseIndexer<>);
        var indexers = _serviceProvider.GetService<IEnumerable<IFactoryIndexer>>()
            .Where(r => string.IsNullOrEmpty(name) || r.IndexName == name)
            .Select(r => (IFactoryIndexer)Activator.CreateInstance(generic.MakeGenericType(r.GetType()), r));

        foreach (var indexer in indexers)
        {
            indexer.ReIndex();
        }
    }
}