diff options
Diffstat (limited to 'db-4.8.30/rep/rep_record.c')
-rw-r--r-- | db-4.8.30/rep/rep_record.c | 2379 |
1 files changed, 2379 insertions, 0 deletions
diff --git a/db-4.8.30/rep/rep_record.c b/db-4.8.30/rep/rep_record.c new file mode 100644 index 0000000..7196ca2 --- /dev/null +++ b/db-4.8.30/rep/rep_record.c @@ -0,0 +1,2379 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2001-2009 Oracle. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/db_page.h" +#include "dbinc/db_am.h" +#include "dbinc/lock.h" +#include "dbinc/log.h" +#include "dbinc/mp.h" +#include "dbinc/txn.h" + +static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *)); +static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *)); +static int __rep_fire_newmaster __P((ENV *, u_int32_t, int)); +static int __rep_fire_startupdone __P((ENV *, u_int32_t, int)); +static int __rep_getnext __P((ENV *, DB_THREAD_INFO *)); +static int __rep_lsn_cmp __P((const void *, const void *)); +static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *)); +static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *, + DBT *, db_timespec *, DB_LSN *)); +static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *)); +static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t)); + +/* Used to consistently designate which messages ought to be received where. */ + +#define MASTER_ONLY(rep, rp) do { \ + if (!F_ISSET(rep, REP_F_MASTER)) { \ + RPRINT(env, DB_VERB_REP_MSGS, \ + (env, "Master record received on client")); \ + REP_PRINT_MESSAGE(env, \ + eid, rp, "rep_process_message", 0); \ + /* Just skip/ignore it. */ \ + ret = 0; \ + goto errlock; \ + } \ +} while (0) + +#define CLIENT_ONLY(rep, rp) do { \ + if (!F_ISSET(rep, REP_F_CLIENT)) { \ + RPRINT(env, DB_VERB_REP_MSGS, \ + (env, "Client record received on master")); \ + /* \ + * Only broadcast DUPMASTER if leases are not \ + * in effect. If I am an old master, using \ + * leases and I get a newer message, my leases \ + * had better all be expired. \ + */ \ + if (IS_USING_LEASES(env)) \ + DB_ASSERT(env, \ + __rep_lease_check(env, 0) == \ + DB_REP_LEASE_EXPIRED); \ + else { \ + REP_PRINT_MESSAGE(env, \ + eid, rp, "rep_process_message", 0); \ + (void)__rep_send_message(env, DB_EID_BROADCAST, \ + REP_DUPMASTER, NULL, NULL, 0, 0); \ + } \ + ret = DB_REP_DUPMASTER; \ + goto errlock; \ + } \ +} while (0) + +/* + * If a client is attempting to service a request it does not have, + * call rep_skip_msg to skip this message and force a rerequest to the + * sender. We don't hold the mutex for the stats and may miscount. + */ +#define CLIENT_REREQ do { \ + if (F_ISSET(rep, REP_F_CLIENT)) { \ + STAT(rep->stat.st_client_svc_req++); \ + if (ret == DB_NOTFOUND) { \ + STAT(rep->stat.st_client_svc_miss++); \ + ret = __rep_skip_msg(env, rep, eid, rp->rectype);\ + } \ + } \ +} while (0) + +#define MASTER_UPDATE(env, renv) do { \ + REP_SYSTEM_LOCK(env); \ + F_SET((renv), DB_REGENV_REPLOCKED); \ + (void)time(&(renv)->op_timestamp); \ + REP_SYSTEM_UNLOCK(env); \ +} while (0) + +#define RECOVERING_SKIP do { \ + if (IS_REP_CLIENT(env) && recovering) { \ + /* Not holding region mutex, may miscount */ \ + STAT(rep->stat.st_msgs_recover++); \ + ret = __rep_skip_msg(env, rep, eid, rp->rectype); \ + goto errlock; \ + } \ +} while (0) + +/* + * If we're recovering the log we only want log records that are in the + * range we need to recover. Otherwise we can end up storing a huge + * number of "new" records, only to truncate the temp database later after + * we run recovery. If we are actively delaying a sync-up, we also skip + * all incoming log records until the application requests sync-up. + */ +#define RECOVERING_LOG_SKIP do { \ + if (F_ISSET(rep, REP_F_DELAY) || \ + rep->master_id == DB_EID_INVALID || \ + (recovering && \ + (!F_ISSET(rep, REP_F_RECOVER_LOG) || \ + LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0))) { \ + /* Not holding region mutex, may miscount */ \ + STAT(rep->stat.st_msgs_recover++); \ + ret = __rep_skip_msg(env, rep, eid, rp->rectype); \ + goto errlock; \ + } \ +} while (0) + +#define ANYSITE(rep) + +/* + * __rep_process_message_pp -- + * + * This routine takes an incoming message and processes it. + * + * control: contains the control fields from the record + * rec: contains the actual record + * eid: the environment id of the sender of the message; + * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the + * lsn of the maximum permanent or current not permanent log record + * (respectively). + * + * PUBLIC: int __rep_process_message_pp + * PUBLIC: __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *)); + */ +int +__rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp) + DB_ENV *dbenv; + DBT *control, *rec; + int eid; + DB_LSN *ret_lsnp; +{ + ENV *env; + int ret; + + env = dbenv->env; + ret = 0; + + ENV_REQUIRES_CONFIG_XX( + env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP); + + if (APP_IS_REPMGR(env)) { + __db_errx(env, "%s %s", "DB_ENV->rep_process_message:", + "cannot call from Replication Manager application"); + return (EINVAL); + } + + /* Control argument must be non-Null. */ + if (control == NULL || control->size == 0) { + __db_errx(env, + "DB_ENV->rep_process_message: control argument must be specified"); + return (EINVAL); + } + + /* + * Make sure site is a master or a client, which implies that + * replication has been started. + */ + if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) { + __db_errx(env, + "Environment not configured as replication master or client"); + return (EINVAL); + } + + if ((ret = __dbt_usercopy(env, control)) != 0 || + (ret = __dbt_usercopy(env, rec)) != 0) { + __dbt_userfree(env, control, rec, NULL); + __db_errx(env, + "DB_ENV->rep_process_message: error retrieving DBT contents"); + return ret; + } + + ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp); + + return (ret); +} + +/* + * __rep_process_message_int -- + * + * This routine performs the internal steps to process an incoming message. + * + * PUBLIC: int __rep_process_message_int + * PUBLIC: __P((ENV *, DBT *, DBT *, int, DB_LSN *)); + */ +int +__rep_process_message_int(env, control, rec, eid, ret_lsnp) + ENV *env; + DBT *control, *rec; + int eid; + DB_LSN *ret_lsnp; +{ + DBT data_dbt; + DB_LOG *dblp; + DB_LSN last_lsn, lsn; + DB_REP *db_rep; + DB_THREAD_INFO *ip; + LOG *lp; + REGENV *renv; + REGINFO *infop; + REP *rep; + REP_46_CONTROL *rp46; + REP_OLD_CONTROL *orp; + __rep_control_args *rp, tmprp; + __rep_egen_args egen_arg; + size_t len; + u_int32_t gen, rep_version; + int cmp, do_sync, lockout, recovering, ret, t_ret; + time_t savetime; + u_int8_t buf[__REP_MAXMSG_SIZE]; + + ret = 0; + do_sync = 0; + lockout = 0; + db_rep = env->rep_handle; + rep = db_rep->region; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + infop = env->reginfo; + renv = infop->primary; + /* + * Casting this to REP_OLD_CONTROL is just kind of stylistic: the + * rep_version field of course has to be in the same offset in all + * versions in order for this to work. + * + * We can look at the rep_version unswapped here because if we're + * talking to an old version, it will always be unswapped. If + * we're talking to a new version, the only issue is if it is + * swapped and we take one of the old version conditionals + * incorrectly. The rep_version would need to be very, very + * large for a swapped version to look like a small, older + * version. There is no problem here looking at it unswapped. + */ + rep_version = ((REP_OLD_CONTROL *)control->data)->rep_version; + if (rep_version <= DB_REPVERSION_45) { + orp = (REP_OLD_CONTROL *)control->data; + if (rep_version == DB_REPVERSION_45 && + F_ISSET(orp, REPCTL_INIT_45)) { + F_CLR(orp, REPCTL_INIT_45); + F_SET(orp, REPCTL_INIT); + } + tmprp.rep_version = orp->rep_version; + tmprp.log_version = orp->log_version; + tmprp.lsn = orp->lsn; + tmprp.rectype = orp->rectype; + tmprp.gen = orp->gen; + tmprp.flags = orp->flags; + tmprp.msg_sec = 0; + tmprp.msg_nsec = 0; + } else if (rep_version == DB_REPVERSION_46) { + rp46 = (REP_46_CONTROL *)control->data; + tmprp.rep_version = rp46->rep_version; + tmprp.log_version = rp46->log_version; + tmprp.lsn = rp46->lsn; + tmprp.rectype = rp46->rectype; + tmprp.gen = rp46->gen; + tmprp.flags = rp46->flags; + tmprp.msg_sec = (u_int32_t)rp46->msg_time.tv_sec; + tmprp.msg_nsec = (u_int32_t)rp46->msg_time.tv_nsec; + } else + if ((ret = __rep_control_unmarshal(env, &tmprp, + control->data, control->size, NULL)) != 0) + return (ret); + rp = &tmprp; + if (ret_lsnp != NULL) + ZERO_LSN(*ret_lsnp); + + ENV_ENTER(env, ip); + + REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0); + /* + * Check the version number for both rep and log. If it is + * an old version we support, convert it. Otherwise complain. + */ + if (rp->rep_version < DB_REPVERSION) { + if (rp->rep_version < DB_REPVERSION_MIN) { + __db_errx(env, + "unsupported old replication message version %lu, minimum version %d", + (u_long)rp->rep_version, DB_REPVERSION_MIN); + ret = EINVAL; + goto errlock; + } + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Received record %lu with old rep version %lu", + (u_long)rp->rectype, (u_long)rp->rep_version)); + rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype); + DB_ASSERT(env, rp->rectype != REP_INVALID); + /* + * We should have a valid new record type for all the old + * versions. + */ + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Converted to record %lu with old rep version %lu", + (u_long)rp->rectype, (u_long)rp->rep_version)); + } else if (rp->rep_version > DB_REPVERSION) { + __db_errx(env, + "unexpected replication message version %lu, expected %d", + (u_long)rp->rep_version, DB_REPVERSION); + ret = EINVAL; + goto errlock; + } + + if (rp->log_version < DB_LOGVERSION) { + if (rp->log_version < DB_LOGVERSION_MIN) { + __db_errx(env, + "unsupported old replication log version %lu, minimum version %d", + (u_long)rp->log_version, DB_LOGVERSION_MIN); + ret = EINVAL; + goto errlock; + } + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Received record %lu with old log version %lu", + (u_long)rp->rectype, (u_long)rp->log_version)); + } else if (rp->log_version > DB_LOGVERSION) { + __db_errx(env, + "unexpected log record version %lu, expected %d", + (u_long)rp->log_version, DB_LOGVERSION); + ret = EINVAL; + goto errlock; + } + + /* + * Acquire the replication lock. + */ + REP_SYSTEM_LOCK(env); + if (F_ISSET(rep, REP_F_READY_MSG)) { + /* + * If we're racing with a thread in rep_start, then + * just ignore the message and return. + */ + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Racing replication msg lockout, ignore message.")); + if (F_ISSET(rp, REPCTL_PERM)) + ret = DB_REP_IGNORE; + REP_SYSTEM_UNLOCK(env); + /* + * If another client has sent a c2c request to us, it may be a + * long time before it resends the request (due to its dual data + * streams avoidance heuristic); let it know we can't serve the + * request just now. + */ + if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) { + STAT(rep->stat.st_client_svc_req++); + STAT(rep->stat.st_client_svc_miss++); + (void)__rep_send_message(env, + eid, REP_REREQUEST, NULL, NULL, 0, 0); + } + goto out; + } + rep->msg_th++; + gen = rep->gen; + recovering = F_ISSET(rep, REP_F_RECOVER_MASK); + savetime = renv->rep_timestamp; + + STAT(rep->stat.st_msgs_processed++); + REP_SYSTEM_UNLOCK(env); + + /* + * Check for lease configuration matching. Leases must be + * configured all or none. If I am a client and I receive a + * message requesting a lease, and I'm not using leases, that + * is an error. + */ + if (!IS_USING_LEASES(env) && + (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) { + __db_errx(env, + "Inconsistent lease configuration"); + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Client received lease message and not using leases")); + ret = EINVAL; + ret = __env_panic(env, ret); + goto errlock; + } + + /* + * Check for generation number matching. Ignore any old messages + * except requests that are indicative of a new client that needs + * to get in sync. + */ + if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ && + rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ && + rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) { + /* + * We don't hold the rep mutex, and could miscount if we race. + */ + STAT(rep->stat.st_msgs_badgen++); + if (F_ISSET(rp, REPCTL_PERM)) + ret = DB_REP_IGNORE; + goto errlock; + } + + if (rp->gen > gen) { + /* + * If I am a master and am out of date with a lower generation + * number, I am in bad shape and should downgrade. + */ + if (F_ISSET(rep, REP_F_MASTER)) { + STAT(rep->stat.st_dupmasters++); + ret = DB_REP_DUPMASTER; + /* + * Only broadcast DUPMASTER if leases are not + * in effect. If I am an old master, using + * leases and I get a newer message, my leases + * had better all be expired. + */ + if (IS_USING_LEASES(env)) + DB_ASSERT(env, + __rep_lease_check(env, 0) == + DB_REP_LEASE_EXPIRED); + else if (rp->rectype != REP_DUPMASTER) + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_DUPMASTER, + NULL, NULL, 0, 0); + goto errlock; + } + + /* + * I am a client and am out of date. If this is an election, + * or a response from the first site I contacted, then I can + * accept the generation number and participate in future + * elections and communication. Otherwise, I need to hear about + * a new master and sync up. + */ + if (rp->rectype == REP_ALIVE || + rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) { + REP_SYSTEM_LOCK(env); + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Updating gen from %lu to %lu", + (u_long)gen, (u_long)rp->gen)); + rep->master_id = DB_EID_INVALID; + gen = rep->gen = rp->gen; + /* + * Updating of egen will happen when we process the + * message below for each message type. + */ + REP_SYSTEM_UNLOCK(env); + if (rp->rectype == REP_ALIVE) + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_MASTER_REQ, NULL, + NULL, 0, 0); + } else if (rp->rectype != REP_NEWMASTER) { + /* + * Ignore this message, retransmit if needed. + */ + if (__rep_check_doreq(env, rep)) + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_MASTER_REQ, + NULL, NULL, 0, 0); + goto errlock; + } + /* + * If you get here, then you're a client and either you're + * in an election or you have a NEWMASTER or an ALIVE message + * whose processing will do the right thing below. + */ + } + + /* + * If the sender is part of an established group, so are we now. + */ + if (F_ISSET(rp, REPCTL_GROUP_ESTD)) { + REP_SYSTEM_LOCK(env); +#ifdef DIAGNOSTIC + if (!F_ISSET(rep, REP_F_GROUP_ESTD)) + RPRINT(env, DB_VERB_REP_MSGS, (env, + "I am now part of an established group")); +#endif + F_SET(rep, REP_F_GROUP_ESTD); + REP_SYSTEM_UNLOCK(env); + } + + /* + * We need to check if we're in recovery and if we are + * then we need to ignore any messages except VERIFY*, VOTE*, + * NEW* and ALIVE_REQ, or backup related messages: UPDATE*, + * PAGE* and FILE*. We need to also accept LOG messages + * if we're copying the log for recovery/backup. + */ + switch (rp->rectype) { + case REP_ALIVE: + /* + * Handle even if we're recovering. + */ + ANYSITE(rep); + if (rp->rep_version < DB_REPVERSION_47) + egen_arg.egen = *(u_int32_t *)rec->data; + else if ((ret = __rep_egen_unmarshal(env, &egen_arg, + rec->data, rec->size, NULL)) != 0) + return (ret); + REP_SYSTEM_LOCK(env); + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Received ALIVE egen of %lu, mine %lu", + (u_long)egen_arg.egen, (u_long)rep->egen)); + if (egen_arg.egen > rep->egen) { + /* + * We're changing egen, need to clear out any old + * election information. We need to set the + * REP_F_EGENUPDATE flag here so that any thread + * waiting in rep_elect/rep_wait can distinguish + * this situation (and restart its election) from + * a current master saying it is still master and + * the egen getting incremented on that path. + */ + __rep_elect_done(env, rep, 0); + rep->egen = egen_arg.egen; + F_SET(rep, REP_F_EGENUPDATE); + } + REP_SYSTEM_UNLOCK(env); + break; + case REP_ALIVE_REQ: + /* + * Handle even if we're recovering. + */ + ANYSITE(rep); + LOG_SYSTEM_LOCK(env); + lsn = lp->lsn; + LOG_SYSTEM_UNLOCK(env); +#ifdef CONFIG_TEST + /* + * Send this first, before the ALIVE message because of the + * way the test suite and messaging is done sequentially. + * In some sequences it is possible to get into a situation + * where the test suite cannot get the later NEWMASTER because + * we break out of the messaging loop too early. + */ + if (F_ISSET(rep, REP_F_MASTER)) + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); +#endif + REP_SYSTEM_LOCK(env); + egen_arg.egen = rep->egen; + REP_SYSTEM_UNLOCK(env); + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(data_dbt, &egen_arg.egen, + sizeof(egen_arg.egen)); + else { + if ((ret = __rep_egen_marshal(env, + &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0) + goto errlock; + DB_INIT_DBT(data_dbt, buf, len); + } + (void)__rep_send_message(env, + eid, REP_ALIVE, &lsn, &data_dbt, 0, 0); + break; + case REP_ALL_REQ: + RECOVERING_SKIP; + ret = __rep_allreq(env, rp, eid); + CLIENT_REREQ; + break; + case REP_BULK_LOG: + RECOVERING_LOG_SKIP; + CLIENT_ONLY(rep, rp); + ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp); + break; + case REP_BULK_PAGE: + /* + * Handle even if we're recovering. + */ + CLIENT_ONLY(rep, rp); + ret = __rep_bulk_page(env, ip, eid, rp, rec); + break; + case REP_DUPMASTER: + /* + * Handle even if we're recovering. + */ + if (F_ISSET(rep, REP_F_MASTER)) + ret = DB_REP_DUPMASTER; + break; +#ifdef NOTYET + case REP_FILE: /* TODO */ + CLIENT_ONLY(rep, rp); + break; + case REP_FILE_REQ: + ret = __rep_send_file(env, rec, eid); + break; +#endif + case REP_FILE_FAIL: + /* + * Handle even if we're recovering. + */ + CLIENT_ONLY(rep, rp); + /* + * Clean up any internal init that was in progress. + */ + if (eid == rep->master_id) { + REP_SYSTEM_LOCK(env); + /* + * If we're already locking out messages, give up. + */ + if (F_ISSET(rep, REP_F_READY_MSG)) + goto errhlk; + /* + * Lock out other messages to prevent race + * conditions. + */ + if ((ret = + __rep_lockout_msg(env, rep, 1)) != 0) { + goto errhlk; + } + lockout = 1; + /* + * Need mtx_clientdb to safely clean up + * page database in __rep_init_cleanup(). + */ + REP_SYSTEM_UNLOCK(env); + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + /* + * Clean up internal init if one was in progress. + */ + if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) { + RPRINT(env, DB_VERB_REP_MSGS, (env, + "FILE_FAIL is cleaning up old internal init")); +#ifdef CONFIG_TEST + STAT(rep->stat.st_filefail_cleanups++); +#endif + ret = __rep_init_cleanup(env, rep, DB_FORCE); + F_CLR(rep, + REP_F_ABBREVIATED | REP_F_RECOVER_MASK); + } + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (ret != 0) { + RPRINT(env, DB_VERB_REP_MSGS, (env, + "FILE_FAIL error cleaning up internal init: %d", ret)); + goto errhlk; + } + F_CLR(rep, REP_F_READY_MSG); + lockout = 0; + /* + * Restart internal init, setting UPDATE flag and + * zeroing applicable LSNs. + */ + F_SET(rep, REP_F_RECOVER_UPDATE); + ZERO_LSN(rep->first_lsn); + ZERO_LSN(rep->ckp_lsn); + REP_SYSTEM_UNLOCK(env); + (void)__rep_send_message(env, eid, REP_UPDATE_REQ, + NULL, NULL, 0, 0); + } + break; + case REP_LEASE_GRANT: + /* + * Handle even if we're recovering. + */ + MASTER_ONLY(rep, rp); + ret = __rep_lease_grant(env, rp, rec, eid); + break; + case REP_LOG: + case REP_LOG_MORE: + RECOVERING_LOG_SKIP; + CLIENT_ONLY(rep, rp); + ret = __rep_log(env, ip, rp, rec, savetime, ret_lsnp); + break; + case REP_LOG_REQ: + RECOVERING_SKIP; + if (F_ISSET(rp, REPCTL_INIT)) + MASTER_UPDATE(env, renv); + ret = __rep_logreq(env, rp, rec, eid); + CLIENT_REREQ; + break; + case REP_NEWSITE: + /* + * Handle even if we're recovering. + */ + /* We don't hold the rep mutex, and may miscount. */ + STAT(rep->stat.st_newsites++); + + /* This is a rebroadcast; simply tell the application. */ + if (F_ISSET(rep, REP_F_MASTER)) { + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + LOG_SYSTEM_LOCK(env); + lsn = lp->lsn; + LOG_SYSTEM_UNLOCK(env); + (void)__rep_send_message(env, + eid, REP_NEWMASTER, &lsn, NULL, 0, 0); + } + ret = DB_REP_NEWSITE; + break; + case REP_NEWCLIENT: + /* + * Handle even if we're recovering. + */ + /* + * This message was received and should have resulted in the + * application entering the machine ID in its machine table. + * We respond to this with an ALIVE to send relevant information + * to the new client (if we are a master, we'll send a + * NEWMASTER, so we only need to send the ALIVE if we're a + * client). But first, broadcast the new client's record to + * all the clients. + */ + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0); + + ret = DB_REP_NEWSITE; + + if (F_ISSET(rep, REP_F_CLIENT)) { + REP_SYSTEM_LOCK(env); + egen_arg.egen = rep->egen; + + /* + * Clean up any previous master remnants by making + * master_id invalid and cleaning up any internal + * init that was in progress. + */ + if (eid == rep->master_id) { + rep->master_id = DB_EID_INVALID; + + /* + * Already locking out messages, must be + * in sync-up recover or internal init, + * give up. + */ + if (F_ISSET(rep, REP_F_READY_MSG)) + goto errhlk; + + /* + * Lock out other messages to prevent race + * conditions. + */ + if ((t_ret = + __rep_lockout_msg(env, rep, 1)) != 0) { + ret = t_ret; + goto errhlk; + } + lockout = 1; + + /* + * Need mtx_clientdb to safely clean up + * page database in __rep_init_cleanup(). + */ + REP_SYSTEM_UNLOCK(env); + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + + /* + * Clean up internal init if one was in + * progress. + */ + if (F_ISSET(rep, REP_F_READY_API | + REP_F_READY_OP)) { + RPRINT(env, DB_VERB_REP_MSGS, (env, + "NEWCLIENT is cleaning up old internal init for invalid master")); + t_ret = __rep_init_cleanup(env, + rep, DB_FORCE); + F_CLR(rep, REP_F_ABBREVIATED | + REP_F_RECOVER_MASK); + } + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (t_ret != 0) { + ret = t_ret; + RPRINT(env, DB_VERB_REP_MSGS, (env, + "NEWCLIENT error cleaning up internal init for invalid master: %d", ret)); + goto errhlk; + } + F_CLR(rep, REP_F_READY_MSG); + lockout = 0; + } + REP_SYSTEM_UNLOCK(env); + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(data_dbt, &egen_arg.egen, + sizeof(egen_arg.egen)); + else { + if ((ret = __rep_egen_marshal(env, &egen_arg, + buf, __REP_EGEN_SIZE, &len)) != 0) + goto errlock; + DB_INIT_DBT(data_dbt, buf, len); + } + (void)__rep_send_message(env, DB_EID_BROADCAST, + REP_ALIVE, &rp->lsn, &data_dbt, 0, 0); + break; + } + /* FALLTHROUGH */ + case REP_MASTER_REQ: + RECOVERING_SKIP; + if (F_ISSET(rep, REP_F_MASTER)) { + LOG_SYSTEM_LOCK(env); + lsn = lp->lsn; + LOG_SYSTEM_UNLOCK(env); + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); + if (IS_USING_LEASES(env)) + (void)__rep_lease_refresh(env); + } + /* + * If there is no master, then we could get into a state + * where an old client lost the initial ALIVE message and + * is calling an election under an old gen and can + * never get to the current gen. + */ + if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) { + REP_SYSTEM_LOCK(env); + egen_arg.egen = rep->egen; + if (eid == rep->master_id) + rep->master_id = DB_EID_INVALID; + REP_SYSTEM_UNLOCK(env); + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(data_dbt, &egen_arg.egen, + sizeof(egen_arg.egen)); + else { + if ((ret = __rep_egen_marshal(env, &egen_arg, + buf, __REP_EGEN_SIZE, &len)) != 0) + goto errlock; + DB_INIT_DBT(data_dbt, buf, len); + } + (void)__rep_send_message(env, eid, + REP_ALIVE, &rp->lsn, &data_dbt, 0, 0); + } + break; + case REP_NEWFILE: + RECOVERING_LOG_SKIP; + CLIENT_ONLY(rep, rp); + ret = __rep_apply(env, + ip, rp, rec, ret_lsnp, NULL, &last_lsn); + if (ret == DB_REP_LOGREADY) + ret = __rep_logready(env, rep, savetime, &last_lsn); + break; + case REP_NEWMASTER: + /* + * Handle even if we're recovering. + */ + ANYSITE(rep); + if (F_ISSET(rep, REP_F_MASTER) && + eid != rep->eid) { + /* We don't hold the rep mutex, and may miscount. */ + STAT(rep->stat.st_dupmasters++); + ret = DB_REP_DUPMASTER; + if (IS_USING_LEASES(env)) + DB_ASSERT(env, + __rep_lease_check(env, 0) == + DB_REP_LEASE_EXPIRED); + else + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_DUPMASTER, + NULL, NULL, 0, 0); + break; + } + if ((ret = + __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER) + ret = __rep_fire_newmaster(env, rp->gen, eid); + break; + case REP_PAGE: + case REP_PAGE_MORE: + /* + * Handle even if we're recovering. + */ + CLIENT_ONLY(rep, rp); + ret = __rep_page(env, ip, eid, rp, rec); + if (ret == DB_REP_PAGEDONE) + ret = 0; + break; + case REP_PAGE_FAIL: + /* + * Handle even if we're recovering. + */ + CLIENT_ONLY(rep, rp); + ret = __rep_page_fail(env, ip, eid, rp, rec); + break; + case REP_PAGE_REQ: + RECOVERING_SKIP; + MASTER_UPDATE(env, renv); + ret = __rep_page_req(env, ip, eid, rp, rec); + CLIENT_REREQ; + break; + case REP_REREQUEST: + /* + * Handle even if we're recovering. Don't do a master + * check. + */ + CLIENT_ONLY(rep, rp); + /* + * Don't hold any mutex, may miscount. + */ + STAT(rep->stat.st_client_rerequests++); + ret = __rep_resend_req(env, 1); + break; + case REP_START_SYNC: + RECOVERING_SKIP; + MUTEX_LOCK(env, rep->mtx_clientdb); + cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn); + /* + * The comparison needs to be <= because the LSN in + * the message can be the LSN of the first outstanding + * txn, which may be the LSN immediately after the + * previous commit. The ready_lsn is the LSN of the + * next record expected. In that case, the LSNs + * could be equal and the client has the commit and + * wants to sync. [SR #15338] + */ + if (cmp <= 0) { + MUTEX_UNLOCK(env, rep->mtx_clientdb); + do_sync = 1; + } else { + STAT(rep->stat.st_startsync_delayed++); + /* + * There are cases where keeping the first ckp_lsn + * LSN is advantageous and cases where keeping + * a later LSN is better. If random, earlier + * log records are missing, keeping the later + * LSN seems to be better. That is what we'll + * do for now. + */ + if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0) + rep->ckp_lsn = rp->lsn; + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Delayed START_SYNC memp_sync due to missing records.")); + RPRINT(env, DB_VERB_REP_MSGS, (env, + "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]", + (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset, + (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset)); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + } + break; + case REP_UPDATE: + /* + * Handle even if we're recovering. + */ + CLIENT_ONLY(rep, rp); + ret = __rep_update_setup(env, eid, rp, rec, savetime); + break; + case REP_UPDATE_REQ: + /* + * Handle even if we're recovering. + */ + MASTER_ONLY(rep, rp); + infop = env->reginfo; + renv = infop->primary; + MASTER_UPDATE(env, renv); + ret = __rep_update_req(env, rp, eid); + break; + case REP_VERIFY: + if (recovering) { + MUTEX_LOCK(env, rep->mtx_clientdb); + cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + /* + * If this is not the verify record I want, skip it. + */ + if (cmp != 0) { + ret = __rep_skip_msg( + env, rep, eid, rp->rectype); + break; + } + } + CLIENT_ONLY(rep, rp); + ret = __rep_verify(env, rp, rec, eid, savetime); + break; + case REP_VERIFY_FAIL: + /* + * Handle even if we're recovering. + */ + CLIENT_ONLY(rep, rp); + ret = __rep_verify_fail(env, rp); + break; + case REP_VERIFY_REQ: + RECOVERING_SKIP; + ret = __rep_verify_req(env, rp, eid); + CLIENT_REREQ; + break; + case REP_VOTE1: + /* + * Handle even if we're recovering. + */ + ret = __rep_vote1(env, rp, rec, eid); + break; + case REP_VOTE2: + /* + * Handle even if we're recovering. + */ + ret = __rep_vote2(env, rp, rec, eid); + break; + default: + __db_errx(env, + "DB_ENV->rep_process_message: unknown replication message: type %lu", + (u_long)rp->rectype); + ret = EINVAL; + break; + } + +errlock: + REP_SYSTEM_LOCK(env); +errhlk: if (lockout) + F_CLR(rep, REP_F_READY_MSG); + rep->msg_th--; + REP_SYSTEM_UNLOCK(env); + if (do_sync) { + MUTEX_LOCK(env, rep->mtx_ckp); + lsn = rp->lsn; + /* + * This is the REP_START_SYNC sync, and so we permit it to be + * interrupted. + */ + ret = __memp_sync( + env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn); + MUTEX_UNLOCK(env, rep->mtx_ckp); + RPRINT(env, DB_VERB_REP_MSGS, + (env, "ALIVE: Completed sync [%lu][%lu]", + (u_long)lsn.file, (u_long)lsn.offset)); + } +out: + if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) { + if (ret_lsnp != NULL) + *ret_lsnp = rp->lsn; + ret = DB_REP_NOTPERM; + } + __dbt_userfree(env, control, rec, NULL); + ENV_LEAVE(env, ip); + return (ret); +} + +/* + * __rep_apply -- + * + * Handle incoming log records on a client, applying when possible and + * entering into the bookkeeping table otherwise. This routine manages + * the state of the incoming message stream -- processing records, via + * __rep_process_rec, when possible and enqueuing in the __db.rep.db + * when necessary. As gaps in the stream are filled in, this is where + * we try to process as much as possible from __db.rep.db to catch up. + * + * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *, + * PUBLIC: DBT *, DB_LSN *, int *, DB_LSN *)); + */ +int +__rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp) + ENV *env; + DB_THREAD_INFO *ip; + __rep_control_args *rp; + DBT *rec; + DB_LSN *ret_lsnp; + int *is_dupp; + DB_LSN *last_lsnp; +{ + DB *dbp; + DBT control_dbt, key_dbt; + DBT rec_dbt; + DB_LOG *dblp; + DB_LSN max_lsn, save_lsn; + DB_REP *db_rep; + LOG *lp; + REP *rep; + db_timespec msg_time, max_ts; + u_int32_t gen, rectype; + int cmp, event, master, newfile_seen, ret, set_apply, t_ret; + + COMPQUIET(gen, 0); + COMPQUIET(master, DB_EID_INVALID); + + db_rep = env->rep_handle; + rep = db_rep->region; + event = ret = set_apply = 0; + memset(&control_dbt, 0, sizeof(control_dbt)); + memset(&rec_dbt, 0, sizeof(rec_dbt)); + ZERO_LSN(max_lsn); + timespecclear(&max_ts); + timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); + cmp = -2; /* OOB value that LOG_COMPARE can't return. */ + + dblp = env->lg_handle; + MUTEX_LOCK(env, rep->mtx_clientdb); + /* + * Lazily open the temp db. Always set the startup flag to 0 + * because it was initialized from rep_start. + */ + if (db_rep->rep_db == NULL && + (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) { + MUTEX_UNLOCK(env, rep->mtx_clientdb); + goto out; + } + dbp = db_rep->rep_db; + lp = dblp->reginfo.primary; + newfile_seen = 0; + REP_SYSTEM_LOCK(env); + if (F_ISSET(rep, REP_F_RECOVER_LOG) && + LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0) + lp->ready_lsn = rep->first_lsn; + cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn); + /* + * If we are going to skip or process any message other + * than a duplicate, make note of it if we're in an + * election so that the election can rerequest proactively. + */ + if (F_ISSET(rep, REP_F_READY_APPLY) && cmp >= 0) + F_SET(rep, REP_F_SKIPPED_APPLY); + + /* + * If we're in the middle of processing a NEWFILE, we've dropped + * the mutex and if this matches it is a duplicate record. We + * do not want this call taking the "matching" code below because + * we may then process later records in the temp db and the + * original NEWFILE may not have the log file ready. It will + * process those temp db items when it completes. + */ + if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0) + cmp = -1; + + if (cmp == 0) { + /* + * If we are in an election (i.e. we've sent a vote + * with an LSN in it), then we drop the next record + * we're expecting. When we find a master, we'll + * either go into sync, or if it was an existing + * master, rerequest this one record (later records + * are accumulating in the temp db). + * + * We can simply return here, and rep_process_message + * will set NOTPERM if necessary for this record. + */ + if (F_ISSET(rep, REP_F_READY_APPLY)) { + /* + * We will simply return now. All special return + * processing should be ignored because the special + * values are just initialized. Variables like + * max_lsn are still 0. + */ + RPRINT(env, DB_VERB_REP_MISC, (env, + "rep_apply: In election. Ignoring [%lu][%lu]", + (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); + REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + goto out; + } + rep->apply_th++; + set_apply = 1; + RPRINT(env, DB_VERB_REP_MISC, (env, + "rep_apply: Set apply_th %d", rep->apply_th)); + REP_SYSTEM_UNLOCK(env); + if (rp->rectype == REP_NEWFILE) + newfile_seen = 1; + if ((ret = __rep_process_rec(env, ip, + rp, rec, &max_ts, &max_lsn)) != 0) + goto err; + /* + * If we get the record we are expecting, reset + * the count of records we've received and are applying + * towards the request interval. + */ + __os_gettime(env, &lp->rcvd_ts, 1); + ZERO_LSN(lp->max_wait_lsn); + + /* + * The __rep_remfirst() and __rep_getnext() functions each open, + * use and then close a cursor on the temp db, each time through + * the loop. Although this may seem excessive, it is necessary + * to avoid locking problems with checkpoints. + */ + while (ret == 0 && + LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) { + /* + * We just filled in a gap in the log record stream. + * Write subsequent records to the log. + */ +gap_check: + if ((ret = __rep_remfirst(env, ip, + &control_dbt, &rec_dbt)) != 0) + goto err; + + rp = (__rep_control_args *)control_dbt.data; + timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); + rec = &rec_dbt; + if (rp->rectype == REP_NEWFILE) + newfile_seen = 1; + if ((ret = __rep_process_rec(env, ip, + rp, rec, &max_ts, &max_lsn)) != 0) + goto err; + + --rep->stat.st_log_queued; + + /* + * Since we just filled a gap in the log stream, and + * we're writing subsequent records to the log, we want + * to use rcvd_ts and wait_ts so that we will + * request the next gap if we end up with a gap and + * not so recent records in the temp db, but not + * request if recent records are in the temp db and + * likely to arrive on its own shortly. We want to + * avoid requesting the record in that case. Also + * reset max_wait_lsn because the next gap is a + * fresh gap. + */ + lp->rcvd_ts = lp->last_ts; + lp->wait_ts = rep->request_gap; + if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) { + __os_gettime(env, &lp->rcvd_ts, 1); + ret = 0; + break; + } else if (ret != 0) + goto err; + } + + /* + * Check if we're at a gap in the table and if so, whether we + * need to ask for any records. + */ + if (!IS_ZERO_LSN(lp->waiting_lsn) && + LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) { + /* + * We got a record and processed it, but we may + * still be waiting for more records. If we + * filled a gap we keep a count of how many other + * records are in the temp database and if we should + * request the next gap at this time. + */ + if (__rep_check_doreq(env, rep) && (ret = + __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0) + goto err; + } else { + lp->wait_ts = rep->request_gap; + ZERO_LSN(lp->max_wait_lsn); + } + + } else if (cmp > 0) { + /* + * The LSN is higher than the one we were waiting for. + * This record isn't in sequence; add it to the temporary + * database, update waiting_lsn if necessary, and perform + * calculations to determine if we should issue requests + * for new records. + */ + REP_SYSTEM_UNLOCK(env); + memset(&key_dbt, 0, sizeof(key_dbt)); + key_dbt.data = rp; + key_dbt.size = sizeof(*rp); + ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE); + if (ret == 0) { + rep->stat.st_log_queued++; + __os_gettime(env, &lp->last_ts, 1); +#ifdef HAVE_STATISTICS + STAT(rep->stat.st_log_queued_total++); + if (rep->stat.st_log_queued_max < + rep->stat.st_log_queued) + rep->stat.st_log_queued_max = + rep->stat.st_log_queued; +#endif + } + + if (ret == DB_KEYEXIST) + ret = 0; + if (ret != 0) + goto done; + + if (IS_ZERO_LSN(lp->waiting_lsn) || + LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0) { + /* + * If this is a new gap, then reset the rcvd_ts so + * that an out-of-order record after an idle period + * does not (likely) immediately rerequest. + */ + if (IS_ZERO_LSN(lp->waiting_lsn)) + __os_gettime(env, &lp->rcvd_ts, 1); + lp->waiting_lsn = rp->lsn; + } + + if (__rep_check_doreq(env, rep) && + (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0)) + goto err; + + /* + * If this is permanent; let the caller know that we have + * not yet written it to disk, but we've accepted it. + */ + if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) { + max_lsn = rp->lsn; + ret = DB_REP_NOTPERM; + } + goto done; + } else { + STAT(rep->stat.st_log_duplicated++); + REP_SYSTEM_UNLOCK(env); + if (is_dupp != NULL) + *is_dupp = 1; + LOGCOPY_32(env, &rectype, rec->data); + if (rectype == DB___txn_regop || rectype == DB___txn_ckp) + max_lsn = lp->max_perm_lsn; + /* + * We check REPCTL_LEASE here, because this client may + * have leases configured but the master may not (especially + * in a mixed version group. If the master has leases + * configured, all clients must also. + */ + if (IS_USING_LEASES(env) && + F_ISSET(rp, REPCTL_LEASE) && + timespecisset(&msg_time)) { + if (timespeccmp(&msg_time, &lp->max_lease_ts, >)) + max_ts = msg_time; + else + max_ts = lp->max_lease_ts; + } + goto done; + } + + /* Check if we need to go back into the table. */ + if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) + goto gap_check; + +done: +err: /* + * In case of a race, to make sure only one thread can get + * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to + * this point. + */ + REP_SYSTEM_LOCK(env); + if (ret == 0 && + F_ISSET(rep, REP_F_RECOVER_LOG) && + !IS_ZERO_LSN(rep->last_lsn) && + LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) { + *last_lsnp = max_lsn; + ZERO_LSN(rep->last_lsn); + ZERO_LSN(max_lsn); + ret = DB_REP_LOGREADY; + } + /* + * Only decrement if we were actually applying log records. + * We do not care if we processed a dup record or put one + * in the temp db. + */ + if (set_apply) { + rep->apply_th--; + RPRINT(env, DB_VERB_REP_MISC, (env, + "rep_apply: Decrement apply_th %d [%lu][%lu]", + rep->apply_th, (u_long)lp->ready_lsn.file, + (u_long)lp->ready_lsn.offset)); + } + + if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) && + !IS_ZERO_LSN(max_lsn)) { + if (ret_lsnp != NULL) + *ret_lsnp = max_lsn; + ret = DB_REP_ISPERM; + DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0); + lp->max_perm_lsn = max_lsn; + } + + /* + * Start-up is complete when we process (or have already processed) up + * to the end of the replication group's log. In case we miss that + * message, as a back-up, we also recognize start-up completion when we + * actually process a live log record. Having cmp==0 here (with a good + * "ret" value) implies we actually processed the record. + */ + if ((ret == 0 || ret == DB_REP_ISPERM) && + rep->stat.st_startup_complete == 0 && + !F_ISSET(rep, REP_F_RECOVER_LOG) && + ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) || + (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) { + rep->stat.st_startup_complete = 1; + event = 1; + gen = rep->gen; + master = rep->master_id; + } + REP_SYSTEM_UNLOCK(env); + /* + * If we've processed beyond the needed LSN for a pending + * start sync, start it now. We can compare >= here + * because ready_lsn is the next record we expect. + * Since ckp_lsn can point to the last commit record itself, + * but if it does and ready_lsn == commit (i.e. we haven't + * written the commit yet), we can still start to sync + * because we're guaranteed no additional buffers can + * be dirtied. + */ + if (!IS_ZERO_LSN(rep->ckp_lsn) && + LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) >= 0) { + save_lsn = rep->ckp_lsn; + ZERO_LSN(rep->ckp_lsn); + } else + ZERO_LSN(save_lsn); + + /* + * If this is a perm record, we are using leases, update the lease + * grant. We must hold the clientdb mutex. We must not hold + * the region mutex because rep_update_grant will acquire it. + */ + if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) && + timespecisset(&max_ts)) { + if ((t_ret = __rep_update_grant(env, &max_ts)) != 0) + ret = t_ret; + else if (timespeccmp(&max_ts, &lp->max_lease_ts, >)) + lp->max_lease_ts = max_ts; + } + + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (!IS_ZERO_LSN(save_lsn)) { + /* + * Now call memp_sync holding only the ckp mutex. + */ + MUTEX_LOCK(env, rep->mtx_ckp); + RPRINT(env, DB_VERB_REP_MISC, (env, + "Starting delayed __memp_sync call [%lu][%lu]", + (u_long)save_lsn.file, (u_long)save_lsn.offset)); + t_ret = __memp_sync(env, + DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &save_lsn); + MUTEX_UNLOCK(env, rep->mtx_ckp); + } + if (event) { + RPRINT(env, DB_VERB_REP_MISC, (env, + "Start-up is done [%lu][%lu]", + (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); + + if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) { + DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM); + /* Failure trumps either of those values. */ + ret = t_ret; + goto out; + } + } + if ((ret == 0 || ret == DB_REP_ISPERM) && + newfile_seen && lp->db_log_autoremove) + __log_autoremove(env); + if (control_dbt.data != NULL) + __os_ufree(env, control_dbt.data); + if (rec_dbt.data != NULL) + __os_ufree(env, rec_dbt.data); + +out: + switch (ret) { + case 0: + break; + case DB_REP_ISPERM: + RPRINT(env, DB_VERB_REP_MSGS, + (env, "Returning ISPERM [%lu][%lu], cmp = %d", + (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); + break; + case DB_REP_LOGREADY: + RPRINT(env, DB_VERB_REP_MSGS, (env, + "Returning LOGREADY up to [%lu][%lu], cmp = %d", + (u_long)last_lsnp->file, + (u_long)last_lsnp->offset, cmp)); + break; + case DB_REP_NOTPERM: + if (!F_ISSET(rep, REP_F_RECOVER_LOG) && + !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL) + *ret_lsnp = max_lsn; + + RPRINT(env, DB_VERB_REP_MSGS, + (env, "Returning NOTPERM [%lu][%lu], cmp = %d", + (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); + break; + default: + RPRINT(env, DB_VERB_REP_MSGS, + (env, "Returning %d [%lu][%lu], cmp = %d", ret, + (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); + break; + } + + return (ret); +} + +/* + * __rep_process_txn -- + * + * This is the routine that actually gets a transaction ready for + * processing. + * + * PUBLIC: int __rep_process_txn __P((ENV *, DBT *)); + */ +int +__rep_process_txn(env, rec) + ENV *env; + DBT *rec; +{ + DBT data_dbt, *lock_dbt; + DB_LOCKER *locker; + DB_LOCKREQ req, *lvp; + DB_LOGC *logc; + DB_LSN prev_lsn, *lsnp; + DB_REP *db_rep; + DB_THREAD_INFO *ip; + DB_TXNHEAD *txninfo; + LSN_COLLECTION lc; + REP *rep; + __txn_regop_args *txn_args; + __txn_regop_42_args *txn42_args; + __txn_prepare_args *prep_args; + u_int32_t rectype; + u_int i; + int ret, t_ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + logc = NULL; + txn_args = NULL; + txn42_args = NULL; + prep_args = NULL; + txninfo = NULL; + + ENV_ENTER(env, ip); + memset(&data_dbt, 0, sizeof(data_dbt)); + if (F_ISSET(env, ENV_THREAD)) + F_SET(&data_dbt, DB_DBT_REALLOC); + + /* + * There are two phases: First, we have to traverse backwards through + * the log records gathering the list of all LSNs in the transaction. + * Once we have this information, we can loop through and then apply it. + * + * We may be passed a prepare (if we're restoring a prepare on upgrade) + * instead of a commit (the common case). Check which it is and behave + * appropriately. + */ + LOGCOPY_32(env, &rectype, rec->data); + memset(&lc, 0, sizeof(lc)); + if (rectype == DB___txn_regop) { + /* + * We're the end of a transaction. Make sure this is + * really a commit and not an abort! + */ + if (rep->version >= DB_REPVERSION_44) { + if ((ret = __txn_regop_read( + env, rec->data, &txn_args)) != 0) + return (ret); + if (txn_args->opcode != TXN_COMMIT) { + __os_free(env, txn_args); + return (0); + } + prev_lsn = txn_args->prev_lsn; + lock_dbt = &txn_args->locks; + } else { + if ((ret = __txn_regop_42_read( + env, rec->data, &txn42_args)) != 0) + return (ret); + if (txn42_args->opcode != TXN_COMMIT) { + __os_free(env, txn42_args); + return (0); + } + prev_lsn = txn42_args->prev_lsn; + lock_dbt = &txn42_args->locks; + } + } else { + /* We're a prepare. */ + DB_ASSERT(env, rectype == DB___txn_prepare); + + if ((ret = __txn_prepare_read( + env, rec->data, &prep_args)) != 0) + return (ret); + prev_lsn = prep_args->prev_lsn; + lock_dbt = &prep_args->locks; + } + + /* Get locks. */ + if ((ret = __lock_id(env, NULL, &locker)) != 0) + goto err1; + + if ((ret = + __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0) + goto err; + + /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */ + if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0) + goto err; + qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp); + + /* + * The set of records for a transaction may include dbreg_register + * records. Create a txnlist so that they can keep track of file + * state between records. + */ + if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0) + goto err; + + /* Phase 2: Apply updates. */ + if ((ret = __log_cursor(env, &logc)) != 0) + goto err; + for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) { + if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) { + __db_errx(env, "failed to read the log at [%lu][%lu]", + (u_long)lsnp->file, (u_long)lsnp->offset); + goto err; + } + if ((ret = __db_dispatch(env, &env->recover_dtab, + &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) { + __db_errx(env, "transaction failed at [%lu][%lu]", + (u_long)lsnp->file, (u_long)lsnp->offset); + goto err; + } + } + +err: memset(&req, 0, sizeof(req)); + req.op = DB_LOCK_PUT_ALL; + if ((t_ret = + __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0) + ret = t_ret; + + if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0) + ret = t_ret; + +err1: if (txn_args != NULL) + __os_free(env, txn_args); + if (txn42_args != NULL) + __os_free(env, txn42_args); + if (prep_args != NULL) + __os_free(env, prep_args); + if (lc.array != NULL) + __os_free(env, lc.array); + + if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0) + ret = t_ret; + + if (txninfo != NULL) + __db_txnlist_end(env, txninfo); + + if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL) + __os_ufree(env, data_dbt.data); + +#ifdef HAVE_STATISTICS + if (ret == 0) + /* + * We don't hold the rep mutex, and could miscount if we race. + */ + rep->stat.st_txns_applied++; +#endif + + return (ret); +} + +/* + * __rep_collect_txn + * Recursive function that will let us visit every entry in a transaction + * chain including all child transactions so that we can then apply + * the entire transaction family at once. + */ +static int +__rep_collect_txn(env, lsnp, lc) + ENV *env; + DB_LSN *lsnp; + LSN_COLLECTION *lc; +{ + __txn_child_args *argp; + DB_LOGC *logc; + DB_LSN c_lsn; + DBT data; + u_int32_t rectype; + u_int nalloc; + int ret, t_ret; + + memset(&data, 0, sizeof(data)); + F_SET(&data, DB_DBT_REALLOC); + + if ((ret = __log_cursor(env, &logc)) != 0) + return (ret); + + while (!IS_ZERO_LSN(*lsnp) && + (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) { + LOGCOPY_32(env, &rectype, data.data); + if (rectype == DB___txn_child) { + if ((ret = __txn_child_read( + env, data.data, &argp)) != 0) + goto err; + c_lsn = argp->c_lsn; + *lsnp = argp->prev_lsn; + __os_free(env, argp); + ret = __rep_collect_txn(env, &c_lsn, lc); + } else { + if (lc->nalloc < lc->nlsns + 1) { + nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; + if ((ret = __os_realloc(env, + nalloc * sizeof(DB_LSN), &lc->array)) != 0) + goto err; + lc->nalloc = nalloc; + } + lc->array[lc->nlsns++] = *lsnp; + + /* + * Explicitly copy the previous lsn. The record + * starts with a u_int32_t record type, a u_int32_t + * txn id, and then the DB_LSN (prev_lsn) that we + * want. We copy explicitly because we have no idea + * what kind of record this is. + */ + LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data + + sizeof(u_int32_t) + sizeof(u_int32_t)); + } + + if (ret != 0) + goto err; + } + if (ret != 0) + __db_errx(env, "collect failed at: [%lu][%lu]", + (u_long)lsnp->file, (u_long)lsnp->offset); + +err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) + ret = t_ret; + if (data.data != NULL) + __os_ufree(env, data.data); + return (ret); +} + +/* + * __rep_lsn_cmp -- + * qsort-type-compatible wrapper for LOG_COMPARE. + */ +static int +__rep_lsn_cmp(lsn1, lsn2) + const void *lsn1, *lsn2; +{ + + return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2)); +} + +/* + * __rep_newfile -- + * NEWFILE messages have the LSN of the last record in the previous + * log file. When applying a NEWFILE message, make sure we haven't already + * swapped files. Assume caller hold mtx_clientdb. + */ +static int +__rep_newfile(env, rp, rec) + ENV *env; + __rep_control_args *rp; + DBT *rec; +{ + DB_LOG *dblp; + DB_LSN tmplsn; + DB_REP *db_rep; + LOG *lp; + REP *rep; + __rep_newfile_args nf_args; + int ret; + + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + db_rep = env->rep_handle; + rep = db_rep->region; + + /* + * If a newfile is already in progress, just ignore. + */ + if (F_ISSET(rep, REP_F_NEWFILE)) + return (0); + if (rp->lsn.file + 1 > lp->ready_lsn.file) { + if (rec == NULL || rec->size == 0) { + RPRINT(env, DB_VERB_REP_MISC, (env, +"rep_newfile: Old-style NEWFILE msg. Use control msg log version: %lu", + (u_long) rp->log_version)); + nf_args.version = rp->log_version; + } else if (rp->rep_version < DB_REPVERSION_47) + nf_args.version = *(u_int32_t *)rec->data; + else if ((ret = __rep_newfile_unmarshal(env, &nf_args, + rec->data, rec->size, NULL)) != 0) + return (ret); + RPRINT(env, DB_VERB_REP_MISC, + (env, "rep_newfile: File %lu vers %lu", + (u_long)rp->lsn.file + 1, (u_long)nf_args.version)); + + /* + * We drop the mtx_clientdb mutex during + * the file operation, and then reacquire it when + * we're done. We avoid colliding with new incoming + * log records because lp->ready_lsn is not getting + * updated and there is no real log record at this + * ready_lsn. We avoid colliding with a duplicate + * NEWFILE message by setting an in-progress flag. + */ + REP_SYSTEM_LOCK(env); + F_SET(rep, REP_F_NEWFILE); + REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + LOG_SYSTEM_LOCK(env); + ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version); + LOG_SYSTEM_UNLOCK(env); + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + F_CLR(rep, REP_F_NEWFILE); + REP_SYSTEM_UNLOCK(env); + if (ret == 0) + lp->ready_lsn = tmplsn; + return (ret); + } else + /* We've already applied this NEWFILE. Just ignore it. */ + return (0); +} + +/* + * __rep_do_ckp -- + * Perform the memp_sync necessary for this checkpoint without holding the + * REP->mtx_clientdb. Callers of this function must hold REP->mtx_clientdb + * and must not be holding the region mutex. + */ +static int +__rep_do_ckp(env, rec, rp) + ENV *env; + DBT *rec; + __rep_control_args *rp; +{ + DB_ENV *dbenv; + __txn_ckp_args *ckp_args; + DB_LSN ckp_lsn; + REP *rep; + int ret; + + dbenv = env->dbenv; + + /* Crack the log record and extract the checkpoint LSN. */ + if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0) + return (ret); + ckp_lsn = ckp_args->ckp_lsn; + __os_free(env, ckp_args); + + rep = env->rep_handle->region; + + MUTEX_UNLOCK(env, rep->mtx_clientdb); + DB_TEST_WAIT(env, env->test_check); + + /* + * Sync the memory pool. + * + * This is the real PERM lock record/ckp. We cannot return ISPERM + * if we haven't truly completed the checkpoint, so we don't allow + * this call to be interrupted. + * + * We may be overlapping our log record with an in-progress startsync + * of this checkpoint; suppress the max_write settings on any running + * cache-flush operation so it completes quickly. + */ + (void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1); + MUTEX_LOCK(env, rep->mtx_ckp); + ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn); + MUTEX_UNLOCK(env, rep->mtx_ckp); + (void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0); + + /* Update the last_ckp in the txn region. */ + if (ret == 0) + ret = __txn_updateckp(env, &rp->lsn); + else { + __db_errx(env, "Error syncing ckp [%lu][%lu]", + (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset); + ret = __env_panic(env, ret); + } + + MUTEX_LOCK(env, rep->mtx_clientdb); + return (ret); +} + +/* + * __rep_remfirst -- + * Remove the first entry from the __db.rep.db + */ +static int +__rep_remfirst(env, ip, cntrl, rec) + ENV *env; + DB_THREAD_INFO *ip; + DBT *cntrl; + DBT *rec; +{ + DB *dbp; + DBC *dbc; + DB_REP *db_rep; + int ret, t_ret; + + db_rep = env->rep_handle; + dbp = db_rep->rep_db; + if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) + return (ret); + + /* The DBTs need to persist through another call. */ + F_SET(cntrl, DB_DBT_REALLOC); + F_SET(rec, DB_DBT_REALLOC); + if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0) + ret = __dbc_del(dbc, 0); + if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * __rep_getnext -- + * Get the next record out of the __db.rep.db table. + */ +static int +__rep_getnext(env, ip) + ENV *env; + DB_THREAD_INFO *ip; +{ + DB *dbp; + DBC *dbc; + DBT lsn_dbt, nextrec_dbt; + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + __rep_control_args *rp; + int ret, t_ret; + + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + + db_rep = env->rep_handle; + dbp = db_rep->rep_db; + + if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) + return (ret); + + /* + * Update waiting_lsn. We need to move it + * forward to the LSN of the next record + * in the queue. + * + * If the next item in the database is a log + * record--the common case--we're not + * interested in its contents, just in its LSN. + * Optimize by doing a partial get of the data item. + */ + memset(&nextrec_dbt, 0, sizeof(nextrec_dbt)); + F_SET(&nextrec_dbt, DB_DBT_PARTIAL); + nextrec_dbt.ulen = nextrec_dbt.dlen = 0; + + memset(&lsn_dbt, 0, sizeof(lsn_dbt)); + ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST); + if (ret != DB_NOTFOUND && ret != 0) + goto err; + + if (ret == DB_NOTFOUND) { + ZERO_LSN(lp->waiting_lsn); + /* + * Whether or not the current record is + * simple, there's no next one, and + * therefore we haven't got anything + * else to do right now. Break out. + */ + goto err; + } + rp = (__rep_control_args *)lsn_dbt.data; + lp->waiting_lsn = rp->lsn; + +err: if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * __rep_process_rec -- + * + * Given a record in 'rp', process it. In the case of a NEWFILE, that means + * potentially switching files. In the case of a checkpoint, it means doing + * the checkpoint, and in other cases, it means simply writing the record into + * the log. + */ +static int +__rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp) + ENV *env; + DB_THREAD_INFO *ip; + __rep_control_args *rp; + DBT *rec; + db_timespec *ret_tsp; + DB_LSN *ret_lsnp; +{ + DB *dbp; + DBT control_dbt, key_dbt, rec_dbt; + DB_REP *db_rep; + REP *rep; + db_timespec msg_time; + u_int32_t rectype, txnid; + int ret, t_ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + dbp = db_rep->rep_db; + ret = 0; + + if (rp->rectype == REP_NEWFILE) { + ret = __rep_newfile(env, rp, rec); + return (0); + } + + LOGCOPY_32(env, &rectype, rec->data); + memset(&control_dbt, 0, sizeof(control_dbt)); + memset(&rec_dbt, 0, sizeof(rec_dbt)); + timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); + + /* + * We write all records except for checkpoint records here. + * All non-checkpoint records need to appear in the log before + * we take action upon them (i.e., we enforce write-ahead logging). + * However, we can't write the checkpoint record here until the + * data buffers are actually written to disk, else we are creating + * an invalid log -- one that says all data before a certain point + * has been written to disk. + * + * If two threads are both processing the same checkpoint record + * (because, for example, it was resent and the original finally + * arrived), we handle that below by checking for the existence of + * the log record when we add it to the replication database. + * + * Any log records that arrive while we are processing the checkpoint + * are added to the bookkeeping database because ready_lsn is not yet + * updated to point after the checkpoint record. + */ + if (rectype != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) { + if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0) + return (ret); + STAT(rep->stat.st_log_records++); + if (F_ISSET(rep, REP_F_RECOVER_LOG)) { + *ret_lsnp = rp->lsn; + goto out; + } + } + + switch (rectype) { + case DB___dbreg_register: + /* + * DB opens occur in the context of a transaction, so we can + * simply handle them when we process the transaction. Closes, + * however, are not transaction-protected, so we have to handle + * them here. + * + * It should be unsafe for the master to do a close of a file + * that was opened in an active transaction, so we should be + * guaranteed to get the ordering right. + * + * !!! + * The txn ID is the second 4-byte field of the log record. + * We should really be calling __dbreg_register_read() and + * working from the __dbreg_register_args structure, but this + * is considerably faster and the order of the fields won't + * change. + */ + LOGCOPY_32(env, &txnid, + (u_int8_t *)rec->data + sizeof(u_int32_t)); + if (txnid == TXN_INVALID) + ret = __db_dispatch(env, &env->recover_dtab, + rec, &rp->lsn, DB_TXN_APPLY, NULL); + break; + case DB___txn_regop: + /* + * If an application is doing app-specific recovery + * and acquires locks while applying a transaction, + * it can deadlock. Any other locks held by this + * thread should have been discarded in the + * __rep_process_txn error path, so if we simply + * retry, we should eventually succeed. + */ + do { + ret = 0; + if (!F_ISSET(db_rep, DBREP_OPENFILES)) { + ret = __txn_openfiles(env, ip, NULL, 1); + F_SET(db_rep, DBREP_OPENFILES); + } + if (ret == 0) + ret = __rep_process_txn(env, rec); + } while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED); + + /* Now flush the log unless we're running TXN_NOSYNC. */ + if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC)) + ret = __log_flush(env, NULL); + if (ret != 0) { + __db_errx(env, "Error processing txn [%lu][%lu]", + (u_long)rp->lsn.file, (u_long)rp->lsn.offset); + ret = __env_panic(env, ret); + } + *ret_lsnp = rp->lsn; + break; + case DB___txn_prepare: + ret = __log_flush(env, NULL); + /* + * Save the biggest prepared LSN we've seen. + */ + rep->max_prep_lsn = rp->lsn; + RPRINT(env, DB_VERB_REP_MSGS, + (env, "process_rec: prepare at [%lu][%lu]", + (u_long)rep->max_prep_lsn.file, + (u_long)rep->max_prep_lsn.offset)); + break; + case DB___txn_ckp: + /* + * We do not want to hold the REP->mtx_clientdb mutex while + * syncing the mpool, so if we get a checkpoint record we are + * supposed to process, add it to the __db.rep.db, do the + * memp_sync and then go back and process it later, when the + * sync has finished. If this record is already in the table, + * then some other thread will process it, so simply return + * REP_NOTPERM. + */ + memset(&key_dbt, 0, sizeof(key_dbt)); + key_dbt.data = rp; + key_dbt.size = sizeof(*rp); + + /* + * We want to put this record into the tmp DB only if + * it doesn't exist, so use DB_NOOVERWRITE. + */ + ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE); + if (ret == DB_KEYEXIST) { + if (ret_lsnp != NULL) + *ret_lsnp = rp->lsn; + ret = DB_REP_NOTPERM; + } + if (ret != 0) + break; + + /* + * Now, do the checkpoint. Regardless of + * whether the checkpoint succeeds or not, + * we need to remove the record we just put + * in the temporary database. If the + * checkpoint failed, return an error. We + * will act like we never received the + * checkpoint. + */ + if ((ret = __rep_do_ckp(env, rec, rp)) == 0) + ret = __log_rep_put(env, &rp->lsn, rec, + DB_LOG_CHKPNT); + if ((t_ret = __rep_remfirst(env, ip, + &control_dbt, &rec_dbt)) != 0 && ret == 0) + ret = t_ret; + /* + * If we're successful putting the log record in the + * log, flush it for a checkpoint. + */ + if (ret == 0) { + *ret_lsnp = rp->lsn; + ret = __log_flush(env, NULL); + } + break; + default: + break; + } + +out: + if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) + *ret_lsnp = rp->lsn; + if (IS_USING_LEASES(env) && + F_ISSET(rp, REPCTL_LEASE)) + *ret_tsp = msg_time; + /* + * Set ret_lsnp before flushing the log because if the + * flush fails, we've still written the record to the + * log and the LSN has been entered. + */ + if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH)) + ret = __log_flush(env, NULL); + if (control_dbt.data != NULL) + __os_ufree(env, control_dbt.data); + if (rec_dbt.data != NULL) + __os_ufree(env, rec_dbt.data); + + return (ret); +} + +/* + * __rep_resend_req -- + * We might have dropped a message, we need to resend our request. + * The request we send is dependent on what recovery state we're in. + * The caller holds no locks. + * + * PUBLIC: int __rep_resend_req __P((ENV *, int)); + */ +int +__rep_resend_req(env, rereq) + ENV *env; + int rereq; +{ + DB_LOG *dblp; + DB_LSN lsn, *lsnp; + DB_REP *db_rep; + LOG *lp; + REP *rep; + int master, ret; + u_int32_t gapflags, msgtype, repflags, sendflags; + + db_rep = env->rep_handle; + rep = db_rep->region; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + ret = 0; + lsnp = NULL; + msgtype = REP_INVALID; + sendflags = 0; + + repflags = rep->flags; + /* + * If we are delayed we do not rerequest anything. + */ + if (FLD_ISSET(repflags, REP_F_DELAY)) + return (ret); + gapflags = rereq ? REP_GAP_REREQUEST : 0; + + if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) { + MUTEX_LOCK(env, rep->mtx_clientdb); + lsn = lp->verify_lsn; + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (!IS_ZERO_LSN(lsn)) { + msgtype = REP_VERIFY_REQ; + lsnp = &lsn; + sendflags = DB_REP_REREQUEST; + } + } else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) { + /* + * UPDATE_REQ only goes to the master. + */ + msgtype = REP_UPDATE_REQ; + } else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) { + REP_SYSTEM_LOCK(env); + ret = __rep_pggap_req(env, rep, NULL, gapflags); + REP_SYSTEM_UNLOCK(env); + } else { + MUTEX_LOCK(env, rep->mtx_clientdb); + ret = __rep_loggap_req(env, rep, NULL, gapflags); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + } + + if (msgtype != REP_INVALID) { + master = rep->master_id; + if (master == DB_EID_INVALID) + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); + else + (void)__rep_send_message(env, + master, msgtype, lsnp, NULL, 0, sendflags); + } + + return (ret); +} + +/* + * __rep_check_doreq -- + * PUBLIC: int __rep_check_doreq __P((ENV *, REP *)); + * + * Check if we need to send another request. If so, compare with + * the request limits the user might have set. This assumes the + * caller holds the REP->mtx_clientdb mutex. Returns 1 if a request + * needs to be made, and 0 if it does not. + */ +int +__rep_check_doreq(env, rep) + ENV *env; + REP *rep; +{ + + DB_LOG *dblp; + LOG *lp; + db_timespec now; + int req; + + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + __os_gettime(env, &now, 1); + timespecsub(&now, &lp->rcvd_ts); + req = timespeccmp(&now, &lp->wait_ts, >=); + if (req) { + /* + * Add wait_ts to itself to double it. + */ + timespecadd(&lp->wait_ts, &lp->wait_ts); + if (timespeccmp(&lp->wait_ts, &rep->max_gap, >)) + lp->wait_ts = rep->max_gap; + __os_gettime(env, &lp->rcvd_ts, 1); + } + return (req); +} + +/* + * __rep_skip_msg - + * + * If we're in recovery we want to skip/ignore the message, but + * we also need to see if we need to re-request any retransmissions. + */ +static int +__rep_skip_msg(env, rep, eid, rectype) + ENV *env; + REP *rep; + int eid; + u_int32_t rectype; +{ + int do_req, ret; + + ret = 0; + /* + * If we have a request message from a client then immediately + * send a REP_REREQUEST back to that client since we're skipping it. + */ + if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype)) + do_req = 1; + else { + /* Check for need to retransmit. */ + MUTEX_LOCK(env, rep->mtx_clientdb); + do_req = __rep_check_doreq(env, rep); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + } + /* + * Don't respond to a MASTER_REQ with + * a MASTER_REQ or REREQUEST. + */ + if (do_req && rectype != REP_MASTER_REQ) { + /* + * There are three cases: + * 1. If we don't know who the master is, then send MASTER_REQ. + * 2. If the message we're skipping came from the master, + * then we need to rerequest. + * 3. If the message didn't come from a master (i.e. client + * to client), then send a rerequest back to the sender so + * the sender can rerequest it elsewhere, if we are a client. + */ + if (rep->master_id == DB_EID_INVALID) /* Case 1. */ + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); + else if (eid == rep->master_id) /* Case 2. */ + ret = __rep_resend_req(env, 0); + else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */ + (void)__rep_send_message(env, + eid, REP_REREQUEST, NULL, NULL, 0, 0); + } + return (ret); +} + +static int +__rep_fire_newmaster(env, gen, master) + ENV *env; + u_int32_t gen; + int master; +{ + DB_REP *db_rep; + REP *rep; + + db_rep = env->rep_handle; + rep = db_rep->region; + + REP_EVENT_LOCK(env); + /* + * The firing of this event should be idempotent with respect to a + * particular generation number. + */ + if (rep->newmaster_event_gen < gen) { + __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master); + rep->newmaster_event_gen = gen; + } + REP_EVENT_UNLOCK(env); + return (0); +} + +static int +__rep_fire_startupdone(env, gen, master) + ENV *env; + u_int32_t gen; + int master; +{ + DB_REP *db_rep; + REP *rep; + + db_rep = env->rep_handle; + rep = db_rep->region; + + REP_EVENT_LOCK(env); + /* + * Usually NEWMASTER will already have been fired. But if not, fire + * it here now, to ensure the application receives events in the + * expected order. + */ + if (rep->newmaster_event_gen < gen) { + __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master); + rep->newmaster_event_gen = gen; + } + + /* + * Caller already ensures that it only tries to fire STARTUPDONE once + * per generation. If we did not want to rely on that, we could add a + * simple boolean flag (to the set of data protected by the mtx_event). + * The precise meaning of that flag would be "STARTUPDONE has been fired + * for the generation value stored in `newmaster_event_gen'". Then the + * more accurate test here would be simply to check that flag, and fire + * the event (and set the flag) if it were not already set. + */ + if (rep->newmaster_event_gen == gen) + __rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL); + REP_EVENT_UNLOCK(env); + return (0); +} |