cosocket: initial prototype for the builtin connection pool support via the new cosocket method "sock:setkeepalive()". also always reset u->length and u->rest for the "sock:receive()" method.

This commit is contained in:
agentzh (章亦春) 2012-01-27 23:51:00 +08:00
Родитель e60a948145
Коммит 5b2a977b37
7 изменённых файлов: 554 добавлений и 42 удалений

2
.gitignore поставляемый
Просмотреть файл

@ -131,3 +131,5 @@ buildroot/
src/headerfilterby.[ch]
*.patch
analyze
tsock
a.c

Просмотреть файл

@ -253,6 +253,9 @@ extern ngx_http_output_body_filter_pt ngx_http_lua_next_body_filter;
/* regex cache table key in Lua vm registry */
#define NGX_LUA_REGEX_CACHE "ngx_lua_regex_cache"
/* socket connection pool table key in Lua vm registry */
#define NGX_LUA_SOCKET_POOL "ngx_lua_socket_pool"
/* globals symbol to hold nginx request pointer */
#define GLOBALS_SYMBOL_REQUEST "ngx._req"

Просмотреть файл

@ -13,6 +13,8 @@
#define NGX_HTTP_LUA_SOCKET_FT_RESOLVER 0x0008
#define NGX_HTTP_LUA_SOCKET_FT_BUFTOOSMALL 0x0010
#define NGX_HTTP_LUA_SOCKET_POOL_SIZE (ngx_uint_t) 10
static int ngx_http_lua_socket_tcp(lua_State *L);
static int ngx_http_lua_socket_tcp_connect(lua_State *L);
@ -68,6 +70,14 @@ static ngx_int_t ngx_http_lua_socket_compile_pattern(u_char *data, size_t len,
static int ngx_http_lua_socket_cleanup_compiled_pattern(lua_State *L);
static int ngx_http_lua_req_socket(lua_State *L);
static void ngx_http_lua_req_socket_rev_handler(ngx_http_request_t *r);
static int ngx_http_lua_socket_tcp_getreusedtimes(lua_State *L);
static int ngx_http_lua_socket_tcp_setkeepalive(lua_State *L);
static ngx_int_t ngx_http_lua_get_keepalive_peer(ngx_http_request_t *r,
lua_State *L, int obj_index, int key_index,
ngx_http_lua_socket_upstream_t *u);
static void ngx_http_lua_socket_keepalive_dummy_handler(ngx_event_t *ev);
static void ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev);
static void ngx_http_lua_socket_keepalive_close(ngx_connection_t *c);
void
@ -141,6 +151,12 @@ ngx_http_lua_inject_socket_api(ngx_log_t *log, lua_State *L)
lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout);
lua_setfield(L, -2, "settimeout"); /* ngx socket mt */
lua_pushcfunction(L, ngx_http_lua_socket_tcp_getreusedtimes);
lua_setfield(L, -2, "getreusedtimes");
lua_pushcfunction(L, ngx_http_lua_socket_tcp_setkeepalive);
lua_setfield(L, -2, "setkeepalive");
lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
lua_setfield(L, -3, "_tcp_meta");
@ -203,6 +219,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
ngx_url_t url;
ngx_int_t rc;
ngx_http_lua_loc_conf_t *llcf;
ngx_peer_connection_t *pc;
int timeout;
ngx_http_lua_socket_upstream_t *u;
@ -219,19 +236,6 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
p = (u_char *) luaL_checklstring(L, 2, &len);
if (n == 3) {
port = luaL_checkinteger(L, 3);
if (port < 0 || port > 65536) {
lua_pushnil(L);
lua_pushfstring(L, "bad port number: %d", port);
return 2;
}
} else { /* n == 2 */
port = 0;
}
host.data = ngx_palloc(r->pool, len + 1);
if (host.data == NULL) {
return luaL_error(L, "out of memory");
@ -242,28 +246,29 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
ngx_memcpy(host.data, p, len);
host.data[len] = '\0';
ngx_memzero(&url, sizeof(ngx_url_t));
if (n == 3) {
port = luaL_checkinteger(L, 3);
url.url.len = host.len;
url.url.data = host.data;
url.default_port = port;
url.no_resolve = 1;
if (ngx_parse_url(r->pool, &url) != NGX_OK) {
lua_pushnil(L);
if (url.err) {
lua_pushfstring(L, "failed to parse host name \"%s\": %s",
host.data, url.err);
} else {
lua_pushfstring(L, "failed to parse host name \"%s\"", host.data);
if (port < 0 || port > 65536) {
lua_pushnil(L);
lua_pushfstring(L, "bad port number: %d", port);
return 2;
}
return 2;
lua_pushliteral(L, ":");
lua_insert(L, 3);
lua_concat(L, 3);
dd("socket key: %s", lua_tostring(L, -1));
} else { /* n == 2 */
port = 0;
}
r->connection->single_connection = 0;
/* the key's index is 2 */
lua_pushvalue(L, -1);
lua_setfield(L, 1, "_key");
lua_getfield(L, 1, "_ctx");
u = lua_touserdata(L, -1);
@ -295,6 +300,57 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
ngx_memzero(u, sizeof(ngx_http_lua_socket_upstream_t));
u->request = r; /* set the controlling request */
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);
u->conf = llcf;
pc = &u->peer;
pc->log = r->connection->log;
pc->log_error = NGX_ERROR_ERR;
dd("lua peer connection log: %p", pc->log);
r->connection->single_connection = 0;
rc = ngx_http_lua_get_keepalive_peer(r, L, 1, 2, u);
if (rc == NGX_OK) {
lua_pushinteger(L, 1);
lua_pushnil(L);
return 2;
}
if (rc == NGX_ERROR) {
lua_pushinteger(L, 1);
lua_pushliteral(L, "error in get keepalive peer");
return 2;
}
/* rc == NGX_DECLINED */
ngx_memzero(&url, sizeof(ngx_url_t));
url.url.len = host.len;
url.url.data = host.data;
url.default_port = port;
url.no_resolve = 1;
if (ngx_parse_url(r->pool, &url) != NGX_OK) {
lua_pushnil(L);
if (url.err) {
lua_pushfstring(L, "failed to parse host name \"%s\": %s",
host.data, url.err);
} else {
lua_pushfstring(L, "failed to parse host name \"%s\"", host.data);
}
return 2;
}
lua_getfield(L, 1, "_tm");
timeout = lua_tointeger(L, -1);
lua_pop(L, 1);
@ -310,8 +366,6 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua socket connect timeout: %M", u->timeout);
u->request = r; /* set the controlling request */
u->resolved = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_resolved_t));
if (u->resolved == NULL) {
return luaL_error(L, "out of memory");
@ -552,7 +606,6 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_upstream_t *u, lua_State *L)
{
ngx_http_lua_ctx_t *ctx;
ngx_http_lua_loc_conf_t *llcf;
ngx_peer_connection_t *pc;
ngx_connection_t *c;
ngx_http_cleanup_t *cln;
@ -566,15 +619,8 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r,
return 2;
}
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);
u->conf = llcf;
pc = &u->peer;
pc->log = r->connection->log;
pc->log_error = NGX_ERROR_ERR;
ur = u->resolved;
if (ur->sockaddr) {
@ -864,6 +910,9 @@ ngx_http_lua_socket_tcp_receive(lua_State *L)
break;
}
u->length = 0;
u->rest = 0;
break;
case LUA_TNUMBER:
@ -875,6 +924,7 @@ ngx_http_lua_socket_tcp_receive(lua_State *L)
u->input_filter = ngx_http_lua_socket_read_chunk;
u->length = (size_t) bytes;
u->rest = u->length;
break;
default:
@ -884,6 +934,8 @@ ngx_http_lua_socket_tcp_receive(lua_State *L)
} else {
u->input_filter = ngx_http_lua_socket_read_line;
u->length = 0;
u->rest = 0;
}
u->input_filter_ctx = u;
@ -1616,6 +1668,8 @@ ngx_http_lua_socket_send(ngx_http_request_t *r,
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua socket send data");
dd("lua connection log: %p", c->log);
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
dd("output chain returned: %d", (int) rc);
@ -1839,9 +1893,9 @@ ngx_http_lua_socket_finalize(ngx_http_request_t *r,
"lua close socket connection");
ngx_close_connection(u->peer.connection);
}
u->peer.connection = NULL;
u->peer.connection = NULL;
}
}
@ -2498,3 +2552,338 @@ ngx_http_lua_req_socket_rev_handler(ngx_http_request_t *r)
}
}
static int
ngx_http_lua_socket_tcp_getreusedtimes(lua_State *L)
{
ngx_http_lua_socket_upstream_t *u;
if (lua_gettop(L) != 1) {
return luaL_error(L, "expecting 1 argument "
"(including the object), but got %d", lua_gettop(L));
}
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1, "_ctx");
u = lua_touserdata(L, -1);
if (u == NULL || u->peer.connection == NULL || u->ft_type || u->eof) {
lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}
lua_pushinteger(L, u->reused);
return 1;
}
static int ngx_http_lua_socket_tcp_setkeepalive(lua_State *L)
{
ngx_http_lua_socket_upstream_t *u;
ngx_connection_t *c;
ngx_http_lua_socket_pool_t *spool;
size_t size;
ngx_str_t key;
ngx_uint_t i;
ngx_queue_t *q;
ngx_peer_connection_t *pc;
ngx_http_lua_socket_pool_item_t *items, *item;
if (lua_gettop(L) != 1) {
return luaL_error(L, "expecting 1 argument "
"(including the object), but got %d", lua_gettop(L));
}
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, LUA_REGISTRYINDEX, NGX_LUA_SOCKET_POOL);
lua_getfield(L, 1, "_key");
key.data = (u_char *) lua_tolstring(L, -1, &key.len);
if (key.data == NULL) {
lua_pushliteral(L, "key not found");
return 2;
}
lua_getfield(L, 1, "_ctx");
u = lua_touserdata(L, -1);
lua_pop(L, 1);
/* stack: obj cache key */
pc = &u->peer;
c = pc->connection;
if (u == NULL || c == NULL || u->ft_type || u->eof) {
lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}
if (c->read->eof
|| c->read->error
|| c->read->timedout
|| c->write->error
|| c->write->timedout)
{
lua_pushnil(L);
lua_pushliteral(L, "invalid connection");
return 2;
}
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
lua_pushnil(L);
lua_pushliteral(L, "failed to handle read event");
return 2;
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"lua socket set keepalive: saving connection %p", c);
dd("saving connection to key %s", lua_tostring(L, -1));
lua_pushvalue(L, -1);
lua_rawget(L, 2);
spool = lua_touserdata(L, -1);
lua_pop(L, 1);
/* stack: obj cache key */
if (spool == NULL) {
/* create a new socket pool for the current peer key */
size = sizeof(ngx_http_lua_socket_pool_t) + key.len - 1
+ sizeof(ngx_http_lua_socket_pool_item_t)
* NGX_HTTP_LUA_SOCKET_POOL_SIZE;
spool = lua_newuserdata(L, size);
if (spool == NULL) {
return luaL_error(L, "out of memory");
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"lua socket keepalive create connection pool for key "
"\"%s\"", lua_tostring(L, -2));
lua_rawset(L, 2);
ngx_queue_init(&spool->cache);
ngx_queue_init(&spool->free);
items = (void *) ngx_copy(spool->key, key.data, key.len);
for (i = 0; i < NGX_HTTP_LUA_SOCKET_POOL_SIZE; i++) {
ngx_queue_insert_head(&spool->free, &items[i].queue);
items[i].socket_pool = spool;
}
}
if (ngx_queue_empty(&spool->free)) {
q = ngx_queue_last(&spool->cache);
ngx_queue_remove(q);
item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);
ngx_http_lua_socket_keepalive_close(item->connection);
} else {
q = ngx_queue_head(&spool->free);
ngx_queue_remove(q);
item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);
}
item->connection = c;
ngx_queue_insert_head(&spool->cache, q);
pc->connection = NULL;
/* TODO we should set a read timer for max idel connections */
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
c->write->handler = ngx_http_lua_socket_keepalive_dummy_handler;
c->read->handler = ngx_http_lua_socket_keepalive_close_handler;
c->data = item;
c->idle = 1;
c->log = ngx_cycle->log;
c->read->log = ngx_cycle->log;
c->write->log = ngx_cycle->log;
item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
item->reused = u->reused;
if (c->read->ready) {
ngx_http_lua_socket_keepalive_close_handler(c->read);
}
lua_pushinteger(L, 1);
lua_pushnil(L);
return 2;
}
static ngx_int_t
ngx_http_lua_get_keepalive_peer(ngx_http_request_t *r, lua_State *L,
int obj_index, int key_index, ngx_http_lua_socket_upstream_t *u)
{
ngx_http_lua_socket_pool_item_t *item;
ngx_http_lua_socket_pool_t *spool;
ngx_http_cleanup_t *cln;
ngx_queue_t *q;
int top;
ngx_peer_connection_t *pc;
ngx_connection_t *c;
top = lua_gettop(L);
if (key_index < 0) {
key_index = top + key_index + 1;
}
if (obj_index < 0) {
obj_index = top + obj_index + 1;
}
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua socket pool get keepalive peer");
pc = &u->peer;
lua_getfield(L, LUA_REGISTRYINDEX, NGX_LUA_SOCKET_POOL); /* table */
lua_pushvalue(L, key_index); /* key */
lua_rawget(L, -2);
spool = lua_touserdata(L, -1);
if (spool == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"lua socket keepalive connection pool not found");
lua_settop(L, top);
return NGX_DECLINED;
}
if (!ngx_queue_empty(&spool->cache)) {
q = ngx_queue_head(&spool->cache);
item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);
c = item->connection;
ngx_queue_remove(q);
ngx_queue_insert_head(&spool->free, q);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"lua socket get keepalive peer: using connection %p", c);
c->idle = 0;
c->log = pc->log;
c->read->log = pc->log;
c->write->log = pc->log;
c->data = u;
pc->connection = c;
pc->cached = 1;
u->reused = item->reused + 1;
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = c;
u->writer.limit = 0;
u->request_sent = 0;
cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR;
lua_settop(L, top);
return NGX_ERROR;
}
cln->handler = ngx_http_lua_socket_cleanup;
cln->data = u;
u->cleanup = &cln->handler;
lua_settop(L, top);
return NGX_OK;
}
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"lua socket keepalive: connection pool empty");
lua_settop(L, top);
return NGX_DECLINED;
}
static void
ngx_http_lua_socket_keepalive_dummy_handler(ngx_event_t *ev)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
"keepalive dummy handler");
}
static void
ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev)
{
ngx_http_lua_socket_pool_item_t *item;
ngx_http_lua_socket_pool_t *spool;
int n;
char buf[1];
ngx_connection_t *c;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
"keepalive close handler");
c = ev->data;
if (c->close) {
goto close;
}
n = recv(c->fd, buf, 1, MSG_PEEK);
if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
/* stale event */
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto close;
}
return;
}
close:
item = c->data;
spool = item->socket_pool;
ngx_http_lua_socket_keepalive_close(c);
ngx_queue_remove(&item->queue);
ngx_queue_insert_head(&spool->free, &item->queue);
/* TODO free up spool userdata if the pool is completely empty */
}
static void
ngx_http_lua_socket_keepalive_close(ngx_connection_t *c)
{
ngx_close_connection(c);
}

