From d025e69cce3ff409c0d45a821245800a830609cf Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sun, 16 Mar 2008 23:48:13 +0000 Subject: Support updates to replicated LOBs. --- .../repo/replicated/BlobReplicationTrigger.java | 304 +++++++++++++++++++++ .../repo/replicated/ClobReplicationTrigger.java | 294 ++++++++++++++++++++ .../repo/replicated/ReplicationTrigger.java | 14 + 3 files changed, 612 insertions(+) create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java (limited to 'src/main/java/com/amazon/carbonado/repo/replicated') diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java new file mode 100644 index 0000000..0657d50 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java @@ -0,0 +1,304 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.replicated; + +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.IOException; +import java.io.OutputStream; + +import java.nio.charset.Charset; + +import java.util.ArrayList; +import java.util.Map; +import java.util.List; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistNoneException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.Trigger; + +import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.lob.AbstractBlob; +import com.amazon.carbonado.lob.Blob; +import com.amazon.carbonado.lob.ByteArrayBlob; + +/** + * After loading a replica, replaces all Blobs with ReplicatedBlobs. + * + * @author Brian S O'Neill + */ +class BlobReplicationTrigger extends Trigger { + /** + * Returns null if no Blobs need to be replicated. + */ + static BlobReplicationTrigger create(Storage masterStorage) { + Map> properties = + StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties(); + + List blobNames = new ArrayList(2); + + for (StorableProperty property : properties.values()) { + if (property.getType() == Blob.class) { + blobNames.add(property.getName()); + } + } + + if (blobNames.size() == 0) { + return null; + } + + return new BlobReplicationTrigger(masterStorage, + blobNames.toArray(new String[blobNames.size()])); + } + + private final Storage mMasterStorage; + private final String[] mBlobNames; + + private BlobReplicationTrigger(Storage masterStorage, String[] blobNames) { + mMasterStorage = masterStorage; + mBlobNames = blobNames; + } + + @Override + public void afterInsert(S replica, Object state) { + afterLoad(replica); + } + + @Override + public void afterUpdate(S replica, Object state) { + afterLoad(replica); + } + + @Override + public void afterLoad(S replica) { + for (String name : mBlobNames) { + if (!replica.isPropertySupported(name)) { + continue; + } + Blob replicaBlob = (Blob) replica.getPropertyValue(name); + if (replicaBlob != null) { + if (replicaBlob instanceof BlobReplicationTrigger.Replicated) { + if (((Replicated) replicaBlob).parent() == this) { + continue; + } + } + Replicated blob = new Replicated(name, replica, replicaBlob); + replica.setPropertyValue(name, blob); + } + } + replica.markAllPropertiesClean(); + } + + /** + * Writes go to master property first, and then to replica. + */ + class Replicated extends AbstractBlob { + private static final int DEFAULT_BUFFER_SIZE = 4000; + + private final String mBlobName; + private final S mReplica; + private final Blob mReplicaBlob; + + private Blob mMasterBlob; + private boolean mMasterBlobLoaded; + + Replicated(String blobName, S replica, Blob replicaBlob) { + mBlobName = blobName; + mReplica = replica; + mReplicaBlob = replicaBlob; + } + + public InputStream openInputStream() throws FetchException { + return mReplicaBlob.openInputStream(); + } + + public InputStream openInputStream(long pos) throws FetchException { + return mReplicaBlob.openInputStream(pos); + } + + public InputStream openInputStream(long pos, int bufferSize) throws FetchException { + return mReplicaBlob.openInputStream(pos, bufferSize); + } + + public long getLength() throws FetchException { + return mReplicaBlob.getLength(); + } + + public String asString() throws FetchException { + return mReplicaBlob.asString(); + } + + public String asString(String charsetName) throws FetchException { + return mReplicaBlob.asString(charsetName); + } + + public String asString(Charset charset) throws FetchException { + return mReplicaBlob.asString(charset); + } + + public OutputStream openOutputStream() throws PersistException { + Blob masterBlob = masterBlob(); + if (masterBlob == null) { + return mReplicaBlob.openOutputStream(); + } else { + return openOutputStream(masterBlob, 0, DEFAULT_BUFFER_SIZE); + } + } + + public OutputStream openOutputStream(long pos) throws PersistException { + Blob masterBlob = masterBlob(); + if (masterBlob == null) { + return mReplicaBlob.openOutputStream(pos); + } else { + return openOutputStream(masterBlob, pos, DEFAULT_BUFFER_SIZE); + } + } + + public OutputStream openOutputStream(long pos, int bufferSize) throws PersistException { + Blob masterBlob = masterBlob(); + if (masterBlob == null) { + return mReplicaBlob.openOutputStream(pos, bufferSize); + } else { + return openOutputStream(masterBlob, pos, bufferSize); + } + } + + private OutputStream openOutputStream(Blob masterBlob, long pos, int bufferSize) + throws PersistException + { + if (bufferSize < DEFAULT_BUFFER_SIZE) { + bufferSize = DEFAULT_BUFFER_SIZE; + } + + OutputStream masterOut = masterBlob.openOutputStream(pos, 0); + OutputStream replicaOut = mReplicaBlob.openOutputStream(pos, 0); + + return new BufferedOutputStream(new Copier(masterOut, replicaOut), bufferSize); + } + + public void setLength(long length) throws PersistException { + Blob masterBlob = masterBlob(); + if (masterBlob != null) { + masterBlob.setLength(length); + } + mReplicaBlob.setLength(length); + } + + public Object getLocator() { + return mReplicaBlob.getLocator(); + } + + @Override + public int hashCode() { + return mReplicaBlob.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof BlobReplicationTrigger.Replicated) { + Replicated other = (Replicated) obj; + return parent() == other.parent() && + mReplicaBlob.equals(other.mReplicaBlob); + } + return false; + } + + @Override + public String toString() { + Object locator = getLocator(); + return locator == null ? super.toString() : ("ReplicatedBlob@" + locator); + } + + BlobReplicationTrigger parent() { + return BlobReplicationTrigger.this; + } + + /** + * Returns null if not supported. + */ + private Blob masterBlob() throws PersistException { + Blob masterBlob = mMasterBlob; + + if (mMasterBlobLoaded) { + return masterBlob; + } + + S master = mMasterStorage.prepare(); + mReplica.copyPrimaryKeyProperties(master); + + try { + // FIXME: handle missing master with resync + master.load(); + + if (master.isPropertySupported(mBlobName)) { + masterBlob = (Blob) master.getPropertyValue(mBlobName); + if (masterBlob == null) { + // FIXME: perform resync, but still throw exception + throw new PersistNoneException("Master Blob is null: " + mBlobName); + } + } + + mMasterBlob = masterBlob; + mMasterBlobLoaded = true; + + return masterBlob; + } catch (FetchException e) { + throw e.toPersistException(); + } + } + } + + private static class Copier extends OutputStream { + private final OutputStream mReplicaOut; + private final OutputStream mMasterOut; + + Copier(OutputStream master, OutputStream replica) { + mMasterOut = master; + mReplicaOut = replica; + } + + public void write(int b) throws IOException { + mMasterOut.write(b); + mReplicaOut.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException { + mMasterOut.write(b, off, len); + mReplicaOut.write(b, off, len); + } + + public void flush() throws IOException { + mMasterOut.flush(); + mReplicaOut.flush(); + } + + public void close() throws IOException { + mMasterOut.close(); + mReplicaOut.close(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java new file mode 100644 index 0000000..5da6eed --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java @@ -0,0 +1,294 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.replicated; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.Reader; +import java.io.Writer; + +import java.util.ArrayList; +import java.util.Map; +import java.util.List; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistNoneException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.Trigger; + +import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.lob.AbstractClob; +import com.amazon.carbonado.lob.Clob; +import com.amazon.carbonado.lob.CharArrayClob; + +/** + * After loading a replica, replaces all Clobs with ReplicatedClobs. + * + * @author Brian S O'Neill + */ +class ClobReplicationTrigger extends Trigger { + /** + * Returns null if no Clobs need to be replicated. + */ + static ClobReplicationTrigger create(Storage masterStorage) { + Map> properties = + StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties(); + + List clobNames = new ArrayList(2); + + for (StorableProperty property : properties.values()) { + if (property.getType() == Clob.class) { + clobNames.add(property.getName()); + } + } + + if (clobNames.size() == 0) { + return null; + } + + return new ClobReplicationTrigger(masterStorage, + clobNames.toArray(new String[clobNames.size()])); + } + + private final Storage mMasterStorage; + private final String[] mClobNames; + + private ClobReplicationTrigger(Storage masterStorage, String[] clobNames) { + mMasterStorage = masterStorage; + mClobNames = clobNames; + } + + @Override + public void afterInsert(S replica, Object state) { + afterLoad(replica); + } + + @Override + public void afterUpdate(S replica, Object state) { + afterLoad(replica); + } + + @Override + public void afterLoad(S replica) { + for (String name : mClobNames) { + if (!replica.isPropertySupported(name)) { + continue; + } + Clob replicaClob = (Clob) replica.getPropertyValue(name); + if (replicaClob != null) { + if (replicaClob instanceof ClobReplicationTrigger.Replicated) { + if (((Replicated) replicaClob).parent() == this) { + continue; + } + } + Replicated clob = new Replicated(name, replica, replicaClob); + replica.setPropertyValue(name, clob); + } + } + replica.markAllPropertiesClean(); + } + + /** + * Writes go to master property first, and then to replica. + */ + class Replicated extends AbstractClob { + private static final int DEFAULT_BUFFER_SIZE = 4000; + + private final String mClobName; + private final S mReplica; + private final Clob mReplicaClob; + + private Clob mMasterClob; + private boolean mMasterClobLoaded; + + Replicated(String clobName, S replica, Clob replicaClob) { + mClobName = clobName; + mReplica = replica; + mReplicaClob = replicaClob; + } + + public Reader openReader() throws FetchException { + return mReplicaClob.openReader(); + } + + public Reader openReader(long pos) throws FetchException { + return mReplicaClob.openReader(pos); + } + + public Reader openReader(long pos, int bufferSize) throws FetchException { + return mReplicaClob.openReader(pos, bufferSize); + } + + public long getLength() throws FetchException { + return mReplicaClob.getLength(); + } + + public String asString() throws FetchException { + return mReplicaClob.asString(); + } + + public Writer openWriter() throws PersistException { + Clob masterClob = masterClob(); + if (masterClob == null) { + return mReplicaClob.openWriter(); + } else { + return openWriter(masterClob, 0, DEFAULT_BUFFER_SIZE); + } + } + + public Writer openWriter(long pos) throws PersistException { + Clob masterClob = masterClob(); + if (masterClob == null) { + return mReplicaClob.openWriter(pos); + } else { + return openWriter(masterClob, pos, DEFAULT_BUFFER_SIZE); + } + } + + public Writer openWriter(long pos, int bufferSize) throws PersistException { + Clob masterClob = masterClob(); + if (masterClob == null) { + return mReplicaClob.openWriter(pos, bufferSize); + } else { + return openWriter(masterClob, pos, bufferSize); + } + } + + private Writer openWriter(Clob masterClob, long pos, int bufferSize) + throws PersistException + { + if (bufferSize < DEFAULT_BUFFER_SIZE) { + bufferSize = DEFAULT_BUFFER_SIZE; + } + + Writer masterOut = masterClob.openWriter(pos, 0); + Writer replicaOut = mReplicaClob.openWriter(pos, 0); + + return new BufferedWriter(new Copier(masterOut, replicaOut), bufferSize); + } + + public void setLength(long length) throws PersistException { + Clob masterClob = masterClob(); + if (masterClob != null) { + masterClob.setLength(length); + } + mReplicaClob.setLength(length); + } + + public Object getLocator() { + return mReplicaClob.getLocator(); + } + + @Override + public int hashCode() { + return mReplicaClob.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof ClobReplicationTrigger.Replicated) { + Replicated other = (Replicated) obj; + return parent() == other.parent() && + mReplicaClob.equals(other.mReplicaClob); + } + return false; + } + + @Override + public String toString() { + Object locator = getLocator(); + return locator == null ? super.toString() : ("ReplicatedClob@" + locator); + } + + ClobReplicationTrigger parent() { + return ClobReplicationTrigger.this; + } + + /** + * Returns null if not supported. + */ + private Clob masterClob() throws PersistException { + Clob masterClob = mMasterClob; + + if (mMasterClobLoaded) { + return masterClob; + } + + S master = mMasterStorage.prepare(); + mReplica.copyPrimaryKeyProperties(master); + + try { + // FIXME: handle missing master with resync + master.load(); + + if (master.isPropertySupported(mClobName)) { + masterClob = (Clob) master.getPropertyValue(mClobName); + if (masterClob == null) { + // FIXME: perform resync, but still throw exception + throw new PersistNoneException("Master Clob is null: " + mClobName); + } + } + + mMasterClob = masterClob; + mMasterClobLoaded = true; + + return masterClob; + } catch (FetchException e) { + throw e.toPersistException(); + } + } + } + + private static class Copier extends Writer { + private final Writer mReplicaOut; + private final Writer mMasterOut; + + Copier(Writer master, Writer replica) { + mMasterOut = master; + mReplicaOut = replica; + } + + public void write(int c) throws IOException { + mMasterOut.write(c); + mReplicaOut.write(c); + } + + public void write(char[] c, int off, int len) throws IOException { + mMasterOut.write(c, off, len); + mReplicaOut.write(c, off, len); + } + + public void flush() throws IOException { + mMasterOut.flush(); + mReplicaOut.flush(); + } + + public void close() throws IOException { + mMasterOut.close(); + mReplicaOut.close(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index c3adeed..996a9f8 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -57,10 +57,22 @@ class ReplicationTrigger extends Trigger { mRepository = repository; mReplicaStorage = replicaStorage; mMasterStorage = masterStorage; + // Use TriggerManager to locally disable trigger execution during // resync and repairs. mTriggerManager = new TriggerManager(); mTriggerManager.addTrigger(this); + + BlobReplicationTrigger blobTrigger = BlobReplicationTrigger.create(masterStorage); + if (blobTrigger != null) { + mTriggerManager.addTrigger(blobTrigger); + } + + ClobReplicationTrigger clobTrigger = ClobReplicationTrigger.create(masterStorage); + if (clobTrigger != null) { + mTriggerManager.addTrigger(clobTrigger); + } + replicaStorage.addTrigger(mTriggerManager); } @@ -377,6 +389,7 @@ class ReplicationTrigger extends Trigger { tm.locallyDisableInsert(); tm.locallyDisableUpdate(); tm.locallyDisableDelete(); + tm.locallyDisableLoad(); } void setReplicationEnabled() { @@ -384,5 +397,6 @@ class ReplicationTrigger extends Trigger { tm.locallyEnableInsert(); tm.locallyEnableUpdate(); tm.locallyEnableDelete(); + tm.locallyEnableLoad(); } } -- cgit v1.2.3