diff options
Diffstat (limited to 'db-4.8.30/examples_c/ex_rep')
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/README | 19 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_base.c | 247 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_base.h | 117 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_msg.c | 467 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/base/rep_net.c | 749 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/common/rep_common.c | 672 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/common/rep_common.h | 81 | ||||
-rw-r--r-- | db-4.8.30/examples_c/ex_rep/mgr/rep_mgr.c | 204 |
8 files changed, 2556 insertions, 0 deletions
diff --git a/db-4.8.30/examples_c/ex_rep/README b/db-4.8.30/examples_c/ex_rep/README new file mode 100644 index 0000000..7f20e13 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/README @@ -0,0 +1,19 @@ +# $Id$ + +This is the parent directory for the replication example programs. + +The example is a toy stock quote server. There are two versions of +the program: one version uses Berkeley DB's Replication Manager +support, and the other uses the base replication API. + +common/ Contains code to implement the basic functions of the + application, to demonstrate that these are largely + independent of which replication API is used. + +mgr/ Contains the small amount of code necessary to + configure the application to use Replication Manager. + +base/ Contains the sample communications infrastructure, and + other replication support code, to demonstrate some of + the kinds of things that are necessary when using the + base replication API. diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_base.c b/db-4.8.30/examples_c/ex_rep/base/rep_base.c new file mode 100644 index 0000000..87273dc --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_base.c @@ -0,0 +1,247 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <errno.h> +#include <signal.h> +#include <stdlib.h> +#include <string.h> + +#include <db.h> + +#include "rep_base.h" + +/* + * Process globals (we could put these in the machtab I suppose). + */ +int master_eid; +char *myaddr; +unsigned short myport; + +const char *progname = "ex_rep_base"; + +static void event_callback __P((DB_ENV *, u_int32_t, void *)); + +int +main(argc, argv) + int argc; + char *argv[]; +{ + DB_ENV *dbenv; + SETUP_DATA setup_info; + DBT local; + all_args aa; + connect_args ca; + supthr_args supa; + machtab_t *machtab; + thread_t all_thr, ckp_thr, conn_thr, lga_thr; + void *astatus, *cstatus; +#ifdef _WIN32 + WSADATA wsaData; +#else + struct sigaction sigact; +#endif + APP_DATA my_app_data; + int ret; + + memset(&setup_info, 0, sizeof(SETUP_DATA)); + setup_info.progname = progname; + master_eid = DB_EID_INVALID; + memset(&my_app_data, 0, sizeof(APP_DATA)); + dbenv = NULL; + machtab = NULL; + ret = 0; + + if ((ret = create_env(progname, &dbenv)) != 0) + goto err; + dbenv->app_private = &my_app_data; + (void)dbenv->set_event_notify(dbenv, event_callback); + + /* Parse command line and perform common replication setup. */ + if ((ret = common_rep_setup(dbenv, argc, argv, &setup_info)) != 0) + goto err; + + if (setup_info.role == MASTER) + master_eid = SELF_EID; + + myaddr = strdup(setup_info.self.host); + myport = setup_info.self.port; + +#ifdef _WIN32 + /* Initialize the Windows sockets DLL. */ + if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) { + fprintf(stderr, + "Unable to initialize Windows sockets: %d\n", ret); + goto err; + } +#else + /* + * Turn off SIGPIPE so that we don't kill processes when they + * happen to lose a connection at the wrong time. + */ + memset(&sigact, 0, sizeof(sigact)); + sigact.sa_handler = SIG_IGN; + if ((ret = sigaction(SIGPIPE, &sigact, NULL)) != 0) { + fprintf(stderr, + "Unable to turn off SIGPIPE: %s\n", strerror(ret)); + goto err; + } +#endif + + /* + * We are hardcoding priorities here that all clients have the + * same priority except for a designated master who gets a higher + * priority. + */ + if ((ret = + machtab_init(&machtab, setup_info.nsites)) != 0) + goto err; + my_app_data.comm_infrastructure = machtab; + + if ((ret = env_init(dbenv, setup_info.home)) != 0) + goto err; + + /* + * Now sets up comm infrastructure. There are two phases. First, + * we open our port for listening for incoming connections. Then + * we attempt to connect to every host we know about. + */ + + (void)dbenv->rep_set_transport(dbenv, SELF_EID, quote_send); + + ca.dbenv = dbenv; + ca.home = setup_info.home; + ca.progname = progname; + ca.machtab = machtab; + ca.port = setup_info.self.port; + if ((ret = thread_create(&conn_thr, NULL, connect_thread, &ca)) != 0) { + dbenv->errx(dbenv, "can't create connect thread"); + goto err; + } + + aa.dbenv = dbenv; + aa.progname = progname; + aa.home = setup_info.home; + aa.machtab = machtab; + aa.sites = setup_info.site_list; + aa.nsites = setup_info.remotesites; + if ((ret = thread_create(&all_thr, NULL, connect_all, &aa)) != 0) { + dbenv->errx(dbenv, "can't create connect-all thread"); + goto err; + } + + /* Start checkpoint and log archive threads. */ + supa.dbenv = dbenv; + supa.shared = &my_app_data.shared_data; + if ((ret = start_support_threads(dbenv, &supa, &ckp_thr, &lga_thr)) + != 0) + goto err; + + /* + * We have now got the entire communication infrastructure set up. + * It's time to declare ourselves to be a client or master. + */ + if (setup_info.role == MASTER) { + if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) { + dbenv->err(dbenv, ret, "dbenv->rep_start failed"); + goto err; + } + } else { + memset(&local, 0, sizeof(local)); + local.data = myaddr; + local.size = (u_int32_t)strlen(myaddr) + 1; + if ((ret = + dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) { + dbenv->err(dbenv, ret, "dbenv->rep_start failed"); + goto err; + } + /* Sleep to give ourselves time to find a master. */ + sleep(5); + } + + if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) { + dbenv->err(dbenv, ret, "Main loop failed"); + goto err; + } + + /* Finish checkpoint and log archive threads. */ + if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0) + goto err; + + /* Wait on the connection threads. */ + if (thread_join(all_thr, &astatus) || thread_join(conn_thr, &cstatus)) { + ret = -1; + goto err; + } + if ((uintptr_t)astatus != EXIT_SUCCESS || + (uintptr_t)cstatus != EXIT_SUCCESS) { + ret = -1; + goto err; + } + + /* + * We have used the DB_TXN_NOSYNC environment flag for improved + * performance without the usual sacrifice of transactional durability, + * as discussed in the "Transactional guarantees" page of the Reference + * Guide: if one replication site crashes, we can expect the data to + * exist at another site. However, in case we shut down all sites + * gracefully, we push out the end of the log here so that the most + * recent transactions don't mysteriously disappear. + */ + if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) + dbenv->err(dbenv, ret, "log_flush"); + +err: if (machtab != NULL) + free(machtab); + if (dbenv != NULL) + (void)dbenv->close(dbenv, 0); +#ifdef _WIN32 + /* Shut down the Windows sockets DLL. */ + (void)WSACleanup(); +#endif + return (ret); +} + +static void +event_callback(dbenv, which, info) + DB_ENV *dbenv; + u_int32_t which; + void *info; +{ + APP_DATA *app = dbenv->app_private; + SHARED_DATA *shared = &app->shared_data; + + switch (which) { + case DB_EVENT_REP_CLIENT: + shared->is_master = 0; + shared->in_client_sync = 1; + break; + + case DB_EVENT_REP_ELECTED: + app->elected = 1; + master_eid = SELF_EID; + break; + + case DB_EVENT_REP_MASTER: + shared->is_master = 1; + shared->in_client_sync = 0; + break; + + case DB_EVENT_REP_NEWMASTER: + master_eid = *(int*)info; + shared->in_client_sync = 1; + break; + + case DB_EVENT_REP_STARTUPDONE: + shared->in_client_sync = 0; + break; + + default: + dbenv->errx(dbenv, "ignoring event %d", which); + } +} diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_base.h b/db-4.8.30/examples_c/ex_rep/base/rep_base.h new file mode 100644 index 0000000..4bfeb29 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_base.h @@ -0,0 +1,117 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#ifndef _EX_REPQUOTE_H_ +#define _EX_REPQUOTE_H_ + +#include "../common/rep_common.h" + +#define SELF_EID 1 + +/* Globals */ +typedef struct { + SHARED_DATA shared_data; + int elected; + void *comm_infrastructure; +} APP_DATA; + +extern int master_eid; +extern char *myaddr; +extern unsigned short myport; + +struct __member; typedef struct __member member_t; +struct __machtab; typedef struct __machtab machtab_t; + +/* Arguments for the connect_all thread. */ +typedef struct { + DB_ENV *dbenv; + const char *progname; + const char *home; + machtab_t *machtab; + repsite_t *sites; + int nsites; +} all_args; + +/* Arguments for the connect_loop thread. */ +typedef struct { + DB_ENV *dbenv; + const char * home; + const char * progname; + machtab_t *machtab; + int port; +} connect_args; + +#define CACHESIZE (10 * 1024 * 1024) +#define DATABASE "quote.db" +#define MAX_THREADS 25 +#define SLEEPTIME 3 + +#ifndef COMPQUIET +#define COMPQUIET(x,y) x = (y) +#endif + +/* Portability macros for basic threading and networking */ +#ifdef _WIN32 + +typedef HANDLE mutex_t; +#define mutex_init(m, attr) \ + (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) +#define mutex_lock(m) \ + ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) +#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) + +typedef int socklen_t; +typedef SOCKET socket_t; +#define SOCKET_CREATION_FAILURE INVALID_SOCKET +#define readsocket(s, buf, sz) recv((s), (buf), (int)(sz), 0) +#define writesocket(s, buf, sz) send((s), (const char *)(buf), (int)(sz), 0) +#define net_errno WSAGetLastError() + +#else /* !_WIN32 */ + +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/wait.h> +#include <netdb.h> +#include <pthread.h> +#include <signal.h> +#include <unistd.h> + +typedef pthread_mutex_t mutex_t; +#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) +#define mutex_lock(m) pthread_mutex_lock(m) +#define mutex_unlock(m) pthread_mutex_unlock(m) + +typedef int socket_t; +#define SOCKET_CREATION_FAILURE -1 +#define closesocket(fd) close(fd) +#define net_errno errno +#define readsocket(s, buf, sz) read((s), (buf), (sz)) +#define writesocket(s, buf, sz) write((s), (buf), (sz)) + +#endif + +void *connect_all __P((void *)); +void *connect_thread __P((void *)); +int doclient __P((DB_ENV *, const char *, machtab_t *)); +int domaster __P((DB_ENV *, const char *)); +socket_t get_accepted_socket __P((const char *, int)); +socket_t get_connected_socket + __P((machtab_t *, const char *, const char *, int, int *, int *)); +int get_next_message __P((socket_t, DBT *, DBT *)); +socket_t listen_socket_init __P((const char *, int)); +socket_t listen_socket_accept + __P((machtab_t *, const char *, socket_t, int *)); +int machtab_getinfo __P((machtab_t *, int, u_int32_t *, int *)); +int machtab_init __P((machtab_t **, int)); +void machtab_parm __P((machtab_t *, int *, u_int32_t *)); +int machtab_rem __P((machtab_t *, int, int)); +int quote_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, + int, u_int32_t)); + +#endif /* !_EX_REPQUOTE_H_ */ diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_msg.c b/db-4.8.30/examples_c/ex_rep/base/rep_msg.c new file mode 100644 index 0000000..445fc30 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_msg.c @@ -0,0 +1,467 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <db.h> + +#include "rep_base.h" + +static int connect_site __P((DB_ENV *, machtab_t *, + const char *, repsite_t *, int *, thread_t *)); +static void *elect_thread __P((void *)); +static void *hm_loop __P((void *)); + +typedef struct { + DB_ENV *dbenv; + machtab_t *machtab; +} elect_args; + +typedef struct { + DB_ENV *dbenv; + const char *progname; + const char *home; + socket_t fd; + u_int32_t eid; + machtab_t *tab; +} hm_loop_args; + +/* + * This is a generic message handling loop that is used both by the + * master to accept messages from a client as well as by clients + * to communicate with other clients. + */ +static void * +hm_loop(args) + void *args; +{ + DB_ENV *dbenv; + DB_LSN permlsn; + DBT rec, control; + APP_DATA *app; + const char *c, *home, *progname; + elect_args *ea; + hm_loop_args *ha; + machtab_t *tab; + thread_t elect_thr, *site_thrs, *tmp, tid; + repsite_t self; + u_int32_t timeout; + int eid, n, nsites, nsites_allocd; + int already_open, r, ret, t_ret; + socket_t fd; + void *status; + + ea = NULL; + site_thrs = NULL; + nsites_allocd = 0; + nsites = 0; + + ha = (hm_loop_args *)args; + dbenv = ha->dbenv; + fd = ha->fd; + home = ha->home; + eid = ha->eid; + progname = ha->progname; + tab = ha->tab; + free(ha); + app = dbenv->app_private; + + memset(&rec, 0, sizeof(DBT)); + memset(&control, 0, sizeof(DBT)); + + for (ret = 0; ret == 0;) { + if ((ret = get_next_message(fd, &rec, &control)) != 0) { + /* + * Close this connection; if it's the master call + * for an election. + */ + closesocket(fd); + if ((ret = machtab_rem(tab, eid, 1)) != 0) + break; + + /* + * If I'm the master, I just lost a client and this + * thread is done. + */ + if (master_eid == SELF_EID) + break; + + /* + * If I was talking with the master and the master + * went away, I need to call an election; else I'm + * done. + */ + if (master_eid != eid) + break; + + master_eid = DB_EID_INVALID; + machtab_parm(tab, &n, &timeout); + (void)dbenv->rep_set_timeout(dbenv, + DB_REP_ELECTION_TIMEOUT, timeout); + if ((ret = dbenv->rep_elect(dbenv, + n, (n/2+1), 0)) != 0) + continue; + + /* + * Regardless of the results, the site I was talking + * to is gone, so I have nothing to do but exit. + */ + if (app->elected) { + app->elected = 0; + ret = dbenv->rep_start(dbenv, + NULL, DB_REP_MASTER); + } + break; + } + + switch (r = dbenv->rep_process_message(dbenv, + &control, &rec, eid, &permlsn)) { + case DB_REP_NEWSITE: + /* + * Check if we got sent connect information and if we + * did, if this is me or if we already have a + * connection to this new site. If we don't, + * establish a new one. + */ + + /* No connect info. */ + if (rec.size == 0) + break; + + /* It's me, do nothing. */ + if (strncmp(myaddr, rec.data, rec.size) == 0) + break; + + self.host = (char *)rec.data; + self.host = strtok(self.host, ":"); + if ((c = strtok(NULL, ":")) == NULL) { + dbenv->errx(dbenv, "Bad host specification"); + goto out; + } + self.port = atoi(c); + + /* + * We try to connect to the new site. If we can't, + * we treat it as an error since we know that the site + * should be up if we got a message from it (even + * indirectly). + */ + if (nsites == nsites_allocd) { + /* Need to allocate more space. */ + if ((tmp = realloc( + site_thrs, (10 + nsites) * + sizeof(thread_t))) == NULL) { + ret = errno; + goto out; + } + site_thrs = tmp; + nsites_allocd += 10; + } + if ((ret = connect_site(dbenv, tab, progname, + &self, &already_open, &tid)) != 0) + goto out; + if (!already_open) + memcpy(&site_thrs + [nsites++], &tid, sizeof(thread_t)); + break; + case DB_REP_HOLDELECTION: + if (master_eid == SELF_EID) + break; + /* Make sure that previous election has finished. */ + if (ea != NULL) { + if (thread_join(elect_thr, &status) != 0) { + dbenv->errx(dbenv, + "thread join failure"); + goto out; + } + ea = NULL; + } + if ((ea = calloc(sizeof(elect_args), 1)) == NULL) { + dbenv->errx(dbenv, "can't allocate memory"); + ret = errno; + goto out; + } + ea->dbenv = dbenv; + ea->machtab = tab; + if ((ret = thread_create(&elect_thr, + NULL, elect_thread, (void *)ea)) != 0) { + dbenv->errx(dbenv, + "can't create election thread"); + } + break; + case DB_REP_ISPERM: + break; + case 0: + if (app->elected) { + app->elected = 0; + if ((ret = dbenv->rep_start(dbenv, + NULL, DB_REP_MASTER)) != 0) { + dbenv->err(dbenv, ret, + "can't start as master"); + goto out; + } + } + break; + default: + dbenv->err(dbenv, r, "DB_ENV->rep_process_message"); + break; + } + } + +out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0) + ret = t_ret; + + /* Don't close the environment before any children exit. */ + if (ea != NULL && thread_join(elect_thr, &status) != 0) + dbenv->errx(dbenv, "can't join election thread"); + + if (site_thrs != NULL) + while (--nsites >= 0) + if (thread_join(site_thrs[nsites], &status) != 0) + dbenv->errx(dbenv, "can't join site thread"); + + return ((void *)(uintptr_t)ret); +} + +/* + * This is a generic thread that spawns a thread to listen for connections + * on a socket and then spawns off child threads to handle each new + * connection. + */ +void * +connect_thread(args) + void *args; +{ + DB_ENV *dbenv; + const char *home, *progname; + hm_loop_args *ha; + connect_args *cargs; + machtab_t *machtab; + thread_t hm_thrs[MAX_THREADS]; + void *status; + int i, eid, port, ret; + socket_t fd, ns; + + ha = NULL; + cargs = (connect_args *)args; + dbenv = cargs->dbenv; + home = cargs->home; + progname = cargs->progname; + machtab = cargs->machtab; + port = cargs->port; + + /* + * Loop forever, accepting connections from new machines, + * and forking off a thread to handle each. + */ + if ((fd = listen_socket_init(progname, port)) < 0) { + ret = errno; + goto err; + } + + for (i = 0; i < MAX_THREADS; i++) { + if ((ns = listen_socket_accept(machtab, + progname, fd, &eid)) == SOCKET_CREATION_FAILURE) { + ret = errno; + goto err; + } + if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { + dbenv->errx(dbenv, "can't allocate memory"); + ret = errno; + goto err; + } + ha->progname = progname; + ha->home = home; + ha->fd = ns; + ha->eid = eid; + ha->tab = machtab; + ha->dbenv = dbenv; + if ((ret = thread_create(&hm_thrs[i++], NULL, + hm_loop, (void *)ha)) != 0) { + dbenv->errx(dbenv, "can't create thread for site"); + goto err; + } + ha = NULL; + } + + /* If we fell out, we ended up with too many threads. */ + dbenv->errx(dbenv, "Too many threads"); + ret = ENOMEM; + + /* Do not return until all threads have exited. */ + while (--i >= 0) + if (thread_join(hm_thrs[i], &status) != 0) + dbenv->errx(dbenv, "can't join site thread"); + +err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE); +} + +/* + * Open a connection to everyone that we've been told about. If we + * cannot open some connections, keep trying. + */ +void * +connect_all(args) + void *args; +{ + DB_ENV *dbenv; + all_args *aa; + const char *home, *progname; + hm_loop_args *ha; + int failed, i, nsites, open, ret, *success; + machtab_t *machtab; + thread_t *hm_thr; + repsite_t *sites; + + ha = NULL; + aa = (all_args *)args; + dbenv = aa->dbenv; + progname = aa->progname; + home = aa->home; + machtab = aa->machtab; + nsites = aa->nsites; + sites = aa->sites; + + ret = 0; + hm_thr = NULL; + success = NULL; + + /* Some implementations of calloc are sad about allocating 0 things. */ + if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) { + dbenv->err(dbenv, errno, "connect_all"); + ret = 1; + goto err; + } + + if (nsites > 0 && (hm_thr = calloc(nsites, sizeof(int))) == NULL) { + dbenv->err(dbenv, errno, "connect_all"); + ret = 1; + goto err; + } + + for (failed = nsites; failed > 0;) { + for (i = 0; i < nsites; i++) { + if (success[i]) + continue; + + ret = connect_site(dbenv, machtab, + progname, &sites[i], &open, &hm_thr[i]); + + /* + * If we couldn't make the connection, this isn't + * fatal to the loop, but we have nothing further + * to do on this machine at the moment. + */ + if (ret == DB_REP_UNAVAIL) + continue; + + if (ret != 0) + goto err; + + failed--; + success[i] = 1; + + /* If the connection is already open, we're done. */ + if (ret == 0 && open == 1) + continue; + + } + sleep(1); + } + +err: if (success != NULL) + free(success); + if (hm_thr != NULL) + free(hm_thr); + return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS); +} + +static int +connect_site(dbenv, machtab, progname, site, is_open, hm_thrp) + DB_ENV *dbenv; + machtab_t *machtab; + const char *progname; + repsite_t *site; + int *is_open; + thread_t *hm_thrp; +{ + int eid, ret; + socket_t s; + hm_loop_args *ha; + + if ((s = get_connected_socket(machtab, progname, + site->host, site->port, is_open, &eid)) < 0) + return (DB_REP_UNAVAIL); + + if (*is_open) + return (0); + + if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { + dbenv->errx(dbenv, "can't allocate memory"); + ret = errno; + goto err; + } + + ha->progname = progname; + ha->fd = s; + ha->eid = eid; + ha->tab = machtab; + ha->dbenv = dbenv; + + if ((ret = thread_create(hm_thrp, NULL, + hm_loop, (void *)ha)) != 0) { + dbenv->errx(dbenv, "can't create thread for connected site"); + goto err1; + } + + return (0); + +err1: free(ha); +err: + return (ret); +} + +/* + * We need to spawn off a new thread in which to hold an election in + * case we are the only thread listening on for messages. + */ +static void * +elect_thread(args) + void *args; +{ + DB_ENV *dbenv; + elect_args *eargs; + machtab_t *machtab; + u_int32_t timeout; + int n, ret; + APP_DATA *app; + + eargs = (elect_args *)args; + dbenv = eargs->dbenv; + machtab = eargs->machtab; + free(eargs); + app = dbenv->app_private; + + machtab_parm(machtab, &n, &timeout); + (void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout); + while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) + sleep(2); + + if (app->elected) { + app->elected = 0; + if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) + dbenv->err(dbenv, ret, + "can't start as master in election thread"); + } + + return (NULL); +} diff --git a/db-4.8.30/examples_c/ex_rep/base/rep_net.c b/db-4.8.30/examples_c/ex_rep/base/rep_net.c new file mode 100644 index 0000000..350300d --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/base/rep_net.c @@ -0,0 +1,749 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <db.h> +#include "rep_base.h" +#ifndef _SYS_QUEUE_H +/* + * Some *BSD Unix variants include the Queue macros in their libraries and + * these might already have been included. In that case, it would be bad + * to include them again. + */ +#include <dbinc/queue.h> /* !!!: for the LIST_XXX macros. */ +#endif + +int machtab_add __P((machtab_t *, socket_t, u_int32_t, int, int *)); +#ifdef DIAGNOSTIC +void machtab_print __P((machtab_t *)); +#endif +ssize_t readn __P((socket_t, void *, size_t)); + +/* + * This file defines the communication infrastructure for the ex_repquote + * sample application. + * + * This application uses TCP/IP for its communication. In an N-site + * replication group, this means that there are N * N communication + * channels so that every site can communicate with every other site + * (this allows elections to be held when the master fails). We do + * not require that anyone know about all sites when the application + * starts up. In order to communicate, the application should know + * about someone, else it has no idea how to ever get in the game. + * + * Communication is handled via a number of different threads. These + * thread functions are implemented in rep_util.c In this file, we + * define the data structures that maintain the state that describes + * the comm infrastructure, the functions that manipulates this state + * and the routines used to actually send and receive data over the + * sockets. + */ + +/* + * The communication infrastructure is represented by a machine table, + * machtab_t, which is essentially a mutex-protected linked list of members + * of the group. The machtab also contains the parameters that are needed + * to call for an election. We hardwire values for these parameters in the + * init function, but these could be set via some configuration setup in a + * real application. We reserve the machine-id 1 to refer to ourselves and + * make the machine-id 0 be invalid. + */ + +#define MACHID_INVALID 0 +#define MACHID_SELF 1 + +struct __machtab { + LIST_HEAD(__machlist, __member) machlist; + int nextid; + mutex_t mtmutex; + u_int32_t timeout_time; + int current; + int max; + int nsites; +}; + +/* Data structure that describes each entry in the machtab. */ +struct __member { + u_int32_t hostaddr; /* Host IP address. */ + int port; /* Port number. */ + int eid; /* Application-specific machine id. */ + socket_t fd; /* File descriptor for the socket. */ + LIST_ENTRY(__member) links; + /* For linked list of all members we know of. */ +}; + +static int quote_send_broadcast __P((machtab_t *, + const DBT *, const DBT *, u_int32_t)); +static int quote_send_one __P((const DBT *, const DBT *, socket_t, u_int32_t)); + +/* + * machtab_init -- + * Initialize the machine ID table. + * XXX Right now we treat the number of sites as the maximum + * number we've ever had on the list at one time. We probably + * want to make that smarter. + */ +int +machtab_init(machtabp, nsites) + machtab_t **machtabp; + int nsites; +{ + int ret; + machtab_t *machtab; + + if ((machtab = malloc(sizeof(machtab_t))) == NULL) { + fprintf(stderr, "can't allocate memory\n"); + return (ENOMEM); + } + + LIST_INIT(&machtab->machlist); + + /* Reserve eid's 0 and 1. */ + machtab->nextid = 2; + machtab->timeout_time = 2 * 1000000; /* 2 seconds. */ + machtab->current = machtab->max = 0; + machtab->nsites = nsites; + + ret = mutex_init(&machtab->mtmutex, NULL); + *machtabp = machtab; + + return (ret); +} + +/* + * machtab_add -- + * Add a file descriptor to the table of machines, returning + * a new machine ID. + */ +int +machtab_add(machtab, fd, hostaddr, port, idp) + machtab_t *machtab; + socket_t fd; + u_int32_t hostaddr; + int port, *idp; +{ + int ret; + member_t *m, *member; + + ret = 0; + if ((member = malloc(sizeof(member_t))) == NULL) { + fprintf(stderr, "can't allocate memory\n"); + return (ENOMEM); + } + + member->fd = fd; + member->hostaddr = hostaddr; + member->port = port; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex"); + return (ret); + } + + for (m = LIST_FIRST(&machtab->machlist); + m != NULL; m = LIST_NEXT(m, links)) + if (m->hostaddr == hostaddr && m->port == port) + break; + + if (m == NULL) { + member->eid = machtab->nextid++; + LIST_INSERT_HEAD(&machtab->machlist, member, links); + } else + member->eid = m->eid; + + if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (ret); + } + + if (idp != NULL) + *idp = member->eid; + + if (m == NULL) { + if (++machtab->current > machtab->max) + machtab->max = machtab->current; + } else { + free(member); + ret = EEXIST; + } +#ifdef DIAGNOSTIC + printf("Exiting machtab_add\n"); + machtab_print(machtab); +#endif + return (ret); +} + +/* + * machtab_getinfo -- + * Return host and port information for a particular machine id. + */ +int +machtab_getinfo(machtab, eid, hostp, portp) + machtab_t *machtab; + int eid; + u_int32_t *hostp; + int *portp; +{ + int ret; + member_t *member; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + for (member = LIST_FIRST(&machtab->machlist); + member != NULL; + member = LIST_NEXT(member, links)) + if (member->eid == eid) { + *hostp = member->hostaddr; + *portp = member->port; + break; + } + + if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (ret); + } + + return (member != NULL ? 0 : EINVAL); +} + +/* + * machtab_rem -- + * Remove a mapping from the table of machines. Lock indicates + * whether we need to lock the machtab or not (0 indicates we do not + * need to lock; non-zero indicates that we do need to lock). + */ +int +machtab_rem(machtab, eid, lock) + machtab_t *machtab; + int eid; + int lock; +{ + int found, ret; + member_t *member; + + ret = 0; + if (lock && (ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + for (found = 0, member = LIST_FIRST(&machtab->machlist); + member != NULL; + member = LIST_NEXT(member, links)) + if (member->eid == eid) { + found = 1; + LIST_REMOVE(member, links); + (void)closesocket(member->fd); + free(member); + machtab->current--; + break; + } + + if (LIST_FIRST(&machtab->machlist) == NULL) + machtab->nextid = 2; + + if (lock && (ret = mutex_unlock(&machtab->mtmutex)) != 0) + fprintf(stderr, "can't unlock mutex\n"); + +#ifdef DIAGNOSTIC + printf("Exiting machtab_rem\n"); + machtab_print(machtab); +#endif + return (ret); +} + +void +machtab_parm(machtab, nump, timeoutp) + machtab_t *machtab; + int *nump; + u_int32_t *timeoutp; +{ + if (machtab->nsites == 0) + *nump = machtab->max; + else + *nump = machtab->nsites; + *timeoutp = machtab->timeout_time; +} + +#ifdef DIAGNOSTIC +void +machtab_print(machtab) + machtab_t *machtab; +{ + member_t *m; + + if (mutex_lock(&machtab->mtmutex) != 0) { + fprintf(stderr, "can't lock mutex\n"); + abort(); + } + + for (m = LIST_FIRST(&machtab->machlist); + m != NULL; m = LIST_NEXT(m, links)) { + + printf("IP: %lx Port: %6d EID: %2d FD: %3d\n", + (long)m->hostaddr, m->port, m->eid, m->fd); + } + + if (mutex_unlock(&machtab->mtmutex) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + abort(); + } +} +#endif +/* + * listen_socket_init -- + * Initialize a socket for listening on the specified port. Returns + * a file descriptor for the socket, ready for an accept() call + * in a thread that we're happy to let block. + */ +socket_t +listen_socket_init(progname, port) + const char *progname; + int port; +{ + socket_t s; + int sockopt; + struct sockaddr_in si; + + COMPQUIET(progname, NULL); + + if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("can't create listen socket"); + return (-1); + } + + memset(&si, 0, sizeof(si)); + si.sin_family = AF_INET; + si.sin_addr.s_addr = htonl(INADDR_ANY); + si.sin_port = htons((unsigned short)port); + + /* + * When using this example for testing, it's common to kill and restart + * regularly. On some systems, this causes bind to fail with "address + * in use" errors unless this option is set. + */ + sockopt = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + (const char *)&sockopt, sizeof(sockopt)); + + if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) { + perror("can't bind listen socket"); + goto err; + } + + if (listen(s, 5) != 0) { + perror("can't establish listen queue"); + goto err; + } + + return (s); + +err: closesocket(s); + return (-1); +} + +/* + * listen_socket_accept -- + * Accept a connection on a socket. This is essentially just a wrapper + * for accept(3). + */ +socket_t +listen_socket_accept(machtab, progname, s, eidp) + machtab_t *machtab; + const char *progname; + socket_t s; + int *eidp; +{ + struct sockaddr_in si; + socklen_t si_len; + int host, ret; + socket_t ns; + u_int16_t port; + + COMPQUIET(progname, NULL); + +accept_wait: + memset(&si, 0, sizeof(si)); + si_len = sizeof(si); + ns = accept(s, (struct sockaddr *)&si, &si_len); + if (ns == SOCKET_CREATION_FAILURE) { + fprintf(stderr, "can't accept incoming connection\n"); + return ns; + } + host = ntohl(si.sin_addr.s_addr); + + /* + * Sites send their listening port when connections are first + * established, as it will be different from the outgoing port + * for this connection. + */ + if (readn(ns, &port, 2) != 2) + goto err; + port = ntohs(port); + + ret = machtab_add(machtab, ns, host, port, eidp); + if (ret == EEXIST) { + closesocket(ns); + goto accept_wait; + } else if (ret != 0) + goto err; + printf("Connected to host %x port %d, eid = %d\n", host, port, *eidp); + return (ns); + +err: closesocket(ns); + return SOCKET_CREATION_FAILURE; +} + +/* + * get_connected_socket -- + * Connect to the specified port of the specified remote machine, + * and return a file descriptor when we have accepted a connection on it. + * Add this connection to the machtab. If we already have a connection + * open to this machine, then don't create another one, return the eid + * of the connection (in *eidp) and set is_open to 1. Return 0. + */ +socket_t +get_connected_socket(machtab, progname, remotehost, port, is_open, eidp) + machtab_t *machtab; + const char *progname, *remotehost; + int port, *is_open, *eidp; +{ + int ret; + socket_t s; + struct hostent *hp; + struct sockaddr_in si; + u_int32_t addr; + u_int16_t nport; + + *is_open = 0; + + if ((hp = gethostbyname(remotehost)) == NULL) { + fprintf(stderr, "%s: host not found: %s\n", progname, + strerror(net_errno)); + return (-1); + } + + if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("can't create outgoing socket"); + return (-1); + } + memset(&si, 0, sizeof(si)); + memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length); + addr = ntohl(si.sin_addr.s_addr); + ret = machtab_add(machtab, s, addr, port, eidp); + if (ret == EEXIST) { + *is_open = 1; + closesocket(s); + return (0); + } else if (ret != 0) { + closesocket(s); + return (-1); + } + + si.sin_family = AF_INET; + si.sin_port = htons((unsigned short)port); + if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) { + fprintf(stderr, "%s: connection failed: %s\n", + progname, strerror(net_errno)); + (void)machtab_rem(machtab, *eidp, 1); + return (-1); + } + + /* + * The first thing we send on the socket is our (listening) port + * so the site we are connecting to can register us correctly in + * its machtab. + */ + nport = htons(myport); + writesocket(s, &nport, 2); + + return (s); +} + +/* + * get_next_message -- + * Read a single message from the specified file descriptor, and + * return it in the format used by rep functions (two DBTs and a type). + * + * This function is called in a loop by both clients and masters, and + * the resulting DBTs are manually dispatched to DB_ENV->rep_process_message(). + */ +int +get_next_message(fd, rec, control) + socket_t fd; + DBT *rec, *control; +{ + size_t nr; + u_int32_t rsize, csize; + u_int8_t *recbuf, *controlbuf; + + /* + * The protocol we use on the wire is dead simple: + * + * 4 bytes - rec->size + * (# read above) - rec->data + * 4 bytes - control->size + * (# read above) - control->data + */ + + /* Read rec->size. */ + nr = readn(fd, &rsize, 4); + if (nr != 4) + return (1); + + /* Read the record itself. */ + if (rsize > 0) { + if (rec->size < rsize) + rec->data = realloc(rec->data, rsize); + if ((recbuf = rec->data) == NULL) + return (1); + nr = readn(fd, recbuf, rsize); + } else { + if (rec->data != NULL) + free(rec->data); + rec->data = NULL; + } + rec->size = rsize; + + /* Read control->size. */ + nr = readn(fd, &csize, 4); + if (nr != 4) + return (1); + + /* Read the control struct itself. */ + if (csize > 0) { + controlbuf = control->data; + if (control->size < csize) + controlbuf = realloc(controlbuf, csize); + if (controlbuf == NULL) + return (1); + nr = readn(fd, controlbuf, csize); + if (nr != csize) + return (1); + } else { + if (control->data != NULL) + free(control->data); + controlbuf = NULL; + } + control->data = controlbuf; + control->size = csize; + + return (0); +} + +/* + * readn -- + * Read a full n characters from a file descriptor, unless we get an error + * or EOF. + */ +ssize_t +readn(fd, vptr, n) + socket_t fd; + void *vptr; + size_t n; +{ + size_t nleft; + ssize_t nread; + char *ptr; + + ptr = vptr; + nleft = n; + while (nleft > 0) { + if ((nread = readsocket(fd, ptr, nleft)) < 0) { + /* + * Call read() again on interrupted system call; + * on other errors, bail. + */ + if (net_errno == EINTR) + nread = 0; + else { + perror("can't read from socket"); + return (-1); + } + } else if (nread == 0) + break; /* EOF */ + + nleft -= nread; + ptr += nread; + } + + return (n - nleft); +} + +/* + * quote_send -- + * The f_send function for DB_ENV->set_rep_transport. + */ +int +quote_send(dbenv, control, rec, lsnp, eid, flags) + DB_ENV *dbenv; + const DBT *control, *rec; + const DB_LSN *lsnp; + int eid; + u_int32_t flags; +{ + int n, ret, t_ret; + socket_t fd; + machtab_t *machtab; + member_t *m; + + COMPQUIET(lsnp, NULL); + machtab = + (machtab_t *)((APP_DATA*)dbenv->app_private)->comm_infrastructure; + + if (eid == DB_EID_BROADCAST) { + /* + * Right now, we do not require successful transmission. + * I'd like to move this requiring at least one successful + * transmission on PERMANENT requests. + */ + n = quote_send_broadcast(machtab, rec, control, flags); + if (n < 0 /*|| (n == 0 && LF_ISSET(DB_REP_PERMANENT))*/) + return (DB_REP_UNAVAIL); + return (0); + } + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + dbenv->errx(dbenv, "can't lock mutex"); + return (ret); + } + + fd = 0; + for (m = LIST_FIRST(&machtab->machlist); m != NULL; + m = LIST_NEXT(m, links)) { + if (m->eid == eid) { + fd = m->fd; + break; + } + } + + if (fd == 0) { + dbenv->err(dbenv, DB_REP_UNAVAIL, + "quote_send: cannot find machine ID %d", eid); + return (DB_REP_UNAVAIL); + } + + if ((ret = quote_send_one(rec, control, fd, flags)) != 0) + fprintf(stderr, "socket write error in send() function\n"); + + if ((t_ret = mutex_unlock(&machtab->mtmutex)) != 0) { + dbenv->errx(dbenv, "can't unlock mutex"); + if (ret == 0) + ret = t_ret; + } + + return (ret); +} + +/* + * quote_send_broadcast -- + * Send a message to everybody. + * Returns the number of sites to which this message was successfully + * communicated. A -1 indicates a fatal error. + */ +static int +quote_send_broadcast(machtab, rec, control, flags) + machtab_t *machtab; + const DBT *rec, *control; + u_int32_t flags; +{ + int ret, sent; + member_t *m, *next; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + sent = 0; + for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) { + next = LIST_NEXT(m, links); + if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) { + fprintf(stderr, "socket write error in broadcast\n"); + (void)machtab_rem(machtab, m->eid, 0); + } else + sent++; + } + + if (mutex_unlock(&machtab->mtmutex) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (-1); + } + + return (sent); +} + +/* + * quote_send_one -- + * Send a message to a single machine, given that machine's file + * descriptor. + * + * !!! + * Note that the machtab mutex should be held through this call. + * It doubles as a synchronizer to make sure that two threads don't + * intersperse writes that are part of two single messages. + */ +static int +quote_send_one(rec, control, fd, flags) + const DBT *rec, *control; + socket_t fd; + u_int32_t flags; + +{ + int retry; + ssize_t bytes_left, nw; + u_int8_t *wp; + + COMPQUIET(flags, 0); + + /* + * The protocol is simply: write rec->size, write rec->data, + * write control->size, write control->data. + */ + nw = writesocket(fd, (const char *)&rec->size, 4); + if (nw != 4) + return (DB_REP_UNAVAIL); + + if (rec->size > 0) { + nw = writesocket(fd, rec->data, rec->size); + if (nw < 0) + return (DB_REP_UNAVAIL); + if (nw != (ssize_t)rec->size) { + /* Try a couple of times to finish the write. */ + wp = (u_int8_t *)rec->data + nw; + bytes_left = rec->size - nw; + for (retry = 0; bytes_left > 0 && retry < 3; retry++) { + nw = writesocket(fd, wp, bytes_left); + if (nw < 0) + return (DB_REP_UNAVAIL); + bytes_left -= nw; + wp += nw; + } + if (bytes_left > 0) + return (DB_REP_UNAVAIL); + } + } + + nw = writesocket(fd, (const char *)&control->size, 4); + if (nw != 4) + return (DB_REP_UNAVAIL); + if (control->size > 0) { + nw = writesocket(fd, control->data, control->size); + if (nw != (ssize_t)control->size) + return (DB_REP_UNAVAIL); + } + return (0); +} diff --git a/db-4.8.30/examples_c/ex_rep/common/rep_common.c b/db-4.8.30/examples_c/ex_rep/common/rep_common.c new file mode 100644 index 0000000..c98ea98 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/common/rep_common.c @@ -0,0 +1,672 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2006-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> + +#include <db.h> + +#include "rep_common.h" + +#define CACHESIZE (10 * 1024 * 1024) +#define DATABASE "quote.db" +#define SLEEPTIME 3 + +static int print_stocks __P((DB *)); + +/* + * Perform command line parsing and common replication setup for the repmgr + * and base replication example programs. + */ +int +common_rep_setup(dbenv, argc, argv, setup_info) + DB_ENV *dbenv; + int argc; + char *argv[]; + SETUP_DATA *setup_info; +{ + repsite_t site; + extern char *optarg; + char ch, *portstr; + int ack_policy, got_self, is_repmgr, maxsites, priority, ret; + + got_self = is_repmgr = maxsites = ret = site.peer = 0; + + priority = 100; + ack_policy = DB_REPMGR_ACKS_QUORUM; + setup_info->role = UNKNOWN; + if (strncmp(setup_info->progname, "ex_rep_mgr", 10) == 0) + is_repmgr = 1; + + /* + * Replication setup calls that are only needed if a command + * line option is specified are made within this while/switch + * statement. Replication setup calls that should be made + * whether or not a command line option is specified are after + * this while/switch statement. + */ + while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) { + switch (ch) { + case 'a': + if (!is_repmgr) + usage(is_repmgr, setup_info->progname); + if (strncmp(optarg, "all", 3) == 0) + ack_policy = DB_REPMGR_ACKS_ALL; + else if (strncmp(optarg, "quorum", 6) != 0) + usage(is_repmgr, setup_info->progname); + break; + case 'b': + /* + * Configure bulk transfer to send groups of records + * to clients in a single network transfer. This is + * useful for master sites and clients participating + * in client-to-client synchronization. + */ + if ((ret = dbenv->rep_set_config(dbenv, + DB_REP_CONF_BULK, 1)) != 0) { + dbenv->err(dbenv, ret, + "Could not configure bulk transfer.\n"); + goto err; + } + break; + case 'C': + setup_info->role = CLIENT; + break; + case 'h': + setup_info->home = optarg; + break; + case 'l': + setup_info->self.host = strtok(optarg, ":"); + if ((portstr = strtok(NULL, ":")) == NULL) { + fprintf(stderr, "Bad host specification.\n"); + goto err; + } + setup_info->self.port = (unsigned short)atoi(portstr); + setup_info->self.peer = 0; + got_self = 1; + break; + case 'M': + setup_info->role = MASTER; + break; + case 'n': + setup_info->nsites = atoi(optarg); + /* + * For repmgr, set the total number of sites in the + * replication group. This is used by repmgr internal + * election processing. For base replication, nsites + * is simply passed back to main for use in its + * communications and election processing. + */ + if (is_repmgr && setup_info->nsites > 0 && + (ret = dbenv->rep_set_nsites(dbenv, + setup_info->nsites)) != 0) { + dbenv->err(dbenv, ret, + "Could not set nsites.\n"); + goto err; + } + break; + case 'p': + priority = atoi(optarg); + break; + case 'R': + if (!is_repmgr) + usage(is_repmgr, setup_info->progname); + site.peer = 1; /* FALLTHROUGH */ + case 'r': + site.host = optarg; + site.host = strtok(site.host, ":"); + if ((portstr = strtok(NULL, ":")) == NULL) { + fprintf(stderr, "Bad host specification.\n"); + goto err; + } + site.port = (unsigned short)atoi(portstr); + if (setup_info->site_list == NULL || + setup_info->remotesites >= maxsites) { + maxsites = maxsites == 0 ? 10 : 2 * maxsites; + if ((setup_info->site_list = + realloc(setup_info->site_list, + maxsites * sizeof(repsite_t))) == NULL) { + fprintf(stderr, "System error %s\n", + strerror(errno)); + goto err; + } + } + (setup_info->site_list)[(setup_info->remotesites)++] = + site; + site.peer = 0; + break; + case 'v': + if ((ret = dbenv->set_verbose(dbenv, + DB_VERB_REPLICATION, 1)) != 0) + goto err; + break; + case '?': + default: + usage(is_repmgr, setup_info->progname); + } + } + + /* Error check command line. */ + if (!got_self || setup_info->home == NULL) + usage(is_repmgr, setup_info->progname); + if (!is_repmgr && setup_info->role == UNKNOWN) { + fprintf(stderr, "Must specify -M or -C.\n"); + goto err; + } + + /* + * Set replication group election priority for this environment. + * An election first selects the site with the most recent log + * records as the new master. If multiple sites have the most + * recent log records, the site with the highest priority value + * is selected as master. + */ + if ((ret = dbenv->rep_set_priority(dbenv, priority)) != 0) { + dbenv->err(dbenv, ret, "Could not set priority.\n"); + goto err; + } + + /* + * For repmgr, set the policy that determines how master and client + * sites handle acknowledgement of replication messages needed for + * permanent records. The default policy of "quorum" requires only + * a quorum of electable peers sufficient to ensure a permanent + * record remains durable if an election is held. The "all" option + * requires all clients to acknowledge a permanent replication + * message instead. + */ + if (is_repmgr && + (ret = dbenv->repmgr_set_ack_policy(dbenv, ack_policy)) != 0) { + dbenv->err(dbenv, ret, "Could not set ack policy.\n"); + goto err; + } + + /* + * Set the threshold for the minimum and maximum time the client + * waits before requesting retransmission of a missing message. + * Base these values on the performance and load characteristics + * of the master and client host platforms as well as the round + * trip message time. + */ + if ((ret = dbenv->rep_set_request(dbenv, 20000, 500000)) != 0) { + dbenv->err(dbenv, ret, + "Could not set client_retransmission defaults.\n"); + goto err; + } + + /* + * Configure deadlock detection to ensure that any deadlocks + * are broken by having one of the conflicting lock requests + * rejected. DB_LOCK_DEFAULT uses the lock policy specified + * at environment creation time or DB_LOCK_RANDOM if none was + * specified. + */ + if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { + dbenv->err(dbenv, ret, + "Could not configure deadlock detection.\n"); + goto err; + } + + /* The following base replication features may also be useful to your + * application. See Berkeley DB documentation for more details. + * - Master leases: Provide stricter consistency for data reads + * on a master site. + * - Timeouts: Customize the amount of time Berkeley DB waits + * for such things as an election to be concluded or a master + * lease to be granted. + * - Delayed client synchronization: Manage the master site's + * resources by spreading out resource-intensive client + * synchronizations. + * - Blocked client operations: Return immediately with an error + * instead of waiting indefinitely if a client operation is + * blocked by an ongoing client synchronization. + */ + +err: + return (ret); +} + +static int +print_stocks(dbp) + DB *dbp; +{ + DBC *dbc; + DBT key, data; +#define MAXKEYSIZE 10 +#define MAXDATASIZE 20 + char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1]; + int ret, t_ret; + u_int32_t keysize, datasize; + + if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) { + dbp->err(dbp, ret, "can't open cursor"); + return (ret); + } + + memset(&key, 0, sizeof(key)); + memset(&data, 0, sizeof(data)); + + printf("\tSymbol\tPrice\n"); + printf("\t======\t=====\n"); + + for (ret = dbc->get(dbc, &key, &data, DB_FIRST); + ret == 0; + ret = dbc->get(dbc, &key, &data, DB_NEXT)) { + keysize = key.size > MAXKEYSIZE ? MAXKEYSIZE : key.size; + memcpy(keybuf, key.data, keysize); + keybuf[keysize] = '\0'; + + datasize = data.size >= MAXDATASIZE ? MAXDATASIZE : data.size; + memcpy(databuf, data.data, datasize); + databuf[datasize] = '\0'; + + printf("\t%s\t%s\n", keybuf, databuf); + } + printf("\n"); + fflush(stdout); + + if ((t_ret = dbc->close(dbc)) != 0 && ret == 0) + ret = t_ret; + + switch (ret) { + case 0: + case DB_NOTFOUND: + case DB_LOCK_DEADLOCK: + return (0); + default: + return (ret); + } +} + +/* Start checkpoint and log archive support threads. */ +int +start_support_threads(dbenv, sup_args, ckp_thr, lga_thr) + DB_ENV *dbenv; + supthr_args *sup_args; + thread_t *ckp_thr; + thread_t *lga_thr; +{ + int ret; + + ret = 0; + if ((ret = thread_create(ckp_thr, NULL, checkpoint_thread, + sup_args)) != 0) { + dbenv->errx(dbenv, "can't create checkpoint thread"); + goto err; + } + if ((ret = thread_create(lga_thr, NULL, log_archive_thread, + sup_args)) != 0) + dbenv->errx(dbenv, "can't create log archive thread"); +err: + return (ret); + +} + +/* Wait for checkpoint and log archive support threads to finish. */ +int +finish_support_threads(ckp_thr, lga_thr) + thread_t *ckp_thr; + thread_t *lga_thr; +{ + void *ctstatus, *ltstatus; + int ret; + + ret = 0; + if (thread_join(*lga_thr, <status) || + thread_join(*ckp_thr, &ctstatus)) { + ret = -1; + goto err; + } + if ((uintptr_t)ltstatus != EXIT_SUCCESS || + (uintptr_t)ctstatus != EXIT_SUCCESS) + ret = -1; +err: + return (ret); +} + +#define BUFSIZE 1024 + +int +doloop(dbenv, shared_data) + DB_ENV *dbenv; + SHARED_DATA *shared_data; +{ + DB *dbp; + DBT key, data; + char buf[BUFSIZE], *first, *price; + u_int32_t flags; + int ret; + + dbp = NULL; + ret = 0; + memset(&key, 0, sizeof(key)); + memset(&data, 0, sizeof(data)); + + for (;;) { + printf("QUOTESERVER%s> ", + shared_data->is_master ? "" : " (read-only)"); + fflush(stdout); + + if (fgets(buf, sizeof(buf), stdin) == NULL) + break; + +#define DELIM " \t\n" + if ((first = strtok(&buf[0], DELIM)) == NULL) { + /* Blank input line. */ + price = NULL; + } else if ((price = strtok(NULL, DELIM)) == NULL) { + /* Just one input token. */ + if (strncmp(buf, "exit", 4) == 0 || + strncmp(buf, "quit", 4) == 0) { + /* + * This makes the checkpoint and log + * archive threads stop. + */ + shared_data->app_finished = 1; + break; + } + dbenv->errx(dbenv, "Format: TICKER VALUE"); + continue; + } else { + /* Normal two-token input line. */ + if (first != NULL && !shared_data->is_master) { + dbenv->errx(dbenv, "Can't update at client"); + continue; + } + } + + if (dbp == NULL) { + if ((ret = db_create(&dbp, dbenv, 0)) != 0) + return (ret); + + flags = DB_AUTO_COMMIT; + /* + * Open database with DB_CREATE only if this is + * a master database. A client database uses + * polling to attempt to open the database without + * DB_CREATE until it is successful. + * + * This DB_CREATE polling logic can be simplified + * under some circumstances. For example, if the + * application can be sure a database is already + * there, it would never need to open it with + * DB_CREATE. + */ + if (shared_data->is_master) + flags |= DB_CREATE; + if ((ret = dbp->open(dbp, + NULL, DATABASE, NULL, DB_BTREE, flags, 0)) != 0) { + if (ret == ENOENT) { + printf( + "No stock database yet available.\n"); + if ((ret = dbp->close(dbp, 0)) != 0) { + dbenv->err(dbenv, ret, + "DB->close"); + goto err; + } + dbp = NULL; + continue; + } + if (ret == DB_REP_HANDLE_DEAD || + ret == DB_LOCK_DEADLOCK) { + dbenv->err(dbenv, ret, + "please retry the operation"); + dbp->close(dbp, DB_NOSYNC); + dbp = NULL; + continue; + } + dbenv->err(dbenv, ret, "DB->open"); + goto err; + } + } + + if (first == NULL) { + /* + * If this is a client in the middle of + * synchronizing with the master, the client data + * is possibly stale and won't be displayed until + * client synchronization is finished. It is also + * possible to display the stale data if this is + * acceptable to the application. + */ + if (shared_data->in_client_sync) + printf( +"Cannot read data during client synchronization - please try again.\n"); + else + switch ((ret = print_stocks(dbp))) { + case 0: + break; + case DB_REP_HANDLE_DEAD: + (void)dbp->close(dbp, DB_NOSYNC); + dbp = NULL; + break; + default: + dbp->err(dbp, ret, + "Error traversing data"); + goto err; + } + } else { + key.data = first; + key.size = (u_int32_t)strlen(first); + + data.data = price; + data.size = (u_int32_t)strlen(price); + + if ((ret = dbp->put(dbp, + NULL, &key, &data, DB_AUTO_COMMIT)) != 0) { + dbp->err(dbp, ret, "DB->put"); + goto err; + } + } + } + +err: if (dbp != NULL) + (void)dbp->close(dbp, DB_NOSYNC); + return (ret); +} + +int +create_env(progname, dbenvp) + const char *progname; + DB_ENV **dbenvp; +{ + DB_ENV *dbenv; + int ret; + + if ((ret = db_env_create(&dbenv, 0)) != 0) { + fprintf(stderr, "can't create env handle: %s\n", + db_strerror(ret)); + return (ret); + } + + dbenv->set_errfile(dbenv, stderr); + dbenv->set_errpfx(dbenv, progname); + + *dbenvp = dbenv; + return (0); +} + +/* Open and configure an environment. */ +int +env_init(dbenv, home) + DB_ENV *dbenv; + const char *home; +{ + u_int32_t flags; + int ret; + + (void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0); + (void)dbenv->set_flags(dbenv, DB_TXN_NOSYNC, 1); + + flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | + DB_INIT_REP | DB_INIT_TXN | DB_RECOVER | DB_THREAD; + if ((ret = dbenv->open(dbenv, home, flags, 0)) != 0) + dbenv->err(dbenv, ret, "can't open environment"); + return (ret); +} + +/* + * In this application, we specify all communication via the command line. In + * a real application, we would expect that information about the other sites + * in the system would be maintained in some sort of configuration file. The + * critical part of this interface is that we assume at startup that we can + * find out + * 1) what host/port we wish to listen on for connections, + * 2) a (possibly empty) list of other sites we should attempt to connect + * to; and + * 3) what our Berkeley DB home environment is. + * + * These pieces of information are expressed by the following flags. + * -a all|quorum (optional; repmgr only, a stands for ack policy) + * -b (optional, b stands for bulk) + * -C or -M start up as client or master (optional for repmgr, required + * for base example) + * -h home directory (required) + * -l host:port (required; l stands for local) + * -n nsites (optional; number of sites in replication group; defaults to 0 + * in which case we try to dynamically compute the number of sites in + * the replication group) + * -p priority (optional: defaults to 100) + * -r host:port (optional; r stands for remote; any number of these may be + * specified) + * -R host:port (optional; repmgr only, remote peer) + * -v (optional; v stands for verbose) + */ +void +usage(is_repmgr, progname) + const int is_repmgr; + const char *progname; +{ + fprintf(stderr, "usage: %s ", progname); + if (is_repmgr) + fprintf(stderr, "[-CM]-h home -l host:port[-r host:port]%s%s", + "[-R host:port][-a all|quorum][-b][-n nsites]", + "[-p priority][-v]\n"); + else + fprintf(stderr, "-CM -h home -l host:port[-r host:port]%s", + "[-b][-n nsites][-p priority][-v]\n"); + exit(EXIT_FAILURE); +} + +/* + * This is a very simple thread that performs checkpoints at a fixed + * time interval. For a master site, the time interval is one minute + * plus the duration of the checkpoint_delay timeout (30 seconds by + * default.) For a client site, the time interval is one minute. + */ +void * +checkpoint_thread(args) + void *args; +{ + DB_ENV *dbenv; + SHARED_DATA *shared; + supthr_args *ca; + int i, ret; + + ca = (supthr_args *)args; + dbenv = ca->dbenv; + shared = ca->shared; + + for (;;) { + /* + * Wait for one minute, polling once per second to see if + * application has finished. When application has finished, + * terminate this thread. + */ + for (i = 0; i < 60; i++) { + sleep(1); + if (shared->app_finished == 1) + return ((void *)EXIT_SUCCESS); + } + + /* Perform a checkpoint. */ + if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) { + dbenv->err(dbenv, ret, + "Could not perform checkpoint.\n"); + return ((void *)EXIT_FAILURE); + } + } +} + +/* + * This is a simple log archive thread. Once per minute, it removes all but + * the most recent 3 logs that are safe to remove according to a call to + * DB_ENV->log_archive(). + * + * Log cleanup is needed to conserve disk space, but aggressive log cleanup + * can cause more frequent client initializations if a client lags too far + * behind the current master. This can happen in the event of a slow client, + * a network partition, or a new master that has not kept as many logs as the + * previous master. + * + * The approach in this routine balances the need to mitigate against a + * lagging client by keeping a few more of the most recent unneeded logs + * with the need to conserve disk space by regularly cleaning up log files. + * Use of automatic log removal (DB_ENV->log_set_config() DB_LOG_AUTO_REMOVE + * flag) is not recommended for replication due to the risk of frequent + * client initializations. + */ +void * +log_archive_thread(args) + void *args; +{ + DB_ENV *dbenv; + SHARED_DATA *shared; + char **begin, **list; + supthr_args *la; + int i, listlen, logs_to_keep, minlog, ret; + + la = (supthr_args *)args; + dbenv = la->dbenv; + shared = la->shared; + logs_to_keep = 3; + + for (;;) { + /* + * Wait for one minute, polling once per second to see if + * application has finished. When application has finished, + * terminate this thread. + */ + for (i = 0; i < 60; i++) { + sleep(1); + if (shared->app_finished == 1) + return ((void *)EXIT_SUCCESS); + } + + /* Get the list of unneeded log files. */ + if ((ret = dbenv->log_archive(dbenv, &list, DB_ARCH_ABS)) + != 0) { + dbenv->err(dbenv, ret, + "Could not get log archive list."); + return ((void *)EXIT_FAILURE); + } + if (list != NULL) { + listlen = 0; + /* Get the number of logs in the list. */ + for (begin = list; *begin != NULL; begin++, listlen++); + /* + * Remove all but the logs_to_keep most recent + * unneeded log files. + */ + minlog = listlen - logs_to_keep; + for (begin = list, i= 0; i < minlog; list++, i++) { + if ((ret = unlink(*list)) != 0) { + dbenv->err(dbenv, ret, + "logclean: remove %s", *list); + dbenv->errx(dbenv, + "logclean: Error remove %s", *list); + free(begin); + return ((void *)EXIT_FAILURE); + } + } + free(begin); + } + } +} diff --git a/db-4.8.30/examples_c/ex_rep/common/rep_common.h b/db-4.8.30/examples_c/ex_rep/common/rep_common.h new file mode 100644 index 0000000..d81b9d2 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/common/rep_common.h @@ -0,0 +1,81 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2006-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +/* User-specified role an environment should play in the replication group. */ +typedef enum { MASTER, CLIENT, UNKNOWN } ENV_ROLE; + +/* User-specified information about a replication site. */ +typedef struct { + char *host; /* Host name. */ + u_int32_t port; /* Port on which to connect to this site. */ + int peer; /* Whether remote site is repmgr peer. */ +} repsite_t; + +/* Data used for common replication setup. */ +typedef struct { + const char *progname; + char *home; + int nsites; + int remotesites; + ENV_ROLE role; + repsite_t self; + repsite_t *site_list; +} SETUP_DATA; + +/* Data shared by both repmgr and base versions of this program. */ +typedef struct { + int is_master; + int app_finished; + int in_client_sync; +} SHARED_DATA; + +/* Arguments for support threads. */ +typedef struct { + DB_ENV *dbenv; + SHARED_DATA *shared; +} supthr_args; + +/* Portability macros for basic threading & timing */ +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#include <windows.h> +#include <winsock2.h> +#define snprintf _snprintf +#define sleep(s) Sleep(1000 * (s)) + +extern int getopt(int, char * const *, const char *); + +typedef HANDLE thread_t; +#define thread_create(thrp, attr, func, arg) \ + (((*(thrp) = CreateThread(NULL, 0, \ + (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) +#define thread_join(thr, statusp) \ + ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ + GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) + +#else /* !_WIN32 */ +#include <sys/time.h> +#include <pthread.h> + +typedef pthread_t thread_t; +#define thread_create(thrp, attr, func, arg) \ + pthread_create((thrp), (attr), (func), (arg)) +#define thread_join(thr, statusp) pthread_join((thr), (statusp)) + +#endif + +void *checkpoint_thread __P((void *)); +int common_rep_setup __P((DB_ENV *, int, char *[], SETUP_DATA *)); +int create_env __P((const char *, DB_ENV **)); +int doloop __P((DB_ENV *, SHARED_DATA *)); +int env_init __P((DB_ENV *, const char *)); +int finish_support_threads __P((thread_t *, thread_t *)); +void *log_archive_thread __P((void *)); +int start_support_threads __P((DB_ENV *, supthr_args *, thread_t *, + thread_t *)); +void usage __P((const int, const char *)); diff --git a/db-4.8.30/examples_c/ex_rep/mgr/rep_mgr.c b/db-4.8.30/examples_c/ex_rep/mgr/rep_mgr.c new file mode 100644 index 0000000..9a726d3 --- /dev/null +++ b/db-4.8.30/examples_c/ex_rep/mgr/rep_mgr.c @@ -0,0 +1,204 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include <sys/types.h> +#include <signal.h> +#include <stdlib.h> +#include <string.h> +#ifndef _WIN32 +#include <unistd.h> +#endif + +#include <db.h> + +#include "../common/rep_common.h" + +typedef struct { + SHARED_DATA shared_data; +} APP_DATA; + +const char *progname = "ex_rep_mgr"; + +static void event_callback __P((DB_ENV *, u_int32_t, void *)); + +int +main(argc, argv) + int argc; + char *argv[]; +{ + DB_ENV *dbenv; + SETUP_DATA setup_info; + repsite_t *site_list; + APP_DATA my_app_data; + thread_t ckp_thr, lga_thr; + supthr_args sup_args; + u_int32_t start_policy; + int i, ret, t_ret; + + memset(&setup_info, 0, sizeof(SETUP_DATA)); + setup_info.progname = progname; + memset(&my_app_data, 0, sizeof(APP_DATA)); + dbenv = NULL; + ret = 0; + + start_policy = DB_REP_ELECTION; + + if ((ret = create_env(progname, &dbenv)) != 0) + goto err; + dbenv->app_private = &my_app_data; + (void)dbenv->set_event_notify(dbenv, event_callback); + + /* Parse command line and perform common replication setup. */ + if ((ret = common_rep_setup(dbenv, argc, argv, &setup_info)) != 0) + goto err; + + /* Perform repmgr-specific setup based on command line options. */ + if (setup_info.role == MASTER) + start_policy = DB_REP_MASTER; + else if (setup_info.role == CLIENT) + start_policy = DB_REP_CLIENT; + if ((ret = dbenv->repmgr_set_local_site(dbenv, setup_info.self.host, + setup_info.self.port, 0)) != 0) { + fprintf(stderr, "Could not set listen address (%d).\n", ret); + goto err; + } + site_list = setup_info.site_list; + for (i = 0; i < setup_info.remotesites; i++) { + if ((ret = dbenv->repmgr_add_remote_site(dbenv, + site_list[i].host, site_list[i].port, NULL, + site_list[i].peer ? DB_REPMGR_PEER : 0)) != 0) { + dbenv->err(dbenv, ret, + "Could not add site %s:%d", site_list[i].host, + (int)site_list[i].port); + goto err; + } + } + + /* + * Configure heartbeat timeouts so that repmgr monitors the + * health of the TCP connection. Master sites broadcast a heartbeat + * at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout. + * Client sites wait for message activity the length of the + * DB_REP_HEARTBEAT_MONITOR timeout before concluding that the + * connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR + * timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout. + */ + if ((ret = dbenv->rep_set_timeout(dbenv, DB_REP_HEARTBEAT_SEND, + 5000000)) != 0) + dbenv->err(dbenv, ret, + "Could not set heartbeat send timeout.\n"); + if ((ret = dbenv->rep_set_timeout(dbenv, DB_REP_HEARTBEAT_MONITOR, + 10000000)) != 0) + dbenv->err(dbenv, ret, + "Could not set heartbeat monitor timeout.\n"); + + /* + * The following repmgr features may also be useful to your + * application. See Berkeley DB documentation for more details. + * - Two-site strict majority rule - In a two-site replication + * group, require both sites to be available to elect a new + * master. + * - Timeouts - Customize the amount of time repmgr waits + * for such things as waiting for acknowledgements or attempting + * to reconnect to other sites. + * - Site list - return a list of sites currently known to repmgr. + */ + + if ((ret = env_init(dbenv, setup_info.home)) != 0) + goto err; + + /* Start checkpoint and log archive threads. */ + sup_args.dbenv = dbenv; + sup_args.shared = &my_app_data.shared_data; + if ((ret = start_support_threads(dbenv, &sup_args, &ckp_thr, + &lga_thr)) != 0) + goto err; + + if ((ret = dbenv->repmgr_start(dbenv, 3, start_policy)) != 0) + goto err; + + if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) { + dbenv->err(dbenv, ret, "Client failed"); + goto err; + } + + /* Finish checkpoint and log archive threads. */ + if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0) + goto err; + + /* + * We have used the DB_TXN_NOSYNC environment flag for improved + * performance without the usual sacrifice of transactional durability, + * as discussed in the "Transactional guarantees" page of the Reference + * Guide: if one replication site crashes, we can expect the data to + * exist at another site. However, in case we shut down all sites + * gracefully, we push out the end of the log here so that the most + * recent transactions don't mysteriously disappear. + */ + if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) { + dbenv->err(dbenv, ret, "log_flush"); + goto err; + } + +err: + if (dbenv != NULL && + (t_ret = dbenv->close(dbenv, 0)) != 0) { + fprintf(stderr, "failure closing env: %s (%d)\n", + db_strerror(t_ret), t_ret); + if (ret == 0) + ret = t_ret; + } + + return (ret); +} + +static void +event_callback(dbenv, which, info) + DB_ENV *dbenv; + u_int32_t which; + void *info; +{ + APP_DATA *app = dbenv->app_private; + SHARED_DATA *shared = &app->shared_data; + + info = NULL; /* Currently unused. */ + + switch (which) { + case DB_EVENT_REP_CLIENT: + shared->is_master = 0; + shared->in_client_sync = 1; + break; + + case DB_EVENT_REP_MASTER: + shared->is_master = 1; + shared->in_client_sync = 0; + break; + + case DB_EVENT_REP_NEWMASTER: + shared->in_client_sync = 1; + break; + + case DB_EVENT_REP_PERM_FAILED: + /* + * Did not get enough acks to guarantee transaction + * durability based on the configured ack policy. This + * transaction will be flushed to the master site's + * local disk storage for durability. + */ + printf( + "Insufficient acknowledgements to guarantee transaction durability.\n"); + break; + + case DB_EVENT_REP_STARTUPDONE: + shared->in_client_sync = 0; + break; + + default: + dbenv->errx(dbenv, "ignoring event %d", which); + } +} |