summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/amazon/carbonado/CorruptEncodingException.java31
-rw-r--r--src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java3
-rw-r--r--src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java48
-rw-r--r--src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java105
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java53
5 files changed, 192 insertions, 48 deletions
diff --git a/src/main/java/com/amazon/carbonado/CorruptEncodingException.java b/src/main/java/com/amazon/carbonado/CorruptEncodingException.java
index b4dcf77..d35fbe4 100644
--- a/src/main/java/com/amazon/carbonado/CorruptEncodingException.java
+++ b/src/main/java/com/amazon/carbonado/CorruptEncodingException.java
@@ -27,6 +27,8 @@ public class CorruptEncodingException extends FetchException {
private static final long serialVersionUID = 4543503149683482362L;
+ private Storable mStorable;
+
public CorruptEncodingException() {
super();
}
@@ -51,4 +53,33 @@ public class CorruptEncodingException extends FetchException {
super("Expected layout generation of " + expectedGeneration +
", but actual layout generation was " + actualGeneration);
}
+
+ /**
+ * If the decoder can at least extract the primary key, it should set it here.
+ */
+ public void setStorableWithPrimaryKey(Storable s) {
+ mStorable = s;
+ }
+
+ /**
+ * If the decoder was able to extract the primary key, it will be available
+ * in the returned Storable.
+ *
+ * @return partial Storable with primary key defined, or null if unable to
+ * decode the key
+ */
+ public Storable getStorableWithPrimaryKey() {
+ return mStorable;
+ }
+
+ @Override
+ public String getMessage() {
+ String message = super.getMessage();
+
+ if (mStorable != null) {
+ message = message + "; " + mStorable.toStringKeyOnly();
+ }
+
+ return message;
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java b/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java
index e436760..339bdbd 100644
--- a/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java
+++ b/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java
@@ -30,6 +30,9 @@ import com.amazon.carbonado.Storable;
public interface GenericInstanceFactory {
Storable instantiate(RawSupport support);
+ Storable instantiate(RawSupport support, byte[] key)
+ throws FetchException;
+
Storable instantiate(RawSupport support, byte[] key, byte[] value)
throws FetchException;
}
diff --git a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java
index df3245b..cac1283 100644
--- a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java
+++ b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java
@@ -149,27 +149,31 @@ public class GenericStorableCodec<S extends Storable> implements StorableCodec<S
b.returnVoid();
}
- // Add constructor that accepts a RawSupport.
- {
- TypeDesc[] params = {rawSupportType};
- MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
- CodeBuilder b = new CodeBuilder(mi);
- b.loadThis();
- b.loadLocal(b.getParameter(0));
- b.invokeSuperConstructor(params);
- b.returnVoid();
- }
+ // Add constructors.
+ // 1: Accepts a RawSupport.
+ // 2: Accepts a RawSupport and an encoded key.
+ // 3: Accepts a RawSupport, an encoded key and an encoded data.
+ for (int i=1; i<=3; i++) {
+ TypeDesc[] params = new TypeDesc[i];
+ params[0] = rawSupportType;
+ if (i >= 2) {
+ params[1] = byteArrayType;
+ if (i == 3) {
+ params[2] = byteArrayType;
+ }
+ }
- // Add constructor that accepts a RawSupport, an encoded key, and an
- // encoded data.
- {
- TypeDesc[] params = {rawSupportType, byteArrayType, byteArrayType};
MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
CodeBuilder b = new CodeBuilder(mi);
+
b.loadThis();
b.loadLocal(b.getParameter(0));
- b.loadLocal(b.getParameter(1));
- b.loadLocal(b.getParameter(2));
+ if (i >= 2) {
+ b.loadLocal(b.getParameter(1));
+ if (i == 3) {
+ b.loadLocal(b.getParameter(2));
+ }
+ }
b.invokeSuperConstructor(params);
b.returnVoid();
}
@@ -367,7 +371,17 @@ public class GenericStorableCodec<S extends Storable> implements StorableCodec<S
public S instantiate(RawSupport<S> support, byte[] key, byte[] value)
throws FetchException
{
- return (S) mInstanceFactory.instantiate(support, key, value);
+ try {
+ return (S) mInstanceFactory.instantiate(support, key, value);
+ } catch (CorruptEncodingException e) {
+ // Try to instantiate just the key and pass what we can to the exception.
+ try {
+ e.setStorableWithPrimaryKey(mInstanceFactory.instantiate(support, key));
+ } catch (FetchException e2) {
+ // Oh well, can't even decode the key.
+ }
+ throw e;
+ }
}
public StorableIndex<S> getPrimaryKeyIndex() {
diff --git a/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java b/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java
index 93c38be..e48aef8 100644
--- a/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java
+++ b/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java
@@ -20,6 +20,7 @@ package com.amazon.carbonado.raw;
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
@@ -29,6 +30,7 @@ import org.cojen.classfile.Label;
import org.cojen.classfile.LocalVariable;
import org.cojen.classfile.MethodInfo;
import org.cojen.classfile.Modifiers;
+import org.cojen.classfile.Opcode;
import org.cojen.classfile.TypeDesc;
import org.cojen.util.ClassInjector;
import org.cojen.util.WeakIdentityMap;
@@ -38,6 +40,9 @@ import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.SupportException;
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
import com.amazon.carbonado.spi.MasterFeature;
import com.amazon.carbonado.spi.MasterStorableGenerator;
import com.amazon.carbonado.spi.MasterSupport;
@@ -112,12 +117,14 @@ public class RawStorableGenerator {
* Returns an abstract implementation of the given Storable type, which is
* fully thread-safe. The Storable type itself may be an interface or a
* class. If it is a class, then it must not be final, and it must have a
- * public, no-arg constructor. Two constructors are defined for the
+ * public, no-arg constructor. Three constructors are defined for the
* abstract implementation:
*
* <pre>
* public &lt;init&gt;(RawSupport);
-
+ *
+ * public &lt;init&gt;(RawSupport, byte[] key);
+ *
* public &lt;init&gt;(RawSupport, byte[] key, byte[] value);
* </pre>
*
@@ -201,40 +208,80 @@ public class RawStorableGenerator {
final TypeDesc rawSupportType = TypeDesc.forClass(RawSupport.class);
final TypeDesc byteArrayType = TypeDesc.forClass(byte[].class);
- // Add constructor that accepts a RawSupport.
- {
- TypeDesc[] params = {rawSupportType};
- MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
- CodeBuilder b = new CodeBuilder(mi);
- b.loadThis();
- b.loadLocal(b.getParameter(0));
- b.invokeSuperConstructor(new TypeDesc[] {masterSupportType});
- b.returnVoid();
- }
+ // Add constructors.
+ // 1: Accepts a RawSupport.
+ // 2: Accepts a RawSupport and an encoded key.
+ // 3: Accepts a RawSupport, an encoded key and an encoded data.
+ for (int i=1; i<=3; i++) {
+ TypeDesc[] params = new TypeDesc[i];
+ params[0] = rawSupportType;
+ if (i >= 2) {
+ params[1] = byteArrayType;
+ if (i == 3) {
+ params[2] = byteArrayType;
+ }
+ }
- // Add constructor that accepts a RawSupport, an encoded key, and an
- // encoded data.
- {
- TypeDesc[] params = {rawSupportType, byteArrayType, byteArrayType};
MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
CodeBuilder b = new CodeBuilder(mi);
+
b.loadThis();
b.loadLocal(b.getParameter(0));
b.invokeSuperConstructor(new TypeDesc[] {masterSupportType});
- params = new TypeDesc[] {byteArrayType};
-
- b.loadThis();
- b.loadLocal(b.getParameter(1));
- b.invokeVirtual(DECODE_KEY_METHOD_NAME, null, params);
-
- b.loadThis();
- b.loadLocal(b.getParameter(2));
- b.invokeVirtual(DECODE_DATA_METHOD_NAME, null, params);
-
- // Indicate that object is clean by calling markAllPropertiesClean.
- b.loadThis();
- b.invokeVirtual(MARK_ALL_PROPERTIES_CLEAN, null, null);
+ if (i >= 2) {
+ params = new TypeDesc[] {byteArrayType};
+
+ b.loadThis();
+ b.loadLocal(b.getParameter(1));
+ b.invokeVirtual(DECODE_KEY_METHOD_NAME, null, params);
+
+ if (i == 3) {
+ b.loadThis();
+ b.loadLocal(b.getParameter(2));
+ b.invokeVirtual(DECODE_DATA_METHOD_NAME, null, params);
+
+ // Indicate that object is clean by calling markAllPropertiesClean.
+ b.loadThis();
+ b.invokeVirtual(MARK_ALL_PROPERTIES_CLEAN, null, null);
+ } else {
+ // Only the primary key is clean. Calling
+ // markPropertiesClean might have no effect since subclass
+ // may set fields directly. Instead, set state bits directly.
+
+ Collection<? extends StorableProperty<S>> properties = StorableIntrospector
+ .examine(storableClass).getPrimaryKeyProperties().values();
+ final int count = properties.size();
+ int ordinal = 0;
+ int andMask = ~0;
+ int orMask = 0;
+
+ for (StorableProperty property : properties) {
+ orMask |= StorableGenerator.PROPERTY_STATE_CLEAN << ((ordinal & 0xf) * 2);
+ andMask &= StorableGenerator.PROPERTY_STATE_MASK << ((ordinal & 0xf) * 2);
+
+ ordinal++;
+ if ((ordinal & 0xf) == 0 || ordinal >= count) {
+ String stateFieldName =
+ StorableGenerator.PROPERTY_STATE_FIELD_NAME + ((ordinal - 1) >> 4);
+ b.loadThis();
+ if (andMask == 0) {
+ b.loadConstant(orMask);
+ } else {
+ b.loadThis();
+ b.loadField(stateFieldName, TypeDesc.INT);
+ b.loadConstant(andMask);
+ b.math(Opcode.IAND);
+ b.loadConstant(orMask);
+ b.math(Opcode.IOR);
+ }
+ b.storeField(stateFieldName, TypeDesc.INT);
+ andMask = ~0;
+ orMask = 0;
+ }
+ }
+ }
+ }
b.returnVoid();
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 11212c4..fa64533 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -21,14 +21,18 @@ import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cojen.util.BeanComparator;
+import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.IsolationLevel;
import com.amazon.carbonado.MalformedTypeException;
+import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
@@ -409,6 +413,8 @@ class ReplicatedRepository
Comparator comparator, Transaction replicaTxn)
throws RepositoryException
{
+ final Log log = LogFactory.getLog(ReplicatedRepository.class);
+
Cursor<S> replicaCursor = null;
Cursor<S> masterCursor = null;
@@ -430,8 +436,51 @@ class ReplicatedRepository
}
}
- if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) {
- replicaEntry = replicaCursor.next();
+ if (replicaEntry == null && replicaCursor != null) {
+ long skippedCount = 0;
+ while (replicaCursor.hasNext()) {
+ try {
+ replicaEntry = replicaCursor.next();
+ if (skippedCount > 0) {
+ if (skippedCount == 1) {
+ log.warn("Skipped corrupt replica entry before this one: " +
+ replicaEntry);
+ } else {
+ log.warn("Skipped " + skippedCount +
+ " corrupt replica entries before this one: " +
+ replicaEntry);
+ }
+ }
+ break;
+ } catch (CorruptEncodingException e) {
+ replicaEntry = null;
+ Storable withKey = e.getStorableWithPrimaryKey();
+
+ if (withKey != null &&
+ withKey.storableType() == replicaStorage.getStorableType())
+ {
+ // Delete corrupt replica entry.
+ try {
+ withKey.delete();
+ log.info("Deleted corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e);
+ continue;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e2);
+ }
+ }
+
+ // Just skip it.
+ try {
+ skippedCount += replicaCursor.skipNext(1);
+ log.info("Skipped corrupt replica entry", e);
+ } catch (FetchException e2) {
+ log.error("Unable to skip past corrupt replica entry", e2);
+ throw e;
+ }
+ }
+ }
}
if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) {