Просмотреть файл

@ -51,6 +51,8 @@ struct ngx_http_lua_socket_upstream_s {
size_t request_len;
ngx_chain_t *request_bufs;
ngx_uint_t reused;
unsigned luabuf_inited:1;
unsigned request_sent:1;
@ -79,6 +81,30 @@ typedef struct {
} ngx_http_lua_socket_compiled_pattern_t;
typedef struct {
/* queues of ngx_http_lua_socket_pool_item_t: */
ngx_queue_t cache;
ngx_queue_t free;
u_char key[1];
} ngx_http_lua_socket_pool_t;
typedef struct {
ngx_http_lua_socket_pool_t *socket_pool;
ngx_queue_t queue;
ngx_connection_t *connection;
socklen_t socklen;
struct sockaddr_storage sockaddr;
ngx_uint_t reused;
} ngx_http_lua_socket_pool_item_t;
void ngx_http_lua_inject_socket_api(ngx_log_t *log, lua_State *L);
void ngx_http_lua_inject_req_socket_api(lua_State *L);

Просмотреть файл

@ -498,6 +498,10 @@ init_ngx_lua_registry(ngx_conf_t *cf, lua_State *L)
lua_newtable(L);
lua_setfield(L, LUA_REGISTRYINDEX, NGX_LUA_REQ_CTX_REF);
/* create registry entry for the Lua socket connection pool table */
lua_newtable(L);
lua_setfield(L, LUA_REGISTRYINDEX, NGX_LUA_SOCKET_POOL);
/* create registry entry for the Lua request ctx data table */
lua_newtable(L);
lua_setfield(L, LUA_REGISTRYINDEX, NGX_LUA_REGEX_CACHE);

Просмотреть файл

@ -192,6 +192,7 @@ failed to send request: closed
--- config
server_tokens off;
resolver $TEST_NGINX_RESOLVER;
resolver_timeout 1s;
location /t {
content_by_lua '
local sock = ngx.socket.tcp()
@ -278,6 +279,7 @@ close: nil closed
lua_socket_connect_timeout 100ms;
lua_socket_send_timeout 100ms;
lua_socket_read_timeout 100ms;
resolver_timeout 1s;
location /test {
content_by_lua '
local sock = ngx.socket.tcp()
@ -341,6 +343,7 @@ connected: 1
--- config
server_tokens off;
resolver $TEST_NGINX_RESOLVER;
resolver_timeout 1s;
location /t {
content_by_lua '
local sock = ngx.socket.tcp()

85
t/068-socket-keepalive.t Normal file
Просмотреть файл

@ -0,0 +1,85 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:
use lib 'lib';
use Test::Nginx::Socket;
repeat_each(2);
plan tests => repeat_each() * (blocks() * 4);
our $HtmlDir = html_dir;
$ENV{TEST_NGINX_CLIENT_PORT} ||= server_port();
$ENV{TEST_NGINX_MEMCACHED_PORT} ||= 11211;
no_long_string();
#no_diff();
#log_level 'warn';
run_tests();
__DATA__
=== TEST 1: sanity
--- http_config eval
"lua_package_path '$::HtmlDir/?.lua;./?.lua';"
--- config
location /t {
set $port $TEST_NGINX_MEMCACHED_PORT;
content_by_lua '
local test = require "test"
local port = ngx.var.port
test.go(port)
test.go(port)
';
}
--- request
GET /t
--- user_files
>>> test.lua
module("test", package.seeall)
function go(port)
local sock = ngx.socket.tcp()
local ok, err = sock:connect("127.0.0.1", port)
if not ok then
ngx.say("failed to connect: ", err)
return
end
ngx.say("connected: ", ok, ", reused: ", sock:getreusedtimes())
local req = "flush_all\r\n"
local bytes, err = sock:send(req)
if not bytes then
ngx.say("failed to send request: ", err)
return
end
ngx.say("request sent: ", bytes)
local line, err, part = sock:receive()
if line then
ngx.say("received: ", line)
else
ngx.say("failed to receive a line: ", err, " [", part, "]")
end
local ok, err = sock:setkeepalive()
if not ok then
ngx.say("failed to set reusable: ", err)
end
end
--- response_body_like
^connected: 1, reused: \d+
request sent: 11
received: OK
connected: 1, reused: [1-9]\d*
request sent: 11
received: OK
--- no_error_log
[error]
--- error_log
lua socket get keepalive peer: using connection