diff options
| author | Brian S. O'Neill <bronee@gmail.com> | 2006-11-07 02:29:16 +0000 | 
|---|---|---|
| committer | Brian S. O'Neill <bronee@gmail.com> | 2006-11-07 02:29:16 +0000 | 
| commit | 2f14a36ed762b2287bab1c97fdb4cbee118e2921 (patch) | |
| tree | 6f79b5010d0b5100171cc975f4ad560b1ae4d18b | |
| parent | 2d942b153e7f2f9ca16c35c67b452c89eb762b7d (diff) | |
Allow resync to delete or skip over corrupt replica entries.
5 files changed, 192 insertions, 48 deletions
| diff --git a/src/main/java/com/amazon/carbonado/CorruptEncodingException.java b/src/main/java/com/amazon/carbonado/CorruptEncodingException.java index b4dcf77..d35fbe4 100644 --- a/src/main/java/com/amazon/carbonado/CorruptEncodingException.java +++ b/src/main/java/com/amazon/carbonado/CorruptEncodingException.java @@ -27,6 +27,8 @@ public class CorruptEncodingException extends FetchException {      private static final long serialVersionUID = 4543503149683482362L;
 +    private Storable mStorable;
 +
      public CorruptEncodingException() {
          super();
      }
 @@ -51,4 +53,33 @@ public class CorruptEncodingException extends FetchException {          super("Expected layout generation of " + expectedGeneration +
                ", but actual layout generation was " + actualGeneration);
      }
 +
 +    /**
 +     * If the decoder can at least extract the primary key, it should set it here.
 +     */
 +    public void setStorableWithPrimaryKey(Storable s) {
 +        mStorable = s;
 +    }
 +
 +    /**
 +     * If the decoder was able to extract the primary key, it will be available
 +     * in the returned Storable.
 +     *
 +     * @return partial Storable with primary key defined, or null if unable to
 +     * decode the key
 +     */
 +    public Storable getStorableWithPrimaryKey() {
 +        return mStorable;
 +    }
 +
 +    @Override
 +    public String getMessage() {
 +        String message = super.getMessage();
 +        
 +        if (mStorable != null) {
 +            message = message + "; " + mStorable.toStringKeyOnly();
 +        }
 +
 +        return message;
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java b/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java index e436760..339bdbd 100644 --- a/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java +++ b/src/main/java/com/amazon/carbonado/raw/GenericInstanceFactory.java @@ -30,6 +30,9 @@ import com.amazon.carbonado.Storable;  public interface GenericInstanceFactory {
      Storable instantiate(RawSupport support);
 +    Storable instantiate(RawSupport support, byte[] key)
 +        throws FetchException;
 +
      Storable instantiate(RawSupport support, byte[] key, byte[] value)
          throws FetchException;
  }
 diff --git a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java index df3245b..cac1283 100644 --- a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java +++ b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodec.java @@ -149,27 +149,31 @@ public class GenericStorableCodec<S extends Storable> implements StorableCodec<S              b.returnVoid();
          }
 -        // Add constructor that accepts a RawSupport.
 -        {
 -            TypeDesc[] params = {rawSupportType};
 -            MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
 -            CodeBuilder b = new CodeBuilder(mi);
 -            b.loadThis();
 -            b.loadLocal(b.getParameter(0));
 -            b.invokeSuperConstructor(params);
 -            b.returnVoid();
 -        }
 +        // Add constructors.
 +        // 1: Accepts a RawSupport.
 +        // 2: Accepts a RawSupport and an encoded key.
 +        // 3: Accepts a RawSupport, an encoded key and an encoded data.
 +        for (int i=1; i<=3; i++) {
 +            TypeDesc[] params = new TypeDesc[i];
 +            params[0] = rawSupportType;
 +            if (i >= 2) {
 +                params[1] = byteArrayType;
 +                if (i == 3) {
 +                    params[2] = byteArrayType;
 +                }
 +            }
 -        // Add constructor that accepts a RawSupport, an encoded key, and an
 -        // encoded data.
 -        {
 -            TypeDesc[] params = {rawSupportType, byteArrayType, byteArrayType};
              MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
              CodeBuilder b = new CodeBuilder(mi);
 +
              b.loadThis();
              b.loadLocal(b.getParameter(0));
 -            b.loadLocal(b.getParameter(1));
 -            b.loadLocal(b.getParameter(2));
 +            if (i >= 2) {
 +                b.loadLocal(b.getParameter(1));
 +                if (i == 3) {
 +                    b.loadLocal(b.getParameter(2));
 +                }
 +            }
              b.invokeSuperConstructor(params);
              b.returnVoid();
          }
 @@ -367,7 +371,17 @@ public class GenericStorableCodec<S extends Storable> implements StorableCodec<S      public S instantiate(RawSupport<S> support, byte[] key, byte[] value)
          throws FetchException
      {
 -        return (S) mInstanceFactory.instantiate(support, key, value);
 +        try {
 +            return (S) mInstanceFactory.instantiate(support, key, value);
 +        } catch (CorruptEncodingException e) {
 +            // Try to instantiate just the key and pass what we can to the exception.
 +            try {
 +                e.setStorableWithPrimaryKey(mInstanceFactory.instantiate(support, key));
 +            } catch (FetchException e2) {
 +                // Oh well, can't even decode the key.
 +            }
 +            throw e;
 +        }
      }
      public StorableIndex<S> getPrimaryKeyIndex() {
 diff --git a/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java b/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java index 93c38be..e48aef8 100644 --- a/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java +++ b/src/main/java/com/amazon/carbonado/raw/RawStorableGenerator.java @@ -20,6 +20,7 @@ package com.amazon.carbonado.raw;  import java.lang.ref.Reference;
  import java.lang.ref.SoftReference;
 +import java.util.Collection;
  import java.util.EnumSet;
  import java.util.Map;
 @@ -29,6 +30,7 @@ import org.cojen.classfile.Label;  import org.cojen.classfile.LocalVariable;
  import org.cojen.classfile.MethodInfo;
  import org.cojen.classfile.Modifiers;
 +import org.cojen.classfile.Opcode;
  import org.cojen.classfile.TypeDesc;
  import org.cojen.util.ClassInjector;
  import org.cojen.util.WeakIdentityMap;
 @@ -38,6 +40,9 @@ import com.amazon.carbonado.PersistException;  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.SupportException;
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
  import com.amazon.carbonado.spi.MasterFeature;
  import com.amazon.carbonado.spi.MasterStorableGenerator;
  import com.amazon.carbonado.spi.MasterSupport;
 @@ -112,12 +117,14 @@ public class RawStorableGenerator {       * Returns an abstract implementation of the given Storable type, which is
       * fully thread-safe. The Storable type itself may be an interface or a
       * class. If it is a class, then it must not be final, and it must have a
 -     * public, no-arg constructor. Two constructors are defined for the
 +     * public, no-arg constructor. Three constructors are defined for the
       * abstract implementation:
       *
       * <pre>
       * public <init>(RawSupport);
 -
 +     *
 +     * public <init>(RawSupport, byte[] key);
 +     *
       * public <init>(RawSupport, byte[] key, byte[] value);
       * </pre>
       *
 @@ -201,40 +208,80 @@ public class RawStorableGenerator {          final TypeDesc rawSupportType = TypeDesc.forClass(RawSupport.class);
          final TypeDesc byteArrayType = TypeDesc.forClass(byte[].class);
 -        // Add constructor that accepts a RawSupport.
 -        {
 -            TypeDesc[] params = {rawSupportType};
 -            MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
 -            CodeBuilder b = new CodeBuilder(mi);
 -            b.loadThis();
 -            b.loadLocal(b.getParameter(0));
 -            b.invokeSuperConstructor(new TypeDesc[] {masterSupportType});
 -            b.returnVoid();
 -        }
 +        // Add constructors.
 +        // 1: Accepts a RawSupport.
 +        // 2: Accepts a RawSupport and an encoded key.
 +        // 3: Accepts a RawSupport, an encoded key and an encoded data.
 +        for (int i=1; i<=3; i++) {
 +            TypeDesc[] params = new TypeDesc[i];
 +            params[0] = rawSupportType;
 +            if (i >= 2) {
 +                params[1] = byteArrayType;
 +                if (i == 3) {
 +                    params[2] = byteArrayType;
 +                }
 +            }
 -        // Add constructor that accepts a RawSupport, an encoded key, and an
 -        // encoded data.
 -        {
 -            TypeDesc[] params = {rawSupportType, byteArrayType, byteArrayType};
              MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
              CodeBuilder b = new CodeBuilder(mi);
 +
              b.loadThis();
              b.loadLocal(b.getParameter(0));
              b.invokeSuperConstructor(new TypeDesc[] {masterSupportType});
 -            params = new TypeDesc[] {byteArrayType};
 -
 -            b.loadThis();
 -            b.loadLocal(b.getParameter(1));
 -            b.invokeVirtual(DECODE_KEY_METHOD_NAME, null, params);
 -
 -            b.loadThis();
 -            b.loadLocal(b.getParameter(2));
 -            b.invokeVirtual(DECODE_DATA_METHOD_NAME, null, params);
 -
 -            // Indicate that object is clean by calling markAllPropertiesClean.
 -            b.loadThis();
 -            b.invokeVirtual(MARK_ALL_PROPERTIES_CLEAN, null, null);
 +            if (i >= 2) {
 +                params = new TypeDesc[] {byteArrayType};
 +
 +                b.loadThis();
 +                b.loadLocal(b.getParameter(1));
 +                b.invokeVirtual(DECODE_KEY_METHOD_NAME, null, params);
 +
 +                if (i == 3) {
 +                    b.loadThis();
 +                    b.loadLocal(b.getParameter(2));
 +                    b.invokeVirtual(DECODE_DATA_METHOD_NAME, null, params);
 +
 +                    // Indicate that object is clean by calling markAllPropertiesClean.
 +                    b.loadThis();
 +                    b.invokeVirtual(MARK_ALL_PROPERTIES_CLEAN, null, null);
 +                } else {
 +                    // Only the primary key is clean. Calling
 +                    // markPropertiesClean might have no effect since subclass
 +                    // may set fields directly. Instead, set state bits directly.
 +
 +                    Collection<? extends StorableProperty<S>> properties = StorableIntrospector
 +                        .examine(storableClass).getPrimaryKeyProperties().values();
 +                    final int count = properties.size();
 +                    int ordinal = 0;
 +                    int andMask = ~0;
 +                    int orMask = 0;
 +
 +                    for (StorableProperty property : properties) {
 +                        orMask |= StorableGenerator.PROPERTY_STATE_CLEAN << ((ordinal & 0xf) * 2);
 +                        andMask &= StorableGenerator.PROPERTY_STATE_MASK << ((ordinal & 0xf) * 2);
 +
 +                        ordinal++;
 +                        if ((ordinal & 0xf) == 0 || ordinal >= count) {
 +                            String stateFieldName =
 +                                StorableGenerator.PROPERTY_STATE_FIELD_NAME + ((ordinal - 1) >> 4);
 +                            b.loadThis();
 +                            if (andMask == 0) {
 +                                b.loadConstant(orMask);
 +                            } else {
 +                                b.loadThis();
 +                                b.loadField(stateFieldName, TypeDesc.INT);
 +                                b.loadConstant(andMask);
 +                                b.math(Opcode.IAND);
 +                                b.loadConstant(orMask);
 +                                b.math(Opcode.IOR);
 +                            }
 +                            b.storeField(stateFieldName, TypeDesc.INT);
 +                            andMask = ~0;
 +                            orMask = 0;
 +                        }
 +                    }
 +                }
 +            }
              b.returnVoid();
          }
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 11212c4..fa64533 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -21,14 +21,18 @@ import java.util.Comparator;  import java.util.LinkedHashSet;
  import java.util.Set;
 +import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.cojen.util.BeanComparator;
 +import com.amazon.carbonado.CorruptEncodingException;
  import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.FetchInterruptedException;
  import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.MalformedTypeException;
 +import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Repository;
  import com.amazon.carbonado.RepositoryException;
 @@ -409,6 +413,8 @@ class ReplicatedRepository                                               Comparator comparator, Transaction replicaTxn)
          throws RepositoryException
      {
 +        final Log log = LogFactory.getLog(ReplicatedRepository.class);
 +
          Cursor<S> replicaCursor = null;
          Cursor<S> masterCursor = null;
 @@ -430,8 +436,51 @@ class ReplicatedRepository                      }
                  }
 -                if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) {
 -                    replicaEntry = replicaCursor.next();
 +                if (replicaEntry == null && replicaCursor != null) {
 +                    long skippedCount = 0;
 +                    while (replicaCursor.hasNext()) {
 +                        try {
 +                            replicaEntry = replicaCursor.next();
 +                            if (skippedCount > 0) {
 +                                if (skippedCount == 1) {
 +                                    log.warn("Skipped corrupt replica entry before this one: " +
 +                                             replicaEntry);
 +                                } else {
 +                                    log.warn("Skipped " + skippedCount +
 +                                             " corrupt replica entries before this one: " +
 +                                             replicaEntry);
 +                                }
 +                            }
 +                            break;
 +                        } catch (CorruptEncodingException e) {
 +                            replicaEntry = null;
 +                            Storable withKey = e.getStorableWithPrimaryKey();
 +
 +                            if (withKey != null &&
 +                                withKey.storableType() == replicaStorage.getStorableType())
 +                            {
 +                                // Delete corrupt replica entry.
 +                                try {
 +                                    withKey.delete();
 +                                    log.info("Deleted corrupt replica entry: " +
 +                                             withKey.toStringKeyOnly(), e);
 +                                    continue;
 +                                } catch (PersistException e2) {
 +                                    log.warn("Unable to delete corrupt replica entry: " +
 +                                             withKey.toStringKeyOnly(), e2);
 +                                }
 +                            }
 +
 +                            // Just skip it.
 +                            try {
 +                                skippedCount += replicaCursor.skipNext(1);
 +                                log.info("Skipped corrupt replica entry", e);
 +                            } catch (FetchException e2) {
 +                                log.error("Unable to skip past corrupt replica entry", e2);
 +                                throw e;
 +                            }
 +                        }
 +                    }
                  }
                  if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) {
 | 
