Compare commits

...

3 Commits

Author SHA1 Message Date
dinic 01d5e79f8a 1. bugfix, connection_pool_mutex may not got unlock
2. disable keepalive if we don't find keep-alive header
2014-05-04 11:36:51 +08:00
dinic 2e6767e6d2 set connection keepalive when response header with "Connect: keep-alive" 2014-04-30 18:23:16 +08:00
dinic b2bb619524 native fetch support "keepalive" 2014-04-30 17:24:19 +08:00
2 changed files with 185 additions and 12 deletions
+183 -11
View File
@@ -54,8 +54,156 @@ extern "C" {
#include "net/instaweb/util/public/thread_system.h"
#include "net/instaweb/util/public/timer.h"
#include "net/instaweb/util/public/writer.h"
#include "net/instaweb/util/public/pthread_mutex.h"
namespace net_instaweb {
class NgxConnection : public PoolElement<NgxConnection> {
public:
NgxConnection();
~NgxConnection();
void SetKeepAlive(bool k = true) { keepalive_ = k; }
bool KeepAlive() { return keepalive_; }
void SetSock(u_char *sockaddr, socklen_t socklen) {
socklen_ = socklen;
ngx_memcpy(&sockaddr_, sockaddr, socklen);
}
static NgxConnection* Connect(ngx_peer_connection_t *pc);
void Close();
static void NgxConnectionDumyHandler(ngx_event_t *ev) {};
static void NgxConnectionCloseHandler(ngx_event_t *ev);
typedef Pool<NgxConnection> NgxConnectionPool;
static NgxConnectionPool connection_pool;
static PthreadMutex connection_pool_mutex;
ngx_connection_t *c_;
private:
int64 timeout_;
int max_requests_;
bool keepalive_;
socklen_t socklen_;
u_char sockaddr_[NGX_SOCKADDRLEN];
};
NgxConnection::NgxConnectionPool NgxConnection::connection_pool;
PthreadMutex NgxConnection::connection_pool_mutex;
NgxConnection::NgxConnection() {
c_ = NULL;
keepalive_ = false;
// default keepalive 60s, max process 100 requests
timeout_ = 60000;
max_requests_ = 100;
}
NgxConnection::~NgxConnection() {
//
}
NgxConnection* NgxConnection::Connect(ngx_peer_connection_t *pc) {
NgxConnection *nc;
NgxConnection::connection_pool_mutex.Lock();
for (Pool<NgxConnection>::iterator p = connection_pool.begin();
p != connection_pool.end(); p++) {
nc = *p;
if (ngx_memn2cmp(static_cast<u_char*>(nc->sockaddr_),
reinterpret_cast<u_char*>(pc->sockaddr),
nc->socklen_, pc->socklen) == 0) {
nc->c_->idle = 0;
nc->c_->log = pc->log;
nc->c_->read->log = pc->log;
nc->c_->write->log = pc->log;
nc->c_->pool->log = pc->log;
if (nc->c_->read->timer_set) {
ngx_del_timer(nc->c_->read);
}
NgxConnection::connection_pool_mutex.Unlock();
return nc;
}
}
connection_pool_mutex.Unlock();
int rc = ngx_event_connect_peer(pc);
if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
return NULL;
}
nc = new NgxConnection();
nc->SetSock(reinterpret_cast<u_char*>(pc->sockaddr), pc->socklen);
nc->c_ = pc->connection;
return nc;
}
void NgxConnection::Close() {
max_requests_--;
if (!keepalive_ || max_requests_ <= 0) {
ngx_close_connection(c_);
delete this;
return;
}
if (c_->read->timer_set) {
ngx_del_timer(c_->read);
}
if (c_->write->timer_set) {
ngx_del_timer(c_->write);
}
ngx_add_timer(c_->read, static_cast<ngx_msec_t>(timeout_));
c_->data = this;
c_->read->handler = NgxConnectionCloseHandler;
c_->write->handler = NgxConnectionDumyHandler;
c_->idle = 1;
// this connection should not be associated with current fetch
c_->log = ngx_cycle->log;
c_->read->log = ngx_cycle->log;
c_->write->log = ngx_cycle->log;
c_->pool->log = ngx_cycle->log;
connection_pool_mutex.Lock();
connection_pool.Add(this);
connection_pool_mutex.Unlock();
}
void NgxConnection::NgxConnectionCloseHandler(ngx_event_t *ev) {
ngx_connection_t *c = static_cast<ngx_connection_t*>(ev->data);
NgxConnection *nc = static_cast<NgxConnection*>(c->data);
if (c->read->timedout) {
nc->SetKeepAlive(false);
nc->Close();
return;
}
char buf[1];
int n;
// not a timedout event, we should check connection
n = recv(c->fd, buf, 1, MSG_PEEK);
if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
nc->SetKeepAlive(false);
nc->Close();
return;
}
return;
}
nc->SetKeepAlive(false);
nc->Close();
}
NgxFetch::NgxFetch(const GoogleString& url,
AsyncFetch* async_fetch,
MessageHandler* message_handler,
@@ -83,7 +231,8 @@ namespace net_instaweb {
ngx_del_timer(timeout_event_);
}
if (connection_ != NULL) {
ngx_close_connection(connection_);
connection_->Close();
connection_ = NULL;
}
if (pool_ != NULL) {
ngx_destroy_pool(pool_);
@@ -216,8 +365,31 @@ namespace net_instaweb {
ngx_del_timer(timeout_event_);
timeout_event_ = NULL;
}
if (success) {
ConstStringStarVector v;
if (async_fetch_->response_headers()->Lookup(
StringPiece(HttpAttributes::kConnection), &v)) {
bool keepalive = false;
for (int i = 0; i < v.size(); i++) {
if (*v[i] == "keep-alive") {
keepalive = true;
break;
} else if (*v[i] == "close") {
break;
}
}
// - enable keepalive if we find "keep-alive" header
// - disable keepalive, if it's with "Connection:close"
// - disable keepalive, if it's without "keep-alive" header
connection_->SetKeepAlive(keepalive);
}
}
if (connection_) {
ngx_close_connection(connection_);
connection_->Close();
connection_ = NULL;
}
@@ -408,7 +580,7 @@ namespace net_instaweb {
return rc;
}
NgxFetchWrite(connection_->write);
NgxFetchWrite(connection_->c_->write);
return NGX_OK;
}
@@ -425,18 +597,18 @@ namespace net_instaweb {
pc.log = fetcher_->log_;
pc.rcvbuf = -1;
int rc = ngx_event_connect_peer(&pc);
if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
return rc;
connection_ = NgxConnection::Connect(&pc);
if (connection_ == NULL) {
return NGX_ERROR;
}
connection_ = pc.connection;
connection_->write->handler = NgxFetchWrite;
connection_->read->handler = NgxFetchRead;
connection_->data = this;
connection_->c_->write->handler = NgxFetchWrite;
connection_->c_->read->handler = NgxFetchRead;
connection_->c_->data = this;
// Timer set in Init() is still in effect.
return rc;
return NGX_OK;
}
// When the fetch sends the request completely, it will hook the read event,
+2 -1
View File
@@ -42,6 +42,7 @@ namespace net_instaweb {
typedef bool (*response_handler_pt)(ngx_connection_t* c);
class NgxUrlAsyncFetcher;
class NgxConnection;
class NgxFetch : public PoolElement<NgxFetch> {
public:
NgxFetch(const GoogleString& url,
@@ -136,7 +137,7 @@ class NgxFetch : public PoolElement<NgxFetch> {
ngx_http_request_t* r_;
ngx_http_status_t* status_;
ngx_event_t* timeout_event_;
ngx_connection_t* connection_;
NgxConnection* connection_;
ngx_resolver_ctx_t* resolver_ctx_;
DISALLOW_COPY_AND_ASSIGN(NgxFetch);