native fetcher: Support http keep-alive
Based on @dinic his work, add keep-alive support for the native fetcher.
Adds a new option, usable at the http{} level in configuration:
pagespeed NativeFetcherMaxKeepaliveRequests 50;
The default value is 100 (aligned to nginx). Setting the value to 1 turns off
keep-alive requests altogether).
Most notable changes:
- Request keep-alive by adding the appropriate request header
- Fixes connections getting reused while they are servicing other requests:
- Remove connection from the pool of available connections for keepalive when applicable
- Disable keepalive in more appropriate situations
- Response parsing fixes
- Remove connections that timeout from the k.a. pool
- Add a few sanity (D)CHECKS
- Emit debug messages for traceability
- Fix for ignoring ipv6 addresses returned from dns queries when ipv6 is enabled.
- Bump the fetch timeout in test configuration to deflake tests that require dns
lookups (which will be done via 8.8.8.8 currently for the native fetcher)
This commit is contained in:
+806
-474
File diff suppressed because it is too large
Load Diff
+10
-10
@@ -51,12 +51,13 @@ namespace net_instaweb {
|
||||
typedef bool (*response_handler_pt)(ngx_connection_t* c);
|
||||
|
||||
class NgxUrlAsyncFetcher;
|
||||
class NgxConnection;
|
||||
|
||||
class NgxFetch : public PoolElement<NgxFetch> {
|
||||
public:
|
||||
NgxFetch(const GoogleString& url,
|
||||
AsyncFetch* async_fetch,
|
||||
MessageHandler* message_handler,
|
||||
ngx_msec_t timeout_ms,
|
||||
ngx_log_t* log);
|
||||
~NgxFetch();
|
||||
|
||||
@@ -112,19 +113,19 @@ class NgxFetch : public PoolElement<NgxFetch> {
|
||||
response_handler = handler;
|
||||
}
|
||||
// Only the Static functions could be used in callbacks.
|
||||
static void NgxFetchResolveDone(ngx_resolver_ctx_t* ctx);
|
||||
static void ResolveDoneHandler(ngx_resolver_ctx_t* ctx);
|
||||
// Write the request.
|
||||
static void NgxFetchWrite(ngx_event_t* wev);
|
||||
static void ConnectionWriteHandler(ngx_event_t* wev);
|
||||
// Wait for the response.
|
||||
static void NgxFetchRead(ngx_event_t* rev);
|
||||
static void ConnectionReadHandler(ngx_event_t* rev);
|
||||
// Read and parse the first status line.
|
||||
static bool NgxFetchHandleStatusLine(ngx_connection_t* c);
|
||||
static bool HandleStatusLine(ngx_connection_t* c);
|
||||
// Read and parse the HTTP headers.
|
||||
static bool NgxFetchHandleHeader(ngx_connection_t* c);
|
||||
static bool HandleHeader(ngx_connection_t* c);
|
||||
// Read the response body.
|
||||
static bool NgxFetchHandleBody(ngx_connection_t* c);
|
||||
static bool HandleBody(ngx_connection_t* c);
|
||||
// Cancel the fetch when it's timeout.
|
||||
static void NgxFetchTimeout(ngx_event_t* tev);
|
||||
static void TimeoutHandler(ngx_event_t* tev);
|
||||
|
||||
// Add the pagespeed User-Agent.
|
||||
void FixUserAgent();
|
||||
@@ -139,7 +140,6 @@ class NgxFetch : public PoolElement<NgxFetch> {
|
||||
int64 bytes_received_;
|
||||
int64 fetch_start_ms_;
|
||||
int64 fetch_end_ms_;
|
||||
int64 timeout_ms_;
|
||||
bool done_;
|
||||
int64 content_length_;
|
||||
bool content_length_known_;
|
||||
@@ -152,7 +152,7 @@ class NgxFetch : public PoolElement<NgxFetch> {
|
||||
ngx_http_request_t* r_;
|
||||
ngx_http_status_t* status_;
|
||||
ngx_event_t* timeout_event_;
|
||||
ngx_connection_t* connection_;
|
||||
NgxConnection* connection_;
|
||||
ngx_resolver_ctx_t* resolver_ctx_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(NgxFetch);
|
||||
|
||||
@@ -2662,8 +2662,8 @@ bool ps_request_body_to_string_piece(
|
||||
|
||||
if (ret < 0) {
|
||||
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
|
||||
"ps_request_body_to_string_piece: "
|
||||
"error reading post body.");
|
||||
"ps_request_body_to_string_piece: "
|
||||
"error reading post body.");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -78,6 +78,8 @@ NgxRewriteDriverFactory::NgxRewriteDriverFactory(
|
||||
log_(NULL),
|
||||
resolver_timeout_(NGX_CONF_UNSET_MSEC),
|
||||
use_native_fetcher_(false),
|
||||
// 100 Aligns to nginx's server-side default.
|
||||
native_fetcher_max_keepalive_requests_(100),
|
||||
ngx_shared_circular_buffer_(NULL),
|
||||
hostname_(hostname.as_string()),
|
||||
port_(port),
|
||||
@@ -112,6 +114,7 @@ UrlAsyncFetcher* NgxRewriteDriverFactory::AllocateFetcher(
|
||||
resolver_timeout_,
|
||||
config->blocking_fetch_timeout_ms(),
|
||||
resolver_,
|
||||
native_fetcher_max_keepalive_requests_,
|
||||
thread_system(),
|
||||
message_handler());
|
||||
ngx_url_async_fetchers_.push_back(fetcher);
|
||||
|
||||
@@ -105,6 +105,12 @@ class NgxRewriteDriverFactory : public SystemRewriteDriverFactory {
|
||||
void set_use_native_fetcher(bool x) {
|
||||
use_native_fetcher_ = x;
|
||||
}
|
||||
int native_fetcher_max_keepalive_requests() {
|
||||
return native_fetcher_max_keepalive_requests_;
|
||||
}
|
||||
void set_native_fetcher_max_keepalive_requests(int x) {
|
||||
native_fetcher_max_keepalive_requests_ = x;
|
||||
}
|
||||
bool process_script_variables() {
|
||||
return process_script_variables_;
|
||||
}
|
||||
@@ -140,6 +146,8 @@ class NgxRewriteDriverFactory : public SystemRewriteDriverFactory {
|
||||
ngx_msec_t resolver_timeout_;
|
||||
ngx_resolver_t* resolver_;
|
||||
bool use_native_fetcher_;
|
||||
int native_fetcher_max_keepalive_requests_;
|
||||
|
||||
typedef std::set<NgxMessageHandler*> NgxMessageHandlerSet;
|
||||
NgxMessageHandlerSet server_context_message_handlers_;
|
||||
|
||||
|
||||
@@ -72,12 +72,14 @@ const char* const server_only_options[] = {
|
||||
"LoadFromFileMatch",
|
||||
"LoadFromFileRule",
|
||||
"LoadFromFileRuleMatch",
|
||||
"UseNativeFetcher"
|
||||
"UseNativeFetcher",
|
||||
"NativeFetcherMaxKeepaliveRequests"
|
||||
};
|
||||
|
||||
// Options that can only be used in the main (http) option scope.
|
||||
const char* const main_only_options[] = {
|
||||
"UseNativeFetcher"
|
||||
"UseNativeFetcher",
|
||||
"NativeFetcherMaxKeepaliveRequests"
|
||||
};
|
||||
|
||||
} // namespace
|
||||
@@ -343,6 +345,16 @@ const char* NgxRewriteOptions::ParseAndSetOptions(
|
||||
result = ParseAndSetOptionHelper<NgxRewriteDriverFactory>(
|
||||
arg, driver_factory,
|
||||
&NgxRewriteDriverFactory::set_use_native_fetcher);
|
||||
} else if (IsDirective(directive, "NativeFetcherMaxKeepaliveRequests")) {
|
||||
int max_keepalive_requests;
|
||||
if (StringToInt(arg, &max_keepalive_requests) &&
|
||||
max_keepalive_requests > 0) {
|
||||
driver_factory->set_native_fetcher_max_keepalive_requests(
|
||||
max_keepalive_requests);
|
||||
result = RewriteOptions::kOptionOk;
|
||||
} else {
|
||||
result = RewriteOptions::kOptionValueInvalid;
|
||||
}
|
||||
} else if (StringCaseEqual("ProcessScriptVariables", args[0])) {
|
||||
if (scope == RewriteOptions::kProcessScopeStrict) {
|
||||
if (StringCaseEqual(arg, "on")) {
|
||||
|
||||
@@ -55,6 +55,7 @@ namespace net_instaweb {
|
||||
ngx_msec_t resolver_timeout,
|
||||
ngx_msec_t fetch_timeout,
|
||||
ngx_resolver_t* resolver,
|
||||
int max_keepalive_requests,
|
||||
ThreadSystem* thread_system,
|
||||
MessageHandler* handler)
|
||||
: fetchers_count_(0),
|
||||
@@ -63,7 +64,8 @@ namespace net_instaweb {
|
||||
byte_count_(0),
|
||||
thread_system_(thread_system),
|
||||
message_handler_(handler),
|
||||
mutex_(NULL) {
|
||||
mutex_(NULL),
|
||||
max_keepalive_requests_(max_keepalive_requests) {
|
||||
resolver_timeout_ = resolver_timeout;
|
||||
fetch_timeout_ = fetch_timeout;
|
||||
ngx_memzero(&proxy_, sizeof(proxy_));
|
||||
@@ -223,7 +225,7 @@ namespace net_instaweb {
|
||||
AsyncFetch* async_fetch) {
|
||||
async_fetch = EnableInflation(async_fetch);
|
||||
NgxFetch* fetch = new NgxFetch(url, async_fetch,
|
||||
message_handler, fetch_timeout_, log_);
|
||||
message_handler, log_);
|
||||
ScopedMutex lock(mutex_);
|
||||
pending_fetches_.Add(fetch);
|
||||
SendCmd('F');
|
||||
|
||||
@@ -53,7 +53,8 @@ class NgxUrlAsyncFetcher : public UrlAsyncFetcher {
|
||||
NgxUrlAsyncFetcher(
|
||||
const char* proxy, ngx_log_t* log, ngx_msec_t resolver_timeout,
|
||||
ngx_msec_t fetch_timeout, ngx_resolver_t* resolver,
|
||||
ThreadSystem* thread_system, MessageHandler* handler);
|
||||
int max_keepalive_requests, ThreadSystem* thread_system,
|
||||
MessageHandler* handler);
|
||||
|
||||
~NgxUrlAsyncFetcher();
|
||||
|
||||
@@ -139,6 +140,7 @@ class NgxUrlAsyncFetcher : public UrlAsyncFetcher {
|
||||
ngx_connection_t* command_connection_; // the command pipe
|
||||
int pipe_fd_; // the write pipe end
|
||||
ngx_resolver_t* resolver_;
|
||||
int max_keepalive_requests_;
|
||||
ngx_msec_t resolver_timeout_;
|
||||
ngx_msec_t fetch_timeout_;
|
||||
|
||||
|
||||
@@ -38,6 +38,10 @@ http {
|
||||
pagespeed StaticAssetPrefix /pagespeed_custom_static/;
|
||||
|
||||
pagespeed MessageBufferSize 200000;
|
||||
# Increase the default fetcher timeout to resolve sporadic flakeyness when
|
||||
# the native fetcher uses 8.8.8.8 to resolve.
|
||||
pagespeed FetcherTimeoutMs 10000;
|
||||
pagespeed NativeFetcherMaxKeepaliveRequests 50;
|
||||
|
||||
root "@@SERVER_ROOT@@";
|
||||
|
||||
|
||||
Reference in New Issue
Block a user