summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/replicated
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java35
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java38
2 files changed, 63 insertions, 10 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 55a1710..5aa4243 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -366,6 +366,28 @@ class ReplicatedRepository
Object... filterValues)
throws RepositoryException
{
+ resync(type, null, desiredSpeed, filter, filterValues);
+ }
+
+ /**
+ * Repairs replicated storables by synchronizing the replica repository
+ * against the master repository.
+ *
+ * @param type type of storable to re-sync
+ * @param listener optional listener which gets notified as storables are re-sync'd
+ * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+ * speed, 0.1 = one-tenth speed, etc
+ * @param filter optional query filter to limit which objects get re-sync'ed
+ * @param filterValues filter values for optional filter
+ * @since 1.2
+ */
+ public <S extends Storable> void resync(Class<S> type,
+ ResyncCapability.Listener<? super S> listener,
+ double desiredSpeed,
+ String filter,
+ Object... filterValues)
+ throws RepositoryException
+ {
ReplicationTrigger<S> replicationTrigger;
if (storageFor(type) instanceof ReplicatedStorage) {
replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger();
@@ -422,6 +444,7 @@ class ReplicatedRepository
resync(replicationTrigger,
replicaStorage, replicaQuery,
masterStorage, masterQuery,
+ listener,
throttle, desiredSpeed,
comparator, replicaTxn);
@@ -435,6 +458,7 @@ class ReplicatedRepository
private <S extends Storable> void resync(ReplicationTrigger<S> replicationTrigger,
Storage<S> replicaStorage, Query<S> replicaQuery,
Storage<S> masterStorage, Query<S> masterQuery,
+ ResyncCapability.Listener<? super S> listener,
Throttle throttle, double desiredSpeed,
Comparator comparator, Transaction replicaTxn)
throws RepositoryException
@@ -556,7 +580,8 @@ class ReplicatedRepository
if (compare < 0) {
// Bogus record exists only in replica so delete it.
- resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null);
+ resyncTask = prepareResyncTask
+ (replicationTrigger, listener, replicaEntry, null);
// Allow replica to advance.
if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
@@ -565,7 +590,8 @@ class ReplicatedRepository
replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
- resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry);
+ resyncTask = prepareResyncTask
+ (replicationTrigger, listener, null, masterEntry);
// Allow master to advance.
masterEntry = null;
} else {
@@ -581,7 +607,7 @@ class ReplicatedRepository
if (!replicaEntry.equalProperties(masterEntry)) {
// Replica is stale.
resyncTask = prepareResyncTask
- (replicationTrigger, replicaEntry, masterEntry);
+ (replicationTrigger, listener, replicaEntry, masterEntry);
}
// Entries are synchronized so allow both cursors to advance.
@@ -613,6 +639,7 @@ class ReplicatedRepository
private <S extends Storable> Runnable prepareResyncTask
(final ReplicationTrigger<S> replicationTrigger,
+ final ResyncCapability.Listener<? super S> listener,
final S replicaEntry,
final S masterEntry)
throws RepositoryException
@@ -627,7 +654,7 @@ class ReplicatedRepository
Runnable task = new Runnable() {
public void run() {
try {
- replicationTrigger.resyncEntries(replicaEntry, masterEntry);
+ replicationTrigger.resyncEntries(listener, replicaEntry, masterEntry);
} catch (Exception e) {
LogFactory.getLog(ReplicatedRepository.class).error(null, e);
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index bdacca4..8cf7007 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -32,6 +32,8 @@ import com.amazon.carbonado.Transaction;
import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.UniqueConstraintException;
+import com.amazon.carbonado.capability.ResyncCapability;
+
import com.amazon.carbonado.spi.RepairExecutor;
import com.amazon.carbonado.spi.TriggerManager;
@@ -198,10 +200,14 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
* Re-sync the replica to the master. The primary keys of both entries are
* assumed to match.
*
+ * @param listener optional
* @param replicaEntry current replica entry, or null if none
* @param masterEntry current master entry, or null if none
*/
- void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException {
+ void resyncEntries(ResyncCapability.Listener<? super S> listener,
+ S replicaEntry, S masterEntry)
+ throws FetchException, PersistException
+ {
if (replicaEntry == null && masterEntry == null) {
return;
}
@@ -212,6 +218,9 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
try {
Transaction txn = mRepository.enterTransaction();
try {
+ S newReplicaEntry = null;
+
+ // Delete old entry.
if (replicaEntry != null) {
if (masterEntry == null) {
log.info("Deleting bogus replica entry: " + replicaEntry);
@@ -222,11 +231,13 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
log.error("Unable to delete replica entry: " + replicaEntry, e);
if (masterEntry != null) {
// Try to update instead.
- S newReplicaEntry = mReplicaStorage.prepare();
+ newReplicaEntry = mReplicaStorage.prepare();
transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
try {
newReplicaEntry.update();
+ // This disables the insert step, which is not needed now.
+ masterEntry = null;
} catch (PersistException e2) {
log.error("Unable to update replica entry: " + replicaEntry, e2);
return;
@@ -234,8 +245,10 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
}
}
+
+ // Insert new entry.
if (masterEntry != null) {
- S newReplicaEntry = mReplicaStorage.prepare();
+ newReplicaEntry = mReplicaStorage.prepare();
if (replicaEntry == null) {
masterEntry.copyAllProperties(newReplicaEntry);
log.info("Adding missing replica entry: " + newReplicaEntry);
@@ -252,6 +265,19 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
newReplicaEntry.tryInsert();
}
}
+
+ if (listener != null) {
+ if (replicaEntry == null) {
+ if (newReplicaEntry != null) {
+ listener.inserted(newReplicaEntry);
+ }
+ } else if (newReplicaEntry != null) {
+ listener.updated(replicaEntry, newReplicaEntry);
+ } else {
+ listener.deleted(replicaEntry);
+ }
+ }
+
txn.commit();
} finally {
txn.exit();
@@ -319,12 +345,12 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
txn.setForUpdate(true);
if (finalReplica.tryLoad()) {
if (finalMaster.tryLoad()) {
- resyncEntries(finalReplica, finalMaster);
+ resyncEntries(null, finalReplica, finalMaster);
} else {
- resyncEntries(finalReplica, null);
+ resyncEntries(null, finalReplica, null);
}
} else if (finalMaster.tryLoad()) {
- resyncEntries(null, finalMaster);
+ resyncEntries(null, null, finalMaster);
}
txn.commit();
} finally {