diff options
Diffstat (limited to 'src/main/java/com/amazon/carbonado')
4 files changed, 106 insertions, 31 deletions
diff --git a/src/main/java/com/amazon/carbonado/layout/Layout.java b/src/main/java/com/amazon/carbonado/layout/Layout.java index 8b8fc0b..f784e84 100644 --- a/src/main/java/com/amazon/carbonado/layout/Layout.java +++ b/src/main/java/com/amazon/carbonado/layout/Layout.java @@ -28,6 +28,8 @@ import java.util.Map; import org.joda.time.DateTime;
+import org.apache.commons.logging.LogFactory;
+
import org.cojen.util.SoftValuedHashMap;
import com.amazon.carbonado.Cursor;
@@ -35,6 +37,7 @@ import com.amazon.carbonado.FetchException; import com.amazon.carbonado.FetchNoneException;
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
+import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.SupportException;
import com.amazon.carbonado.info.StorableInfo;
@@ -44,6 +47,8 @@ import com.amazon.carbonado.synthetic.SyntheticProperty; import com.amazon.carbonado.synthetic.SyntheticStorableBuilder;
import com.amazon.carbonado.util.AnnotationDescPrinter;
+import com.amazon.carbonado.capability.ResyncCapability;
+
/**
* Describes the layout of a specific generation of a storable.
*
@@ -265,10 +270,48 @@ public class Layout { * @throws FetchNoneException if generation not found
*/
public Layout getGeneration(int generation) throws FetchNoneException, FetchException {
- StoredLayout storedLayout = mLayoutFactory.mLayoutStorage
- .query("storableTypeName = ? & generation = ?")
- .with(getStorableTypeName()).with(generation)
- .loadOne();
+ final String filter = "storableTypeName = ? & generation = ?";
+
+ StoredLayout storedLayout;
+ try {
+ storedLayout = mLayoutFactory.mLayoutStorage
+ .query(filter)
+ .with(getStorableTypeName()).with(generation)
+ .loadOne();
+ } catch (FetchNoneException e) {
+ // Try to resync with a master.
+ ResyncCapability cap =
+ mLayoutFactory.mRepository.getCapability(ResyncCapability.class);
+
+ if (cap == null) {
+ throw e;
+ }
+
+ try {
+ cap.resync(mLayoutFactory.mLayoutStorage.getStorableType(), 1.0,
+ filter, getStorableTypeName(), generation);
+ } catch (RepositoryException e2) {
+ // Uh oh, double trouble. Log this one and throw original exception.
+ LogFactory.getLog(Layout.class).error("Unable to resync layout", e2);
+ throw e;
+ }
+
+ storedLayout = mLayoutFactory.mLayoutStorage
+ .query(filter)
+ .with(getStorableTypeName()).with(generation)
+ .loadOne();
+
+ // Make sure all the properties are re-sync'd too.
+ try {
+ cap.resync(mLayoutFactory.mPropertyStorage.getStorableType(), 1.0,
+ "layoutID = ?", storedLayout.getLayoutID());
+ } catch (RepositoryException e2) {
+ // Uh oh, double trouble. Log this one and throw original exception.
+ LogFactory.getLog(Layout.class).error("Unable to resync layout", e2);
+ throw e;
+ }
+ }
+
return new Layout(mLayoutFactory, storedLayout);
}
diff --git a/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java b/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java index e48aef8..880bb7c 100644 --- a/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java +++ b/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java @@ -258,7 +258,8 @@ public class RawStorableGenerator { for (StorableProperty property : properties) {
orMask |= StorableGenerator.PROPERTY_STATE_CLEAN << ((ordinal & 0xf) * 2);
- andMask &= StorableGenerator.PROPERTY_STATE_MASK << ((ordinal & 0xf) * 2);
+ andMask &=
+ ~(StorableGenerator.PROPERTY_STATE_MASK << ((ordinal & 0xf) * 2));
ordinal++;
if ((ordinal & 0xf) == 0 || ordinal >= count) {
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index fa64533..0c30ce0 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -422,6 +422,8 @@ class ReplicatedRepository replicaCursor = replicaQuery.fetch();
masterCursor = masterQuery.fetch();
+ S lastReplicaEntry = null;
+ S lastMasterEntry = null;
S replicaEntry = null;
S masterEntry = null;
@@ -453,32 +455,43 @@ class ReplicatedRepository }
break;
} catch (CorruptEncodingException e) {
- replicaEntry = null;
- Storable withKey = e.getStorableWithPrimaryKey();
+ // Exception forces cursor to close. Close again to be sure.
+ replicaCursor.close();
+ replicaCursor = null;
+
+ skipCorruption: {
+ Storable withKey = e.getStorableWithPrimaryKey();
+
+ if (withKey != null &&
+ withKey.storableType() == replicaStorage.getStorableType())
+ {
+ // Delete corrupt replica entry.
+ try {
+ trigger.deleteReplica(withKey);
+ log.info("Deleted corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e);
+ break skipCorruption;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e2);
+ }
+ }
- if (withKey != null &&
- withKey.storableType() == replicaStorage.getStorableType())
- {
- // Delete corrupt replica entry.
+ // Just skip it.
try {
- withKey.delete();
- log.info("Deleted corrupt replica entry: " +
- withKey.toStringKeyOnly(), e);
- continue;
- } catch (PersistException e2) {
- log.warn("Unable to delete corrupt replica entry: " +
- withKey.toStringKeyOnly(), e2);
+ skippedCount += replicaCursor.skipNext(1);
+ log.info("Skipped corrupt replica entry", e);
+ } catch (FetchException e2) {
+ log.error("Unable to skip past corrupt replica entry", e2);
+ throw e;
}
}
- // Just skip it.
- try {
- skippedCount += replicaCursor.skipNext(1);
- log.info("Skipped corrupt replica entry", e);
- } catch (FetchException e2) {
- log.error("Unable to skip past corrupt replica entry", e2);
- throw e;
+ // Re-open (if we can)
+ if (lastReplicaEntry == null) {
+ break;
}
+ replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
}
}
}
@@ -512,11 +525,13 @@ class ReplicatedRepository if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
}
+ lastReplicaEntry = replicaEntry;
replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
resyncTask = prepareResyncTask(trigger, null, masterEntry);
// Allow master to advance.
+ lastMasterEntry = masterEntry;
masterEntry = null;
} else {
if (replicaEntry == null && masterEntry == null) {
@@ -533,6 +548,8 @@ class ReplicatedRepository if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
}
+ lastReplicaEntry = replicaEntry;
+ lastMasterEntry = masterEntry;
replicaEntry = null;
masterEntry = null;
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 7ac60a6..479f870 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -149,7 +149,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { if (!master.tryLoad()) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw abortTry();
}
} else {
@@ -158,7 +158,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { } catch (FetchNoneException e) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw e;
}
}
@@ -175,7 +175,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { if (!master.tryUpdate()) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw abortTry();
}
} else {
@@ -184,7 +184,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { } catch (PersistNoneException e) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw e;
}
}
@@ -350,12 +350,26 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { /**
* Deletes the replica entry with replication disabled.
*/
- private void deleteReplica(S replica) throws PersistException {
+ boolean tryDeleteReplica(Storable replica) throws PersistException {
// Disable replication to prevent trigger from being invoked by
// deleting replica.
setReplicationDisabled(true);
try {
- replica.tryDelete();
+ return replica.tryDelete();
+ } finally {
+ setReplicationDisabled(false);
+ }
+ }
+
+ /**
+ * Deletes the replica entry with replication disabled.
+ */
+ void deleteReplica(Storable replica) throws PersistException {
+ // Disable replication to prevent trigger from being invoked by
+ // deleting replica.
+ setReplicationDisabled(true);
+ try {
+ replica.delete();
} finally {
setReplicationDisabled(false);
}
|