diff options
Diffstat (limited to 'src/main/java/com/amazon/carbonado')
3 files changed, 118 insertions, 10 deletions
diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java index 502fc5c..337afb0 100644 --- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java +++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java @@ -46,8 +46,63 @@ public interface ResyncCapability extends Capability { throws RepositoryException;
/**
+ * Re-synchronizes replicated storables against the master repository.
+ *
+ * @param type type of storable to re-sync
+ * @param listener optional listener which gets notified as storables are re-sync'd
+ * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+ * speed, 0.1 = one-tenth speed, etc
+ * @param filter optional query filter to limit which objects get re-sync'ed
+ * @param filterValues filter values for optional filter
+ * @since 1.2
+ */
+ <S extends Storable> void resync(Class<S> type,
+ Listener<? super S> listener,
+ double desiredSpeed,
+ String filter,
+ Object... filterValues)
+ throws RepositoryException;
+
+ /**
* Returns the immediate master Repository, for manual comparison. Direct
* updates to the master will likely create inconsistencies.
*/
Repository getMasterRepository();
+
+ /**
+ * Defines callbacks which are invoked as storables get re-sync'd. The
+ * callback is invoked in the scope of the resync transaction. If any
+ * exception is thrown, the immediate changes are rolled back and the
+ * entire repository resync operation is aborted.
+ *
+ * <p>The listener implementation should return quickly from the callback
+ * methods, to avoid lingering transactions. If the listener is used to
+ * invoke special repair operations, they should be placed into a task
+ * queue. A separate thread can then perform the repairs outside the resync
+ * transaction.
+ */
+ public static interface Listener<S> {
+ /**
+ * Called when a storable was inserted as part of a resync.
+ *
+ * @param newStorable storable which was inserted, never null
+ */
+ void inserted(S newStorable);
+
+ /**
+ * Called when a storable was updated as part of a resync. Both old and
+ * new storables have a matching primary key.
+ *
+ * @param oldStorable storable which was deleted, never null
+ * @param newStorable storable which was inserted, never null
+ */
+ void updated(S oldStorable, S newStorable);
+
+ /**
+ * Called when a storable was deleted as part of a resync.
+ *
+ * @param oldStorable storable which was deleted, never null
+ */
+ void deleted(S oldStorable);
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 55a1710..5aa4243 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -366,6 +366,28 @@ class ReplicatedRepository Object... filterValues)
throws RepositoryException
{
+ resync(type, null, desiredSpeed, filter, filterValues);
+ }
+
+ /**
+ * Repairs replicated storables by synchronizing the replica repository
+ * against the master repository.
+ *
+ * @param type type of storable to re-sync
+ * @param listener optional listener which gets notified as storables are re-sync'd
+ * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+ * speed, 0.1 = one-tenth speed, etc
+ * @param filter optional query filter to limit which objects get re-sync'ed
+ * @param filterValues filter values for optional filter
+ * @since 1.2
+ */
+ public <S extends Storable> void resync(Class<S> type,
+ ResyncCapability.Listener<? super S> listener,
+ double desiredSpeed,
+ String filter,
+ Object... filterValues)
+ throws RepositoryException
+ {
ReplicationTrigger<S> replicationTrigger;
if (storageFor(type) instanceof ReplicatedStorage) {
replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger();
@@ -422,6 +444,7 @@ class ReplicatedRepository resync(replicationTrigger,
replicaStorage, replicaQuery,
masterStorage, masterQuery,
+ listener,
throttle, desiredSpeed,
comparator, replicaTxn);
@@ -435,6 +458,7 @@ class ReplicatedRepository private <S extends Storable> void resync(ReplicationTrigger<S> replicationTrigger,
Storage<S> replicaStorage, Query<S> replicaQuery,
Storage<S> masterStorage, Query<S> masterQuery,
+ ResyncCapability.Listener<? super S> listener,
Throttle throttle, double desiredSpeed,
Comparator comparator, Transaction replicaTxn)
throws RepositoryException
@@ -556,7 +580,8 @@ class ReplicatedRepository if (compare < 0) {
// Bogus record exists only in replica so delete it.
- resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null);
+ resyncTask = prepareResyncTask
+ (replicationTrigger, listener, replicaEntry, null);
// Allow replica to advance.
if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
@@ -565,7 +590,8 @@ class ReplicatedRepository replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
- resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry);
+ resyncTask = prepareResyncTask
+ (replicationTrigger, listener, null, masterEntry);
// Allow master to advance.
masterEntry = null;
} else {
@@ -581,7 +607,7 @@ class ReplicatedRepository if (!replicaEntry.equalProperties(masterEntry)) {
// Replica is stale.
resyncTask = prepareResyncTask
- (replicationTrigger, replicaEntry, masterEntry);
+ (replicationTrigger, listener, replicaEntry, masterEntry);
}
// Entries are synchronized so allow both cursors to advance.
@@ -613,6 +639,7 @@ class ReplicatedRepository private <S extends Storable> Runnable prepareResyncTask
(final ReplicationTrigger<S> replicationTrigger,
+ final ResyncCapability.Listener<? super S> listener,
final S replicaEntry,
final S masterEntry)
throws RepositoryException
@@ -627,7 +654,7 @@ class ReplicatedRepository Runnable task = new Runnable() {
public void run() {
try {
- replicationTrigger.resyncEntries(replicaEntry, masterEntry);
+ replicationTrigger.resyncEntries(listener, replicaEntry, masterEntry);
} catch (Exception e) {
LogFactory.getLog(ReplicatedRepository.class).error(null, e);
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index bdacca4..8cf7007 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -32,6 +32,8 @@ import com.amazon.carbonado.Transaction; import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.UniqueConstraintException;
+import com.amazon.carbonado.capability.ResyncCapability;
+
import com.amazon.carbonado.spi.RepairExecutor;
import com.amazon.carbonado.spi.TriggerManager;
@@ -198,10 +200,14 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { * Re-sync the replica to the master. The primary keys of both entries are
* assumed to match.
*
+ * @param listener optional
* @param replicaEntry current replica entry, or null if none
* @param masterEntry current master entry, or null if none
*/
- void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException {
+ void resyncEntries(ResyncCapability.Listener<? super S> listener,
+ S replicaEntry, S masterEntry)
+ throws FetchException, PersistException
+ {
if (replicaEntry == null && masterEntry == null) {
return;
}
@@ -212,6 +218,9 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { try {
Transaction txn = mRepository.enterTransaction();
try {
+ S newReplicaEntry = null;
+
+ // Delete old entry.
if (replicaEntry != null) {
if (masterEntry == null) {
log.info("Deleting bogus replica entry: " + replicaEntry);
@@ -222,11 +231,13 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { log.error("Unable to delete replica entry: " + replicaEntry, e);
if (masterEntry != null) {
// Try to update instead.
- S newReplicaEntry = mReplicaStorage.prepare();
+ newReplicaEntry = mReplicaStorage.prepare();
transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
try {
newReplicaEntry.update();
+ // This disables the insert step, which is not needed now.
+ masterEntry = null;
} catch (PersistException e2) {
log.error("Unable to update replica entry: " + replicaEntry, e2);
return;
@@ -234,8 +245,10 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { }
}
}
+
+ // Insert new entry.
if (masterEntry != null) {
- S newReplicaEntry = mReplicaStorage.prepare();
+ newReplicaEntry = mReplicaStorage.prepare();
if (replicaEntry == null) {
masterEntry.copyAllProperties(newReplicaEntry);
log.info("Adding missing replica entry: " + newReplicaEntry);
@@ -252,6 +265,19 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { newReplicaEntry.tryInsert();
}
}
+
+ if (listener != null) {
+ if (replicaEntry == null) {
+ if (newReplicaEntry != null) {
+ listener.inserted(newReplicaEntry);
+ }
+ } else if (newReplicaEntry != null) {
+ listener.updated(replicaEntry, newReplicaEntry);
+ } else {
+ listener.deleted(replicaEntry);
+ }
+ }
+
txn.commit();
} finally {
txn.exit();
@@ -319,12 +345,12 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { txn.setForUpdate(true);
if (finalReplica.tryLoad()) {
if (finalMaster.tryLoad()) {
- resyncEntries(finalReplica, finalMaster);
+ resyncEntries(null, finalReplica, finalMaster);
} else {
- resyncEntries(finalReplica, null);
+ resyncEntries(null, finalReplica, null);
}
} else if (finalMaster.tryLoad()) {
- resyncEntries(null, finalMaster);
+ resyncEntries(null, null, finalMaster);
}
txn.commit();
} finally {
|