From c4954dc33c91c3815dda286b765e7164ec2b3eba Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sat, 17 Feb 2007 01:28:20 +0000 Subject: ReplicatedRepository installs user triggers on the replica again, but it now disables all triggers during resync to prevent errors. When triggers were on master, downstream triggers would not see changes made by earlier triggers. --- .../repo/replicated/ReplicatedRepository.java | 26 ++++----- .../repo/replicated/ReplicatedStorage.java | 29 +++++----- .../repo/replicated/ReplicationTrigger.java | 62 +++++++--------------- 3 files changed, 48 insertions(+), 69 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/repo/replicated') diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index c292d9c..3b215bc 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -356,9 +356,9 @@ class ReplicatedRepository Object... filterValues) throws RepositoryException { - ReplicationTrigger trigger; + ReplicationTrigger replicationTrigger; if (storageFor(type) instanceof ReplicatedStorage) { - trigger = ((ReplicatedStorage) storageFor(type)).getTrigger(); + replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger(); } else { throw new UnsupportedTypeException("Storable type is not replicated", type); } @@ -413,7 +413,7 @@ class ReplicatedRepository try { replicaTxn.setForUpdate(true); - resync(trigger, + resync(replicationTrigger, replicaStorage, replicaQuery, masterStorage, masterQuery, throttle, desiredSpeed, @@ -426,7 +426,7 @@ class ReplicatedRepository } @SuppressWarnings("unchecked") - private void resync(ReplicationTrigger trigger, + private void resync(ReplicationTrigger replicationTrigger, Storage replicaStorage, Query replicaQuery, Storage masterStorage, Query masterQuery, Throttle throttle, double desiredSpeed, @@ -493,7 +493,7 @@ class ReplicatedRepository if (replicaWithKeyOnly != null) { // Delete corrupt replica entry. try { - trigger.deleteReplica(replicaWithKeyOnly); + replicationTrigger.deleteReplica(replicaWithKeyOnly); log.info("Deleted corrupt replica entry: " + replicaWithKeyOnly.toStringKeyOnly(), e); skip = false; @@ -551,7 +551,7 @@ class ReplicatedRepository if (compare < 0) { // Bogus record exists only in replica so delete it. - resyncTask = prepareResyncTask(trigger, replicaEntry, null); + resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null); // Allow replica to advance. if (replicaCursor == null) { replicaCursor = replicaQuery.fetchAfter(replicaEntry); @@ -560,7 +560,7 @@ class ReplicatedRepository replicaEntry = null; } else if (compare > 0) { // Replica cursor is missing an entry so copy it. - resyncTask = prepareResyncTask(trigger, null, masterEntry); + resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry); // Allow master to advance. lastMasterEntry = masterEntry; masterEntry = null; @@ -576,7 +576,8 @@ class ReplicatedRepository // Both replicaEntry and masterEntry are non-null. if (!replicaEntry.equalProperties(masterEntry)) { // Replica is stale. - resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry); + resyncTask = prepareResyncTask + (replicationTrigger, replicaEntry, masterEntry); } // Entries are synchronized so allow both cursors to advance. @@ -607,9 +608,10 @@ class ReplicatedRepository } } - private Runnable prepareResyncTask(final ReplicationTrigger trigger, - final S replicaEntry, - final S masterEntry) + private Runnable prepareResyncTask + (final ReplicationTrigger replicationTrigger, + final S replicaEntry, + final S masterEntry) throws RepositoryException { if (replicaEntry == null && masterEntry == null) { @@ -622,7 +624,7 @@ class ReplicatedRepository Runnable task = new Runnable() { public void run() { try { - trigger.resyncEntries(replicaEntry, masterEntry); + replicationTrigger.resyncEntries(replicaEntry, masterEntry); } catch (Exception e) { LogFactory.getLog(ReplicatedRepository.class).error(null, e); } diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java index ebb6db6..9711784 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java @@ -43,7 +43,7 @@ import com.amazon.carbonado.spi.BelatedStorageCreator; class ReplicatedStorage implements Storage { final Storage mReplicaStorage; final Storage mMasterStorage; - final ReplicationTrigger mTrigger; + final ReplicationTrigger mReplicationTrigger; /** * @throws UnsupportedTypeException if master doesn't support Storable, but @@ -63,8 +63,13 @@ class ReplicatedStorage implements Storage { ReplicatedRepositoryBuilder.DEFAULT_RETRY_MILLIS); mMasterStorage = creator.get(ReplicatedRepositoryBuilder.DEFAULT_MASTER_TIMEOUT_MILLIS); - mTrigger = new ReplicationTrigger(aRepository, mReplicaStorage, mMasterStorage); - mReplicaStorage.addTrigger(mTrigger); + + // ReplicationTrigger contains internal TriggerManager, and all other + // triggers should register with the ReplicationTrigger. This allows + // all triggers to be easily disabled during resync and repairs. + + mReplicationTrigger = new ReplicationTrigger + (aRepository, mReplicaStorage, mMasterStorage); } /** @@ -76,8 +81,8 @@ class ReplicatedStorage implements Storage { { mReplicaStorage = replicaStorage; mMasterStorage = masterStorage; - mTrigger = new ReplicationTrigger(aRepository, mReplicaStorage, masterStorage); - mReplicaStorage.addTrigger(mTrigger); + mReplicationTrigger = new ReplicationTrigger + (aRepository, mReplicaStorage, masterStorage); } public Class getStorableType() { @@ -100,21 +105,15 @@ class ReplicatedStorage implements Storage { return mReplicaStorage.query(filter); } - // Note: All user triggers must be added to the master storage. Otherwise, - // resync operations can cause the triggers to run again, which can be - // disastrous. If triggers ever support "after load" events, things get - // complicated. Perhaps this use case is a good example for why supporting - // "after load" events might be bad. - public boolean addTrigger(Trigger trigger) { - return mMasterStorage.addTrigger(trigger); + return mReplicationTrigger.addTrigger(trigger); } public boolean removeTrigger(Trigger trigger) { - return mMasterStorage.removeTrigger(trigger); + return mReplicationTrigger.removeTrigger(trigger); } - ReplicationTrigger getTrigger() { - return mTrigger; + ReplicationTrigger getReplicationTrigger() { + return mReplicationTrigger; } } diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 249a779..7f743ac 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -18,8 +18,6 @@ package com.amazon.carbonado.repo.replicated; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +34,7 @@ import com.amazon.carbonado.Trigger; import com.amazon.carbonado.UniqueConstraintException; import com.amazon.carbonado.spi.RepairExecutor; +import com.amazon.carbonado.spi.TriggerManager; /** * All inserts/updates/deletes are first committed to the master storage, then @@ -49,7 +48,7 @@ class ReplicationTrigger extends Trigger { private final Storage mReplicaStorage; private final Storage mMasterStorage; - private final ThreadLocal mDisabled = new ThreadLocal(); + private final TriggerManager mTriggerManager; ReplicationTrigger(Repository repository, Storage replicaStorage, @@ -58,6 +57,11 @@ class ReplicationTrigger extends Trigger { mRepository = repository; mReplicaStorage = replicaStorage; mMasterStorage = masterStorage; + // Use TriggerManager to locally disable trigger execution during + // resync and repairs. + mTriggerManager = new TriggerManager(); + mTriggerManager.addTrigger(this); + replicaStorage.addTrigger(mTriggerManager); } @Override @@ -71,10 +75,6 @@ class ReplicationTrigger extends Trigger { } private Object beforeInsert(S replica, boolean forTry) throws PersistException { - if (isReplicationDisabled()) { - return null; - } - final S master = mMasterStorage.prepare(); replica.copyAllProperties(master); @@ -129,10 +129,6 @@ class ReplicationTrigger extends Trigger { } private Object beforeUpdate(S replica, boolean forTry) throws PersistException { - if (isReplicationDisabled()) { - return null; - } - final S master = mMasterStorage.prepare(); replica.copyPrimaryKeyProperties(master); @@ -209,10 +205,6 @@ class ReplicationTrigger extends Trigger { @Override public Object beforeDelete(S replica) throws PersistException { - if (isReplicationDisabled()) { - return null; - } - S master = mMasterStorage.prepare(); replica.copyPrimaryKeyProperties(master); @@ -373,6 +365,14 @@ class ReplicationTrigger extends Trigger { }); } + boolean addTrigger(Trigger trigger) { + return mTriggerManager.addTrigger(trigger); + } + + boolean removeTrigger(Trigger trigger) { + return mTriggerManager.removeTrigger(trigger); + } + /** * Deletes the replica entry with replication disabled. */ @@ -401,35 +401,13 @@ class ReplicationTrigger extends Trigger { } } - /** - * Returns true if replication is disabled for the current thread. - */ - private boolean isReplicationDisabled() { - // Count indicates how many times disabled (nested) - AtomicInteger i = mDisabled.get(); - return i != null && i.get() > 0; - } - - /** - * By default, replication is enabled for the current thread. Pass true to - * disable during re-sync operations. - */ - private void setReplicationDisabled(boolean disabled) { - // Using a count allows this method call to be nested. Based on the - // current implementation, it should never be nested, so this extra - // work is just a safeguard. - AtomicInteger i = mDisabled.get(); + void setReplicationDisabled(boolean disabled) { + // This method disables not only this trigger, but all triggers added + // to manager. if (disabled) { - if (i == null) { - i = new AtomicInteger(1); - mDisabled.set(i); - } else { - i.incrementAndGet(); - } + mTriggerManager.localDisable(); } else { - if (i != null) { - i.decrementAndGet(); - } + mTriggerManager.localEnable(); } } } -- cgit v1.2.3