|
using System;
using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace System.Data.SqlClient
{
sealed internal class SqlSequentialStream : System.IO.Stream
{
private SqlDataReader _reader; // The SqlDataReader that we are reading data from
private int _columnIndex; // The index of out column in the table
private Task _currentTask; // Holds the current task being processed
private int _readTimeout; // Read timeout for this stream in ms (for Stream.ReadTimeout)
private CancellationTokenSource _disposalTokenSource; // Used to indicate that a cancellation is requested due to disposal
internal SqlSequentialStream(SqlDataReader reader, int columnIndex)
{
Debug.Assert(reader != null, "Null reader when creating sequential stream");
Debug.Assert(columnIndex >= 0, "Invalid column index when creating sequential stream");
_reader = reader;
_columnIndex = columnIndex;
_currentTask = null;
_disposalTokenSource = new CancellationTokenSource();
// Safely safely convert the CommandTimeout from seconds to milliseconds
if ((reader.Command != null) && (reader.Command.CommandTimeout != 0))
{
_readTimeout = (int)Math.Min((long)reader.Command.CommandTimeout * 1000L, (long)Int32.MaxValue);
}
else
{
_readTimeout = Timeout.Infinite;
}
}
public override bool CanRead
{
get { return ((_reader != null) && (!_reader.IsClosed)); }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanTimeout
{
get { return true; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush()
{ }
public override long Length
{
get { throw ADP.NotSupported(); }
}
public override long Position
{
get { throw ADP.NotSupported(); }
set { throw ADP.NotSupported(); }
}
public override int ReadTimeout
{
get { return _readTimeout; }
set
{
if ((value > 0) || (value == Timeout.Infinite))
{
_readTimeout = value;
}
else
{
throw ADP.ArgumentOutOfRange("value");
}
}
}
internal int ColumnIndex
{
get { return _columnIndex; }
}
public override int Read(byte[] buffer, int offset, int count)
{
ValidateReadParameters(buffer, offset, count);
if (!CanRead)
{
throw ADP.ObjectDisposed(this);
}
if (_currentTask != null)
{
throw ADP.AsyncOperationPending();
}
try
{
return _reader.GetBytesInternalSequential(_columnIndex, buffer, offset, count, _readTimeout);
}
catch (SqlException ex)
{
// Stream.Read() can't throw a SqlException - so wrap it in an IOException
throw ADP.ErrorReadingFromStream(ex);
}
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
if (!CanRead)
{
// This is checked in ReadAsync - but its a better for the user if it throw here instead of having to wait for EndRead
throw ADP.ObjectDisposed(this);
}
Task readTask = ReadAsync(buffer, offset, count, CancellationToken.None);
if (callback != null)
{
readTask.ContinueWith((t) => callback(t), TaskScheduler.Default);
}
return readTask;
}
public override int EndRead(IAsyncResult asyncResult)
{
if (asyncResult == null)
{
throw ADP.ArgumentNull("asyncResult");
}
// Wait for the task to complete - this will also cause any exceptions to be thrown
Task<int> readTask = (Task<int>)asyncResult;
try
{
readTask.Wait();
}
catch (AggregateException ex)
{
throw ex.InnerException;
}
return readTask.Result;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateReadParameters(buffer, offset, count);
TaskCompletionSource<int> completion = new TaskCompletionSource<int>();
if (!CanRead)
{
completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
}
else
{
try
{
Task original = Interlocked.CompareExchange<Task>(ref _currentTask, completion.Task, null);
if (original != null)
{
completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
}
else
{
// Set up a combined cancellation token for both the user's and our disposal tokens
CancellationTokenSource combinedTokenSource;
if (!cancellationToken.CanBeCanceled)
{
// Users token is not cancellable - just use ours
combinedTokenSource = _disposalTokenSource;
}
else
{
// Setup registrations from user and disposal token to cancel the combined token
combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposalTokenSource.Token);
}
int bytesRead = 0;
Task<int> getBytesTask = null;
var reader = _reader;
if ((reader != null) && (!cancellationToken.IsCancellationRequested) && (!_disposalTokenSource.Token.IsCancellationRequested))
{
getBytesTask = reader.GetBytesAsync(_columnIndex, buffer, offset, count, _readTimeout, combinedTokenSource.Token, out bytesRead);
}
if (getBytesTask == null)
{
_currentTask = null;
if (cancellationToken.IsCancellationRequested)
{
completion.SetCanceled();
}
else if (!CanRead)
{
completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
}
else
{
completion.SetResult(bytesRead);
}
if (combinedTokenSource != _disposalTokenSource)
{
combinedTokenSource.Dispose();
}
}
else
{
getBytesTask.ContinueWith((t) =>
{
_currentTask = null;
// If we completed, but _reader is null (i.e. the stream is closed), then report cancellation
if ((t.Status == TaskStatus.RanToCompletion) && (CanRead))
{
completion.SetResult((int)t.Result);
}
else if (t.Status == TaskStatus.Faulted)
{
if (t.Exception.InnerException is SqlException)
{
// Stream.ReadAsync() can't throw a SqlException - so wrap it in an IOException
completion.SetException(ADP.ExceptionWithStackTrace(ADP.ErrorReadingFromStream(t.Exception.InnerException)));
}
else
{
completion.SetException(t.Exception.InnerException);
}
}
else if (!CanRead)
{
completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
}
else
{
completion.SetCanceled();
}
if (combinedTokenSource != _disposalTokenSource)
{
combinedTokenSource.Dispose();
}
}, TaskScheduler.Default);
}
}
}
catch (Exception ex)
{
// In case of any errors, ensure that the completion is completed and the task is set back to null if we switched it
completion.TrySetException(ex);
Interlocked.CompareExchange(ref _currentTask, null, completion.Task);
throw;
}
}
return completion.Task;
}
public override long Seek(long offset, IO.SeekOrigin origin)
{
throw ADP.NotSupported();
}
public override void SetLength(long value)
{
throw ADP.NotSupported();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw ADP.NotSupported();
}
/// <summary>
/// Forces the stream to act as if it was closed (i.e. CanRead=false and Read() throws)
/// This does not actually close the stream, read off the rest of the data or dispose this
/// </summary>
internal void SetClosed()
{
_disposalTokenSource.Cancel();
_reader = null;
// Wait for pending task
var currentTask = _currentTask;
if (currentTask != null)
{
((IAsyncResult)currentTask).AsyncWaitHandle.WaitOne();
}
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
// Set the stream as closed
SetClosed();
}
base.Dispose(disposing);
}
/// <summary>
/// Checks the the parameters passed into a Read() method are valid
/// </summary>
/// <param name="buffer"></param>
/// <param name="index"></param>
/// <param name="count"></param>
internal static void ValidateReadParameters(byte[] buffer, int offset, int count)
{
if (buffer == null)
{
throw ADP.ArgumentNull(ADP.ParameterBuffer);
}
if (offset < 0)
{
throw ADP.ArgumentOutOfRange(ADP.ParameterOffset);
}
if (count < 0)
{
throw ADP.ArgumentOutOfRange(ADP.ParameterCount);
}
try
{
if (checked(offset + count) > buffer.Length)
{
throw ExceptionBuilder.InvalidOffsetLength();
}
}
catch (OverflowException)
{
// If we've overflowed when adding offset and count, then they never would have fit into buffer anyway
throw ExceptionBuilder.InvalidOffsetLength();
}
}
}
}
|