system: roll back fd optimization
This commit is contained in:
@@ -153,8 +153,6 @@ if [ $ngx_found = yes ]; then
|
||||
# Make pagespeed run immediately before gzip.
|
||||
HTTP_FILTER_MODULES=$(echo $HTTP_FILTER_MODULES |\
|
||||
sed "s/$HTTP_GZIP_FILTER_MODULE/$HTTP_GZIP_FILTER_MODULE $ngx_addon_name/")
|
||||
HTTP_FILTER_MODULES=$(echo $HTTP_FILTER_MODULES |\
|
||||
sed "s/$HTTP_HEADER_FILTER_MODULE/$HTTP_HEADER_FILTER_MODULE ngx_pagespeed_copy_filter/")
|
||||
CORE_LIBS="$CORE_LIBS $pagespeed_libs"
|
||||
CORE_INCS="$CORE_INCS $pagespeed_include"
|
||||
else
|
||||
|
||||
+30
-44
@@ -27,7 +27,7 @@
|
||||
|
||||
namespace net_instaweb {
|
||||
|
||||
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
|
||||
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
|
||||
NgxServerContext* server_context,
|
||||
const RequestContextPtr& request_ctx)
|
||||
: AsyncFetch(request_ctx),
|
||||
@@ -35,8 +35,8 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
|
||||
server_context_(server_context),
|
||||
done_called_(false),
|
||||
last_buf_sent_(false),
|
||||
references_(2),
|
||||
flush_(false) {
|
||||
pipe_fd_(pipe_fd),
|
||||
references_(2) {
|
||||
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
|
||||
PopulateRequestHeaders();
|
||||
}
|
||||
@@ -115,9 +115,7 @@ bool NgxBaseFetch::HandleWrite(const StringPiece& sp,
|
||||
|
||||
ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) {
|
||||
if (done_called_ && last_buf_sent_) {
|
||||
// OK means HandleDone has been called
|
||||
*link_ptr = NULL;
|
||||
return NGX_OK;
|
||||
return NGX_DECLINED;
|
||||
}
|
||||
|
||||
int rc = ngx_psol::string_piece_to_buffer_chain(
|
||||
@@ -131,42 +129,26 @@ ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) {
|
||||
|
||||
if (done_called_) {
|
||||
last_buf_sent_ = true;
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
return NGX_AGAIN;
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
// There may also be a race condition if this is called between the last Write()
|
||||
// and Done() such that we're sending an empty buffer with last_buf set, which I
|
||||
// think nginx will reject.
|
||||
ngx_int_t NgxBaseFetch::CollectAccumulatedWrites(ngx_chain_t** link_ptr) {
|
||||
ngx_int_t rc = NGX_DECLINED;
|
||||
Lock();
|
||||
if (flush_) {
|
||||
rc = CopyBufferToNginx(link_ptr);
|
||||
flush_ = false;
|
||||
}
|
||||
ngx_int_t rc = CopyBufferToNginx(link_ptr);
|
||||
Unlock();
|
||||
|
||||
if (rc == NGX_DECLINED) {
|
||||
*link_ptr = NULL;
|
||||
return NGX_OK;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
void NgxBaseFetch::RequestCollection() {
|
||||
Lock();
|
||||
if (flush_) {
|
||||
Unlock();
|
||||
return;
|
||||
}
|
||||
flush_ = true;
|
||||
ngx_psol::ps_base_fetch_signal(request_);
|
||||
Unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) {
|
||||
Lock();
|
||||
const ResponseHeaders* pagespeed_headers = response_headers();
|
||||
@@ -174,6 +156,24 @@ ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) {
|
||||
return ngx_psol::copy_response_headers_to_ngx(request_, *pagespeed_headers);
|
||||
}
|
||||
|
||||
void NgxBaseFetch::RequestCollection() {
|
||||
int rc;
|
||||
char c = 'A'; // What byte we write is arbitrary.
|
||||
while (true) {
|
||||
rc = write(pipe_fd_, &c, 1);
|
||||
if (rc == 1) {
|
||||
break;
|
||||
} else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
// TODO(jefftk): is this rare enough that spinning isn't a problem? Could
|
||||
// we get into a case where the pipe fills up and we spin forever?
|
||||
|
||||
} else {
|
||||
perror("NgxBaseFetch::RequestCollection");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NgxBaseFetch::HandleHeadersComplete() {
|
||||
// If this is a 404 response we need to count it in the stats.
|
||||
if (response_headers()->status_code() == HttpStatus::kNotFound) {
|
||||
@@ -189,9 +189,6 @@ bool NgxBaseFetch::HandleFlush(MessageHandler* handler) {
|
||||
}
|
||||
|
||||
void NgxBaseFetch::Release() {
|
||||
Lock();
|
||||
flush_ = true;
|
||||
Unlock();
|
||||
DecrefAndDeleteIfUnreferenced();
|
||||
}
|
||||
|
||||
@@ -205,24 +202,13 @@ void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
|
||||
void NgxBaseFetch::HandleDone(bool success) {
|
||||
// TODO(jefftk): it's possible that instead of locking here we can just modify
|
||||
// CopyBufferToNginx to only read done_called_ once.
|
||||
|
||||
if (done_called_ == true) {
|
||||
return;
|
||||
}
|
||||
|
||||
Lock();
|
||||
if (done_called_ == true) {
|
||||
Unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
done_called_ = true;
|
||||
if (!flush_) {
|
||||
flush_ = true;
|
||||
ngx_psol::ps_base_fetch_signal(request_);
|
||||
}
|
||||
|
||||
Unlock();
|
||||
|
||||
close(pipe_fd_); // Indicates to nginx that we're done with the rewrite.
|
||||
pipe_fd_ = -1;
|
||||
|
||||
DecrefAndDeleteIfUnreferenced();
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ namespace net_instaweb {
|
||||
|
||||
class NgxBaseFetch : public AsyncFetch {
|
||||
public:
|
||||
NgxBaseFetch(ngx_http_request_t* r,
|
||||
NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
|
||||
NgxServerContext* server_context,
|
||||
const RequestContextPtr& request_ctx);
|
||||
virtual ~NgxBaseFetch();
|
||||
@@ -102,9 +102,7 @@ class NgxBaseFetch : public AsyncFetch {
|
||||
// Lock must be acquired first.
|
||||
// Returns:
|
||||
// NGX_DECLINED: nothing to send, short circuit. Buffer not allocated.
|
||||
// NGX_ERROR: failure
|
||||
// NGX_AGAIN: success
|
||||
// NGX_OK: done, HandleDone has been called
|
||||
// NGX_OK, NGX_ERROR: success, failure
|
||||
// Allocates an nginx buffer, copies our buffer_ contents into it, clears
|
||||
// buffer_.
|
||||
ngx_int_t CopyBufferToNginx(ngx_chain_t** link_ptr);
|
||||
@@ -121,14 +119,12 @@ class NgxBaseFetch : public AsyncFetch {
|
||||
NgxServerContext* server_context_;
|
||||
bool done_called_;
|
||||
bool last_buf_sent_;
|
||||
int pipe_fd_;
|
||||
// How many active references there are to this fetch. Starts at two,
|
||||
// decremented once when Done() is called and once when Release() is called.
|
||||
int references_;
|
||||
pthread_mutex_t mutex_;
|
||||
|
||||
// set by RequestCollection, cleared by CollectAccumulatedWrites
|
||||
volatile bool flush_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(NgxBaseFetch);
|
||||
};
|
||||
|
||||
|
||||
+290
-334
@@ -74,7 +74,7 @@ extern ngx_module_t ngx_pagespeed;
|
||||
#define DBG(r, args...) \
|
||||
ngx_log_error(NGX_LOG_DEBUG, (r)->connection->log, 0, args)
|
||||
#define PDBG(ctx, args...) \
|
||||
ngx_log_error(NGX_LOG_DEBUG, (ctx)->r->connection->log, 0, args)
|
||||
ngx_log_error(NGX_LOG_DEBUG, (ctx)->pagespeed_connection->log, 0, args)
|
||||
#define CDBG(cf, args...) \
|
||||
ngx_conf_log_error(NGX_LOG_DEBUG, cf, 0, args)
|
||||
|
||||
@@ -311,6 +311,12 @@ ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r);
|
||||
|
||||
void ps_initialize_server_context(ps_srv_conf_t* cfg);
|
||||
|
||||
ngx_int_t ps_update(ps_request_ctx_t* ctx, ngx_event_t* ev);
|
||||
|
||||
void ps_connection_read_handler(ngx_event_t* ev);
|
||||
|
||||
ngx_int_t ps_create_connection(ps_request_ctx_t* ctx);
|
||||
|
||||
namespace CreateRequestContext {
|
||||
enum Response {
|
||||
kOk,
|
||||
@@ -328,251 +334,6 @@ enum Response {
|
||||
};
|
||||
} // namespace CreateRequestContext
|
||||
|
||||
ngx_http_output_header_filter_pt ngx_http_next_header_filter;
|
||||
ngx_http_output_header_filter_pt ngx_http_header_filter;
|
||||
ngx_http_output_body_filter_pt ngx_http_next_body_filter;
|
||||
|
||||
|
||||
ngx_int_t ps_send_response(ngx_http_request_t *r) {
|
||||
ps_request_ctx_t* ctx = ps_get_request_context(r);
|
||||
ngx_int_t rc;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"ps send response: %V", &r->uri);
|
||||
|
||||
// Get output from pagespeed.
|
||||
if (!r->header_sent) {
|
||||
if (ctx->is_resource_fetch || ctx->modify_headers) {
|
||||
ngx_http_clean_header(r);
|
||||
|
||||
rc = ctx->base_fetch->CollectHeaders(&r->headers_out);
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_HTTP_INTERNAL_SERVER_ERROR;
|
||||
}
|
||||
}
|
||||
if (ctx->is_resource_fetch) {
|
||||
rc = ngx_http_send_header(r);
|
||||
} else {
|
||||
rc = ngx_http_header_filter(r);
|
||||
}
|
||||
|
||||
// standard nginx send header check see ngx_http_send_response
|
||||
if (rc == NGX_ERROR || rc > NGX_OK) {
|
||||
return ngx_http_filter_finalize_request(r, NULL, rc);
|
||||
}
|
||||
if (rc == NGX_OK && r->header_only) {
|
||||
return NGX_OK;
|
||||
}
|
||||
}
|
||||
|
||||
ngx_chain_t* cl;
|
||||
|
||||
// OK means last buffer has been sent
|
||||
rc = ctx->base_fetch->CollectAccumulatedWrites(&cl);
|
||||
PDBG(ctx, "CollectAccumulatedWrites, %d", rc);
|
||||
|
||||
if (rc == NGX_ERROR) {
|
||||
return NGX_HTTP_INTERNAL_SERVER_ERROR;
|
||||
}
|
||||
|
||||
// rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DECLINED
|
||||
//
|
||||
// Pass the optimized content along to later body filters.
|
||||
// From Weibin: This function should be called multiple times. Store the
|
||||
// whole file in one chain buffers is too aggressive. It could consume
|
||||
// too much memory in busy servers.
|
||||
|
||||
bool done = (rc == NGX_OK);
|
||||
|
||||
// body_filter can handle NULL chain.
|
||||
rc = ngx_http_next_body_filter(r, cl);
|
||||
|
||||
if (rc == NGX_OK) {
|
||||
ctx->write_pending = false;
|
||||
if (done) {
|
||||
ps_set_buffered(ctx->r, false);
|
||||
return NGX_OK;
|
||||
}
|
||||
}
|
||||
|
||||
if (rc == NGX_OK || rc == NGX_AGAIN) {
|
||||
ps_set_buffered(ctx->r, true);
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
// others
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
ngx_connection_t *ps_base_fetch_conn = NULL;
|
||||
int ps_base_fetch_pipefds[2] = {-1, -1};
|
||||
|
||||
// handle base fetch event(Flush/HeadersComplete/Done),
|
||||
// and ignore the event corresponding to SKIP.
|
||||
// SKIP should only be specified in ps_release_request_context,
|
||||
// so that released base fetch event will be ignored.
|
||||
//
|
||||
// modified from ngx_channel_handler
|
||||
void ps_base_fetch_clear(ngx_http_request_t *skip = NULL) {
|
||||
for ( ;; ) {
|
||||
ngx_http_request_t *requests[512];
|
||||
ssize_t size = read(ps_base_fetch_conn->fd,
|
||||
static_cast<void *>(requests), sizeof(requests));
|
||||
|
||||
if (size == -1) {
|
||||
if (ngx_errno == EINTR) {
|
||||
continue;
|
||||
} else if (ngx_errno == EAGAIN) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (size <= 0) {
|
||||
// Terminate
|
||||
if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
|
||||
ngx_del_conn(ps_base_fetch_conn, 0);
|
||||
}
|
||||
|
||||
ngx_close_connection(ps_base_fetch_conn);
|
||||
ps_base_fetch_conn = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
ngx_uint_t i;
|
||||
for (i = 0; i < size / sizeof(ngx_http_request_t *); i++) {
|
||||
ngx_http_request_t *r = requests[i];
|
||||
if (r == skip) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"base fetch clear");
|
||||
continue;
|
||||
}
|
||||
ps_request_ctx_t* ctx = ps_get_request_context(r);
|
||||
ctx->write_pending = true;;
|
||||
ngx_http_finalize_request(r, ps_send_response(r));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ps_base_fetch_event_handler(ngx_event_t *ev) {
|
||||
if (ev->timedout) {
|
||||
ev->timedout = 0;
|
||||
return;
|
||||
}
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "base fetch event handler");
|
||||
ps_base_fetch_clear(NULL);
|
||||
}
|
||||
|
||||
ngx_int_t ps_base_fetch_event_init(ngx_cycle_t *cycle) {
|
||||
ps_main_conf_t* cfg_m = static_cast<ps_main_conf_t*>(
|
||||
ngx_http_cycle_get_module_main_conf(cycle, ngx_pagespeed));
|
||||
|
||||
if (::pipe(ps_base_fetch_pipefds) != 0) {
|
||||
cfg_m->handler->Message(net_instaweb::kError, "base fetch pipe() failed");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ngx_nonblocking(ps_base_fetch_pipefds[0]) == -1) {
|
||||
cfg_m->handler->Message(net_instaweb::kError,
|
||||
"base fetch pipe[0] " ngx_nonblocking_n " failed");
|
||||
goto failed;
|
||||
}
|
||||
|
||||
// following codes are modified from ngx_add_channel_event
|
||||
// we have to save ngx_connection_t, so can not use ngx_add_channel_event
|
||||
ngx_event_t *rev, *wev;
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = ngx_get_connection(ps_base_fetch_pipefds[0], cycle->log);
|
||||
|
||||
if (c == NULL) {
|
||||
goto failed;
|
||||
}
|
||||
|
||||
c->pool = cycle->pool;
|
||||
|
||||
rev = c->read;
|
||||
wev = c->write;
|
||||
|
||||
rev->log = cycle->log;
|
||||
wev->log = cycle->log;
|
||||
|
||||
#if (NGX_THREADS)
|
||||
rev->lock = &c->lock;
|
||||
wev->lock = &c->lock;
|
||||
rev->own_lock = &c->lock;
|
||||
wev->own_lock = &c->lock;
|
||||
#endif
|
||||
|
||||
rev->channel = 1;
|
||||
wev->channel = 1;
|
||||
|
||||
rev->handler = ps_base_fetch_event_handler;
|
||||
|
||||
// only EPOLL event has both add_event and add_connection
|
||||
// same as ngx_add_channel_event
|
||||
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
|
||||
if (ngx_add_conn(c) == NGX_ERROR) {
|
||||
ngx_free_connection(c);
|
||||
goto failed;
|
||||
}
|
||||
} else {
|
||||
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
|
||||
ngx_free_connection(c);
|
||||
goto failed;
|
||||
}
|
||||
}
|
||||
|
||||
ps_base_fetch_conn = c;
|
||||
return NGX_OK;
|
||||
failed:
|
||||
|
||||
if (close(ps_base_fetch_pipefds[0]) == -1) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
|
||||
"close() base fetch pipe[0] failed");
|
||||
}
|
||||
if (close(ps_base_fetch_pipefds[1]) == -1) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
|
||||
"close() base fetch pipe[1] failed");
|
||||
}
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
void ps_base_fetch_event_terminate(ngx_cycle_t *cycle) {
|
||||
if (ps_base_fetch_conn == NULL) {
|
||||
return;
|
||||
}
|
||||
if (close(ps_base_fetch_pipefds[0]) == -1) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
|
||||
"close() base fetch pipe[0] failed");
|
||||
}
|
||||
if (close(ps_base_fetch_pipefds[1]) == -1) {
|
||||
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
|
||||
"close() base fetch pipe[1] failed");
|
||||
}
|
||||
}
|
||||
|
||||
}// namespace
|
||||
|
||||
void ps_base_fetch_signal(ngx_http_request_t *r) {
|
||||
ssize_t size = 0;
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"base fetch signal");
|
||||
|
||||
while (true) {
|
||||
size = write(ps_base_fetch_pipefds[1], static_cast<void *>(&r),
|
||||
sizeof(ngx_http_request_t *));
|
||||
if (size == -1 && errno == EINTR) {
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
CreateRequestContext::Response ps_create_request_context(
|
||||
ngx_http_request_t* r, bool is_resource_fetch);
|
||||
|
||||
@@ -793,7 +554,6 @@ void ps_cleanup_main_conf(void* data) {
|
||||
cfg_m->handler = NULL;
|
||||
net_instaweb::NgxRewriteDriverFactory::Terminate();
|
||||
net_instaweb::NgxRewriteOptions::Terminate();
|
||||
ps_base_fetch_event_terminate(const_cast<ngx_cycle_t *>(ngx_cycle));
|
||||
|
||||
// reset the factory deleted flag, so we will clean up properly next time,
|
||||
// in case of a configuration reload.
|
||||
@@ -960,24 +720,12 @@ char* ps_merge_loc_conf(ngx_conf_t* cf, void* parent, void* child) {
|
||||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
ngx_http_output_header_filter_pt ngx_http_next_header_filter;
|
||||
ngx_http_output_body_filter_pt ngx_http_next_body_filter;
|
||||
|
||||
void ps_release_request_context(void* data) {
|
||||
ps_request_ctx_t* ctx = static_cast<ps_request_ctx_t*>(data);
|
||||
|
||||
// In the normal flow BaseFetch doesn't delete itself in HandleDone() because
|
||||
// we still need to receive notification via pipe and call
|
||||
// CollectAccumulatedWrites. If there's an error and we're cleaning up early
|
||||
// then HandleDone() hasn't been called yet and we need the base fetch to wait
|
||||
// for that and then delete itself.
|
||||
if (ctx->base_fetch != NULL) {
|
||||
// We must first disable event from BaseFetch,
|
||||
// and then clear current pending event.
|
||||
// So release() Should be called before ps_base_fetch_clear()
|
||||
ctx->base_fetch->Release();
|
||||
ps_base_fetch_clear(ctx->r);
|
||||
ctx->base_fetch = NULL;
|
||||
}
|
||||
|
||||
// proxy_fetch deleted itself if we called Done(), but if an error happened
|
||||
// before then we need to tell it to delete itself.
|
||||
//
|
||||
@@ -986,11 +734,32 @@ void ps_release_request_context(void* data) {
|
||||
ctx->proxy_fetch->Done(false /* failure */);
|
||||
}
|
||||
|
||||
// In the normal flow BaseFetch doesn't delete itself in HandleDone() because
|
||||
// we still need to receive notification via pipe and call
|
||||
// CollectAccumulatedWrites. If there's an error and we're cleaning up early
|
||||
// then HandleDone() hasn't been called yet and we need the base fetch to wait
|
||||
// for that and then delete itself.
|
||||
if (ctx->base_fetch != NULL) {
|
||||
ctx->base_fetch->Release();
|
||||
ctx->base_fetch = NULL;
|
||||
}
|
||||
|
||||
if (ctx->inflater_ != NULL) {
|
||||
delete ctx->inflater_;
|
||||
ctx->inflater_ = NULL;
|
||||
}
|
||||
|
||||
// Close the connection, delete the events attached with it, and free it to
|
||||
// Nginx's connection pool
|
||||
if (ctx->pagespeed_connection != NULL) {
|
||||
ngx_close_connection(ctx->pagespeed_connection);
|
||||
ctx->pipe_fd = -1;
|
||||
}
|
||||
|
||||
if (ctx->pipe_fd != -1) {
|
||||
close(ctx->pipe_fd);
|
||||
}
|
||||
|
||||
delete ctx;
|
||||
}
|
||||
|
||||
@@ -1111,6 +880,189 @@ ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r) {
|
||||
ngx_http_get_module_ctx(r, ngx_pagespeed));
|
||||
}
|
||||
|
||||
// Returns:
|
||||
// NGX_OK: pagespeed is done, request complete
|
||||
// NGX_AGAIN: pagespeed still working, needs to be called again later
|
||||
// NGX_ERROR: error
|
||||
ngx_int_t ps_update(ps_request_ctx_t* ctx, ngx_event_t* ev) {
|
||||
bool done;
|
||||
int rc;
|
||||
char chr;
|
||||
do {
|
||||
rc = read(ctx->pipe_fd, &chr, 1);
|
||||
} while (rc == -1 && errno == EINTR); // Retry on EINTR.
|
||||
|
||||
// read() should only ever return 0 (closed), 1 (data), or -1 (error).
|
||||
CHECK(rc == -1 || rc == 0 || rc == 1);
|
||||
|
||||
if (rc == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
PDBG(ctx, "no data to read from pagespeed yet");
|
||||
return NGX_AGAIN;
|
||||
} else {
|
||||
perror("ps_connection_read_handler");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
} else {
|
||||
// We're done iff we read 0 bytes because that means the pipe was closed.
|
||||
done = (rc == 0);
|
||||
}
|
||||
|
||||
// Get output from pagespeed.
|
||||
if (ctx->is_resource_fetch && !ctx->sent_headers) {
|
||||
// For resource fetches, the first pipe-byte tells us headers are available
|
||||
// for fetching.
|
||||
rc = ctx->base_fetch->CollectHeaders(&ctx->r->headers_out);
|
||||
if (rc != NGX_OK) {
|
||||
PDBG(ctx, "problem with CollectHeaders");
|
||||
return rc;
|
||||
}
|
||||
|
||||
ngx_http_send_header(ctx->r);
|
||||
ctx->sent_headers = true;
|
||||
} else {
|
||||
// For proxy fetches and subsequent resource fetch pipe-bytes, the response
|
||||
// body is available for (partial) fetching.
|
||||
ngx_chain_t* cl;
|
||||
rc = ctx->base_fetch->CollectAccumulatedWrites(&cl);
|
||||
if (rc != NGX_OK) {
|
||||
PDBG(ctx, "problem with CollectAccumulatedWrites");
|
||||
return rc;
|
||||
}
|
||||
|
||||
PDBG(ctx, "pagespeed update: %p, done: %d", cl, done);
|
||||
|
||||
if (cl == NULL) {
|
||||
return done ? NGX_OK : NGX_AGAIN;
|
||||
}
|
||||
// Pass the optimized content along to later body filters.
|
||||
// From Weibin: This function should be called mutiple times. Store the
|
||||
// whole file in one chain buffers is too aggressive. It could consume
|
||||
// too much memory in busy servers.
|
||||
rc = ngx_http_next_body_filter(ctx->r, cl);
|
||||
if (rc == NGX_AGAIN && done) {
|
||||
ctx->write_pending = 1;
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (rc != NGX_OK) {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
return done ? NGX_OK : NGX_AGAIN;
|
||||
}
|
||||
|
||||
void ps_writer(ngx_http_request_t* r) {
|
||||
ngx_connection_t* c = r->connection;
|
||||
ngx_event_t* wev = c->write;
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, wev->log, 0,
|
||||
"http pagespeed writer handler: \"%V?%V\"",
|
||||
&r->uri, &r->args);
|
||||
|
||||
if (wev->timedout) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
|
||||
"client timed out");
|
||||
c->timedout = 1;
|
||||
|
||||
ngx_http_finalize_request(r, NGX_HTTP_REQUEST_TIME_OUT);
|
||||
return;
|
||||
}
|
||||
|
||||
int rc = ngx_http_next_body_filter(r, NULL);
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0,
|
||||
"http pagespeed writer output filter: %d, \"%V?%V\"",
|
||||
rc, &r->uri, &r->args);
|
||||
if (rc == NGX_AGAIN) {
|
||||
return;
|
||||
}
|
||||
|
||||
r->write_event_handler = ngx_http_request_empty_handler;
|
||||
ngx_http_finalize_request(r, rc);
|
||||
}
|
||||
|
||||
ngx_int_t ngx_http_set_pagespeed_write_handler(ngx_http_request_t *r) {
|
||||
r->http_state = NGX_HTTP_WRITING_REQUEST_STATE;
|
||||
|
||||
r->read_event_handler = ngx_http_request_empty_handler;
|
||||
r->write_event_handler = ps_writer;
|
||||
|
||||
ngx_event_t* wev = r->connection->write;
|
||||
ngx_http_core_loc_conf_t* clcf = static_cast<ngx_http_core_loc_conf_t*>(
|
||||
ngx_http_get_module_loc_conf(r, ngx_http_core_module));
|
||||
ngx_add_timer(wev, clcf->send_timeout);
|
||||
|
||||
if (ngx_handle_write_event(wev, clcf->send_lowat) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
void ps_connection_read_handler(ngx_event_t* ev) {
|
||||
CHECK(ev != NULL);
|
||||
|
||||
ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
|
||||
CHECK(c != NULL);
|
||||
|
||||
ps_request_ctx_t* ctx =
|
||||
static_cast<ps_request_ctx_t*>(c->data);
|
||||
CHECK(ctx != NULL);
|
||||
|
||||
int rc = ps_update(ctx, ev);
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,
|
||||
"http pagespeed connection read handler rc: %d", rc);
|
||||
|
||||
if (rc == NGX_AGAIN) {
|
||||
// Request needs more work by pagespeed.
|
||||
rc = ngx_handle_read_event(ev, 0);
|
||||
CHECK(rc == NGX_OK);
|
||||
} else if (rc == NGX_OK) {
|
||||
// Pagespeed is done. Stop watching the pipe. If we still have data to
|
||||
// write, set a write handler so we can get called back to make our write.
|
||||
ngx_del_event(ev, NGX_READ_EVENT, 0);
|
||||
ps_set_buffered(ctx->r, false);
|
||||
if (ctx->write_pending) {
|
||||
if (ngx_http_set_pagespeed_write_handler(ctx->r) != NGX_OK) {
|
||||
ngx_http_finalize_request(ctx->r, NGX_HTTP_INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
} else {
|
||||
ngx_http_finalize_request(ctx->r, NGX_DONE);
|
||||
}
|
||||
} else if (rc == NGX_ERROR) {
|
||||
ngx_http_finalize_request(ctx->r, NGX_HTTP_INTERNAL_SERVER_ERROR);
|
||||
} else {
|
||||
CHECK(false);
|
||||
}
|
||||
}
|
||||
|
||||
ngx_int_t ps_create_connection(ps_request_ctx_t* ctx) {
|
||||
ngx_connection_t* c = ngx_get_connection(
|
||||
ctx->pipe_fd, ctx->r->connection->log);
|
||||
if (c == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
c->recv = ngx_recv;
|
||||
c->send = ngx_send;
|
||||
c->recv_chain = ngx_recv_chain;
|
||||
c->send_chain = ngx_send_chain;
|
||||
|
||||
c->log_error = ctx->r->connection->log_error;
|
||||
|
||||
c->read->log = c->log;
|
||||
c->write->log = c->log;
|
||||
|
||||
ctx->pagespeed_connection = c;
|
||||
|
||||
// Tell nginx to monitor this pipe and call us back when there's data.
|
||||
c->data = ctx;
|
||||
c->read->handler = ps_connection_read_handler;
|
||||
ngx_add_event(c->read, NGX_READ_EVENT, 0);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
// Populate cfg_* with configuration information for this
|
||||
// request. Thin wrappers around ngx_http_get_module_*_conf and cast.
|
||||
@@ -1550,19 +1502,50 @@ CreateRequestContext::Response ps_create_request_context(
|
||||
return CreateRequestContext::kNotUnderstood;
|
||||
}
|
||||
|
||||
int file_descriptors[2];
|
||||
int rc = pipe(file_descriptors);
|
||||
if (rc != 0) {
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "pipe() failed");
|
||||
return CreateRequestContext::kError;
|
||||
}
|
||||
|
||||
if (ngx_nonblocking(file_descriptors[0]) == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, r->connection->log, ngx_socket_errno,
|
||||
ngx_nonblocking_n " pipe[0] failed");
|
||||
return CreateRequestContext::kError;
|
||||
}
|
||||
|
||||
if (ngx_nonblocking(file_descriptors[1]) == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, r->connection->log, ngx_socket_errno,
|
||||
ngx_nonblocking_n " pipe[1] failed");
|
||||
return CreateRequestContext::kError;
|
||||
}
|
||||
|
||||
ps_request_ctx_t* ctx = new ps_request_ctx_t();
|
||||
|
||||
ctx->r = r;
|
||||
ctx->pipe_fd = file_descriptors[0];
|
||||
ctx->is_resource_fetch = is_resource_fetch;
|
||||
ctx->write_pending = false;
|
||||
ctx->pagespeed_connection = NULL;
|
||||
|
||||
rc = ps_create_connection(ctx);
|
||||
if (rc != NGX_OK) {
|
||||
close(file_descriptors[1]);
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
|
||||
"ps_create_request_context: "
|
||||
"no pagespeed connection.");
|
||||
ps_release_request_context(ctx);
|
||||
return CreateRequestContext::kError;
|
||||
}
|
||||
|
||||
// Handles its own deletion. We need to call Release() when we're done with
|
||||
// it, and call Done() on the associated parent (Proxy or Resource) fetch. If
|
||||
// we fail before creating the associated fetch then we need to call Done() on
|
||||
// the BaseFetch ourselves.
|
||||
ctx->base_fetch = new net_instaweb::NgxBaseFetch(
|
||||
r,
|
||||
r, file_descriptors[1],
|
||||
cfg_s->server_context,
|
||||
net_instaweb::RequestContextPtr(new net_instaweb::NgxRequestContext(
|
||||
cfg_s->server_context->thread_system()->NewMutex(), r)));
|
||||
@@ -1623,26 +1606,21 @@ CreateRequestContext::Response ps_create_request_context(
|
||||
// rewrite drivers and so is faster because there's no wait to construct
|
||||
// them. Otherwise we have to build a new one every time.
|
||||
|
||||
// Do not store driver in request_context, it's not safe.
|
||||
net_instaweb::RewriteDriver* driver;
|
||||
|
||||
if (custom_options == NULL) {
|
||||
driver = cfg_s->server_context->NewRewriteDriver(
|
||||
ctx->driver = cfg_s->server_context->NewRewriteDriver(
|
||||
ctx->base_fetch->request_context());
|
||||
} else {
|
||||
// NewCustomRewriteDriver takes ownership of custom_options.
|
||||
driver = cfg_s->server_context->NewCustomRewriteDriver(
|
||||
ctx->driver = cfg_s->server_context->NewCustomRewriteDriver(
|
||||
custom_options, ctx->base_fetch->request_context());
|
||||
}
|
||||
|
||||
ctx->modify_headers = driver->options()->modify_caching_headers();
|
||||
|
||||
// TODO(jefftk): FlushEarlyFlow would go here.
|
||||
|
||||
// Will call StartParse etc. The rewrite driver will take care of deleting
|
||||
// itself if necessary.
|
||||
ctx->proxy_fetch = cfg_s->proxy_fetch_factory->CreateNewProxyFetch(
|
||||
url_string, ctx->base_fetch, driver,
|
||||
url_string, ctx->base_fetch, ctx->driver,
|
||||
property_callback.release(),
|
||||
NULL /* original_content_fetch */);
|
||||
}
|
||||
@@ -1739,15 +1717,22 @@ ngx_int_t ps_body_filter(ngx_http_request_t* r, ngx_chain_t* in) {
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"http pagespeed filter \"%V\"", &r->uri);
|
||||
|
||||
if (!ctx->data_received) {
|
||||
// This is the first set of buffers we've got for this request.
|
||||
ctx->data_received = true;
|
||||
// Call this here and not in the header filter because we want to see the
|
||||
// headers after any other filters are finished modifying them. At this
|
||||
// point they are final.
|
||||
// TODO(jefftk): is this thread safe?
|
||||
ctx->base_fetch->PopulateResponseHeaders();
|
||||
}
|
||||
|
||||
if (in != NULL) {
|
||||
// Send all input data to the proxy fetch.
|
||||
ps_send_to_pagespeed(r, ctx, cfg_s, in);
|
||||
}
|
||||
ps_set_buffered(r, true);
|
||||
|
||||
if (ctx->write_pending) {
|
||||
return ps_send_response(r);
|
||||
}
|
||||
ps_set_buffered(r, true);
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
@@ -1926,6 +1911,7 @@ ngx_int_t ps_header_filter(ngx_http_request_t* r) {
|
||||
break;
|
||||
}
|
||||
ctx = ps_get_request_context(r);
|
||||
CHECK(ctx->driver != NULL); // Not a resource fetch, so driver is defined.
|
||||
|
||||
if (r->headers_out.content_encoding &&
|
||||
r->headers_out.content_encoding->value.len) {
|
||||
@@ -1955,26 +1941,42 @@ ngx_int_t ps_header_filter(ngx_http_request_t* r) {
|
||||
}
|
||||
}
|
||||
|
||||
const net_instaweb::RewriteOptions* options = ctx->driver->options();
|
||||
|
||||
ps_strip_html_headers(r);
|
||||
|
||||
if (options->modify_caching_headers()) {
|
||||
// Don't cache html. See mod_instaweb:instaweb_fix_headers_filter.
|
||||
ps_set_cache_control(r, const_cast<char*>("max-age=0, no-cache"));
|
||||
|
||||
// Pagespeed html doesn't need etags: it should never be cached.
|
||||
ngx_http_clear_etag(r);
|
||||
|
||||
// An html page may change without the underlying file changing, because of
|
||||
// how resources are included. Pagespeed adds cache control headers for
|
||||
// resources instead of using the last modified header.
|
||||
ngx_http_clear_last_modified(r);
|
||||
}
|
||||
|
||||
r->filter_need_in_memory = 1;
|
||||
|
||||
return ngx_http_next_header_filter(r);
|
||||
}
|
||||
|
||||
ngx_int_t ps_copy_header_filter(ngx_http_request_t *r) {
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
|
||||
"ps copy header filter: %V", &r->uri);
|
||||
|
||||
ps_request_ctx_t* ctx = ps_get_request_context(r);
|
||||
|
||||
if (ctx && !ctx->is_resource_fetch) {
|
||||
// TODO(jefftk): is this thread safe?
|
||||
ctx->base_fetch->PopulateResponseHeaders();
|
||||
return NGX_AGAIN;
|
||||
// Set the "X-Page-Speed: VERSION" header.
|
||||
ngx_table_elt_t* x_pagespeed = static_cast<ngx_table_elt_t*>(
|
||||
ngx_list_push(&r->headers_out.headers));
|
||||
if (x_pagespeed == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
// Tell ngx_http_header_filter_module to include this header in the response.
|
||||
x_pagespeed->hash = 1;
|
||||
|
||||
return ngx_http_header_filter(r);
|
||||
ngx_str_set(&x_pagespeed->key, kPageSpeedHeader);
|
||||
// It's safe to use c_str here because once we're handling requests the
|
||||
// rewrite options are frozen and won't change out from under us.
|
||||
x_pagespeed->value.data = reinterpret_cast<u_char*>(const_cast<char*>(
|
||||
options->x_header_value().c_str()));
|
||||
x_pagespeed->value.len = options->x_header_value().size();
|
||||
|
||||
return ngx_http_next_header_filter(r);
|
||||
}
|
||||
|
||||
// TODO(oschaaf): make ps_static_handler use write_handler_response? for now,
|
||||
@@ -2531,18 +2533,6 @@ ngx_int_t ps_init(ngx_conf_t* cf) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
ngx_int_t ps_copy_filter_init(ngx_conf_t* cf) {
|
||||
ps_main_conf_t* cfg_m = static_cast<ps_main_conf_t*>(
|
||||
ngx_http_conf_get_module_main_conf(cf, ngx_pagespeed));
|
||||
|
||||
if (cfg_m->driver_factory != NULL) {
|
||||
ngx_http_header_filter = ngx_http_top_header_filter;
|
||||
ngx_http_top_header_filter = ps_copy_header_filter;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
ngx_http_module_t ps_module = {
|
||||
NULL, // preconfiguration
|
||||
ps_init, // postconfiguration
|
||||
@@ -2557,20 +2547,6 @@ ngx_http_module_t ps_module = {
|
||||
ps_merge_loc_conf
|
||||
};
|
||||
|
||||
ngx_http_module_t ps_copy_filter_module = {
|
||||
NULL,
|
||||
ps_copy_filter_init, // postconfiguration
|
||||
|
||||
NULL,
|
||||
NULL,
|
||||
|
||||
NULL,
|
||||
NULL,
|
||||
|
||||
NULL,
|
||||
NULL
|
||||
};
|
||||
|
||||
// called after configuration is complete, but before nginx starts forking
|
||||
ngx_int_t ps_init_module(ngx_cycle_t* cycle) {
|
||||
ps_main_conf_t* cfg_m = static_cast<ps_main_conf_t*>(
|
||||
@@ -2663,10 +2639,6 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (ps_base_fetch_event_init(cycle) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
// ChildInit() will initialise all ServerContexts, which we need to
|
||||
// create ProxyFetchFactories below
|
||||
cfg_m->driver_factory->ChildInit(cycle->log);
|
||||
@@ -2716,19 +2688,3 @@ ngx_module_t ngx_pagespeed = {
|
||||
NULL,
|
||||
NGX_MODULE_V1_PADDING
|
||||
};
|
||||
|
||||
ngx_module_t ngx_pagespeed_copy_filter = {
|
||||
NGX_MODULE_V1,
|
||||
&ngx_psol::ps_copy_filter_module,
|
||||
NULL,
|
||||
NGX_HTTP_MODULE,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NGX_MODULE_V1_PADDING
|
||||
};
|
||||
|
||||
|
||||
+5
-4
@@ -83,16 +83,17 @@ ngx_int_t copy_response_headers_to_ngx(
|
||||
typedef struct {
|
||||
net_instaweb::ProxyFetch* proxy_fetch;
|
||||
net_instaweb::NgxBaseFetch* base_fetch;
|
||||
net_instaweb::RewriteDriver* driver;
|
||||
bool data_received;
|
||||
int pipe_fd;
|
||||
ngx_connection_t* pagespeed_connection;
|
||||
ngx_http_request_t* r;
|
||||
bool is_resource_fetch;
|
||||
bool sent_headers;
|
||||
bool write_pending;
|
||||
bool modify_headers;
|
||||
net_instaweb::GzipInflater* inflater_;
|
||||
} ps_request_ctx_t;
|
||||
|
||||
// called by net_instaweb::NgxBaseFetch to notify event
|
||||
void ps_base_fetch_signal(ngx_http_request_t *r);
|
||||
|
||||
} // namespace ngx_psol
|
||||
|
||||
#endif // NGX_PAGESPEED_H_
|
||||
|
||||
@@ -140,6 +140,7 @@ PSA_JS_LIBRARY_URL_PREFIX="ngx_pagespeed_static"
|
||||
|
||||
PAGESPEED_EXPECTED_FAILURES="
|
||||
~compression is enabled for rewritten JS.~
|
||||
~convert_meta_tags~
|
||||
~In-place resource optimization~
|
||||
"
|
||||
|
||||
@@ -510,7 +511,6 @@ check_from "$HTML_HEADERS" egrep -q 'Cache-Control: max-age=0, no-cache'
|
||||
|
||||
start_test ModPagespeedModifyCachingHeaders
|
||||
URL=$TEST_ROOT/retain_cache_control/index.html
|
||||
echo $WGET_DUMP $URL
|
||||
OUT=$($WGET_DUMP $URL)
|
||||
check_from "$OUT" grep -q "Cache-Control: private, max-age=3000"
|
||||
check_from "$OUT" grep -q "Last-Modified:"
|
||||
|
||||
Reference in New Issue
Block a user