DocSpace-buildtools/thirdparty/Microsoft.Graph.Core/Tasks/LargeFileUploadTask.cs
2020-07-10 18:37:02 +03:00

265 lines
12 KiB
C#

// ------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All Rights Reserved. Licensed under the MIT License. See License in the project root for license information.
// ------------------------------------------------------------------------------
namespace Microsoft.Graph
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
/// <summary>
/// Task to help with resumable large file uploads.
/// </summary>
public class LargeFileUploadTask<T>
{
// Slice size used when the caller passes a negative maxSliceSize: 5 MiB, which is exactly
// 16 * 320 KiB and therefore passes the constructor's RequiredSliceSizeIncrement check.
// (4 MiB is NOT a multiple of 320 KiB, so it cannot be used as the default.)
private const int DefaultMaxSliceSize = 5 * 1024 * 1024;
// Every slice size must be a multiple of this value (320 KiB).
private const int RequiredSliceSizeIncrement = 320 * 1024;
// Latest known session information; replaced by UpdateSessionStatusAsync.
private IUploadSession Session { get; set; }
// Client used to issue the slice, status and delete requests.
private readonly IBaseClient _client;
// Source of the bytes to upload; must be readable and seekable.
private readonly Stream _uploadStream;
// Upper bound for the length of a single slice, in bytes.
private readonly int _maxSliceSize;
// Inclusive (begin, end) byte ranges that still have to be uploaded.
private List<Tuple<long, long>> _rangesRemaining;
// Total size of the upload, taken from the stream length.
private long TotalUploadLength => _uploadStream.Length;
/// <summary>
/// Task to help with resumable large file uploads. Generates slices based on <paramref name="uploadSession"/>
/// information, and can control uploading of requests.
/// </summary>
/// <param name="uploadSession">Session information of type <see cref="IUploadSession"/>.</param>
/// <param name="uploadStream">Readable, seekable stream to be uploaded. Length of session is determined via uploadStream.Length.</param>
/// <param name="maxSliceSize">Max size of each slice to be uploaded. Must be a positive multiple of 320 KiB (320 * 1024).
/// If less than 0, the built-in default slice size is used.</param>
/// <param name="baseClient"><see cref="IBaseClient"/> to use for making upload requests. The client should not set Auth headers as upload urls do not need them.
/// If null, a client without an authentication provider is created for the session's upload URL.</param>
/// <exception cref="ArgumentNullException"><paramref name="uploadSession"/> or <paramref name="uploadStream"/> is null.</exception>
/// <exception cref="ArgumentException">The stream cannot read or seek, or <paramref name="maxSliceSize"/> is zero or not a multiple of 320 KiB.</exception>
public LargeFileUploadTask(IUploadSession uploadSession, Stream uploadStream, int maxSliceSize = -1, IBaseClient baseClient = null)
{
    if (uploadSession == null)
    {
        throw new ArgumentNullException(nameof(uploadSession));
    }
    if (uploadStream == null)
    {
        throw new ArgumentNullException(nameof(uploadStream));
    }
    if (!uploadStream.CanRead || !uploadStream.CanSeek)
    {
        throw new ArgumentException("Must provide stream that can read and seek");
    }

    // Validate the slice size before anything is allocated, so an invalid argument
    // never leaves behind a freshly created HTTP client.
    var effectiveSliceSize = maxSliceSize < 0 ? DefaultMaxSliceSize : maxSliceSize;
    if (effectiveSliceSize == 0)
    {
        // Zero passes the modulo check below but would make slice generation loop forever.
        throw new ArgumentOutOfRangeException(nameof(maxSliceSize), "Max slice size must be greater than zero");
    }
    if (effectiveSliceSize % RequiredSliceSizeIncrement != 0)
    {
        throw new ArgumentException("Max slice size must be a multiple of 320 KiB", nameof(maxSliceSize));
    }

    this._maxSliceSize = effectiveSliceSize;
    this.Session = uploadSession;
    // The stream must be assigned before the ranges are computed: open-ended ranges use its length.
    this._uploadStream = uploadStream;
    this._rangesRemaining = this.GetRangesRemaining(uploadSession);
    this._client = baseClient ?? this.InitializeClient(uploadSession.UploadUrl);
}
/// <summary>
/// Creates the client used for the upload when the caller did not supply one.
/// No authentication provider is attached, as the upload URLs explicitly do not need authentication.
/// </summary>
/// <param name="uploadUrl">Upload URL taken from the session; used as the base URL of the returned client.</param>
/// <returns>A <see cref="BaseClient"/> wrapping an <see cref="HttpClient"/> flagged with <see cref="FeatureFlag.FileUploadTask"/>.</returns>
private IBaseClient InitializeClient(string uploadUrl)
{
    // Passing a null authentication provider yields a client that sends no auth.
    HttpClient unauthenticatedHttpClient = GraphClientFactory.Create(authenticationProvider: null);
    unauthenticatedHttpClient.SetFeatureFlag(FeatureFlag.FileUploadTask);

    return new BaseClient(uploadUrl, unauthenticatedHttpClient);
}
/// <summary>
/// Sends one slice of the stream with the given <see cref="UploadSliceRequest{T}"/>.
/// A "generalException" or "timeout" service error is retried once; an "invalidRange"
/// error is treated as an already-uploaded slice and produces an empty result.
/// </summary>
/// <param name="uploadSliceRequest">The UploadSliceRequest describing the byte range to send.</param>
/// <param name="exceptionTrackingList">Collection that receives the service error that triggered the single retry.</param>
/// <returns>The result of the PUT, or an empty <see cref="UploadResult{T}"/> when the range was reported as invalid.</returns>
private async Task<UploadResult<T>> UploadSliceAsync(UploadSliceRequest<T> uploadSliceRequest, ICollection<Exception> exceptionTrackingList)
{
    var retryAlreadyUsed = false;
    this._uploadStream.Seek(uploadSliceRequest.RangeBegin, SeekOrigin.Begin);

    for (; ; )
    {
        using (var sliceBody = new ReadOnlySubStream(this._uploadStream, uploadSliceRequest.RangeBegin, uploadSliceRequest.RangeLength))
        {
            try
            {
                return await uploadSliceRequest.PutAsync(sliceBody).ConfigureAwait(false);
            }
            catch (ServiceException serviceError)
            {
                var isTransient = serviceError.IsMatch("generalException") || serviceError.IsMatch("timeout");
                if (!isTransient)
                {
                    if (serviceError.IsMatch("invalidRange"))
                    {
                        // Succeeded previously, but nothing to return right now
                        return new UploadResult<T>();
                    }

                    throw;
                }

                if (retryAlreadyUsed)
                {
                    // Second transient failure for the same slice: give up.
                    throw;
                }

                // First transient failure: remember it and loop around for one more attempt.
                retryAlreadyUsed = true;
                exceptionTrackingList.Add(serviceError);
            }
        }
    }
}
/// <summary>
/// Lazily produces the slice requests that cover every range still to be uploaded.
/// Call <see cref="UpdateSessionStatusAsync"/> first to refresh the internal session information.
/// </summary>
/// <returns>All requests currently needed to complete the upload session.</returns>
internal IEnumerable<UploadSliceRequest<T>> GetUploadSliceRequests()
{
    foreach (var pendingRange in this._rangesRemaining)
    {
        var rangeEnd = pendingRange.Item2;
        long sliceLength;

        // Walk the range slice by slice; both range bounds are inclusive.
        for (var sliceBegin = pendingRange.Item1; sliceBegin <= rangeEnd; sliceBegin += sliceLength)
        {
            sliceLength = NextSliceSize(sliceBegin, rangeEnd);
            var sliceEnd = sliceBegin + sliceLength - 1;

            yield return new UploadSliceRequest<T>(
                this.Session.UploadUrl,
                this._client,
                sliceBegin,
                sliceEnd,
                this.TotalUploadLength);
        }
    }
}
/// <summary>
/// Upload the whole session. Slices are sent one after another; when a pass over all
/// remaining ranges does not complete the upload, the session status is refreshed and
/// the remaining ranges are tried again, up to <paramref name="maxTries"/> passes.
/// </summary>
/// <param name="progress">IProgress object to monitor the progress of the upload. After each slice it receives
/// the stream offset just past that slice (slice begin + slice length).</param>
/// <param name="maxTries">Number of times to retry entire session before giving up.</param>
/// <returns>Item information returned by server.</returns>
/// <exception cref="TaskCanceledException">No pass completed the upload within <paramref name="maxTries"/> tries;
/// the inner <see cref="AggregateException"/> carries the tracked service errors.</exception>
public async Task<UploadResult<T>> UploadAsync(IProgress<long> progress = null, int maxTries = 3)
{
    var uploadTries = 0;
    var trackedExceptions = new List<Exception>();
    while (uploadTries < maxTries)
    {
        var sliceRequests = this.GetUploadSliceRequests();
        foreach (var request in sliceRequests)
        {
            var uploadResult = await this.UploadSliceAsync(request, trackedExceptions).ConfigureAwait(false);

            // Report the position reached AFTER this slice. Reporting RangeBegin would lag one
            // slice behind and a single-slice upload would only ever report 0.
            progress?.Report(request.RangeBegin + request.RangeLength);

            if (uploadResult.UploadSucceeded)
            {
                return uploadResult;
            }
        }

        // Ask the server which ranges are still missing before the next pass.
        await this.UpdateSessionStatusAsync().ConfigureAwait(false);
        uploadTries += 1;
        if (uploadTries < maxTries)
        {
            // Quadratic back off in case of failures (2 s, 8 s, 18 s, ...).
            await Task.Delay(2000 * uploadTries * uploadTries).ConfigureAwait(false);
        }
    }

    throw new TaskCanceledException("Upload failed too many times. See InnerException for list of exceptions that occured.", new AggregateException(trackedExceptions.ToArray()));
}
/// <summary>
/// Refreshes the session information from the server and continues the upload from where it left off.
/// </summary>
/// <param name="progress">IProgress object to monitor the progress of the upload.</param>
/// <param name="maxTries">Number of times to retry entire session before giving up.</param>
/// <returns>Item information returned by server.</returns>
/// <exception cref="ClientException">The refreshed session has no expiration time or has already expired.</exception>
public async Task<UploadResult<T>> ResumeAsync(IProgress<long> progress = null, int maxTries = 3)
{
    var refreshedSession = await this.UpdateSessionStatusAsync().ConfigureAwait(false);

    // A missing expiration time falls back to "now", which the check below treats as expired.
    var expiresAt = refreshedSession.ExpirationDateTime ?? DateTimeOffset.Now;
    var stillResumable = expiresAt > DateTimeOffset.Now;
    if (stillResumable)
    {
        return await this.UploadAsync(progress, maxTries).ConfigureAwait(false);
    }

    throw new ClientException(
        new Error
        {
            Code = ErrorConstants.Codes.Timeout,
            Message = ErrorConstants.Messages.ExpiredUploadSession
        });
}
/// <summary>
/// Fetches the current state of the session from the server and stores it internally.
/// Also replaces the internal list of ranges remaining to be uploaded with the server's view.
/// </summary>
/// <returns>The <see cref="IUploadSession"/> returned by the server, with the known upload URL applied.</returns>
public async Task<IUploadSession> UpdateSessionStatusAsync()
{
    var statusRequest = new UploadSessionRequest(this.Session, this._client);
    var refreshedSession = await statusRequest.GetAsync().ConfigureAwait(false);

    this._rangesRemaining = this.GetRangesRemaining(refreshedSession);

    // Sometimes the UploadUrl is not returned, so carry over the one already known.
    refreshedSession.UploadUrl = this.Session.UploadUrl;
    this.Session = refreshedSession;

    return refreshedSession;
}
/// <summary>
/// Deletes the upload session on the server.
/// </summary>
/// <returns>Once returned task is complete, the session has been deleted.</returns>
/// <exception cref="ClientException">The stored session has no expiration time or has already expired.</exception>
public async Task DeleteSessionAsync()
{
    // A missing expiration time falls back to "now", which the check below treats as expired.
    var expiresAt = this.Session.ExpirationDateTime ?? DateTimeOffset.Now;
    var alreadyExpired = expiresAt <= DateTimeOffset.Now;
    if (alreadyExpired)
    {
        var expiredError = new Error
        {
            Code = ErrorConstants.Codes.Timeout,
            Message = ErrorConstants.Messages.ExpiredUploadSession
        };
        throw new ClientException(expiredError);
    }

    var deleteRequest = new UploadSessionRequest(this.Session, this._client);
    await deleteRequest.DeleteAsync().ConfigureAwait(false);
}
/// <summary>
/// Converts the session's next expected ranges into inclusive (begin, end) byte ranges.
/// </summary>
/// <param name="session">Session whose NextExpectedRanges are parsed.</param>
/// <returns>One tuple per expected range; an empty list when the session carries no ranges.</returns>
private List<Tuple<long, long>> GetRangesRemaining(IUploadSession session)
{
    // nextExpectedRanges: https://dev.onedrive.com/items/upload_large_files.htm
    // Sample: ["12345-55232","77829-99375"]
    // Also, second number in range can be blank, which means 'until the end'
    var newRangesRemaining = new List<Tuple<long, long>>();

    var expectedRanges = session.NextExpectedRanges;
    if (expectedRanges == null)
    {
        // No range information at all: nothing is known to be missing.
        return newRangesRemaining;
    }

    // The ranges are a wire format, so parse them independently of the current culture.
    var invariantCulture = System.Globalization.CultureInfo.InvariantCulture;
    foreach (var range in expectedRanges)
    {
        var rangeSpecifiers = range.Split('-');
        var rangeBegin = long.Parse(rangeSpecifiers[0], invariantCulture);
        var rangeEnd = string.IsNullOrEmpty(rangeSpecifiers[1])
            ? this.TotalUploadLength - 1 // open-ended range: runs to the last byte of the stream
            : long.Parse(rangeSpecifiers[1], invariantCulture);
        newRangesRemaining.Add(new Tuple<long, long>(rangeBegin, rangeEnd));
    }

    return newRangesRemaining;
}
/// <summary>
/// Length of the next slice within an inclusive byte range: the rest of the range,
/// capped at the configured maximum slice size.
/// </summary>
/// <param name="rangeBegin">First byte of the part of the range not yet sliced.</param>
/// <param name="rangeEnd">Last byte of the range (inclusive).</param>
/// <returns>The smaller of the remaining range length and the maximum slice size.</returns>
private long NextSliceSize(long rangeBegin, long rangeEnd) =>
    Math.Min(rangeEnd - rangeBegin + 1, (long)this._maxSliceSize);
}
}