|
// ------------------------------------------------------------------------------
// <copyright file="CommandStream.cs" company="Microsoft">
// Copyright (c) Microsoft Corporation. All rights reserved.
// </copyright>
// ------------------------------------------------------------------------------
//
namespace System.Net {
using System.Collections;
using System.IO;
using System.Security.Cryptography.X509Certificates ;
using System.Net.Sockets;
using System.Security.Permissions;
using System.Text;
using System.Threading;
using System.Security.Authentication;
/// <devdoc>
/// <para>
/// Impliments basic sending and receiving of network commands.
/// Handles generic parsing of server responses and provides
/// a Pipeline sequencing mechnism for sending the commands to the
/// server.
/// </para>
/// </devdoc>
internal class CommandStream : PooledStream {
private static readonly AsyncCallback m_WriteCallbackDelegate = new AsyncCallback(WriteCallback);
private static readonly AsyncCallback m_ReadCallbackDelegate = new AsyncCallback(ReadCallback);
private bool m_RecoverableFailure;
//
// Active variables used for the command state machine
//
protected WebRequest m_Request;
protected bool m_Async;
private bool m_Aborted;
protected PipelineEntry [] m_Commands;
protected int m_Index;
private bool m_DoRead;
private bool m_DoSend;
private ResponseDescription m_CurrentResponseDescription;
protected string m_AbortReason;
const int _WaitingForPipeline = 1;
const int _CompletedPipeline = 2;
/// <devdoc>
/// <para>
/// Setups and Creates a NetworkStream connection to the server
/// perform any initalization if needed
/// </para>
/// </devdoc>
internal CommandStream(
ConnectionPool connectionPool,
TimeSpan lifetime,
bool checkLifetime
) : base(connectionPool, lifetime, checkLifetime) {
m_Decoder = m_Encoding.GetDecoder();
}
internal virtual void Abort(Exception e) {
GlobalLog.Print("CommandStream"+ValidationHelper.HashString(this)+"::Abort() - closing control Stream");
lock (this) {
if (m_Aborted)
return;
m_Aborted = true;
CanBePooled = false;
}
try {
base.Close(0);
}
finally {
if (e != null) {
InvokeRequestCallback(e);
} else {
InvokeRequestCallback(null);
}
}
}
/// <summary>
/// <para>Used to reset the connection</para>
/// </summary>
protected override void Dispose(bool disposing) {
GlobalLog.Print("CommandStream"+ValidationHelper.HashString(this)+"::Close()");
InvokeRequestCallback(null);
// Do not call base.Dispose(bool), which would close the web request.
// This stream effectively should be a wrapper around a web
// request that does not own the web request.
}
/// <summary>
/// <para>A WebRequest can use a different Connection after an Exception is set, or a null is passed
/// to mark completion. We shouldn't continue calling the Request.RequestCallback after that point</para>
/// </summary>
protected void InvokeRequestCallback(object obj) {
WebRequest webRequest = m_Request;
if (webRequest != null) {
webRequest.RequestCallback(obj);
}
}
/// <summary>
/// <para>Indicates that we caught an error that should allow us to resubmit a request</para>
/// </summary>
internal bool RecoverableFailure {
get {
return m_RecoverableFailure;
}
}
/// <summary>
/// <para>We only offer recovery, if we're at the start of the first command</para>
/// </summary>
protected void MarkAsRecoverableFailure() {
if (m_Index <= 1) {
m_RecoverableFailure = true;
}
}
/// <devdoc>
/// <para>
/// Setups and Creates a NetworkStream connection to the server
/// perform any initalization if needed
/// </para>
/// </devdoc>
internal Stream SubmitRequest(WebRequest request, bool async, bool readInitalResponseOnConnect) {
ClearState();
UpdateLifetime();
PipelineEntry [] commands = BuildCommandsList(request);
InitCommandPipeline(request, commands, async);
if(readInitalResponseOnConnect && JustConnected){
m_DoSend = false;
m_Index = -1;
}
return ContinueCommandPipeline();
}
protected virtual void ClearState() {
InitCommandPipeline(null, null, false);
}
protected virtual PipelineEntry [] BuildCommandsList(WebRequest request) {
return null;
}
protected Exception GenerateException(WebExceptionStatus status, Exception innerException) {
return new WebException(
NetRes.GetWebStatusString("net_connclosed", status),
innerException,
status,
null /* no response */ );
}
protected Exception GenerateException(FtpStatusCode code, string statusDescription, Exception innerException) {
return new WebException(SR.GetString(SR.net_servererror,NetRes.GetWebStatusCodeString(code, statusDescription)),
innerException,WebExceptionStatus.ProtocolError,null );
}
protected void InitCommandPipeline(WebRequest request, PipelineEntry [] commands, bool async) {
m_Commands = commands;
m_Index = 0;
m_Request = request;
m_Aborted = false;
m_DoRead = true;
m_DoSend = true;
m_CurrentResponseDescription = null;
m_Async = async;
m_RecoverableFailure = false;
m_AbortReason = string.Empty;
}
internal void CheckContinuePipeline()
{
if (m_Async)
return;
try {
ContinueCommandPipeline();
} catch (Exception e) {
Abort(e);
}
}
/// Pipelined command resoluton, how this works:
/// a list of commands that need to be sent to the FTP server are spliced together into a array,
/// each command such STOR, PORT, etc, is sent to the server, then the response is parsed into a string,
/// with the response, the delegate is called, which returns an instruction (either continue, stop, or read additional
/// responses from server).
///
/// When done calling Close() to Notify ConnectionGroup that we are free
protected Stream ContinueCommandPipeline()
{
// In async case, The BeginWrite can actually result in a
// series of synchronous completions that eventually close
// the connection. So we need to save the members that
// we need to access, since they may not be valid after
// BeginWrite returns
bool async = m_Async;
while (m_Index < m_Commands.Length)
{
if (m_DoSend)
{
if (m_Index < 0)
throw new InternalException();
byte[] sendBuffer = Encoding.GetBytes(m_Commands[m_Index].Command);
if (Logging.On)
{
string sendCommand = m_Commands[m_Index].Command.Substring(0, m_Commands[m_Index].Command.Length-2);
if (m_Commands[m_Index].HasFlag(PipelineEntryFlags.DontLogParameter))
{
int index = sendCommand.IndexOf(' ');
if (index != -1)
sendCommand = sendCommand.Substring(0, index) + " ********";
}
Logging.PrintInfo(Logging.Web, this, SR.GetString(SR.net_log_sending_command, sendCommand));
}
try {
if (async) {
BeginWrite(sendBuffer, 0, sendBuffer.Length, m_WriteCallbackDelegate, this);
} else {
Write(sendBuffer, 0, sendBuffer.Length);
}
} catch (IOException) {
MarkAsRecoverableFailure();
throw;
} catch {
throw;
}
if (async) {
return null;
}
}
Stream stream = null;
bool isReturn = PostSendCommandProcessing(ref stream);
if (isReturn)
{
return stream;
}
}
lock (this)
{
Close();
}
return null;
}
//
private bool PostSendCommandProcessing(ref Stream stream)
{
/*
** I don;t see how this code can be still relevant, remove it of no problems observed **
//
// This is a general race condition in Sync mode, if the server returns an error
// after we open the data connection, we will be off reading the data connection,
// and not the control connection. The best we can do is try to poll, and in the
// the worst case, we will timeout on establishing the data connection.
//
if (!m_DoRead && !m_Async) {
m_DoRead = Poll(100 * 1000, SelectMode.SelectRead); // Poll is in Microseconds.
}
*/
if (m_DoRead)
{
// In async case, The next call can actually result in a
// series of synchronous completions that eventually close
// the connection. So we need to save the members that
// we need to access, since they may not be valid after the
// next call returns
bool async = m_Async;
int index = m_Index;
PipelineEntry[] commands = m_Commands;
try {
ResponseDescription response = ReceiveCommandResponse();
if (async) {
return true;
}
m_CurrentResponseDescription = response;
} catch {
// If we get an exception on the QUIT command (which is
// always the last command), ignore the final exception
// and continue with the pipeline regardlss of sync/async
if (index < 0 || index >= commands.Length ||
commands[index].Command != "QUIT\r\n")
throw;
}
}
return PostReadCommandProcessing(ref stream);
}
//
private bool PostReadCommandProcessing(ref Stream stream)
{
if (m_Index >= m_Commands.Length)
return false;
// Set up front to prevent a race condition on result == PipelineInstruction.Pause
m_DoSend = false;
m_DoRead = false;
PipelineInstruction result;
PipelineEntry entry;
if(m_Index == -1)
entry = null;
else
entry = m_Commands[m_Index];
// Final QUIT command may get exceptions since the connectin
// may be already closed by the server. So there is no response
// to process, just advance the pipeline to continue
if (m_CurrentResponseDescription == null && entry.Command == "QUIT\r\n")
result = PipelineInstruction.Advance;
else
result = PipelineCallback(entry, m_CurrentResponseDescription, false, ref stream);
if (result == PipelineInstruction.Abort)
{
Exception exception;
if (m_AbortReason != string.Empty)
exception = new WebException(m_AbortReason);
else
exception = GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
Abort(exception);
throw exception;
}
else if (result == PipelineInstruction.Advance)
{
m_CurrentResponseDescription = null;
m_DoSend = true;
m_DoRead = true;
m_Index++;
}
else if (result == PipelineInstruction.Pause)
{
//
// PipelineCallback did an async operation and will have to re-enter again
// Hold on for now
//
return true;
}
else if (result == PipelineInstruction.GiveStream)
{
//
// We will have another response coming, don't send
//
m_CurrentResponseDescription = null;
m_DoRead = true;
if (m_Async)
{
// If they block in the requestcallback we should still continue the pipeline
ContinueCommandPipeline();
InvokeRequestCallback(stream);
}
return true;
}
else if (result == PipelineInstruction.Reread)
{
// Another response is expected after this one
m_CurrentResponseDescription = null;
m_DoRead = true;
}
return false;
}
internal enum PipelineInstruction {
Abort, // aborts the pipeline
Advance, // advances to the next pipelined command
Pause, // Let async callback to continue the pipeline
Reread, // rereads from the command socket
GiveStream, // returns with open data stream, let stream close to continue
}
[Flags]
internal enum PipelineEntryFlags {
UserCommand = 0x1,
GiveDataStream = 0x2,
CreateDataConnection = 0x4,
DontLogParameter = 0x8
}
internal class PipelineEntry {
internal PipelineEntry(string command) {
Command = command;
}
internal PipelineEntry(string command, PipelineEntryFlags flags) {
Command = command;
Flags = flags;
}
internal bool HasFlag(PipelineEntryFlags flags) {
return (Flags & flags) != 0;
}
internal string Command;
internal PipelineEntryFlags Flags;
}
protected virtual PipelineInstruction PipelineCallback(PipelineEntry entry, ResponseDescription response, bool timeout, ref Stream stream) {
return PipelineInstruction.Abort;
}
//
// I/O callback methods
//
/// <summary>
/// <para>Provides a wrapper for the async operations, so that the code can be shared with sync</para>
/// </summary>
private static void ReadCallback(IAsyncResult asyncResult) {
ReceiveState state = (ReceiveState)asyncResult.AsyncState;
try {
Stream stream = (Stream)state.Connection;
int bytesRead = 0;
try {
bytesRead = stream.EndRead(asyncResult);
if (bytesRead == 0)
state.Connection.CloseSocket();
}
catch (IOException) {
state.Connection.MarkAsRecoverableFailure();
throw;
}
catch {
throw;
}
state.Connection.ReceiveCommandResponseCallback(state, bytesRead);
} catch (Exception e) {
state.Connection.Abort(e);
}
}
/// <summary>
/// <para>Provides a wrapper for the async write operations</para>
/// </summary>
private static void WriteCallback(IAsyncResult asyncResult) {
CommandStream connection = (CommandStream)asyncResult.AsyncState;
try {
try {
connection.EndWrite(asyncResult);
}
catch (IOException) {
connection.MarkAsRecoverableFailure();
throw;
}
catch {
throw;
}
Stream stream = null;
if (connection.PostSendCommandProcessing(ref stream))
return;
connection.ContinueCommandPipeline();
} catch (Exception e) {
connection.Abort(e);
}
}
//
// Read parsing methods and privates
//
private string m_Buffer = string.Empty;
private Encoding m_Encoding = Encoding.UTF8;
private Decoder m_Decoder;
protected Encoding Encoding {
get {
return m_Encoding;
}
set {
m_Encoding = value;
m_Decoder = m_Encoding.GetDecoder();
}
}
/// <summary>
/// This function is called a derived class to determine whether a response is valid, and when it is complete.
/// </summary>
protected virtual bool CheckValid(ResponseDescription response, ref int validThrough, ref int completeLength) {
return false;
}
/// <summary>
/// Kicks off an asynchronous or sync request to receive a response from the server.
/// Uses the Encoding <code>encoding</code> to transform the bytes received into a string to be
/// returned in the GeneralResponseDescription's StatusDescription field.
/// </summary>
private ResponseDescription ReceiveCommandResponse()
{
// These are the things that will be needed to maintain state
ReceiveState state = new ReceiveState(this);
try
{
// If a string of nonzero length was decoded from the buffered bytes after the last complete response, then we
// will use this string as our first string to append to the response StatusBuffer, and we will
// forego a Connection.Receive here.
if(m_Buffer.Length > 0)
{
ReceiveCommandResponseCallback(state, -1);
}
else
{
int bytesRead;
try {
if (m_Async) {
BeginRead(state.Buffer, 0, state.Buffer.Length, m_ReadCallbackDelegate, state);
return null;
} else {
bytesRead = Read(state.Buffer, 0, state.Buffer.Length);
if (bytesRead == 0)
CloseSocket();
ReceiveCommandResponseCallback(state, bytesRead);
}
}
catch (IOException) {
MarkAsRecoverableFailure();
throw;
}
catch {
throw;
}
}
}
catch(Exception e) {
if (e is WebException)
throw;
throw GenerateException(WebExceptionStatus.ReceiveFailure, e);
}
return state.Resp;
}
/// <summary>
/// ReceiveCommandResponseCallback is the main "while loop" of the ReceiveCommandResponse function family.
/// In general, what is does is perform an EndReceive() to complete the previous retrieval of bytes from the
/// server (unless it is using a buffered response) It then processes what is received by using the
/// implementing class's CheckValid() function, as described above. If the response is complete, it returns the single complete
/// response in the GeneralResponseDescription created in BeginReceiveComamndResponse, and buffers the rest as described above.
///
/// If the resposne is not complete, it issues another Connection.BeginReceive, with callback ReceiveCommandResponse2,
/// so the action will continue at the next invocation of ReceiveCommandResponse2.
/// </summary>
/// <param name="asyncResult"></param>
///
private void ReceiveCommandResponseCallback(ReceiveState state, int bytesRead)
{
// completeLength will be set to a nonnegative number by CheckValid if the response is complete:
// it will set completeLength to the length of a complete response.
int completeLength = -1;
while (true)
{
int validThrough = state.ValidThrough; // passed to checkvalid
// If we have a Buffered response (ie data was received with the last response that was past the end of that response)
// deal with it as if we had just received it now instead of actually doing another receive
if(m_Buffer.Length > 0)
{
// Append the string we got from the buffer, and flush it out.
state.Resp.StatusBuffer.Append(m_Buffer);
m_Buffer = string.Empty;
// invoke checkvalid.
if(!CheckValid(state.Resp, ref validThrough, ref completeLength)) {
throw GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
}
}
else // we did a Connection.BeginReceive. Note that in this case, all bytes received are in the receive buffer (because bytes from
// the buffer were transferred there if necessary
{
// this indicates the connection was closed.
if(bytesRead <= 0) {
throw GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
}
// decode the bytes in the receive buffer into a string, append it to the statusbuffer, and invoke checkvalid.
// Decoder automatically takes care of caching partial codepoints at the end of a buffer.
char[] chars = new char[m_Decoder.GetCharCount(state.Buffer, 0, bytesRead)];
int numChars = m_Decoder.GetChars(state.Buffer, 0, bytesRead, chars, 0, false);
string szResponse = new string(chars, 0, numChars);
state.Resp.StatusBuffer.Append(szResponse);
if(!CheckValid(state.Resp, ref validThrough, ref completeLength))
{
throw GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
}
// If the response is complete, then determine how many characters are left over...these bytes need to be set into Buffer.
if(completeLength >= 0)
{
int unusedChars = state.Resp.StatusBuffer.Length - completeLength;
if (unusedChars > 0) {
m_Buffer = szResponse.Substring(szResponse.Length-unusedChars, unusedChars);
}
}
}
// Now, in general, if the response is not complete, update the "valid through" length for the efficiency of checkValid.
// and perform the next receive.
// Note that there may NOT be bytes in the beginning of the receive buffer (even if there were partial characters left over after the
// last encoding), because they get tracked in the Decoder.
if(completeLength < 0)
{
state.ValidThrough = validThrough;
try {
if (m_Async) {
BeginRead(state.Buffer, 0, state.Buffer.Length, m_ReadCallbackDelegate, state);
return;
} else {
bytesRead = Read(state.Buffer, 0, state.Buffer.Length);
if (bytesRead == 0)
CloseSocket();
continue;
}
}
catch (IOException) {
MarkAsRecoverableFailure();
throw;
}
catch {
throw;
}
}
// the response is completed
break;
}
// Otherwise, we have a complete response.
string responseString = state.Resp.StatusBuffer.ToString();
state.Resp.StatusDescription = responseString.Substring(0, completeLength);
// set the StatusDescription to the complete part of the response. Note that the Buffer has already been taken care of above.
if (Logging.On) Logging.PrintInfo(Logging.Web, this, SR.GetString(SR.net_log_received_response, responseString.Substring(0, completeLength-2)));
if (m_Async) {
// Tell who is listening what was received.
if (state.Resp != null) {
m_CurrentResponseDescription = state.Resp;
}
Stream stream = null;
if (PostReadCommandProcessing(ref stream))
return;
ContinueCommandPipeline();
}
}
} // class CommandStream
/// <summary>
/// Contains the parsed status line from the server
/// </summary>
internal class ResponseDescription {
internal const int NoStatus = -1;
internal bool Multiline = false;
internal int Status = NoStatus;
internal string StatusDescription;
internal StringBuilder StatusBuffer = new StringBuilder();
internal string StatusCodeString;
internal bool PositiveIntermediate { get { return (Status >= 100 && Status <= 199); } }
internal bool PositiveCompletion { get { return (Status >= 200 && Status <= 299); } }
//internal bool PositiveAuthRelated { get { return (Status >= 300 && Status <= 399); } }
internal bool TransientFailure { get { return (Status >= 400 && Status <= 499); } }
internal bool PermanentFailure { get { return (Status >= 500 && Status <= 599); } }
internal bool InvalidStatusCode { get { return (Status < 100 || Status > 599); } }
}
/// <summary>
/// State information that is used during ReceiveCommandResponse()'s async operations
/// </summary>
internal class ReceiveState
{
private const int bufferSize = 1024;
internal ResponseDescription Resp;
internal int ValidThrough;
internal byte[] Buffer;
internal CommandStream Connection;
internal ReceiveState(CommandStream connection)
{
Connection = connection;
Resp = new ResponseDescription();
Buffer = new byte[bufferSize]; //1024
ValidThrough = 0;
}
}
} // namespace System.Net
|