summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/replicated
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java26
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java29
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java62
3 files changed, 48 insertions, 69 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index c292d9c..3b215bc 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -356,9 +356,9 @@ class ReplicatedRepository
Object... filterValues)
throws RepositoryException
{
- ReplicationTrigger<S> trigger;
+ ReplicationTrigger<S> replicationTrigger;
if (storageFor(type) instanceof ReplicatedStorage) {
- trigger = ((ReplicatedStorage) storageFor(type)).getTrigger();
+ replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger();
} else {
throw new UnsupportedTypeException("Storable type is not replicated", type);
}
@@ -413,7 +413,7 @@ class ReplicatedRepository
try {
replicaTxn.setForUpdate(true);
- resync(trigger,
+ resync(replicationTrigger,
replicaStorage, replicaQuery,
masterStorage, masterQuery,
throttle, desiredSpeed,
@@ -426,7 +426,7 @@ class ReplicatedRepository
}
@SuppressWarnings("unchecked")
- private <S extends Storable> void resync(ReplicationTrigger<S> trigger,
+ private <S extends Storable> void resync(ReplicationTrigger<S> replicationTrigger,
Storage<S> replicaStorage, Query<S> replicaQuery,
Storage<S> masterStorage, Query<S> masterQuery,
Throttle throttle, double desiredSpeed,
@@ -493,7 +493,7 @@ class ReplicatedRepository
if (replicaWithKeyOnly != null) {
// Delete corrupt replica entry.
try {
- trigger.deleteReplica(replicaWithKeyOnly);
+ replicationTrigger.deleteReplica(replicaWithKeyOnly);
log.info("Deleted corrupt replica entry: " +
replicaWithKeyOnly.toStringKeyOnly(), e);
skip = false;
@@ -551,7 +551,7 @@ class ReplicatedRepository
if (compare < 0) {
// Bogus record exists only in replica so delete it.
- resyncTask = prepareResyncTask(trigger, replicaEntry, null);
+ resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null);
// Allow replica to advance.
if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
@@ -560,7 +560,7 @@ class ReplicatedRepository
replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
- resyncTask = prepareResyncTask(trigger, null, masterEntry);
+ resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry);
// Allow master to advance.
lastMasterEntry = masterEntry;
masterEntry = null;
@@ -576,7 +576,8 @@ class ReplicatedRepository
// Both replicaEntry and masterEntry are non-null.
if (!replicaEntry.equalProperties(masterEntry)) {
// Replica is stale.
- resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry);
+ resyncTask = prepareResyncTask
+ (replicationTrigger, replicaEntry, masterEntry);
}
// Entries are synchronized so allow both cursors to advance.
@@ -607,9 +608,10 @@ class ReplicatedRepository
}
}
- private <S extends Storable> Runnable prepareResyncTask(final ReplicationTrigger<S> trigger,
- final S replicaEntry,
- final S masterEntry)
+ private <S extends Storable> Runnable prepareResyncTask
+ (final ReplicationTrigger<S> replicationTrigger,
+ final S replicaEntry,
+ final S masterEntry)
throws RepositoryException
{
if (replicaEntry == null && masterEntry == null) {
@@ -622,7 +624,7 @@ class ReplicatedRepository
Runnable task = new Runnable() {
public void run() {
try {
- trigger.resyncEntries(replicaEntry, masterEntry);
+ replicationTrigger.resyncEntries(replicaEntry, masterEntry);
} catch (Exception e) {
LogFactory.getLog(ReplicatedRepository.class).error(null, e);
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
index ebb6db6..9711784 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
@@ -43,7 +43,7 @@ import com.amazon.carbonado.spi.BelatedStorageCreator;
class ReplicatedStorage<S extends Storable> implements Storage<S> {
final Storage<S> mReplicaStorage;
final Storage<S> mMasterStorage;
- final ReplicationTrigger<S> mTrigger;
+ final ReplicationTrigger<S> mReplicationTrigger;
/**
* @throws UnsupportedTypeException if master doesn't support Storable, but
@@ -63,8 +63,13 @@ class ReplicatedStorage<S extends Storable> implements Storage<S> {
ReplicatedRepositoryBuilder.DEFAULT_RETRY_MILLIS);
mMasterStorage = creator.get(ReplicatedRepositoryBuilder.DEFAULT_MASTER_TIMEOUT_MILLIS);
- mTrigger = new ReplicationTrigger<S>(aRepository, mReplicaStorage, mMasterStorage);
- mReplicaStorage.addTrigger(mTrigger);
+
+ // ReplicationTrigger contains internal TriggerManager, and all other
+ // triggers should register with the ReplicationTrigger. This allows
+ // all triggers to be easily disabled during resync and repairs.
+
+ mReplicationTrigger = new ReplicationTrigger<S>
+ (aRepository, mReplicaStorage, mMasterStorage);
}
/**
@@ -76,8 +81,8 @@ class ReplicatedStorage<S extends Storable> implements Storage<S> {
{
mReplicaStorage = replicaStorage;
mMasterStorage = masterStorage;
- mTrigger = new ReplicationTrigger<S>(aRepository, mReplicaStorage, masterStorage);
- mReplicaStorage.addTrigger(mTrigger);
+ mReplicationTrigger = new ReplicationTrigger<S>
+ (aRepository, mReplicaStorage, masterStorage);
}
public Class<S> getStorableType() {
@@ -100,21 +105,15 @@ class ReplicatedStorage<S extends Storable> implements Storage<S> {
return mReplicaStorage.query(filter);
}
- // Note: All user triggers must be added to the master storage. Otherwise,
- // resync operations can cause the triggers to run again, which can be
- // disastrous. If triggers ever support "after load" events, things get
- // complicated. Perhaps this use case is a good example for why supporting
- // "after load" events might be bad.
-
public boolean addTrigger(Trigger<? super S> trigger) {
- return mMasterStorage.addTrigger(trigger);
+ return mReplicationTrigger.addTrigger(trigger);
}
public boolean removeTrigger(Trigger<? super S> trigger) {
- return mMasterStorage.removeTrigger(trigger);
+ return mReplicationTrigger.removeTrigger(trigger);
}
- ReplicationTrigger<S> getTrigger() {
- return mTrigger;
+ ReplicationTrigger<S> getReplicationTrigger() {
+ return mReplicationTrigger;
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index 249a779..7f743ac 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -18,8 +18,6 @@
package com.amazon.carbonado.repo.replicated;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +34,7 @@ import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.UniqueConstraintException;
import com.amazon.carbonado.spi.RepairExecutor;
+import com.amazon.carbonado.spi.TriggerManager;
/**
* All inserts/updates/deletes are first committed to the master storage, then
@@ -49,7 +48,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
private final Storage<S> mReplicaStorage;
private final Storage<S> mMasterStorage;
- private final ThreadLocal<AtomicInteger> mDisabled = new ThreadLocal<AtomicInteger>();
+ private final TriggerManager<S> mTriggerManager;
ReplicationTrigger(Repository repository,
Storage<S> replicaStorage,
@@ -58,6 +57,11 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
mRepository = repository;
mReplicaStorage = replicaStorage;
mMasterStorage = masterStorage;
+ // Use TriggerManager to locally disable trigger execution during
+ // resync and repairs.
+ mTriggerManager = new TriggerManager<S>();
+ mTriggerManager.addTrigger(this);
+ replicaStorage.addTrigger(mTriggerManager);
}
@Override
@@ -71,10 +75,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
private Object beforeInsert(S replica, boolean forTry) throws PersistException {
- if (isReplicationDisabled()) {
- return null;
- }
-
final S master = mMasterStorage.prepare();
replica.copyAllProperties(master);
@@ -129,10 +129,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
private Object beforeUpdate(S replica, boolean forTry) throws PersistException {
- if (isReplicationDisabled()) {
- return null;
- }
-
final S master = mMasterStorage.prepare();
replica.copyPrimaryKeyProperties(master);
@@ -209,10 +205,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
@Override
public Object beforeDelete(S replica) throws PersistException {
- if (isReplicationDisabled()) {
- return null;
- }
-
S master = mMasterStorage.prepare();
replica.copyPrimaryKeyProperties(master);
@@ -373,6 +365,14 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
});
}
+ boolean addTrigger(Trigger<? super S> trigger) {
+ return mTriggerManager.addTrigger(trigger);
+ }
+
+ boolean removeTrigger(Trigger<? super S> trigger) {
+ return mTriggerManager.removeTrigger(trigger);
+ }
+
/**
* Deletes the replica entry with replication disabled.
*/
@@ -401,35 +401,13 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
}
- /**
- * Returns true if replication is disabled for the current thread.
- */
- private boolean isReplicationDisabled() {
- // Count indicates how many times disabled (nested)
- AtomicInteger i = mDisabled.get();
- return i != null && i.get() > 0;
- }
-
- /**
- * By default, replication is enabled for the current thread. Pass true to
- * disable during re-sync operations.
- */
- private void setReplicationDisabled(boolean disabled) {
- // Using a count allows this method call to be nested. Based on the
- // current implementation, it should never be nested, so this extra
- // work is just a safeguard.
- AtomicInteger i = mDisabled.get();
+ void setReplicationDisabled(boolean disabled) {
+ // This method disables not only this trigger, but all triggers added
+ // to manager.
if (disabled) {
- if (i == null) {
- i = new AtomicInteger(1);
- mDisabled.set(i);
- } else {
- i.incrementAndGet();
- }
+ mTriggerManager.localDisable();
} else {
- if (i != null) {
- i.decrementAndGet();
- }
+ mTriggerManager.localEnable();
}
}
}