/* * Copyright 2012 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Author: jefftk@google.com (Jeff Kaufman) #include "ngx_base_fetch.h" #include "ngx_list_iterator.h" #include "ngx_pagespeed.h" #include "net/instaweb/http/public/response_headers.h" #include "net/instaweb/rewriter/public/rewrite_stats.h" #include "net/instaweb/util/public/google_message_handler.h" #include "net/instaweb/util/public/message_handler.h" namespace net_instaweb { NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd, NgxServerContext* server_context, const RequestContextPtr& request_ctx, PreserveCachingHeaders preserve_caching_headers) : AsyncFetch(request_ctx), request_(r), server_context_(server_context), done_called_(false), last_buf_sent_(false), pipe_fd_(pipe_fd), references_(2), handle_error_(true), preserve_caching_headers_(preserve_caching_headers) { if (pthread_mutex_init(&mutex_, NULL)) CHECK(0); } NgxBaseFetch::~NgxBaseFetch() { pthread_mutex_destroy(&mutex_); } void NgxBaseFetch::Lock() { pthread_mutex_lock(&mutex_); } void NgxBaseFetch::Unlock() { pthread_mutex_unlock(&mutex_); } bool NgxBaseFetch::HandleWrite(const StringPiece& sp, MessageHandler* handler) { Lock(); buffer_.append(sp.data(), sp.size()); Unlock(); return true; } // should only be called in nginx thread ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) { CHECK(!(done_called_ && last_buf_sent_)) << "CopyBufferToNginx() was called after the last buffer was sent"; // there is no buffer to send if (!done_called_ && buffer_.empty()) { *link_ptr = NULL; return NGX_AGAIN; } int rc = string_piece_to_buffer_chain( request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */); if (rc != NGX_OK) { return rc; } // Done with buffer contents now. buffer_.clear(); if (done_called_) { last_buf_sent_ = true; return NGX_OK; } return NGX_AGAIN; } // There may also be a race condition if this is called between the last Write() // and Done() such that we're sending an empty buffer with last_buf set, which I // think nginx will reject. ngx_int_t NgxBaseFetch::CollectAccumulatedWrites(ngx_chain_t** link_ptr) { ngx_int_t rc; Lock(); rc = CopyBufferToNginx(link_ptr); Unlock(); return rc; } ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) { const ResponseHeaders* pagespeed_headers = response_headers(); // TODO(chaizhenhua): Add and check. // if (content_length_known()) { // headers_out->content_length = NULL; // headers_out->content_length_n = content_length(); // } return copy_response_headers_to_ngx(request_, *pagespeed_headers, preserve_caching_headers_); } void NgxBaseFetch::RequestCollection() { int rc; char c = 'A'; // What byte we write is arbitrary. while (true) { rc = write(pipe_fd_, &c, 1); if (rc == 1) { break; } else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { // TODO(jefftk): is this rare enough that spinning isn't a problem? Could // we get into a case where the pipe fills up and we spin forever? } else { perror("NgxBaseFetch::RequestCollection"); break; } } } void NgxBaseFetch::HandleHeadersComplete() { int status_code = response_headers()->status_code(); bool status_ok = (status_code != 0) && (status_code < 400); if (status_ok || handle_error_) { // If this is a 404 response we need to count it in the stats. if (response_headers()->status_code() == HttpStatus::kNotFound) { server_context_->rewrite_stats()->resource_404_count()->Add(1); } } RequestCollection(); // Headers available. } bool NgxBaseFetch::HandleFlush(MessageHandler* handler) { RequestCollection(); // A new part of the response body is available. return true; } void NgxBaseFetch::Release() { DecrefAndDeleteIfUnreferenced(); } void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() { // Creates a full memory barrier. if (__sync_add_and_fetch(&references_, -1) == 0) { delete this; } } void NgxBaseFetch::HandleDone(bool success) { // TODO(jefftk): it's possible that instead of locking here we can just modify // CopyBufferToNginx to only read done_called_ once. Lock(); done_called_ = true; Unlock(); close(pipe_fd_); // Indicates to nginx that we're done with the rewrite. pipe_fd_ = -1; DecrefAndDeleteIfUnreferenced(); } } // namespace net_instaweb