| // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "net/spdy/spdy_stream.h" |
| |
| #include "base/logging.h" |
| #include "base/message_loop.h" |
| #include "base/values.h" |
| #include "net/spdy/spdy_session.h" |
| |
| namespace net { |
| |
| namespace { |
| |
| class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters { |
| public: |
| NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id, |
| int delta, |
| int window_size) |
| : stream_id_(stream_id), delta_(delta), window_size_(window_size) {} |
| virtual Value* ToValue() const { |
| DictionaryValue* dict = new DictionaryValue(); |
| dict->SetInteger("id", static_cast<int>(stream_id_)); |
| dict->SetInteger("delta", delta_); |
| dict->SetInteger("window_size", window_size_); |
| return dict; |
| } |
| private: |
| const spdy::SpdyStreamId stream_id_; |
| const int delta_; |
| const int window_size_; |
| DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter); |
| }; |
| |
| } |
| |
| SpdyStream::SpdyStream(SpdySession* session, |
| spdy::SpdyStreamId stream_id, |
| bool pushed, |
| const BoundNetLog& net_log) |
| : continue_buffering_data_(true), |
| stream_id_(stream_id), |
| priority_(0), |
| stalled_by_flow_control_(false), |
| send_window_size_(spdy::kSpdyStreamInitialWindowSize), |
| recv_window_size_(spdy::kSpdyStreamInitialWindowSize), |
| pushed_(pushed), |
| response_received_(false), |
| session_(session), |
| delegate_(NULL), |
| request_time_(base::Time::Now()), |
| response_(new spdy::SpdyHeaderBlock), |
| io_state_(STATE_NONE), |
| response_status_(OK), |
| cancelled_(false), |
| has_upload_data_(false), |
| net_log_(net_log), |
| send_bytes_(0), |
| recv_bytes_(0) { |
| } |
| |
| SpdyStream::~SpdyStream() { |
| UpdateHistograms(); |
| } |
| |
| void SpdyStream::SetDelegate(Delegate* delegate) { |
| CHECK(delegate); |
| delegate_ = delegate; |
| |
| if (pushed_) { |
| CHECK(response_received()); |
| MessageLoop::current()->PostTask( |
| FROM_HERE, NewRunnableMethod(this, |
| &SpdyStream::PushedStreamReplayData)); |
| } else { |
| continue_buffering_data_ = false; |
| } |
| } |
| |
| void SpdyStream::PushedStreamReplayData() { |
| if (cancelled_ || !delegate_) |
| return; |
| |
| continue_buffering_data_ = false; |
| |
| int rv = delegate_->OnResponseReceived(*response_, response_time_, OK); |
| if (rv == ERR_INCOMPLETE_SPDY_HEADERS) { |
| // We don't have complete headers. Assume we're waiting for another |
| // HEADERS frame. Since we don't have headers, we had better not have |
| // any pending data frames. |
| DCHECK_EQ(0U, pending_buffers_.size()); |
| return; |
| } |
| |
| std::vector<scoped_refptr<IOBufferWithSize> > buffers; |
| buffers.swap(pending_buffers_); |
| for (size_t i = 0; i < buffers.size(); ++i) { |
| // It is always possible that a callback to the delegate results in |
| // the delegate no longer being available. |
| if (!delegate_) |
| break; |
| if (buffers[i]) { |
| delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); |
| } else { |
| delegate_->OnDataReceived(NULL, 0); |
| session_->CloseStream(stream_id_, net::OK); |
| // Note: |this| may be deleted after calling CloseStream. |
| DCHECK_EQ(buffers.size() - 1, i); |
| } |
| } |
| } |
| |
| void SpdyStream::DetachDelegate() { |
| if (delegate_) |
| delegate_->set_chunk_callback(NULL); |
| delegate_ = NULL; |
| if (!closed()) |
| Cancel(); |
| } |
| |
| const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const { |
| return request_; |
| } |
| |
| void SpdyStream::set_spdy_headers( |
| const linked_ptr<spdy::SpdyHeaderBlock>& headers) { |
| request_ = headers; |
| } |
| |
| void SpdyStream::IncreaseSendWindowSize(int delta_window_size) { |
| DCHECK_GE(delta_window_size, 1); |
| int new_window_size = send_window_size_ + delta_window_size; |
| |
| // We should ignore WINDOW_UPDATEs received before or after this state, |
| // since before means we've not written SYN_STREAM yet (i.e. it's too |
| // early) and after means we've written a DATA frame with FIN bit. |
| if (io_state_ != STATE_SEND_BODY_COMPLETE) |
| return; |
| |
| // it's valid for send_window_size_ to become negative (via an incoming |
| // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make |
| // it positive; however, if send_window_size_ is positive and incoming |
| // WINDOW_UPDATE makes it negative, we have an overflow. |
| if (send_window_size_ > 0 && new_window_size < 0) { |
| LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size |
| << "] for stream " << stream_id_ |
| << " overflows send_window_size_ [current:" |
| << send_window_size_ << "]"; |
| session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR); |
| return; |
| } |
| |
| send_window_size_ = new_window_size; |
| |
| net_log_.AddEvent( |
| NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE, |
| make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( |
| stream_id_, delta_window_size, send_window_size_))); |
| if (stalled_by_flow_control_) { |
| stalled_by_flow_control_ = false; |
| io_state_ = STATE_SEND_BODY; |
| DoLoop(OK); |
| } |
| } |
| |
| void SpdyStream::DecreaseSendWindowSize(int delta_window_size) { |
| // we only call this method when sending a frame, therefore |
| // |delta_window_size| should be within the valid frame size range. |
| DCHECK_GE(delta_window_size, 1); |
| DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); |
| |
| // |send_window_size_| should have been at least |delta_window_size| for |
| // this call to happen. |
| DCHECK_GE(send_window_size_, delta_window_size); |
| |
| send_window_size_ -= delta_window_size; |
| |
| net_log_.AddEvent( |
| NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE, |
| make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( |
| stream_id_, -delta_window_size, send_window_size_))); |
| } |
| |
| void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) { |
| DCHECK_GE(delta_window_size, 1); |
| // By the time a read is isued, stream may become inactive. |
| if (!session_->IsStreamActive(stream_id_)) |
| return; |
| int new_window_size = recv_window_size_ + delta_window_size; |
| if (recv_window_size_ > 0) |
| DCHECK(new_window_size > 0); |
| |
| recv_window_size_ = new_window_size; |
| net_log_.AddEvent( |
| NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE, |
| make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( |
| stream_id_, delta_window_size, recv_window_size_))); |
| session_->SendWindowUpdate(stream_id_, delta_window_size); |
| } |
| |
| void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) { |
| DCHECK_GE(delta_window_size, 1); |
| |
| recv_window_size_ -= delta_window_size; |
| net_log_.AddEvent( |
| NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE, |
| make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( |
| stream_id_, -delta_window_size, recv_window_size_))); |
| |
| // Since we never decrease the initial window size, we should never hit |
| // a negative |recv_window_size_|, if we do, it's a flow-control violation. |
| if (recv_window_size_ < 0) |
| session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR); |
| } |
| |
| int SpdyStream::GetPeerAddress(AddressList* address) const { |
| return session_->GetPeerAddress(address); |
| } |
| |
| int SpdyStream::GetLocalAddress(IPEndPoint* address) const { |
| return session_->GetLocalAddress(address); |
| } |
| |
| bool SpdyStream::WasEverUsed() const { |
| return session_->WasEverUsed(); |
| } |
| |
| base::Time SpdyStream::GetRequestTime() const { |
| return request_time_; |
| } |
| |
| void SpdyStream::SetRequestTime(base::Time t) { |
| request_time_ = t; |
| } |
| |
| int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) { |
| int rv = OK; |
| |
| metrics_.StartStream(); |
| |
| DCHECK(response_->empty()); |
| *response_ = response; // TODO(ukai): avoid copy. |
| |
| recv_first_byte_time_ = base::TimeTicks::Now(); |
| response_time_ = base::Time::Now(); |
| |
| // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then |
| // the server has sent the SYN_REPLY too early. |
| if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE) |
| return ERR_SPDY_PROTOCOL_ERROR; |
| if (pushed_) |
| CHECK(io_state_ == STATE_NONE); |
| io_state_ = STATE_OPEN; |
| |
| if (delegate_) |
| rv = delegate_->OnResponseReceived(*response_, response_time_, rv); |
| // If delegate_ is not yet attached, we'll call OnResponseReceived after the |
| // delegate gets attached to the stream. |
| |
| return rv; |
| } |
| |
| int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) { |
| DCHECK(!response_->empty()); |
| |
| // Append all the headers into the response header block. |
| for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin(); |
| it != headers.end(); ++it) { |
| // Disallow duplicate headers. This is just to be conservative. |
| if ((*response_).find(it->first) != (*response_).end()) { |
| LOG(WARNING) << "HEADERS duplicate header"; |
| response_status_ = ERR_SPDY_PROTOCOL_ERROR; |
| return ERR_SPDY_PROTOCOL_ERROR; |
| } |
| |
| (*response_)[it->first] = it->second; |
| } |
| |
| int rv = OK; |
| if (delegate_) { |
| rv = delegate_->OnResponseReceived(*response_, response_time_, rv); |
| // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more |
| // headers before the response header block is complete. |
| if (rv == ERR_INCOMPLETE_SPDY_HEADERS) |
| rv = OK; |
| } |
| return rv; |
| } |
| |
| void SpdyStream::OnDataReceived(const char* data, int length) { |
| DCHECK_GE(length, 0); |
| |
| // If we don't have a response, then the SYN_REPLY did not come through. |
| // We cannot pass data up to the caller unless the reply headers have been |
| // received. |
| if (!response_received()) { |
| session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); |
| return; |
| } |
| |
| if (!delegate_ || continue_buffering_data_) { |
| // It should be valid for this to happen in the server push case. |
| // We'll return received data when delegate gets attached to the stream. |
| if (length > 0) { |
| IOBufferWithSize* buf = new IOBufferWithSize(length); |
| memcpy(buf->data(), data, length); |
| pending_buffers_.push_back(make_scoped_refptr(buf)); |
| } else { |
| pending_buffers_.push_back(NULL); |
| metrics_.StopStream(); |
| // Note: we leave the stream open in the session until the stream |
| // is claimed. |
| } |
| return; |
| } |
| |
| CHECK(!closed()); |
| |
| // A zero-length read means that the stream is being closed. |
| if (!length) { |
| metrics_.StopStream(); |
| session_->CloseStream(stream_id_, net::OK); |
| // Note: |this| may be deleted after calling CloseStream. |
| return; |
| } |
| |
| if (session_->flow_control()) |
| DecreaseRecvWindowSize(length); |
| |
| // Track our bandwidth. |
| metrics_.RecordBytes(length); |
| recv_bytes_ += length; |
| recv_last_byte_time_ = base::TimeTicks::Now(); |
| |
| if (!delegate_) { |
| // It should be valid for this to happen in the server push case. |
| // We'll return received data when delegate gets attached to the stream. |
| IOBufferWithSize* buf = new IOBufferWithSize(length); |
| memcpy(buf->data(), data, length); |
| pending_buffers_.push_back(make_scoped_refptr(buf)); |
| return; |
| } |
| |
| delegate_->OnDataReceived(data, length); |
| } |
| |
| // This function is only called when an entire frame is written. |
| void SpdyStream::OnWriteComplete(int bytes) { |
| DCHECK_LE(0, bytes); |
| send_bytes_ += bytes; |
| if (cancelled() || closed()) |
| return; |
| DoLoop(bytes); |
| } |
| |
| void SpdyStream::OnChunkAvailable() { |
| DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY || |
| io_state_ == STATE_SEND_BODY_COMPLETE); |
| if (io_state_ == STATE_SEND_BODY) |
| OnWriteComplete(0); |
| } |
| |
| void SpdyStream::OnClose(int status) { |
| io_state_ = STATE_DONE; |
| response_status_ = status; |
| Delegate* delegate = delegate_; |
| delegate_ = NULL; |
| if (delegate) { |
| delegate->set_chunk_callback(NULL); |
| delegate->OnClose(status); |
| } |
| } |
| |
| void SpdyStream::Cancel() { |
| if (cancelled()) |
| return; |
| |
| cancelled_ = true; |
| if (session_->IsStreamActive(stream_id_)) |
| session_->ResetStream(stream_id_, spdy::CANCEL); |
| } |
| |
| int SpdyStream::SendRequest(bool has_upload_data) { |
| if (delegate_) |
| delegate_->set_chunk_callback(this); |
| |
| // Pushed streams do not send any data, and should always be in STATE_OPEN or |
| // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push |
| // behavior. |
| has_upload_data_ = has_upload_data; |
| if (pushed_) { |
| send_time_ = base::TimeTicks::Now(); |
| DCHECK(!has_upload_data_); |
| DCHECK(response_received()); |
| return ERR_IO_PENDING; |
| } |
| CHECK_EQ(STATE_NONE, io_state_); |
| io_state_ = STATE_SEND_HEADERS; |
| return DoLoop(OK); |
| } |
| |
| int SpdyStream::WriteStreamData(IOBuffer* data, int length, |
| spdy::SpdyDataFlags flags) { |
| return session_->WriteStreamData(stream_id_, data, length, flags); |
| } |
| |
| bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) { |
| return session_->GetSSLInfo(ssl_info, was_npn_negotiated); |
| } |
| |
| bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { |
| return session_->GetSSLCertRequestInfo(cert_request_info); |
| } |
| |
| bool SpdyStream::HasUrl() const { |
| if (pushed_) |
| return response_received(); |
| return request_.get() != NULL; |
| } |
| |
| GURL SpdyStream::GetUrl() const { |
| DCHECK(HasUrl()); |
| |
| if (pushed_) { |
| // assemble from the response |
| std::string url; |
| spdy::SpdyHeaderBlock::const_iterator it; |
| it = response_->find("url"); |
| if (it != (*response_).end()) |
| url = it->second; |
| return GURL(url); |
| } |
| |
| // assemble from the request |
| std::string scheme; |
| std::string host_port; |
| std::string path; |
| spdy::SpdyHeaderBlock::const_iterator it; |
| it = request_->find("scheme"); |
| if (it != (*request_).end()) |
| scheme = it->second; |
| it = request_->find("host"); |
| if (it != (*request_).end()) |
| host_port = it->second; |
| it = request_->find("path"); |
| if (it != (*request_).end()) |
| path = it->second; |
| std::string url = scheme + "://" + host_port + path; |
| return GURL(url); |
| } |
| |
| int SpdyStream::DoLoop(int result) { |
| do { |
| State state = io_state_; |
| io_state_ = STATE_NONE; |
| switch (state) { |
| // State machine 1: Send headers and body. |
| case STATE_SEND_HEADERS: |
| CHECK_EQ(OK, result); |
| result = DoSendHeaders(); |
| break; |
| case STATE_SEND_HEADERS_COMPLETE: |
| result = DoSendHeadersComplete(result); |
| break; |
| case STATE_SEND_BODY: |
| CHECK_EQ(OK, result); |
| result = DoSendBody(); |
| break; |
| case STATE_SEND_BODY_COMPLETE: |
| result = DoSendBodyComplete(result); |
| break; |
| // This is an intermediary waiting state. This state is reached when all |
| // data has been sent, but no data has been received. |
| case STATE_WAITING_FOR_RESPONSE: |
| io_state_ = STATE_WAITING_FOR_RESPONSE; |
| result = ERR_IO_PENDING; |
| break; |
| // State machine 2: connection is established. |
| // In STATE_OPEN, OnResponseReceived has already been called. |
| // OnDataReceived, OnClose and OnWriteCompelte can be called. |
| // Only OnWriteCompletee calls DoLoop((). |
| // |
| // For HTTP streams, no data is sent from the client while in the OPEN |
| // state, so OnWriteComplete is never called here. The HTTP body is |
| // handled in the OnDataReceived callback, which does not call into |
| // DoLoop. |
| // |
| // For WebSocket streams, which are bi-directional, we'll send and |
| // receive data once the connection is established. Received data is |
| // handled in OnDataReceived. Sent data is handled in OnWriteComplete, |
| // which calls DoOpen(). |
| case STATE_OPEN: |
| result = DoOpen(result); |
| break; |
| |
| case STATE_DONE: |
| DCHECK(result != ERR_IO_PENDING); |
| break; |
| default: |
| NOTREACHED() << io_state_; |
| break; |
| } |
| } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE && |
| io_state_ != STATE_OPEN); |
| |
| return result; |
| } |
| |
| int SpdyStream::DoSendHeaders() { |
| CHECK(!cancelled_); |
| |
| spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE; |
| if (!has_upload_data_) |
| flags = spdy::CONTROL_FLAG_FIN; |
| |
| CHECK(request_.get()); |
| int result = session_->WriteSynStream( |
| stream_id_, static_cast<RequestPriority>(priority_), flags, |
| request_); |
| if (result != ERR_IO_PENDING) |
| return result; |
| |
| send_time_ = base::TimeTicks::Now(); |
| io_state_ = STATE_SEND_HEADERS_COMPLETE; |
| return ERR_IO_PENDING; |
| } |
| |
| int SpdyStream::DoSendHeadersComplete(int result) { |
| if (result < 0) |
| return result; |
| |
| CHECK_GT(result, 0); |
| |
| if (!delegate_) |
| return ERR_UNEXPECTED; |
| |
| // There is no body, skip that state. |
| if (delegate_->OnSendHeadersComplete(result)) { |
| io_state_ = STATE_WAITING_FOR_RESPONSE; |
| return OK; |
| } |
| |
| io_state_ = STATE_SEND_BODY; |
| return OK; |
| } |
| |
| // DoSendBody is called to send the optional body for the request. This call |
| // will also be called as each write of a chunk of the body completes. |
| int SpdyStream::DoSendBody() { |
| // If we're already in the STATE_SENDING_BODY state, then we've already |
| // sent a portion of the body. In that case, we need to first consume |
| // the bytes written in the body stream. Note that the bytes written is |
| // the number of bytes in the frame that were written, only consume the |
| // data portion, of course. |
| io_state_ = STATE_SEND_BODY_COMPLETE; |
| if (!delegate_) |
| return ERR_UNEXPECTED; |
| return delegate_->OnSendBody(); |
| } |
| |
| int SpdyStream::DoSendBodyComplete(int result) { |
| if (result < 0) |
| return result; |
| |
| if (!delegate_) |
| return ERR_UNEXPECTED; |
| |
| bool eof = false; |
| result = delegate_->OnSendBodyComplete(result, &eof); |
| if (!eof) |
| io_state_ = STATE_SEND_BODY; |
| else |
| io_state_ = STATE_WAITING_FOR_RESPONSE; |
| |
| return result; |
| } |
| |
| int SpdyStream::DoOpen(int result) { |
| if (delegate_) |
| delegate_->OnDataSent(result); |
| io_state_ = STATE_OPEN; |
| return result; |
| } |
| |
| void SpdyStream::UpdateHistograms() { |
| // We need all timers to be filled in, otherwise metrics can be bogus. |
| if (send_time_.is_null() || recv_first_byte_time_.is_null() || |
| recv_last_byte_time_.is_null()) |
| return; |
| |
| UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", |
| recv_first_byte_time_ - send_time_); |
| UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", |
| recv_last_byte_time_ - recv_first_byte_time_); |
| UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", |
| recv_last_byte_time_ - send_time_); |
| |
| UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); |
| UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); |
| } |
| |
| } // namespace net |