From: Jeff Garzik <jeff@garzik.org>
To: hail-devel@vger.kernel.org
Subject: [PATCH 1/3] CLD: convert back to libevent
Date: Fri, 31 Dec 2010 05:56:34 -0500 [thread overview]
Message-ID: <20101231105634.GA4210@havoc.gtf.org> (raw)
Switch CLD from hand-rolled server poll code, to libevent. Follows
similar techniques and rationale as chunkd commit
c1aed7464f237e5a6309351bf003162c77d69e27. This reverts ancient commit
90b3b5edcf5aa00577f4395fdbb490ed7e9be824.
Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
---
cld/Makefile.am | 3 -
cld/cld.h | 22 +++----
cld/server.c | 161 ++++++++++++++++++++------------------------------------
cld/session.c | 69 ++++++++++++++++--------
4 files changed, 118 insertions(+), 137 deletions(-)
diff --git a/cld/Makefile.am b/cld/Makefile.am
index 9a13ce0..30eea0b 100644
--- a/cld/Makefile.am
+++ b/cld/Makefile.am
@@ -12,7 +12,8 @@ cld_SOURCES = cldb.h cld.h \
cldb.c msg.c server.c session.c util.c
cld_LDADD = \
../lib/libhail.la @GLIB_LIBS@ @CRYPTO_LIBS@ \
- @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@
+ @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ \
+ @EVENT_LIBS@
cldbadm_SOURCES = cldb.h cldbadm.c
cldbadm_LDADD = @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@
diff --git a/cld/cld.h b/cld/cld.h
index 4c0099f..17f14b8 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -22,8 +22,9 @@
#include <netinet/in.h>
#include <sys/time.h>
-#include <poll.h>
+#include <event.h>
#include <glib.h>
+#include <elist.h>
#include "cldb.h"
#include <cld_msg_rpc.h>
#include <cld_common.h>
@@ -59,13 +60,13 @@ struct session {
uint64_t last_contact;
uint64_t next_fh;
- struct cld_timer timer;
+ struct event timer;
uint64_t next_seqid_in;
uint64_t next_seqid_out;
GList *out_q; /* outgoing pkts (to client) */
- struct cld_timer retry_timer;
+ struct event retry_timer;
char user[CLD_MAX_USERNAME];
@@ -85,10 +86,10 @@ struct server_stats {
unsigned long garbage; /* num. garbage pkts dropped */
};
-struct server_poll {
+struct server_socket {
int fd;
- bool (*cb)(int fd, short events, void *userdata);
- void *userdata;
+ struct event ev;
+ struct list_head sockets_node;
};
struct server {
@@ -103,14 +104,13 @@ struct server {
struct cldb cldb; /* database info */
- GArray *polls;
- GArray *poll_data;
+ struct event_base *evbase_main;
- GHashTable *sessions;
+ struct list_head sockets;
- struct cld_timer_list timers;
+ GHashTable *sessions;
- struct cld_timer chkpt_timer; /* db4 checkpoint timer */
+ struct event chkpt_timer; /* db4 checkpoint timer */
struct server_stats stats; /* global statistics */
};
diff --git a/cld/server.c b/cld/server.c
index 7a57785..aed501b 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -559,7 +559,7 @@ static void simple_sendresp(int sock_fd, const struct client *cli,
info->op);
}
-static bool udp_srv_event(int fd, short events, void *userdata)
+static void udp_srv_event(int fd, short events, void *userdata)
{
struct client cli;
char host[64];
@@ -586,7 +586,7 @@ static bool udp_srv_event(int fd, short events, void *userdata)
rrc = recvmsg(fd, &hdr, 0);
if (rrc < 0) {
syslogerr("UDP recvmsg");
- return true; /* continue main loop; do NOT terminate server */
+ return;
}
cli.addr_len = hdr.msg_namelen;
@@ -601,59 +601,60 @@ static bool udp_srv_event(int fd, short events, void *userdata)
if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
cld_srv.stats.garbage++;
- return true;
+ return;
}
if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
cld_srv.stats.garbage++;
- return true;
+ return;
}
if (packet_is_dupe(&info)) {
/* silently drop dupes */
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
err = validate_pkt_session(&info, &cli);
if (err) {
simple_sendresp(fd, &cli, &info, err);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
err = pkt_chk_sig(raw_pkt, rrc, &pkt);
if (err) {
simple_sendresp(fd, &cli, &info, err);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
(xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
if (err) {
simple_sendresp(fd, &cli, &info, err);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
}
static void add_chkpt_timer(void)
{
- cld_timer_add(&cld_srv.timers, &cld_srv.chkpt_timer,
- time(NULL) + CLD_CHKPT_SEC);
+ struct timeval tv = { .tv_sec = CLD_CHKPT_SEC };
+
+ if (evtimer_add(&cld_srv.chkpt_timer, &tv) < 0)
+ HAIL_WARN(&srv_log, "chkpt timer add failed");
}
-static void cldb_checkpoint(struct cld_timer *timer)
+static void cldb_checkpoint(int fd, short events, void *userdata)
{
DB_ENV *dbenv = cld_srv.cldb.env;
int rc;
@@ -690,28 +691,28 @@ static int net_write_port(const char *port_file, const char *port_str)
static void net_close(void)
{
- struct pollfd *pfd;
- int i;
-
- if (!cld_srv.polls)
- return;
-
- for (i = 0; i < cld_srv.polls->len; i++) {
- pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
- if (pfd->fd >= 0) {
- if (close(pfd->fd) < 0)
- HAIL_WARN(&srv_log, "%s(%d): %s",
- __func__, pfd->fd, strerror(errno));
- pfd->fd = -1;
+ struct server_socket *tmp, *iter;
+
+ list_for_each_entry_safe(tmp, iter, &cld_srv.sockets, sockets_node) {
+ if (tmp->fd >= 0) {
+ if (event_del(&tmp->ev) < 0)
+ HAIL_WARN(&srv_log, "Event delete(%d) failed",
+ tmp->fd);
+ if (close(tmp->fd) < 0)
+ HAIL_WARN(&srv_log, "Close(%d) failed: %s",
+ tmp->fd, strerror(errno));
+ tmp->fd = -1;
}
+
+ list_del(&tmp->sockets_node);
+ free(tmp);
}
}
static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
int addr_len, void *addr_ptr)
{
- struct server_poll sp;
- struct pollfd pfd;
+ struct server_socket *sock;
int fd, rc;
fd = socket(addr_fam, sock_type, sock_prot);
@@ -732,15 +733,25 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
return -errno;
}
- sp.fd = fd;
- sp.cb = udp_srv_event;
- sp.userdata = NULL;
- g_array_append_val(cld_srv.poll_data, sp);
+ sock = calloc(1, sizeof(*sock));
+ if (!sock) {
+ close(fd);
+ return -ENOMEM;
+ }
+
+ sock->fd = fd;
+ INIT_LIST_HEAD(&sock->sockets_node);
+
+ event_set(&sock->ev, fd, EV_READ | EV_PERSIST,
+ udp_srv_event, sock);
+
+ if (event_add(&sock->ev, NULL) < 0) {
+ free(sock);
+ close(fd);
+ return -EIO;
+ }
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
- g_array_append_val(cld_srv.polls, pfd);
+ list_add_tail(&sock->sockets_node, &cld_srv.sockets);
return fd;
}
@@ -891,11 +902,13 @@ static void segv_signal(int signo)
static void term_signal(int signo)
{
server_running = false;
+ event_loopbreak();
}
static void stats_signal(int signo)
{
dump_stats = true;
+ event_loopbreak();
}
#define X(stat) \
@@ -975,73 +988,16 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
static int main_loop(void)
{
- time_t next_timeout;
-
- next_timeout = cld_timers_run(&cld_srv.timers);
-
while (server_running) {
- struct pollfd *pfd;
- int i, fired, rc;
-
cld_srv.stats.poll++;
-
- /* poll for fd activity, or next timer event */
- rc = poll(&g_array_index(cld_srv.polls, struct pollfd, 0),
- cld_srv.polls->len,
- next_timeout ? (next_timeout * 1000) : -1);
- if (rc < 0) {
- syslogerr("poll");
- if (errno != EINTR)
- break;
- }
+ event_dispatch();
gettimeofday(¤t_time, NULL);
- /* determine which fd's fired; call their callbacks */
- fired = 0;
- for (i = 0; i < cld_srv.polls->len; i++) {
- struct server_poll *sp;
- bool runrunrun;
- short revents;
-
- /* ref pollfd struct */
- pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-
- /* if no events fired, move on to next */
- if (!pfd->revents)
- continue;
-
- fired++;
-
- revents = pfd->revents;
- pfd->revents = 0;
-
- /* ref 1:1 matching server_poll struct */
- sp = &g_array_index(cld_srv.poll_data,
- struct server_poll, i);
-
- cld_srv.stats.event++;
-
- /* call callback, shutting down server if requested */
- runrunrun = sp->cb(sp->fd, revents, sp->userdata);
- if (!runrunrun) {
- server_running = false;
- break;
- }
-
- /* if we reached poll(2) activity count, it is
- * pointless to continue looping
- */
- if (fired == rc)
- break;
- }
-
if (dump_stats) {
dump_stats = false;
stats_dump();
}
-
- next_timeout = cld_timers_run(&cld_srv.timers);
}
return 0;
@@ -1052,6 +1008,8 @@ int main (int argc, char *argv[])
error_t aprc;
int rc = 1;
+ INIT_LIST_HEAD(&cld_srv.sockets);
+
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -1075,6 +1033,8 @@ int main (int argc, char *argv[])
if (use_syslog)
openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
+ cld_srv.evbase_main = event_init();
+
if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
syslogerr("daemon");
goto err_out;
@@ -1103,17 +1063,13 @@ int main (int argc, char *argv[])
ensure_root();
- cld_timer_init(&cld_srv.chkpt_timer, "db4-checkpoint",
- cldb_checkpoint, NULL);
+ evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
add_chkpt_timer();
rc = 1;
cld_srv.sessions = g_hash_table_new(sess_hash, sess_equal);
- cld_srv.poll_data = g_array_sized_new(FALSE, FALSE,
- sizeof(struct server_poll), 4);
- cld_srv.polls = g_array_sized_new(FALSE,FALSE,sizeof(struct pollfd), 4);
- if (!cld_srv.sessions || !cld_srv.poll_data || !cld_srv.polls)
+ if (!cld_srv.sessions)
goto err_out_pid;
if (sess_load(cld_srv.sessions) != 0)
@@ -1137,7 +1093,8 @@ int main (int argc, char *argv[])
HAIL_INFO(&srv_log, "shutting down");
if (strict_free)
- cld_timer_del(&cld_srv.timers, &cld_srv.chkpt_timer);
+ if (evtimer_del(&cld_srv.chkpt_timer) < 0)
+ HAIL_WARN(&srv_log, "chkpt timer del failed");
if (cld_srv.cldb.up)
cldb_down(&cld_srv.cldb);
@@ -1149,8 +1106,6 @@ err_out_pid:
err_out:
if (strict_free) {
net_close();
- g_array_free(cld_srv.polls, TRUE);
- g_array_free(cld_srv.poll_data, TRUE);
sessions_free();
g_hash_table_unref(cld_srv.sessions);
}
diff --git a/cld/session.c b/cld/session.c
index d76186b..9887aaa 100644
--- a/cld/session.c
+++ b/cld/session.c
@@ -43,8 +43,8 @@ struct session_outpkt {
void *done_data;
};
-static void session_retry(struct cld_timer *);
-static void session_timeout(struct cld_timer *);
+static void session_retry(int, short, void *);
+static void session_timeout(int, short, void *);
static int sess_load_db(GHashTable *ss, DB_TXN *txn);
static void op_unref(struct session_outpkt *op);
@@ -87,8 +87,8 @@ static struct session *session_new(void)
cld_rand64(&sess->next_seqid_out);
- cld_timer_init(&sess->timer, "session-timeout", session_timeout, sess);
- cld_timer_init(&sess->retry_timer, "session-retry", session_retry, sess);
+ evtimer_set(&sess->timer, session_timeout, sess);
+ evtimer_set(&sess->retry_timer, session_retry, sess);
return sess;
}
@@ -103,8 +103,10 @@ static void session_free(struct session *sess, bool hash_remove)
if (hash_remove)
g_hash_table_remove(cld_srv.sessions, sess->sid);
- cld_timer_del(&cld_srv.timers, &sess->timer);
- cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+ if (evtimer_del(&sess->timer) < 0)
+ HAIL_ERR(&srv_log, "sess timer del failed");
+ if (evtimer_del(&sess->retry_timer) < 0)
+ HAIL_ERR(&srv_log, "sess retry timer del failed");
tmp = sess->out_q;
while (tmp) {
@@ -376,9 +378,9 @@ static void session_ping_done(struct session_outpkt *outpkt)
outpkt->sess->ping_open = false;
}
-static void session_timeout(struct cld_timer *timer)
+static void session_timeout(int fd, short events, void *userdata)
{
- struct session *sess = timer->userdata;
+ struct session *sess = userdata;
uint64_t sess_expire;
int rc;
DB_ENV *dbenv = cld_srv.cldb.env;
@@ -387,6 +389,8 @@ static void session_timeout(struct cld_timer *timer)
sess_expire = sess->last_contact + CLD_SESS_TIMEOUT;
if (!sess->dead && (sess_expire > now)) {
+ struct timeval tv;
+
if (!sess->ping_open &&
(sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) &&
(sess->sock_fd > 0)))) {
@@ -396,9 +400,12 @@ static void session_timeout(struct cld_timer *timer)
session_ping_done, NULL);
}
- cld_timer_add(&cld_srv.timers, &sess->timer,
- now + ((sess_expire - now) / 2) + 1);
- return; /* timer added; do not time out session */
+ tv.tv_sec = ((sess_expire - now) / 2) + 1;
+ tv.tv_usec = 0;
+ if (evtimer_add(&sess->timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "timer add failed, sid " SIDFMT,
+ SIDARG(sess->sid));
+ return; /* timer added; do not time out session */
}
HAIL_INFO(&srv_log, "session %s, addr %s sid " SIDFMT,
@@ -554,25 +561,33 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out)
return rc;
}
-static void session_retry(struct cld_timer *timer)
+static void session_retry(int fd, short events, void *userdata)
{
- struct session *sess = timer->userdata;
+ struct session *sess = userdata;
time_t next_retry;
+ time_t now = time(NULL);
+ struct timeval tv;
if (!sess->out_q)
return;
sess_retry_output(sess, &next_retry);
- cld_timer_add(&cld_srv.timers, &sess->retry_timer, next_retry);
+ tv.tv_sec = next_retry - now;
+ tv.tv_usec = 0;
+
+ if (evtimer_add(&sess->retry_timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "retry timer re-add failed");
}
static void session_outq(struct session *sess, GList *new_pkts)
{
/* if out_q empty, start retry timer */
- if (!sess->out_q)
- cld_timer_add(&cld_srv.timers, &sess->retry_timer,
- time(NULL) + CLD_RETRY_START);
+ if (!sess->out_q) {
+ struct timeval tv = { .tv_sec = CLD_RETRY_START };
+ if (evtimer_add(&sess->retry_timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "retry timer start failed");
+ }
sess->out_q = g_list_concat(sess->out_q, new_pkts);
}
@@ -766,7 +781,8 @@ void msg_ack(struct session *sess, uint64_t seqid)
}
if (!sess->out_q)
- cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+ if (evtimer_del(&sess->retry_timer) < 0)
+ HAIL_ERR(&srv_log, "sess retry timer del 2 failed");
}
void msg_new_sess(int sock_fd, const struct client *cli,
@@ -780,6 +796,7 @@ void msg_new_sess(int sock_fd, const struct client *cli,
int rc;
enum cle_err_codes resp_rc = CLE_OK;
struct cld_msg_generic_resp resp;
+ struct timeval tv;
sess = session_new();
if (!sess) {
@@ -832,8 +849,10 @@ void msg_new_sess(int sock_fd, const struct client *cli,
g_hash_table_insert(cld_srv.sessions, sess->sid, sess);
/* begin session timer */
- cld_timer_add(&cld_srv.timers, &sess->timer,
- time(NULL) + (CLD_SESS_TIMEOUT / 2));
+ tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+ tv.tv_usec = 0;
+ if (evtimer_add(&sess->timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "sess timer start failed");
/* send new-sess reply */
resp.code = CLE_OK;
@@ -933,6 +952,8 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
val.flags = DB_DBT_USERMEM;
while (1) {
+ struct timeval tv;
+
rc = cur->get(cur, &key, &val, DB_NEXT);
if (rc == DB_NOTFOUND)
break;
@@ -960,8 +981,12 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
g_hash_table_insert(ss, sess->sid, sess);
/* begin session timer */
- cld_timer_add(&cld_srv.timers, &sess->timer,
- time(NULL) + (CLD_SESS_TIMEOUT / 2));
+ tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+ tv.tv_usec = 0;
+ if (evtimer_add(&sess->timer, &tv) < 0) {
+ HAIL_ERR(&srv_log, "sess timer loop start failed");
+ break;
+ }
}
cur->close(cur);
next reply other threads:[~2010-12-31 10:56 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-12-31 10:56 Jeff Garzik [this message]
2010-12-31 10:57 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Jeff Garzik
2010-12-31 10:58 ` [PATCH 3/3] CLD: enable replication on server and client Jeff Garzik
2011-01-02 23:32 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
2011-01-03 10:43 ` Jim Meyering
2011-01-03 18:00 ` Jeff Garzik
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20101231105634.GA4210@havoc.gtf.org \
--to=jeff@garzik.org \
--cc=hail-devel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).