/* * * (c) Copyright Ascensio System Limited 2010-2018 * * This program is freeware. You can redistribute it and/or modify it under the terms of the GNU * General Public License (GPL) version 3 as published by the Free Software Foundation (https://www.gnu.org/copyleft/gpl.html). * In accordance with Section 7(a) of the GNU GPL its Section 15 shall be amended to the effect that * Ascensio System SIA expressly excludes the warranty of non-infringement of any third-party rights. * * THIS PROGRAM IS DISTRIBUTED WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF MERCHANTABILITY OR * FITNESS FOR A PARTICULAR PURPOSE. For more details, see GNU GPL at https://www.gnu.org/copyleft/gpl.html * * You can contact Ascensio System SIA by email at sales@onlyoffice.com * * The interactive user interfaces in modified source and object code versions of ONLYOFFICE must display * Appropriate Legal Notices, as required under Section 5 of the GNU GPL version 3. * * Pursuant to Section 7 § 3(b) of the GNU GPL you must retain the original ONLYOFFICE logo which contains * relevant author attributions when distributing the software. If the display of the logo in its graphic * form is not reasonably feasible for technical reasons, you must include the words "Powered by ONLYOFFICE" * in every copy of the program you distribute. * Pursuant to Section 7 § 3(e) we decline to grant you any rights under trademark law for use of our trademarks. * */ using System; using System.Collections.Generic; using System.Linq; using System.Threading; using ASC.Common.Logging; using Microsoft.Extensions.Options; namespace ASC.Common.Threading.Workers { public class WorkerQueueOptionsManager : OptionsManager> { public WorkerQueueOptionsManager(IOptionsFactory> factory) : base(factory) { } } public class ConfigureWorkerQueue : IConfigureOptions> { public ConfigureWorkerQueue(IOptionsMonitor log) { Log = log; } public IOptionsMonitor Log { get; } public void Configure(Workers.WorkerQueue queue) { queue.log = Log.Get("ASC.WorkerQueue"); } } public class WorkerQueue { internal ILog log; private readonly ICollection> items = new List>(); private readonly List threads = new List(); private readonly AutoResetEvent waitEvent = new AutoResetEvent(false); private readonly ManualResetEvent stopEvent = new ManualResetEvent(false); public int workerCount; public bool stopAfterFinsih; public int errorCount; public int waitInterval; private Action action; private volatile bool started; public object SynchRoot { get { return Items; } } protected virtual ICollection> Items { get { return items; } } public bool IsStarted { get { return started; } } public WorkerQueue() { } public void Start(Action starter) { Start(starter, true); } public IEnumerable GetItems() { lock (Items) { return Items.Select(x => x.Item).ToList(); } } public virtual void AddRange(IEnumerable items) { lock (Items) { foreach (var item in items) { Items.Add(new WorkItem(item)); } } waitEvent.Set(); ReviveThreads(); } public virtual void Add(T item) { lock (Items) { Items.Add(new WorkItem(item)); } waitEvent.Set(); ReviveThreads(); } public void Remove(T item) { lock (Items) { var existing = Items.Where(x => Equals(x.Item, item)).SingleOrDefault(); RemoveInternal(existing); } } public void Clear() { lock (Items) { foreach (var workItem in Items) { workItem.Dispose(); } Items.Clear(); } } public void Stop() { if (started) { started = false; stopEvent.Set(); waitEvent.Set(); log.Debug("Stoping queue. Joining threads"); foreach (var workerThread in threads) { workerThread.Join(); } threads.Clear(); log.Debug("Queue stoped. Threads cleared"); } } public void Terminate() { if (started) { started = false; stopEvent.Set(); waitEvent.Set(); log.Debug("Stoping queue. Terminating threads"); foreach (var worker in threads.Where(t => t != Thread.CurrentThread)) { worker.Abort(); } if (threads.Contains(Thread.CurrentThread)) { threads.Clear(); log.Debug("Terminate called from current worker thread. Terminating"); Thread.CurrentThread.Abort(); } threads.Clear(); log.Debug("Queue stoped. Threads cleared"); } } protected virtual WorkItem Selector() { return Items.Where(x => !x.IsProcessed).OrderBy(x => x.Added).FirstOrDefault(); } protected virtual void PostComplete(WorkItem item) { RemoveInternal(item); } protected void RemoveInternal(WorkItem item) { if (item != null) { Items.Remove(item); item.Dispose(); } } protected virtual void ErrorLimit(WorkItem item) { RemoveInternal(item); } protected virtual void Error(WorkItem item, Exception exception) { log.Error(item, exception); item.IsProcessed = false; item.Added = DateTime.Now; } private WaitHandle[] WaitObjects() { return new WaitHandle[] { stopEvent, waitEvent }; } private void ReviveThreads() { if (threads.Count != 0) { var haveLiveThread = threads.Count(x => x.IsAlive) > 0; if (!haveLiveThread) { Stop(); Start(action); } } } private void Start(Action starter, bool backgroundThreads) { if (!started) { started = true; action = starter; stopEvent.Reset(); waitEvent.Reset(); log.Debug("Creating threads"); for (var i = 0; i < workerCount; i++) { threads.Add(new Thread(DoWork) { IsBackground = backgroundThreads }); } log.Debug("Starting threads"); foreach (var thread in threads) { thread.Start(stopAfterFinsih); } } } private void DoWork(object state) { try { var stopAfterFinsih = false; if (state != null && state is bool) { stopAfterFinsih = (bool)state; } do { WorkItem item; Action localAction; lock (Items) { localAction = action; item = Selector(); if (item != null) { item.IsProcessed = true; } } if (localAction == null) break;//Exit if action is null if (item != null) { try { localAction(item.Item); var fallSleep = false; lock (Items) { PostComplete(item); if (Items.Count == 0) { fallSleep = true; } } if (fallSleep) { if (stopAfterFinsih || WaitHandle.WaitAny(WaitObjects(), Timeout.Infinite, false) == 0) { break; } } } catch (ThreadAbortException) { return; } catch (Exception e) { lock (Items) { Error(item, e); item.ErrorCount++; if (item.ErrorCount > errorCount) { ErrorLimit(item); } } } } else { if (stopAfterFinsih || WaitHandle.WaitAny(WaitObjects(), waitInterval, false) == 0) { break; } } } while (true); } catch (ThreadAbortException) { return; } catch (Exception err) { log.Error(err); } } } }