Revert "Merge pull request #872 from pagespeed/jefftk-revert-834"

This reverts commit 7bd90f7b3a, reversing
changes made to 58228564dd.
This commit is contained in:
Otto van der Schaaf
2015-01-10 22:07:21 +01:00
parent 7bd90f7b3a
commit a73c096950
12 changed files with 683 additions and 508 deletions
+2
View File
@@ -169,6 +169,7 @@ if [ $ngx_found = yes ]; then
$ps_src/log_message_handler.h \
$ps_src/ngx_base_fetch.h \
$ps_src/ngx_caching_headers.h \
$ps_src/ngx_event_connection.h \
$ps_src/ngx_fetch.h \
$ps_src/ngx_gzip_setter.h \
$ps_src/ngx_list_iterator.h \
@@ -183,6 +184,7 @@ if [ $ngx_found = yes ]; then
$ps_src/log_message_handler.cc \
$ps_src/ngx_base_fetch.cc \
$ps_src/ngx_caching_headers.cc \
$ps_src/ngx_event_connection.cc \
$ps_src/ngx_fetch.cc \
$ps_src/ngx_gzip_setter.cc \
$ps_src/ngx_list_iterator.cc \
+94 -29
View File
@@ -17,6 +17,7 @@
// Author: jefftk@google.com (Jeff Kaufman)
#include "ngx_base_fetch.h"
#include "ngx_event_connection.h"
#include "ngx_list_iterator.h"
#include "ngx_pagespeed.h"
@@ -28,7 +29,13 @@
namespace net_instaweb {
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
const char kHeadersComplete = 'H';
const char kFlush = 'F';
const char kDone = 'D';
NgxEventConnection* NgxBaseFetch::event_connection = NULL;
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers)
@@ -37,10 +44,10 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
server_context_(server_context),
done_called_(false),
last_buf_sent_(false),
pipe_fd_(pipe_fd),
references_(2),
ipro_lookup_(false),
preserve_caching_headers_(preserve_caching_headers) {
preserve_caching_headers_(preserve_caching_headers),
detached_(false) {
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
}
@@ -48,6 +55,66 @@ NgxBaseFetch::~NgxBaseFetch() {
pthread_mutex_destroy(&mutex_);
}
// Statically creates the process-wide NgxEventConnection used by PSOL
// threads to signal nginx's event loop. Must be called exactly once per
// cycle before any NgxBaseFetch writes events (CHECK-fails if called twice
// without an intervening Terminate()). Returns false if the underlying
// pipe or nginx connection could not be set up.
bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
  CHECK(event_connection == NULL) << "event connection already set";
  event_connection = new NgxEventConnection(ReadCallback);
  return event_connection->Init(cycle);
}
// Statically deletes and NULLs event_connection, undoing Initialize().
// Safe to call when already terminated (the NULL check makes it idempotent).
void NgxBaseFetch::Terminate() {
  if (event_connection != NULL) {
    delete event_connection;
    event_connection = NULL;
  }
}
// Runs on nginx's event thread for each event PSOL wrote to
// event_connection. Drops the reference the event writer took, bails out if
// that was the last reference or the request side already detached, and
// otherwise resumes request processing via ps_base_fetch_handler().
void NgxBaseFetch::ReadCallback(const ps_event_data& data) {
  NgxBaseFetch* base_fetch = reinterpret_cast<NgxBaseFetch*>(data.sender);
  ngx_http_request_t* r = base_fetch->request();
  // Snapshot detached_ before decrementing: if the decrement destructs
  // base_fetch, we must not touch it afterwards.
  bool detached = base_fetch->detached();
  int refcount = base_fetch->DecrementRefCount();
#if (NGX_DEBUG)
  ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
                "pagespeed [%p] event: %c. bf:%p - refcnt:%d - det: %c", r,
                data.type, base_fetch, refcount, detached ? 'Y': 'N');
#endif
  // If we ended up destructing the base fetch, or the request context is
  // detached, skip this event.
  if (refcount == 0 || detached) {
    return;
  }
  ps_request_ctx_t* ctx = ps_get_request_context(r);
  // The event's sender must still be the base fetch attached to this request.
  CHECK(data.sender == ctx->base_fetch);
  // ngx_base_fetch_handler() ends up setting ctx->fetch_done, which
  // means we shouldn't call it anymore.
  if (ctx->fetch_done) {
    return;
  }
  CHECK(r->count > 0) << "r->count: " << r->count;
  // If we are unlucky enough to have our connection finalized mid-ipro-lookup,
  // we must enter a different flow. Also see ps_in_place_check_header_filter().
  if (!ctx->base_fetch->ipro_lookup_ && r->connection->error) {
    ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
                  "pagespeed [%p] request already finalized", r);
    ngx_http_finalize_request(r, NGX_ERROR);
    return;
  }
  int rc = ps_base_fetch::ps_base_fetch_handler(r);
#if (NGX_DEBUG)
  ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
                "pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
                r, rc, data.type);
