Merge pull request #899 from pagespeed/oschaaf-trunk-tracking-shutdown
trunk-tracking-shutdown: improve shutdown code
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
// Author: jefftk@google.com (Jeff Kaufman)
|
||||
#include <unistd.h> //for usleep
|
||||
|
||||
#include "ngx_base_fetch.h"
|
||||
#include "ngx_event_connection.h"
|
||||
@@ -25,6 +26,7 @@
|
||||
#include "net/instaweb/rewriter/public/rewrite_stats.h"
|
||||
#include "pagespeed/kernel/base/google_message_handler.h"
|
||||
#include "pagespeed/kernel/base/message_handler.h"
|
||||
#include "pagespeed/kernel/base/posix_timer.h"
|
||||
#include "pagespeed/kernel/http/response_headers.h"
|
||||
|
||||
namespace net_instaweb {
|
||||
@@ -34,6 +36,7 @@ const char kFlush = 'F';
|
||||
const char kDone = 'D';
|
||||
|
||||
NgxEventConnection* NgxBaseFetch::event_connection = NULL;
|
||||
int NgxBaseFetch::active_base_fetches = 0;
|
||||
|
||||
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
|
||||
NgxServerContext* server_context,
|
||||
@@ -51,10 +54,12 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
|
||||
detached_(false),
|
||||
suppress_(false) {
|
||||
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
|
||||
__sync_add_and_fetch(&NgxBaseFetch::active_base_fetches, 1);
|
||||
}
|
||||
|
||||
// Releases the per-instance mutex and then removes this fetch from the live
// instance counter that Terminate() polls while waiting for outstanding
// fetches.
NgxBaseFetch::~NgxBaseFetch() {
  pthread_mutex_destroy(&mutex_);
  // Atomic decrement; mirrors the atomic increment done in the constructor.
  __sync_sub_and_fetch(&NgxBaseFetch::active_base_fetches, 1);
}
|
||||
|
||||
bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
|
||||
@@ -65,6 +70,31 @@ bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
|
||||
|
||||
void NgxBaseFetch::Terminate() {
|
||||
if (event_connection != NULL) {
|
||||
GoogleMessageHandler handler;
|
||||
PosixTimer timer;
|
||||
int64 timeout_us = Timer::kSecondUs * 30;
|
||||
int64 end_us = timer.NowUs() + timeout_us;
|
||||
static unsigned int sleep_microseconds = 100;
|
||||
|
||||
handler.Message(
|
||||
kInfo,"NgxBaseFetch::Terminate rounding up %d active base fetches.",
|
||||
NgxBaseFetch::active_base_fetches);
|
||||
|
||||
// Try to continue processing and get the active base fetch count to 0
|
||||
// until the timeout expires.
|
||||
// TODO(oschaaf): This needs more work.
|
||||
while (NgxBaseFetch::active_base_fetches > 0 && end_us > timer.NowUs()) {
|
||||
event_connection->Drain();
|
||||
usleep(sleep_microseconds);
|
||||
}
|
||||
|
||||
if (NgxBaseFetch::active_base_fetches != 0) {
|
||||
handler.Message(
|
||||
kWarning,"NgxBaseFetch::Terminate timed out with %d active base fetches.",
|
||||
NgxBaseFetch::active_base_fetches);
|
||||
}
|
||||
|
||||
// Close down the named pipe.
|
||||
event_connection->Shutdown();
|
||||
delete event_connection;
|
||||
event_connection = NULL;
|
||||
|
||||
@@ -78,11 +78,17 @@ class NgxBaseFetch : public AsyncFetch {
|
||||
PreserveCachingHeaders preserve_caching_headers,
|
||||
NgxBaseFetchType base_fetch_type);
|
||||
virtual ~NgxBaseFetch();
|
||||
|
||||
// Statically initializes event_connection, required for PSOL and nginx to
|
||||
// communicate.
|
||||
static bool Initialize(ngx_cycle_t* cycle);
|
||||
|
||||
// Attempts to finish up request processing queued up in the named pipe and
|
||||
// PSOL for a fixed amount of time. If time is up, a fast and rough shutdown
|
||||
// is attempted.
|
||||
// Statically terminates and NULLS event_connection.
|
||||
static void Terminate();
|
||||
|
||||
static void ReadCallback(const ps_event_data& data);
|
||||
|
||||
// Puts a chain in link_ptr if we have any output data buffered. Returns
|
||||
@@ -145,6 +151,9 @@ class NgxBaseFetch : public AsyncFetch {
|
||||
int DecrefAndDeleteIfUnreferenced();
|
||||
|
||||
static NgxEventConnection* event_connection;
|
||||
|
||||
// Live count of NgxBaseFetch instances that are currently in use.
|
||||
static int active_base_fetches;
|
||||
|
||||
ngx_http_request_t* request_;
|
||||
GoogleString buffer_;
|
||||
|
||||
@@ -159,10 +159,14 @@ bool NgxEventConnection::WriteEvent(char type, void* sender) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Reads and processes what is available in the pipe.
|
||||
void NgxEventConnection::Drain() {
|
||||
NgxEventConnection::ReadAndNotify(pipe_read_fd_);
|
||||
}
|
||||
|
||||
// Shuts the connection down in three steps: close the write end of the pipe,
// keep processing events until ReadAndNotify() reports false, then close the
// read end.
void NgxEventConnection::Shutdown() {
  close(pipe_write_fd_);

  // Drain the pipe and process the final events before the read end goes away.
  bool keep_reading = true;
  while (keep_reading) {
    keep_reading = NgxEventConnection::ReadAndNotify(pipe_read_fd_);
  }

  close(pipe_read_fd_);
}
|
||||
|
||||
} // namespace net_instaweb
|
||||
|
||||
@@ -64,6 +64,8 @@ class NgxEventConnection {
|
||||
bool WriteEvent(char type, void* sender);
|
||||
// Convenience overload for clients that have a single event type.
|
||||
bool WriteEvent(void* sender);
|
||||
// Reads and processes what is available in the named pipe's buffer.
|
||||
void Drain();
|
||||
private:
|
||||
static bool CreateNgxConnection(ngx_cycle_t* cycle, ngx_fd_t pipe_fd);
|
||||
static void ReadEventHandler(ngx_event_t* e);
|
||||
|
||||
+34
-5
@@ -85,6 +85,8 @@ extern ngx_module_t ngx_pagespeed;
|
||||
// Needed for SystemRewriteDriverFactory to use shared memory.
|
||||
#define PAGESPEED_SUPPORT_POSIX_SHARED_MEM
|
||||
|
||||
net_instaweb::NgxRewriteDriverFactory* active_driver_factory = NULL;
|
||||
|
||||
namespace net_instaweb {
|
||||
|
||||
const char* kInternalEtagName = "@psol-etag";
|
||||
@@ -880,11 +882,12 @@ void ps_cleanup_srv_conf(void* data) {
|
||||
// to be shut down when we destroy any proxy_fetch_factories. This
|
||||
// will prevent any queued callbacks to destroyed proxy fetch factories
|
||||
// from being executed
|
||||
|
||||
if (!factory_deleted && cfg_s->server_context != NULL) {
|
||||
if (active_driver_factory == cfg_s->server_context->factory()) {
|
||||
active_driver_factory = NULL;
|
||||
}
|
||||
delete cfg_s->server_context->factory();
|
||||
factory_deleted = true;
|
||||
NgxBaseFetch::Terminate();
|
||||
}
|
||||
if (cfg_s->proxy_fetch_factory != NULL) {
|
||||
delete cfg_s->proxy_fetch_factory;
|
||||
@@ -931,12 +934,20 @@ void ps_set_conf_cleanup_handler(
|
||||
}
|
||||
|
||||
void terminate_process_context() {
|
||||
if (active_driver_factory != NULL) {
|
||||
// If we got here, that means we are in the cache loader/manager
|
||||
// or did not get a chance to cleanup otherwise.
|
||||
delete active_driver_factory;
|
||||
active_driver_factory = NULL;
|
||||
NgxBaseFetch::Terminate();
|
||||
}
|
||||
delete process_context;
|
||||
process_context = NULL;
|
||||
}
|
||||
|
||||
void* ps_create_main_conf(ngx_conf_t* cf) {
|
||||
if (!process_context_cleanup_hooked) {
|
||||
SystemRewriteDriverFactory::InitApr();
|
||||
atexit(terminate_process_context);
|
||||
process_context_cleanup_hooked = true;
|
||||
}
|
||||
@@ -953,6 +964,7 @@ void* ps_create_main_conf(ngx_conf_t* cf) {
|
||||
new SystemThreadSystem(),
|
||||
"" /* hostname, not used */,
|
||||
-1 /* port, not used */);
|
||||
active_driver_factory = cfg_m->driver_factory;
|
||||
cfg_m->driver_factory->Init();
|
||||
ps_set_conf_cleanup_handler(cf, ps_cleanup_main_conf, cfg_m);
|
||||
return cfg_m;
|
||||
@@ -1648,6 +1660,17 @@ ngx_int_t ps_resource_handler(ngx_http_request_t* r,
|
||||
ps_srv_conf_t* cfg_s = ps_get_srv_config(r);
|
||||
ps_request_ctx_t* ctx = ps_get_request_context(r);
|
||||
|
||||
if (ngx_terminate || ngx_exiting) {
|
||||
cfg_s->server_context->message_handler()->Message(
|
||||
kInfo, "ps_resource_handler declining: nginx worker is shutting down");
|
||||
|
||||
if (ctx == NULL) {
|
||||
return NGX_DECLINED;
|
||||
}
|
||||
ps_release_base_fetch(ctx);
|
||||
return NGX_DECLINED;
|
||||
}
|
||||
|
||||
CHECK(!(html_rewrite && (ctx == NULL || ctx->html_rewrite == false)));
|
||||
|
||||
if (!html_rewrite &&
|
||||
@@ -2967,10 +2990,18 @@ ngx_int_t ps_init_module(ngx_cycle_t* cycle) {
|
||||
} else {
|
||||
delete cfg_m->driver_factory;
|
||||
cfg_m->driver_factory = NULL;
|
||||
active_driver_factory = NULL;
|
||||
}
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
// Exit hook installed in the ngx_pagespeed module table.
// NOTE(review): presumably invoked by nginx once per worker process on
// exit -- confirm against the ngx_module_t slot order.
//
// Terminates NgxBaseFetch first, then shuts the rewrite driver factory down.
//
// cycle: the nginx cycle whose pagespeed main configuration is looked up.
void ps_exit_child_process(ngx_cycle_t* cycle) {
  ps_main_conf_t* cfg_m = static_cast<ps_main_conf_t*>(
      ngx_http_cycle_get_module_main_conf(cycle, ngx_pagespeed));

  NgxBaseFetch::Terminate();

  // ps_init_module() deletes the factory and NULLs cfg_m->driver_factory on
  // its else path, so the pointer cannot be dereferenced unconditionally.
  // The cfg_m check is defensive in case no main configuration is returned.
  if (cfg_m != NULL && cfg_m->driver_factory != NULL) {
    cfg_m->driver_factory->ShutDown();
  }
}
|
||||
|
||||
// Called when nginx forks worker processes. No threads should be started
|
||||
// before this.
|
||||
ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
|
||||
@@ -2980,7 +3011,6 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
SystemRewriteDriverFactory::InitApr();
|
||||
if (!NgxBaseFetch::Initialize(cycle)) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
@@ -3016,7 +3046,6 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
cfg_m->driver_factory->StartThreads();
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
@@ -3049,7 +3078,7 @@ ngx_module_t ngx_pagespeed = {
|
||||
net_instaweb::ps_init_child_process,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
net_instaweb::ps_exit_child_process,
|
||||
NULL,
|
||||
NGX_MODULE_V1_PADDING
|
||||
};
|
||||
|
||||
@@ -84,7 +84,8 @@ NgxRewriteDriverFactory::NgxRewriteDriverFactory(
|
||||
hostname_(hostname.as_string()),
|
||||
port_(port),
|
||||
process_script_variables_(false),
|
||||
process_script_variables_set_(false) {
|
||||
process_script_variables_set_(false),
|
||||
shut_down_(false) {
|
||||
InitializeDefaultOptions();
|
||||
default_options()->set_beacon_url("/ngx_pagespeed_beacon");
|
||||
SystemRewriteOptions* system_options = dynamic_cast<SystemRewriteOptions*>(
|
||||
@@ -188,6 +189,13 @@ ServerContext* NgxRewriteDriverFactory::NewServerContext() {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void NgxRewriteDriverFactory::ShutDown() {
|
||||
if (!shut_down_) {
|
||||
shut_down_ = true;
|
||||
SystemRewriteDriverFactory::ShutDown();
|
||||
}
|
||||
}
|
||||
|
||||
void NgxRewriteDriverFactory::ShutDownMessageHandlers() {
|
||||
ngx_message_handler_->set_buffer(NULL);
|
||||
ngx_html_parse_message_handler_->set_buffer(NULL);
|
||||
|
||||
@@ -72,6 +72,7 @@ class NgxRewriteDriverFactory : public SystemRewriteDriverFactory {
|
||||
static void InitStats(Statistics* statistics);
|
||||
NgxServerContext* MakeNgxServerContext(StringPiece hostname, int port);
|
||||
virtual ServerContext* NewServerContext();
|
||||
virtual void ShutDown();
|
||||
|
||||
// Starts pagespeed threads if they've not been started already. Must be
|
||||
// called after the caller has finished any forking it intends to do.
|
||||
@@ -155,6 +156,7 @@ class NgxRewriteDriverFactory : public SystemRewriteDriverFactory {
|
||||
int port_;
|
||||
bool process_script_variables_;
|
||||
bool process_script_variables_set_;
|
||||
bool shut_down_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(NgxRewriteDriverFactory);
|
||||
};
|
||||
|
||||
+92
-10
@@ -66,7 +66,7 @@ function keepalive_test() {
|
||||
NGX_LOG_FILE="$1.error.log"
|
||||
POST_DATA=$3
|
||||
|
||||
for ((i=0; i < 100; i++)); do
|
||||
for ((i=0; i < 10; i++)); do
|
||||
for accept_encoding in "" "gzip"; do
|
||||
if [ -z "$POST_DATA" ]; then
|
||||
curl -m 2 -S -s -v -H "Accept-Encoding: $accept_encoding" \
|
||||
@@ -116,11 +116,25 @@ function keepalive_test() {
|
||||
check [ -z "$OUT" ]
|
||||
}
|
||||
|
||||
# Starts an `ab` run against $PRIMARY_HOSTNAME in the background when `ab` can
# be found, storing its pid in AB_PID. AB_PID stays "0" when `ab` is missing.
# Always pauses for two seconds before returning.
function fire_ab_load() {
  AB_PID="0"
  if ! hash ab 2>/dev/null; then
    echo "ab is not available, not able to test stressed shutdown and reload."
  else
    ab -n 10000 -k -c 100 http://$PRIMARY_HOSTNAME/ &>/dev/null & AB_PID=$!
  fi
  # Sleep to allow some queueing up of requests
  sleep 2
}
|
||||
|
||||
this_dir="$( cd $(dirname "$0") && pwd)"
|
||||
|
||||
# stop nginx
|
||||
killall nginx
|
||||
# stop nginx/valgrind
|
||||
killall -s KILL nginx
|
||||
# TODO(oschaaf): Fix waiting for valgrind on 32 bits systems.
|
||||
killall -s KILL memcheck-amd64-
|
||||
while pgrep nginx > /dev/null; do sleep 1; done
|
||||
while pgrep memcheck > /dev/null; do sleep 1; done
|
||||
|
||||
TEST_TMP="$this_dir/tmp"
|
||||
rm -r "$TEST_TMP"
|
||||
@@ -291,6 +305,7 @@ if $USE_VALGRIND; then
|
||||
~IPRO flow uses cache as expected.~
|
||||
~IPRO flow doesn't copy uncacheable resources multiple times.~
|
||||
~inline_unauthorized_resources allows unauthorized css selectors~
|
||||
~Blocking rewrite enabled.~
|
||||
"
|
||||
fi
|
||||
|
||||
@@ -523,11 +538,21 @@ check test $(scrape_stat image_rewrite_total_original_bytes) -ge 10000
|
||||
# happens both before and after.
|
||||
start_test "Reload config"
|
||||
|
||||
# Fire up some heavy load if ab is available to test a stressed reload.
|
||||
# TODO(oschaaf): make sure we wait for the new worker to get ready to accept
|
||||
# requests.
|
||||
fire_ab_load
|
||||
|
||||
check wget $EXAMPLE_ROOT/styles/W.rewrite_css_images.css.pagespeed.cf.Hash.css \
|
||||
-O /dev/null
|
||||
check_simple "$NGINX_EXECUTABLE" -s reload -c "$PAGESPEED_CONF"
|
||||
|
||||
check wget $EXAMPLE_ROOT/styles/W.rewrite_css_images.css.pagespeed.cf.Hash.css \
|
||||
-O /dev/null
|
||||
if [ "$AB_PID" != "0" ]; then
|
||||
echo "Kill ab (pid: $AB_PID)"
|
||||
kill -s KILL $AB_PID &>/dev/null || true
|
||||
fi
|
||||
|
||||
# This is dependent upon having a beacon handler.
|
||||
test_filter add_instrumentation beacons load.
|
||||
@@ -1096,7 +1121,7 @@ check_not_from "$OUT" fgrep -qi 'addInstrumentationInit'
|
||||
if [ "$NATIVE_FETCHER" != "on" ]; then
|
||||
start_test Test that we can rewrite an HTTPS resource.
|
||||
fetch_until $TEST_ROOT/https_fetch/https_fetch.html \
|
||||
'grep -c /https_gstatic_dot_com/1.gif.pagespeed.ce' 1
|
||||
'grep -c /https_gstatic_dot_com/1.gif.pagespeed.ce' 1
|
||||
fi
|
||||
|
||||
start_test Base config has purging disabled. Check error message syntax.
|
||||
@@ -1153,19 +1178,76 @@ check_from "$OUT" fgrep -qi '404'
|
||||
MATCHES=$(echo "$OUT" | grep -c "Cache-Control: override") || true
|
||||
check [ $MATCHES -eq 1 ]
|
||||
|
||||
start_test Shutting down.
|
||||
|
||||
# Fire up some heavy load if ab is available to test a stressed shutdown
|
||||
fire_ab_load
|
||||
|
||||
if $USE_VALGRIND; then
|
||||
# It is possible that there are still ProxyFetches outstanding
|
||||
# at this point in time. Give them a few extra seconds to allow
|
||||
# them to finish, so they will not generate valgrind complaints
|
||||
echo "Sleeping 30 seconds to allow outstanding ProxyFetches to finish."
|
||||
sleep 30
|
||||
kill -s quit $VALGRIND_PID
|
||||
wait
|
||||
while pgrep memcheck > /dev/null; do sleep 1; done
|
||||
# Clear the previously set trap, we don't need it anymore.
|
||||
trap - EXIT
|
||||
|
||||
start_test No Valgrind complaints.
|
||||
check_not [ -s "$TEST_TMP/valgrind.log" ]
|
||||
else
|
||||
check_simple "$NGINX_EXECUTABLE" -s quit -c "$PAGESPEED_CONF"
|
||||
while pgrep nginx > /dev/null; do sleep 1; done
|
||||
fi
|
||||
|
||||
# Stop the background ab run, if one was started (AB_PID is "0" otherwise).
# AB_PID holds a process id, so this must use kill: killall selects processes
# by name and would never match a numeric pid, leaving ab running.
if [ "$AB_PID" != "0" ]; then
  echo "Kill ab (pid: $AB_PID)"
  kill -s KILL $AB_PID &>/dev/null || true
fi
|
||||
|
||||
start_test Logged output looks healthy.
|
||||
|
||||
# TODO(oschaaf): Sanity check for all the warnings/errors here.
|
||||
OUT=$(cat "test/tmp/error.log" \
|
||||
| grep "\\[" \
|
||||
| grep -v "\\[debug\\]" \
|
||||
| grep -v "\\[info\\]" \
|
||||
| grep -v "\\[notice\\]" \
|
||||
| grep -v "\\[warn\\].*Cache Flush.*" \
|
||||
| grep -v "\\[warn\\].*doesnotexist.css.*" \
|
||||
| grep -v "\\[warn\\].*Invalid filter name: bogus.*" \
|
||||
| grep -v "\\[warn\\].*You seem to have downstream caching.*" \
|
||||
| grep -v "\\[warn\\].*Warning_trigger*" \
|
||||
| grep -v "\\[warn\\].*Rewrite http://www.google.com/mod_pagespeed_example/ failed*" \
|
||||
| grep -v "\\[warn\\].*A.bad:0:Resource*" \
|
||||
| grep -v "\\[warn\\].*W.bad.pagespeed.cf.hash.css*" \
|
||||
| grep -v "\\[warn\\].*BadName*" \
|
||||
| grep -v "\\[warn\\].*CSS parsing error*" \
|
||||
| grep -v "\\[warn\\].*Fetch failed for resource*" \
|
||||
| grep -v "\\[warn\\].*Rewrite.*example.pdf failed*" \
|
||||
| grep -v "\\[warn\\].*Rewrite.*hello.js failed*" \
|
||||
| grep -v "\\[warn\\].*Resource based on.*ngx_pagespeed_statistics.*" \
|
||||
| grep -v "\\[warn\\].*Canceling 1 functions on sequence Shutdown.*" \
|
||||
| grep -v "\\[warn\\].*using uninitialized.*" \
|
||||
| grep -v "\\[error\\].*BadName*" \
|
||||
| grep -v "\\[error\\].*/mod_pagespeed/bad*" \
|
||||
| grep -v "\\[error\\].*doesnotexist.css.*" \
|
||||
| grep -v "\\[error\\].*is forbidden.*" \
|
||||
| grep -v "\\[error\\].*access forbidden by rule.*" \
|
||||
| grep -v "\\[error\\].*forbidden.example.com*" \
|
||||
| grep -v "\\[error\\].*custom-paths.example.com*" \
|
||||
| grep -v "\\[error\\].*bogus_format*" \
|
||||
| grep -v "\\[error\\].*src/install/foo*" \
|
||||
| grep -v "\\[error\\].*recv() failed*" \
|
||||
| grep -v "\\[error\\].*send() failed*" \
|
||||
| grep -v "\\[error\\].*Invalid url requested: js_defer.js.*" \
|
||||
| grep -v "\\[error\\].*/mod_pagespeed_example/styles/yellow.css+blue.css.pagespeed.cc..css.*" \
|
||||
| grep -v "\\[error\\].*/mod_pagespeed_example/images/Puzzle.jpg.pagespeed.ce..jpg.*" \
|
||||
| grep -v "\\[error\\].*/pagespeed_custom_static/js_defer.js.*" \
|
||||
| grep -v "\\[error\\].*UH8L-zY4b4AAAAAAAAAA.*" \
|
||||
| grep -v "\\[error\\].*UH8L-zY4b4.*" \
|
||||
| grep -v "\\[error\\].*Serf status 111(Connection refused) polling.*" \
|
||||
| grep -v "\\[error\\].*Failed to make directory*" \
|
||||
| grep -v "\\[error\\].*Could not create directories*" \
|
||||
| grep -v "\\[error\\].*opening temp file: No such file or directory.*" \
|
||||
|| true)
|
||||
|
||||
check [ -z "$OUT" ]
|
||||
|
||||
check_failures_and_exit
|
||||
|
||||
Reference in New Issue
Block a user