summaryrefslogtreecommitdiff
path: root/db-4.8.30/rep/rep_method.c
diff options
context:
space:
mode:
Diffstat (limited to 'db-4.8.30/rep/rep_method.c')
-rw-r--r--db-4.8.30/rep/rep_method.c2142
1 files changed, 2142 insertions, 0 deletions
diff --git a/db-4.8.30/rep/rep_method.c b/db-4.8.30/rep/rep_method.c
new file mode 100644
index 0000000..fb21f7e
--- /dev/null
+++ b/db-4.8.30/rep/rep_method.c
@@ -0,0 +1,2142 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2001-2009 Oracle. All rights reserved.
+ *
+ * $Id$
+ */
+
+#include "db_config.h"
+
+#include "db_int.h"
+#include "dbinc/db_page.h"
+#include "dbinc/btree.h"
+#include "dbinc/log.h"
+#include "dbinc/mp.h"
+#include "dbinc/txn.h"
+
+static int __rep_abort_prepared __P((ENV *));
+static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
+static void __rep_config_map __P((ENV *, u_int32_t *, u_int32_t *));
+static u_int32_t __rep_conv_vers __P((ENV *, u_int32_t));
+static int __rep_restore_prepared __P((ENV *));
+
+/*
+ * __rep_env_create --
+ * Replication-specific initialization of the ENV structure.
+ *
+ * PUBLIC: int __rep_env_create __P((DB_ENV *));
+ */
+int
+__rep_env_create(dbenv)
+ DB_ENV *dbenv;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ int ret;
+
+ env = dbenv->env;
+
+ if ((ret = __os_calloc(env, 1, sizeof(DB_REP), &db_rep)) != 0)
+ return (ret);
+
+ db_rep->eid = DB_EID_INVALID;
+ db_rep->bytes = REP_DEFAULT_THROTTLE;
+ DB_TIMEOUT_TO_TIMESPEC(DB_REP_REQUEST_GAP, &db_rep->request_gap);
+ DB_TIMEOUT_TO_TIMESPEC(DB_REP_MAX_GAP, &db_rep->max_gap);
+ db_rep->elect_timeout = 2 * US_PER_SEC; /* 2 seconds */
+ db_rep->chkpt_delay = 30 * US_PER_SEC; /* 30 seconds */
+ db_rep->my_priority = DB_REP_DEFAULT_PRIORITY;
+ /*
+ * Make no clock skew the default. Setting both fields
+ * to the same non-zero value means no skew.
+ */
+ db_rep->clock_skew = 1;
+ db_rep->clock_base = 1;
+
+#ifdef HAVE_REPLICATION_THREADS
+ if ((ret = __repmgr_env_create(env, db_rep)) != 0) {
+ __os_free(env, db_rep);
+ return (ret);
+ }
+#endif
+
+ env->rep_handle = db_rep;
+ return (0);
+}
+
+/*
+ * __rep_env_destroy --
+ * Replication-specific destruction of the ENV structure.
+ *
+ * PUBLIC: void __rep_env_destroy __P((DB_ENV *));
+ */
+void
+__rep_env_destroy(dbenv)
+ DB_ENV *dbenv;
+{
+ ENV *env;
+
+ env = dbenv->env;
+
+ if (env->rep_handle != NULL) {
+#ifdef HAVE_REPLICATION_THREADS
+ __repmgr_env_destroy(env, env->rep_handle);
+#endif
+ __os_free(env, env->rep_handle);
+ env->rep_handle = NULL;
+ }
+}
+
+/*
+ * __rep_get_config --
+ * Return the replication subsystem configuration.
+ *
+ * PUBLIC: int __rep_get_config __P((DB_ENV *, u_int32_t, int *));
+ */
+int
+__rep_get_config(dbenv, which, onp)
+ DB_ENV *dbenv;
+ u_int32_t which;
+ int *onp;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+ u_int32_t mapped;
+
+ env = dbenv->env;
+
+#undef OK_FLAGS
+#define OK_FLAGS \
+ (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_INMEM | \
+ DB_REP_CONF_LEASE | DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT | \
+ DB_REPMGR_CONF_2SITE_STRICT)
+
+ if (FLD_ISSET(which, ~OK_FLAGS))
+ return (__db_ferr(env, "DB_ENV->rep_get_config", 0));
+
+ db_rep = env->rep_handle;
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_config", DB_INIT_REP);
+
+ mapped = 0;
+ __rep_config_map(env, &which, &mapped);
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ if (FLD_ISSET(rep->config, mapped))
+ *onp = 1;
+ else
+ *onp = 0;
+ } else {
+ if (FLD_ISSET(db_rep->config, mapped))
+ *onp = 1;
+ else
+ *onp = 0;
+ }
+ return (0);
+}
+
+/*
+ * __rep_set_config --
+ * Configure the replication subsystem.
+ *
+ * PUBLIC: int __rep_set_config __P((DB_ENV *, u_int32_t, int));
+ */
+int
+__rep_set_config(dbenv, which, on)
+ DB_ENV *dbenv;
+ u_int32_t which;
+ int on;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ LOG *lp;
+ REP *rep;
+ REP_BULK bulk;
+ u_int32_t mapped, orig;
+ int ret;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ ret = 0;
+
+#undef OK_FLAGS
+#define OK_FLAGS \
+ (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_INMEM | \
+ DB_REP_CONF_LEASE | DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT | \
+ DB_REPMGR_CONF_2SITE_STRICT)
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_config", DB_INIT_REP);
+
+ if (FLD_ISSET(which, ~OK_FLAGS))
+ return (__db_ferr(env, "DB_ENV->rep_set_config", 0));
+
+ mapped = 0;
+ __rep_config_map(env, &which, &mapped);
+
+ if (APP_IS_BASEAPI(env) && FLD_ISSET(mapped, REP_C_2SITE_STRICT)) {
+ __db_errx(env, "%s %s", "DB_ENV->rep_set_config:",
+"cannot configure 2SITE_STRICT from base replication application");
+ return (EINVAL);
+ }
+
+ if (REP_ON(env)) {
+ ENV_ENTER(env, ip);
+
+ rep = db_rep->region;
+ /*
+ * In-memory replication must be called before calling
+ * env->open. If it is turned on and off before env->open,
+ * it doesn't matter. Any attempt to turn it on or off after
+ * env->open is intercepted by this error.
+ */
+ if (FLD_ISSET(mapped, REP_C_INMEM)) {
+ __db_errx(env, "%s %s", "DB_ENV->rep_set_config:",
+ "in-memory replication must be configured before DB_ENV->open");
+ return (EINVAL);
+ }
+ /*
+ * Leases must be turned on before calling rep_start.
+ * Leases can never be turned off once they're turned on.
+ */
+ if (FLD_ISSET(mapped, REP_C_LEASE)) {
+ if (F_ISSET(rep, REP_F_START_CALLED)) {
+ __db_errx(env,
+"DB_ENV->rep_set_config: leases must be configured before DB_ENV->rep_start");
+ ret = EINVAL;
+ }
+ if (on == 0) {
+ __db_errx(env,
+ "DB_ENV->rep_set_config: leases cannot be turned off");
+ ret = EINVAL;
+ }
+ if (ret != 0)
+ return (ret);
+ }
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ REP_SYSTEM_LOCK(env);
+ orig = rep->config;
+ if (on)
+ FLD_SET(rep->config, mapped);
+ else
+ FLD_CLR(rep->config, mapped);
+
+ /*
+ * Bulk transfer requires special processing if it is getting
+ * toggled.
+ */
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ if (FLD_ISSET(rep->config, REP_C_BULK) &&
+ !FLD_ISSET(orig, REP_C_BULK))
+ db_rep->bulk = R_ADDR(&dblp->reginfo, lp->bulk_buf);
+ REP_SYSTEM_UNLOCK(env);
+
+ /*
+ * If turning bulk off and it was on, send out whatever is in
+ * the buffer already.
+ */
+ if (FLD_ISSET(orig, REP_C_BULK) &&
+ !FLD_ISSET(rep->config, REP_C_BULK) && lp->bulk_off != 0) {
+ memset(&bulk, 0, sizeof(bulk));
+ if (db_rep->bulk == NULL)
+ bulk.addr =
+ R_ADDR(&dblp->reginfo, lp->bulk_buf);
+ else
+ bulk.addr = db_rep->bulk;
+ bulk.offp = &lp->bulk_off;
+ bulk.len = lp->bulk_len;
+ bulk.type = REP_BULK_LOG;
+ bulk.eid = DB_EID_BROADCAST;
+ bulk.flagsp = &lp->bulk_flags;
+ ret = __rep_send_bulk(env, &bulk, 0);
+ }
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+
+ ENV_LEAVE(env, ip);
+ } else {
+ if (on)
+ FLD_SET(db_rep->config, mapped);
+ else
+ FLD_CLR(db_rep->config, mapped);
+ }
+ /* Configuring 2SITE_STRICT makes this a repmgr application */
+ if (ret == 0 && FLD_ISSET(mapped, REP_C_2SITE_STRICT))
+ APP_SET_REPMGR(env);
+ return (ret);
+}
+
+static void
+__rep_config_map(env, inflagsp, outflagsp)
+ ENV *env;
+ u_int32_t *inflagsp, *outflagsp;
+{
+ COMPQUIET(env, NULL);
+
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_BULK)) {
+ FLD_SET(*outflagsp, REP_C_BULK);
+ FLD_CLR(*inflagsp, DB_REP_CONF_BULK);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_DELAYCLIENT)) {
+ FLD_SET(*outflagsp, REP_C_DELAYCLIENT);
+ FLD_CLR(*inflagsp, DB_REP_CONF_DELAYCLIENT);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_INMEM)) {
+ FLD_SET(*outflagsp, REP_C_INMEM);
+ FLD_CLR(*inflagsp, DB_REP_CONF_INMEM);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_LEASE)) {
+ FLD_SET(*outflagsp, REP_C_LEASE);
+ FLD_CLR(*inflagsp, DB_REP_CONF_LEASE);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOAUTOINIT)) {
+ FLD_SET(*outflagsp, REP_C_NOAUTOINIT);
+ FLD_CLR(*inflagsp, DB_REP_CONF_NOAUTOINIT);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOWAIT)) {
+ FLD_SET(*outflagsp, REP_C_NOWAIT);
+ FLD_CLR(*inflagsp, DB_REP_CONF_NOWAIT);
+ }
+ if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_2SITE_STRICT)) {
+ FLD_SET(*outflagsp, REP_C_2SITE_STRICT);
+ FLD_CLR(*inflagsp, DB_REPMGR_CONF_2SITE_STRICT);
+ }
+}
+
+/*
+ * __rep_start_pp --
+ * Become a master or client, and start sending messages to participate
+ * in the replication environment. Must be called after the environment
+ * is open.
+ *
+ * PUBLIC: int __rep_start_pp __P((DB_ENV *, DBT *, u_int32_t));
+ */
+int
+__rep_start_pp(dbenv, dbt, flags)
+ DB_ENV *dbenv;
+ DBT *dbt;
+ u_int32_t flags;
+{
+ DB_REP *db_rep;
+ ENV *env;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_REQUIRES_CONFIG_XX(
+ env, rep_handle, "DB_ENV->rep_start", DB_INIT_REP);
+
+ if (APP_IS_REPMGR(env)) {
+ __db_errx(env,
+"DB_ENV->rep_start: cannot call from Replication Manager application");
+ return (EINVAL);
+ }
+
+ switch (LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER)) {
+ case DB_REP_CLIENT:
+ case DB_REP_MASTER:
+ break;
+ default:
+ __db_errx(env,
+ "DB_ENV->rep_start: must specify DB_REP_CLIENT or DB_REP_MASTER");
+ return (EINVAL);
+ }
+
+ /* We need a transport function because we send messages. */
+ if (db_rep->send == NULL) {
+ __db_errx(env,
+ "DB_ENV->rep_start: must be called after DB_ENV->rep_set_transport");
+ return (EINVAL);
+ }
+
+ return (__rep_start_int(env, dbt, flags));
+}
+
+/*
+ * __rep_start_int --
+ * Internal processing to become a master or client and start sending
+ * messages to participate in the replication environment.
+ *
+ * We must protect rep_start_int, which may change the world, with the rest
+ * of the DB library. Each API interface will count itself as it enters
+ * the library. Rep_start_int checks the following:
+ *
+ * rep->msg_th - this is the count of threads currently in rep_process_message
+ * rep->handle_cnt - number of threads actively using a dbp in library.
+ * rep->txn_cnt - number of active txns.
+ * REP_F_READY_* - Replication flag that indicates that we wish to run
+ * recovery, and want to prohibit new transactions from entering and cause
+ * existing ones to return immediately (with a DB_LOCK_DEADLOCK error).
+ *
+ * There is also the renv->rep_timestamp which is updated whenever significant
+ * events (i.e., new masters, log rollback, etc). Upon creation, a handle
+ * is associated with the current timestamp. Each time a handle enters the
+ * library it must check if the handle timestamp is the same as the one
+ * stored in the replication region. This prevents the use of handles on
+ * clients that reference non-existent files whose creation was backed out
+ * during a synchronizing recovery.
+ *
+ * PUBLIC: int __rep_start_int __P((ENV *, DBT *, u_int32_t));
+ */
+int
+__rep_start_int(env, dbt, flags)
+ ENV *env;
+ DBT *dbt;
+ u_int32_t flags;
+{
+ DB *dbp;
+ DB_LOG *dblp;
+ DB_LOGC *logc;
+ DB_LSN lsn, perm_lsn;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ DB_TXNREGION *region;
+ LOG *lp;
+ REGENV *renv;
+ REGINFO *infop;
+ REP *rep;
+ db_timeout_t tmp;
+ u_int32_t oldvers, pending_event, repflags, role;
+ int do_ckp, interrupting, locked, ret, role_chg, start_th, t_ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ infop = env->reginfo;
+ renv = infop->primary;
+ interrupting = locked = 0;
+ pending_event = DB_EVENT_NO_SUCH_EVENT;
+ role = LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER);
+ start_th = 0;
+ do_ckp = 0;
+
+ /*
+ * If we're using master leases, check that all needed
+ * setup has been done, including setting the lease timeout.
+ */
+ if (IS_USING_LEASES(env) && rep->lease_timeout == 0) {
+ __db_errx(env,
+"DB_ENV->rep_start: must call DB_ENV->rep_set_timeout for leases first");
+ return (EINVAL);
+ }
+
+ ENV_ENTER(env, ip);
+
+ /*
+ * In order to correctly check log files for old versions, we
+ * need to flush the logs.
+ */
+ if ((ret = __log_flush(env, NULL)) != 0)
+ goto out;
+
+ REP_SYSTEM_LOCK(env);
+ /*
+ * We only need one thread to start-up replication, so if
+ * there is another thread in rep_start, we'll let it finish
+ * its work and have this thread simply return. Similarly,
+ * if a thread is in a critical lockout section we return.
+ */
+ if (F_ISSET(rep, REP_F_INREPSTART)) {
+ /*
+ * There is already someone in rep_start. Return.
+ */
+ RPRINT(env, DB_VERB_REP_MISC,
+ (env, "Thread already in rep_start"));
+ REP_SYSTEM_UNLOCK(env);
+ goto out;
+ } else {
+ F_SET(rep, REP_F_INREPSTART);
+ start_th = 1;
+ }
+
+ if (F_ISSET(rep, REP_F_READY_MSG)) {
+ /*
+ * There is already someone in msg lockout. Return.
+ */
+ RPRINT(env, DB_VERB_REP_MISC,
+ (env, "Thread already in msg lockout"));
+ REP_SYSTEM_UNLOCK(env);
+ goto out;
+ } else if ((ret = __rep_lockout_msg(env, rep, 0)) != 0)
+ goto errunlock;
+
+ /*
+ * If we are internal init and we try to become master, reject it.
+ * Our environment databases/logs are in an inconsistent state and
+ * we cannot become master.
+ */
+ if (IN_INTERNAL_INIT(rep) && role == DB_REP_MASTER) {
+ __db_errx(env,
+"DB_ENV->rep_start: Cannot become master during internal init");
+ ret = DB_REP_UNAVAIL;
+ goto errunlock;
+ }
+
+ role_chg = (!F_ISSET(rep, REP_F_MASTER) && role == DB_REP_MASTER) ||
+ (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT);
+
+ /*
+ * Wait for any active txns or mpool ops to complete, and
+ * prevent any new ones from occurring, only if we're
+ * changing roles.
+ */
+ if (role_chg) {
+ if ((ret = __rep_lockout_api(env, rep)) != 0)
+ goto errunlock;
+ locked = 1;
+ }
+
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ if (role == DB_REP_MASTER) {
+ if (role_chg) {
+ /*
+ * If we were previously a client, it's possible we
+ * could have an interruptible STARTSYNC in progress.
+ * Interrupt it now, so that it doesn't slow down our
+ * transition to master, and because its effects aren't
+ * doing us any good anyway.
+ */
+ (void)__memp_set_config(
+ env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
+ interrupting = 1;
+
+ /*
+ * If we're upgrading from having been a client,
+ * preclose, so that we close our temporary database
+ * and any files we opened while doing a rep_apply.
+ * If we don't we can infinitely leak file ids if
+ * the master crashed with files open (the likely
+ * case). If we don't close them we can run into
+ * problems if we try to remove that file or long
+ * running applications end up with an unbounded
+ * number of used fileids, each getting written
+ * on checkpoint. Just close them.
+ * Then invalidate all files open in the logging
+ * region. These are files open by other processes
+ * attached to the environment. They must be
+ * closed by the other processes when they notice
+ * the change in role.
+ */
+ if ((ret = __rep_preclose(env)) != 0)
+ goto errunlock;
+
+ rep->gen++;
+ /*
+ * There could have been any number of failed
+ * elections, so jump the gen if we need to now.
+ */
+ if (rep->egen > rep->gen)
+ rep->gen = rep->egen;
+ if (IS_USING_LEASES(env) &&
+ !F_ISSET(rep, REP_F_MASTERELECT)) {
+ __db_errx(env,
+ "rep_start: Cannot become master without being elected when using leases.");
+ ret = EINVAL;
+ goto errunlock;
+ }
+ if (F_ISSET(rep, REP_F_MASTERELECT)) {
+ __rep_elect_done(env, rep, 0);
+ F_CLR(rep, REP_F_MASTERELECT);
+ }
+ if (rep->egen <= rep->gen)
+ rep->egen = rep->gen + 1;
+ RPRINT(env, DB_VERB_REP_MISC, (env,
+ "New master gen %lu, egen %lu",
+ (u_long)rep->gen, (u_long)rep->egen));
+ /*
+ * If not running in-memory replication, write
+ * gen file.
+ */
+ if (!FLD_ISSET(rep->config, REP_C_INMEM)) {
+ if ((ret = __rep_write_gen(env, rep, rep->gen))
+ != 0)
+ goto errunlock;
+ } else if (!F_ISSET(rep, REP_F_MASTERELECT))
+ /*
+ * Help detect if application has
+ * ignored our recommendation against
+ * reappointing same master after a
+ * crash/reboot when running in-memory
+ * replication. Doing this allows a
+ * slight chance of two masters at the
+ * same generation resulting in client
+ * crashes.
+ */
+ RPRINT(env, DB_VERB_REP_MISC, (env,
+ "Appointed new master while running in-memory replication."));
+ }
+ /*
+ * Set lease duration assuming clients have faster clock.
+ * Master needs to compensate so that clients do not
+ * expire their grant while the master thinks it is valid.
+ */
+ if (IS_USING_LEASES(env) &&
+ (role_chg || !IS_REP_STARTED(env))) {
+ /*
+ * If we have already granted our lease, we
+ * cannot become master.
+ */
+ if ((ret = __rep_islease_granted(env))) {
+ __db_errx(env,
+ "rep_start: Cannot become master with outstanding lease granted.");
+ ret = EINVAL;
+ goto errunlock;
+ }
+ /*
+ * Set max_perm_lsn to last PERM record on master.
+ */
+ if ((ret = __log_cursor(env, &logc)) != 0)
+ goto errunlock;
+ ret = __rep_log_backup(env, rep, logc, &perm_lsn);
+ (void)__logc_close(logc);
+ /*
+ * If we found a perm LSN use it. Otherwise, if
+ * no perm LSN exists, initialize.
+ */
+ if (ret == 0)
+ lp->max_perm_lsn = perm_lsn;
+ else if (ret == DB_NOTFOUND) {
+ /*
+ * If we have no perm records, we want to
+ * force (later) a checkpoint to the log.
+ * By doing this now, we avoid a sticky
+ * deadlock with a txn. We need a perm
+ * record for leases, but if the first perm
+ * record is a txn, that txn cannot commit
+ * without leases refreshed. A client may
+ * be in internal init and cannot sync up if
+ * it needs to read pages the txn holds write
+ * locks on and we have an impasse. This
+ * checkpoint will allow leases to be granted
+ * on this perm record first and that does not
+ * need any locks.
+ */
+ do_ckp = 1;
+ INIT_LSN(lp->max_perm_lsn);
+ } else
+ goto errunlock;
+
+ /*
+ * Simply compute the larger ratio for the lease.
+ */
+ tmp = (db_timeout_t)((double)rep->lease_timeout /
+ ((double)rep->clock_skew /
+ (double)rep->clock_base));
+ DB_TIMEOUT_TO_TIMESPEC(tmp, &rep->lease_duration);
+ if ((ret = __rep_lease_table_alloc(env,
+ rep->nsites)) != 0)
+ goto errunlock;
+ }
+ rep->master_id = rep->eid;
+ STAT(rep->stat.st_master_changes++);
+
+ /*
+ * Clear out almost everything, and then set MASTER. Leave
+ * READY_* alone in case we did a lockout above;
+ * we'll clear it in a moment (below), once we've written
+ * the txn_recycle into the log.
+ */
+ repflags = F_ISSET(rep, REP_F_INREPSTART | REP_F_READY_API |
+ REP_F_READY_MSG | REP_F_READY_OP | REP_F_STICKY_MASK);
+#ifdef DIAGNOSTIC
+ if (!F_ISSET(rep, REP_F_GROUP_ESTD))
+ RPRINT(env, DB_VERB_REP_MISC, (env,
+ "Establishing group as master."));
+#endif
+ FLD_SET(repflags, REP_F_MASTER |
+ REP_F_GROUP_ESTD | REP_F_NIMDBS_LOADED);
+ rep->flags = repflags;
+
+ /*
+ * We're master. Set the versions to the current ones.
+ */
+ oldvers = lp->persist.version;
+ /*
+ * If we're moving forward to the current version, we need
+ * to force the log file to advance and reset the
+ * recovery table since it contains pointers to old
+ * recovery functions.
+ */
+ RPRINT(env, DB_VERB_REP_MISC, (env,
+ "rep_start: Old log version was %lu", (u_long)oldvers));
+ if (lp->persist.version != DB_LOGVERSION) {
+ if ((ret = __env_init_rec(env, DB_LOGVERSION)) != 0)
+ goto errunlock;
+ }
+ rep->version = DB_REPVERSION;
+ F_CLR(rep, REP_F_READY_MSG);
+ REP_SYSTEM_UNLOCK(env);
+ LOG_SYSTEM_LOCK(env);
+ lsn = lp->lsn;
+ LOG_SYSTEM_UNLOCK(env);
+
+ /*
+ * Send the NEWMASTER message first so that clients know
+ * subsequent messages are coming from the right master.
+ * We need to perform all actions below no matter what
+ * regarding errors.
+ */
+ (void)__rep_send_message(env,
+ DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
+ ret = 0;
+ if (role_chg) {
+ pending_event = DB_EVENT_REP_MASTER;
+ /*
+ * If prepared transactions have not been restored
+ * look to see if there are any. If there are,
+ * then mark the open files, otherwise close them.
+ */
+ region = env->tx_handle->reginfo.primary;
+ if (region->stat.st_nrestores == 0 &&
+ (t_ret = __rep_restore_prepared(env)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+ if (region->stat.st_nrestores != 0) {
+ if ((t_ret = __dbreg_mark_restored(env)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+ } else {
+ ret = __dbreg_invalidate_files(env, 0);
+ if ((t_ret = __rep_closefiles(env)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+ }
+ if ((t_ret = __txn_recycle_id(env)) != 0 && ret == 0)
+ ret = t_ret;
+ REP_SYSTEM_LOCK(env);
+ F_CLR(rep, REP_F_READY_API | REP_F_READY_OP);
+ locked = 0;
+ REP_SYSTEM_UNLOCK(env);
+ (void)__memp_set_config(
+ env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
+ interrupting = 0;
+ /*
+ * Force a checkpoint if this new master has no
+ * perm record yet.
+ */
+ if (ret == 0 && do_ckp)
+ ret = __txn_checkpoint(env, 0, 0,
+ DB_CKP_INTERNAL | DB_FORCE);
+ }
+ } else {
+ if (role_chg)
+ rep->master_id = DB_EID_INVALID;
+ /*
+ * Zero out "everything" except recovery and tally flags.
+ */
+ repflags = F_ISSET(rep,
+ REP_F_INREPSTART | REP_F_NOARCHIVE | REP_F_READY_MSG |
+ REP_F_RECOVER_MASK | REP_F_TALLY | REP_F_STICKY_MASK);
+ FLD_SET(repflags, REP_F_CLIENT);
+ if (role_chg) {
+ if ((ret = __log_get_oldversion(env, &oldvers)) != 0)
+ goto errunlock;
+ RPRINT(env, DB_VERB_REP_MISC, (env,
+ "rep_start: Found old version log %d", oldvers));
+ if (oldvers >= DB_LOGVERSION_MIN) {
+ __log_set_version(env, oldvers);
+ oldvers = __rep_conv_vers(env, oldvers);
+ DB_ASSERT(
+ env, oldvers != DB_REPVERSION_INVALID);
+ rep->version = oldvers;
+ }
+ }
+ rep->flags = repflags;
+ /*
+ * On a client, compute the lease duration on the
+ * assumption that the client has a fast clock.
+ * Expire any existing leases we might have held as
+ * a master.
+ */
+ if (IS_USING_LEASES(env) &&
+ (role_chg || !IS_REP_STARTED(env))) {
+ if ((ret = __rep_lease_expire(env)) != 0)
+ goto errunlock;
+ /*
+ * Since the master is also compensating on its
+ * side as well, we're being doubly conservative
+ * to compensate on the client side. Theoretically,
+ * this compensation is not necessary, as it is
+ * effectively doubling the skew compensation.
+ * But we are making guarantees based on time and
+ * skews across machines. So we are being extra
+ * cautious.
+ */
+ tmp = (db_timeout_t)((double)rep->lease_timeout *
+ ((double)rep->clock_skew /
+ (double)rep->clock_base));
+ DB_TIMEOUT_TO_TIMESPEC(tmp, &rep->lease_duration);
+ if (rep->lease_off != INVALID_ROFF) {
+ MUTEX_LOCK(env, renv->mtx_regenv);
+ __env_alloc_free(infop,
+ R_ADDR(infop, rep->lease_off));
+ MUTEX_UNLOCK(env, renv->mtx_regenv);
+ rep->lease_off = INVALID_ROFF;
+ }
+ }
+ REP_SYSTEM_UNLOCK(env);
+
+ /*
+ * Abort any prepared transactions that were restored
+ * by recovery. We won't be able to create any txns of
+ * our own until they're resolved, but we can't resolve
+ * them ourselves; the master has to. If any get
+ * resolved as commits, we'll redo them when commit
+ * records come in. Aborts will simply be ignored.
+ */
+ if ((ret = __rep_abort_prepared(env)) != 0)
+ goto errlock;
+
+ /*
+ * If we're changing roles we need to init the db.
+ */
+ if (role_chg) {
+ if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+ goto errlock;
+ /*
+ * Ignore errors, because if the file doesn't exist,
+ * this is perfectly OK.
+ */
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ (void)__db_remove(dbp, ip, NULL, REPDBNAME,
+ NULL, DB_FORCE);
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ /*
+ * Set pending_event after calls that can fail.
+ */
+ pending_event = DB_EVENT_REP_CLIENT;
+ }
+ REP_SYSTEM_LOCK(env);
+ F_CLR(rep, REP_F_READY_MSG);
+ if (locked) {
+ F_CLR(rep, REP_F_READY_API | REP_F_READY_OP);
+ locked = 0;
+ }
+ REP_SYSTEM_UNLOCK(env);
+
+ if ((role_chg || rep->master_id == DB_EID_INVALID) &&
+ F_ISSET(env, ENV_PRIVATE))
+ /*
+ * If we think we're a new client, and we have a
+ * private env, set our gen number down to 0.
+ * Otherwise, we can restart and think
+ * we're ready to accept a new record (because our
+ * gen is okay), but really this client needs to
+ * sync with the master.
+ */
+ rep->gen = 0;
+
+ /*
+ * Announce ourselves and send out our data.
+ */
+ if ((ret = __dbt_usercopy(env, dbt)) != 0)
+ goto out;
+ (void)__rep_send_message(env,
+ DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0);
+ }
+
+ if (0) {
+ /*
+ * We have separate labels for errors. If we're returning an
+ * error before we've set REP_F_READY_MSG, we use 'err'. If
+ * we are erroring while holding the region mutex, then we use
+ * 'errunlock' label. If we error without holding the rep
+ * mutex we must use 'errlock'.
+ */
+errlock: REP_SYSTEM_LOCK(env);
+errunlock: F_CLR(rep, REP_F_READY_MSG);
+ if (locked)
+ F_CLR(rep, REP_F_READY_API | REP_F_READY_OP);
+ if (interrupting)
+ (void)__memp_set_config(
+ env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
+ REP_SYSTEM_UNLOCK(env);
+ }
+out:
+ if (ret == 0) {
+ REP_SYSTEM_LOCK(env);
+ F_SET(rep, REP_F_START_CALLED);
+ REP_SYSTEM_UNLOCK(env);
+ }
+ if (start_th) {
+ REP_SYSTEM_LOCK(env);
+ F_CLR(rep, REP_F_INREPSTART);
+ REP_SYSTEM_UNLOCK(env);
+ }
+ if (pending_event != DB_EVENT_NO_SUCH_EVENT)
+ __rep_fire_event(env, pending_event, NULL);
+ __dbt_userfree(env, dbt, NULL, NULL);
+ ENV_LEAVE(env, ip);
+ return (ret);
+}
+
+/*
+ * __rep_client_dbinit --
+ *
+ * Initialize the LSN database on the client side. This is called from the
+ * client initialization code. The startup flag value indicates if
+ * this is the first thread/process starting up and therefore should create
+ * the LSN database. This routine must be called once by each process acting
+ * as a client.
+ *
+ * Assumes caller holds appropriate mutex.
+ *
+ * PUBLIC: int __rep_client_dbinit __P((ENV *, int, repdb_t));
+ */
+int
+__rep_client_dbinit(env, startup, which)
+ ENV *env;
+ int startup;
+ repdb_t which;
+{
+ DB *dbp, **rdbpp;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ REP *rep;
+ int ret, t_ret;
+ u_int32_t flags;
+ const char *fname, *name, *subdb;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ dbp = NULL;
+
+ if (which == REP_DB) {
+ name = REPDBNAME;
+ rdbpp = &db_rep->rep_db;
+ } else {
+ name = REPPAGENAME;
+ rdbpp = &rep->file_dbp;
+ }
+ /* Check if this has already been called on this environment. */
+ if (*rdbpp != NULL)
+ return (0);
+
+ ENV_GET_THREAD_INFO(env, ip);
+
+ /* Set up arguments for __db_remove and __db_open calls. */
+ fname = name;
+ subdb = NULL;
+ if (FLD_ISSET(rep->config, REP_C_INMEM)) {
+ fname = NULL;
+ subdb = name;
+ }
+
+ if (startup) {
+ if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+ goto err;
+ /*
+ * Prevent in-memory database remove from writing to
+ * non-existent logs.
+ */
+ if (FLD_ISSET(rep->config, REP_C_INMEM))
+ (void)__db_set_flags(dbp, DB_TXN_NOT_DURABLE);
+ /*
+ * Ignore errors, because if the file doesn't exist, this
+ * is perfectly OK.
+ */
+ (void)__db_remove(dbp, ip, NULL, fname, subdb, DB_FORCE);
+ }
+
+ if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+ goto err;
+ if (which == REP_DB &&
+ (ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0)
+ goto err;
+
+ /* Don't write log records on the client. */
+ if ((ret = __db_set_flags(dbp, DB_TXN_NOT_DURABLE)) != 0)
+ goto err;
+
+ flags = DB_NO_AUTO_COMMIT | DB_CREATE |
+ (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0);
+
+ if ((ret = __db_open(dbp, ip, NULL, fname, subdb,
+ (which == REP_DB ? DB_BTREE : DB_RECNO),
+ flags, 0, PGNO_BASE_MD)) != 0)
+ goto err;
+
+ *rdbpp = dbp;
+
+ if (0) {
+err: if (dbp != NULL &&
+ (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
+ ret = t_ret;
+ *rdbpp = NULL;
+ }
+
+ return (ret);
+}
+
+/*
+ * __rep_bt_cmp --
+ *
+ * Comparison function for the LSN table. We use the entire control
+ * structure as a key (for simplicity, so we don't have to merge the
+ * other fields in the control with the data field), but really only
+ * care about the LSNs.
+ */
+static int
+__rep_bt_cmp(dbp, dbt1, dbt2)
+ DB *dbp;
+ const DBT *dbt1, *dbt2;
+{
+ DB_LSN lsn1, lsn2;
+ __rep_control_args *rp1, *rp2;
+
+ COMPQUIET(dbp, NULL);
+
+ rp1 = dbt1->data;
+ rp2 = dbt2->data;
+
+ (void)__ua_memcpy(&lsn1, &rp1->lsn, sizeof(DB_LSN));
+ (void)__ua_memcpy(&lsn2, &rp2->lsn, sizeof(DB_LSN));
+
+ if (lsn1.file > lsn2.file)
+ return (1);
+
+ if (lsn1.file < lsn2.file)
+ return (-1);
+
+ if (lsn1.offset > lsn2.offset)
+ return (1);
+
+ if (lsn1.offset < lsn2.offset)
+ return (-1);
+
+ return (0);
+}
+
+/*
+ * __rep_abort_prepared --
+ * Abort any prepared transactions that recovery restored.
+ *
+ * This is used by clients that have just run recovery, since
+ * they cannot/should not call txn_recover and handle prepared transactions
+ * themselves.
+ */
+static int
+__rep_abort_prepared(env)
+ ENV *env;
+{
+#define PREPLISTSIZE 50
+ DB_LOG *dblp;
+ DB_PREPLIST prep[PREPLISTSIZE], *p;
+ DB_TXNMGR *mgr;
+ DB_TXNREGION *region;
+ LOG *lp;
+ int ret;
+ u_int32_t count, i;
+ u_int32_t op;
+
+ mgr = env->tx_handle;
+ region = mgr->reginfo.primary;
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+
+ if (region->stat.st_nrestores == 0)
+ return (0);
+
+ op = DB_FIRST;
+ do {
+ if ((ret = __txn_recover(env,
+ prep, PREPLISTSIZE, &count, op)) != 0)
+ return (ret);
+ for (i = 0; i < count; i++) {
+ p = &prep[i];
+ if ((ret = __txn_abort(p->txn)) != 0)
+ return (ret);
+ env->rep_handle->region->op_cnt--;
+ env->rep_handle->region->max_prep_lsn = lp->lsn;
+ region->stat.st_nrestores--;
+ }
+ op = DB_NEXT;
+ } while (count == PREPLISTSIZE);
+
+ return (0);
+}
+
+/*
+ * __rep_restore_prepared --
+ * Restore to a prepared state any prepared but not yet committed
+ * transactions.
+ *
+ * This performs, in effect, a "mini-recovery"; it is called from
+ * __rep_start by newly upgraded masters. There may be transactions that an
+ * old master prepared but did not resolve, which we need to restore to an
+ * active state.
+ */
+static int
+__rep_restore_prepared(env)
+ ENV *env;
+{
+ DBT rec;
+ DB_LOGC *logc;
+ DB_LSN ckp_lsn, lsn;
+ DB_REP *db_rep;
+ DB_TXNHEAD *txninfo;
+ REP *rep;
+ __txn_ckp_args *ckp_args;
+ __txn_regop_args *regop_args;
+ __txn_prepare_args *prep_args;
+ int ret, t_ret;
+ u_int32_t hi_txn, low_txn, rectype, status, txnid, txnop;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ if (IS_ZERO_LSN(rep->max_prep_lsn)) {
+ RPRINT(env, DB_VERB_REP_MISC,
+ (env, "restore_prep: No prepares. Skip."));
+ return (0);
+ }
+ txninfo = NULL;
+ ckp_args = NULL;
+ prep_args = NULL;
+ regop_args = NULL;
+ ZERO_LSN(ckp_lsn);
+ ZERO_LSN(lsn);
+
+ if ((ret = __log_cursor(env, &logc)) != 0)
+ return (ret);
+
+ /*
+ * Get our first LSN to see if the prepared LSN is still
+ * available. If so, it might be unresolved. If not,
+ * then it is guaranteed to be resolved.
+ */
+ memset(&rec, 0, sizeof(DBT));
+ if ((ret = __logc_get(logc, &lsn, &rec, DB_FIRST)) != 0) {
+ __db_errx(env, "First record not found");
+ goto err;
+ }
+ /*
+ * If the max_prep_lsn is no longer available, we're sure
+ * that txn has been resolved. We're done.
+ */
+ if (rep->max_prep_lsn.file < lsn.file) {
+ RPRINT(env, DB_VERB_REP_MISC,
+ (env, "restore_prep: Prepare resolved. Skip"));
+ ZERO_LSN(rep->max_prep_lsn);
+ goto done;
+ }
+ /*
+ * We need to consider the set of records between the most recent
+ * checkpoint LSN and the end of the log; any txn in that
+ * range, and only txns in that range, could still have been
+ * active, and thus prepared but not yet committed (PBNYC),
+ * when the old master died.
+ *
+ * Find the most recent checkpoint LSN, and get the record there.
+ * If there is no checkpoint in the log, start off by getting
+ * the very first record in the log instead.
+ */
+ if ((ret = __txn_getckp(env, &lsn)) == 0) {
+ if ((ret = __logc_get(logc, &lsn, &rec, DB_SET)) != 0) {
+ __db_errx(env,
+ "Checkpoint record at LSN [%lu][%lu] not found",
+ (u_long)lsn.file, (u_long)lsn.offset);
+ goto err;
+ }
+
+ if ((ret = __txn_ckp_read(
+ env, rec.data, &ckp_args)) == 0) {
+ ckp_lsn = ckp_args->ckp_lsn;
+ __os_free(env, ckp_args);
+ }
+ if (ret != 0) {
+ __db_errx(env,
+ "Invalid checkpoint record at [%lu][%lu]",
+ (u_long)lsn.file, (u_long)lsn.offset);
+ goto err;
+ }
+
+ if ((ret = __logc_get(logc, &ckp_lsn, &rec, DB_SET)) != 0) {
+ __db_errx(env,
+ "Checkpoint LSN record [%lu][%lu] not found",
+ (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
+ goto err;
+ }
+ } else if ((ret = __logc_get(logc, &lsn, &rec, DB_FIRST)) != 0) {
+ if (ret == DB_NOTFOUND) {
+ /* An empty log means no PBNYC txns. */
+ ret = 0;
+ goto done;
+ }
+ __db_errx(env, "Attempt to get first log record failed");
+ goto err;
+ }
+
+ /*
+ * We use the same txnlist infrastructure that recovery does;
+ * it demands an estimate of the high and low txnids for
+ * initialization.
+ *
+ * First, the low txnid.
+ */
+ do {
+ /* txnid is after rectype, which is a u_int32. */
+ LOGCOPY_32(env, &low_txn,
+ (u_int8_t *)rec.data + sizeof(u_int32_t));
+ if (low_txn != 0)
+ break;
+ } while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0);
+
+ /* If there are no txns, there are no PBNYC txns. */
+ if (ret == DB_NOTFOUND) {
+ ret = 0;
+ goto done;
+ } else if (ret != 0)
+ goto err;
+
+ /* Now, the high txnid. */
+ if ((ret = __logc_get(logc, &lsn, &rec, DB_LAST)) != 0) {
+ /*
+ * Note that DB_NOTFOUND is unacceptable here because we
+ * had to have looked at some log record to get this far.
+ */
+ __db_errx(env, "Final log record not found");
+ goto err;
+ }
+ do {
+ /* txnid is after rectype, which is a u_int32. */
+ LOGCOPY_32(env, &hi_txn,
+ (u_int8_t *)rec.data + sizeof(u_int32_t));
+ if (hi_txn != 0)
+ break;
+ } while ((ret = __logc_get(logc, &lsn, &rec, DB_PREV)) == 0);
+ if (ret == DB_NOTFOUND) {
+ ret = 0;
+ goto done;
+ } else if (ret != 0)
+ goto err;
+
+ /* We have a high and low txnid. Initialise the txn list. */
+ if ((ret = __db_txnlist_init(env,
+ NULL, low_txn, hi_txn, NULL, &txninfo)) != 0)
+ goto err;
+
+ /*
+ * Now, walk backward from the end of the log to ckp_lsn. Any
+ * prepares that we hit without first hitting a commit or
+ * abort belong to PBNYC txns, and we need to apply them and
+ * restore them to a prepared state.
+ *
+ * Note that we wind up applying transactions out of order.
+ * Since all PBNYC txns still held locks on the old master and
+ * were isolated, this should be safe.
+ */
+ F_SET(env->lg_handle, DBLOG_RECOVER);
+ for (ret = __logc_get(logc, &lsn, &rec, DB_LAST);
+ ret == 0 && LOG_COMPARE(&lsn, &ckp_lsn) > 0;
+ ret = __logc_get(logc, &lsn, &rec, DB_PREV)) {
+ LOGCOPY_32(env, &rectype, rec.data);
+ switch (rectype) {
+ case DB___txn_regop:
+ /*
+ * It's a commit or abort--but we don't care
+ * which! Just add it to the list of txns
+ * that are resolved.
+ */
+ if ((ret = __txn_regop_read(
+ env, rec.data, &regop_args)) != 0)
+ goto err;
+ txnid = regop_args->txnp->txnid;
+ txnop = regop_args->opcode;
+ __os_free(env, regop_args);
+
+ ret = __db_txnlist_find(env,
+ txninfo, txnid, &status);
+ if (ret == DB_NOTFOUND)
+ ret = __db_txnlist_add(env, txninfo,
+ txnid, txnop, &lsn);
+ else if (ret != 0)
+ goto err;
+ break;
+ case DB___txn_prepare:
+ /*
+ * It's a prepare. If its not aborted and
+ * we haven't put the txn on our list yet, it
+ * hasn't been resolved, so apply and restore it.
+ */
+ if ((ret = __txn_prepare_read(
+ env, rec.data, &prep_args)) != 0)
+ goto err;
+ ret = __db_txnlist_find(env, txninfo,
+ prep_args->txnp->txnid, &status);
+ if (ret == DB_NOTFOUND) {
+ if (prep_args->opcode == TXN_ABORT)
+ ret = __db_txnlist_add(env, txninfo,
+ prep_args->txnp->txnid,
+ prep_args->opcode, &lsn);
+ else if ((ret =
+ __rep_process_txn(env, &rec)) == 0) {
+ /*
+ * We are guaranteed to be single
+ * threaded here. We need to
+ * account for this newly
+ * instantiated txn in the op_cnt
+ * so that it is counted when it is
+ * resolved.
+ */
+ rep->op_cnt++;
+ ret = __txn_restore_txn(env,
+ &lsn, prep_args);
+ }
+ } else if (ret != 0)
+ goto err;
+ __os_free(env, prep_args);
+ break;
+ default:
+ continue;
+ }
+ }
+
+ /* It's not an error to have hit the beginning of the log. */
+ if (ret == DB_NOTFOUND)
+ ret = 0;
+
+done:
+err: t_ret = __logc_close(logc);
+ F_CLR(env->lg_handle, DBLOG_RECOVER);
+
+ if (txninfo != NULL)
+ __db_txnlist_end(env, txninfo);
+
+ return (ret == 0 ? t_ret : ret);
+}
+
+/*
+ * __rep_get_limit --
+ * Get the limit on the amount of data that will be sent during a single
+ * invocation of __rep_process_message.
+ *
+ * PUBLIC: int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));
+ */
+int
+__rep_get_limit(dbenv, gbytesp, bytesp)
+ DB_ENV *dbenv;
+ u_int32_t *gbytesp, *bytesp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_limit", DB_INIT_REP);
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ ENV_ENTER(env, ip);
+ REP_SYSTEM_LOCK(env);
+ if (gbytesp != NULL)
+ *gbytesp = rep->gbytes;
+ if (bytesp != NULL)
+ *bytesp = rep->bytes;
+ REP_SYSTEM_UNLOCK(env);
+ ENV_LEAVE(env, ip);
+ } else {
+ if (gbytesp != NULL)
+ *gbytesp = db_rep->gbytes;
+ if (bytesp != NULL)
+ *bytesp = db_rep->bytes;
+ }
+
+ return (0);
+}
+
+/*
+ * __rep_set_limit --
+ * Set a limit on the amount of data that will be sent during a single
+ * invocation of __rep_process_message.
+ *
+ * PUBLIC: int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
+ */
+int
+__rep_set_limit(dbenv, gbytes, bytes)
+ DB_ENV *dbenv;
+ u_int32_t gbytes, bytes;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_limit", DB_INIT_REP);
+
+ if (bytes > GIGABYTE) {
+ gbytes += bytes / GIGABYTE;
+ bytes = bytes % GIGABYTE;
+ }
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ ENV_ENTER(env, ip);
+ REP_SYSTEM_LOCK(env);
+ rep->gbytes = gbytes;
+ rep->bytes = bytes;
+ REP_SYSTEM_UNLOCK(env);
+ ENV_LEAVE(env, ip);
+ } else {
+ db_rep->gbytes = gbytes;
+ db_rep->bytes = bytes;
+ }
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_set_nsites __P((DB_ENV *, u_int32_t));
+ */
+int
+__rep_set_nsites(dbenv, n)
+ DB_ENV *dbenv;
+ u_int32_t n;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_nsites", DB_INIT_REP);
+
+ if (IS_USING_LEASES(env) && IS_REP_STARTED(env)) {
+ __db_errx(env,
+ "DB_ENV->rep_set_nsites: must be called before DB_ENV->rep_start");
+ return (EINVAL);
+ }
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ rep->config_nsites = n;
+ } else
+ db_rep->config_nsites = n;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_get_nsites __P((DB_ENV *, u_int32_t *));
+ */
+int
+__rep_get_nsites(dbenv, n)
+ DB_ENV *dbenv;
+ u_int32_t *n;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_nsites", DB_INIT_REP);
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ *n = rep->config_nsites;
+ } else
+ *n = db_rep->config_nsites;
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_set_priority __P((DB_ENV *, u_int32_t));
+ */
+int
+__rep_set_priority(dbenv, priority)
+ DB_ENV *dbenv;
+ u_int32_t priority;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_priority", DB_INIT_REP);
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ rep->priority = priority;
+ } else
+ db_rep->my_priority = priority;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_get_priority __P((DB_ENV *, u_int32_t *));
+ */
+int
+__rep_get_priority(dbenv, priority)
+ DB_ENV *dbenv;
+ u_int32_t *priority;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_priority", DB_INIT_REP);
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ *priority = rep->priority;
+ } else
+ *priority = db_rep->my_priority;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_set_timeout __P((DB_ENV *, int, db_timeout_t));
+ */
+int
+__rep_set_timeout(dbenv, which, timeout)
+ DB_ENV *dbenv;
+ int which;
+ db_timeout_t timeout;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+ int repmgr_timeout, ret;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
+ repmgr_timeout = 0;
+
+ if (which == DB_REP_ACK_TIMEOUT || which == DB_REP_CONNECTION_RETRY ||
+ which == DB_REP_ELECTION_RETRY ||
+ which == DB_REP_HEARTBEAT_MONITOR ||
+ which == DB_REP_HEARTBEAT_SEND)
+ repmgr_timeout = 1;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_timeout", DB_INIT_REP);
+
+ if (APP_IS_BASEAPI(env) && repmgr_timeout) {
+ __db_errx(env, "%s %s", "DB_ENV->rep_set_timeout:",
+"cannot set Replication Manager timeout from base replication application");
+ return (EINVAL);
+ }
+ if (which == DB_REP_LEASE_TIMEOUT && IS_REP_STARTED(env)) {
+ ret = EINVAL;
+ __db_errx(env, "%s %s", "DB_ENV->rep_set_timeout:",
+"lease timeout must be set before DB_ENV->rep_start.");
+ return (EINVAL);
+ }
+
+ switch (which) {
+ case DB_REP_CHECKPOINT_DELAY:
+ if (REP_ON(env))
+ rep->chkpt_delay = timeout;
+ else
+ db_rep->chkpt_delay = timeout;
+ break;
+ case DB_REP_ELECTION_TIMEOUT:
+ if (REP_ON(env))
+ rep->elect_timeout = timeout;
+ else
+ db_rep->elect_timeout = timeout;
+ break;
+ case DB_REP_FULL_ELECTION_TIMEOUT:
+ if (REP_ON(env))
+ rep->full_elect_timeout = timeout;
+ else
+ db_rep->full_elect_timeout = timeout;
+ break;
+ case DB_REP_LEASE_TIMEOUT:
+ if (REP_ON(env))
+ rep->lease_timeout = timeout;
+ else
+ db_rep->lease_timeout = timeout;
+ break;
+#ifdef HAVE_REPLICATION_THREADS
+ case DB_REP_ACK_TIMEOUT:
+ db_rep->ack_timeout = timeout;
+ break;
+ case DB_REP_CONNECTION_RETRY:
+ db_rep->connection_retry_wait = timeout;
+ break;
+ case DB_REP_ELECTION_RETRY:
+ db_rep->election_retry_wait = timeout;
+ break;
+ case DB_REP_HEARTBEAT_MONITOR:
+ db_rep->heartbeat_monitor_timeout = timeout;
+ break;
+ case DB_REP_HEARTBEAT_SEND:
+ db_rep->heartbeat_frequency = timeout;
+ break;
+#endif
+ default:
+ __db_errx(env,
+ "Unknown timeout type argument to DB_ENV->rep_set_timeout");
+ ret = EINVAL;
+ }
+
+ /* Setting a repmgr timeout makes this a repmgr application */
+ if (ret == 0 && repmgr_timeout)
+ APP_SET_REPMGR(env);
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __rep_get_timeout __P((DB_ENV *, int, db_timeout_t *));
+ */
+int
+__rep_get_timeout(dbenv, which, timeout)
+ DB_ENV *dbenv;
+ int which;
+ db_timeout_t *timeout;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_timeout", DB_INIT_REP);
+
+ switch (which) {
+ case DB_REP_CHECKPOINT_DELAY:
+ *timeout = REP_ON(env) ?
+ rep->chkpt_delay : db_rep->chkpt_delay;
+ break;
+ case DB_REP_ELECTION_TIMEOUT:
+ *timeout = REP_ON(env) ?
+ rep->elect_timeout : db_rep->elect_timeout;
+ break;
+ case DB_REP_FULL_ELECTION_TIMEOUT:
+ *timeout = REP_ON(env) ?
+ rep->full_elect_timeout : db_rep->full_elect_timeout;
+ break;
+ case DB_REP_LEASE_TIMEOUT:
+ *timeout = REP_ON(env) ?
+ rep->lease_timeout : db_rep->lease_timeout;
+ break;
+#ifdef HAVE_REPLICATION_THREADS
+ case DB_REP_ACK_TIMEOUT:
+ *timeout = db_rep->ack_timeout;
+ break;
+ case DB_REP_CONNECTION_RETRY:
+ *timeout = db_rep->connection_retry_wait;
+ break;
+ case DB_REP_ELECTION_RETRY:
+ *timeout = db_rep->election_retry_wait;
+ break;
+ case DB_REP_HEARTBEAT_MONITOR:
+ *timeout = db_rep->heartbeat_monitor_timeout;
+ break;
+ case DB_REP_HEARTBEAT_SEND:
+ *timeout = db_rep->heartbeat_frequency;
+ break;
+#endif
+ default:
+ __db_errx(env,
+ "unknown timeout type argument to DB_ENV->rep_get_timeout");
+ return (EINVAL);
+ }
+
+ return (0);
+}
+
+/*
+ * __rep_get_request --
+ * Get the minimum and maximum number of log records that we wait
+ * before retransmitting.
+ *
+ * PUBLIC: int __rep_get_request
+ * PUBLIC: __P((DB_ENV *, db_timeout_t *, db_timeout_t *));
+ */
+int
+__rep_get_request(dbenv, minp, maxp)
+ DB_ENV *dbenv;
+ db_timeout_t *minp, *maxp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_request", DB_INIT_REP);
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ ENV_ENTER(env, ip);
+ /*
+ * We acquire the mtx_region or mtx_clientdb mutexes as needed.
+ */
+ REP_SYSTEM_LOCK(env);
+ if (minp != NULL)
+ DB_TIMESPEC_TO_TIMEOUT((*minp), &rep->request_gap, 0);
+ if (maxp != NULL)
+ DB_TIMESPEC_TO_TIMEOUT((*maxp), &rep->max_gap, 0);
+ REP_SYSTEM_UNLOCK(env);
+ ENV_LEAVE(env, ip);
+ } else {
+ if (minp != NULL)
+ DB_TIMESPEC_TO_TIMEOUT((*minp),
+ &db_rep->request_gap, 0);
+ if (maxp != NULL)
+ DB_TIMESPEC_TO_TIMEOUT((*maxp), &db_rep->max_gap, 0);
+ }
+
+ return (0);
+}
+
+/*
+ * __rep_set_request --
+ * Set the minimum and maximum number of log records that we wait
+ * before retransmitting.
+ *
+ * PUBLIC: int __rep_set_request __P((DB_ENV *, db_timeout_t, db_timeout_t));
+ */
+int
+__rep_set_request(dbenv, min, max)
+ DB_ENV *dbenv;
+ db_timeout_t min, max;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ LOG *lp;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_request", DB_INIT_REP);
+
+ if (min == 0 || max < min) {
+ __db_errx(env,
+ "DB_ENV->rep_set_request: Invalid min or max values");
+ return (EINVAL);
+ }
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ ENV_ENTER(env, ip);
+ /*
+ * We acquire the mtx_region or mtx_clientdb mutexes as needed.
+ */
+ REP_SYSTEM_LOCK(env);
+ DB_TIMEOUT_TO_TIMESPEC(min, &rep->request_gap);
+ DB_TIMEOUT_TO_TIMESPEC(max, &rep->max_gap);
+ REP_SYSTEM_UNLOCK(env);
+
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ dblp = env->lg_handle;
+ if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
+ DB_TIMEOUT_TO_TIMESPEC(min, &lp->wait_ts);
+ }
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ ENV_LEAVE(env, ip);
+ } else {
+ DB_TIMEOUT_TO_TIMESPEC(min, &db_rep->request_gap);
+ DB_TIMEOUT_TO_TIMESPEC(max, &db_rep->max_gap);
+ }
+
+ return (0);
+}
+
+/*
+ * __rep_set_transport_pp --
+ * Set the transport function for replication.
+ *
+ * PUBLIC: int __rep_set_transport_pp __P((DB_ENV *, int,
+ * PUBLIC: int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
+ * PUBLIC: int, u_int32_t)));
+ */
+int
+__rep_set_transport_pp(dbenv, eid, f_send)
+ DB_ENV *dbenv;
+ int eid;
+ int (*f_send) __P((DB_ENV *,
+ const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+{
+ DB_REP *db_rep;
+ ENV *env;
+ int ret;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ ret = 0;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_transport", DB_INIT_REP);
+
+ if (APP_IS_REPMGR(env)) {
+ __db_errx(env,
+"DB_ENV->rep_set_transport: cannot call from Replication Manager application");
+ return (EINVAL);
+ }
+
+ if (f_send == NULL) {
+ __db_errx(env,
+ "DB_ENV->rep_set_transport: no send function specified");
+ return (EINVAL);
+ }
+
+ if (eid < 0) {
+ __db_errx(env,
+ "DB_ENV->rep_set_transport: eid must be greater than or equal to 0");
+ return (EINVAL);
+ }
+
+ if ((ret = __rep_set_transport_int(env, eid, f_send)) == 0)
+ /*
+ * Setting a non-repmgr send function makes this a base API
+ * application.
+ */
+ APP_SET_BASEAPI(env);
+
+ return (ret);
+}
+
+/*
+ * __rep_set_transport_int --
+ * Set the internal values for the transport function for replication.
+ *
+ * PUBLIC: int __rep_set_transport_int __P((ENV *, int,
+ * PUBLIC: int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
+ * PUBLIC: int, u_int32_t)));
+ */
+int
+__rep_set_transport_int(env, eid, f_send)
+ ENV *env;
+ int eid;
+ int (*f_send) __P((DB_ENV *,
+ const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+{
+ DB_REP *db_rep;
+ REP *rep;
+
+ db_rep = env->rep_handle;
+ db_rep->send = f_send;
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ rep->eid = eid;
+ } else
+ db_rep->eid = eid;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_get_clockskew __P((DB_ENV *, u_int32_t *, u_int32_t *));
+ */
+int
+__rep_get_clockskew(dbenv, fast_clockp, slow_clockp)
+ DB_ENV *dbenv;
+ u_int32_t *fast_clockp, *slow_clockp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_get_clockskew", DB_INIT_REP);
+
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ ENV_ENTER(env, ip);
+ REP_SYSTEM_LOCK(env);
+ *fast_clockp = rep->clock_skew;
+ *slow_clockp = rep->clock_base;
+ REP_SYSTEM_UNLOCK(env);
+ ENV_LEAVE(env, ip);
+ } else {
+ *fast_clockp = db_rep->clock_skew;
+ *slow_clockp = db_rep->clock_base;
+ }
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __rep_set_clockskew __P((DB_ENV *, u_int32_t, u_int32_t));
+ */
+int
+__rep_set_clockskew(dbenv, fast_clock, slow_clock)
+ DB_ENV *dbenv;
+ u_int32_t fast_clock, slow_clock;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+ int ret;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ ret = 0;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->rep_set_clockskew", DB_INIT_REP);
+
+ /*
+ * Check for valid values. The fast clock should be a larger
+ * number than the slow clock. We use the slow clock value as
+ * our base for adjustment - therefore, a 2% difference should
+ * be fast == 102, slow == 100. Check for values being 0. If
+ * they are, then set them both to 1 internally.
+ *
+ * We will use these numbers to compute the larger ratio to be
+ * most conservative about the user's intention.
+ */
+ if (fast_clock == 0 || slow_clock == 0) {
+ /*
+ * If one value is zero, reject if both aren't zero.
+ */
+ if (slow_clock != 0 || fast_clock != 0) {
+ __db_errx(env,
+"DB_ENV->rep_set_clockskew: Zero only valid for when used for both arguments");
+ return (EINVAL);
+ }
+ fast_clock = 1;
+ slow_clock = 1;
+ }
+ if (fast_clock < slow_clock) {
+ __db_errx(env,
+"DB_ENV->rep_set_clockskew: slow_clock value is larger than fast_clock_value");
+ return (EINVAL);
+ }
+ if (REP_ON(env)) {
+ rep = db_rep->region;
+ if (IS_REP_STARTED(env)) {
+ __db_errx(env,
+ "DB_ENV->rep_set_clockskew: must be called before DB_ENV->rep_start");
+ return (EINVAL);
+ }
+ ENV_ENTER(env, ip);
+ REP_SYSTEM_LOCK(env);
+ rep->clock_skew = fast_clock;
+ rep->clock_base = slow_clock;
+ REP_SYSTEM_UNLOCK(env);
+ ENV_LEAVE(env, ip);
+ } else {
+ db_rep->clock_skew = fast_clock;
+ db_rep->clock_base = slow_clock;
+ }
+ return (ret);
+}
+
+/*
+ * __rep_flush --
+ * Re-push the last log record to all clients, in case they've lost
+ * messages and don't know it.
+ *
+ * PUBLIC: int __rep_flush __P((DB_ENV *));
+ */
+int
+__rep_flush(dbenv)
+ DB_ENV *dbenv;
+{
+ DBT rec;
+ DB_LOGC *logc;
+ DB_LSN lsn;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ int ret, t_ret;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ ENV_REQUIRES_CONFIG_XX(
+ env, rep_handle, "DB_ENV->rep_flush", DB_INIT_REP);
+
+ /* We need a transport function because we send messages. */
+ if (db_rep->send == NULL) {
+ __db_errx(env,
+ "DB_ENV->rep_flush: must be called after DB_ENV->rep_set_transport");
+ return (EINVAL);
+ }
+
+ ENV_ENTER(env, ip);
+
+ if ((ret = __log_cursor(env, &logc)) != 0)
+ return (ret);
+
+ memset(&rec, 0, sizeof(rec));
+ memset(&lsn, 0, sizeof(lsn));
+
+ if ((ret = __logc_get(logc, &lsn, &rec, DB_LAST)) != 0)
+ goto err;
+
+ (void)__rep_send_message(env,
+ DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0, 0);
+
+err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
+ ret = t_ret;
+ ENV_LEAVE(env, ip);
+ return (ret);
+}
+
+/*
+ * __rep_sync --
+ * Force a synchronization to occur between this client and the master.
+ * This is the other half of configuring DELAYCLIENT.
+ *
+ * PUBLIC: int __rep_sync __P((DB_ENV *, u_int32_t));
+ */
+int
+__rep_sync(dbenv, flags)
+ DB_ENV *dbenv;
+ u_int32_t flags;
+{
+ DB_LOG *dblp;
+ DB_LSN lsn;
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ LOG *lp;
+ REP *rep;
+ int master, ret;
+ u_int32_t repflags, type;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+
+ COMPQUIET(flags, 0);
+
+ ENV_REQUIRES_CONFIG_XX(
+ env, rep_handle, "DB_ENV->rep_sync", DB_INIT_REP);
+
+ /* We need a transport function because we send messages. */
+ if (db_rep->send == NULL) {
+ __db_errx(env,
+ "DB_ENV->rep_sync: must be called after DB_ENV->rep_set_transport");
+ return (EINVAL);
+ }
+
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ rep = db_rep->region;
+ ret = 0;
+
+ ENV_ENTER(env, ip);
+
+ /*
+ * Simple cases. If we're not in the DELAY state we have nothing
+ * to do. If we don't know who the master is, send a MASTER_REQ.
+ */
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ lsn = lp->verify_lsn;
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ REP_SYSTEM_LOCK(env);
+ master = rep->master_id;
+ if (master == DB_EID_INVALID) {
+ REP_SYSTEM_UNLOCK(env);
+ (void)__rep_send_message(env, DB_EID_BROADCAST,
+ REP_MASTER_REQ, NULL, NULL, 0, 0);
+ goto out;
+ }
+ /*
+ * We want to hold the rep mutex to test and then clear the
+ * DELAY flag. Racing threads in here could otherwise result
+ * in dual data streams.
+ */
+ if (!F_ISSET(rep, REP_F_DELAY)) {
+ REP_SYSTEM_UNLOCK(env);
+ goto out;
+ }
+
+ DB_ASSERT(env,
+ !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
+
+ /*
+ * If we get here, we clear the delay flag and kick off a
+ * synchronization. From this point forward, we will
+ * synchronize until the next time the master changes.
+ */
+ F_CLR(rep, REP_F_DELAY);
+ if (IS_ZERO_LSN(lsn) && FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) {
+ F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
+ ret = DB_REP_JOIN_FAILURE;
+ REP_SYSTEM_UNLOCK(env);
+ goto out;
+ }
+ REP_SYSTEM_UNLOCK(env);
+ /*
+ * When we set REP_F_DELAY, we set verify_lsn to the real verify lsn if
+ * we need to verify, or we zeroed it out if this is a client that needs
+ * internal init. So, send the type of message now that
+ * __rep_new_master delayed sending.
+ */
+ if (IS_ZERO_LSN(lsn)) {
+ DB_ASSERT(env, F_ISSET(rep, REP_F_RECOVER_UPDATE));
+ type = REP_UPDATE_REQ;
+ repflags = 0;
+ } else {
+ DB_ASSERT(env, F_ISSET(rep, REP_F_RECOVER_VERIFY));
+ type = REP_VERIFY_REQ;
+ repflags = DB_REP_ANYWHERE;
+ }
+ (void)__rep_send_message(env, master, type, &lsn, NULL, 0, repflags);
+
+out: ENV_LEAVE(env, ip);
+ return (ret);
+}
+
+/*
+ * __rep_conv_vers --
+ * Convert from a log version to the replication message version
+ * that release used.
+ */
+static u_int32_t
+__rep_conv_vers(env, log_ver)
+ ENV *env;
+ u_int32_t log_ver;
+{
+ COMPQUIET(env, NULL);
+
+ /*
+ * We can't use a switch statement, some of the DB_LOGVERSION_XX
+ * constants are the same
+ */
+ if (log_ver == DB_LOGVERSION)
+ return (DB_REPVERSION);
+ if (log_ver == DB_LOGVERSION_44)
+ return (DB_REPVERSION_44);
+ if (log_ver == DB_LOGVERSION_45)
+ return (DB_REPVERSION_45);
+ if (log_ver == DB_LOGVERSION_46)
+ return (DB_REPVERSION_46);
+ if (log_ver == DB_LOGVERSION_47)
+ return (DB_REPVERSION_47);
+ return (DB_REPVERSION_INVALID);
+}