From c4954dc33c91c3815dda286b765e7164ec2b3eba Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sat, 17 Feb 2007 01:28:20 +0000 Subject: ReplicatedRepository installs user triggers on the replica again, but it now disables all triggers during resync to prevent errors. When triggers were on master, downstream triggers would not see changes made by earlier triggers. --- RELEASE-NOTES.txt | 3 + .../repo/replicated/ReplicatedRepository.java | 26 +- .../repo/replicated/ReplicatedStorage.java | 29 +-- .../repo/replicated/ReplicationTrigger.java | 62 ++--- .../com/amazon/carbonado/spi/TriggerManager.java | 282 +++++++++++++++++---- 5 files changed, 281 insertions(+), 121 deletions(-) diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt index e2d3ede..ea9ff54 100644 --- a/RELEASE-NOTES.txt +++ b/RELEASE-NOTES.txt @@ -7,6 +7,9 @@ Carbonado change history cleared. Otherwise, indexes on newly added properties might not get updated. - Added system properties to control performance characteristics of MergeSortBuffer. - Fixed verify error in generation of markPropertiesDirty. +- ReplicatedRepository installs user triggers on the replica again, but it now + disables all triggers during resync to prevent errors. When triggers were on + master, downstream triggers would not see changes made by earlier triggers. 1.1-BETA9 to 1.1-BETA10 ------------------------------- diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index c292d9c..3b215bc 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -356,9 +356,9 @@ class ReplicatedRepository Object... filterValues) throws RepositoryException { - ReplicationTrigger trigger; + ReplicationTrigger replicationTrigger; if (storageFor(type) instanceof ReplicatedStorage) { - trigger = ((ReplicatedStorage) storageFor(type)).getTrigger(); + replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger(); } else { throw new UnsupportedTypeException("Storable type is not replicated", type); } @@ -413,7 +413,7 @@ class ReplicatedRepository try { replicaTxn.setForUpdate(true); - resync(trigger, + resync(replicationTrigger, replicaStorage, replicaQuery, masterStorage, masterQuery, throttle, desiredSpeed, @@ -426,7 +426,7 @@ class ReplicatedRepository } @SuppressWarnings("unchecked") - private void resync(ReplicationTrigger trigger, + private void resync(ReplicationTrigger replicationTrigger, Storage replicaStorage, Query replicaQuery, Storage masterStorage, Query masterQuery, Throttle throttle, double desiredSpeed, @@ -493,7 +493,7 @@ class ReplicatedRepository if (replicaWithKeyOnly != null) { // Delete corrupt replica entry. try { - trigger.deleteReplica(replicaWithKeyOnly); + replicationTrigger.deleteReplica(replicaWithKeyOnly); log.info("Deleted corrupt replica entry: " + replicaWithKeyOnly.toStringKeyOnly(), e); skip = false; @@ -551,7 +551,7 @@ class ReplicatedRepository if (compare < 0) { // Bogus record exists only in replica so delete it. - resyncTask = prepareResyncTask(trigger, replicaEntry, null); + resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null); // Allow replica to advance. if (replicaCursor == null) { replicaCursor = replicaQuery.fetchAfter(replicaEntry); @@ -560,7 +560,7 @@ class ReplicatedRepository replicaEntry = null; } else if (compare > 0) { // Replica cursor is missing an entry so copy it. - resyncTask = prepareResyncTask(trigger, null, masterEntry); + resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry); // Allow master to advance. lastMasterEntry = masterEntry; masterEntry = null; @@ -576,7 +576,8 @@ class ReplicatedRepository // Both replicaEntry and masterEntry are non-null. if (!replicaEntry.equalProperties(masterEntry)) { // Replica is stale. - resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry); + resyncTask = prepareResyncTask + (replicationTrigger, replicaEntry, masterEntry); } // Entries are synchronized so allow both cursors to advance. @@ -607,9 +608,10 @@ class ReplicatedRepository } } - private Runnable prepareResyncTask(final ReplicationTrigger trigger, - final S replicaEntry, - final S masterEntry) + private Runnable prepareResyncTask + (final ReplicationTrigger replicationTrigger, + final S replicaEntry, + final S masterEntry) throws RepositoryException { if (replicaEntry == null && masterEntry == null) { @@ -622,7 +624,7 @@ class ReplicatedRepository Runnable task = new Runnable() { public void run() { try { - trigger.resyncEntries(replicaEntry, masterEntry); + replicationTrigger.resyncEntries(replicaEntry, masterEntry); } catch (Exception e) { LogFactory.getLog(ReplicatedRepository.class).error(null, e); } diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java index ebb6db6..9711784 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java @@ -43,7 +43,7 @@ import com.amazon.carbonado.spi.BelatedStorageCreator; class ReplicatedStorage implements Storage { final Storage mReplicaStorage; final Storage mMasterStorage; - final ReplicationTrigger mTrigger; + final ReplicationTrigger mReplicationTrigger; /** * @throws UnsupportedTypeException if master doesn't support Storable, but @@ -63,8 +63,13 @@ class ReplicatedStorage implements Storage { ReplicatedRepositoryBuilder.DEFAULT_RETRY_MILLIS); mMasterStorage = creator.get(ReplicatedRepositoryBuilder.DEFAULT_MASTER_TIMEOUT_MILLIS); - mTrigger = new ReplicationTrigger(aRepository, mReplicaStorage, mMasterStorage); - mReplicaStorage.addTrigger(mTrigger); + + // ReplicationTrigger contains internal TriggerManager, and all other + // triggers should register with the ReplicationTrigger. This allows + // all triggers to be easily disabled during resync and repairs. + + mReplicationTrigger = new ReplicationTrigger + (aRepository, mReplicaStorage, mMasterStorage); } /** @@ -76,8 +81,8 @@ class ReplicatedStorage implements Storage { { mReplicaStorage = replicaStorage; mMasterStorage = masterStorage; - mTrigger = new ReplicationTrigger(aRepository, mReplicaStorage, masterStorage); - mReplicaStorage.addTrigger(mTrigger); + mReplicationTrigger = new ReplicationTrigger + (aRepository, mReplicaStorage, masterStorage); } public Class getStorableType() { @@ -100,21 +105,15 @@ class ReplicatedStorage implements Storage { return mReplicaStorage.query(filter); } - // Note: All user triggers must be added to the master storage. Otherwise, - // resync operations can cause the triggers to run again, which can be - // disastrous. If triggers ever support "after load" events, things get - // complicated. Perhaps this use case is a good example for why supporting - // "after load" events might be bad. - public boolean addTrigger(Trigger trigger) { - return mMasterStorage.addTrigger(trigger); + return mReplicationTrigger.addTrigger(trigger); } public boolean removeTrigger(Trigger trigger) { - return mMasterStorage.removeTrigger(trigger); + return mReplicationTrigger.removeTrigger(trigger); } - ReplicationTrigger getTrigger() { - return mTrigger; + ReplicationTrigger getReplicationTrigger() { + return mReplicationTrigger; } } diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 249a779..7f743ac 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -18,8 +18,6 @@ package com.amazon.carbonado.repo.replicated; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +34,7 @@ import com.amazon.carbonado.Trigger; import com.amazon.carbonado.UniqueConstraintException; import com.amazon.carbonado.spi.RepairExecutor; +import com.amazon.carbonado.spi.TriggerManager; /** * All inserts/updates/deletes are first committed to the master storage, then @@ -49,7 +48,7 @@ class ReplicationTrigger extends Trigger { private final Storage mReplicaStorage; private final Storage mMasterStorage; - private final ThreadLocal mDisabled = new ThreadLocal(); + private final TriggerManager mTriggerManager; ReplicationTrigger(Repository repository, Storage replicaStorage, @@ -58,6 +57,11 @@ class ReplicationTrigger extends Trigger { mRepository = repository; mReplicaStorage = replicaStorage; mMasterStorage = masterStorage; + // Use TriggerManager to locally disable trigger execution during + // resync and repairs. + mTriggerManager = new TriggerManager(); + mTriggerManager.addTrigger(this); + replicaStorage.addTrigger(mTriggerManager); } @Override @@ -71,10 +75,6 @@ class ReplicationTrigger extends Trigger { } private Object beforeInsert(S replica, boolean forTry) throws PersistException { - if (isReplicationDisabled()) { - return null; - } - final S master = mMasterStorage.prepare(); replica.copyAllProperties(master); @@ -129,10 +129,6 @@ class ReplicationTrigger extends Trigger { } private Object beforeUpdate(S replica, boolean forTry) throws PersistException { - if (isReplicationDisabled()) { - return null; - } - final S master = mMasterStorage.prepare(); replica.copyPrimaryKeyProperties(master); @@ -209,10 +205,6 @@ class ReplicationTrigger extends Trigger { @Override public Object beforeDelete(S replica) throws PersistException { - if (isReplicationDisabled()) { - return null; - } - S master = mMasterStorage.prepare(); replica.copyPrimaryKeyProperties(master); @@ -373,6 +365,14 @@ class ReplicationTrigger extends Trigger { }); } + boolean addTrigger(Trigger trigger) { + return mTriggerManager.addTrigger(trigger); + } + + boolean removeTrigger(Trigger trigger) { + return mTriggerManager.removeTrigger(trigger); + } + /** * Deletes the replica entry with replication disabled. */ @@ -401,35 +401,13 @@ class ReplicationTrigger extends Trigger { } } - /** - * Returns true if replication is disabled for the current thread. - */ - private boolean isReplicationDisabled() { - // Count indicates how many times disabled (nested) - AtomicInteger i = mDisabled.get(); - return i != null && i.get() > 0; - } - - /** - * By default, replication is enabled for the current thread. Pass true to - * disable during re-sync operations. - */ - private void setReplicationDisabled(boolean disabled) { - // Using a count allows this method call to be nested. Based on the - // current implementation, it should never be nested, so this extra - // work is just a safeguard. - AtomicInteger i = mDisabled.get(); + void setReplicationDisabled(boolean disabled) { + // This method disables not only this trigger, but all triggers added + // to manager. if (disabled) { - if (i == null) { - i = new AtomicInteger(1); - mDisabled.set(i); - } else { - i.incrementAndGet(); - } + mTriggerManager.localDisable(); } else { - if (i != null) { - i.decrementAndGet(); - } + mTriggerManager.localEnable(); } } } diff --git a/src/main/java/com/amazon/carbonado/spi/TriggerManager.java b/src/main/java/com/amazon/carbonado/spi/TriggerManager.java index 147d1ed..a67b134 100644 --- a/src/main/java/com/amazon/carbonado/spi/TriggerManager.java +++ b/src/main/java/com/amazon/carbonado/spi/TriggerManager.java @@ -23,6 +23,9 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import com.amazon.carbonado.PersistException; import com.amazon.carbonado.RepositoryException; import com.amazon.carbonado.Storable; @@ -36,7 +39,7 @@ import com.amazon.carbonado.TriggerFactory; * * @author Brian S O'Neill */ -public class TriggerManager { +public class TriggerManager extends Trigger { // Bit masks returned by selectTypes. private static final int FOR_INSERT = 1; private static final int FOR_UPDATE = 2; @@ -91,9 +94,9 @@ public class TriggerManager { } } - private volatile ForInsert mForInsert; - private volatile ForUpdate mForUpdate; - private volatile ForDelete mForDelete; + private final ForInsert mForInsert = new ForInsert(); + private final ForUpdate mForUpdate = new ForUpdate(); + private final ForDelete mForDelete = new ForDelete(); public TriggerManager() { } @@ -111,102 +114,74 @@ public class TriggerManager { } /** - * Returns consolidated trigger to call for insert operations, or null if - * none. + * Returns a consolidated trigger to call for insert operations, or null if + * none. If not null, the consolidated trigger is not a snapshot -- it will + * change as the set of triggers in this manager changes. */ public Trigger getInsertTrigger() { - return mForInsert; + ForInsert forInsert = mForInsert; + return forInsert.isEmpty() ? null : forInsert; } /** - * Returns consolidated trigger to call for update operations, or null if - * none. + * Returns a consolidated trigger to call for update operations, or null if + * none. If not null, the consolidated trigger is not a snapshot -- it will + * change as the set of triggers in this manager changes. */ public Trigger getUpdateTrigger() { - return mForUpdate; + ForUpdate forUpdate = mForUpdate; + return forUpdate.isEmpty() ? null : forUpdate; } /** - * Returns consolidated trigger to call for delete operations, or null if - * none. + * Returns a consolidated trigger to call for delete operations, or null if + * none. If not null, the consolidated trigger is not a snapshot -- it will + * change as the set of triggers in this manager changes. */ public Trigger getDeleteTrigger() { - return mForDelete; + ForDelete forDelete = mForDelete; + return forDelete.isEmpty() ? null : forDelete; } - public synchronized boolean addTrigger(Trigger trigger) { + public boolean addTrigger(Trigger trigger) { if (trigger == null) { throw new IllegalArgumentException(); } int types = selectTypes(trigger); - if (types == 0) { - return false; - } boolean retValue = false; if ((types & FOR_INSERT) != 0) { - if (mForInsert == null) { - mForInsert = new ForInsert(); - } retValue |= mForInsert.add(trigger); } - if ((types & FOR_UPDATE) != 0) { - if (mForUpdate == null) { - mForUpdate = new ForUpdate(); - } retValue |= mForUpdate.add(trigger); } - if ((types & FOR_DELETE) != 0) { - if (mForDelete == null) { - mForDelete = new ForDelete(); - } retValue |= mForDelete.add(trigger); } return retValue; } - public synchronized boolean removeTrigger(Trigger trigger) { + public boolean removeTrigger(Trigger trigger) { if (trigger == null) { throw new IllegalArgumentException(); } int types = selectTypes(trigger); - if (types == 0) { - return false; - } boolean retValue = false; if ((types & FOR_INSERT) != 0) { - if (mForInsert != null && mForInsert.remove(trigger)) { - retValue = true; - if (mForInsert.isEmpty()) { - mForInsert = null; - } - } + retValue |= mForInsert.remove(trigger); } - if ((types & FOR_UPDATE) != 0) { - if (mForUpdate != null && mForUpdate.remove(trigger)) { - retValue = true; - if (mForUpdate.isEmpty()) { - mForUpdate = null; - } - } + retValue |= mForUpdate.remove(trigger); } - if ((types & FOR_DELETE) != 0) { - if (mForDelete != null && mForDelete.remove(trigger)) { - retValue = true; - if (mForDelete.isEmpty()) { - mForDelete = null; - } - } + retValue |= mForDelete.remove(trigger); } return retValue; @@ -223,6 +198,102 @@ public class TriggerManager { } } + /** + * Disables execution of all managed triggers for the current thread. Call + * localEnable to enable again. This call can be made multiple times, but + * be sure to call localEnable the same number of times to fully enable. + */ + public void localDisable() { + mForInsert.localDisable(); + mForUpdate.localDisable(); + mForDelete.localDisable(); + } + + /** + * Enables execution of all managed triggers for the current thread, if + * they had been disabled before. + */ + public void localEnable() { + mForInsert.localEnable(); + mForUpdate.localEnable(); + mForDelete.localEnable(); + } + + @Override + public Object beforeInsert(S storable) throws PersistException { + return mForInsert.beforeInsert(storable); + } + + @Override + public Object beforeTryInsert(S storable) throws PersistException { + return mForInsert.beforeTryInsert(storable); + } + + @Override + public void afterInsert(S storable, Object state) throws PersistException { + mForInsert.afterInsert(storable, state); + } + + @Override + public void afterTryInsert(S storable, Object state) throws PersistException { + mForInsert.afterTryInsert(storable, state); + } + + @Override + public void failedInsert(S storable, Object state) { + mForInsert.failedInsert(storable, state); + } + + @Override + public Object beforeUpdate(S storable) throws PersistException { + return mForUpdate.beforeUpdate(storable); + } + + @Override + public Object beforeTryUpdate(S storable) throws PersistException { + return mForUpdate.beforeTryUpdate(storable); + } + + @Override + public void afterUpdate(S storable, Object state) throws PersistException { + mForUpdate.afterUpdate(storable, state); + } + + @Override + public void afterTryUpdate(S storable, Object state) throws PersistException { + mForUpdate.afterTryUpdate(storable, state); + } + + @Override + public void failedUpdate(S storable, Object state) { + mForUpdate.failedUpdate(storable, state); + } + + @Override + public Object beforeDelete(S storable) throws PersistException { + return mForDelete.beforeDelete(storable); + } + + @Override + public Object beforeTryDelete(S storable) throws PersistException { + return mForDelete.beforeTryDelete(storable); + } + + @Override + public void afterDelete(S storable, Object state) throws PersistException { + mForDelete.afterDelete(storable, state); + } + + @Override + public void afterTryDelete(S storable, Object state) throws PersistException { + mForDelete.afterTryDelete(storable, state); + } + + @Override + public void failedDelete(S storable, Object state) { + mForDelete.failedDelete(storable, state); + } + /** * Determines which operations the given trigger overrides. */ @@ -281,10 +352,16 @@ public class TriggerManager { } private static abstract class ForSomething extends Trigger { + private static final AtomicReferenceFieldUpdater + cDisabledFlagRef = AtomicReferenceFieldUpdater + .newUpdater(ForSomething.class, ThreadLocal.class, "mDisabledFlag"); + private static Trigger[] NO_TRIGGERS = new Trigger[0]; protected volatile Trigger[] mTriggers; + private volatile ThreadLocal mDisabledFlag; + ForSomething() { mTriggers = NO_TRIGGERS; } @@ -313,11 +390,56 @@ public class TriggerManager { boolean isEmpty() { return mTriggers.length == 0; } + + boolean isLocallyDisabled() { + ThreadLocal disabledFlag = mDisabledFlag; + if (disabledFlag == null) { + return false; + } + // Count indicates how many times disabled (nested) + AtomicInteger i = disabledFlag.get(); + return i != null && i.get() > 0; + } + + void localDisable() { + // Using a count allows this method call to be nested. + ThreadLocal disabledFlag = disabledFlag(); + AtomicInteger i = disabledFlag.get(); + if (i == null) { + disabledFlag.set(new AtomicInteger(1)); + } else { + i.incrementAndGet(); + } + } + + void localEnable() { + // Using a count allows this method call to be nested. + AtomicInteger i = disabledFlag().get(); + if (i != null) { + i.decrementAndGet(); + } + } + + private ThreadLocal disabledFlag() { + ThreadLocal disabledFlag = mDisabledFlag; + while (disabledFlag == null) { + disabledFlag = new ThreadLocal(); + if (cDisabledFlagRef.compareAndSet(this, null, disabledFlag)) { + break; + } + disabledFlag = mDisabledFlag; + } + return disabledFlag; + } } private static class ForInsert extends ForSomething { @Override public Object beforeInsert(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -336,6 +458,10 @@ public class TriggerManager { @Override public Object beforeTryInsert(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -354,6 +480,10 @@ public class TriggerManager { @Override public void afterInsert(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -383,6 +513,10 @@ public class TriggerManager { @Override public void afterTryInsert(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -412,6 +546,10 @@ public class TriggerManager { @Override public void failedInsert(S storable, Object state) { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -453,6 +591,10 @@ public class TriggerManager { private static class ForUpdate extends ForSomething { @Override public Object beforeUpdate(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -471,6 +613,10 @@ public class TriggerManager { @Override public Object beforeTryUpdate(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -489,6 +635,10 @@ public class TriggerManager { @Override public void afterUpdate(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -518,6 +668,10 @@ public class TriggerManager { @Override public void afterTryUpdate(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -547,6 +701,10 @@ public class TriggerManager { @Override public void failedUpdate(S storable, Object state) { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -588,6 +746,10 @@ public class TriggerManager { private static class ForDelete extends ForSomething { @Override public Object beforeDelete(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -606,6 +768,10 @@ public class TriggerManager { @Override public Object beforeTryDelete(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -624,6 +790,10 @@ public class TriggerManager { @Override public void afterDelete(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -653,6 +823,10 @@ public class TriggerManager { @Override public void afterTryDelete(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -682,6 +856,10 @@ public class TriggerManager { @Override public void failedDelete(S storable, Object state) { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; -- cgit v1.2.3