Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 01d5e79f8a | |||
| 2e6767e6d2 | |||
| b2bb619524 |
+183
-11
@@ -54,8 +54,156 @@ extern "C" {
|
||||
#include "net/instaweb/util/public/thread_system.h"
|
||||
#include "net/instaweb/util/public/timer.h"
|
||||
#include "net/instaweb/util/public/writer.h"
|
||||
#include "net/instaweb/util/public/pthread_mutex.h"
|
||||
|
||||
namespace net_instaweb {
|
||||
class NgxConnection : public PoolElement<NgxConnection> {
|
||||
public:
|
||||
NgxConnection();
|
||||
~NgxConnection();
|
||||
void SetKeepAlive(bool k = true) { keepalive_ = k; }
|
||||
bool KeepAlive() { return keepalive_; }
|
||||
void SetSock(u_char *sockaddr, socklen_t socklen) {
|
||||
socklen_ = socklen;
|
||||
ngx_memcpy(&sockaddr_, sockaddr, socklen);
|
||||
}
|
||||
|
||||
static NgxConnection* Connect(ngx_peer_connection_t *pc);
|
||||
void Close();
|
||||
static void NgxConnectionDumyHandler(ngx_event_t *ev) {};
|
||||
static void NgxConnectionCloseHandler(ngx_event_t *ev);
|
||||
|
||||
typedef Pool<NgxConnection> NgxConnectionPool;
|
||||
|
||||
static NgxConnectionPool connection_pool;
|
||||
static PthreadMutex connection_pool_mutex;
|
||||
ngx_connection_t *c_;
|
||||
|
||||
private:
|
||||
int64 timeout_;
|
||||
int max_requests_;
|
||||
bool keepalive_;
|
||||
socklen_t socklen_;
|
||||
u_char sockaddr_[NGX_SOCKADDRLEN];
|
||||
};
|
||||
|
||||
NgxConnection::NgxConnectionPool NgxConnection::connection_pool;
|
||||
PthreadMutex NgxConnection::connection_pool_mutex;
|
||||
|
||||
NgxConnection::NgxConnection() {
|
||||
c_ = NULL;
|
||||
keepalive_ = false;
|
||||
|
||||
// default keepalive 60s, max process 100 requests
|
||||
timeout_ = 60000;
|
||||
max_requests_ = 100;
|
||||
}
|
||||
|
||||
NgxConnection::~NgxConnection() {
|
||||
//
|
||||
}
|
||||
|
||||
NgxConnection* NgxConnection::Connect(ngx_peer_connection_t *pc) {
|
||||
NgxConnection *nc;
|
||||
|
||||
NgxConnection::connection_pool_mutex.Lock();
|
||||
for (Pool<NgxConnection>::iterator p = connection_pool.begin();
|
||||
p != connection_pool.end(); p++) {
|
||||
nc = *p;
|
||||
if (ngx_memn2cmp(static_cast<u_char*>(nc->sockaddr_),
|
||||
reinterpret_cast<u_char*>(pc->sockaddr),
|
||||
nc->socklen_, pc->socklen) == 0) {
|
||||
nc->c_->idle = 0;
|
||||
nc->c_->log = pc->log;
|
||||
nc->c_->read->log = pc->log;
|
||||
nc->c_->write->log = pc->log;
|
||||
nc->c_->pool->log = pc->log;
|
||||
|
||||
if (nc->c_->read->timer_set) {
|
||||
ngx_del_timer(nc->c_->read);
|
||||
}
|
||||
|
||||
NgxConnection::connection_pool_mutex.Unlock();
|
||||
return nc;
|
||||
}
|
||||
}
|
||||
connection_pool_mutex.Unlock();
|
||||
|
||||
int rc = ngx_event_connect_peer(pc);
|
||||
if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nc = new NgxConnection();
|
||||
nc->SetSock(reinterpret_cast<u_char*>(pc->sockaddr), pc->socklen);
|
||||
nc->c_ = pc->connection;
|
||||
|
||||
return nc;
|
||||
}
|
||||
|
||||
void NgxConnection::Close() {
|
||||
max_requests_--;
|
||||
if (!keepalive_ || max_requests_ <= 0) {
|
||||
ngx_close_connection(c_);
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
if (c_->read->timer_set) {
|
||||
ngx_del_timer(c_->read);
|
||||
}
|
||||
|
||||
if (c_->write->timer_set) {
|
||||
ngx_del_timer(c_->write);
|
||||
}
|
||||
|
||||
ngx_add_timer(c_->read, static_cast<ngx_msec_t>(timeout_));
|
||||
|
||||
c_->data = this;
|
||||
c_->read->handler = NgxConnectionCloseHandler;
|
||||
c_->write->handler = NgxConnectionDumyHandler;
|
||||
c_->idle = 1;
|
||||
|
||||
// this connection should not be associated with current fetch
|
||||
c_->log = ngx_cycle->log;
|
||||
c_->read->log = ngx_cycle->log;
|
||||
c_->write->log = ngx_cycle->log;
|
||||
c_->pool->log = ngx_cycle->log;
|
||||
|
||||
connection_pool_mutex.Lock();
|
||||
connection_pool.Add(this);
|
||||
connection_pool_mutex.Unlock();
|
||||
}
|
||||
|
||||
void NgxConnection::NgxConnectionCloseHandler(ngx_event_t *ev) {
|
||||
ngx_connection_t *c = static_cast<ngx_connection_t*>(ev->data);
|
||||
NgxConnection *nc = static_cast<NgxConnection*>(c->data);
|
||||
|
||||
if (c->read->timedout) {
|
||||
nc->SetKeepAlive(false);
|
||||
nc->Close();
|
||||
return;
|
||||
}
|
||||
|
||||
char buf[1];
|
||||
int n;
|
||||
|
||||
// not a timedout event, we should check connection
|
||||
n = recv(c->fd, buf, 1, MSG_PEEK);
|
||||
if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
|
||||
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
||||
nc->SetKeepAlive(false);
|
||||
nc->Close();
|
||||
return;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
nc->SetKeepAlive(false);
|
||||
nc->Close();
|
||||
}
|
||||
|
||||
NgxFetch::NgxFetch(const GoogleString& url,
|
||||
AsyncFetch* async_fetch,
|
||||
MessageHandler* message_handler,
|
||||
@@ -83,7 +231,8 @@ namespace net_instaweb {
|
||||
ngx_del_timer(timeout_event_);
|
||||
}
|
||||
if (connection_ != NULL) {
|
||||
ngx_close_connection(connection_);
|
||||
connection_->Close();
|
||||
connection_ = NULL;
|
||||
}
|
||||
if (pool_ != NULL) {
|
||||
ngx_destroy_pool(pool_);
|
||||
@@ -216,8 +365,31 @@ namespace net_instaweb {
|
||||
ngx_del_timer(timeout_event_);
|
||||
timeout_event_ = NULL;
|
||||
}
|
||||
|
||||
if (success) {
|
||||
ConstStringStarVector v;
|
||||
if (async_fetch_->response_headers()->Lookup(
|
||||
StringPiece(HttpAttributes::kConnection), &v)) {
|
||||
bool keepalive = false;
|
||||
for (int i = 0; i < v.size(); i++) {
|
||||
if (*v[i] == "keep-alive") {
|
||||
keepalive = true;
|
||||
break;
|
||||
|
||||
} else if (*v[i] == "close") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// - enable keepalive if we find "keep-alive" header
|
||||
// - disable keepalive, if it's with "Connection:close"
|
||||
// - disable keepalive, if it's without "keep-alive" header
|
||||
connection_->SetKeepAlive(keepalive);
|
||||
}
|
||||
}
|
||||
|
||||
if (connection_) {
|
||||
ngx_close_connection(connection_);
|
||||
connection_->Close();
|
||||
connection_ = NULL;
|
||||
}
|
||||
|
||||
@@ -408,7 +580,7 @@ namespace net_instaweb {
|
||||
return rc;
|
||||
}
|
||||
|
||||
NgxFetchWrite(connection_->write);
|
||||
NgxFetchWrite(connection_->c_->write);
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
@@ -425,18 +597,18 @@ namespace net_instaweb {
|
||||
pc.log = fetcher_->log_;
|
||||
pc.rcvbuf = -1;
|
||||
|
||||
int rc = ngx_event_connect_peer(&pc);
|
||||
if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
|
||||
return rc;
|
||||
|
||||
connection_ = NgxConnection::Connect(&pc);
|
||||
if (connection_ == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
connection_ = pc.connection;
|
||||
connection_->write->handler = NgxFetchWrite;
|
||||
connection_->read->handler = NgxFetchRead;
|
||||
connection_->data = this;
|
||||
connection_->c_->write->handler = NgxFetchWrite;
|
||||
connection_->c_->read->handler = NgxFetchRead;
|
||||
connection_->c_->data = this;
|
||||
|
||||
// Timer set in Init() is still in effect.
|
||||
return rc;
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
// When the fetch sends the request completely, it will hook the read event,
|
||||
|
||||
+2
-1
@@ -42,6 +42,7 @@ namespace net_instaweb {
|
||||
typedef bool (*response_handler_pt)(ngx_connection_t* c);
|
||||
|
||||
class NgxUrlAsyncFetcher;
|
||||
class NgxConnection;
|
||||
class NgxFetch : public PoolElement<NgxFetch> {
|
||||
public:
|
||||
NgxFetch(const GoogleString& url,
|
||||
@@ -136,7 +137,7 @@ class NgxFetch : public PoolElement<NgxFetch> {
|
||||
ngx_http_request_t* r_;
|
||||
ngx_http_status_t* status_;
|
||||
ngx_event_t* timeout_event_;
|
||||
ngx_connection_t* connection_;
|
||||
NgxConnection* connection_;
|
||||
ngx_resolver_ctx_t* resolver_ctx_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(NgxFetch);
|
||||
|
||||
Reference in New Issue
Block a user