NgxConnection: single event mechanism for NgxFetch and NgxBaseFetch

Abstract the pipe communication into NgxEventConnection, for reuse
by NgxBaseFetch and NgxUrlAsyncFetcher.

Based on Chai's earlier work, but with a few fixes discovered
while working on this and SPDY module compatibility

- Uses less file descriptors, I expect this to be faster but need
  measurement is needed to back that.
- Fixed NgxUrlAsyncFetcher actually shutting down its fetchers.
- Fixes a bug where we wouldn't clean idle pooled NgxConnections.
- Fixes a bug for requests that are finalized mid-IPRO lookup.
- Makes us use ngx_handle_read_event/ngx_del_event
This commit is contained in:
Otto van der Schaaf
2014-10-16 21:52:11 +02:00
parent bff15040be
commit 5dfe42f3d3
12 changed files with 683 additions and 508 deletions
+94 -29
View File
@@ -17,6 +17,7 @@
// Author: jefftk@google.com (Jeff Kaufman)
#include "ngx_base_fetch.h"
#include "ngx_event_connection.h"
#include "ngx_list_iterator.h"
#include "ngx_pagespeed.h"
@@ -28,7 +29,13 @@
namespace net_instaweb {
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
const char kHeadersComplete = 'H';
const char kFlush = 'F';
const char kDone = 'D';
NgxEventConnection* NgxBaseFetch::event_connection = NULL;
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers)
@@ -37,10 +44,10 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
server_context_(server_context),
done_called_(false),
last_buf_sent_(false),
pipe_fd_(pipe_fd),
references_(2),
ipro_lookup_(false),
preserve_caching_headers_(preserve_caching_headers) {
preserve_caching_headers_(preserve_caching_headers),
detached_(false) {
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
}
@@ -48,6 +55,66 @@ NgxBaseFetch::~NgxBaseFetch() {
pthread_mutex_destroy(&mutex_);
}
bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
CHECK(event_connection == NULL) << "event connection already set";
event_connection = new NgxEventConnection(ReadCallback);
return event_connection->Init(cycle);
}
void NgxBaseFetch::Terminate() {
if (event_connection != NULL) {
delete event_connection;
event_connection = NULL;
}
}
void NgxBaseFetch::ReadCallback(const ps_event_data& data) {
NgxBaseFetch* base_fetch = reinterpret_cast<NgxBaseFetch*>(data.sender);
ngx_http_request_t* r = base_fetch->request();
bool detached = base_fetch->detached();
int refcount = base_fetch->DecrementRefCount();
#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] event: %c. bf:%p - refcnt:%d - det: %c", r,
data.type, base_fetch, refcount, detached ? 'Y': 'N');
#endif
// If we ended up destructing the base fetch, or the request context is
// detached, skip this event.
if (refcount == 0 || detached) {
return;
}
ps_request_ctx_t* ctx = ps_get_request_context(r);
CHECK(data.sender == ctx->base_fetch);
// ngx_base_fetch_handler() ends up setting ctx->fetch_done, which
// means we shouldn't call it anymore.
if (ctx->fetch_done) {
return;
}
CHECK(r->count > 0) << "r->count: " << r->count;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if (!ctx->base_fetch->ipro_lookup_ && r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
ngx_http_finalize_request(r, NGX_ERROR);
return;
}
int rc = ps_base_fetch::ps_base_fetch_handler(r);
#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
#endif
ngx_http_finalize_request(r, rc);
}
void NgxBaseFetch::Lock() {
pthread_mutex_lock(&mutex_);
}
@@ -115,21 +182,15 @@ ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) {
preserve_caching_headers_);
}
void NgxBaseFetch::RequestCollection() {
int rc;
char c = 'A'; // What byte we write is arbitrary.
while (true) {
rc = write(pipe_fd_, &c, 1);
if (rc == 1) {
break;
} else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// TODO(jefftk): is this rare enough that spinning isn't a problem? Could
// we get into a case where the pipe fills up and we spin forever?
} else {
perror("NgxBaseFetch::RequestCollection");
break;
}
void NgxBaseFetch::RequestCollection(char type) {
// We must optimistically increment the refcount, and decrement it
// when we conclude we failed. If we only increment on a successfull write,
// there's a small chance that between writing and adding to the refcount
// both pagespeed and nginx will release their refcount -- destructing
// this NgxBaseFetch instance.
IncrementRefCount();
if (!event_connection->WriteEvent(type, this)) {
DecrementRefCount();
}
}
@@ -147,38 +208,42 @@ void NgxBaseFetch::HandleHeadersComplete() {
// For the IPRO lookup, supress notification of the nginx side here.
// If we send both this event and the one from done, nasty stuff will happen
// if we loose the race with with the nginx side destructing this base fetch
// instance (and thereby clearing the byte and its pending extraneous event.
// instance (and thereby clearing the byte and its pending extraneous event).
if (!ipro_lookup_) {
RequestCollection(); // Headers available.
RequestCollection(kHeadersComplete); // Headers available.
}
}
bool NgxBaseFetch::HandleFlush(MessageHandler* handler) {
RequestCollection(); // A new part of the response body is available.
RequestCollection(kFlush); // A new part of the response body is available
return true;
}
void NgxBaseFetch::Release() {
DecrefAndDeleteIfUnreferenced();
int NgxBaseFetch::DecrementRefCount() {
return DecrefAndDeleteIfUnreferenced();
}
void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
int NgxBaseFetch::IncrementRefCount() {
return __sync_add_and_fetch(&references_, 1);
}
int NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
// Creates a full memory barrier.
if (__sync_add_and_fetch(&references_, -1) == 0) {
int r = __sync_add_and_fetch(&references_, -1);
if (r == 0) {
delete this;
}
return r;
}
void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
CHECK(!done_called_) << "Done already called!";
Lock();
done_called_ = true;
Unlock();
close(pipe_fd_); // Indicates to nginx that we're done with the rewrite.
pipe_fd_ = -1;
RequestCollection(kDone);
DecrefAndDeleteIfUnreferenced();
}