diff --git a/config b/config index d4cb3f6e2..610f99ef6 100644 --- a/config +++ b/config @@ -153,6 +153,8 @@ if [ $ngx_found = yes ]; then # Make pagespeed run immediately before gzip. HTTP_FILTER_MODULES=$(echo $HTTP_FILTER_MODULES |\ sed "s/$HTTP_GZIP_FILTER_MODULE/$HTTP_GZIP_FILTER_MODULE $ngx_addon_name/") + HTTP_FILTER_MODULES=$(echo $HTTP_FILTER_MODULES |\ + sed "s/$HTTP_HEADER_FILTER_MODULE/$HTTP_HEADER_FILTER_MODULE ngx_pagespeed_copy_filter/") CORE_LIBS="$CORE_LIBS $pagespeed_libs" CORE_INCS="$CORE_INCS $pagespeed_include" else diff --git a/src/ngx_base_fetch.cc b/src/ngx_base_fetch.cc index 85da6f94f..67bacc0a5 100644 --- a/src/ngx_base_fetch.cc +++ b/src/ngx_base_fetch.cc @@ -27,7 +27,7 @@ namespace net_instaweb { -NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd, +NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, NgxServerContext* server_context, const RequestContextPtr& request_ctx) : AsyncFetch(request_ctx), @@ -35,8 +35,8 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd, server_context_(server_context), done_called_(false), last_buf_sent_(false), - pipe_fd_(pipe_fd), - references_(2) { + references_(2), + flush_(false) { if (pthread_mutex_init(&mutex_, NULL)) CHECK(0); PopulateRequestHeaders(); } @@ -115,7 +115,9 @@ bool NgxBaseFetch::HandleWrite(const StringPiece& sp, ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) { if (done_called_ && last_buf_sent_) { - return NGX_DECLINED; + // OK means HandleDone has been called + *link_ptr = NULL; + return NGX_OK; } int rc = ngx_psol::string_piece_to_buffer_chain( @@ -129,26 +131,42 @@ ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) { if (done_called_) { last_buf_sent_ = true; + return NGX_OK; } - return NGX_OK; + return NGX_AGAIN; } // There may also be a race condition if this is called between the last Write() // and Done() such that we're sending an empty buffer with last_buf set, which I // think nginx will reject. 
ngx_int_t NgxBaseFetch::CollectAccumulatedWrites(ngx_chain_t** link_ptr) { + ngx_int_t rc = NGX_DECLINED; Lock(); - ngx_int_t rc = CopyBufferToNginx(link_ptr); + if (flush_) { + rc = CopyBufferToNginx(link_ptr); + flush_ = false; + } Unlock(); - if (rc == NGX_DECLINED) { *link_ptr = NULL; - return NGX_OK; } return rc; } +void NgxBaseFetch::RequestCollection() { + Lock(); + if (flush_) { + Unlock(); + return; + } + flush_ = true; + ngx_psol::ps_base_fetch_signal(request_); + Unlock(); + return; +} + + ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) { Lock(); const ResponseHeaders* pagespeed_headers = response_headers(); @@ -156,24 +174,6 @@ ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) { return ngx_psol::copy_response_headers_to_ngx(request_, *pagespeed_headers); } -void NgxBaseFetch::RequestCollection() { - int rc; - char c = 'A'; // What byte we write is arbitrary. - while (true) { - rc = write(pipe_fd_, &c, 1); - if (rc == 1) { - break; - } else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { - // TODO(jefftk): is this rare enough that spinning isn't a problem? Could - // we get into a case where the pipe fills up and we spin forever? - - } else { - perror("NgxBaseFetch::RequestCollection"); - break; - } - } -} - void NgxBaseFetch::HandleHeadersComplete() { // If this is a 404 response we need to count it in the stats. if (response_headers()->status_code() == HttpStatus::kNotFound) { @@ -189,6 +189,9 @@ bool NgxBaseFetch::HandleFlush(MessageHandler* handler) { } void NgxBaseFetch::Release() { + Lock(); + flush_ = true; + Unlock(); DecrefAndDeleteIfUnreferenced(); } @@ -202,13 +205,24 @@ void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() { void NgxBaseFetch::HandleDone(bool success) { // TODO(jefftk): it's possible that instead of locking here we can just modify // CopyBufferToNginx to only read done_called_ once. 
+ + if (done_called_ == true) { + return; + } + Lock(); + if (done_called_ == true) { + Unlock(); + return; + } + done_called_ = true; + if (!flush_) { + flush_ = true; + ngx_psol::ps_base_fetch_signal(request_); + } + Unlock(); - - close(pipe_fd_); // Indicates to nginx that we're done with the rewrite. - pipe_fd_ = -1; - DecrefAndDeleteIfUnreferenced(); } diff --git a/src/ngx_base_fetch.h b/src/ngx_base_fetch.h index a5b526607..8bcfa1194 100644 --- a/src/ngx_base_fetch.h +++ b/src/ngx_base_fetch.h @@ -55,7 +55,7 @@ namespace net_instaweb { class NgxBaseFetch : public AsyncFetch { public: - NgxBaseFetch(ngx_http_request_t* r, int pipe_fd, + NgxBaseFetch(ngx_http_request_t* r, NgxServerContext* server_context, const RequestContextPtr& request_ctx); virtual ~NgxBaseFetch(); @@ -102,7 +102,9 @@ class NgxBaseFetch : public AsyncFetch { // Lock must be acquired first. // Returns: // NGX_DECLINED: nothing to send, short circuit. Buffer not allocated. - // NGX_OK, NGX_ERROR: success, failure + // NGX_ERROR: failure + // NGX_AGAIN: success + // NGX_OK: done, HandleDone has been called // Allocates an nginx buffer, copies our buffer_ contents into it, clears // buffer_. ngx_int_t CopyBufferToNginx(ngx_chain_t** link_ptr); @@ -119,12 +121,14 @@ class NgxBaseFetch : public AsyncFetch { NgxServerContext* server_context_; bool done_called_; bool last_buf_sent_; - int pipe_fd_; // How many active references there are to this fetch. Starts at two, // decremented once when Done() is called and once when Release() is called. int references_; pthread_mutex_t mutex_; + // set by RequestCollection, cleared by CollectAccumulatedWrites + volatile bool flush_; + DISALLOW_COPY_AND_ASSIGN(NgxBaseFetch); }; diff --git a/src/ngx_pagespeed.cc b/src/ngx_pagespeed.cc index 466484f91..bac0cdf83 100644 --- a/src/ngx_pagespeed.cc +++ b/src/ngx_pagespeed.cc @@ -73,7 +73,7 @@ extern ngx_module_t ngx_pagespeed; #define DBG(r, args...) 
\ ngx_log_error(NGX_LOG_DEBUG, (r)->connection->log, 0, args) #define PDBG(ctx, args...) \ - ngx_log_error(NGX_LOG_DEBUG, (ctx)->pagespeed_connection->log, 0, args) + ngx_log_error(NGX_LOG_DEBUG, (ctx)->r->connection->log, 0, args) #define CDBG(cf, args...) \ ngx_conf_log_error(NGX_LOG_DEBUG, cf, 0, args) @@ -310,12 +310,6 @@ ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r); void ps_initialize_server_context(ps_srv_conf_t* cfg); -ngx_int_t ps_update(ps_request_ctx_t* ctx, ngx_event_t* ev); - -void ps_connection_read_handler(ngx_event_t* ev); - -ngx_int_t ps_create_connection(ps_request_ctx_t* ctx); - namespace CreateRequestContext { enum Response { kOk, @@ -333,6 +327,251 @@ enum Response { }; } // namespace CreateRequestContext +ngx_http_output_header_filter_pt ngx_http_next_header_filter; +ngx_http_output_header_filter_pt ngx_http_header_filter; +ngx_http_output_body_filter_pt ngx_http_next_body_filter; + + +ngx_int_t ps_send_response(ngx_http_request_t *r) { + ps_request_ctx_t* ctx = ps_get_request_context(r); + ngx_int_t rc; + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "ps send response: %V", &r->uri); + + // Get output from pagespeed. 
+ if (!r->header_sent) { + if (ctx->is_resource_fetch || ctx->modify_headers) { + ngx_http_clean_header(r); + + rc = ctx->base_fetch->CollectHeaders(&r->headers_out); + if (rc == NGX_ERROR) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + } + if (ctx->is_resource_fetch) { + rc = ngx_http_send_header(r); + } else { + rc = ngx_http_header_filter(r); + } + + // standard nginx send header check see ngx_http_send_response + if (rc == NGX_ERROR || rc > NGX_OK) { + return ngx_http_filter_finalize_request(r, NULL, rc); + } + if (rc == NGX_OK && r->header_only) { + return NGX_OK; + } + } + + ngx_chain_t* cl; + + // OK means last buffer has been sent + rc = ctx->base_fetch->CollectAccumulatedWrites(&cl); + PDBG(ctx, "CollectAccumulatedWrites, %d", rc); + + if (rc == NGX_ERROR) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + // rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DECLINED + // + // Pass the optimized content along to later body filters. + // From Weibin: This function should be called mutiple times. Store the + // whole file in one chain buffers is too aggressive. It could consume + // too much memory in busy servers. + + bool done = (rc == NGX_OK); + + // body_filter can handle NULL chain. + rc = ngx_http_next_body_filter(r, cl); + + if (rc == NGX_OK) { + ctx->write_pending = false; + if (done) { + ps_set_buffered(ctx->r, false); + return NGX_OK; + } + } + + if (rc == NGX_OK || rc == NGX_AGAIN) { + ps_set_buffered(ctx->r, true); + return NGX_AGAIN; + } + + // others + return rc; +} + + +ngx_connection_t *ps_base_fetch_conn = NULL; +int ps_base_fetch_pipefds[2] = {-1, -1}; + +// handle base fetch event(Flush/HeadersComplete/Done), +// and ignore the event corresponding to SKIP. +// SKIP should only be specified in ps_release_request_context, +// so that released base fetch event will be ignored. 
+// +// modified from ngx_channel_handler +void ps_base_fetch_clear(ngx_http_request_t *skip = NULL) { + for ( ;; ) { + ngx_http_request_t *requests[512]; + ssize_t size = read(ps_base_fetch_conn->fd, + static_cast<void *>(requests), sizeof(requests)); + + if (size == -1) { + if (ngx_errno == EINTR) { + continue; + } else if (ngx_errno == EAGAIN) { + return; + } + } + + if (size <= 0) { + // Terminate + if (ngx_event_flags & NGX_USE_EPOLL_EVENT) { + ngx_del_conn(ps_base_fetch_conn, 0); + } + + ngx_close_connection(ps_base_fetch_conn); + ps_base_fetch_conn = NULL; + break; + } + + ngx_uint_t i; + for (i = 0; i < size / sizeof(ngx_http_request_t *); i++) { + ngx_http_request_t *r = requests[i]; + if (r == skip) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "base fetch clear"); + continue; + } + ps_request_ctx_t* ctx = ps_get_request_context(r); + ctx->write_pending = true; + ngx_http_finalize_request(r, ps_send_response(r)); + } + } +} + +void ps_base_fetch_event_handler(ngx_event_t *ev) { + if (ev->timedout) { + ev->timedout = 0; + return; + } + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "base fetch event handler"); + ps_base_fetch_clear(NULL); +} + +ngx_int_t ps_base_fetch_event_init(ngx_cycle_t *cycle) { + ps_main_conf_t* cfg_m = static_cast<ps_main_conf_t*>( + ngx_http_cycle_get_module_main_conf(cycle, ngx_pagespeed)); + + if (::pipe(ps_base_fetch_pipefds) != 0) { + cfg_m->handler->Message(net_instaweb::kError, "base fetch pipe() failed"); + return NGX_ERROR; + } + + if (ngx_nonblocking(ps_base_fetch_pipefds[0]) == -1) { + cfg_m->handler->Message(net_instaweb::kError, + "base fetch pipe[0] " ngx_nonblocking_n " failed"); + goto failed; + } + + // following codes are modified from ngx_add_channel_event + // we have to save ngx_connection_t, so can not use ngx_add_channel_event + ngx_event_t *rev, *wev; + ngx_connection_t *c; + + c = ngx_get_connection(ps_base_fetch_pipefds[0], cycle->log); + + if (c == NULL) { + goto failed; + } + + c->pool = cycle->pool; + + rev = 
c->read; + wev = c->write; + + rev->log = cycle->log; + wev->log = cycle->log; + +#if (NGX_THREADS) + rev->lock = &c->lock; + wev->lock = &c->lock; + rev->own_lock = &c->lock; + wev->own_lock = &c->lock; +#endif + + rev->channel = 1; + wev->channel = 1; + + rev->handler = ps_base_fetch_event_handler; + + // only EPOLL event has both add_event and add_connection + // same as ngx_add_channel_event + if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { + if (ngx_add_conn(c) == NGX_ERROR) { + ngx_free_connection(c); + goto failed; + } + } else { + if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { + ngx_free_connection(c); + goto failed; + } + } + + ps_base_fetch_conn = c; + return NGX_OK; + failed: + + if (close(ps_base_fetch_pipefds[0]) == -1) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, + "close() base fetch pipe[0] failed"); + } + if (close(ps_base_fetch_pipefds[1]) == -1) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, + "close() base fetch pipe[1] failed"); + } + return NGX_ERROR; +} + +void ps_base_fetch_event_terminate(ngx_cycle_t *cycle) { + if (ps_base_fetch_conn == NULL) { + return; + } + if (close(ps_base_fetch_pipefds[0]) == -1) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, + "close() base fetch pipe[0] failed"); + } + if (close(ps_base_fetch_pipefds[1]) == -1) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, + "close() base fetch pipe[1] failed"); + } +} + +}// namespace + +void ps_base_fetch_signal(ngx_http_request_t *r) { + ssize_t size = 0; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "base fetch signal"); + + while (true) { + size = write(ps_base_fetch_pipefds[1], static_cast<void *>(&r), + sizeof(ngx_http_request_t *)); + if (size == -1 && errno == EINTR) { + continue; + } + return; + } +} + +namespace +{ + CreateRequestContext::Response ps_create_request_context( ngx_http_request_t* r, bool is_resource_fetch); @@ -553,6 +792,7 @@ void ps_cleanup_main_conf(void* data) {
cfg_m->handler = NULL; net_instaweb::NgxRewriteDriverFactory::Terminate(); net_instaweb::NgxRewriteOptions::Terminate(); + ps_base_fetch_event_terminate(const_cast<ngx_cycle_t *>(ngx_cycle)); // reset the factory deleted flag, so we will clean up properly next time, // in case of a configuration reload. @@ -718,12 +958,24 @@ char* ps_merge_loc_conf(ngx_conf_t* cf, void* parent, void* child) { return NGX_CONF_OK; } -ngx_http_output_header_filter_pt ngx_http_next_header_filter; -ngx_http_output_body_filter_pt ngx_http_next_body_filter; void ps_release_request_context(void* data) { ps_request_ctx_t* ctx = static_cast<ps_request_ctx_t*>(data); + // In the normal flow BaseFetch doesn't delete itself in HandleDone() because + // we still need to receive notification via pipe and call + // CollectAccumulatedWrites. If there's an error and we're cleaning up early + // then HandleDone() hasn't been called yet and we need the base fetch to wait + // for that and then delete itself. + if (ctx->base_fetch != NULL) { + // We must first disable event from BaseFetch, + // and then clear current pending event. + // So release() Should be called before ps_base_fetch_clear() + ctx->base_fetch->Release(); + ps_base_fetch_clear(ctx->r); + ctx->base_fetch = NULL; + } + // proxy_fetch deleted itself if we called Done(), but if an error happened // before then we need to tell it to delete itself. // @@ -732,32 +984,11 @@ void ps_release_request_context(void* data) { ctx->proxy_fetch->Done(false /* failure */); } - // In the normal flow BaseFetch doesn't delete itself in HandleDone() because - // we still need to receive notification via pipe and call - // CollectAccumulatedWrites. If there's an error and we're cleaning up early - // then HandleDone() hasn't been called yet and we need the base fetch to wait - // for that and then delete itself.
- if (ctx->base_fetch != NULL) { - ctx->base_fetch->Release(); - ctx->base_fetch = NULL; - } - if (ctx->inflater_ != NULL) { delete ctx->inflater_; ctx->inflater_ = NULL; } - // Close the connection, delete the events attached with it, and free it to - // Nginx's connection pool - if (ctx->pagespeed_connection != NULL) { - ngx_close_connection(ctx->pagespeed_connection); - ctx->pipe_fd = -1; - } - - if (ctx->pipe_fd != -1) { - close(ctx->pipe_fd); - } - delete ctx; } @@ -878,189 +1109,6 @@ ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r) { return static_cast<ps_request_ctx_t*>( ngx_http_get_module_ctx(r, ngx_pagespeed)); } -// Returns: -// NGX_OK: pagespeed is done, request complete -// NGX_AGAIN: pagespeed still working, needs to be called again later -// NGX_ERROR: error -ngx_int_t ps_update(ps_request_ctx_t* ctx, ngx_event_t* ev) { - bool done; - int rc; - char chr; - do { - rc = read(ctx->pipe_fd, &chr, 1); - } while (rc == -1 && errno == EINTR); // Retry on EINTR. - - // read() should only ever return 0 (closed), 1 (data), or -1 (error). - CHECK(rc == -1 || rc == 0 || rc == 1); - - if (rc == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - PDBG(ctx, "no data to read from pagespeed yet"); - return NGX_AGAIN; - } else { - perror("ps_connection_read_handler"); - return NGX_ERROR; - } - } else { - // We're done iff we read 0 bytes because that means the pipe was closed. - done = (rc == 0); - } - - // Get output from pagespeed. - if (ctx->is_resource_fetch && !ctx->sent_headers) { - // For resource fetches, the first pipe-byte tells us headers are available - // for fetching. - rc = ctx->base_fetch->CollectHeaders(&ctx->r->headers_out); - if (rc != NGX_OK) { - PDBG(ctx, "problem with CollectHeaders"); - return rc; - } - - ngx_http_send_header(ctx->r); - ctx->sent_headers = true; - } else { - // For proxy fetches and subsequent resource fetch pipe-bytes, the response - // body is available for (partial) fetching.
- ngx_chain_t* cl; - rc = ctx->base_fetch->CollectAccumulatedWrites(&cl); - if (rc != NGX_OK) { - PDBG(ctx, "problem with CollectAccumulatedWrites"); - return rc; - } - - PDBG(ctx, "pagespeed update: %p, done: %d", cl, done); - - if (cl == NULL) { - return done ? NGX_OK : NGX_AGAIN; - } - // Pass the optimized content along to later body filters. - // From Weibin: This function should be called mutiple times. Store the - // whole file in one chain buffers is too aggressive. It could consume - // too much memory in busy servers. - rc = ngx_http_next_body_filter(ctx->r, cl); - if (rc == NGX_AGAIN && done) { - ctx->write_pending = 1; - return NGX_OK; - } - - if (rc != NGX_OK) { - return rc; - } - } - - return done ? NGX_OK : NGX_AGAIN; -} - -void ps_writer(ngx_http_request_t* r) { - ngx_connection_t* c = r->connection; - ngx_event_t* wev = c->write; - - ngx_log_debug2(NGX_LOG_DEBUG_HTTP, wev->log, 0, - "http pagespeed writer handler: \"%V?%V\"", - &r->uri, &r->args); - - if (wev->timedout) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, - "client timed out"); - c->timedout = 1; - - ngx_http_finalize_request(r, NGX_HTTP_REQUEST_TIME_OUT); - return; - } - - int rc = ngx_http_next_body_filter(r, NULL); - ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, - "http pagespeed writer output filter: %d, \"%V?%V\"", - rc, &r->uri, &r->args); - if (rc == NGX_AGAIN) { - return; - } - - r->write_event_handler = ngx_http_request_empty_handler; - ngx_http_finalize_request(r, rc); -} - -ngx_int_t ngx_http_set_pagespeed_write_handler(ngx_http_request_t *r) { - r->http_state = NGX_HTTP_WRITING_REQUEST_STATE; - - r->read_event_handler = ngx_http_request_empty_handler; - r->write_event_handler = ps_writer; - - ngx_event_t* wev = r->connection->write; - ngx_http_core_loc_conf_t* clcf = static_cast<ngx_http_core_loc_conf_t*>( - ngx_http_get_module_loc_conf(r, ngx_http_core_module)); - ngx_add_timer(wev, clcf->send_timeout); - - if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) { - return NGX_ERROR; - } 
- - return NGX_OK; -} - -void ps_connection_read_handler(ngx_event_t* ev) { - CHECK(ev != NULL); - - ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data); - CHECK(c != NULL); - - ps_request_ctx_t* ctx = - static_cast<ps_request_ctx_t*>(c->data); - CHECK(ctx != NULL); - - int rc = ps_update(ctx, ev); - ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0, - "http pagespeed connection read handler rc: %d", rc); - - if (rc == NGX_AGAIN) { - // Request needs more work by pagespeed. - rc = ngx_handle_read_event(ev, 0); - CHECK(rc == NGX_OK); - } else if (rc == NGX_OK) { - // Pagespeed is done. Stop watching the pipe. If we still have data to - // write, set a write handler so we can get called back to make our write. - ngx_del_event(ev, NGX_READ_EVENT, 0); - ps_set_buffered(ctx->r, false); - if (ctx->write_pending) { - if (ngx_http_set_pagespeed_write_handler(ctx->r) != NGX_OK) { - ngx_http_finalize_request(ctx->r, NGX_HTTP_INTERNAL_SERVER_ERROR); - } - } else { - ngx_http_finalize_request(ctx->r, NGX_DONE); - } - } else if (rc == NGX_ERROR) { - ngx_http_finalize_request(ctx->r, NGX_HTTP_INTERNAL_SERVER_ERROR); - } else { - CHECK(false); - } -} - -ngx_int_t ps_create_connection(ps_request_ctx_t* ctx) { - ngx_connection_t* c = ngx_get_connection( - ctx->pipe_fd, ctx->r->connection->log); - if (c == NULL) { - return NGX_ERROR; - } - - c->recv = ngx_recv; - c->send = ngx_send; - c->recv_chain = ngx_recv_chain; - c->send_chain = ngx_send_chain; - - c->log_error = ctx->r->connection->log_error; - - c->read->log = c->log; - c->write->log = c->log; - - ctx->pagespeed_connection = c; - - // Tell nginx to monitor this pipe and call us back when there's data. - c->data = ctx; - c->read->handler = ps_connection_read_handler; - ngx_add_event(c->read, NGX_READ_EVENT, 0); - - return NGX_OK; -} // Populate cfg_* with configuration information for this // request. Thin wrappers around ngx_http_get_module_*_conf and cast. 
@@ -1500,50 +1548,19 @@ CreateRequestContext::Response ps_create_request_context( return CreateRequestContext::kNotUnderstood; } - int file_descriptors[2]; - int rc = pipe(file_descriptors); - if (rc != 0) { - ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "pipe() failed"); - return CreateRequestContext::kError; - } - - if (ngx_nonblocking(file_descriptors[0]) == -1) { - ngx_log_error(NGX_LOG_EMERG, r->connection->log, ngx_socket_errno, - ngx_nonblocking_n " pipe[0] failed"); - return CreateRequestContext::kError; - } - - if (ngx_nonblocking(file_descriptors[1]) == -1) { - ngx_log_error(NGX_LOG_EMERG, r->connection->log, ngx_socket_errno, - ngx_nonblocking_n " pipe[1] failed"); - return CreateRequestContext::kError; - } - ps_request_ctx_t* ctx = new ps_request_ctx_t(); ctx->r = r; - ctx->pipe_fd = file_descriptors[0]; ctx->is_resource_fetch = is_resource_fetch; ctx->write_pending = false; - ctx->pagespeed_connection = NULL; - rc = ps_create_connection(ctx); - if (rc != NGX_OK) { - close(file_descriptors[1]); - - ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, - "ps_create_request_context: " - "no pagespeed connection."); - ps_release_request_context(ctx); - return CreateRequestContext::kError; - } // Handles its own deletion. We need to call Release() when we're done with // it, and call Done() on the associated parent (Proxy or Resource) fetch. If // we fail before creating the associated fetch then we need to call Done() on // the BaseFetch ourselves. ctx->base_fetch = new net_instaweb::NgxBaseFetch( - r, file_descriptors[1], + r, cfg_s->server_context, net_instaweb::RequestContextPtr(new net_instaweb::NgxRequestContext( cfg_s->server_context->thread_system()->NewMutex(), r))); @@ -1604,15 +1621,20 @@ CreateRequestContext::Response ps_create_request_context( // rewrite drivers and so is faster because there's no wait to construct // them. Otherwise we have to build a new one every time. + // Do not store driver in request_context, it's not safe. 
+ net_instaweb::RewriteDriver* driver; + if (custom_options == NULL) { - ctx->driver = cfg_s->server_context->NewRewriteDriver( + driver = cfg_s->server_context->NewRewriteDriver( ctx->base_fetch->request_context()); } else { // NewCustomRewriteDriver takes ownership of custom_options. - ctx->driver = cfg_s->server_context->NewCustomRewriteDriver( + driver = cfg_s->server_context->NewCustomRewriteDriver( custom_options, ctx->base_fetch->request_context()); } + ctx->modify_headers = driver->options()->modify_caching_headers(); + // TODO(jefftk): FlushEarlyFlow would go here. // Will call StartParse etc. The rewrite driver will take care of deleting @@ -1715,22 +1737,15 @@ ngx_int_t ps_body_filter(ngx_http_request_t* r, ngx_chain_t* in) { ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http pagespeed filter \"%V\"", &r->uri); - if (!ctx->data_received) { - // This is the first set of buffers we've got for this request. - ctx->data_received = true; - // Call this here and not in the header filter because we want to see the - // headers after any other filters are finished modifying them. At this - // point they are final. - // TODO(jefftk): is this thread safe? - ctx->base_fetch->PopulateResponseHeaders(); - } - if (in != NULL) { // Send all input data to the proxy fetch. ps_send_to_pagespeed(r, ctx, cfg_s, in); } - ps_set_buffered(r, true); + + if (ctx->write_pending) { + return ps_send_response(r); + } return NGX_AGAIN; } @@ -1909,7 +1924,6 @@ ngx_int_t ps_header_filter(ngx_http_request_t* r) { break; } ctx = ps_get_request_context(r); - CHECK(ctx->driver != NULL); // Not a resource fetch, so driver is defined. if (r->headers_out.content_encoding && r->headers_out.content_encoding->value.len) { @@ -1939,44 +1953,28 @@ ngx_int_t ps_header_filter(ngx_http_request_t* r) { } } - const net_instaweb::RewriteOptions* options = ctx->driver->options(); - ps_strip_html_headers(r); - if (options->modify_caching_headers()) { - // Don't cache html. 
See mod_instaweb:instaweb_fix_headers_filter. - ps_set_cache_control(r, const_cast<char*>("max-age=0, no-cache")); - - // Pagespeed html doesn't need etags: it should never be cached. - ngx_http_clear_etag(r); - - // An html page may change without the underlying file changing, because of - // how resources are included. Pagespeed adds cache control headers for - // resources instead of using the last modified header. - ngx_http_clear_last_modified(r); - } - r->filter_need_in_memory = 1; - // Set the "X-Page-Speed: VERSION" header. - ngx_table_elt_t* x_pagespeed = static_cast<ngx_table_elt_t*>( - ngx_list_push(&r->headers_out.headers)); - if (x_pagespeed == NULL) { - return NGX_ERROR; - } - // Tell ngx_http_header_filter_module to include this header in the response. - x_pagespeed->hash = 1; - - ngx_str_set(&x_pagespeed->key, kPageSpeedHeader); - // It's safe to use c_str here because once we're handling requests the - // rewrite options are frozen and won't change out from under us. - x_pagespeed->value.data = reinterpret_cast<u_char*>(const_cast<char*>( - options->x_header_value().c_str())); - x_pagespeed->value.len = options->x_header_value().size(); - return ngx_http_next_header_filter(r); } +ngx_int_t ps_copy_header_filter(ngx_http_request_t *r) { + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "ps copy header filter: %V", &r->uri); + + ps_request_ctx_t* ctx = ps_get_request_context(r); + + if (ctx && !ctx->is_resource_fetch) { + // TODO(jefftk): is this thread safe? + ctx->base_fetch->PopulateResponseHeaders(); + return NGX_AGAIN; + } + + return ngx_http_header_filter(r); +} + // TODO(oschaaf): make ps_static_handler use write_handler_response? 
for now, // minimize the diff ngx_int_t ps_static_handler(ngx_http_request_t* r) { @@ -2531,6 +2529,18 @@ ngx_int_t ps_init(ngx_conf_t* cf) { return NGX_OK; } +ngx_int_t ps_copy_filter_init(ngx_conf_t* cf) { + ps_main_conf_t* cfg_m = static_cast( + ngx_http_conf_get_module_main_conf(cf, ngx_pagespeed)); + + if (cfg_m->driver_factory != NULL) { + ngx_http_header_filter = ngx_http_top_header_filter; + ngx_http_top_header_filter = ps_copy_header_filter; + } + + return NGX_OK; +} + ngx_http_module_t ps_module = { NULL, // preconfiguration ps_init, // postconfiguration @@ -2545,6 +2555,20 @@ ngx_http_module_t ps_module = { ps_merge_loc_conf }; +ngx_http_module_t ps_copy_filter_module = { + NULL, + ps_copy_filter_init, // postconfiguration + + NULL, + NULL, + + NULL, + NULL, + + NULL, + NULL +}; + // called after configuration is complete, but before nginx starts forking ngx_int_t ps_init_module(ngx_cycle_t* cycle) { ps_main_conf_t* cfg_m = static_cast( @@ -2637,6 +2661,10 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) { return NGX_OK; } + if (ps_base_fetch_event_init(cycle) != NGX_OK) { + return NGX_ERROR; + } + // ChildInit() will initialise all ServerContexts, which we need to // create ProxyFetchFactories below cfg_m->driver_factory->ChildInit(cycle->log); @@ -2686,3 +2714,19 @@ ngx_module_t ngx_pagespeed = { NULL, NGX_MODULE_V1_PADDING }; + +ngx_module_t ngx_pagespeed_copy_filter = { + NGX_MODULE_V1, + &ngx_psol::ps_copy_filter_module, + NULL, + NGX_HTTP_MODULE, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NGX_MODULE_V1_PADDING +}; + diff --git a/src/ngx_pagespeed.h b/src/ngx_pagespeed.h index e4ab15139..3e1f342fa 100644 --- a/src/ngx_pagespeed.h +++ b/src/ngx_pagespeed.h @@ -83,17 +83,16 @@ ngx_int_t copy_response_headers_to_ngx( typedef struct { net_instaweb::ProxyFetch* proxy_fetch; net_instaweb::NgxBaseFetch* base_fetch; - net_instaweb::RewriteDriver* driver; - bool data_received; - int pipe_fd; - ngx_connection_t* pagespeed_connection; 
ngx_http_request_t* r; bool is_resource_fetch; - bool sent_headers; bool write_pending; + bool modify_headers; net_instaweb::GzipInflater* inflater_; } ps_request_ctx_t; +// called by net_instaweb::NgxBaseFetch to notify event +void ps_base_fetch_signal(ngx_http_request_t *r); + } // namespace ngx_psol #endif // NGX_PAGESPEED_H_ diff --git a/test/nginx_system_test.sh b/test/nginx_system_test.sh index 3f27dc3ee..55f740c23 100755 --- a/test/nginx_system_test.sh +++ b/test/nginx_system_test.sh @@ -511,6 +511,7 @@ check_from "$HTML_HEADERS" egrep -q 'Cache-Control: max-age=0, no-cache' start_test ModPagespeedModifyCachingHeaders URL=$TEST_ROOT/retain_cache_control/index.html +echo $WGET_DUMP $URL OUT=$($WGET_DUMP $URL) check_from "$OUT" grep -q "Cache-Control: private, max-age=3000" check_from "$OUT" grep -q "Last-Modified:"