diff options
| author | Brian S. O'Neill <bronee@gmail.com> | 2008-03-16 23:48:13 +0000 | 
|---|---|---|
| committer | Brian S. O'Neill <bronee@gmail.com> | 2008-03-16 23:48:13 +0000 | 
| commit | d025e69cce3ff409c0d45a821245800a830609cf (patch) | |
| tree | 8239c0f4b9cb65723df67ac7e3b338a5b798929f /src/main/java/com/amazon/carbonado/repo/replicated | |
| parent | 50fbad96036fe7b214c2a33ff3f9fef6abaf2593 (diff) | |
Support updates to replicated LOBs.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
3 files changed, 612 insertions, 0 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java new file mode 100644 index 0000000..0657d50 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java @@ -0,0 +1,304 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.replicated;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.InputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +
 +import java.nio.charset.Charset;
 +
 +import java.util.ArrayList;
 +import java.util.Map;
 +import java.util.List;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistNoneException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.Trigger;
 +
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.lob.AbstractBlob;
 +import com.amazon.carbonado.lob.Blob;
 +import com.amazon.carbonado.lob.ByteArrayBlob;
 +
 +/**
 + * After loading a replica, replaces all Blobs with ReplicatedBlobs.
 + *
 + * @author Brian S O'Neill
 + */
 +class BlobReplicationTrigger<S extends Storable> extends Trigger<S> {
 +    /**
 +     * Returns null if no Blobs need to be replicated.
 +     */
 +    static <S extends Storable> BlobReplicationTrigger<S> create(Storage<S> masterStorage) {
 +        Map<String, ? extends StorableProperty<S>> properties = 
 +            StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties();
 +
 +        List<String> blobNames = new ArrayList<String>(2);
 +
 +        for (StorableProperty<S> property : properties.values()) {
 +            if (property.getType() == Blob.class) {
 +                blobNames.add(property.getName());
 +            }
 +        }
 +
 +        if (blobNames.size() == 0) {
 +            return null;
 +        }
 +
 +        return new BlobReplicationTrigger<S>(masterStorage,
 +                                             blobNames.toArray(new String[blobNames.size()]));
 +    }
 +
 +    private final Storage<S> mMasterStorage;
 +    private final String[] mBlobNames;
 +
 +    private BlobReplicationTrigger(Storage<S> masterStorage, String[] blobNames) {
 +        mMasterStorage = masterStorage;
 +        mBlobNames = blobNames;
 +    }
 +
 +    @Override
 +    public void afterInsert(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterUpdate(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterLoad(S replica) {
 +        for (String name : mBlobNames) {
 +            if (!replica.isPropertySupported(name)) {
 +                continue;
 +            }
 +            Blob replicaBlob = (Blob) replica.getPropertyValue(name);
 +            if (replicaBlob != null) {
 +                if (replicaBlob instanceof BlobReplicationTrigger.Replicated) {
 +                    if (((Replicated) replicaBlob).parent() == this) {
 +                        continue;
 +                    }
 +                }
 +                Replicated blob = new Replicated(name, replica, replicaBlob);
 +                replica.setPropertyValue(name, blob);
 +            }
 +        }
 +        replica.markAllPropertiesClean();
 +    }
 +
 +    /**
 +     * Writes go to master property first, and then to replica.
 +     */
 +    class Replicated extends AbstractBlob {
 +        private static final int DEFAULT_BUFFER_SIZE = 4000;
 +
 +        private final String mBlobName;
 +        private final S mReplica;
 +        private final Blob mReplicaBlob;
 +
 +        private Blob mMasterBlob;
 +        private boolean mMasterBlobLoaded;
 +
 +        Replicated(String blobName, S replica, Blob replicaBlob) {
 +            mBlobName = blobName;
 +            mReplica = replica;
 +            mReplicaBlob = replicaBlob;
 +        }
 +
 +        public InputStream openInputStream() throws FetchException {
 +            return mReplicaBlob.openInputStream();
 +        }
 +
 +        public InputStream openInputStream(long pos) throws FetchException {
 +            return mReplicaBlob.openInputStream(pos);
 +        }
 +
 +        public InputStream openInputStream(long pos, int bufferSize) throws FetchException {
 +            return mReplicaBlob.openInputStream(pos, bufferSize);
 +        }
 +
 +        public long getLength() throws FetchException {
 +            return mReplicaBlob.getLength();
 +        }
 +
 +        public String asString() throws FetchException {
 +            return mReplicaBlob.asString();
 +        }
 +
 +        public String asString(String charsetName) throws FetchException {
 +            return mReplicaBlob.asString(charsetName);
 +        }
 +
 +        public String asString(Charset charset) throws FetchException {
 +            return mReplicaBlob.asString(charset);
 +        }
 +
 +        public OutputStream openOutputStream() throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob == null) {
 +                return mReplicaBlob.openOutputStream();
 +            } else {
 +                return openOutputStream(masterBlob, 0, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public OutputStream openOutputStream(long pos) throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob == null) {
 +                return mReplicaBlob.openOutputStream(pos);
 +            } else {
 +                return openOutputStream(masterBlob, pos, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public OutputStream openOutputStream(long pos, int bufferSize) throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob == null) {
 +                return mReplicaBlob.openOutputStream(pos, bufferSize);
 +            } else {
 +                return openOutputStream(masterBlob, pos, bufferSize);
 +            }
 +        }
 +
 +        private OutputStream openOutputStream(Blob masterBlob, long pos, int bufferSize)
 +            throws PersistException
 +        {
 +            if (bufferSize < DEFAULT_BUFFER_SIZE) {
 +                bufferSize = DEFAULT_BUFFER_SIZE;
 +            }
 +
 +            OutputStream masterOut = masterBlob.openOutputStream(pos, 0);
 +            OutputStream replicaOut = mReplicaBlob.openOutputStream(pos, 0);
 +
 +            return new BufferedOutputStream(new Copier(masterOut, replicaOut), bufferSize);
 +        }
 +
 +        public void setLength(long length) throws PersistException {
 +            Blob masterBlob = masterBlob();
 +            if (masterBlob != null) {
 +                masterBlob.setLength(length);
 +            }
 +            mReplicaBlob.setLength(length);
 +        }
 +
 +        public Object getLocator() {
 +            return mReplicaBlob.getLocator();
 +        }
 +
 +        @Override
 +        public int hashCode() {
 +            return mReplicaBlob.hashCode();
 +        }
 +
 +        @Override
 +        public boolean equals(Object obj) {
 +            if (this == obj) {
 +                return true;
 +            }
 +            if (obj instanceof BlobReplicationTrigger.Replicated) {
 +                Replicated other = (Replicated) obj;
 +                return parent() == other.parent() &&
 +                    mReplicaBlob.equals(other.mReplicaBlob);
 +            }
 +            return false;
 +        }
 +
 +        @Override
 +        public String toString() {
 +            Object locator = getLocator();
 +            return locator == null ? super.toString() : ("ReplicatedBlob@" + locator);
 +        }
 +
 +        BlobReplicationTrigger parent() {
 +            return BlobReplicationTrigger.this;
 +        }
 +
 +        /**
 +         * Returns null if not supported.
 +         */
 +        private Blob masterBlob() throws PersistException {
 +            Blob masterBlob = mMasterBlob;
 +
 +            if (mMasterBlobLoaded) {
 +                return masterBlob;
 +            }
 +
 +            S master = mMasterStorage.prepare();
 +            mReplica.copyPrimaryKeyProperties(master);
 +
 +            try {
 +                // FIXME: handle missing master with resync
 +                master.load();
 +
 +                if (master.isPropertySupported(mBlobName)) {
 +                    masterBlob = (Blob) master.getPropertyValue(mBlobName);
 +                    if (masterBlob == null) {
 +                        // FIXME: perform resync, but still throw exception
 +                        throw new PersistNoneException("Master Blob is null: " + mBlobName);
 +                    }
 +                }
 +
 +                mMasterBlob = masterBlob;
 +                mMasterBlobLoaded = true;
 +
 +                return masterBlob;
 +            } catch (FetchException e) {
 +                throw e.toPersistException();
 +            }
 +        }
 +    }
 +
 +    private static class Copier extends OutputStream {
 +        private final OutputStream mReplicaOut;
 +        private final OutputStream mMasterOut;
 +
 +        Copier(OutputStream master, OutputStream replica) {
 +            mMasterOut = master;
 +            mReplicaOut = replica;
 +        }
 +
 +        public void write(int b) throws IOException {
 +            mMasterOut.write(b);
 +            mReplicaOut.write(b);
 +        }
 +
 +        public void write(byte[] b, int off, int len) throws IOException {
 +            mMasterOut.write(b, off, len);
 +            mReplicaOut.write(b, off, len);
 +        }
 +
 +        public void flush() throws IOException {
 +            mMasterOut.flush();
 +            mReplicaOut.flush();
 +        }
 +
 +        public void close() throws IOException {
 +            mMasterOut.close();
 +            mReplicaOut.close();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java new file mode 100644 index 0000000..5da6eed --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java @@ -0,0 +1,294 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.replicated;
 +
 +import java.io.BufferedWriter;
 +import java.io.IOException;
 +import java.io.Reader;
 +import java.io.Writer;
 +
 +import java.util.ArrayList;
 +import java.util.Map;
 +import java.util.List;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistNoneException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.Trigger;
 +
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.lob.AbstractClob;
 +import com.amazon.carbonado.lob.Clob;
 +import com.amazon.carbonado.lob.CharArrayClob;
 +
 +/**
 + * After loading a replica, replaces all Clobs with ReplicatedClobs.
 + *
 + * @author Brian S O'Neill
 + */
 +class ClobReplicationTrigger<S extends Storable> extends Trigger<S> {
 +    /**
 +     * Returns null if no Clobs need to be replicated.
 +     */
 +    static <S extends Storable> ClobReplicationTrigger<S> create(Storage<S> masterStorage) {
 +        Map<String, ? extends StorableProperty<S>> properties = 
 +            StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties();
 +
 +        List<String> clobNames = new ArrayList<String>(2);
 +
 +        for (StorableProperty<S> property : properties.values()) {
 +            if (property.getType() == Clob.class) {
 +                clobNames.add(property.getName());
 +            }
 +        }
 +
 +        if (clobNames.size() == 0) {
 +            return null;
 +        }
 +
 +        return new ClobReplicationTrigger<S>(masterStorage,
 +                                             clobNames.toArray(new String[clobNames.size()]));
 +    }
 +
 +    private final Storage<S> mMasterStorage;
 +    private final String[] mClobNames;
 +
 +    private ClobReplicationTrigger(Storage<S> masterStorage, String[] clobNames) {
 +        mMasterStorage = masterStorage;
 +        mClobNames = clobNames;
 +    }
 +
 +    @Override
 +    public void afterInsert(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterUpdate(S replica, Object state) {
 +        afterLoad(replica);
 +    }
 +
 +    @Override
 +    public void afterLoad(S replica) {
 +        for (String name : mClobNames) {
 +            if (!replica.isPropertySupported(name)) {
 +                continue;
 +            }
 +            Clob replicaClob = (Clob) replica.getPropertyValue(name);
 +            if (replicaClob != null) {
 +                if (replicaClob instanceof ClobReplicationTrigger.Replicated) {
 +                    if (((Replicated) replicaClob).parent() == this) {
 +                        continue;
 +                    }
 +                }
 +                Replicated clob = new Replicated(name, replica, replicaClob);
 +                replica.setPropertyValue(name, clob);
 +            }
 +        }
 +        replica.markAllPropertiesClean();
 +    }
 +
 +    /**
 +     * Writes go to master property first, and then to replica.
 +     */
 +    class Replicated extends AbstractClob {
 +        private static final int DEFAULT_BUFFER_SIZE = 4000;
 +
 +        private final String mClobName;
 +        private final S mReplica;
 +        private final Clob mReplicaClob;
 +
 +        private Clob mMasterClob;
 +        private boolean mMasterClobLoaded;
 +
 +        Replicated(String clobName, S replica, Clob replicaClob) {
 +            mClobName = clobName;
 +            mReplica = replica;
 +            mReplicaClob = replicaClob;
 +        }
 +
 +        public Reader openReader() throws FetchException {
 +            return mReplicaClob.openReader();
 +        }
 +
 +        public Reader openReader(long pos) throws FetchException {
 +            return mReplicaClob.openReader(pos);
 +        }
 +
 +        public Reader openReader(long pos, int bufferSize) throws FetchException {
 +            return mReplicaClob.openReader(pos, bufferSize);
 +        }
 +
 +        public long getLength() throws FetchException {
 +            return mReplicaClob.getLength();
 +        }
 +
 +        public String asString() throws FetchException {
 +            return mReplicaClob.asString();
 +        }
 +
 +        public Writer openWriter() throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob == null) {
 +                return mReplicaClob.openWriter();
 +            } else {
 +                return openWriter(masterClob, 0, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public Writer openWriter(long pos) throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob == null) {
 +                return mReplicaClob.openWriter(pos);
 +            } else {
 +                return openWriter(masterClob, pos, DEFAULT_BUFFER_SIZE);
 +            }
 +        }
 +
 +        public Writer openWriter(long pos, int bufferSize) throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob == null) {
 +                return mReplicaClob.openWriter(pos, bufferSize);
 +            } else {
 +                return openWriter(masterClob, pos, bufferSize);
 +            }
 +        }
 +
 +        private Writer openWriter(Clob masterClob, long pos, int bufferSize)
 +            throws PersistException
 +        {
 +            if (bufferSize < DEFAULT_BUFFER_SIZE) {
 +                bufferSize = DEFAULT_BUFFER_SIZE;
 +            }
 +
 +            Writer masterOut = masterClob.openWriter(pos, 0);
 +            Writer replicaOut = mReplicaClob.openWriter(pos, 0);
 +
 +            return new BufferedWriter(new Copier(masterOut, replicaOut), bufferSize);
 +        }
 +
 +        public void setLength(long length) throws PersistException {
 +            Clob masterClob = masterClob();
 +            if (masterClob != null) {
 +                masterClob.setLength(length);
 +            }
 +            mReplicaClob.setLength(length);
 +        }
 +
 +        public Object getLocator() {
 +            return mReplicaClob.getLocator();
 +        }
 +
 +        @Override
 +        public int hashCode() {
 +            return mReplicaClob.hashCode();
 +        }
 +
 +        @Override
 +        public boolean equals(Object obj) {
 +            if (this == obj) {
 +                return true;
 +            }
 +            if (obj instanceof ClobReplicationTrigger.Replicated) {
 +                Replicated other = (Replicated) obj;
 +                return parent() == other.parent() &&
 +                    mReplicaClob.equals(other.mReplicaClob);
 +            }
 +            return false;
 +        }
 +
 +        @Override
 +        public String toString() {
 +            Object locator = getLocator();
 +            return locator == null ? super.toString() : ("ReplicatedClob@" + locator);
 +        }
 +
 +        ClobReplicationTrigger parent() {
 +            return ClobReplicationTrigger.this;
 +        }
 +
 +        /**
 +         * Returns null if not supported.
 +         */
 +        private Clob masterClob() throws PersistException {
 +            Clob masterClob = mMasterClob;
 +
 +            if (mMasterClobLoaded) {
 +                return masterClob;
 +            }
 +
 +            S master = mMasterStorage.prepare();
 +            mReplica.copyPrimaryKeyProperties(master);
 +
 +            try {
 +                // FIXME: handle missing master with resync
 +                master.load();
 +
 +                if (master.isPropertySupported(mClobName)) {
 +                    masterClob = (Clob) master.getPropertyValue(mClobName);
 +                    if (masterClob == null) {
 +                        // FIXME: perform resync, but still throw exception
 +                        throw new PersistNoneException("Master Clob is null: " + mClobName);
 +                    }
 +                }
 +
 +                mMasterClob = masterClob;
 +                mMasterClobLoaded = true;
 +
 +                return masterClob;
 +            } catch (FetchException e) {
 +                throw e.toPersistException();
 +            }
 +        }
 +    }
 +
 +    private static class Copier extends Writer {
 +        private final Writer mReplicaOut;
 +        private final Writer mMasterOut;
 +
 +        Copier(Writer master, Writer replica) {
 +            mMasterOut = master;
 +            mReplicaOut = replica;
 +        }
 +
 +        public void write(int c) throws IOException {
 +            mMasterOut.write(c);
 +            mReplicaOut.write(c);
 +        }
 +
 +        public void write(char[] c, int off, int len) throws IOException {
 +            mMasterOut.write(c, off, len);
 +            mReplicaOut.write(c, off, len);
 +        }
 +
 +        public void flush() throws IOException {
 +            mMasterOut.flush();
 +            mReplicaOut.flush();
 +        }
 +
 +        public void close() throws IOException {
 +            mMasterOut.close();
 +            mReplicaOut.close();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index c3adeed..996a9f8 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -57,10 +57,22 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          mRepository = repository;
          mReplicaStorage = replicaStorage;
          mMasterStorage = masterStorage;
 +
          // Use TriggerManager to locally disable trigger execution during
          // resync and repairs.
          mTriggerManager = new TriggerManager<S>();
          mTriggerManager.addTrigger(this);
 +
 +        BlobReplicationTrigger<S> blobTrigger = BlobReplicationTrigger.create(masterStorage);
 +        if (blobTrigger != null) {
 +            mTriggerManager.addTrigger(blobTrigger);
 +        }
 +
 +        ClobReplicationTrigger<S> clobTrigger = ClobReplicationTrigger.create(masterStorage);
 +        if (clobTrigger != null) {
 +            mTriggerManager.addTrigger(clobTrigger);
 +        }
 +
          replicaStorage.addTrigger(mTriggerManager);
      }
 @@ -377,6 +389,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          tm.locallyDisableInsert();
          tm.locallyDisableUpdate();
          tm.locallyDisableDelete();
 +        tm.locallyDisableLoad();
      }
      void setReplicationEnabled() {
 @@ -384,5 +397,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          tm.locallyEnableInsert();
          tm.locallyEnableUpdate();
          tm.locallyEnableDelete();
 +        tm.locallyEnableLoad();
      }
  }
  | 
