From c4954dc33c91c3815dda286b765e7164ec2b3eba Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sat, 17 Feb 2007 01:28:20 +0000 Subject: ReplicatedRepository installs user triggers on the replica again, but it now disables all triggers during resync to prevent errors. When triggers were on master, downstream triggers would not see changes made by earlier triggers. --- .../com/amazon/carbonado/spi/TriggerManager.java | 282 +++++++++++++++++---- 1 file changed, 230 insertions(+), 52 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/spi') diff --git a/src/main/java/com/amazon/carbonado/spi/TriggerManager.java b/src/main/java/com/amazon/carbonado/spi/TriggerManager.java index 147d1ed..a67b134 100644 --- a/src/main/java/com/amazon/carbonado/spi/TriggerManager.java +++ b/src/main/java/com/amazon/carbonado/spi/TriggerManager.java @@ -23,6 +23,9 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import com.amazon.carbonado.PersistException; import com.amazon.carbonado.RepositoryException; import com.amazon.carbonado.Storable; @@ -36,7 +39,7 @@ import com.amazon.carbonado.TriggerFactory; * * @author Brian S O'Neill */ -public class TriggerManager { +public class TriggerManager extends Trigger { // Bit masks returned by selectTypes. private static final int FOR_INSERT = 1; private static final int FOR_UPDATE = 2; @@ -91,9 +94,9 @@ public class TriggerManager { } } - private volatile ForInsert mForInsert; - private volatile ForUpdate mForUpdate; - private volatile ForDelete mForDelete; + private final ForInsert mForInsert = new ForInsert(); + private final ForUpdate mForUpdate = new ForUpdate(); + private final ForDelete mForDelete = new ForDelete(); public TriggerManager() { } @@ -111,102 +114,74 @@ public class TriggerManager { } /** - * Returns consolidated trigger to call for insert operations, or null if - * none. + * Returns a consolidated trigger to call for insert operations, or null if + * none. If not null, the consolidated trigger is not a snapshot -- it will + * change as the set of triggers in this manager changes. */ public Trigger getInsertTrigger() { - return mForInsert; + ForInsert forInsert = mForInsert; + return forInsert.isEmpty() ? null : forInsert; } /** - * Returns consolidated trigger to call for update operations, or null if - * none. + * Returns a consolidated trigger to call for update operations, or null if + * none. If not null, the consolidated trigger is not a snapshot -- it will + * change as the set of triggers in this manager changes. */ public Trigger getUpdateTrigger() { - return mForUpdate; + ForUpdate forUpdate = mForUpdate; + return forUpdate.isEmpty() ? null : forUpdate; } /** - * Returns consolidated trigger to call for delete operations, or null if - * none. + * Returns a consolidated trigger to call for delete operations, or null if + * none. If not null, the consolidated trigger is not a snapshot -- it will + * change as the set of triggers in this manager changes. */ public Trigger getDeleteTrigger() { - return mForDelete; + ForDelete forDelete = mForDelete; + return forDelete.isEmpty() ? null : forDelete; } - public synchronized boolean addTrigger(Trigger trigger) { + public boolean addTrigger(Trigger trigger) { if (trigger == null) { throw new IllegalArgumentException(); } int types = selectTypes(trigger); - if (types == 0) { - return false; - } boolean retValue = false; if ((types & FOR_INSERT) != 0) { - if (mForInsert == null) { - mForInsert = new ForInsert(); - } retValue |= mForInsert.add(trigger); } - if ((types & FOR_UPDATE) != 0) { - if (mForUpdate == null) { - mForUpdate = new ForUpdate(); - } retValue |= mForUpdate.add(trigger); } - if ((types & FOR_DELETE) != 0) { - if (mForDelete == null) { - mForDelete = new ForDelete(); - } retValue |= mForDelete.add(trigger); } return retValue; } - public synchronized boolean removeTrigger(Trigger trigger) { + public boolean removeTrigger(Trigger trigger) { if (trigger == null) { throw new IllegalArgumentException(); } int types = selectTypes(trigger); - if (types == 0) { - return false; - } boolean retValue = false; if ((types & FOR_INSERT) != 0) { - if (mForInsert != null && mForInsert.remove(trigger)) { - retValue = true; - if (mForInsert.isEmpty()) { - mForInsert = null; - } - } + retValue |= mForInsert.remove(trigger); } - if ((types & FOR_UPDATE) != 0) { - if (mForUpdate != null && mForUpdate.remove(trigger)) { - retValue = true; - if (mForUpdate.isEmpty()) { - mForUpdate = null; - } - } + retValue |= mForUpdate.remove(trigger); } - if ((types & FOR_DELETE) != 0) { - if (mForDelete != null && mForDelete.remove(trigger)) { - retValue = true; - if (mForDelete.isEmpty()) { - mForDelete = null; - } - } + retValue |= mForDelete.remove(trigger); } return retValue; @@ -223,6 +198,102 @@ public class TriggerManager { } } + /** + * Disables execution of all managed triggers for the current thread. Call + * localEnable to enable again. This call can be made multiple times, but + * be sure to call localEnable the same number of times to fully enable. + */ + public void localDisable() { + mForInsert.localDisable(); + mForUpdate.localDisable(); + mForDelete.localDisable(); + } + + /** + * Enables execution of all managed triggers for the current thread, if + * they had been disabled before. + */ + public void localEnable() { + mForInsert.localEnable(); + mForUpdate.localEnable(); + mForDelete.localEnable(); + } + + @Override + public Object beforeInsert(S storable) throws PersistException { + return mForInsert.beforeInsert(storable); + } + + @Override + public Object beforeTryInsert(S storable) throws PersistException { + return mForInsert.beforeTryInsert(storable); + } + + @Override + public void afterInsert(S storable, Object state) throws PersistException { + mForInsert.afterInsert(storable, state); + } + + @Override + public void afterTryInsert(S storable, Object state) throws PersistException { + mForInsert.afterTryInsert(storable, state); + } + + @Override + public void failedInsert(S storable, Object state) { + mForInsert.failedInsert(storable, state); + } + + @Override + public Object beforeUpdate(S storable) throws PersistException { + return mForUpdate.beforeUpdate(storable); + } + + @Override + public Object beforeTryUpdate(S storable) throws PersistException { + return mForUpdate.beforeTryUpdate(storable); + } + + @Override + public void afterUpdate(S storable, Object state) throws PersistException { + mForUpdate.afterUpdate(storable, state); + } + + @Override + public void afterTryUpdate(S storable, Object state) throws PersistException { + mForUpdate.afterTryUpdate(storable, state); + } + + @Override + public void failedUpdate(S storable, Object state) { + mForUpdate.failedUpdate(storable, state); + } + + @Override + public Object beforeDelete(S storable) throws PersistException { + return mForDelete.beforeDelete(storable); + } + + @Override + public Object beforeTryDelete(S storable) throws PersistException { + return mForDelete.beforeTryDelete(storable); + } + + @Override + public void afterDelete(S storable, Object state) throws PersistException { + mForDelete.afterDelete(storable, state); + } + + @Override + public void afterTryDelete(S storable, Object state) throws PersistException { + mForDelete.afterTryDelete(storable, state); + } + + @Override + public void failedDelete(S storable, Object state) { + mForDelete.failedDelete(storable, state); + } + /** * Determines which operations the given trigger overrides. */ @@ -281,10 +352,16 @@ public class TriggerManager { } private static abstract class ForSomething extends Trigger { + private static final AtomicReferenceFieldUpdater + cDisabledFlagRef = AtomicReferenceFieldUpdater + .newUpdater(ForSomething.class, ThreadLocal.class, "mDisabledFlag"); + private static Trigger[] NO_TRIGGERS = new Trigger[0]; protected volatile Trigger[] mTriggers; + private volatile ThreadLocal mDisabledFlag; + ForSomething() { mTriggers = NO_TRIGGERS; } @@ -313,11 +390,56 @@ public class TriggerManager { boolean isEmpty() { return mTriggers.length == 0; } + + boolean isLocallyDisabled() { + ThreadLocal disabledFlag = mDisabledFlag; + if (disabledFlag == null) { + return false; + } + // Count indicates how many times disabled (nested) + AtomicInteger i = disabledFlag.get(); + return i != null && i.get() > 0; + } + + void localDisable() { + // Using a count allows this method call to be nested. + ThreadLocal disabledFlag = disabledFlag(); + AtomicInteger i = disabledFlag.get(); + if (i == null) { + disabledFlag.set(new AtomicInteger(1)); + } else { + i.incrementAndGet(); + } + } + + void localEnable() { + // Using a count allows this method call to be nested. + AtomicInteger i = disabledFlag().get(); + if (i != null) { + i.decrementAndGet(); + } + } + + private ThreadLocal disabledFlag() { + ThreadLocal disabledFlag = mDisabledFlag; + while (disabledFlag == null) { + disabledFlag = new ThreadLocal(); + if (cDisabledFlagRef.compareAndSet(this, null, disabledFlag)) { + break; + } + disabledFlag = mDisabledFlag; + } + return disabledFlag; + } } private static class ForInsert extends ForSomething { @Override public Object beforeInsert(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -336,6 +458,10 @@ public class TriggerManager { @Override public Object beforeTryInsert(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -354,6 +480,10 @@ public class TriggerManager { @Override public void afterInsert(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -383,6 +513,10 @@ public class TriggerManager { @Override public void afterTryInsert(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -412,6 +546,10 @@ public class TriggerManager { @Override public void failedInsert(S storable, Object state) { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -453,6 +591,10 @@ public class TriggerManager { private static class ForUpdate extends ForSomething { @Override public Object beforeUpdate(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -471,6 +613,10 @@ public class TriggerManager { @Override public Object beforeTryUpdate(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -489,6 +635,10 @@ public class TriggerManager { @Override public void afterUpdate(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -518,6 +668,10 @@ public class TriggerManager { @Override public void afterTryUpdate(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -547,6 +701,10 @@ public class TriggerManager { @Override public void failedUpdate(S storable, Object state) { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -588,6 +746,10 @@ public class TriggerManager { private static class ForDelete extends ForSomething { @Override public Object beforeDelete(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -606,6 +768,10 @@ public class TriggerManager { @Override public Object beforeTryDelete(S storable) throws PersistException { + if (isLocallyDisabled()) { + return null; + } + TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; @@ -624,6 +790,10 @@ public class TriggerManager { @Override public void afterDelete(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -653,6 +823,10 @@ public class TriggerManager { @Override public void afterTryDelete(S storable, Object state) throws PersistException { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; @@ -682,6 +856,10 @@ public class TriggerManager { @Override public void failedDelete(S storable, Object state) { + if (isLocallyDisabled()) { + return; + } + TriggerStates triggerStates; Trigger[] triggers; -- cgit v1.2.3