diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2007-02-17 01:28:20 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2007-02-17 01:28:20 +0000 |
commit | c4954dc33c91c3815dda286b765e7164ec2b3eba (patch) | |
tree | d8d907beff864d371c27a652b77c07f4b91bbf6f /src/main/java/com/amazon/carbonado/spi | |
parent | 431daf9c6e6373f9c272d170b37c74c5b0c84c4f (diff) |
ReplicatedRepository installs user triggers on the replica again, but it now disables all triggers during resync to prevent errors. When triggers were on master, downstream triggers would not see changes made by earlier triggers.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/spi')
-rw-r--r-- | src/main/java/com/amazon/carbonado/spi/TriggerManager.java | 282 |
1 files changed, 230 insertions, 52 deletions
diff --git a/src/main/java/com/amazon/carbonado/spi/TriggerManager.java b/src/main/java/com/amazon/carbonado/spi/TriggerManager.java index 147d1ed..a67b134 100644 --- a/src/main/java/com/amazon/carbonado/spi/TriggerManager.java +++ b/src/main/java/com/amazon/carbonado/spi/TriggerManager.java @@ -23,6 +23,9 @@ import java.lang.reflect.Method; import java.util.ArrayList;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
@@ -36,7 +39,7 @@ import com.amazon.carbonado.TriggerFactory; *
* @author Brian S O'Neill
*/
-public class TriggerManager<S extends Storable> {
+public class TriggerManager<S extends Storable> extends Trigger<S> {
// Bit masks returned by selectTypes.
private static final int FOR_INSERT = 1;
private static final int FOR_UPDATE = 2;
@@ -91,9 +94,9 @@ public class TriggerManager<S extends Storable> { }
}
- private volatile ForInsert<S> mForInsert;
- private volatile ForUpdate<S> mForUpdate;
- private volatile ForDelete<S> mForDelete;
+ private final ForInsert<S> mForInsert = new ForInsert<S>();
+ private final ForUpdate<S> mForUpdate = new ForUpdate<S>();
+ private final ForDelete<S> mForDelete = new ForDelete<S>();
public TriggerManager() {
}
@@ -111,102 +114,74 @@ public class TriggerManager<S extends Storable> { }
/**
- * Returns consolidated trigger to call for insert operations, or null if
- * none.
+ * Returns a consolidated trigger to call for insert operations, or null if
+ * none. If not null, the consolidated trigger is not a snapshot -- it will
+ * change as the set of triggers in this manager changes.
*/
public Trigger<? super S> getInsertTrigger() {
- return mForInsert;
+ ForInsert<S> forInsert = mForInsert;
+ return forInsert.isEmpty() ? null : forInsert;
}
/**
- * Returns consolidated trigger to call for update operations, or null if
- * none.
+ * Returns a consolidated trigger to call for update operations, or null if
+ * none. If not null, the consolidated trigger is not a snapshot -- it will
+ * change as the set of triggers in this manager changes.
*/
public Trigger<? super S> getUpdateTrigger() {
- return mForUpdate;
+ ForUpdate<S> forUpdate = mForUpdate;
+ return forUpdate.isEmpty() ? null : forUpdate;
}
/**
- * Returns consolidated trigger to call for delete operations, or null if
- * none.
+ * Returns a consolidated trigger to call for delete operations, or null if
+ * none. If not null, the consolidated trigger is not a snapshot -- it will
+ * change as the set of triggers in this manager changes.
*/
public Trigger<? super S> getDeleteTrigger() {
- return mForDelete;
+ ForDelete<S> forDelete = mForDelete;
+ return forDelete.isEmpty() ? null : forDelete;
}
- public synchronized boolean addTrigger(Trigger<? super S> trigger) {
+ public boolean addTrigger(Trigger<? super S> trigger) {
if (trigger == null) {
throw new IllegalArgumentException();
}
int types = selectTypes(trigger);
- if (types == 0) {
- return false;
- }
boolean retValue = false;
if ((types & FOR_INSERT) != 0) {
- if (mForInsert == null) {
- mForInsert = new ForInsert<S>();
- }
retValue |= mForInsert.add(trigger);
}
-
if ((types & FOR_UPDATE) != 0) {
- if (mForUpdate == null) {
- mForUpdate = new ForUpdate<S>();
- }
retValue |= mForUpdate.add(trigger);
}
-
if ((types & FOR_DELETE) != 0) {
- if (mForDelete == null) {
- mForDelete = new ForDelete<S>();
- }
retValue |= mForDelete.add(trigger);
}
return retValue;
}
- public synchronized boolean removeTrigger(Trigger<? super S> trigger) {
+ public boolean removeTrigger(Trigger<? super S> trigger) {
if (trigger == null) {
throw new IllegalArgumentException();
}
int types = selectTypes(trigger);
- if (types == 0) {
- return false;
- }
boolean retValue = false;
if ((types & FOR_INSERT) != 0) {
- if (mForInsert != null && mForInsert.remove(trigger)) {
- retValue = true;
- if (mForInsert.isEmpty()) {
- mForInsert = null;
- }
- }
+ retValue |= mForInsert.remove(trigger);
}
-
if ((types & FOR_UPDATE) != 0) {
- if (mForUpdate != null && mForUpdate.remove(trigger)) {
- retValue = true;
- if (mForUpdate.isEmpty()) {
- mForUpdate = null;
- }
- }
+ retValue |= mForUpdate.remove(trigger);
}
-
if ((types & FOR_DELETE) != 0) {
- if (mForDelete != null && mForDelete.remove(trigger)) {
- retValue = true;
- if (mForDelete.isEmpty()) {
- mForDelete = null;
- }
- }
+ retValue |= mForDelete.remove(trigger);
}
return retValue;
@@ -224,6 +199,102 @@ public class TriggerManager<S extends Storable> { }
/**
+ * Disables execution of all managed triggers for the current thread. Call
+ * localEnable to enable again. This call can be made multiple times, but
+ * be sure to call localEnable the same number of times to fully enable.
+ */
+ public void localDisable() {
+ mForInsert.localDisable();
+ mForUpdate.localDisable();
+ mForDelete.localDisable();
+ }
+
+ /**
+ * Enables execution of all managed triggers for the current thread, if
+ * they had been disabled before.
+ */
+ public void localEnable() {
+ mForInsert.localEnable();
+ mForUpdate.localEnable();
+ mForDelete.localEnable();
+ }
+
+ @Override
+ public Object beforeInsert(S storable) throws PersistException {
+ return mForInsert.beforeInsert(storable);
+ }
+
+ @Override
+ public Object beforeTryInsert(S storable) throws PersistException {
+ return mForInsert.beforeTryInsert(storable);
+ }
+
+ @Override
+ public void afterInsert(S storable, Object state) throws PersistException {
+ mForInsert.afterInsert(storable, state);
+ }
+
+ @Override
+ public void afterTryInsert(S storable, Object state) throws PersistException {
+ mForInsert.afterTryInsert(storable, state);
+ }
+
+ @Override
+ public void failedInsert(S storable, Object state) {
+ mForInsert.failedInsert(storable, state);
+ }
+
+ @Override
+ public Object beforeUpdate(S storable) throws PersistException {
+ return mForUpdate.beforeUpdate(storable);
+ }
+
+ @Override
+ public Object beforeTryUpdate(S storable) throws PersistException {
+ return mForUpdate.beforeTryUpdate(storable);
+ }
+
+ @Override
+ public void afterUpdate(S storable, Object state) throws PersistException {
+ mForUpdate.afterUpdate(storable, state);
+ }
+
+ @Override
+ public void afterTryUpdate(S storable, Object state) throws PersistException {
+ mForUpdate.afterTryUpdate(storable, state);
+ }
+
+ @Override
+ public void failedUpdate(S storable, Object state) {
+ mForUpdate.failedUpdate(storable, state);
+ }
+
+ @Override
+ public Object beforeDelete(S storable) throws PersistException {
+ return mForDelete.beforeDelete(storable);
+ }
+
+ @Override
+ public Object beforeTryDelete(S storable) throws PersistException {
+ return mForDelete.beforeTryDelete(storable);
+ }
+
+ @Override
+ public void afterDelete(S storable, Object state) throws PersistException {
+ mForDelete.afterDelete(storable, state);
+ }
+
+ @Override
+ public void afterTryDelete(S storable, Object state) throws PersistException {
+ mForDelete.afterTryDelete(storable, state);
+ }
+
+ @Override
+ public void failedDelete(S storable, Object state) {
+ mForDelete.failedDelete(storable, state);
+ }
+
+ /**
* Determines which operations the given trigger overrides.
*/
private int selectTypes(Trigger<? super S> trigger) {
@@ -281,10 +352,16 @@ public class TriggerManager<S extends Storable> { }
private static abstract class ForSomething<S> extends Trigger<S> {
+ private static final AtomicReferenceFieldUpdater<ForSomething, ThreadLocal>
+ cDisabledFlagRef = AtomicReferenceFieldUpdater
+ .newUpdater(ForSomething.class, ThreadLocal.class, "mDisabledFlag");
+
private static Trigger[] NO_TRIGGERS = new Trigger[0];
protected volatile Trigger<? super S>[] mTriggers;
+ private volatile ThreadLocal<AtomicInteger> mDisabledFlag;
+
ForSomething() {
mTriggers = NO_TRIGGERS;
}
@@ -313,11 +390,56 @@ public class TriggerManager<S extends Storable> { boolean isEmpty() {
return mTriggers.length == 0;
}
+
+ boolean isLocallyDisabled() {
+ ThreadLocal<AtomicInteger> disabledFlag = mDisabledFlag;
+ if (disabledFlag == null) {
+ return false;
+ }
+ // Count indicates how many times disabled (nested)
+ AtomicInteger i = disabledFlag.get();
+ return i != null && i.get() > 0;
+ }
+
+ void localDisable() {
+ // Using a count allows this method call to be nested.
+ ThreadLocal<AtomicInteger> disabledFlag = disabledFlag();
+ AtomicInteger i = disabledFlag.get();
+ if (i == null) {
+ disabledFlag.set(new AtomicInteger(1));
+ } else {
+ i.incrementAndGet();
+ }
+ }
+
+ void localEnable() {
+ // Using a count allows this method call to be nested.
+ AtomicInteger i = disabledFlag().get();
+ if (i != null) {
+ i.decrementAndGet();
+ }
+ }
+
+ private ThreadLocal<AtomicInteger> disabledFlag() {
+ ThreadLocal<AtomicInteger> disabledFlag = mDisabledFlag;
+ while (disabledFlag == null) {
+ disabledFlag = new ThreadLocal<AtomicInteger>();
+ if (cDisabledFlagRef.compareAndSet(this, null, disabledFlag)) {
+ break;
+ }
+ disabledFlag = mDisabledFlag;
+ }
+ return disabledFlag;
+ }
}
private static class ForInsert<S> extends ForSomething<S> {
@Override
public Object beforeInsert(S storable) throws PersistException {
+ if (isLocallyDisabled()) {
+ return null;
+ }
+
TriggerStates<S> triggerStates = null;
Trigger<? super S>[] triggers = mTriggers;
@@ -336,6 +458,10 @@ public class TriggerManager<S extends Storable> { @Override
public Object beforeTryInsert(S storable) throws PersistException {
+ if (isLocallyDisabled()) {
+ return null;
+ }
+
TriggerStates<S> triggerStates = null;
Trigger<? super S>[] triggers = mTriggers;
@@ -354,6 +480,10 @@ public class TriggerManager<S extends Storable> { @Override
public void afterInsert(S storable, Object state) throws PersistException {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -383,6 +513,10 @@ public class TriggerManager<S extends Storable> { @Override
public void afterTryInsert(S storable, Object state) throws PersistException {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -412,6 +546,10 @@ public class TriggerManager<S extends Storable> { @Override
public void failedInsert(S storable, Object state) {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -453,6 +591,10 @@ public class TriggerManager<S extends Storable> { private static class ForUpdate<S> extends ForSomething<S> {
@Override
public Object beforeUpdate(S storable) throws PersistException {
+ if (isLocallyDisabled()) {
+ return null;
+ }
+
TriggerStates<S> triggerStates = null;
Trigger<? super S>[] triggers = mTriggers;
@@ -471,6 +613,10 @@ public class TriggerManager<S extends Storable> { @Override
public Object beforeTryUpdate(S storable) throws PersistException {
+ if (isLocallyDisabled()) {
+ return null;
+ }
+
TriggerStates<S> triggerStates = null;
Trigger<? super S>[] triggers = mTriggers;
@@ -489,6 +635,10 @@ public class TriggerManager<S extends Storable> { @Override
public void afterUpdate(S storable, Object state) throws PersistException {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -518,6 +668,10 @@ public class TriggerManager<S extends Storable> { @Override
public void afterTryUpdate(S storable, Object state) throws PersistException {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -547,6 +701,10 @@ public class TriggerManager<S extends Storable> { @Override
public void failedUpdate(S storable, Object state) {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -588,6 +746,10 @@ public class TriggerManager<S extends Storable> { private static class ForDelete<S> extends ForSomething<S> {
@Override
public Object beforeDelete(S storable) throws PersistException {
+ if (isLocallyDisabled()) {
+ return null;
+ }
+
TriggerStates<S> triggerStates = null;
Trigger<? super S>[] triggers = mTriggers;
@@ -606,6 +768,10 @@ public class TriggerManager<S extends Storable> { @Override
public Object beforeTryDelete(S storable) throws PersistException {
+ if (isLocallyDisabled()) {
+ return null;
+ }
+
TriggerStates<S> triggerStates = null;
Trigger<? super S>[] triggers = mTriggers;
@@ -624,6 +790,10 @@ public class TriggerManager<S extends Storable> { @Override
public void afterDelete(S storable, Object state) throws PersistException {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -653,6 +823,10 @@ public class TriggerManager<S extends Storable> { @Override
public void afterTryDelete(S storable, Object state) throws PersistException {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
@@ -682,6 +856,10 @@ public class TriggerManager<S extends Storable> { @Override
public void failedDelete(S storable, Object state) {
+ if (isLocallyDisabled()) {
+ return;
+ }
+
TriggerStates<S> triggerStates;
Trigger<? super S>[] triggers;
|