#endif
  // Balance the r->count reference nginx holds for the pending event.
  ngx_http_finalize_request(r, rc);
}
// Acquires mutex_, which guards state shared between PSOL worker threads
// and nginx's event thread (e.g. done_called_ and the buffered output).
void NgxBaseFetch::Lock() {
  pthread_mutex_lock(&mutex_);
}
@@ -115,21 +182,15 @@ ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) {
preserve_caching_headers_);
}
void NgxBaseFetch::RequestCollection() {
int rc;
char c = 'A'; // What byte we write is arbitrary.
while (true) {
rc = write(pipe_fd_, &c, 1);
if (rc == 1) {
break;
} else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// TODO(jefftk): is this rare enough that spinning isn't a problem? Could
// we get into a case where the pipe fills up and we spin forever?
} else {
perror("NgxBaseFetch::RequestCollection");
break;
}
void NgxBaseFetch::RequestCollection(char type) {
// We must optimistically increment the refcount, and decrement it
// when we conclude we failed. If we only increment on a successful write,
// there's a small chance that between writing and adding to the refcount
// both pagespeed and nginx will release their refcount -- destructing
// this NgxBaseFetch instance.
IncrementRefCount();
if (!event_connection->WriteEvent(type, this)) {
DecrementRefCount();
}
}
@@ -147,38 +208,42 @@ void NgxBaseFetch::HandleHeadersComplete() {
// For the IPRO lookup, suppress notification of the nginx side here.
// If we send both this event and the one from done, nasty stuff will happen
// if we lose the race with the nginx side destructing this base fetch
// instance (and thereby clearing the byte and its pending extraneous event.
// instance (and thereby clearing the byte and its pending extraneous event).
if (!ipro_lookup_) {
RequestCollection(); // Headers available.
RequestCollection(kHeadersComplete); // Headers available.
}
}
bool NgxBaseFetch::HandleFlush(MessageHandler* handler) {
RequestCollection(); // A new part of the response body is available.
RequestCollection(kFlush); // A new part of the response body is available
return true;
}
void NgxBaseFetch::Release() {
DecrefAndDeleteIfUnreferenced();
// Called by the nginx side to drop one reference; deletes this instance
// when the count reaches zero. Returns the post-decrement count.
int NgxBaseFetch::DecrementRefCount() {
  return DecrefAndDeleteIfUnreferenced();
}
void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
// Called by pagespeed before writing an event so the instance stays alive
// until the event is consumed. Uses a GCC atomic builtin (full barrier).
// Returns the post-increment count.
int NgxBaseFetch::IncrementRefCount() {
  return __sync_add_and_fetch(&references_, 1);
}
int NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
// Creates a full memory barrier.
if (__sync_add_and_fetch(&references_, -1) == 0) {
int r = __sync_add_and_fetch(&references_, -1);
if (r == 0) {
delete this;
}
return r;
}
void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
CHECK(!done_called_) << "Done already called!";
Lock();
done_called_ = true;
Unlock();
close(pipe_fd_); // Indicates to nginx that we're done with the rewrite.
pipe_fd_ = -1;
RequestCollection(kDone);
DecrefAndDeleteIfUnreferenced();
}
+54 -18
View File
@@ -17,22 +17,32 @@
// Author: jefftk@google.com (Jeff Kaufman)
//
// Collects output from pagespeed and buffers it until nginx asks for it.
// Notifies nginx via pipe to call CollectAccumulatedWrites() on flush.
// Notifies nginx via NgxEventConnection to call ReadCallback() when
// the headers are computed, when a flush should be performed, and when done.
//
// - nginx creates a base fetch and passes it to a new proxy fetch.
// - The proxy fetch manages rewriting and thread complexity, and through
// several chained steps passes rewritten html to HandleWrite().
// - Written data is buffered.
// - When Flush() is called the base fetch writes a byte to a pipe nginx is
// watching so nginx knows to call CollectAccumulatedWrites() to pick up the
// rewritten html.
// - When Done() is called the base fetch closes the pipe, which tells nginx to
// make a final call to CollectAccumulatedWrites().
// - When HandleHeadersComplete(), HandleFlush(), or HandleDone() is called by
// PSOL, events are written to NgxEventConnection which will end up being
// handled by ReadCallback() on nginx's thread.
// When applicable, request processing will be continued via a call to
// ps_base_fetch_handler().
// - ps_base_fetch_handler() will pull the header and body bytes from PSOL
// via CollectAccumulatedWrites() and write those to the module's output.
//
// This class is referred two in two places: the proxy fetch and nginx's
// request. It must stay alive until both are finished. The proxy fetch will
// call Done() to indicate this; nginx will call Release(). Once both Done()
// and Release() have been called this class will delete itself.
// This class is referred to in three places: the proxy fetch, nginx's request,
// and pending events written to the associated NgxEventConnection. It must stay
// alive until the proxy fetch and nginx request are finished, and no more
// events are pending.
// - The proxy fetch will call Done() to indicate this.
// - nginx will call Detach() when the associated request is handled
// completely (e.g. the request context is about to be destroyed).
// - ReadCallback() will call DecrementRefCount() on instances associated to
// events it handles.
//
// When the last reference is dropped, this class will delete itself.
#ifndef NGX_BASE_FETCH_H_
#define NGX_BASE_FETCH_H_
@@ -45,6 +55,7 @@ extern "C" {
#include "ngx_pagespeed.h"
#include "ngx_event_connection.h"
#include "ngx_server_context.h"
#include "net/instaweb/http/public/async_fetch.h"
@@ -53,13 +64,19 @@ extern "C" {
namespace net_instaweb {
class NgxBaseFetch : public AsyncFetch {
public:
NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
NgxServerContext* server_context,
NgxBaseFetch(ngx_http_request_t* r, NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers);
virtual ~NgxBaseFetch();
// Statically initializes event_connection, required for PSOL and nginx to
// communicate.
static bool Initialize(ngx_cycle_t* cycle);
// Statically terminates and NULLs event_connection.
static void Terminate();
static void ReadCallback(const ps_event_data& data);
// Puts a chain in link_ptr if we have any output data buffered. Returns
// NGX_OK on success, NGX_ERROR on errors. If there's no data to send, sends
@@ -77,10 +94,24 @@ class NgxBaseFetch : public AsyncFetch {
// time for resource fetches. Not called at all for proxy fetches.
ngx_int_t CollectHeaders(ngx_http_headers_out_t* headers_out);
// Called by nginx when it's done with us.
void Release();
// Called by nginx to decrement the refcount.
int DecrementRefCount();
// Called by pagespeed to increment the refcount.
int IncrementRefCount();
void set_ipro_lookup(bool x) { ipro_lookup_ = x; }
// Detach() is called when the nginx side releases this base fetch. It
// sets detached_ to true and decrements the refcount. We need to know
// this to be able to handle events whose nginx request context has been
// released while the event was in-flight.
void Detach() { detached_ = true; DecrementRefCount(); }
bool detached() { return detached_; }
ngx_http_request_t* request() { return request_; }
private:
virtual bool HandleWrite(const StringPiece& sp, MessageHandler* handler);
virtual bool HandleFlush(MessageHandler* handler);
@@ -89,7 +120,7 @@ class NgxBaseFetch : public AsyncFetch {
// Indicate to nginx that we would like it to call
// CollectAccumulatedWrites().
void RequestCollection();
void RequestCollection(char type);
// Lock must be acquired first.
// Returns:
@@ -105,20 +136,25 @@ class NgxBaseFetch : public AsyncFetch {
// Called by Done() and Release(). Decrements our reference count, and if
// it's zero we delete ourself.
void DecrefAndDeleteIfUnreferenced();
int DecrefAndDeleteIfUnreferenced();
static NgxEventConnection* event_connection;
ngx_http_request_t* request_;
GoogleString buffer_;
NgxServerContext* server_context_;
bool done_called_;
bool last_buf_sent_;
int pipe_fd_;
// How many active references there are to this fetch. Starts at two,
// decremented once when Done() is called and once when Release() is called.
// decremented once when Done() is called and once when Detach() is called.
// Incremented for each event written by pagespeed for this NgxBaseFetch, and
// decremented on the nginx side for each event read for it.
int references_;
pthread_mutex_t mutex_;
bool ipro_lookup_;
PreserveCachingHeaders preserve_caching_headers_;
// Set to true just before the nginx side releases its reference
bool detached_;
DISALLOW_COPY_AND_ASSIGN(NgxBaseFetch);
};
+168
View File
@@ -0,0 +1,168 @@
/*
* Copyright 2014 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: oschaaf@we-amp.com (Otto van der Schaaf)
extern "C" {
#include <ngx_channel.h>
}
#include "ngx_event_connection.h"
#include "pagespeed/kernel/base/google_message_handler.h"
#include "pagespeed/kernel/base/message_handler.h"
namespace net_instaweb {
// Stores the callback to invoke on nginx's thread for each event read from
// the pipe. Init() must be called before any WriteEvent().
NgxEventConnection::NgxEventConnection(callbackPtr callback)
    : event_handler_(callback) {
}
// Creates the nonblocking pipe and registers its read end with nginx's
// event loop. On any failure both descriptors are closed and false is
// returned; on success the descriptors are stored and true is returned.
bool NgxEventConnection::Init(ngx_cycle_t* cycle) {
  int file_descriptors[2];
  if (pipe(file_descriptors) != 0) {
    ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "pagespeed: pipe() failed");
    return false;
  }
  // Both ends are made nonblocking: the read end so the event loop never
  // stalls, the write end so WriteEvent() spins on EAGAIN instead of
  // blocking a PSOL thread.
  if (ngx_nonblocking(file_descriptors[0]) == -1) {
    ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
                  ngx_nonblocking_n "pagespeed: pipe[0] failed");
  } else if (ngx_nonblocking(file_descriptors[1]) == -1) {
    ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
                  ngx_nonblocking_n "pagespeed: pipe[1] failed");
  } else if (!CreateNgxConnection(cycle, file_descriptors[0])) {
    ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
                  "pagespeed: failed to create connection.");
  } else {
    pipe_read_fd_ = file_descriptors[0];
    pipe_write_fd_ = file_descriptors[1];
    return true;
  }
  // Shared failure path for the three checks above.
  close(file_descriptors[0]);
  close(file_descriptors[1]);
  return false;
}
// Wraps pipe_fd in an ngx_connection_t and hooks ReadEventHandler up to
// read events on it. Returns true on success.
bool NgxEventConnection::CreateNgxConnection(ngx_cycle_t* cycle,
                                             ngx_fd_t pipe_fd) {
  // pipe_fd (the read side of the pipe will end up as c->fd on the
  // underlying ngx_connection_t that gets created here)
  ngx_int_t rc = ngx_add_channel_event(cycle, pipe_fd, NGX_READ_EVENT,
                                       &NgxEventConnection::ReadEventHandler);
  return rc == NGX_OK;
}
// nginx event handler attached to the pipe's read end. Re-arms the read
// event, ignores stale timeouts, and dispatches pending events via
// ReadAndNotify(). When ReadAndNotify() reports EOF or a hard error (the
// write end was closed), the connection and event are torn down.
void NgxEventConnection::ReadEventHandler(ngx_event_t* ev) {
  ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
  ngx_int_t result = ngx_handle_read_event(ev, 0);
  if (result != NGX_OK) {
    CHECK(false) << "pagespeed: ngx_handle_read_event error: " << result;
  }
  if (ev->timedout) {
    ev->timedout = 0;
    return;
  }
  if (!NgxEventConnection::ReadAndNotify(c->fd)) {
    // This was copied from ngx_channel_handler(): for epoll, we need to call
    // ngx_del_conn(). Sadly, no documentation as to why.
    if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
      ngx_del_conn(c, 0);
    }
    ngx_close_connection(c);
    ngx_del_event(ev, NGX_READ_EVENT, 0);
  }
}
// Deserialize ps_event_data's from the pipe as they become available.
// Subsequently do some bookkeeping, cleanup, and error checking to keep
// the mess out of ps_base_fetch_handler.
// Returns true when an event was dispatched or no events are queued;
// returns false on EOF or a hard read error (the caller tears down the
// connection).
bool NgxEventConnection::ReadAndNotify(ngx_fd_t fd) {
  while (true) {
    // We read only one ps_event_data at a time for now:
    // We can end up recursing all the way and end up calling ourselves here.
    // If that happens in the middle of looping over multiple ps_event_data's we
    // have obtained with read(), the results from the next read() will make us
    // process events out of order. Which can give headaches.
    // Alternatively, we could maintain a queue to make sure we process in
    // sequence
    ps_event_data data;
    // NOTE(review): assumes read() delivers a complete ps_event_data; that
    // holds for pipe payloads <= PIPE_BUF -- confirm sizeof(ps_event_data)
    // stays under that limit.
    ngx_int_t size = read(fd, static_cast<void*>(&data), sizeof(data));
    if (size == -1) {
      // NOTE(review): errno and ngx_errno are mixed in this branch chain;
      // they should be equivalent here, but confirm.
      if (errno == EINTR) {
        continue;
        // TODO(oschaaf): should we worry about spinning here?
      } else if (ngx_errno == EAGAIN || ngx_errno == EWOULDBLOCK) {
        // Nothing queued right now.
        return true;
      }
    }
    if (size <= 0) {
      // EOF (write end closed) or unrecoverable error.
      return false;
    }
    // Hand the event to the callback registered in the constructor.
    data.connection->event_handler_(data);
    return true;
  }
}
// Convenience overload for callers with a single event type: the type byte
// carries no meaning for them.
bool NgxEventConnection::WriteEvent(void* sender) {
  return WriteEvent('X' /* Any char is fine */, sender);
}
// Serializes a ps_event_data carrying (type, sender, this) and writes it to
// the pipe's write end, retrying on EINTR/EAGAIN/EWOULDBLOCK until the full
// struct is written. Returns false on an unrecoverable write error.
bool NgxEventConnection::WriteEvent(char type, void* sender) {
  ssize_t size = 0;
  ps_event_data data;
  ngx_memzero(&data, sizeof(data));
  data.type = type;
  data.sender = sender;
  data.connection = this;
  while (true) {
    size = write(pipe_write_fd_,
                 static_cast<void*>(&data), sizeof(data));
    if (size == sizeof(data)) {
      return true;
    } else if (size == -1) {
      // TODO(oschaaf): should we worry about spinning here?
      if (ngx_errno == EINTR || ngx_errno == EAGAIN
          || ngx_errno == EWOULDBLOCK) {
        continue;
      } else {
        return false;
      }
    } else {
      // A short write should be impossible for payloads <= PIPE_BUF.
      CHECK(false) << "pagespeed: unexpected return value from write(): "
                   << size;
    }
  }
  CHECK(false) << "Should not get here";
  return false;
}
// Closes the write end, then synchronously drains and dispatches any events
// still queued in the pipe; ReadAndNotify() returns false on EOF, which ends
// the loop.
// NOTE(review): pipe_read_fd_ is not closed here; presumably the associated
// ngx_connection_t owns it -- confirm against ReadEventHandler's teardown.
void NgxEventConnection::Shutdown() {
  close(pipe_write_fd_);
  // Drain the pipe, process final events, and shut down.
  while (NgxEventConnection::ReadAndNotify(pipe_read_fd_));
}
} // namespace net_instaweb
+82
View File
@@ -0,0 +1,82 @@
/*
* Copyright 2014 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: oschaaf@we-amp.com (Otto van der Schaaf)
//
// NgxEventConnection implements a means to send events from other threads to
// nginx's event loop, and is implemented with an anonymous pipe under the hood.
// A single instance is used by NgxBaseFetch, and one instance is created per
// NgxUrlAsyncFetcher when native fetching is on.
#ifndef NGX_EVENT_CONNECTION_H_
#define NGX_EVENT_CONNECTION_H_
extern "C" {
#include <ngx_http.h>
}
#include <pthread.h>
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/http/headers.h"
namespace net_instaweb {
class NgxEventConnection;
// Represents a single event that can be written to or read from the pipe.
// Technically, sender is the only data we need to send. type and connection are
// included to provide a means to trace the events along with some more
// info.
typedef struct {
char type;
void* sender;
NgxEventConnection* connection;
} ps_event_data;
// Handler signature for receiving events
typedef void (*callbackPtr)(const ps_event_data&);
// Abstracts a connection to nginx through which events can be written.
// Abstracts a connection to nginx through which events can be written.
// Events written by WriteEvent() (from any thread) are read on nginx's
// event loop and handed to the callback supplied at construction.
class NgxEventConnection {
 public:
  explicit NgxEventConnection(callbackPtr handler);
  // Creates the file descriptors and ngx_connection_t required for event
  // messaging between pagespeed and nginx.
  bool Init(ngx_cycle_t* cycle);
  // Shuts down the underlying file descriptors and connection created in
  // Init(), draining and dispatching any still-queued events first.
  void Shutdown();
  // Constructs a ps_event_data and writes it to the underlying pipe.
  bool WriteEvent(char type, void* sender);
  // Convenience overload for clients that have a single event type.
  bool WriteEvent(void* sender);

 private:
  // Registers the pipe's read end with nginx's event loop.
  static bool CreateNgxConnection(ngx_cycle_t* cycle, ngx_fd_t pipe_fd);
  // nginx-side read-event handler; dispatches via ReadAndNotify().
  static void ReadEventHandler(ngx_event_t* e);
  // Reads one event from fd and invokes the registered callback.
  static bool ReadAndNotify(ngx_fd_t fd);

  // Invoked on nginx's thread for each event read.
  callbackPtr event_handler_;
  // We own these file descriptors
  ngx_fd_t pipe_write_fd_;
  ngx_fd_t pipe_read_fd_;

  DISALLOW_COPY_AND_ASSIGN(NgxEventConnection);
};
} // namespace net_instaweb
#endif // NGX_EVENT_CONNECTION_H_
+12 -45
View File
@@ -64,53 +64,9 @@ extern "C" {
#include "pagespeed/kernel/http/request_headers.h"
#include "pagespeed/kernel/http/response_headers.h"
#include "pagespeed/kernel/http/response_headers_parser.h"
#include "pagespeed/kernel/thread/pthread_mutex.h"
namespace net_instaweb {
class NgxConnection : public PoolElement<NgxConnection> {
public:
NgxConnection(MessageHandler* handler, int max_keepalive_requests);
~NgxConnection();
void SetSock(u_char *sockaddr, socklen_t socklen) {
socklen_ = socklen;
ngx_memcpy(&sockaddr_, sockaddr, socklen);
}
// Close ensures that NgxConnection deletes itself at the appropriate time,
// which can be after receiving a non-keepalive response, or when the remote
// server closes the connection when the NgxConnection is pooled and idle.
void Close();
// Once keepalive is disabled, it can't be toggled back on.
void set_keepalive(bool k) { keepalive_ = keepalive_ && k; }
bool keepalive() { return keepalive_; }
typedef Pool<NgxConnection> NgxConnectionPool;
static NgxConnection* Connect(ngx_peer_connection_t* pc,
MessageHandler* handler,
int max_keepalive_requests);
static void IdleWriteHandler(ngx_event_t* ev);
static void IdleReadHandler(ngx_event_t* ev);
static NgxConnectionPool connection_pool;
static PthreadMutex connection_pool_mutex;
// c_ is owned by NgxConnection and freed in ::Close()
ngx_connection_t* c_;
static const int64 keepalive_timeout_ms;
static const GoogleString ka_header;
private:
int max_keepalive_requests_;
bool keepalive_;
socklen_t socklen_;
u_char sockaddr_[NGX_SOCKADDRLEN];
MessageHandler* handler_;
DISALLOW_COPY_AND_ASSIGN(NgxConnection);
};
NgxConnection::NgxConnectionPool NgxConnection::connection_pool;
PthreadMutex NgxConnection::connection_pool_mutex;
// Default keepalive 60s.
@@ -131,7 +87,18 @@ NgxConnection::NgxConnection(MessageHandler* handler,
}
NgxConnection::~NgxConnection() {
CHECK(c_ == NULL) << "NgxFetch: Underlying connection should be NULL";
CHECK(c_ == NULL) << "NgxConnection: Underlying connection should be NULL";
}
// Cleans up all idle pooled connections at shutdown: closes each underlying
// ngx_connection_t, deletes the NgxConnection, and empties the pool.
// NOTE(review): connection_pool is iterated without taking
// connection_pool_mutex; presumably only one thread is live at shutdown --
// confirm.
void NgxConnection::Terminate() {
  for (NgxConnectionPool::iterator p = connection_pool.begin();
       p != connection_pool.end(); ++p) {
    NgxConnection* nc = *p;
    ngx_close_connection(nc->c_);
    // NULL c_ so the destructor's CHECK(c_ == NULL) passes.
    nc->c_ = NULL;
    delete nc;
  }
  connection_pool.Clear();
}
NgxConnection* NgxConnection::Connect(ngx_peer_connection_t* pc,
+47
View File
@@ -45,6 +45,8 @@ extern "C" {
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/http/response_headers.h"
#include "pagespeed/kernel/http/response_headers_parser.h"
#include "pagespeed/kernel/thread/pthread_mutex.h"
namespace net_instaweb {
@@ -53,6 +55,51 @@ typedef bool (*response_handler_pt)(ngx_connection_t* c);
class NgxUrlAsyncFetcher;
class NgxConnection;
// A pooled, optionally keep-alive upstream connection used for native
// fetching. Idle keep-alive instances live in the static connection_pool,
// guarded by connection_pool_mutex.
class NgxConnection : public PoolElement<NgxConnection> {
 public:
  NgxConnection(MessageHandler* handler, int max_keepalive_requests);
  ~NgxConnection();

  // Records the peer address so pooled connections can be matched to peers.
  void SetSock(u_char *sockaddr, socklen_t socklen) {
    socklen_ = socklen;
    ngx_memcpy(&sockaddr_, sockaddr, socklen);
  }

  // Close ensures that NgxConnection deletes itself at the appropriate time,
  // which can be after receiving a non-keepalive response, or when the remote
  // server closes the connection when the NgxConnection is pooled and idle.
  void Close();

  // Once keepalive is disabled, it can't be toggled back on.
  void set_keepalive(bool k) { keepalive_ = keepalive_ && k; }
  bool keepalive() { return keepalive_; }

  typedef Pool<NgxConnection> NgxConnectionPool;

  // Returns a pooled connection to pc's peer if available, else a new one.
  static NgxConnection* Connect(ngx_peer_connection_t* pc,
                                MessageHandler* handler,
                                int max_keepalive_requests);
  static void IdleWriteHandler(ngx_event_t* ev);
  static void IdleReadHandler(ngx_event_t* ev);
  // Terminate will cleanup any idle connections upon shutdown.
  static void Terminate();

  static NgxConnectionPool connection_pool;
  static PthreadMutex connection_pool_mutex;
  // c_ is owned by NgxConnection and freed in ::Close()
  ngx_connection_t* c_;
  static const int64 keepalive_timeout_ms;
  static const GoogleString ka_header;

 private:
  int max_keepalive_requests_;
  bool keepalive_;
  socklen_t socklen_;
  u_char sockaddr_[NGX_SOCKADDRLEN];
  MessageHandler* handler_;

  DISALLOW_COPY_AND_ASSIGN(NgxConnection);
};
class NgxFetch : public PoolElement<NgxFetch> {
public:
NgxFetch(const GoogleString& url,
+157 -285
View File
@@ -203,6 +203,145 @@ ngx_int_t string_piece_to_buffer_chain(
return NGX_OK;
}
// Get the pagespeed context for this request. Returns NULL if the module
// has not attached a context to the request yet (callers check for that).
ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r) {
  return static_cast<ps_request_ctx_t*>(
      ngx_http_get_module_ctx(r, ngx_pagespeed));
}
// Tell nginx whether we have network activity we're waiting for so that it sets
// a write handler. See src/http/ngx_http_request.c:2083.
// Sets or clears this module's bit in r->buffered, leaving other bits alone.
void ps_set_buffered(ngx_http_request_t* r, bool on) {
  if (on) {
    r->buffered |= NGX_HTTP_PAGESPEED_BUFFERED;
  } else {
    r->buffered &= ~NGX_HTTP_PAGESPEED_BUFFERED;
  }
}
namespace ps_base_fetch {
ngx_http_output_header_filter_pt ngx_http_next_header_filter;
ngx_http_output_body_filter_pt ngx_http_next_body_filter;
// Body filter for pagespeed-managed requests. Passes non-pagespeed requests
// straight through; otherwise forwards buffered output and reports NGX_AGAIN
// until the fetch is done and all pending writes have drained.
ngx_int_t ps_base_fetch_filter(ngx_http_request_t* r, ngx_chain_t* in) {
  ps_request_ctx_t* ctx = ps_get_request_context(r);

  // Not a pagespeed request (or no active base fetch): pass through.
  if (ctx == NULL || ctx->base_fetch == NULL) {
    return ngx_http_next_body_filter(r, in);
  }

  ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                 "http pagespeed write filter \"%V\"", &r->uri);

  // send response body
  if (in || ctx->write_pending) {
    ngx_int_t rc = ngx_http_next_body_filter(r, in);
    // Remember whether downstream could not take everything yet.
    ctx->write_pending = (rc == NGX_AGAIN);
    if (rc == NGX_OK && !ctx->fetch_done) {
      return NGX_AGAIN;
    }
    return rc;
  }

  return ctx->fetch_done ? NGX_OK : NGX_AGAIN;
}
// This runs on the nginx event loop in response to an event PageSpeed
// sent over the pipe to trigger the nginx-side code. Copy whatever is ready
// from PageSpeed out to the browser (headers and/or body).
ngx_int_t ps_base_fetch_handler(ngx_http_request_t* r) {
  ps_request_ctx_t* ctx = ps_get_request_context(r);
  ngx_int_t rc;
  ngx_chain_t* cl = NULL;

  ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                 "ps fetch handler: %V", &r->uri);

  // First invocation: emit response headers before any body data.
  if (!r->header_sent) {
    if (ctx->preserve_caching_headers != kDontPreserveHeaders) {
      ngx_table_elt_t* header;
      NgxListIterator it(&(r->headers_out.headers.part));
      while ((header = it.Next()) != NULL) {
        // We need to remember a few headers when ModifyCachingHeaders is off,
        // so we can send them unmodified in copy_response_headers_to_ngx().
        // This just sets the hash to 0 for all other headers. That way, we
        // avoid some relatively complicated code to reconstruct these headers.
        if (!(STR_CASE_EQ_LITERAL(header->key, "Cache-Control") ||
              (ctx->preserve_caching_headers == kPreserveAllCachingHeaders &&
               (STR_CASE_EQ_LITERAL(header->key, "Etag") ||
                STR_CASE_EQ_LITERAL(header->key, "Date") ||
                STR_CASE_EQ_LITERAL(header->key, "Last-Modified") ||
                STR_CASE_EQ_LITERAL(header->key, "Expires"))))) {
          header->hash = 0;
        }
      }
    } else {
      ngx_http_clean_header(r);
    }

    // collect response headers from pagespeed
    rc = ctx->base_fetch->CollectHeaders(&r->headers_out);
    if (rc == NGX_ERROR) {
      return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    // send response headers
    rc = ngx_http_next_header_filter(r);

    // standard nginx send header check see ngx_http_send_response
    if (rc == NGX_ERROR || rc > NGX_OK) {
      return ngx_http_filter_finalize_request(r, NULL, rc);
    }

    // for in_place_check_header_filter
    if (rc < NGX_OK && rc != NGX_AGAIN) {
      CHECK(rc == NGX_DONE);
      return NGX_DONE;
    }

    ctx->write_pending = (rc == NGX_AGAIN);
    // Mark the request buffered until the fetch completes.
    ps_set_buffered(r, true);
  }

  // collect response body from pagespeed

  // Pass the optimized content along to later body filters.
  // From Weibin: This function should be called multiple times. Store the
  // whole file in one chain buffers is too aggressive. It could consume
  // too much memory in busy servers.
  rc = ctx->base_fetch->CollectAccumulatedWrites(&cl);
  ngx_log_error(NGX_LOG_DEBUG, ctx->r->connection->log, 0,
                "CollectAccumulatedWrites, %d", rc);

  if (rc == NGX_ERROR) {
    ps_set_buffered(r, false);
    return NGX_HTTP_INTERNAL_SERVER_ERROR;
  }

  if (rc == NGX_AGAIN && cl == NULL) {
    // there is no body buffer to send now.
    return NGX_AGAIN;
  }

  if (rc == NGX_OK) {
    // Fetch complete: stop reporting ourselves as buffered.
    ps_set_buffered(r, false);
    ctx->fetch_done = true;
  }

  return ps_base_fetch_filter(r, cl);
}
// Installs ps_base_fetch_filter into nginx's body-filter chain and records
// the previous top filters so this module can chain to them.
// NOTE(review): only the body filter is hooked; the saved header filter is
// invoked directly from ps_base_fetch_handler -- confirm that is intended.
void ps_base_fetch_filter_init() {
  ngx_http_next_header_filter = ngx_http_top_header_filter;
  ngx_http_next_body_filter = ngx_http_top_body_filter;
  ngx_http_top_body_filter = ps_base_fetch_filter;
}
} // namespace ps_base_fetch
namespace {
// Setting headers in nginx is tricky because it's not just a matter of adding
@@ -731,6 +870,7 @@ void ps_cleanup_srv_conf(void* data) {
if (!factory_deleted && cfg_s->server_context != NULL) {
delete cfg_s->server_context->factory();
factory_deleted = true;
NgxBaseFetch::Terminate();
}
if (cfg_s->proxy_fetch_factory != NULL) {
delete cfg_s->proxy_fetch_factory;
@@ -967,16 +1107,6 @@ char* ps_merge_loc_conf(ngx_conf_t* cf, void* parent, void* child) {
// _ef_ is a shorthand for ETag Filter
ngx_http_output_header_filter_pt ngx_http_ef_next_header_filter;
// Tell nginx whether we have network activity we're waiting for so that it sets
// a write handler. See src/http/ngx_http_request.c:2083.
void ps_set_buffered(ngx_http_request_t* r, bool on) {
if (on) {
r->buffered |= NGX_HTTP_PAGESPEED_BUFFERED;
} else {
r->buffered &= ~NGX_HTTP_PAGESPEED_BUFFERED;
}
}
bool ps_is_https(ngx_http_request_t* r) {
// Based on ngx_http_variable_scheme.
#if (NGX_HTTP_SSL)
@@ -1083,13 +1213,6 @@ GoogleString ps_determine_url(ngx_http_request_t* r) {
host, port_string, str_to_string_piece(r->unparsed_uri));
}
// Get the context for this request. ps_connection_read_handler should already
// have been called to create it.
ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r) {
return static_cast<ps_request_ctx_t*>(
ngx_http_get_module_ctx(r, ngx_pagespeed));
}
void ps_release_base_fetch(ps_request_ctx_t* ctx);
// we are still at pagespeed phase
@@ -1122,230 +1245,6 @@ ngx_int_t ps_async_wait_response(ngx_http_request_t* r) {
return NGX_DONE;
}
namespace {
ngx_http_output_header_filter_pt ngx_http_next_header_filter;
ngx_http_output_body_filter_pt ngx_http_next_body_filter;
ngx_int_t ps_base_fetch_filter(ngx_http_request_t* r, ngx_chain_t* in) {
ps_request_ctx_t* ctx = ps_get_request_context(r);
if (ctx == NULL || ctx->base_fetch == NULL) {
return ngx_http_next_body_filter(r, in);
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http pagespeed write filter \"%V\"", &r->uri);
// send response body
if (in || ctx->write_pending) {
ngx_int_t rc = ngx_http_next_body_filter(r, in);
ctx->write_pending = (rc == NGX_AGAIN);
if (rc == NGX_OK && !ctx->fetch_done) {
return NGX_AGAIN;
}
return rc;
}
return ctx->fetch_done ? NGX_OK : NGX_AGAIN;
}
// This runs on the nginx event loop in response to seeing the byte PageSpeed
// sent over the pipe to trigger the nginx-side code. Copy whatever is ready
// from PageSpeed out to the browser (headers and/or body).
//
// Returns NGX_OK when the fetch is complete, NGX_AGAIN when more output is
// still expected, NGX_DONE when a downstream filter took over the request,
// or NGX_HTTP_INTERNAL_SERVER_ERROR / a finalized error code on failure.
ngx_int_t ps_base_fetch_handler(ngx_http_request_t* r) {
  ps_request_ctx_t* ctx = ps_get_request_context(r);
  ngx_int_t rc;
  ngx_chain_t* cl = NULL;

  ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                 "ps fetch handler: %V", &r->uri);

  if (!r->header_sent) {
    // First pass: headers have not gone out yet, so emit them before any
    // body data.
    if (ctx->preserve_caching_headers != kDontPreserveHeaders) {
      ngx_table_elt_t* header;
      NgxListIterator it(&(r->headers_out.headers.part));
      while ((header = it.Next()) != NULL) {
        // We need to remember a few headers when ModifyCachingHeaders is off,
        // so we can send them unmodified in copy_response_headers_to_ngx().
        // This just sets the hash to 0 for all other headers. That way, we
        // avoid some relatively complicated code to reconstruct these headers.
        if (!(STR_CASE_EQ_LITERAL(header->key, "Cache-Control") ||
              (ctx->preserve_caching_headers == kPreserveAllCachingHeaders &&
               (STR_CASE_EQ_LITERAL(header->key, "Etag") ||
                STR_CASE_EQ_LITERAL(header->key, "Date") ||
                STR_CASE_EQ_LITERAL(header->key, "Last-Modified") ||
                STR_CASE_EQ_LITERAL(header->key, "Expires"))))) {
          header->hash = 0;
        }
      }
    } else {
      ngx_http_clean_header(r);
    }

    // collect response headers from pagespeed
    rc = ctx->base_fetch->CollectHeaders(&r->headers_out);
    if (rc == NGX_ERROR) {
      return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    // send response headers
    rc = ngx_http_next_header_filter(r);

    // standard nginx send header check see ngx_http_send_response
    if (rc == NGX_ERROR || rc > NGX_OK) {
      return ngx_http_filter_finalize_request(r, NULL, rc);
    }

    // for in_place_check_header_filter
    if (rc < NGX_OK && rc != NGX_AGAIN) {
      CHECK(rc == NGX_DONE);
      return rc;
    }

    ctx->write_pending = (rc == NGX_AGAIN);
    ps_set_buffered(r, true);
  }

  // collect response body from pagespeed
  // Pass the optimized content along to later body filters.
  // From Weibin: This function should be called multiple times. Storing the
  // whole file in one chain of buffers is too aggressive. It could consume
  // too much memory in busy servers.
  rc = ctx->base_fetch->CollectAccumulatedWrites(&cl);
  // Log with %i, nginx's conversion for ngx_int_t: the previous %d tells
  // ngx_vslprintf to pull a plain int from the va_list, which reads the wrong
  // width on LP64 platforms where ngx_int_t is 64-bit.  (r == ctx->r, so log
  // through r directly for consistency with the rest of this function.)
  ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0,
                "CollectAccumulatedWrites, %i", rc);
  if (rc == NGX_ERROR) {
    ps_set_buffered(r, false);
    return NGX_HTTP_INTERNAL_SERVER_ERROR;
  }

  if (rc == NGX_AGAIN && cl == NULL) {
    // there is no body buffer to send now.
    return NGX_AGAIN;
  }

  if (rc == NGX_OK) {
    // PageSpeed has called Done(); everything has been collected.
    ps_set_buffered(r, false);
    ctx->fetch_done = true;
  }

  return ps_base_fetch_filter(r, cl);
}
// Splice our body filter into nginx's filter chain.  Called from ps_init()
// during configuration, before any requests are served.
//
// Order matters: the current chain heads must be saved into the next_*
// pointers before ngx_http_top_body_filter is overwritten, so that
// ps_base_fetch_filter / ps_base_fetch_handler can forward output downstream.
// Only the body filter is installed here; headers are pushed explicitly via
// ngx_http_next_header_filter from ps_base_fetch_handler().
void ps_base_fetch_filter_init() {
  ngx_http_next_header_filter = ngx_http_top_header_filter;
  ngx_http_next_body_filter = ngx_http_top_body_filter;
  ngx_http_top_body_filter = ps_base_fetch_filter;
}
} // namespace
// Do some bookkeeping, cleanup, and error checking to keep the mess out of
// ps_base_fetch_handler.
//
// Read-event handler for the per-request notification pipe: runs on the
// nginx event loop whenever PageSpeed writes a byte to wake us up.  Drains
// the pipe, handles the teardown edge cases, and otherwise hands off to
// ps_base_fetch_handler() to ship whatever output is ready.
void ps_connection_read_handler(ngx_event_t* ev) {
  CHECK(ev != NULL);
  ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
  CHECK(c != NULL);
  int rc;

  // Request has been finalized, do nothing just clear the pipe.
  // (ps_release_base_fetch() marks the connection with error = 1 when the
  // request context is released; the pipe stays open only so we can observe
  // the writer's close.)
  if (c->error) {
    do {
      char chr[256];
      rc = read(c->fd, chr, 256);
    } while (rc > 0 || (rc == -1 && errno == EINTR));  // Retry on EINTR.
    if (rc == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      // Pipe drained but the write end is still open; wait for more events.
      return;
    }
    // Write peer close or error occur.
    ngx_close_connection(c);
    return;
  }

  ps_request_ctx_t* ctx = static_cast<ps_request_ctx_t*>(c->data);
  CHECK(ctx != NULL);
  ngx_http_request_t* r = ctx->r;
  CHECK(r != NULL);

  // Clear the pipe.  Several notification bytes may have accumulated; the
  // single ps_base_fetch_handler() call below collects everything pending.
  do {
    char chr[256];
    rc = read(c->fd, chr, 256);
  } while (rc > 0 || (rc == -1 && errno == EINTR));  // Retry on EINTR.

  // NOTE: errno from the read loop is still live here; nothing between the
  // loop and the errno test below may clobber it (the branch in between
  // returns before falling through).
  if (r->connection->error) {
    // The client connection already failed; tear down both sides.
    ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
                  "pagespeed [%p] request already finalized", r);
    ctx->pagespeed_connection = NULL;
    ngx_close_connection(c);
    ngx_http_finalize_request(r, NGX_ERROR);
    return;
  }

  if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
    // Unexpected pipe read error: give up on the request.
    ctx->pagespeed_connection = NULL;
    ngx_close_connection(c);
    return ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
  }

  // AGAIN or rc == 0.
  if (rc == 0) {
    // EOF: PageSpeed closed its end.  Close the pipe here to avoid SIGPIPE.
    // Done will be checked in RequestCollection.
    ctx->pagespeed_connection = NULL;
    ngx_close_connection(c);
  }

  if (ctx->fetch_done) {
    // Everything was already delivered on a previous wakeup.
    return;
  }

  ngx_http_finalize_request(r, ps_base_fetch_handler(r));
}
// Wrap the read end of the PageSpeed notification pipe in an
// ngx_connection_t and register it with the event loop so that
// ps_connection_read_handler() runs whenever PageSpeed writes to it.
//
// ctx            - request context; receives the connection in
//                  ctx->pagespeed_connection and is stored as c->data.
// server_context - supplies the log for the connection.
// pipe_fd        - read end of the pipe (ownership passes to the connection).
//
// Returns NGX_OK on success, NGX_ERROR on failure (caller keeps ownership of
// pipe_fd on failure and must close it).
ngx_int_t ps_create_connection(
    ps_request_ctx_t* ctx, NgxServerContext* server_context, int pipe_fd) {
  // We have to use the server_context's log (which is the server context's
  // ngx_http_core_loc_conf_t->error_log) and not the request's log because
  // this connection can outlast the request by a little while.
  ngx_log_t* server_context_log = server_context->ngx_message_handler()->log();
  if (server_context_log == NULL) {
    ngx_log_debug0(NGX_LOG_INFO, ctx->r->connection->log, 0,
                   "ps_create_connection failed to get server context log");
    return NGX_ERROR;
  }

  ngx_connection_t* c = ngx_get_connection(pipe_fd, server_context_log);
  if (c == NULL) {
    return NGX_ERROR;
  }

  c->recv = ngx_recv;
  c->send = ngx_send;
  c->recv_chain = ngx_recv_chain;
  c->send_chain = ngx_send_chain;

  c->log_error = ctx->r->connection->log_error;

  c->read->log = c->log;
  c->write->log = c->log;

  ctx->pagespeed_connection = c;

  // Tell nginx to monitor this pipe and call us back when there's data.
  c->data = ctx;
  c->read->handler = ps_connection_read_handler;
  if (ngx_add_event(c->read, NGX_READ_EVENT, 0) != NGX_OK) {
    // Previously this return value was ignored; if registration fails we
    // would never hear from PageSpeed and the request would hang.  Fail the
    // setup instead and release the connection we acquired.
    ctx->pagespeed_connection = NULL;
    ngx_close_connection(c);
    return NGX_ERROR;
  }

  return NGX_OK;
}
// Populate cfg_* with configuration information for this request.
// Thin wrappers around ngx_http_get_module_*_conf and cast.
ps_srv_conf_t* ps_get_srv_config(ngx_http_request_t* r) {
@@ -1590,7 +1489,6 @@ bool is_pagespeed_subrequest(ngx_http_request_t* r) {
return (user_agent.find(kModPagespeedSubrequestUserAgent) != user_agent.npos);
}
// TODO(chaizhenhua): merge into NgxBaseFetch::Release()
void ps_release_base_fetch(ps_request_ctx_t* ctx) {
// In the normal flow BaseFetch doesn't delete itself in HandleDone() because
// we still need to receive notification via pipe and call
@@ -1598,15 +1496,9 @@ void ps_release_base_fetch(ps_request_ctx_t* ctx) {
// then HandleDone() hasn't been called yet and we need the base fetch to wait
// for that and then delete itself.
if (ctx->base_fetch != NULL) {
ctx->base_fetch->Release();
ctx->base_fetch->Detach();
ctx->base_fetch = NULL;
}
if (ctx->pagespeed_connection != NULL) {
// Tell pagespeed connection ctx has been released.
ctx->pagespeed_connection->error = 1;
ctx->pagespeed_connection = NULL;
}
}
// TODO(chaizhenhua): merge into NgxBaseFetch ctor
@@ -1614,45 +1506,13 @@ ngx_int_t ps_create_base_fetch(ps_request_ctx_t* ctx,
RequestContextPtr request_context) {
ngx_http_request_t* r = ctx->r;
ps_srv_conf_t* cfg_s = ps_get_srv_config(r);
int file_descriptors[2];
int rc = pipe(file_descriptors);
if (rc != 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "pipe() failed");
return NGX_ERROR;
}
if (ngx_nonblocking(file_descriptors[0]) == -1) {
ngx_log_error(NGX_LOG_EMERG, r->connection->log, ngx_socket_errno,
ngx_nonblocking_n " pipe[0] failed");
close(file_descriptors[0]);
close(file_descriptors[1]);
return NGX_ERROR;
}
if (ngx_nonblocking(file_descriptors[1]) == -1) {
ngx_log_error(NGX_LOG_EMERG, r->connection->log, ngx_socket_errno,
ngx_nonblocking_n " pipe[1] failed");
close(file_descriptors[0]);
close(file_descriptors[1]);
return NGX_ERROR;
}
rc = ps_create_connection(ctx, cfg_s->server_context, file_descriptors[0]);
if (rc != NGX_OK) {
close(file_descriptors[0]);
close(file_descriptors[1]);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"ps_route_request: no pagespeed connection.");
return NGX_ERROR;
}
// Handles its own deletion. We need to call Release() when we're done with
// Handles its own deletion. We need to call Release when we're done with
// it, and call Done() on the associated parent (Proxy or Resource) fetch. If
// we fail before creating the associated fetch then we need to call Done() on
// the BaseFetch ourselves.
ctx->base_fetch = new NgxBaseFetch(
r, file_descriptors[1], cfg_s->server_context,
r, cfg_s->server_context,
request_context, ctx->preserve_caching_headers);
return NGX_OK;
@@ -1858,7 +1718,6 @@ ngx_int_t ps_resource_handler(ngx_http_request_t* r,
ctx->write_pending = false;
ctx->html_rewrite = false;
ctx->in_place = false;
ctx->pagespeed_connection = NULL;
ctx->preserve_caching_headers = kDontPreserveHeaders;
// See build_context_for_request() in mod_instaweb.cc
@@ -2374,6 +2233,16 @@ ngx_int_t ps_in_place_check_header_filter(ngx_http_request_t* r) {
return ngx_http_next_header_filter(r);
}
// If our request was finalized during the IPRO lookup, we decline.
if (r->connection->error) {
ctx->driver->Cleanup();
ctx->driver = NULL;
// enable html_rewrite
ctx->html_rewrite = true;
ctx->in_place = false;
return ps_decline_request(r);
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"ps in place check header filter initial: %V", &r->uri);
@@ -2985,7 +2854,7 @@ ngx_int_t ps_init(ngx_conf_t* cf) {
ps_in_place_filter_init();
ps_html_rewrite_fix_headers_filter_init();
ps_base_fetch_filter_init();
ps_base_fetch::ps_base_fetch_filter_init();
ps_html_rewrite_filter_init();
ngx_http_core_main_conf_t* cmcf = static_cast<ngx_http_core_main_conf_t*>(
@@ -3104,6 +2973,9 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
}
SystemRewriteDriverFactory::InitApr();
if (!NgxBaseFetch::Initialize(cycle)) {
return NGX_ERROR;
}
// ChildInit() will initialise all ServerContexts, which we need to
// create ProxyFetchFactories below
+7 -1
View File
@@ -84,7 +84,6 @@ enum PreserveCachingHeaders {
typedef struct {
NgxBaseFetch* base_fetch;
ngx_connection_t* pagespeed_connection;
ngx_http_request_t* r;
bool html_rewrite;
@@ -109,6 +108,7 @@ typedef struct {
GoogleString url_string;
} ps_request_ctx_t;
ps_request_ctx_t* ps_get_request_context(ngx_http_request_t* r);
void copy_request_headers_from_ngx(const ngx_http_request_t* r,
RequestHeaders* headers);
@@ -123,6 +123,12 @@ ngx_int_t copy_response_headers_to_ngx(
StringPiece ps_determine_host(ngx_http_request_t* r);
namespace ps_base_fetch {
ngx_int_t ps_base_fetch_handler(ngx_http_request_t* r);
} // namespace ps_base_fetch
} // namespace net_instaweb
#endif // NGX_PAGESPEED_H_
+3 -1
View File
@@ -154,7 +154,9 @@ RewriteOptions* NgxRewriteDriverFactory::NewRewriteOptions() {
bool NgxRewriteDriverFactory::InitNgxUrlAsyncFetchers() {
log_ = ngx_cycle->log;
for (size_t i = 0; i < ngx_url_async_fetchers_.size(); ++i) {
if (!ngx_url_async_fetchers_[i]->Init()) {
// TODO(oschaaf): Can we pass the log from the server{} block here?
if (!ngx_url_async_fetchers_[i]->Init(
const_cast<ngx_cycle_t*>(ngx_cycle))) {
return false;
}
}
+37 -111
View File
@@ -65,7 +65,8 @@ namespace net_instaweb {
thread_system_(thread_system),
message_handler_(handler),
mutex_(NULL),
max_keepalive_requests_(max_keepalive_requests) {
max_keepalive_requests_(max_keepalive_requests),
event_connection_(NULL) {
resolver_timeout_ = resolver_timeout;
fetch_timeout_ = fetch_timeout;
ngx_memzero(&proxy_, sizeof(proxy_));
@@ -76,12 +77,11 @@ namespace net_instaweb {
mutex_ = thread_system_->NewMutex();
log_ = log;
pool_ = NULL;
command_connection_ = NULL;
pipe_fd_ = -1;
resolver_ = resolver;
}
NgxUrlAsyncFetcher::~NgxUrlAsyncFetcher() {
DCHECK(shutdown_) << "Shut down before destructing NgxUrlAsyncFetcher.";
message_handler_->Message(
kInfo,
"Destruct NgxUrlAsyncFetcher with [%d] active fetchers",
@@ -89,19 +89,12 @@ namespace net_instaweb {
CancelActiveFetches();
active_fetches_.DeleteAll();
NgxConnection::Terminate();
if (pool_ != NULL) {
ngx_destroy_pool(pool_);
pool_ = NULL;
}
if (command_connection_ != NULL) {
ngx_close_connection(command_connection_);
command_connection_ = NULL;
}
if (pipe_fd_ != -1) {
close(pipe_fd_);
pipe_fd_ = -1;
}
if (mutex_ != NULL) {
delete mutex_;
mutex_ = NULL;
@@ -152,9 +145,13 @@ namespace net_instaweb {
// Create the pool for fetcher, create the pipe, add the read event for main
// thread. It should be called in the worker process.
bool NgxUrlAsyncFetcher::Init() {
log_ = ngx_cycle->log;
bool NgxUrlAsyncFetcher::Init(ngx_cycle_t* cycle) {
log_ = cycle->log;
CHECK(event_connection_ == NULL) << "event connection already set";
event_connection_ = new NgxEventConnection(ReadCallback);
if (!event_connection_->Init(cycle)) {
return false;
}
if (pool_ == NULL) {
pool_ = ngx_create_pool(4096, log_);
if (pool_ == NULL) {
@@ -164,41 +161,6 @@ namespace net_instaweb {
}
}
int pipe_fds[2];
int rc = pipe(pipe_fds);
if (rc != 0) {
ngx_log_error(NGX_LOG_ERR, log_, 0, "pipe() failed");
return false;
}
if (ngx_nonblocking(pipe_fds[0]) == -1) {
ngx_log_error(NGX_LOG_ERR, log_, 0, "nonblocking pipe[0] failed");
return false;
}
if (ngx_nonblocking(pipe_fds[1]) == -1) {
ngx_log_error(NGX_LOG_ERR, log_, 0, "nonblocking pipe[1] failed");
return false;
}
pipe_fd_ = pipe_fds[1];
command_connection_ = ngx_get_connection(pipe_fds[0], log_);
if (command_connection_ == NULL) {
close(pipe_fds[1]);
close(pipe_fds[0]);
pipe_fd_ = -1;
return false;
}
command_connection_->recv = ngx_recv;
command_connection_->send = ngx_send;
command_connection_->recv_chain = ngx_recv_chain;
command_connection_->send_chain = ngx_send_chain;
command_connection_->log = log_;
command_connection_->read->log = log_;
command_connection_->write->log = log_;
command_connection_->data = this;
command_connection_->read->handler = CommandHandler;
ngx_add_event(command_connection_->read, NGX_READ_EVENT, 0);
if (proxy_.url.len == 0) {
return true;
}
@@ -215,7 +177,23 @@ namespace net_instaweb {
void NgxUrlAsyncFetcher::ShutDown() {
shutdown_ = true;
SendCmd('S');
if (!pending_fetches_.empty()) {
pending_fetches_.DeleteAll();
}
if (!active_fetches_.empty()) {
for (Pool<NgxFetch>::iterator p = active_fetches_.begin(),
e = active_fetches_.end(); p != e; p++) {
NgxFetch* fetch = *p;
fetch->CallbackDone(false);
}
active_fetches_.Clear();
}
if (event_connection_ != NULL) {
event_connection_->Shutdown();
delete event_connection_;
event_connection_ = NULL;
}
}
// It's called in the rewrite thread. All the fetches are started at
@@ -228,53 +206,24 @@ namespace net_instaweb {
message_handler, log_);
ScopedMutex lock(mutex_);
pending_fetches_.Add(fetch);
SendCmd('F');
}
// send command to nginx main thread
// 'F' : start a fetch
// 'S' : shutdown the fetcher
bool NgxUrlAsyncFetcher::SendCmd(const char command) {
int rc;
while (true) {
rc = write(pipe_fd_, &command, 1);
if (rc == 1) {
return true;
} else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// TODO(junmin): It's rare. But it need be fixed.
} else {
return false;
}
}
return true;
// TODO(oschaaf): thread safety on written vs shutdown.
// It is possible that shutdown() is called after writing an event? In that
// case, this could (rarely) fail when it shouldn't.
bool written = event_connection_->WriteEvent(this);
CHECK(written || shutdown_) << "NgxUrlAsyncFetcher: event write failure";
}
// This is the read event which is called in the main thread.
// It will do the real work. Add the work event and start the fetch.
void NgxUrlAsyncFetcher::CommandHandler(ngx_event_t* cmdev) {
char command;
int rc;
ngx_connection_t* c = static_cast<ngx_connection_t*>(cmdev->data);
NgxUrlAsyncFetcher* fetcher = static_cast<NgxUrlAsyncFetcher*>(c->data);
do {
rc = read(c->fd, &command, 1);
} while (rc == -1 && errno == EINTR);
CHECK(rc == -1 || rc == 0 || rc == 1);
if (rc == -1 || rc == 0) {
// EAGAIN
return;
}
void NgxUrlAsyncFetcher::ReadCallback(const ps_event_data& data) {
std::vector<NgxFetch*> to_start;
NgxUrlAsyncFetcher* fetcher = reinterpret_cast<NgxUrlAsyncFetcher*>(
data.sender);
switch (command) {
// All the new fetches are appended in the pending_fetches.
// Start all these fetches.
case 'F':
fetcher->mutex_->Lock();
fetcher->completed_fetches_.DeleteAll();
for (Pool<NgxFetch>::iterator p = fetcher->pending_fetches_.begin(),
e = fetcher->pending_fetches_.end(); p != e; p++) {
NgxFetch* fetch = *p;
@@ -287,29 +236,6 @@ namespace net_instaweb {
for (size_t i = 0; i < to_start.size(); i++) {
fetcher->StartFetch(to_start[i]);
}
CHECK(ngx_handle_read_event(cmdev, 0) == NGX_OK);
break;
// Shutdown all the fetches.
case 'S':
if (!fetcher->pending_fetches_.empty()) {
fetcher->pending_fetches_.DeleteAll();
}
if (!fetcher->active_fetches_.empty()) {
for (Pool<NgxFetch>::iterator p = fetcher->active_fetches_.begin(),
e = fetcher->active_fetches_.end(); p != e; p++) {
NgxFetch* fetch = *p;
fetch->CallbackDone(false);
}
fetcher->active_fetches_.Clear();
}
CHECK(ngx_del_event(cmdev, NGX_READ_EVENT, 0) == NGX_OK);
break;
default:
break;
}
return;
}
+9 -7
View File
@@ -33,6 +33,9 @@ extern "C" {
}
#include <vector>
#include "ngx_event_connection.h"
#include "net/instaweb/http/public/url_async_fetcher.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/pool.h"
@@ -60,21 +63,20 @@ class NgxUrlAsyncFetcher : public UrlAsyncFetcher {
// It should be called in the module init_process callback function. Do some
// intializations which can't be done in the master process
bool Init();
bool Init(ngx_cycle_t* cycle);
// shutdown all the fetches.
virtual void ShutDown();
// the read handler in the main thread
static void ReadCallback(const ps_event_data& data);
virtual bool SupportsHttps() const { return false; }
virtual void Fetch(const GoogleString& url,
MessageHandler* message_handler,
AsyncFetch* callback);
// send the command from the current thread to main thread
bool SendCmd(const char command);
// the read handler in the main thread
static void CommandHandler(ngx_event_t* cmdev);
bool StartFetch(NgxFetch* fetch);
// Remove the completed fetch from the active fetch set, and put it into a
@@ -137,13 +139,13 @@ class NgxUrlAsyncFetcher : public UrlAsyncFetcher {
ngx_pool_t* pool_;
ngx_log_t* log_;
ngx_connection_t* command_connection_; // the command pipe
int pipe_fd_; // the write pipe end
ngx_resolver_t* resolver_;
int max_keepalive_requests_;
ngx_msec_t resolver_timeout_;
ngx_msec_t fetch_timeout_;
NgxEventConnection* event_connection_;
DISALLOW_COPY_AND_ASSIGN(NgxUrlAsyncFetcher);
};