diff options
Diffstat (limited to 'src/main/java')
14 files changed, 731 insertions, 18 deletions
diff --git a/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java b/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java index 6440978..1d331bc 100644 --- a/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java +++ b/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java @@ -196,4 +196,31 @@ public abstract class AbstractBlob implements Blob {              }
          }
      }
 +
 +    @Override
 +    public int hashCode() {
 +        Object locator = getLocator();
 +        return locator == null ? super.hashCode() : locator.hashCode();
 +    }
 +
 +    @Override
 +    public boolean equals(Object obj) {
 +        if (this == obj) {
 +            return true;
 +        }
 +        if (obj instanceof AbstractBlob) {
 +            Object locator = getLocator();
 +            if (locator != null) {
 +                AbstractBlob other = (AbstractBlob) obj;
 +                return locator == other.getLocator();
 +            }
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public String toString() {
 +        Object locator = getLocator();
 +        return locator == null ? super.toString() : ("Blob@" + locator);
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/lob/AbstractClob.java b/src/main/java/com/amazon/carbonado/lob/AbstractClob.java index 6e4e273..8531ba4 100644 --- a/src/main/java/com/amazon/carbonado/lob/AbstractClob.java +++ b/src/main/java/com/amazon/carbonado/lob/AbstractClob.java @@ -165,4 +165,31 @@ public abstract class AbstractClob implements Clob {              }
          }
      }
 +
 +    @Override
 +    public int hashCode() {
 +        Object locator = getLocator();
 +        return locator == null ? super.hashCode() : locator.hashCode();
 +    }
 +
 +    @Override
 +    public boolean equals(Object obj) {
 +        if (this == obj) {
 +            return true;
 +        }
 +        if (obj instanceof AbstractClob) {
 +            Object locator = getLocator();
 +            if (locator != null) {
 +                AbstractClob other = (AbstractClob) obj;
 +                return locator == other.getLocator();
 +            }
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public String toString() {
 +        Object locator = getLocator();
 +        return locator == null ? super.toString() : ("Clob@" + locator);
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/lob/BlobClob.java b/src/main/java/com/amazon/carbonado/lob/BlobClob.java index 9822171..107f422 100644 --- a/src/main/java/com/amazon/carbonado/lob/BlobClob.java +++ b/src/main/java/com/amazon/carbonado/lob/BlobClob.java @@ -96,6 +96,10 @@ public class BlobClob extends AbstractClob {          mBlob.setLength(length << 1);
      }
 +    public Object getLocator() {
 +        return mBlob.getLocator();
 +    }
 +
      protected Blob getWrappedBlob() {
          return mBlob;
      }
 diff --git a/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java b/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java index eadd8e1..028f575 100644 --- a/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java +++ b/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java @@ -305,6 +305,13 @@ public class ByteArrayBlob extends AbstractBlob {          }
      }
 +    /**
 +     * Always returns null.
 +     */
 +    public Object getLocator() {
 +        return null;
 +    }
 +
      private static class Input extends InputStream {
          private final ByteArrayBlob mBlob;
          private long mPos;
 diff --git a/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java b/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java index ed6b5e9..1472da3 100644 --- a/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java +++ b/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java @@ -310,6 +310,13 @@ public class CharArrayClob extends AbstractClob {          }
      }
 +    /**
 +     * Always returns null.
 +     */
 +    public Object getLocator() {
 +        return null;
 +    }
 +
      private static class Input extends Reader {
          private final CharArrayClob mClob;
          private long mPos;
 diff --git a/src/main/java/com/amazon/carbonado/lob/FileBlob.java b/src/main/java/com/amazon/carbonado/lob/FileBlob.java index c351361..eeba914 100644 --- a/src/main/java/com/amazon/carbonado/lob/FileBlob.java +++ b/src/main/java/com/amazon/carbonado/lob/FileBlob.java @@ -109,4 +109,11 @@ public class FileBlob extends AbstractBlob {              throw new PersistException(e);
          }
      }
 +
 +    /**
 +     * Always returns null.
 +     */
 +    public Object getLocator() {
 +        return null;
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/lob/Lob.java b/src/main/java/com/amazon/carbonado/lob/Lob.java index a09bfe0..404500f 100644 --- a/src/main/java/com/amazon/carbonado/lob/Lob.java +++ b/src/main/java/com/amazon/carbonado/lob/Lob.java @@ -25,6 +25,14 @@ package com.amazon.carbonado.lob;   */
  public interface Lob {
      /**
 +     * Returns an object which identifies the Lob data, which may be null if
 +     * not supported.
 +     *
 +     * @since 1.2
 +     */
 +    Object getLocator();
 +
 +    /**
       * Two Lobs are considered equal if the object instances are the same or if
       * they point to the same content. Lob data is not compared, as that would
       * be expensive or it may result in a fetch exception.
 diff --git a/src/main/java/com/amazon/carbonado/lob/StringClob.java b/src/main/java/com/amazon/carbonado/lob/StringClob.java index f19d63d..fe9b05d 100644 --- a/src/main/java/com/amazon/carbonado/lob/StringClob.java +++ b/src/main/java/com/amazon/carbonado/lob/StringClob.java @@ -87,6 +87,13 @@ public class StringClob extends AbstractClob {          denied();
      }
 +    /**
 +     * Always returns null.
 +     */
 +    public Object getLocator() {
 +        return null;
 +    }
 +
      private PersistException denied() {
          return new PersistDeniedException("Read-only");
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java index c031a9e..18d1eb4 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java @@ -116,6 +116,11 @@ class JDBCBlob extends AbstractBlob implements JDBCLob {          }
      }
 +    public Object getLocator() {
 +        // FIXME
 +        return null;
 +    }
 +
      public void close() {
          mBlob = null;
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java index dc2ec0a..dc254bb 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java @@ -116,6 +116,11 @@ class JDBCClob extends AbstractClob implements JDBCLob {          }
      }
 +    public Object getLocator() {
 +        // FIXME
 +        return null;
 +    }
 +
      public void close() {
          mClob = null;
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java new file mode 100644 index 0000000..0657d50 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java @@ -0,0 +1,304 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.replicated;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.InputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +
 +import java.nio.charset.Charset;
 +
 +import java.util.ArrayList;
 +import java.util.Map;
 +import java.util.List;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistNoneException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.Trigger;
 +
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.lob.AbstractBlob;
 +import com.amazon.carbonado.lob.Blob;
 +import com.amazon.carbonado.lob.ByteArrayBlob;
 +
 +/**
 + * After loading a replica, replaces all Blobs with ReplicatedBlobs.
 + *
 + * @author Brian S O'Neill
 + */
 +class BlobReplicationTrigger<S extends Storable> extends Trigger<S> {
 +    /**
 +     * Returns null if no Blobs need to be replicated.
 +     */
 +    static <S extends Storable> BlobReplicationTrigger<S> create(Storage<S> masterStorage) {
 +        Map<String, ? extends StorableProperty<S>> properties = 
 +            StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties();
 +
 +        List<String> blobNames = new ArrayList<String>(2);
 +
 +        for (StorableProperty<S> property : properties.values()) {
 +            if (property.getType() == Blob.class) {
 +                blobNames.add(property.getName());
 +            }
 +        }
 +
 +        if (blobNames.size() == 0) {
 +            return null;
 +        }
 +
 +        return new BlobReplicationTrigger<S>(masterStorage,
 +                                             blobNames.toArray(new String[blobNames.size()]));
 +    }
 +
 +    private final Storage<S> mMasterStorage;
 +    private final String[] mBlobNames;
 +
 +    private BlobReplicationTrigger(Storage<S> masterStorage, String[] blobNames) {
 +        mMasterStorage = masterStorage;
 +        mBlobNames = blobNames;
 +    }
 +
 +    @Override
 +    public void afterInsert(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterUpdate(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterLoad(S replica) {
 +        for (String name : mBlobNames) {
 +            if (!replica.isPropertySupported(name)) {
 +                continue;
 +            }
 +            Blob replicaBlob = (Blob) replica.getPropertyValue(name);
 +            if (replicaBlob != null) {
 +                if (replicaBlob instanceof BlobReplicationTrigger.Replicated) {
 +                    if (((Replicated) replicaBlob).parent() == this) {
 +                        continue;
 +                    }
 +                }
 +                Replicated blob = new Replicated(name, replica, replicaBlob);
 +                replica.setPropertyValue(name, blob);
 +            }
 +        }
 +        replica.markAllPropertiesClean();
 +    }
 +
 +    /**
 +     * Writes go to master property first, and then to replica.
 +     */
 +    class Replicated extends AbstractBlob {
 +        private static final int DEFAULT_BUFFER_SIZE = 4000;
 +
 +        private final String mBlobName;
 +        private final S mReplica;
 +        private final Blob mReplicaBlob;
 +
 +        private Blob mMasterBlob;
 +        private boolean mMasterBlobLoaded;
 +
 +        Replicated(String blobName, S replica, Blob replicaBlob) {
 +            mBlobName = blobName;
 +            mReplica = replica;
 +            mReplicaBlob = replicaBlob;
 +        }
 +
 +        public InputStream openInputStream() throws FetchException {
 +            return mReplicaBlob.openInputStream();
 +        }
 +
 +        public InputStream openInputStream(long pos) throws FetchException {
 +            return mReplicaBlob.openInputStream(pos);
 +        }
 +
 +        public InputStream openInputStream(long pos, int bufferSize) throws FetchException {
 +            return mReplicaBlob.openInputStream(pos, bufferSize);
 +        }
 +
 +        public long getLength() throws FetchException {
 +            return mReplicaBlob.getLength();
 +        }
 +
 +        public String asString() throws FetchException {
 +            return mReplicaBlob.asString();
 +        }
 +
 +        public String asString(String charsetName) throws FetchException {
 +            return mReplicaBlob.asString(charsetName);
 +        }
 +
 +        public String asString(Charset charset) throws FetchException {
 +            return mReplicaBlob.asString(charset);
 +        }
 +
 +        public OutputStream openOutputStream() throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob == null) {
 +                return mReplicaBlob.openOutputStream();
 +            } else {
 +                return openOutputStream(masterBlob, 0, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public OutputStream openOutputStream(long pos) throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob == null) {
 +                return mReplicaBlob.openOutputStream(pos);
 +            } else {
 +                return openOutputStream(masterBlob, pos, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public OutputStream openOutputStream(long pos, int bufferSize) throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob == null) {
 +                return mReplicaBlob.openOutputStream(pos, bufferSize);
 +            } else {
 +                return openOutputStream(masterBlob, pos, bufferSize);
 +            }
 +        }
 +
 +        private OutputStream openOutputStream(Blob masterBlob, long pos, int bufferSize)
 +            throws PersistException
 +        {
 +            if (bufferSize < DEFAULT_BUFFER_SIZE) {
 +                bufferSize = DEFAULT_BUFFER_SIZE;
 +            }
 +
 +            OutputStream masterOut = masterBlob.openOutputStream(pos, 0);
 +            OutputStream replicaOut = mReplicaBlob.openOutputStream(pos, 0);
 +
 +            return new BufferedOutputStream(new Copier(masterOut, replicaOut), bufferSize);
 +        }
 +
 +        public void setLength(long length) throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob != null) {
 +                masterBlob.setLength(length);
 +            }
 +            mReplicaBlob.setLength(length);
 +        }
 +
 +        public Object getLocator() {
 +            return mReplicaBlob.getLocator();
 +        }
 +
 +        @Override
 +        public int hashCode() {
 +            return mReplicaBlob.hashCode();
 +        }
 +
 +        @Override
 +        public boolean equals(Object obj) {
 +            if (this == obj) {
 +                return true;
 +            }
 +            if (obj instanceof BlobReplicationTrigger.Replicated) {
 +                Replicated other = (Replicated) obj;
 +                return parent() == other.parent() &&
 +                    mReplicaBlob.equals(other.mReplicaBlob);
 +            }
 +            return false;
 +        }
 +
 +        @Override
 +        public String toString() {
 +            Object locator = getLocator();
 +            return locator == null ? super.toString() : ("ReplicatedBlob@" + locator);
 +        }
 +
 +        BlobReplicationTrigger parent() {
 +            return BlobReplicationTrigger.this;
 +        }
 +
 +        /**
 +         * Returns null if not supported.
 +         */
 +        private Blob masterBlob() throws PersistException {
 +            Blob masterBlob = mMasterBlob;
 +
 +            if (mMasterBlobLoaded) {
 +                return masterBlob;
 +            }
 +
 +            S master = mMasterStorage.prepare();
 +            mReplica.copyPrimaryKeyProperties(master);
 +
 +            try {
 +                // FIXME: handle missing master with resync
 +                master.load();
 +
 +                if (master.isPropertySupported(mBlobName)) {
 +                    masterBlob = (Blob) master.getPropertyValue(mBlobName);
 +                    if (masterBlob == null) {
 +                        // FIXME: perform resync, but still throw exception
 +                        throw new PersistNoneException("Master Blob is null: " + mBlobName);
 +                    }
 +                }
 +
 +                mMasterBlob = masterBlob;
 +                mMasterBlobLoaded = true;
 +
 +                return masterBlob;
 +            } catch (FetchException e) {
 +                throw e.toPersistException();
 +            }
 +        }
 +    }
 +
 +    private static class Copier extends OutputStream {
 +        private final OutputStream mReplicaOut;
 +        private final OutputStream mMasterOut;
 +
 +        Copier(OutputStream master, OutputStream replica) {
 +            mMasterOut = master;
 +            mReplicaOut = replica;
 +        }
 +
 +        public void write(int b) throws IOException {
 +            mMasterOut.write(b);
 +            mReplicaOut.write(b);
 +        }
 +
 +        public void write(byte[] b, int off, int len) throws IOException {
 +            mMasterOut.write(b, off, len);
 +            mReplicaOut.write(b, off, len);
 +        }
 +
 +        public void flush() throws IOException {
 +            mMasterOut.flush();
 +            mReplicaOut.flush();
 +        }
 +
 +        public void close() throws IOException {
 +            mMasterOut.close();
 +            mReplicaOut.close();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java new file mode 100644 index 0000000..5da6eed --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java @@ -0,0 +1,294 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.replicated;
 +
 +import java.io.BufferedWriter;
 +import java.io.IOException;
 +import java.io.Reader;
 +import java.io.Writer;
 +
 +import java.util.ArrayList;
 +import java.util.Map;
 +import java.util.List;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistNoneException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.Trigger;
 +
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.lob.AbstractClob;
 +import com.amazon.carbonado.lob.Clob;
 +import com.amazon.carbonado.lob.CharArrayClob;
 +
 +/**
 + * After loading a replica, replaces all Clobs with ReplicatedClobs.
 + *
 + * @author Brian S O'Neill
 + */
 +class ClobReplicationTrigger<S extends Storable> extends Trigger<S> {
 +    /**
 +     * Returns null if no Clobs need to be replicated.
 +     */
 +    static <S extends Storable> ClobReplicationTrigger<S> create(Storage<S> masterStorage) {
 +        Map<String, ? extends StorableProperty<S>> properties = 
 +            StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties();
 +
 +        List<String> clobNames = new ArrayList<String>(2);
 +
 +        for (StorableProperty<S> property : properties.values()) {
 +            if (property.getType() == Clob.class) {
 +                clobNames.add(property.getName());
 +            }
 +        }
 +
 +        if (clobNames.size() == 0) {
 +            return null;
 +        }
 +
 +        return new ClobReplicationTrigger<S>(masterStorage,
 +                                             clobNames.toArray(new String[clobNames.size()]));
 +    }
 +
 +    private final Storage<S> mMasterStorage;
 +    private final String[] mClobNames;
 +
 +    private ClobReplicationTrigger(Storage<S> masterStorage, String[] clobNames) {
 +        mMasterStorage = masterStorage;
 +        mClobNames = clobNames;
 +    }
 +
 +    @Override
 +    public void afterInsert(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterUpdate(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterLoad(S replica) {
 +        for (String name : mClobNames) {
 +            if (!replica.isPropertySupported(name)) {
 +                continue;
 +            }
 +            Clob replicaClob = (Clob) replica.getPropertyValue(name);
 +            if (replicaClob != null) {
 +                if (replicaClob instanceof ClobReplicationTrigger.Replicated) {
 +                    if (((Replicated) replicaClob).parent() == this) {
 +                        continue;
 +                    }
 +                }
 +                Replicated clob = new Replicated(name, replica, replicaClob);
 +                replica.setPropertyValue(name, clob);
 +            }
 +        }
 +        replica.markAllPropertiesClean();
 +    }
 +
 +    /**
 +     * Writes go to master property first, and then to replica.
 +     */
 +    class Replicated extends AbstractClob {
 +        private static final int DEFAULT_BUFFER_SIZE = 4000;
 +
 +        private final String mClobName;
 +        private final S mReplica;
 +        private final Clob mReplicaClob;
 +
 +        private Clob mMasterClob;
 +        private boolean mMasterClobLoaded;
 +
 +        Replicated(String clobName, S replica, Clob replicaClob) {
 +            mClobName = clobName;
 +            mReplica = replica;
 +            mReplicaClob = replicaClob;
 +        }
 +
 +        public Reader openReader() throws FetchException {
 +            return mReplicaClob.openReader();
 +        }
 +
 +        public Reader openReader(long pos) throws FetchException {
 +            return mReplicaClob.openReader(pos);
 +        }
 +
 +        public Reader openReader(long pos, int bufferSize) throws FetchException {
 +            return mReplicaClob.openReader(pos, bufferSize);
 +        }
 +
 +        public long getLength() throws FetchException {
 +            return mReplicaClob.getLength();
 +        }
 +
 +        public String asString() throws FetchException {
 +            return mReplicaClob.asString();
 +        }
 +
 +        public Writer openWriter() throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob == null) {
 +                return mReplicaClob.openWriter();
 +            } else {
 +                return openWriter(masterClob, 0, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public Writer openWriter(long pos) throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob == null) {
 +                return mReplicaClob.openWriter(pos);
 +            } else {
 +                return openWriter(masterClob, pos, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public Writer openWriter(long pos, int bufferSize) throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob == null) {
 +                return mReplicaClob.openWriter(pos, bufferSize);
 +            } else {
 +                return openWriter(masterClob, pos, bufferSize);
 +            }
 +        }
 +
 +        private Writer openWriter(Clob masterClob, long pos, int bufferSize)
 +            throws PersistException
 +        {
 +            if (bufferSize < DEFAULT_BUFFER_SIZE) {
 +                bufferSize = DEFAULT_BUFFER_SIZE;
 +            }
 +
 +            Writer masterOut = masterClob.openWriter(pos, 0);
 +            Writer replicaOut = mReplicaClob.openWriter(pos, 0);
 +
 +            return new BufferedWriter(new Copier(masterOut, replicaOut), bufferSize);
 +        }
 +
 +        public void setLength(long length) throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob != null) {
 +                masterClob.setLength(length);
 +            }
 +            mReplicaClob.setLength(length);
 +        }
 +
 +        public Object getLocator() {
 +            return mReplicaClob.getLocator();
 +        }
 +
 +        @Override
 +        public int hashCode() {
 +            return mReplicaClob.hashCode();
 +        }
 +
 +        @Override
 +        public boolean equals(Object obj) {
 +            if (this == obj) {
 +                return true;
 +            }
 +            if (obj instanceof ClobReplicationTrigger.Replicated) {
 +                Replicated other = (Replicated) obj;
 +                return parent() == other.parent() &&
 +                    mReplicaClob.equals(other.mReplicaClob);
 +            }
 +            return false;
 +        }
 +
 +        @Override
 +        public String toString() {
 +            Object locator = getLocator();
 +            return locator == null ? super.toString() : ("ReplicatedClob@" + locator);
 +        }
 +
 +        ClobReplicationTrigger parent() {
 +            return ClobReplicationTrigger.this;
 +        }
 +
 +        /**
 +         * Returns null if not supported.
 +         */
 +        private Clob masterClob() throws PersistException {
 +            Clob masterClob = mMasterClob;
 +
 +            if (mMasterClobLoaded) {
 +                return masterClob;
 +            }
 +
 +            S master = mMasterStorage.prepare();
 +            mReplica.copyPrimaryKeyProperties(master);
 +
 +            try {
 +                // FIXME: handle missing master with resync
 +                master.load();
 +
 +                if (master.isPropertySupported(mClobName)) {
 +                    masterClob = (Clob) master.getPropertyValue(mClobName);
 +                    if (masterClob == null) {
 +                        // FIXME: perform resync, but still throw exception
 +                        throw new PersistNoneException("Master Clob is null: " + mClobName);
 +                    }
 +                }
 +
 +                mMasterClob = masterClob;
 +                mMasterClobLoaded = true;
 +
 +                return masterClob;
 +            } catch (FetchException e) {
 +                throw e.toPersistException();
 +            }
 +        }
 +    }
 +
 +    private static class Copier extends Writer {
 +        private final Writer mReplicaOut;
 +        private final Writer mMasterOut;
 +
 +        Copier(Writer master, Writer replica) {
 +            mMasterOut = master;
 +            mReplicaOut = replica;
 +        }
 +
 +        public void write(int c) throws IOException {
 +            mMasterOut.write(c);
 +            mReplicaOut.write(c);
 +        }
 +
 +        public void write(char[] c, int off, int len) throws IOException {
 +            mMasterOut.write(c, off, len);
 +            mReplicaOut.write(c, off, len);
 +        }
 +
 +        public void flush() throws IOException {
 +            mMasterOut.flush();
 +            mReplicaOut.flush();
 +        }
 +
 +        public void close() throws IOException {
 +            mMasterOut.close();
 +            mReplicaOut.close();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index c3adeed..996a9f8 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -57,10 +57,22 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          mRepository = repository;
          mReplicaStorage = replicaStorage;
          mMasterStorage = masterStorage;
 +
          // Use TriggerManager to locally disable trigger execution during
          // resync and repairs.
          mTriggerManager = new TriggerManager<S>();
          mTriggerManager.addTrigger(this);
 +
 +        BlobReplicationTrigger<S> blobTrigger = BlobReplicationTrigger.create(masterStorage);
 +        if (blobTrigger != null) {
 +            mTriggerManager.addTrigger(blobTrigger);
 +        }
 +
 +        ClobReplicationTrigger<S> clobTrigger = ClobReplicationTrigger.create(masterStorage);
 +        if (clobTrigger != null) {
 +            mTriggerManager.addTrigger(clobTrigger);
 +        }
 +
          replicaStorage.addTrigger(mTriggerManager);
      }
 @@ -377,6 +389,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          tm.locallyDisableInsert();
          tm.locallyDisableUpdate();
          tm.locallyDisableDelete();
 +        tm.locallyDisableLoad();
      }
      void setReplicationEnabled() {
 @@ -384,5 +397,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          tm.locallyEnableInsert();
          tm.locallyEnableUpdate();
          tm.locallyEnableDelete();
 +        tm.locallyEnableLoad();
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/spi/LobEngine.java b/src/main/java/com/amazon/carbonado/spi/LobEngine.java index 8cc1bf9..e6fcb07 100644 --- a/src/main/java/com/amazon/carbonado/spi/LobEngine.java +++ b/src/main/java/com/amazon/carbonado/spi/LobEngine.java @@ -155,7 +155,8 @@ public class LobEngine {          if (lob == null) {
              return 0;
          }
 -        return ((LobImpl) lob).getLocator();
 +        Long locator = (Long) lob.getLocator();
 +        return locator == null ? 0 : locator;
      }
      /**
 @@ -468,12 +469,8 @@ public class LobEngine {          return trigger;
      }
 -    private interface LobImpl extends Lob {
 -        long getLocator();
 -    }
 -
 -    private class BlobImpl extends AbstractBlob implements LobImpl {
 -        final long mLocator;
 +    private class BlobImpl extends AbstractBlob implements Lob {
 +        final Long mLocator;
          final StoredLob mStoredLob;
          BlobImpl(long locator) {
 @@ -648,9 +645,13 @@ public class LobEngine {              }
          }
 +        public Long getLocator() {
 +            return mLocator;
 +        }
 +
          @Override
          public int hashCode() {
 -            return ((int) (mLocator >> 32)) ^ ((int) mLocator);
 +            return mLocator.hashCode();
          }
          @Override
 @@ -660,7 +661,7 @@ public class LobEngine {              }
              if (obj instanceof BlobImpl) {
                  BlobImpl other = (BlobImpl) obj;
 -                return LobEngine.this == other.getEnclosing() && mLocator == other.mLocator;
 +                return LobEngine.this == other.getEnclosing() && mLocator.equals(other.mLocator);
              }
              return false;
          }
 @@ -670,16 +671,12 @@ public class LobEngine {              return "Blob@" + getLocator();
          }
 -        public long getLocator() {
 -            return mLocator;
 -        }
 -
          LobEngine getEnclosing() {
              return LobEngine.this;
          }
      }
 -    private class ClobImpl extends BlobClob implements LobImpl {
 +    private class ClobImpl extends BlobClob implements Lob {
          ClobImpl(long locator) {
              super(new BlobImpl(locator));
          }
 @@ -688,6 +685,10 @@ public class LobEngine {              super(new BlobImpl(lob));
          }
 +        public Long getLocator() {
 +            return ((BlobImpl) super.getWrappedBlob()).getLocator();
 +        }
 +
          @Override
          public int hashCode() {
              return super.getWrappedBlob().hashCode();
 @@ -709,10 +710,6 @@ public class LobEngine {              return "Clob@" + getLocator();
          }
 -        public long getLocator() {
 -            return ((BlobImpl) super.getWrappedBlob()).getLocator();
 -        }
 -
          // Override to gain permission.
          protected BlobImpl getWrappedBlob() {
              return (BlobImpl) super.getWrappedBlob();
  | 
