| // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "net/websockets/websocket_job.h" |
| |
| #include <algorithm> |
| |
| #include "base/lazy_instance.h" |
| #include "base/string_tokenizer.h" |
| #include "googleurl/src/gurl.h" |
| #include "net/base/net_errors.h" |
| #include "net/base/net_log.h" |
| #include "net/base/cookie_policy.h" |
| #include "net/base/cookie_store.h" |
| #include "net/base/io_buffer.h" |
| #include "net/http/http_util.h" |
| #include "net/url_request/url_request_context.h" |
| #include "net/websockets/websocket_frame_handler.h" |
| #include "net/websockets/websocket_handshake_handler.h" |
| #include "net/websockets/websocket_net_log_params.h" |
| #include "net/websockets/websocket_throttle.h" |
| |
| namespace { |
| |
// Cookie-related HTTP header names.  All entries are lower-case.

// Headers removed from the handshake request before cookies are re-added.
const char* const kCookieHeaders[] = {
  "cookie",
  "cookie2",
};

// Headers read from, and then removed from, the handshake response.
const char* const kSetCookieHeaders[] = {
  "set-cookie",
  "set-cookie2",
};
| |
| net::SocketStreamJob* WebSocketJobFactory( |
| const GURL& url, net::SocketStream::Delegate* delegate) { |
| net::WebSocketJob* job = new net::WebSocketJob(delegate); |
| job->InitSocketStream(new net::SocketStream(url, job)); |
| return job; |
| } |
| |
| class WebSocketJobInitSingleton { |
| private: |
| friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>; |
| WebSocketJobInitSingleton() { |
| net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory); |
| net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory); |
| } |
| }; |
| |
| static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init( |
| base::LINKER_INITIALIZED); |
| |
| } // anonymous namespace |
| |
| namespace net { |
| |
| // static |
| void WebSocketJob::EnsureInit() { |
| g_websocket_job_init.Get(); |
| } |
| |
| WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) |
| : delegate_(delegate), |
| state_(INITIALIZED), |
| waiting_(false), |
| callback_(NULL), |
| handshake_request_(new WebSocketHandshakeRequestHandler), |
| handshake_response_(new WebSocketHandshakeResponseHandler), |
| handshake_request_sent_(0), |
| response_cookies_save_index_(0), |
| send_frame_handler_(new WebSocketFrameHandler), |
| receive_frame_handler_(new WebSocketFrameHandler) { |
| } |
| |
| WebSocketJob::~WebSocketJob() { |
| DCHECK_EQ(CLOSED, state_); |
| DCHECK(!delegate_); |
| DCHECK(!socket_.get()); |
| } |
| |
| void WebSocketJob::Connect() { |
| DCHECK(socket_.get()); |
| DCHECK_EQ(state_, INITIALIZED); |
| state_ = CONNECTING; |
| socket_->Connect(); |
| } |
| |
| bool WebSocketJob::SendData(const char* data, int len) { |
| switch (state_) { |
| case INITIALIZED: |
| return false; |
| |
| case CONNECTING: |
| return SendHandshakeRequest(data, len); |
| |
| case OPEN: |
| { |
| send_frame_handler_->AppendData(data, len); |
| // If current buffer is sending now, this data will be sent in |
| // SendPending() after current data was sent. |
| // Do not buffer sending data for now. Since |
| // WebCore::SocketStreamHandle controls traffic to keep number of |
| // pending bytes less than max_pending_send_allowed, so when sending |
| // larger message than max_pending_send_allowed should not be buffered. |
| // If we don't call OnSentData, WebCore::SocketStreamHandle would stop |
| // sending more data when pending data reaches max_pending_send_allowed. |
| // TODO(ukai): Fix this to support compression for larger message. |
| int err = 0; |
| if (!send_frame_handler_->GetCurrentBuffer() && |
| (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) { |
| DCHECK(!current_buffer_); |
| current_buffer_ = new DrainableIOBuffer( |
| send_frame_handler_->GetCurrentBuffer(), |
| send_frame_handler_->GetCurrentBufferSize()); |
| return socket_->SendData( |
| current_buffer_->data(), current_buffer_->BytesRemaining()); |
| } |
| return err >= 0; |
| } |
| |
| case CLOSING: |
| case CLOSED: |
| return false; |
| } |
| return false; |
| } |
| |
| void WebSocketJob::Close() { |
| state_ = CLOSING; |
| if (current_buffer_) { |
| // Will close in SendPending. |
| return; |
| } |
| state_ = CLOSED; |
| socket_->Close(); |
| } |
| |
| void WebSocketJob::RestartWithAuth( |
| const string16& username, |
| const string16& password) { |
| state_ = CONNECTING; |
| socket_->RestartWithAuth(username, password); |
| } |
| |
| void WebSocketJob::DetachDelegate() { |
| state_ = CLOSED; |
| WebSocketThrottle::GetInstance()->RemoveFromQueue(this); |
| WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); |
| |
| scoped_refptr<WebSocketJob> protect(this); |
| |
| delegate_ = NULL; |
| if (socket_) |
| socket_->DetachDelegate(); |
| socket_ = NULL; |
| if (callback_) { |
| waiting_ = false; |
| callback_ = NULL; |
| Release(); // Balanced with OnStartOpenConnection(). |
| } |
| } |
| |
| int WebSocketJob::OnStartOpenConnection( |
| SocketStream* socket, CompletionCallback* callback) { |
| DCHECK(!callback_); |
| state_ = CONNECTING; |
| addresses_.Copy(socket->address_list().head(), true); |
| WebSocketThrottle::GetInstance()->PutInQueue(this); |
| if (!waiting_) |
| return OK; |
| callback_ = callback; |
| AddRef(); // Balanced when callback_ becomes NULL. |
| return ERR_IO_PENDING; |
| } |
| |
| void WebSocketJob::OnConnected( |
| SocketStream* socket, int max_pending_send_allowed) { |
| if (state_ == CLOSED) |
| return; |
| DCHECK_EQ(CONNECTING, state_); |
| if (delegate_) |
| delegate_->OnConnected(socket, max_pending_send_allowed); |
| } |
| |
| void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) { |
| DCHECK_NE(INITIALIZED, state_); |
| if (state_ == CLOSED) |
| return; |
| if (state_ == CONNECTING) { |
| OnSentHandshakeRequest(socket, amount_sent); |
| return; |
| } |
| if (delegate_) { |
| DCHECK(state_ == OPEN || state_ == CLOSING); |
| DCHECK_GT(amount_sent, 0); |
| DCHECK(current_buffer_); |
| current_buffer_->DidConsume(amount_sent); |
| if (current_buffer_->BytesRemaining() > 0) |
| return; |
| |
| // We need to report amount_sent of original buffer size, instead of |
| // amount sent to |socket|. |
| amount_sent = send_frame_handler_->GetOriginalBufferSize(); |
| DCHECK_GT(amount_sent, 0); |
| current_buffer_ = NULL; |
| send_frame_handler_->ReleaseCurrentBuffer(); |
| delegate_->OnSentData(socket, amount_sent); |
| MessageLoopForIO::current()->PostTask( |
| FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending)); |
| } |
| } |
| |
| void WebSocketJob::OnReceivedData( |
| SocketStream* socket, const char* data, int len) { |
| DCHECK_NE(INITIALIZED, state_); |
| if (state_ == CLOSED) |
| return; |
| if (state_ == CONNECTING) { |
| OnReceivedHandshakeResponse(socket, data, len); |
| return; |
| } |
| DCHECK(state_ == OPEN || state_ == CLOSING); |
| std::string received_data; |
| receive_frame_handler_->AppendData(data, len); |
| // Don't buffer receiving data for now. |
| // TODO(ukai): fix performance of WebSocketFrameHandler. |
| while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) { |
| received_data += |
| std::string(receive_frame_handler_->GetCurrentBuffer()->data(), |
| receive_frame_handler_->GetCurrentBufferSize()); |
| receive_frame_handler_->ReleaseCurrentBuffer(); |
| } |
| if (delegate_ && !received_data.empty()) |
| delegate_->OnReceivedData( |
| socket, received_data.data(), received_data.size()); |
| } |
| |
| void WebSocketJob::OnClose(SocketStream* socket) { |
| state_ = CLOSED; |
| WebSocketThrottle::GetInstance()->RemoveFromQueue(this); |
| WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); |
| |
| scoped_refptr<WebSocketJob> protect(this); |
| |
| SocketStream::Delegate* delegate = delegate_; |
| delegate_ = NULL; |
| socket_ = NULL; |
| if (callback_) { |
| waiting_ = false; |
| callback_ = NULL; |
| Release(); // Balanced with OnStartOpenConnection(). |
| } |
| if (delegate) |
| delegate->OnClose(socket); |
| } |
| |
| void WebSocketJob::OnAuthRequired( |
| SocketStream* socket, AuthChallengeInfo* auth_info) { |
| if (delegate_) |
| delegate_->OnAuthRequired(socket, auth_info); |
| } |
| |
| void WebSocketJob::OnError(const SocketStream* socket, int error) { |
| if (delegate_) |
| delegate_->OnError(socket, error); |
| } |
| |
| bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { |
| DCHECK_EQ(state_, CONNECTING); |
| if (!handshake_request_->ParseRequest(data, len)) |
| return false; |
| |
| // handshake message is completed. |
| AddCookieHeaderAndSend(); |
| // Just buffered in |handshake_request_|. |
| return true; |
| } |
| |
| void WebSocketJob::AddCookieHeaderAndSend() { |
| int policy = OK; |
| if (socket_->context()->cookie_policy()) { |
| GURL url_for_cookies = GetURLForCookies(); |
| policy = socket_->context()->cookie_policy()->CanGetCookies( |
| url_for_cookies, |
| url_for_cookies); |
| } |
| DCHECK_NE(ERR_IO_PENDING, policy); |
| OnCanGetCookiesCompleted(policy); |
| } |
| |
| void WebSocketJob::OnCanGetCookiesCompleted(int policy) { |
| if (socket_ && delegate_ && state_ == CONNECTING) { |
| handshake_request_->RemoveHeaders( |
| kCookieHeaders, arraysize(kCookieHeaders)); |
| if (policy == OK) { |
| // Add cookies, including HttpOnly cookies. |
| if (socket_->context()->cookie_store()) { |
| CookieOptions cookie_options; |
| cookie_options.set_include_httponly(); |
| std::string cookie = |
| socket_->context()->cookie_store()->GetCookiesWithOptions( |
| GetURLForCookies(), cookie_options); |
| if (!cookie.empty()) |
| handshake_request_->AppendHeaderIfMissing("Cookie", cookie); |
| } |
| } |
| |
| const std::string& handshake_request = handshake_request_->GetRawRequest(); |
| handshake_request_sent_ = 0; |
| socket_->net_log()->AddEvent( |
| NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, |
| make_scoped_refptr( |
| new NetLogWebSocketHandshakeParameter(handshake_request))); |
| socket_->SendData(handshake_request.data(), |
| handshake_request.size()); |
| } |
| } |
| |
| void WebSocketJob::OnSentHandshakeRequest( |
| SocketStream* socket, int amount_sent) { |
| DCHECK_EQ(state_, CONNECTING); |
| handshake_request_sent_ += amount_sent; |
| DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); |
| if (handshake_request_sent_ >= handshake_request_->raw_length()) { |
| // handshake request has been sent. |
| // notify original size of handshake request to delegate. |
| if (delegate_) |
| delegate_->OnSentData( |
| socket, |
| handshake_request_->original_length()); |
| handshake_request_.reset(); |
| } |
| } |
| |
| void WebSocketJob::OnReceivedHandshakeResponse( |
| SocketStream* socket, const char* data, int len) { |
| DCHECK_EQ(state_, CONNECTING); |
| if (handshake_response_->HasResponse()) { |
| // If we already has handshake response, received data should be frame |
| // data, not handshake message. |
| receive_frame_handler_->AppendData(data, len); |
| return; |
| } |
| |
| size_t response_length = handshake_response_->ParseRawResponse(data, len); |
| if (!handshake_response_->HasResponse()) { |
| // not yet. we need more data. |
| return; |
| } |
| // handshake message is completed. |
| socket_->net_log()->AddEvent( |
| NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS, |
| make_scoped_refptr(new NetLogWebSocketHandshakeParameter( |
| handshake_response_->GetRawResponse()))); |
| if (len - response_length > 0) { |
| // If we received extra data, it should be frame data. |
| receive_frame_handler_->AppendData(data + response_length, |
| len - response_length); |
| } |
| SaveCookiesAndNotifyHeaderComplete(); |
| } |
| |
| void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() { |
| // handshake message is completed. |
| DCHECK(handshake_response_->HasResponse()); |
| |
| response_cookies_.clear(); |
| response_cookies_save_index_ = 0; |
| |
| handshake_response_->GetHeaders( |
| kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_); |
| |
| // Now, loop over the response cookies, and attempt to persist each. |
| SaveNextCookie(); |
| } |
| |
| void WebSocketJob::SaveNextCookie() { |
| if (response_cookies_save_index_ == response_cookies_.size()) { |
| response_cookies_.clear(); |
| response_cookies_save_index_ = 0; |
| |
| // Remove cookie headers, with malformed headers preserved. |
| // Actual handshake should be done in WebKit. |
| handshake_response_->RemoveHeaders( |
| kSetCookieHeaders, arraysize(kSetCookieHeaders)); |
| std::string received_data = handshake_response_->GetResponse(); |
| // Don't buffer receiving data for now. |
| // TODO(ukai): fix performance of WebSocketFrameHandler. |
| while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) { |
| received_data += |
| std::string(receive_frame_handler_->GetCurrentBuffer()->data(), |
| receive_frame_handler_->GetCurrentBufferSize()); |
| receive_frame_handler_->ReleaseCurrentBuffer(); |
| } |
| |
| state_ = OPEN; |
| if (delegate_) |
| delegate_->OnReceivedData( |
| socket_, received_data.data(), received_data.size()); |
| |
| handshake_response_.reset(); |
| |
| WebSocketThrottle::GetInstance()->RemoveFromQueue(this); |
| WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); |
| return; |
| } |
| |
| int policy = OK; |
| if (socket_->context()->cookie_policy()) { |
| GURL url_for_cookies = GetURLForCookies(); |
| policy = socket_->context()->cookie_policy()->CanSetCookie( |
| url_for_cookies, |
| url_for_cookies, |
| response_cookies_[response_cookies_save_index_]); |
| } |
| |
| DCHECK_NE(ERR_IO_PENDING, policy); |
| OnCanSetCookieCompleted(policy); |
| } |
| |
| void WebSocketJob::OnCanSetCookieCompleted(int policy) { |
| if (socket_ && delegate_ && state_ == CONNECTING) { |
| if ((policy == OK || policy == OK_FOR_SESSION_ONLY) && |
| socket_->context()->cookie_store()) { |
| CookieOptions options; |
| options.set_include_httponly(); |
| if (policy == OK_FOR_SESSION_ONLY) |
| options.set_force_session(); |
| GURL url_for_cookies = GetURLForCookies(); |
| socket_->context()->cookie_store()->SetCookieWithOptions( |
| url_for_cookies, response_cookies_[response_cookies_save_index_], |
| options); |
| } |
| response_cookies_save_index_++; |
| SaveNextCookie(); |
| } |
| } |
| |
| GURL WebSocketJob::GetURLForCookies() const { |
| GURL url = socket_->url(); |
| std::string scheme = socket_->is_secure() ? "https" : "http"; |
| url_canon::Replacements<char> replacements; |
| replacements.SetScheme(scheme.c_str(), |
| url_parse::Component(0, scheme.length())); |
| return url.ReplaceComponents(replacements); |
| } |
| |
| const AddressList& WebSocketJob::address_list() const { |
| return addresses_; |
| } |
| |
| void WebSocketJob::SetWaiting() { |
| waiting_ = true; |
| } |
| |
| bool WebSocketJob::IsWaiting() const { |
| return waiting_; |
| } |
| |
| void WebSocketJob::Wakeup() { |
| if (!waiting_) |
| return; |
| waiting_ = false; |
| DCHECK(callback_); |
| MessageLoopForIO::current()->PostTask( |
| FROM_HERE, |
| NewRunnableMethod(this, |
| &WebSocketJob::DoCallback)); |
| } |
| |
| void WebSocketJob::DoCallback() { |
| // |callback_| may be NULL if OnClose() or DetachDelegate() was called. |
| if (callback_) { |
| net::CompletionCallback* callback = callback_; |
| callback_ = NULL; |
| callback->Run(net::OK); |
| Release(); // Balanced with OnStartOpenConnection(). |
| } |
| } |
| |
| void WebSocketJob::SendPending() { |
| if (current_buffer_) |
| return; |
| // Current buffer is done. Try next buffer if any. |
| // Don't buffer sending data. See comment on case OPEN in SendData(). |
| if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) { |
| // No more data to send. |
| if (state_ == CLOSING) |
| socket_->Close(); |
| return; |
| } |
| current_buffer_ = new DrainableIOBuffer( |
| send_frame_handler_->GetCurrentBuffer(), |
| send_frame_handler_->GetCurrentBufferSize()); |
| socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining()); |
| } |
| |
| } // namespace net |