diff options
Diffstat (limited to 'db-4.8.30/examples_c/ex_rep/base')
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_base.c | 247 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_base.h | 117 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_msg.c | 467 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_net.c | 749 |
4 files changed, 1580 insertions, 0 deletions
diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_base.c b/db-4.8.30/examples_c/ex_rep/base/rep_base.c new file mode 100644 index 0000000..87273dc --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_base.c @@ -0,0 +1,247 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <errno.h> +#include <signal.h> +#include <stdlib.h> +#include <string.h> + +#include <db.h> + +#include "rep_base.h" + +/* + * Process globals (we could put these in the machtab I suppose). + */ +int master_eid; +char *myaddr; +unsigned short myport; + +const char *progname = "ex_rep_base"; + +static void event_callback __P((DB_ENV *, u_int32_t, void *)); + +int +main(argc, argv) + int argc; + char *argv[]; +{ + DB_ENV *dbenv; + SETUP_DATA setup_info; + DBT local; + all_args aa; + connect_args ca; + supthr_args supa; + machtab_t *machtab; + thread_t all_thr, ckp_thr, conn_thr, lga_thr; + void *astatus, *cstatus; +#ifdef _WIN32 + WSADATA wsaData; +#else + struct sigaction sigact; +#endif + APP_DATA my_app_data; + int ret; + + memset(&setup_info, 0, sizeof(SETUP_DATA)); + setup_info.progname = progname; + master_eid = DB_EID_INVALID; + memset(&my_app_data, 0, sizeof(APP_DATA)); + dbenv = NULL; + machtab = NULL; + ret = 0; + + if ((ret = create_env(progname, &dbenv)) != 0) + goto err; + dbenv->app_private = &my_app_data; + (void)dbenv->set_event_notify(dbenv, event_callback); + + /* Parse command line and perform common replication setup. */ + if ((ret = common_rep_setup(dbenv, argc, argv, &setup_info)) != 0) + goto err; + + if (setup_info.role == MASTER) + master_eid = SELF_EID; + + myaddr = strdup(setup_info.self.host); + myport = setup_info.self.port; + +#ifdef _WIN32 + /* Initialize the Windows sockets DLL. */ + if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) { + fprintf(stderr, + "Unable to initialize Windows sockets: %d\n", ret); + goto err; + } +#else + /* + * Turn off SIGPIPE so that we don't kill processes when they + * happen to lose a connection at the wrong time. + */ + memset(&sigact, 0, sizeof(sigact)); + sigact.sa_handler = SIG_IGN; + if ((ret = sigaction(SIGPIPE, &sigact, NULL)) != 0) { + fprintf(stderr, + "Unable to turn off SIGPIPE: %s\n", strerror(ret)); + goto err; + } +#endif + + /* + * We are hardcoding priorities here that all clients have the + * same priority except for a designated master who gets a higher + * priority. + */ + if ((ret = + machtab_init(&machtab, setup_info.nsites)) != 0) + goto err; + my_app_data.comm_infrastructure = machtab; + + if ((ret = env_init(dbenv, setup_info.home)) != 0) + goto err; + + /* + * Now sets up comm infrastructure. There are two phases. First, + * we open our port for listening for incoming connections. Then + * we attempt to connect to every host we know about. + */ + + (void)dbenv->rep_set_transport(dbenv, SELF_EID, quote_send); + + ca.dbenv = dbenv; + ca.home = setup_info.home; + ca.progname = progname; + ca.machtab = machtab; + ca.port = setup_info.self.port; + if ((ret = thread_create(&conn_thr, NULL, connect_thread, &ca)) != 0) { + dbenv->errx(dbenv, "can't create connect thread"); + goto err; + } + + aa.dbenv = dbenv; + aa.progname = progname; + aa.home = setup_info.home; + aa.machtab = machtab; + aa.sites = setup_info.site_list; + aa.nsites = setup_info.remotesites; + if ((ret = thread_create(&all_thr, NULL, connect_all, &aa)) != 0) { + dbenv->errx(dbenv, "can't create connect-all thread"); + goto err; + } + + /* Start checkpoint and log archive threads. */ + supa.dbenv = dbenv; + supa.shared = &my_app_data.shared_data; + if ((ret = start_support_threads(dbenv, &supa, &ckp_thr, &lga_thr)) + != 0) + goto err; + + /* + * We have now got the entire communication infrastructure set up. + * It's time to declare ourselves to be a client or master. + */ + if (setup_info.role == MASTER) { + if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) { + dbenv->err(dbenv, ret, "dbenv->rep_start failed"); + goto err; + } + } else { + memset(&local, 0, sizeof(local)); + local.data = myaddr; + local.size = (u_int32_t)strlen(myaddr) + 1; + if ((ret = + dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) { + dbenv->err(dbenv, ret, "dbenv->rep_start failed"); + goto err; + } + /* Sleep to give ourselves time to find a master. */ + sleep(5); + } + + if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) { + dbenv->err(dbenv, ret, "Main loop failed"); + goto err; + } + + /* Finish checkpoint and log archive threads. */ + if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0) + goto err; + + /* Wait on the connection threads. */ + if (thread_join(all_thr, &astatus) || thread_join(conn_thr, &cstatus)) { + ret = -1; + goto err; + } + if ((uintptr_t)astatus != EXIT_SUCCESS || + (uintptr_t)cstatus != EXIT_SUCCESS) { + ret = -1; + goto err; + } + + /* + * We have used the DB_TXN_NOSYNC environment flag for improved + * performance without the usual sacrifice of transactional durability, + * as discussed in the "Transactional guarantees" page of the Reference + * Guide: if one replication site crashes, we can expect the data to + * exist at another site. However, in case we shut down all sites + * gracefully, we push out the end of the log here so that the most + * recent transactions don't mysteriously disappear. + */ + if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) + dbenv->err(dbenv, ret, "log_flush"); + +err: if (machtab != NULL) + free(machtab); + if (dbenv != NULL) + (void)dbenv->close(dbenv, 0); +#ifdef _WIN32 + /* Shut down the Windows sockets DLL. */ + (void)WSACleanup(); +#endif + return (ret); +} + +static void +event_callback(dbenv, which, info) + DB_ENV *dbenv; + u_int32_t which; + void *info; +{ + APP_DATA *app = dbenv->app_private; + SHARED_DATA *shared = &app->shared_data; + + switch (which) { + case DB_EVENT_REP_CLIENT: + shared->is_master = 0; + shared->in_client_sync = 1; + break; + + case DB_EVENT_REP_ELECTED: + app->elected = 1; + master_eid = SELF_EID; + break; + + case DB_EVENT_REP_MASTER: + shared->is_master = 1; + shared->in_client_sync = 0; + break; + + case DB_EVENT_REP_NEWMASTER: + master_eid = *(int*)info; + shared->in_client_sync = 1; + break; + + case DB_EVENT_REP_STARTUPDONE: + shared->in_client_sync = 0; + break; + + default: + dbenv->errx(dbenv, "ignoring event %d", which); + } +} diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_base.h b/db-4.8.30/examples_c/ex_rep/base/rep_base.h new file mode 100644 index 0000000..4bfeb29 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_base.h @@ -0,0 +1,117 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#ifndef _EX_REPQUOTE_H_ +#define _EX_REPQUOTE_H_ + +#include "../common/rep_common.h" + +#define SELF_EID 1 + +/* Globals */ +typedef struct { + SHARED_DATA shared_data; + int elected; + void *comm_infrastructure; +} APP_DATA; + +extern int master_eid; +extern char *myaddr; +extern unsigned short myport; + +struct __member; typedef struct __member member_t; +struct __machtab; typedef struct __machtab machtab_t; + +/* Arguments for the connect_all thread. */ +typedef struct { + DB_ENV *dbenv; + const char *progname; + const char *home; + machtab_t *machtab; + repsite_t *sites; + int nsites; +} all_args; + +/* Arguments for the connect_loop thread. */ +typedef struct { + DB_ENV *dbenv; + const char * home; + const char * progname; + machtab_t *machtab; + int port; +} connect_args; + +#define CACHESIZE (10 * 1024 * 1024) +#define DATABASE "quote.db" +#define MAX_THREADS 25 +#define SLEEPTIME 3 + +#ifndef COMPQUIET +#define COMPQUIET(x,y) x = (y) +#endif + +/* Portability macros for basic threading and networking */ +#ifdef _WIN32 + +typedef HANDLE mutex_t; +#define mutex_init(m, attr) \ + (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) +#define mutex_lock(m) \ + ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) +#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) + +typedef int socklen_t; +typedef SOCKET socket_t; +#define SOCKET_CREATION_FAILURE INVALID_SOCKET +#define readsocket(s, buf, sz) recv((s), (buf), (int)(sz), 0) +#define writesocket(s, buf, sz) send((s), (const char *)(buf), (int)(sz), 0) +#define net_errno WSAGetLastError() + +#else /* !_WIN32 */ + +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/wait.h> +#include <netdb.h> +#include <pthread.h> +#include <signal.h> +#include <unistd.h> + +typedef pthread_mutex_t mutex_t; +#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) +#define mutex_lock(m) pthread_mutex_lock(m) +#define mutex_unlock(m) pthread_mutex_unlock(m) + +typedef int socket_t; +#define SOCKET_CREATION_FAILURE -1 +#define closesocket(fd) close(fd) +#define net_errno errno +#define readsocket(s, buf, sz) read((s), (buf), (sz)) +#define writesocket(s, buf, sz) write((s), (buf), (sz)) + +#endif + +void *connect_all __P((void *)); +void *connect_thread __P((void *)); +int doclient __P((DB_ENV *, const char *, machtab_t *)); +int domaster __P((DB_ENV *, const char *)); +socket_t get_accepted_socket __P((const char *, int)); +socket_t get_connected_socket + __P((machtab_t *, const char *, const char *, int, int *, int *)); +int get_next_message __P((socket_t, DBT *, DBT *)); +socket_t listen_socket_init __P((const char *, int)); +socket_t listen_socket_accept + __P((machtab_t *, const char *, socket_t, int *)); +int machtab_getinfo __P((machtab_t *, int, u_int32_t *, int *)); +int machtab_init __P((machtab_t **, int)); +void machtab_parm __P((machtab_t *, int *, u_int32_t *)); +int machtab_rem __P((machtab_t *, int, int)); +int quote_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, + int, u_int32_t)); + +#endif /* !_EX_REPQUOTE_H_ */ diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_msg.c b/db-4.8.30/examples_c/ex_rep/base/rep_msg.c new file mode 100644 index 0000000..445fc30 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_msg.c @@ -0,0 +1,467 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <db.h> + +#include "rep_base.h" + +static int connect_site __P((DB_ENV *, machtab_t *, + const char *, repsite_t *, int *, thread_t *)); +static void *elect_thread __P((void *)); +static void *hm_loop __P((void *)); + +typedef struct { + DB_ENV *dbenv; + machtab_t *machtab; +} elect_args; + +typedef struct { + DB_ENV *dbenv; + const char *progname; + const char *home; + socket_t fd; + u_int32_t eid; + machtab_t *tab; +} hm_loop_args; + +/* + * This is a generic message handling loop that is used both by the + * master to accept messages from a client as well as by clients + * to communicate with other clients. + */ +static void * +hm_loop(args) + void *args; +{ + DB_ENV *dbenv; + DB_LSN permlsn; + DBT rec, control; + APP_DATA *app; + const char *c, *home, *progname; + elect_args *ea; + hm_loop_args *ha; + machtab_t *tab; + thread_t elect_thr, *site_thrs, *tmp, tid; + repsite_t self; + u_int32_t timeout; + int eid, n, nsites, nsites_allocd; + int already_open, r, ret, t_ret; + socket_t fd; + void *status; + + ea = NULL; + site_thrs = NULL; + nsites_allocd = 0; + nsites = 0; + + ha = (hm_loop_args *)args; + dbenv = ha->dbenv; + fd = ha->fd; + home = ha->home; + eid = ha->eid; + progname = ha->progname; + tab = ha->tab; + free(ha); + app = dbenv->app_private; + + memset(&rec, 0, sizeof(DBT)); + memset(&control, 0, sizeof(DBT)); + + for (ret = 0; ret == 0;) { + if ((ret = get_next_message(fd, &rec, &control)) != 0) { + /* + * Close this connection; if it's the master call + * for an election. + */ + closesocket(fd); + if ((ret = machtab_rem(tab, eid, 1)) != 0) + break; + + /* + * If I'm the master, I just lost a client and this + * thread is done. + */ + if (master_eid == SELF_EID) + break; + + /* + * If I was talking with the master and the master + * went away, I need to call an election; else I'm + * done. + */ + if (master_eid != eid) + break; + + master_eid = DB_EID_INVALID; + machtab_parm(tab, &n, &timeout); + (void)dbenv->rep_set_timeout(dbenv, + DB_REP_ELECTION_TIMEOUT, timeout); + if ((ret = dbenv->rep_elect(dbenv, + n, (n/2+1), 0)) != 0) + continue; + + /* + * Regardless of the results, the site I was talking + * to is gone, so I have nothing to do but exit. + */ + if (app->elected) { + app->elected = 0; + ret = dbenv->rep_start(dbenv, + NULL, DB_REP_MASTER); + } + break; + } + + switch (r = dbenv->rep_process_message(dbenv, + &control, &rec, eid, &permlsn)) { + case DB_REP_NEWSITE: + /* + * Check if we got sent connect information and if we + * did, if this is me or if we already have a + * connection to this new site. If we don't, + * establish a new one. + */ + + /* No connect info. */ + if (rec.size == 0) + break; + + /* It's me, do nothing. */ + if (strncmp(myaddr, rec.data, rec.size) == 0) + break; + + self.host = (char *)rec.data; + self.host = strtok(self.host, ":"); + if ((c = strtok(NULL, ":")) == NULL) { + dbenv->errx(dbenv, "Bad host specification"); + goto out; + } + self.port = atoi(c); + + /* + * We try to connect to the new site. If we can't, + * we treat it as an error since we know that the site + * should be up if we got a message from it (even + * indirectly). + */ + if (nsites == nsites_allocd) { + /* Need to allocate more space. */ + if ((tmp = realloc( + site_thrs, (10 + nsites) * + sizeof(thread_t))) == NULL) { + ret = errno; + goto out; + } + site_thrs = tmp; + nsites_allocd += 10; + } + if ((ret = connect_site(dbenv, tab, progname, + &self, &already_open, &tid)) != 0) + goto out; + if (!already_open) + memcpy(&site_thrs + [nsites++], &tid, sizeof(thread_t)); + break; + case DB_REP_HOLDELECTION: + if (master_eid == SELF_EID) + break; + /* Make sure that previous election has finished. */ + if (ea != NULL) { + if (thread_join(elect_thr, &status) != 0) { + dbenv->errx(dbenv, + "thread join failure"); + goto out; + } + ea = NULL; + } + if ((ea = calloc(sizeof(elect_args), 1)) == NULL) { + dbenv->errx(dbenv, "can't allocate memory"); + ret = errno; + goto out; + } + ea->dbenv = dbenv; + ea->machtab = tab; + if ((ret = thread_create(&elect_thr, + NULL, elect_thread, (void *)ea)) != 0) { + dbenv->errx(dbenv, + "can't create election thread"); + } + break; + case DB_REP_ISPERM: + break; + case 0: + if (app->elected) { + app->elected = 0; + if ((ret = dbenv->rep_start(dbenv, + NULL, DB_REP_MASTER)) != 0) { + dbenv->err(dbenv, ret, + "can't start as master"); + goto out; + } + } + break; + default: + dbenv->err(dbenv, r, "DB_ENV->rep_process_message"); + break; + } + } + +out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0) + ret = t_ret; + + /* Don't close the environment before any children exit. */ + if (ea != NULL && thread_join(elect_thr, &status) != 0) + dbenv->errx(dbenv, "can't join election thread"); + + if (site_thrs != NULL) + while (--nsites >= 0) + if (thread_join(site_thrs[nsites], &status) != 0) + dbenv->errx(dbenv, "can't join site thread"); + + return ((void *)(uintptr_t)ret); +} + +/* + * This is a generic thread that spawns a thread to listen for connections + * on a socket and then spawns off child threads to handle each new + * connection. + */ +void * +connect_thread(args) + void *args; +{ + DB_ENV *dbenv; + const char *home, *progname; + hm_loop_args *ha; + connect_args *cargs; + machtab_t *machtab; + thread_t hm_thrs[MAX_THREADS]; + void *status; + int i, eid, port, ret; + socket_t fd, ns; + + ha = NULL; + cargs = (connect_args *)args; + dbenv = cargs->dbenv; + home = cargs->home; + progname = cargs->progname; + machtab = cargs->machtab; + port = cargs->port; + + /* + * Loop forever, accepting connections from new machines, + * and forking off a thread to handle each. + */ + if ((fd = listen_socket_init(progname, port)) < 0) { + ret = errno; + goto err; + } + + for (i = 0; i < MAX_THREADS; i++) { + if ((ns = listen_socket_accept(machtab, + progname, fd, &eid)) == SOCKET_CREATION_FAILURE) { + ret = errno; + goto err; + } + if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { + dbenv->errx(dbenv, "can't allocate memory"); + ret = errno; + goto err; + } + ha->progname = progname; + ha->home = home; + ha->fd = ns; + ha->eid = eid; + ha->tab = machtab; + ha->dbenv = dbenv; + if ((ret = thread_create(&hm_thrs[i++], NULL, + hm_loop, (void *)ha)) != 0) { + dbenv->errx(dbenv, "can't create thread for site"); + goto err; + } + ha = NULL; + } + + /* If we fell out, we ended up with too many threads. */ + dbenv->errx(dbenv, "Too many threads"); + ret = ENOMEM; + + /* Do not return until all threads have exited. */ + while (--i >= 0) + if (thread_join(hm_thrs[i], &status) != 0) + dbenv->errx(dbenv, "can't join site thread"); + +err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE); +} + +/* + * Open a connection to everyone that we've been told about. If we + * cannot open some connections, keep trying. + */ +void * +connect_all(args) + void *args; +{ + DB_ENV *dbenv; + all_args *aa; + const char *home, *progname; + hm_loop_args *ha; + int failed, i, nsites, open, ret, *success; + machtab_t *machtab; + thread_t *hm_thr; + repsite_t *sites; + + ha = NULL; + aa = (all_args *)args; + dbenv = aa->dbenv; + progname = aa->progname; + home = aa->home; + machtab = aa->machtab; + nsites = aa->nsites; + sites = aa->sites; + + ret = 0; + hm_thr = NULL; + success = NULL; + + /* Some implementations of calloc are sad about allocating 0 things. */ + if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) { + dbenv->err(dbenv, errno, "connect_all"); + ret = 1; + goto err; + } + + if (nsites > 0 && (hm_thr = calloc(nsites, sizeof(int))) == NULL) { + dbenv->err(dbenv, errno, "connect_all"); + ret = 1; + goto err; + } + + for (failed = nsites; failed > 0;) { + for (i = 0; i < nsites; i++) { + if (success[i]) + continue; + + ret = connect_site(dbenv, machtab, + progname, &sites[i], &open, &hm_thr[i]); + + /* + * If we couldn't make the connection, this isn't + * fatal to the loop, but we have nothing further + * to do on this machine at the moment. + */ + if (ret == DB_REP_UNAVAIL) + continue; + + if (ret != 0) + goto err; + + failed--; + success[i] = 1; + + /* If the connection is already open, we're done. */ + if (ret == 0 && open == 1) + continue; + + } + sleep(1); + } + +err: if (success != NULL) + free(success); + if (hm_thr != NULL) + free(hm_thr); + return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS); +} + +static int +connect_site(dbenv, machtab, progname, site, is_open, hm_thrp) + DB_ENV *dbenv; + machtab_t *machtab; + const char *progname; + repsite_t *site; + int *is_open; + thread_t *hm_thrp; +{ + int eid, ret; + socket_t s; + hm_loop_args *ha; + + if ((s = get_connected_socket(machtab, progname, + site->host, site->port, is_open, &eid)) < 0) + return (DB_REP_UNAVAIL); + + if (*is_open) + return (0); + + if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { + dbenv->errx(dbenv, "can't allocate memory"); + ret = errno; + goto err; + } + + ha->progname = progname; + ha->fd = s; + ha->eid = eid; + ha->tab = machtab; + ha->dbenv = dbenv; + + if ((ret = thread_create(hm_thrp, NULL, + hm_loop, (void *)ha)) != 0) { + dbenv->errx(dbenv, "can't create thread for connected site"); + goto err1; + } + + return (0); + +err1: free(ha); +err: + return (ret); +} + +/* + * We need to spawn off a new thread in which to hold an election in + * case we are the only thread listening on for messages. + */ +static void * +elect_thread(args) + void *args; +{ + DB_ENV *dbenv; + elect_args *eargs; + machtab_t *machtab; + u_int32_t timeout; + int n, ret; + APP_DATA *app; + + eargs = (elect_args *)args; + dbenv = eargs->dbenv; + machtab = eargs->machtab; + free(eargs); + app = dbenv->app_private; + + machtab_parm(machtab, &n, &timeout); + (void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout); + while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) + sleep(2); + + if (app->elected) { + app->elected = 0; + if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) + dbenv->err(dbenv, ret, + "can't start as master in election thread"); + } + + return (NULL); +} diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_net.c b/db-4.8.30/examples_c/ex_rep/base/rep_net.c new file mode 100644 index 0000000..350300d --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_net.c @@ -0,0 +1,749 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <db.h> +#include "rep_base.h" +#ifndef _SYS_QUEUE_H +/* + * Some *BSD Unix variants include the Queue macros in their libraries and + * these might already have been included. In that case, it would be bad + * to include them again. + */ +#include <dbinc/queue.h> /* !!!: for the LIST_XXX macros. */ +#endif + +int machtab_add __P((machtab_t *, socket_t, u_int32_t, int, int *)); +#ifdef DIAGNOSTIC +void machtab_print __P((machtab_t *)); +#endif +ssize_t readn __P((socket_t, void *, size_t)); + +/* + * This file defines the communication infrastructure for the ex_repquote + * sample application. + * + * This application uses TCP/IP for its communication. In an N-site + * replication group, this means that there are N * N communication + * channels so that every site can communicate with every other site + * (this allows elections to be held when the master fails). We do + * not require that anyone know about all sites when the application + * starts up. In order to communicate, the application should know + * about someone, else it has no idea how to ever get in the game. + * + * Communication is handled via a number of different threads. These + * thread functions are implemented in rep_util.c In this file, we + * define the data structures that maintain the state that describes + * the comm infrastructure, the functions that manipulates this state + * and the routines used to actually send and receive data over the + * sockets. + */ + +/* + * The communication infrastructure is represented by a machine table, + * machtab_t, which is essentially a mutex-protected linked list of members + * of the group. The machtab also contains the parameters that are needed + * to call for an election. We hardwire values for these parameters in the + * init function, but these could be set via some configuration setup in a + * real application. We reserve the machine-id 1 to refer to ourselves and + * make the machine-id 0 be invalid. + */ + +#define MACHID_INVALID 0 +#define MACHID_SELF 1 + +struct __machtab { + LIST_HEAD(__machlist, __member) machlist; + int nextid; + mutex_t mtmutex; + u_int32_t timeout_time; + int current; + int max; + int nsites; +}; + +/* Data structure that describes each entry in the machtab. */ +struct __member { + u_int32_t hostaddr; /* Host IP address. */ + int port; /* Port number. */ + int eid; /* Application-specific machine id. */ + socket_t fd; /* File descriptor for the socket. */ + LIST_ENTRY(__member) links; + /* For linked list of all members we know of. */ +}; + +static int quote_send_broadcast __P((machtab_t *, + const DBT *, const DBT *, u_int32_t)); +static int quote_send_one __P((const DBT *, const DBT *, socket_t, u_int32_t)); + +/* + * machtab_init -- + * Initialize the machine ID table. + * XXX Right now we treat the number of sites as the maximum + * number we've ever had on the list at one time. We probably + * want to make that smarter. + */ +int +machtab_init(machtabp, nsites) + machtab_t **machtabp; + int nsites; +{ + int ret; + machtab_t *machtab; + + if ((machtab = malloc(sizeof(machtab_t))) == NULL) { + fprintf(stderr, "can't allocate memory\n"); + return (ENOMEM); + } + + LIST_INIT(&machtab->machlist); + + /* Reserve eid's 0 and 1. */ + machtab->nextid = 2; + machtab->timeout_time = 2 * 1000000; /* 2 seconds. */ + machtab->current = machtab->max = 0; + machtab->nsites = nsites; + + ret = mutex_init(&machtab->mtmutex, NULL); + *machtabp = machtab; + + return (ret); +} + +/* + * machtab_add -- + * Add a file descriptor to the table of machines, returning + * a new machine ID. + */ +int +machtab_add(machtab, fd, hostaddr, port, idp) + machtab_t *machtab; + socket_t fd; + u_int32_t hostaddr; + int port, *idp; +{ + int ret; + member_t *m, *member; + + ret = 0; + if ((member = malloc(sizeof(member_t))) == NULL) { + fprintf(stderr, "can't allocate memory\n"); + return (ENOMEM); + } + + member->fd = fd; + member->hostaddr = hostaddr; + member->port = port; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex"); + return (ret); + } + + for (m = LIST_FIRST(&machtab->machlist); + m != NULL; m = LIST_NEXT(m, links)) + if (m->hostaddr == hostaddr && m->port == port) + break; + + if (m == NULL) { + member->eid = machtab->nextid++; + LIST_INSERT_HEAD(&machtab->machlist, member, links); + } else + member->eid = m->eid; + + if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (ret); + } + + if (idp != NULL) + *idp = member->eid; + + if (m == NULL) { + if (++machtab->current > machtab->max) + machtab->max = machtab->current; + } else { + free(member); + ret = EEXIST; + } +#ifdef DIAGNOSTIC + printf("Exiting machtab_add\n"); + machtab_print(machtab); +#endif + return (ret); +} + +/* + * machtab_getinfo -- + * Return host and port information for a particular machine id. + */ +int +machtab_getinfo(machtab, eid, hostp, portp) + machtab_t *machtab; + int eid; + u_int32_t *hostp; + int *portp; +{ + int ret; + member_t *member; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + for (member = LIST_FIRST(&machtab->machlist); + member != NULL; + member = LIST_NEXT(member, links)) + if (member->eid == eid) { + *hostp = member->hostaddr; + *portp = member->port; + break; + } + + if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (ret); + } + + return (member != NULL ? 0 : EINVAL); +} + +/* + * machtab_rem -- + * Remove a mapping from the table of machines. Lock indicates + * whether we need to lock the machtab or not (0 indicates we do not + * need to lock; non-zero indicates that we do need to lock). + */ +int +machtab_rem(machtab, eid, lock) + machtab_t *machtab; + int eid; + int lock; +{ + int found, ret; + member_t *member; + + ret = 0; + if (lock && (ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + for (found = 0, member = LIST_FIRST(&machtab->machlist); + member != NULL; + member = LIST_NEXT(member, links)) + if (member->eid == eid) { + found = 1; + LIST_REMOVE(member, links); + (void)closesocket(member->fd); + free(member); + machtab->current--; + break; + } + + if (LIST_FIRST(&machtab->machlist) == NULL) + machtab->nextid = 2; + + if (lock && (ret = mutex_unlock(&machtab->mtmutex)) != 0) + fprintf(stderr, "can't unlock mutex\n"); + +#ifdef DIAGNOSTIC + printf("Exiting machtab_rem\n"); + machtab_print(machtab); +#endif + return (ret); +} + +void +machtab_parm(machtab, nump, timeoutp) + machtab_t *machtab; + int *nump; + u_int32_t *timeoutp; +{ + if (machtab->nsites == 0) + *nump = machtab->max; + else + *nump = machtab->nsites; + *timeoutp = machtab->timeout_time; +} + +#ifdef DIAGNOSTIC +void +machtab_print(machtab) + machtab_t *machtab; +{ + member_t *m; + + if (mutex_lock(&machtab->mtmutex) != 0) { + fprintf(stderr, "can't lock mutex\n"); + abort(); + } + + for (m = LIST_FIRST(&machtab->machlist); + m != NULL; m = LIST_NEXT(m, links)) { + + printf("IP: %lx Port: %6d EID: %2d FD: %3d\n", + (long)m->hostaddr, m->port, m->eid, m->fd); + } + + if (mutex_unlock(&machtab->mtmutex) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + abort(); + } +} +#endif +/* + * listen_socket_init -- + * Initialize a socket for listening on the specified port. Returns + * a file descriptor for the socket, ready for an accept() call + * in a thread that we're happy to let block. + */ +socket_t +listen_socket_init(progname, port) + const char *progname; + int port; +{ + socket_t s; + int sockopt; + struct sockaddr_in si; + + COMPQUIET(progname, NULL); + + if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("can't create listen socket"); + return (-1); + } + + memset(&si, 0, sizeof(si)); + si.sin_family = AF_INET; + si.sin_addr.s_addr = htonl(INADDR_ANY); + si.sin_port = htons((unsigned short)port); + + /* + * When using this example for testing, it's common to kill and restart + * regularly. On some systems, this causes bind to fail with "address + * in use" errors unless this option is set. + */ + sockopt = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + (const char *)&sockopt, sizeof(sockopt)); + + if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) { + perror("can't bind listen socket"); + goto err; + } + + if (listen(s, 5) != 0) { + perror("can't establish listen queue"); + goto err; + } + + return (s); + +err: closesocket(s); + return (-1); +} + +/* + * listen_socket_accept -- + * Accept a connection on a socket. This is essentially just a wrapper + * for accept(3). + */ +socket_t +listen_socket_accept(machtab, progname, s, eidp) + machtab_t *machtab; + const char *progname; + socket_t s; + int *eidp; +{ + struct sockaddr_in si; + socklen_t si_len; + int host, ret; + socket_t ns; + u_int16_t port; + + COMPQUIET(progname, NULL); + +accept_wait: + memset(&si, 0, sizeof(si)); + si_len = sizeof(si); + ns = accept(s, (struct sockaddr *)&si, &si_len); + if (ns == SOCKET_CREATION_FAILURE) { + fprintf(stderr, "can't accept incoming connection\n"); + return ns; + } + host = ntohl(si.sin_addr.s_addr); + + /* + * Sites send their listening port when connections are first + * established, as it will be different from the outgoing port + * for this connection. + */ + if (readn(ns, &port, 2) != 2) + goto err; + port = ntohs(port); + + ret = machtab_add(machtab, ns, host, port, eidp); + if (ret == EEXIST) { + closesocket(ns); + goto accept_wait; + } else if (ret != 0) + goto err; + printf("Connected to host %x port %d, eid = %d\n", host, port, *eidp); + return (ns); + +err: closesocket(ns); + return SOCKET_CREATION_FAILURE; +} + +/* + * get_connected_socket -- + * Connect to the specified port of the specified remote machine, + * and return a file descriptor when we have accepted a connection on it. + * Add this connection to the machtab. If we already have a connection + * open to this machine, then don't create another one, return the eid + * of the connection (in *eidp) and set is_open to 1. Return 0. + */ +socket_t +get_connected_socket(machtab, progname, remotehost, port, is_open, eidp) + machtab_t *machtab; + const char *progname, *remotehost; + int port, *is_open, *eidp; +{ + int ret; + socket_t s; + struct hostent *hp; + struct sockaddr_in si; + u_int32_t addr; + u_int16_t nport; + + *is_open = 0; + + if ((hp = gethostbyname(remotehost)) == NULL) { + fprintf(stderr, "%s: host not found: %s\n", progname, + strerror(net_errno)); + return (-1); + } + + if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("can't create outgoing socket"); + return (-1); + } + memset(&si, 0, sizeof(si)); + memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length); + addr = ntohl(si.sin_addr.s_addr); + ret = machtab_add(machtab, s, addr, port, eidp); + if (ret == EEXIST) { + *is_open = 1; + closesocket(s); + return (0); + } else if (ret != 0) { + closesocket(s); + return (-1); + } + + si.sin_family = AF_INET; + si.sin_port = htons((unsigned short)port); + if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) { + fprintf(stderr, "%s: connection failed: %s\n", + progname, strerror(net_errno)); + (void)machtab_rem(machtab, *eidp, 1); + return (-1); + } + + /* + * The first thing we send on the socket is our (listening) port + * so the site we are connecting to can register us correctly in + * its machtab. + */ + nport = htons(myport); + writesocket(s, &nport, 2); + + return (s); +} + +/* + * get_next_message -- + * Read a single message from the specified file descriptor, and + * return it in the format used by rep functions (two DBTs and a type). + * + * This function is called in a loop by both clients and masters, and + * the resulting DBTs are manually dispatched to DB_ENV->rep_process_message(). + */ +int +get_next_message(fd, rec, control) + socket_t fd; + DBT *rec, *control; +{ + size_t nr; + u_int32_t rsize, csize; + u_int8_t *recbuf, *controlbuf; + + /* + * The protocol we use on the wire is dead simple: + * + * 4 bytes - rec->size + * (# read above) - rec->data + * 4 bytes - control->size + * (# read above) - control->data + */ + + /* Read rec->size. */ + nr = readn(fd, &rsize, 4); + if (nr != 4) + return (1); + + /* Read the record itself. */ + if (rsize > 0) { + if (rec->size < rsize) + rec->data = realloc(rec->data, rsize); + if ((recbuf = rec->data) == NULL) + return (1); + nr = readn(fd, recbuf, rsize); + } else { + if (rec->data != NULL) + free(rec->data); + rec->data = NULL; + } + rec->size = rsize; + + /* Read control->size. */ + nr = readn(fd, &csize, 4); + if (nr != 4) + return (1); + + /* Read the control struct itself. */ + if (csize > 0) { + controlbuf = control->data; + if (control->size < csize) + controlbuf = realloc(controlbuf, csize); + if (controlbuf == NULL) + return (1); + nr = readn(fd, controlbuf, csize); + if (nr != csize) + return (1); + } else { + if (control->data != NULL) + free(control->data); + controlbuf = NULL; + } + control->data = controlbuf; + control->size = csize; + + return (0); +} + +/* + * readn -- + * Read a full n characters from a file descriptor, unless we get an error + * or EOF. + */ +ssize_t +readn(fd, vptr, n) + socket_t fd; + void *vptr; + size_t n; +{ + size_t nleft; + ssize_t nread; + char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { + if ((nread = readsocket(fd, ptr, nleft)) < 0) { + /* + * Call read() again on interrupted system call; + * on other errors, bail. + */ + if (net_errno == EINTR) + nread = 0; + else { + perror("can't read from socket"); + return (-1); + } + } else if (nread == 0) + break; /* EOF */ + + nleft -= nread; + ptr += nread; + } + + return (n - nleft); +} + +/* + * quote_send -- + * The f_send function for DB_ENV->set_rep_transport. + */ +int +quote_send(dbenv, control, rec, lsnp, eid, flags) + DB_ENV *dbenv; + const DBT *control, *rec; + const DB_LSN *lsnp; + int eid; + u_int32_t flags; +{ + int n, ret, t_ret; + socket_t fd; + machtab_t *machtab; + member_t *m; + + COMPQUIET(lsnp, NULL); + machtab = + (machtab_t *)((APP_DATA*)dbenv->app_private)->comm_infrastructure; + + if (eid == DB_EID_BROADCAST) { + /* + * Right now, we do not require successful transmission. + * I'd like to move this requiring at least one successful + * transmission on PERMANENT requests. + */ + n = quote_send_broadcast(machtab, rec, control, flags); + if (n < 0 /*|| (n == 0 && LF_ISSET(DB_REP_PERMANENT))*/) + return (DB_REP_UNAVAIL); + return (0); + } + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + dbenv->errx(dbenv, "can't lock mutex"); + return (ret); + } + + fd = 0; + for (m = LIST_FIRST(&machtab->machlist); m != NULL; + m = LIST_NEXT(m, links)) { + if (m->eid == eid) { + fd = m->fd; + break; + } + } + + if (fd == 0) { + dbenv->err(dbenv, DB_REP_UNAVAIL, + "quote_send: cannot find machine ID %d", eid); + return (DB_REP_UNAVAIL); + } + + if ((ret = quote_send_one(rec, control, fd, flags)) != 0) + fprintf(stderr, "socket write error in send() function\n"); + + if ((t_ret = mutex_unlock(&machtab->mtmutex)) != 0) { + dbenv->errx(dbenv, "can't unlock mutex"); + if (ret == 0) + ret = t_ret; + } + + return (ret); +} + +/* + * quote_send_broadcast -- + * Send a message to everybody. + * Returns the number of sites to which this message was successfully + * communicated. A -1 indicates a fatal error. + */ +static int +quote_send_broadcast(machtab, rec, control, flags) + machtab_t *machtab; + const DBT *rec, *control; + u_int32_t flags; +{ + int ret, sent; + member_t *m, *next; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + sent = 0; + for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) { + next = LIST_NEXT(m, links); + if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) { + fprintf(stderr, "socket write error in broadcast\n"); + (void)machtab_rem(machtab, m->eid, 0); + } else + sent++; + } + + if (mutex_unlock(&machtab->mtmutex) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (-1); + } + + return (sent); +} + +/* + * quote_send_one -- + * Send a message to a single machine, given that machine's file + * descriptor. + * + * !!! + * Note that the machtab mutex should be held through this call. + * It doubles as a synchronizer to make sure that two threads don't + * intersperse writes that are part of two single messages. + */ +static int +quote_send_one(rec, control, fd, flags) + const DBT *rec, *control; + socket_t fd; + u_int32_t flags; + +{ + int retry; + ssize_t bytes_left, nw; + u_int8_t *wp; + + COMPQUIET(flags, 0); + + /* + * The protocol is simply: write rec->size, write rec->data, + * write control->size, write control->data. + */ + nw = writesocket(fd, (const char *)&rec->size, 4); + if (nw != 4) + return (DB_REP_UNAVAIL); + + if (rec->size > 0) { + nw = writesocket(fd, rec->data, rec->size); + if (nw < 0) + return (DB_REP_UNAVAIL); + if (nw != (ssize_t)rec->size) { + /* Try a couple of times to finish the write. */ + wp = (u_int8_t *)rec->data + nw; + bytes_left = rec->size - nw; + for (retry = 0; bytes_left > 0 && retry < 3; retry++) { + nw = writesocket(fd, wp, bytes_left); + if (nw < 0) + return (DB_REP_UNAVAIL); + bytes_left -= nw; + wp += nw; + } + if (bytes_left > 0) + return (DB_REP_UNAVAIL); + } + } + + nw = writesocket(fd, (const char *)&control->size, 4); + if (nw != 4) + return (DB_REP_UNAVAIL); + if (control->size > 0) { + nw = writesocket(fd, control->data, control->size); + if (nw != (ssize_t)control->size) + return (DB_REP_UNAVAIL); + } + return (0); +} |