From: Jeff Garzik <jeff@garzik.org>
To: hail-devel@vger.kernel.org
Subject: [tabled patch] abstract out TCP-write code
Date: Wed, 22 Sep 2010 20:09:08 -0400 [thread overview]
Message-ID: <20100923000908.GA15908@havoc.gtf.org> (raw)
This is step #1. Other steps in the process:
2) update server/atcp.c and itd/*.c in tandem until they have matching
TCP-write code. This should be straightforward, as both are based on the
same codebase (tabled).
3) move atcp to libhail
4) remove TCP-write code from tabled, itd
5) update atcp to support SSL, sendfile
6) update chunkd to support atcp (requires step #5)
All this code bears the same lineage, so it shouldn't be too difficult.
Also note that this is a first draft with embedded libevent dependencies. I
agree with zaitcev that the goal should be to eliminate these. Apart from that,
atcp is wonderfully generic at present: not even a GLib dependency, IIRC.
server/Makefile.am | 1
server/atcp.c | 243 +++++++++++++++++++++++++++++++++++++++++++++++++++++
server/atcp.h | 90 +++++++++++++++++++
server/bucket.c | 8 -
server/object.c | 56 ++++++------
server/server.c | 237 +++++----------------------------------------------
server/tabled.h | 37 +-------
7 files changed, 400 insertions(+), 272 deletions(-)
diff --git a/server/Makefile.am b/server/Makefile.am
index 5b53a0a..5e0abd5 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,6 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
+ atcp.c atcp.h \
bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
diff --git a/server/atcp.c b/server/atcp.c
new file mode 100644
index 0000000..dac5b91
--- /dev/null
+++ b/server/atcp.c
@@ -0,0 +1,243 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/uio.h>
+#include "atcp.h"
+
+/*
+ * Generic write-completion callback.
+ *
+ * By convention cb_data1 owns cb_data2 and outlives it, so cb_data2 is
+ * the buffer released here; cb_data1 and done are ignored.
+ *
+ * Always returns false: no further writes were queued.
+ */
+bool atcp_cb_free(void *cb_data1, void *cb_data2, bool done)
+{
+	void *owned_buf = cb_data2;
+
+	free(owned_buf);
+
+	return false;
+}
+
+/*
+ * Move a fully-written request from the pending queue to the tail of
+ * its owner's completion queue; the callback runs later, from
+ * atcp_write_run_compl().
+ */
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+	struct list_head *compl_q = &tmp->wst->write_compl_q;
+
+	list_del(&tmp->node);
+	list_add_tail(&tmp->node, compl_q);
+}
+
+/*
+ * Release one write request.
+ *
+ * The request's length is subtracted from the owner's water level and
+ * the node is unlinked. The callback, if any, is then invoked with
+ * @done, and finally the request itself is freed.
+ *
+ * Returns the callback's result, or false when there is no callback.
+ */
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+	struct atcp_wr_state *owner = tmp->wst;
+	bool again;
+
+	owner->write_cnt -= tmp->length;
+	list_del_init(&tmp->node);
+
+	again = tmp->cb ? tmp->cb(tmp->cb_data1, tmp->cb_data2, done)
+			: false;
+
+	free(tmp);
+	return again;
+}
+
+/*
+ * Drain the completion queue. Each finished write is freed and its
+ * callback is run with done == true.
+ *
+ * Returns true if at least one callback returned true, i.e. asked the
+ * caller's event loop to iterate again.
+ */
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+	bool loop_again = false;
+
+	for (;;) {
+		struct atcp_write *wr;
+
+		if (list_empty(&wst->write_compl_q))
+			break;
+
+		wr = list_entry(wst->write_compl_q.next,
+				struct atcp_write, node);
+		if (atcp_write_free(wr, true))
+			loop_again = true;
+	}
+
+	return loop_again;
+}
+
+/*
+ * Discard every outstanding write.
+ *
+ * Already-completed writes are reaped first (callbacks see done == true).
+ * Whatever is still pending on write_q is then released with
+ * done == false.
+ */
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+	struct atcp_write *cur, *next;
+
+	atcp_write_run_compl(wst);
+
+	list_for_each_entry_safe(cur, next, &wst->write_q, node)
+		atcp_write_free(cur, false);
+}
+
+/*
+ * Octets currently accounted to @wst. This includes writes that have
+ * completed but whose callbacks have not yet been run, since write_cnt
+ * is only decremented when a request is freed.
+ */
+size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+	size_t queued = wst->write_cnt;
+
+	return queued;
+}
+
+/*
+ * Push as much of the pending write queue to the socket as the kernel
+ * will accept, using one non-blocking writev(2) of up to
+ * ATCP_MAX_WR_IOV buffers.
+ *
+ * Fully-written requests are moved to the completion queue. Their
+ * callbacks are NOT run here (see atcp_write_run_compl()).
+ *
+ * Returns true on progress or EAGAIN/EWOULDBLOCK, and false on a fatal
+ * error. On a fatal error every queued write is released, and the write
+ * event is unregistered if it was armed.
+ */
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+	int n_iov;
+	struct atcp_write *tmp;
+	ssize_t rc;
+	struct iovec iov[ATCP_MAX_WR_IOV];
+
+	/* accumulate pending writes into iovec */
+	n_iov = 0;
+	list_for_each_entry(tmp, &wst->write_q, node) {
+		if (n_iov == ATCP_MAX_WR_IOV)
+			break;
+		/* bleh, struct iovec should declare iov_base const */
+		iov[n_iov].iov_base = (void *) tmp->buf;
+		iov[n_iov].iov_len = tmp->togo;
+		n_iov++;
+	}
+
+	/* execute non-blocking write */
+do_write:
+	rc = writev(wst->fd, iov, n_iov);
+	if (rc < 0) {
+		if (errno == EINTR)
+			goto do_write;
+		/* EWOULDBLOCK need not equal EAGAIN on every platform */
+		if (errno != EAGAIN && errno != EWOULDBLOCK)
+			goto err_out;
+		return true;
+	}
+
+	/* iterate through write queue, issuing completions based on
+	 * amount of data written
+	 */
+	while (rc > 0) {
+		int sz;
+
+		/* get pointer to first record on list */
+		tmp = list_entry(wst->write_q.next, struct atcp_write, node);
+
+		/* mark data consumed by decreasing tmp->togo.  Advance buf
+		 * through a char pointer: arithmetic on void * is only a
+		 * GNU extension, not ISO C.
+		 */
+		sz = (tmp->togo < rc) ? tmp->togo : (int) rc;
+		tmp->togo -= sz;
+		tmp->buf = (const char *) tmp->buf + sz;
+		rc -= sz;
+
+		/* if tmp->togo reaches zero, write is complete,
+		 * so schedule it for clean up (cannot call callback
+		 * right away or an endless recursion will result)
+		 */
+		if (tmp->togo == 0)
+			atcp_write_complete(tmp);
+	}
+
+	/* if we emptied the queue, clear write notification */
+	if (list_empty(&wst->write_q)) {
+		wst->writing = false;
+		if (event_del(&wst->write_ev) < 0)
+			goto err_out;
+	}
+
+	return true;
+
+err_out:
+	atcp_write_free_all(wst);
+
+	/* Stop write polling as well, so the queue state and the event
+	 * registration agree.  This is done after freeing, because a
+	 * completion callback may have re-armed the event.
+	 */
+	if (wst->writing) {
+		event_del(&wst->write_ev);
+		wst->writing = false;
+	}
+	return false;
+}
+
+/*
+ * Write-readiness event handler (registered via event_set() in
+ * atcp_wr_set_fd()). It flushes what it can, then runs callbacks for
+ * any writes that completed. @fd and @events are unused, and so are
+ * the helpers' return values.
+ */
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+	struct atcp_wr_state *state = (struct atcp_wr_state *) userdata;
+
+	(void) atcp_writable(state);
+	(void) atcp_write_run_compl(state);
+}
+
+/*
+ * Attach @wst to socket @fd and prepare a persistent write-readiness
+ * event that dispatches to atcp_wr_event(). The event is only
+ * prepared here; atcp_write_start() is what calls event_add().
+ */
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+	wst->fd = fd;
+	event_set(&wst->write_ev, wst->fd, EV_WRITE | EV_PERSIST,
+		  atcp_wr_event, wst);
+}
+
+/*
+ * Begin flushing the write queue to the socket.
+ *
+ * An optimistic write is attempted before falling back to polling.
+ *
+ * Returns true when the caller should keep looping: the queue is empty,
+ * it was flushed completely, or event_add() failed. Returns false when
+ * the caller should wait for a write-readiness poll.
+ */
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+	if (list_empty(&wst->write_q))
+		return true;	/* loop, not poll */
+
+	/* write-poll already active: nothing further to do */
+	if (wst->writing)
+		return false;	/* poll wait */
+
+	/* attempt optimistic write, in hopes of avoiding poll,
+	 * or at least refill the write buffers so as to not
+	 * get -immediately- called again by the kernel
+	 */
+	atcp_writable(wst);
+	if (list_empty(&wst->write_q)) {
+		wst->opt_write++;
+		return true;	/* loop, not poll */
+	}
+
+	if (event_add(&wst->write_ev, NULL) >= 0) {
+		wst->writing = true;
+		return false;	/* poll wait */
+	}
+
+	return true;		/* loop, not poll */
+}
+
+/*
+ * Append @buflen bytes at @buf to the tail of @wst's write queue.
+ *
+ * No I/O happens here; see atcp_write_start(). The pointer is stored,
+ * not copied, so @buf must remain valid until @cb (which may be NULL)
+ * is invoked with @cb_data1, @cb_data2 and the completion flag.
+ *
+ * Returns 0 on success, or one of these errors:
+ *   -EINVAL  @buf is NULL, @buflen is 0, or @buflen exceeds INT_MAX
+ *            (the per-write counters are int);
+ *   -ENOMEM  the request could not be allocated.
+ * On failure @buf is left untouched and remains owned by the caller.
+ */
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+		atcp_write_func cb, void *cb_data1, void *cb_data2)
+{
+	struct atcp_write *wr;
+
+	if (!buf || !buflen)
+		return -EINVAL;
+
+	/* togo and length are int: refuse sizes they cannot represent */
+	if (buflen > (unsigned int) INT_MAX)
+		return -EINVAL;
+
+	wr = calloc(1, sizeof(*wr));
+	if (!wr)
+		return -ENOMEM;
+
+	wr->buf = buf;
+	wr->togo = (int) buflen;
+	wr->length = (int) buflen;
+	wr->cb = cb;
+	wr->cb_data1 = cb_data1;
+	wr->cb_data2 = cb_data2;
+	wr->wst = wst;
+	list_add_tail(&wr->node, &wst->write_q);
+
+	/* update water level and its high-water mark */
+	wst->write_cnt += buflen;
+	if (wst->write_cnt > wst->write_cnt_max)
+		wst->write_cnt_max = wst->write_cnt;
+
+	return 0;
+}
+
+/*
+ * Tear down @wst. NULL is accepted and ignored.
+ *
+ * Every queued write is released: completed ones see done == true and
+ * pending ones see done == false. If the write event is still
+ * registered, it is then removed, so that the object embedding @wst can
+ * be freed without leaving a stale event behind in libevent.
+ */
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+	if (!wst)
+		return;
+
+	atcp_write_free_all(wst);
+
+	/* done after freeing: a completion callback may have re-armed it */
+	if (wst->writing) {
+		event_del(&wst->write_ev);
+		wst->writing = false;
+	}
+}
+
+/*
+ * Put @wst into an idle state. All counters and flags are zeroed, both
+ * queues are empty, and no socket is attached (fd == -1). The caller
+ * attaches a socket later with atcp_wr_set_fd().
+ */
+void atcp_wr_init(struct atcp_wr_state *wst)
+{
+	memset(wst, 0, sizeof(struct atcp_wr_state));
+	wst->fd = -1;
+
+	INIT_LIST_HEAD(&wst->write_compl_q);
+	INIT_LIST_HEAD(&wst->write_q);
+}
+
diff --git a/server/atcp.h b/server/atcp.h
new file mode 100644
index 0000000..996a59e
--- /dev/null
+++ b/server/atcp.h
@@ -0,0 +1,90 @@
+#ifndef __ATCP_H__
+#define __ATCP_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <event.h>
+#include <elist.h>
+
+enum {
+ ATCP_MAX_WR_IOV = 32, /* max iov per writev(2) */
+};
+
+/*
+ * Per-socket asynchronous write state. Initialise it with
+ * atcp_wr_init(), attach a socket with atcp_wr_set_fd(), and tear it
+ * down with atcp_wr_exit().
+ */
+struct atcp_wr_state {
+ int fd; /* our socket, -1 until atcp_wr_set_fd() */
+
+ bool writing; /* write_ev currently added to the event loop? */
+
+ /* octets queued and not yet freed (includes completed writes whose
+  * callbacks have not run yet) */
+ size_t write_cnt; /* water level */
+ size_t write_cnt_max; /* high-water mark of write_cnt */
+
+ struct list_head write_q; /* list of async writes */
+ struct list_head write_compl_q; /* fully written, callback pending */
+
+ /* persistent EV_WRITE event, prepared by atcp_wr_set_fd() */
+ struct event write_ev;
+
+ /* various statistics */
+ uint64_t opt_write; /* optimistic writes that emptied write_q */
+};
+
+/*
+ * Write-disposal callback: (cb_data1, cb_data2, done). @done is true
+ * when the buffer was fully written and false when it was discarded
+ * unwritten. The return values of callbacks are OR-ed together by
+ * atcp_write_run_compl(); true asks the caller to loop again.
+ */
+typedef bool (*atcp_write_func)(void *, void *, bool);
+
+/* One queued write request. */
+struct atcp_write {
+ const void *buf; /* next byte to send; advances as data is written */
+ int togo; /* bytes still to send */
+
+ int length; /* original length, subtracted from write_cnt on free */
+ atcp_write_func cb; /* callback, may be NULL */
+ void *cb_data1; /* first data passed to cb */
+ void *cb_data2; /* second data passed to cb */
+
+ struct atcp_wr_state *wst; /* our parent */
+
+ struct list_head node; /* write_[compl_]q list node */
+};
+
+/*
+ * Setup and teardown of atcp write state.
+ * atcp_wr_init() zeroes @wst, empties both queues and sets fd to -1.
+ * atcp_wr_exit() releases every queued write; NULL is ignored.
+ */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst);
+
+/* generic write callback that calls free(cb_data2) and returns false */
+extern bool atcp_cb_free(void *cb_data1, void *cb_data2, bool done);
+
+/*
+ * Clear all write queues immediately, even if not complete. Completed
+ * writes get done == true; pending ones get done == false.
+ */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/*
+ * Complete all writes found on the completion queue. Returns true if
+ * any callback returned true.
+ */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* total number of octets queued at this moment (including completed
+ * writes whose callbacks have not yet run) */
+extern size_t atcp_wqueued(struct atcp_wr_state *wst);
+
+/* store fd and prepare (but do not add) the write event */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/*
+ * Add a buffer to the write queue; no I/O is performed. Returns 0,
+ * -EINVAL for a rejected buffer (e.g. NULL or zero length), or -ENOMEM.
+ */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data1, void *cb_data2);
+
+/*
+ * Begin pushing the write queue to the socket. Returns true if the
+ * caller should keep looping, false if it should wait for a
+ * write-readiness poll.
+ */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+#endif /* __ATCP_H__ */
diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..982ed62 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
bucket) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..b053ed9 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -812,8 +812,8 @@ static bool object_put_body(struct client *cli, const char *user,
cli_out_end(cli);
return cli_err(cli, InternalError);
}
- cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
- cli_write_start(cli);
+ atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cli, cont);
+ atcp_write_start(&cli->wst);
}
avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +940,13 @@ static bool object_put_acls(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -990,10 +990,10 @@ void cli_in_end(struct client *cli)
cli->in_len = 0;
}
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(void *, void *, bool);
/*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
* convention for cli continuation callbacks, so object_get_more can call us.
*/
static bool object_get_poke(struct client *cli)
@@ -1026,7 +1026,7 @@ static bool object_get_poke(struct client *cli)
if (bytes == 0) {
if (!cli->in_len) {
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
}
free(buf);
return false;
@@ -1034,15 +1034,15 @@ static bool object_get_poke(struct client *cli)
cli->in_len -= bytes;
if (!cli->in_len) {
- if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, cli, buf))
goto err_out;
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
} else {
- if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, cli, buf))
goto err_out;
- if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
- cli_write_start(cli);
+ if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+ atcp_write_start(&cli->wst);
}
return true;
@@ -1053,11 +1053,12 @@ err_out:
}
/* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(void *cb_data1, void *cb_data2, bool done)
{
+ struct client *cli = cb_data1;
/* free now-written buffer */
- free(cb_data);
+ free(cb_data2);
/* do not queue more, if !completion or fd was closed early */
if (!done) /* FIXME We used to test for input errors here. */
@@ -1071,8 +1072,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
/* callback from the chunkd side: some data is available */
static void object_get_event(struct open_chunk *ochunk)
{
- object_get_poke(ochunk->cli);
- cli_write_run_compl();
+ struct client *cli = ochunk->cli;
+
+ object_get_poke(cli);
+ atcp_write_run_compl(&cli->wst);
}
static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1330,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!want_body) {
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
@@ -1347,7 +1350,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!cli->in_len)
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
goto err_out_in_end;
@@ -1365,21 +1368,22 @@ static bool object_get_body(struct client *cli, const char *user,
goto err_out_in_end;
memcpy(tmp, buf, bytes);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
free(tmp);
return true;
}
- if (cli_writeq(cli, tmp, bytes,
- cli->in_len ? object_get_more : cli_cb_free, tmp))
+ if (atcp_writeq(&cli->wst, tmp, bytes,
+ cli->in_len ? object_get_more : atcp_cb_free,
+ cli, tmp))
goto err_out_in_end;
start_write:
free(obj);
g_string_free(extra_hdr, TRUE);
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_in_end:
cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 7a9fb7a..f8c4540 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
const char *argp_program_version = PACKAGE_VERSION;
enum {
- CLI_MAX_WR_IOV = 32, /* max iov per writev(2) */
-
SFL_FOREGROUND = (1 << 0), /* run in foreground */
};
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
if (asprintf(&str,
"<p>Stats: "
- "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
- "<p>Debug: max_write_buf %lu</p>\r\n",
- tabled_srv.stats.poll,
- tabled_srv.stats.event,
- tabled_srv.stats.tcp_accept,
- tabled_srv.stats.opt_write,
- tabled_srv.stats.max_write_buf) < 0)
+ "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+ "<p>Debug: max_write_buf %llu</p>\r\n",
+ (unsigned long long) tabled_srv.stats.poll,
+ (unsigned long long) tabled_srv.stats.event,
+ (unsigned long long) tabled_srv.stats.tcp_accept,
+ (unsigned long long) tabled_srv.stats.opt_write,
+ (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
return false;
content = g_list_append(content, str);
return true;
}
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
- list_del(&tmp->node);
- list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
- struct client *cli = tmp->cb_cli;
- bool rcb = false;
-
- cli->write_cnt -= tmp->length;
- list_del(&tmp->node);
- if (tmp->cb)
- rcb = tmp->cb(cli, tmp->cb_data, done);
- free(tmp);
-
- return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
- struct client_write *wr, *tmp;
-
- cli_write_run_compl();
- list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
- cli_write_free(wr, false);
- }
-}
-
-bool cli_write_run_compl(void)
-{
- struct client_write *wr;
- bool do_loop;
-
- do_loop = false;
- while (!list_empty(&tabled_srv.write_compl_q)) {
- wr = list_entry(tabled_srv.write_compl_q.next,
- struct client_write, node);
- do_loop |= cli_write_free(wr, true);
- }
- return do_loop;
-}
-
static void cli_free(struct client *cli)
{
- cli_write_free_all(cli);
+ if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+ tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+ tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+ atcp_wr_exit(&cli->wst);
cli_out_end(cli);
cli_in_end(cli);
@@ -561,9 +519,6 @@ static void cli_free(struct client *cli)
hreq_free(&cli->req);
- if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
- tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
if (debugging)
applog(LOG_INFO, "client %s ended", cli->addr_host);
@@ -575,7 +530,7 @@ static bool cli_evt_dispose(struct client *cli, unsigned int events)
/* if write queue is not empty, we should continue to get
* poll callbacks here until it is
*/
- if (list_empty(&cli->write_q))
+ if (list_empty(&cli->wst.write_q))
cli_free(cli);
return false;
@@ -608,134 +563,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
return true;
}
-static void cli_writable(struct client *cli)
-{
- int n_iov;
- struct client_write *tmp;
- ssize_t rc;
- struct iovec iov[CLI_MAX_WR_IOV];
-
- /* accumulate pending writes into iovec */
- n_iov = 0;
- list_for_each_entry(tmp, &cli->write_q, node) {
- if (n_iov == CLI_MAX_WR_IOV)
- break;
- /* bleh, struct iovec should declare iov_base const */
- iov[n_iov].iov_base = (void *) tmp->buf;
- iov[n_iov].iov_len = tmp->togo;
- n_iov++;
- }
-
- /* execute non-blocking write */
-do_write:
- rc = writev(cli->fd, iov, n_iov);
- if (rc < 0) {
- if (errno == EINTR)
- goto do_write;
- if (errno != EAGAIN)
- goto err_out;
- return;
- }
-
- /* iterate through write queue, issuing completions based on
- * amount of data written
- */
- while (rc > 0) {
- int sz;
-
- /* get pointer to first record on list */
- tmp = list_entry(cli->write_q.next, struct client_write, node);
-
- /* mark data consumed by decreasing tmp->len */
- sz = (tmp->togo < rc) ? tmp->togo : rc;
- tmp->togo -= sz;
- tmp->buf += sz;
- rc -= sz;
-
- /* if tmp->len reaches zero, write is complete,
- * so schedule it for clean up (cannot call callback
- * right away or an endless recursion will result)
- */
- if (tmp->togo == 0)
- cli_write_complete(cli, tmp);
- }
-
- /* if we emptied the queue, clear write notification */
- if (list_empty(&cli->write_q)) {
- cli->writing = false;
- if (event_del(&cli->write_ev) < 0) {
- applog(LOG_WARNING, "cli_writable event_del");
- goto err_out;
- }
- }
-
- return;
-
-err_out:
- cli->state = evt_dispose;
- cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
- if (list_empty(&cli->write_q))
- return true; /* loop, not poll */
-
- /* if write-poll already active, nothing further to do */
- if (cli->writing)
- return false; /* poll wait */
-
- /* attempt optimistic write, in hopes of avoiding poll,
- * or at least refill the write buffers so as to not
- * get -immediately- called again by the kernel
- */
- cli_writable(cli);
- if (list_empty(&cli->write_q)) {
- tabled_srv.stats.opt_write++;
- return true; /* loop, not poll */
- }
-
- if (event_add(&cli->write_ev, NULL) < 0) {
- applog(LOG_WARNING, "cli_write event_add");
- return true; /* loop, not poll */
- }
-
- cli->writing = true;
-
- return false; /* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data)
-{
- struct client_write *wr;
-
- if (!buf || !buflen)
- return -EINVAL;
-
- wr = calloc(1, sizeof(struct client_write));
- if (!wr)
- return -ENOMEM;
-
- wr->buf = buf;
- wr->togo = buflen;
- wr->length = buflen;
- wr->cb = cb;
- wr->cb_data = cb_data;
- wr->cb_cli = cli;
- list_add_tail(&wr->node, &cli->write_q);
- cli->write_cnt += buflen;
- if (cli->write_cnt > cli->write_cnt_max)
- cli->write_cnt_max = cli->write_cnt;
-
- return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
- return cli->write_cnt;
-}
-
/*
* Return:
* 0: progress was NOT made (EOF)
@@ -771,12 +598,6 @@ do_read:
return rc != 0;
}
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
- free(cb_data);
- return false;
-}
-
static int cli_write_list(struct client *cli, GList *list)
{
int rc = 0;
@@ -784,8 +605,8 @@ static int cli_write_list(struct client *cli, GList *list)
tmp = list;
while (tmp) {
- rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
- cli_cb_free, tmp->data);
+ rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+ atcp_cb_free, cli, tmp->data);
if (rc)
goto out;
@@ -870,14 +691,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
cli->state = evt_dispose;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc)
return true;
- rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+ rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, cli, content);
if (rc)
return true;
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
}
static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +732,7 @@ static bool cli_resp(struct client *cli, int http_status,
else
cli->state = evt_recycle;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
cli->state = evt_dispose;
@@ -924,7 +745,7 @@ static bool cli_resp(struct client *cli, int http_status,
return true;
}
- rcb = cli_write_start(cli);
+ rcb = atcp_write_start(&cli->wst);
if (cli->state == evt_recycle)
return true;
@@ -1385,9 +1206,10 @@ static struct client *cli_alloc(bool is_status)
return NULL;
}
+ atcp_wr_init(&cli->wst);
+
cli->state = evt_read_req;
cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
- INIT_LIST_HEAD(&cli->write_q);
INIT_LIST_HEAD(&cli->out_ch);
cli->req_ptr = cli->req_buf;
memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,14 +1217,6 @@ static struct client *cli_alloc(bool is_status)
return cli;
}
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
- struct client *cli = userdata;
-
- cli_writable(cli);
- cli_write_run_compl();
-}
-
static void tcp_cli_event(int fd, short events, void *userdata)
{
struct client *cli = userdata;
@@ -1410,7 +1224,7 @@ static void tcp_cli_event(int fd, short events, void *userdata)
do {
loop = cli->evt_table[cli->state](cli, events);
- loop |= cli_write_run_compl();
+ loop |= atcp_write_run_compl(&cli->wst);
} while (loop);
}
@@ -1438,11 +1252,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
goto err_out;
}
+ atcp_wr_set_fd(&cli->wst, cli->fd);
+
tabled_srv.stats.tcp_accept++;
event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
- event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
- tcp_cli_wr_event, cli);
/* mark non-blocking, for upcoming poll use */
if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2202,7 +2016,6 @@ int main (int argc, char *argv[])
struct event_base *event_base_rep;
INIT_LIST_HEAD(&tabled_srv.all_stor);
- INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
tabled_srv.rep_next_id = DBID_MIN;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..be6c9b4 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
#include <elist.h>
#include <tdb.h>
#include <hail_log.h>
+#include "atcp.h"
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -103,19 +104,6 @@ struct storage_node {
};
typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
- const void *buf; /* write buffer pointer */
- int togo; /* write buffer remainder */
-
- int length; /* length for accounting */
- cli_write_func cb; /* callback */
- void *cb_data; /* data passed to cb */
- struct client *cb_cli; /* cli passed to cb */
-
- struct list_head node;
-};
/* an open chunkd client */
struct open_chunk {
@@ -163,13 +151,9 @@ struct client {
int fd; /* socket */
bool ev_active;
struct event ev;
- struct event write_ev;
- struct list_head write_q; /* list of async writes */
- size_t write_cnt; /* water level */
- bool writing;
+ struct atcp_wr_state wst;
/* some debugging stats */
- size_t write_cnt_max;
unsigned int req_used; /* amount of req_buf in use */
char *req_ptr; /* start of unexamined data */
@@ -216,12 +200,12 @@ enum st_net {
};
struct server_stats {
- unsigned long poll; /* number polls */
- unsigned long event; /* events dispatched */
- unsigned long tcp_accept; /* TCP accepted cxns */
- unsigned long opt_write; /* optimistic writes */
+ uint64_t poll; /* number polls */
+ uint64_t event; /* events dispatched */
+ uint64_t tcp_accept; /* TCP accepted cxns */
+ uint64_t opt_write; /* optimistic writes */
- unsigned long max_write_buf;
+ uint64_t max_write_buf;
};
#define DBID_NONE 0
@@ -249,7 +233,6 @@ struct server {
struct event_base *evbase_main;
int ev_pipe[2];
struct event pevt;
- struct list_head write_compl_q; /* list of done writes */
bool mc_delay;
struct event mc_timer;
@@ -398,12 +381,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
extern bool cli_err_write(struct client *cli, char *hdr, char *content);
extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
extern void cld_update_cb(void);
next reply other threads:[~2010-09-23 0:09 UTC|newest]
Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-09-23 0:09 Jeff Garzik [this message]
2010-09-23 0:28 ` [tabled patch] abstract out TCP-write code Pete Zaitcev
2010-09-23 1:26 ` Jeff Garzik
2010-09-23 2:37 ` Pete Zaitcev
2010-09-23 4:32 ` Jeff Garzik
2010-09-23 13:57 ` Pete Zaitcev
2010-09-23 15:28 ` Jim Meyering
2010-09-23 23:48 ` Jeff Garzik
2010-09-23 16:47 ` Jeff Garzik
2010-09-23 23:51 ` Jeff Garzik
2010-09-23 21:09 ` [tabled patch v2] " Jeff Garzik
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20100923000908.GA15908@havoc.gtf.org \
--to=jeff@garzik.org \
--cc=hail-devel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).