summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/replicated
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2007-02-17 01:28:20 +0000
committerBrian S. O'Neill <bronee@gmail.com>2007-02-17 01:28:20 +0000
commitc4954dc33c91c3815dda286b765e7164ec2b3eba (patch)
treed8d907beff864d371c27a652b77c07f4b91bbf6f /src/main/java/com/amazon/carbonado/repo/replicated
parent431daf9c6e6373f9c272d170b37c74c5b0c84c4f (diff)
ReplicatedRepository installs user triggers on the replica again, but it now disables all triggers during resync to prevent errors. When triggers were on master, downstream triggers would not see changes made by earlier triggers.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java26
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java29
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java62
3 files changed, 48 insertions, 69 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index c292d9c..3b215bc 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -356,9 +356,9 @@ class ReplicatedRepository
Object... filterValues)
throws RepositoryException
{
- ReplicationTrigger<S> trigger;
+ ReplicationTrigger<S> replicationTrigger;
if (storageFor(type) instanceof ReplicatedStorage) {
- trigger = ((ReplicatedStorage) storageFor(type)).getTrigger();
+ replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger();
} else {
throw new UnsupportedTypeException("Storable type is not replicated", type);
}
@@ -413,7 +413,7 @@ class ReplicatedRepository
try {
replicaTxn.setForUpdate(true);
- resync(trigger,
+ resync(replicationTrigger,
replicaStorage, replicaQuery,
masterStorage, masterQuery,
throttle, desiredSpeed,
@@ -426,7 +426,7 @@ class ReplicatedRepository
}
@SuppressWarnings("unchecked")
- private <S extends Storable> void resync(ReplicationTrigger<S> trigger,
+ private <S extends Storable> void resync(ReplicationTrigger<S> replicationTrigger,
Storage<S> replicaStorage, Query<S> replicaQuery,
Storage<S> masterStorage, Query<S> masterQuery,
Throttle throttle, double desiredSpeed,
@@ -493,7 +493,7 @@ class ReplicatedRepository
if (replicaWithKeyOnly != null) {
// Delete corrupt replica entry.
try {
- trigger.deleteReplica(replicaWithKeyOnly);
+ replicationTrigger.deleteReplica(replicaWithKeyOnly);
log.info("Deleted corrupt replica entry: " +
replicaWithKeyOnly.toStringKeyOnly(), e);
skip = false;
@@ -551,7 +551,7 @@ class ReplicatedRepository
if (compare < 0) {
// Bogus record exists only in replica so delete it.
- resyncTask = prepareResyncTask(trigger, replicaEntry, null);
+ resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null);
// Allow replica to advance.
if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
@@ -560,7 +560,7 @@ class ReplicatedRepository
replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
- resyncTask = prepareResyncTask(trigger, null, masterEntry);
+ resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry);
// Allow master to advance.
lastMasterEntry = masterEntry;
masterEntry = null;
@@ -576,7 +576,8 @@ class ReplicatedRepository
// Both replicaEntry and masterEntry are non-null.
if (!replicaEntry.equalProperties(masterEntry)) {
// Replica is stale.
- resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry);
+ resyncTask = prepareResyncTask
+ (replicationTrigger, replicaEntry, masterEntry);
}
// Entries are synchronized so allow both cursors to advance.
@@ -607,9 +608,10 @@ class ReplicatedRepository
}
}
- private <S extends Storable> Runnable prepareResyncTask(final ReplicationTrigger<S> trigger,
- final S replicaEntry,
- final S masterEntry)
+ private <S extends Storable> Runnable prepareResyncTask
+ (final ReplicationTrigger<S> replicationTrigger,
+ final S replicaEntry,
+ final S masterEntry)
throws RepositoryException
{
if (replicaEntry == null && masterEntry == null) {
@@ -622,7 +624,7 @@ class ReplicatedRepository
Runnable task = new Runnable() {
public void run() {
try {
- trigger.resyncEntries(replicaEntry, masterEntry);
+ replicationTrigger.resyncEntries(replicaEntry, masterEntry);
} catch (Exception e) {
LogFactory.getLog(ReplicatedRepository.class).error(null, e);
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
index ebb6db6..9711784 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
@@ -43,7 +43,7 @@ import com.amazon.carbonado.spi.BelatedStorageCreator;
class ReplicatedStorage<S extends Storable> implements Storage<S> {
final Storage<S> mReplicaStorage;
final Storage<S> mMasterStorage;
- final ReplicationTrigger<S> mTrigger;
+ final ReplicationTrigger<S> mReplicationTrigger;
/**
* @throws UnsupportedTypeException if master doesn't support Storable, but
@@ -63,8 +63,13 @@ class ReplicatedStorage<S extends Storable> implements Storage<S> {
ReplicatedRepositoryBuilder.DEFAULT_RETRY_MILLIS);
mMasterStorage = creator.get(ReplicatedRepositoryBuilder.DEFAULT_MASTER_TIMEOUT_MILLIS);
- mTrigger = new ReplicationTrigger<S>(aRepository, mReplicaStorage, mMasterStorage);
- mReplicaStorage.addTrigger(mTrigger);
+
+ // ReplicationTrigger contains internal TriggerManager, and all other
+ // triggers should register with the ReplicationTrigger. This allows
+ // all triggers to be easily disabled during resync and repairs.
+
+ mReplicationTrigger = new ReplicationTrigger<S>
+ (aRepository, mReplicaStorage, mMasterStorage);
}
/**
@@ -76,8 +81,8 @@ class ReplicatedStorage<S extends Storable> implements Storage<S> {
{
mReplicaStorage = replicaStorage;
mMasterStorage = masterStorage;
- mTrigger = new ReplicationTrigger<S>(aRepository, mReplicaStorage, masterStorage);
- mReplicaStorage.addTrigger(mTrigger);
+ mReplicationTrigger = new ReplicationTrigger<S>
+ (aRepository, mReplicaStorage, masterStorage);
}
public Class<S> getStorableType() {
@@ -100,21 +105,15 @@ class ReplicatedStorage<S extends Storable> implements Storage<S> {
return mReplicaStorage.query(filter);
}
- // Note: All user triggers must be added to the master storage. Otherwise,
- // resync operations can cause the triggers to run again, which can be
- // disastrous. If triggers ever support "after load" events, things get
- // complicated. Perhaps this use case is a good example for why supporting
- // "after load" events might be bad.
-
public boolean addTrigger(Trigger<? super S> trigger) {
- return mMasterStorage.addTrigger(trigger);
+ return mReplicationTrigger.addTrigger(trigger);
}
public boolean removeTrigger(Trigger<? super S> trigger) {
- return mMasterStorage.removeTrigger(trigger);
+ return mReplicationTrigger.removeTrigger(trigger);
}
- ReplicationTrigger<S> getTrigger() {
- return mTrigger;
+ ReplicationTrigger<S> getReplicationTrigger() {
+ return mReplicationTrigger;
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index 249a779..7f743ac 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -18,8 +18,6 @@
package com.amazon.carbonado.repo.replicated;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +34,7 @@ import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.UniqueConstraintException;
import com.amazon.carbonado.spi.RepairExecutor;
+import com.amazon.carbonado.spi.TriggerManager;
/**
* All inserts/updates/deletes are first committed to the master storage, then
@@ -49,7 +48,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
private final Storage<S> mReplicaStorage;
private final Storage<S> mMasterStorage;
- private final ThreadLocal<AtomicInteger> mDisabled = new ThreadLocal<AtomicInteger>();
+ private final TriggerManager<S> mTriggerManager;
ReplicationTrigger(Repository repository,
Storage<S> replicaStorage,
@@ -58,6 +57,11 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
mRepository = repository;
mReplicaStorage = replicaStorage;
mMasterStorage = masterStorage;
+ // Use TriggerManager to locally disable trigger execution during
+ // resync and repairs.
+ mTriggerManager = new TriggerManager<S>();
+ mTriggerManager.addTrigger(this);
+ replicaStorage.addTrigger(mTriggerManager);
}
@Override
@@ -71,10 +75,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
private Object beforeInsert(S replica, boolean forTry) throws PersistException {
- if (isReplicationDisabled()) {
- return null;
- }
-
final S master = mMasterStorage.prepare();
replica.copyAllProperties(master);
@@ -129,10 +129,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
private Object beforeUpdate(S replica, boolean forTry) throws PersistException {
- if (isReplicationDisabled()) {
- return null;
- }
-
final S master = mMasterStorage.prepare();
replica.copyPrimaryKeyProperties(master);
@@ -209,10 +205,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
@Override
public Object beforeDelete(S replica) throws PersistException {
- if (isReplicationDisabled()) {
- return null;
- }
-
S master = mMasterStorage.prepare();
replica.copyPrimaryKeyProperties(master);
@@ -373,6 +365,14 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
});
}
+ boolean addTrigger(Trigger<? super S> trigger) {
+ return mTriggerManager.addTrigger(trigger);
+ }
+
+ boolean removeTrigger(Trigger<? super S> trigger) {
+ return mTriggerManager.removeTrigger(trigger);
+ }
+
/**
* Deletes the replica entry with replication disabled.
*/
@@ -401,35 +401,13 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
}
- /**
- * Returns true if replication is disabled for the current thread.
- */
- private boolean isReplicationDisabled() {
- // Count indicates how many times disabled (nested)
- AtomicInteger i = mDisabled.get();
- return i != null && i.get() > 0;
- }
-
- /**
- * By default, replication is enabled for the current thread. Pass true to
- * disable during re-sync operations.
- */
- private void setReplicationDisabled(boolean disabled) {
- // Using a count allows this method call to be nested. Based on the
- // current implementation, it should never be nested, so this extra
- // work is just a safeguard.
- AtomicInteger i = mDisabled.get();
+ void setReplicationDisabled(boolean disabled) {
+ // This method disables not only this trigger, but all triggers added
+ // to manager.
if (disabled) {
- if (i == null) {
- i = new AtomicInteger(1);
- mDisabled.set(i);
- } else {
- i.incrementAndGet();
- }
+ mTriggerManager.localDisable();
} else {
- if (i != null) {
- i.decrementAndGet();
- }
+ mTriggerManager.localEnable();
}
}
}