summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/amazon/carbonado/lob/AbstractBlob.java27
-rw-r--r--src/main/java/com/amazon/carbonado/lob/AbstractClob.java27
-rw-r--r--src/main/java/com/amazon/carbonado/lob/BlobClob.java4
-rw-r--r--src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java7
-rw-r--r--src/main/java/com/amazon/carbonado/lob/CharArrayClob.java7
-rw-r--r--src/main/java/com/amazon/carbonado/lob/FileBlob.java7
-rw-r--r--src/main/java/com/amazon/carbonado/lob/Lob.java8
-rw-r--r--src/main/java/com/amazon/carbonado/lob/StringClob.java7
-rw-r--r--src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java5
-rw-r--r--src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java5
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java304
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java294
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java14
-rw-r--r--src/main/java/com/amazon/carbonado/spi/LobEngine.java33
14 files changed, 731 insertions, 18 deletions
diff --git a/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java b/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java
index 6440978..1d331bc 100644
--- a/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java
+++ b/src/main/java/com/amazon/carbonado/lob/AbstractBlob.java
@@ -196,4 +196,31 @@ public abstract class AbstractBlob implements Blob {
}
}
}
+
+ @Override
+ public int hashCode() {
+ Object locator = getLocator();
+ return locator == null ? super.hashCode() : locator.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof AbstractBlob) {
+ Object locator = getLocator();
+ if (locator != null) {
+ AbstractBlob other = (AbstractBlob) obj;
+ return locator == other.getLocator();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ Object locator = getLocator();
+ return locator == null ? super.toString() : ("Blob@" + locator);
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/lob/AbstractClob.java b/src/main/java/com/amazon/carbonado/lob/AbstractClob.java
index 6e4e273..8531ba4 100644
--- a/src/main/java/com/amazon/carbonado/lob/AbstractClob.java
+++ b/src/main/java/com/amazon/carbonado/lob/AbstractClob.java
@@ -165,4 +165,31 @@ public abstract class AbstractClob implements Clob {
}
}
}
+
+ @Override
+ public int hashCode() {
+ Object locator = getLocator();
+ return locator == null ? super.hashCode() : locator.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof AbstractClob) {
+ Object locator = getLocator();
+ if (locator != null) {
+ AbstractClob other = (AbstractClob) obj;
+ return locator == other.getLocator();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ Object locator = getLocator();
+ return locator == null ? super.toString() : ("Clob@" + locator);
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/lob/BlobClob.java b/src/main/java/com/amazon/carbonado/lob/BlobClob.java
index 9822171..107f422 100644
--- a/src/main/java/com/amazon/carbonado/lob/BlobClob.java
+++ b/src/main/java/com/amazon/carbonado/lob/BlobClob.java
@@ -96,6 +96,10 @@ public class BlobClob extends AbstractClob {
mBlob.setLength(length << 1);
}
+ public Object getLocator() {
+ return mBlob.getLocator();
+ }
+
protected Blob getWrappedBlob() {
return mBlob;
}
diff --git a/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java b/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java
index eadd8e1..028f575 100644
--- a/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java
+++ b/src/main/java/com/amazon/carbonado/lob/ByteArrayBlob.java
@@ -305,6 +305,13 @@ public class ByteArrayBlob extends AbstractBlob {
}
}
+ /**
+ * Always returns null.
+ */
+ public Object getLocator() {
+ return null;
+ }
+
private static class Input extends InputStream {
private final ByteArrayBlob mBlob;
private long mPos;
diff --git a/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java b/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java
index ed6b5e9..1472da3 100644
--- a/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java
+++ b/src/main/java/com/amazon/carbonado/lob/CharArrayClob.java
@@ -310,6 +310,13 @@ public class CharArrayClob extends AbstractClob {
}
}
+ /**
+ * Always returns null.
+ */
+ public Object getLocator() {
+ return null;
+ }
+
private static class Input extends Reader {
private final CharArrayClob mClob;
private long mPos;
diff --git a/src/main/java/com/amazon/carbonado/lob/FileBlob.java b/src/main/java/com/amazon/carbonado/lob/FileBlob.java
index c351361..eeba914 100644
--- a/src/main/java/com/amazon/carbonado/lob/FileBlob.java
+++ b/src/main/java/com/amazon/carbonado/lob/FileBlob.java
@@ -109,4 +109,11 @@ public class FileBlob extends AbstractBlob {
throw new PersistException(e);
}
}
+
+ /**
+ * Always returns null.
+ */
+ public Object getLocator() {
+ return null;
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/lob/Lob.java b/src/main/java/com/amazon/carbonado/lob/Lob.java
index a09bfe0..404500f 100644
--- a/src/main/java/com/amazon/carbonado/lob/Lob.java
+++ b/src/main/java/com/amazon/carbonado/lob/Lob.java
@@ -25,6 +25,14 @@ package com.amazon.carbonado.lob;
*/
public interface Lob {
/**
+ * Returns an object which identifies the Lob data, which may be null if
+ * not supported.
+ *
+ * @since 1.2
+ */
+ Object getLocator();
+
+ /**
* Two Lobs are considered equal if the object instances are the same or if
* they point to the same content. Lob data is not compared, as that would
* be expensive or it may result in a fetch exception.
diff --git a/src/main/java/com/amazon/carbonado/lob/StringClob.java b/src/main/java/com/amazon/carbonado/lob/StringClob.java
index f19d63d..fe9b05d 100644
--- a/src/main/java/com/amazon/carbonado/lob/StringClob.java
+++ b/src/main/java/com/amazon/carbonado/lob/StringClob.java
@@ -87,6 +87,13 @@ public class StringClob extends AbstractClob {
denied();
}
+ /**
+ * Always returns null.
+ */
+ public Object getLocator() {
+ return null;
+ }
+
private PersistException denied() {
return new PersistDeniedException("Read-only");
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java
index c031a9e..18d1eb4 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java
@@ -116,6 +116,11 @@ class JDBCBlob extends AbstractBlob implements JDBCLob {
}
}
+ public Object getLocator() {
+ // FIXME
+ return null;
+ }
+
public void close() {
mBlob = null;
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java
index dc2ec0a..dc254bb 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java
@@ -116,6 +116,11 @@ class JDBCClob extends AbstractClob implements JDBCLob {
}
}
+ public Object getLocator() {
+ // FIXME
+ return null;
+ }
+
public void close() {
mClob = null;
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java
new file mode 100644
index 0000000..0657d50
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/BlobReplicationTrigger.java
@@ -0,0 +1,304 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.replicated;
+
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import java.nio.charset.Charset;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistNoneException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.Trigger;
+
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.lob.AbstractBlob;
+import com.amazon.carbonado.lob.Blob;
+import com.amazon.carbonado.lob.ByteArrayBlob;
+
+/**
+ * After loading a replica, replaces all Blobs with ReplicatedBlobs.
+ *
+ * @author Brian S O'Neill
+ */
+class BlobReplicationTrigger<S extends Storable> extends Trigger<S> {
+ /**
+ * Returns null if no Blobs need to be replicated.
+ */
+ static <S extends Storable> BlobReplicationTrigger<S> create(Storage<S> masterStorage) {
+ Map<String, ? extends StorableProperty<S>> properties =
+ StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties();
+
+ List<String> blobNames = new ArrayList<String>(2);
+
+ for (StorableProperty<S> property : properties.values()) {
+ if (property.getType() == Blob.class) {
+ blobNames.add(property.getName());
+ }
+ }
+
+ if (blobNames.size() == 0) {
+ return null;
+ }
+
+ return new BlobReplicationTrigger<S>(masterStorage,
+ blobNames.toArray(new String[blobNames.size()]));
+ }
+
+ private final Storage<S> mMasterStorage;
+ private final String[] mBlobNames;
+
+ private BlobReplicationTrigger(Storage<S> masterStorage, String[] blobNames) {
+ mMasterStorage = masterStorage;
+ mBlobNames = blobNames;
+ }
+
+ @Override
+ public void afterInsert(S replica, Object state) {
+ afterLoad(replica);
+ }
+
+ @Override
+ public void afterUpdate(S replica, Object state) {
+ afterLoad(replica);
+ }
+
+ @Override
+ public void afterLoad(S replica) {
+ for (String name : mBlobNames) {
+ if (!replica.isPropertySupported(name)) {
+ continue;
+ }
+ Blob replicaBlob = (Blob) replica.getPropertyValue(name);
+ if (replicaBlob != null) {
+ if (replicaBlob instanceof BlobReplicationTrigger.Replicated) {
+ if (((Replicated) replicaBlob).parent() == this) {
+ continue;
+ }
+ }
+ Replicated blob = new Replicated(name, replica, replicaBlob);
+ replica.setPropertyValue(name, blob);
+ }
+ }
+ replica.markAllPropertiesClean();
+ }
+
+ /**
+ * Writes go to master property first, and then to replica.
+ */
+ class Replicated extends AbstractBlob {
+ private static final int DEFAULT_BUFFER_SIZE = 4000;
+
+ private final String mBlobName;
+ private final S mReplica;
+ private final Blob mReplicaBlob;
+
+ private Blob mMasterBlob;
+ private boolean mMasterBlobLoaded;
+
+ Replicated(String blobName, S replica, Blob replicaBlob) {
+ mBlobName = blobName;
+ mReplica = replica;
+ mReplicaBlob = replicaBlob;
+ }
+
+ public InputStream openInputStream() throws FetchException {
+ return mReplicaBlob.openInputStream();
+ }
+
+ public InputStream openInputStream(long pos) throws FetchException {
+ return mReplicaBlob.openInputStream(pos);
+ }
+
+ public InputStream openInputStream(long pos, int bufferSize) throws FetchException {
+ return mReplicaBlob.openInputStream(pos, bufferSize);
+ }
+
+ public long getLength() throws FetchException {
+ return mReplicaBlob.getLength();
+ }
+
+ public String asString() throws FetchException {
+ return mReplicaBlob.asString();
+ }
+
+ public String asString(String charsetName) throws FetchException {
+ return mReplicaBlob.asString(charsetName);
+ }
+
+ public String asString(Charset charset) throws FetchException {
+ return mReplicaBlob.asString(charset);
+ }
+
+ public OutputStream openOutputStream() throws PersistException {
+ Blob masterBlob = masterBlob();
+ if (masterBlob == null) {
+ return mReplicaBlob.openOutputStream();
+ } else {
+ return openOutputStream(masterBlob, 0, DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ public OutputStream openOutputStream(long pos) throws PersistException {
+ Blob masterBlob = masterBlob();
+ if (masterBlob == null) {
+ return mReplicaBlob.openOutputStream(pos);
+ } else {
+ return openOutputStream(masterBlob, pos, DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ public OutputStream openOutputStream(long pos, int bufferSize) throws PersistException {
+ Blob masterBlob = masterBlob();
+ if (masterBlob == null) {
+ return mReplicaBlob.openOutputStream(pos, bufferSize);
+ } else {
+ return openOutputStream(masterBlob, pos, bufferSize);
+ }
+ }
+
+ private OutputStream openOutputStream(Blob masterBlob, long pos, int bufferSize)
+ throws PersistException
+ {
+ if (bufferSize < DEFAULT_BUFFER_SIZE) {
+ bufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ OutputStream masterOut = masterBlob.openOutputStream(pos, 0);
+ OutputStream replicaOut = mReplicaBlob.openOutputStream(pos, 0);
+
+ return new BufferedOutputStream(new Copier(masterOut, replicaOut), bufferSize);
+ }
+
+ public void setLength(long length) throws PersistException {
+ Blob masterBlob = masterBlob();
+ if (masterBlob != null) {
+ masterBlob.setLength(length);
+ }
+ mReplicaBlob.setLength(length);
+ }
+
+ public Object getLocator() {
+ return mReplicaBlob.getLocator();
+ }
+
+ @Override
+ public int hashCode() {
+ return mReplicaBlob.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof BlobReplicationTrigger.Replicated) {
+ Replicated other = (Replicated) obj;
+ return parent() == other.parent() &&
+ mReplicaBlob.equals(other.mReplicaBlob);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ Object locator = getLocator();
+ return locator == null ? super.toString() : ("ReplicatedBlob@" + locator);
+ }
+
+ BlobReplicationTrigger parent() {
+ return BlobReplicationTrigger.this;
+ }
+
+ /**
+ * Returns null if not supported.
+ */
+ private Blob masterBlob() throws PersistException {
+ Blob masterBlob = mMasterBlob;
+
+ if (mMasterBlobLoaded) {
+ return masterBlob;
+ }
+
+ S master = mMasterStorage.prepare();
+ mReplica.copyPrimaryKeyProperties(master);
+
+ try {
+ // FIXME: handle missing master with resync
+ master.load();
+
+ if (master.isPropertySupported(mBlobName)) {
+ masterBlob = (Blob) master.getPropertyValue(mBlobName);
+ if (masterBlob == null) {
+ // FIXME: perform resync, but still throw exception
+ throw new PersistNoneException("Master Blob is null: " + mBlobName);
+ }
+ }
+
+ mMasterBlob = masterBlob;
+ mMasterBlobLoaded = true;
+
+ return masterBlob;
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ }
+ }
+ }
+
+ private static class Copier extends OutputStream {
+ private final OutputStream mReplicaOut;
+ private final OutputStream mMasterOut;
+
+ Copier(OutputStream master, OutputStream replica) {
+ mMasterOut = master;
+ mReplicaOut = replica;
+ }
+
+ public void write(int b) throws IOException {
+ mMasterOut.write(b);
+ mReplicaOut.write(b);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ mMasterOut.write(b, off, len);
+ mReplicaOut.write(b, off, len);
+ }
+
+ public void flush() throws IOException {
+ mMasterOut.flush();
+ mReplicaOut.flush();
+ }
+
+ public void close() throws IOException {
+ mMasterOut.close();
+ mReplicaOut.close();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java
new file mode 100644
index 0000000..5da6eed
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ClobReplicationTrigger.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.replicated;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistNoneException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.Trigger;
+
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.lob.AbstractClob;
+import com.amazon.carbonado.lob.Clob;
+import com.amazon.carbonado.lob.CharArrayClob;
+
+/**
+ * After loading a replica, replaces all Clobs with ReplicatedClobs.
+ *
+ * @author Brian S O'Neill
+ */
+class ClobReplicationTrigger<S extends Storable> extends Trigger<S> {
+ /**
+ * Returns null if no Clobs need to be replicated.
+ */
+ static <S extends Storable> ClobReplicationTrigger<S> create(Storage<S> masterStorage) {
+ Map<String, ? extends StorableProperty<S>> properties =
+ StorableIntrospector.examine(masterStorage.getStorableType()).getDataProperties();
+
+ List<String> clobNames = new ArrayList<String>(2);
+
+ for (StorableProperty<S> property : properties.values()) {
+ if (property.getType() == Clob.class) {
+ clobNames.add(property.getName());
+ }
+ }
+
+ if (clobNames.size() == 0) {
+ return null;
+ }
+
+ return new ClobReplicationTrigger<S>(masterStorage,
+ clobNames.toArray(new String[clobNames.size()]));
+ }
+
+ private final Storage<S> mMasterStorage;
+ private final String[] mClobNames;
+
+ private ClobReplicationTrigger(Storage<S> masterStorage, String[] clobNames) {
+ mMasterStorage = masterStorage;
+ mClobNames = clobNames;
+ }
+
+ @Override
+ public void afterInsert(S replica, Object state) {
+ afterLoad(replica);
+ }
+
+ @Override
+ public void afterUpdate(S replica, Object state) {
+ afterLoad(replica);
+ }
+
+ @Override
+ public void afterLoad(S replica) {
+ for (String name : mClobNames) {
+ if (!replica.isPropertySupported(name)) {
+ continue;
+ }
+ Clob replicaClob = (Clob) replica.getPropertyValue(name);
+ if (replicaClob != null) {
+ if (replicaClob instanceof ClobReplicationTrigger.Replicated) {
+ if (((Replicated) replicaClob).parent() == this) {
+ continue;
+ }
+ }
+ Replicated clob = new Replicated(name, replica, replicaClob);
+ replica.setPropertyValue(name, clob);
+ }
+ }
+ replica.markAllPropertiesClean();
+ }
+
+ /**
+ * Writes go to master property first, and then to replica.
+ */
+ class Replicated extends AbstractClob {
+ private static final int DEFAULT_BUFFER_SIZE = 4000;
+
+ private final String mClobName;
+ private final S mReplica;
+ private final Clob mReplicaClob;
+
+ private Clob mMasterClob;
+ private boolean mMasterClobLoaded;
+
+ Replicated(String clobName, S replica, Clob replicaClob) {
+ mClobName = clobName;
+ mReplica = replica;
+ mReplicaClob = replicaClob;
+ }
+
+ public Reader openReader() throws FetchException {
+ return mReplicaClob.openReader();
+ }
+
+ public Reader openReader(long pos) throws FetchException {
+ return mReplicaClob.openReader(pos);
+ }
+
+ public Reader openReader(long pos, int bufferSize) throws FetchException {
+ return mReplicaClob.openReader(pos, bufferSize);
+ }
+
+ public long getLength() throws FetchException {
+ return mReplicaClob.getLength();
+ }
+
+ public String asString() throws FetchException {
+ return mReplicaClob.asString();
+ }
+
+ public Writer openWriter() throws PersistException {
+ Clob masterClob = masterClob();
+ if (masterClob == null) {
+ return mReplicaClob.openWriter();
+ } else {
+ return openWriter(masterClob, 0, DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ public Writer openWriter(long pos) throws PersistException {
+ Clob masterClob = masterClob();
+ if (masterClob == null) {
+ return mReplicaClob.openWriter(pos);
+ } else {
+ return openWriter(masterClob, pos, DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ public Writer openWriter(long pos, int bufferSize) throws PersistException {
+ Clob masterClob = masterClob();
+ if (masterClob == null) {
+ return mReplicaClob.openWriter(pos, bufferSize);
+ } else {
+ return openWriter(masterClob, pos, bufferSize);
+ }
+ }
+
+ private Writer openWriter(Clob masterClob, long pos, int bufferSize)
+ throws PersistException
+ {
+ if (bufferSize < DEFAULT_BUFFER_SIZE) {
+ bufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ Writer masterOut = masterClob.openWriter(pos, 0);
+ Writer replicaOut = mReplicaClob.openWriter(pos, 0);
+
+ return new BufferedWriter(new Copier(masterOut, replicaOut), bufferSize);
+ }
+
+ public void setLength(long length) throws PersistException {
+ Clob masterClob = masterClob();
+ if (masterClob != null) {
+ masterClob.setLength(length);
+ }
+ mReplicaClob.setLength(length);
+ }
+
+ public Object getLocator() {
+ return mReplicaClob.getLocator();
+ }
+
+ @Override
+ public int hashCode() {
+ return mReplicaClob.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof ClobReplicationTrigger.Replicated) {
+ Replicated other = (Replicated) obj;
+ return parent() == other.parent() &&
+ mReplicaClob.equals(other.mReplicaClob);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ Object locator = getLocator();
+ return locator == null ? super.toString() : ("ReplicatedClob@" + locator);
+ }
+
+ ClobReplicationTrigger parent() {
+ return ClobReplicationTrigger.this;
+ }
+
+ /**
+ * Returns null if not supported.
+ */
+ private Clob masterClob() throws PersistException {
+ Clob masterClob = mMasterClob;
+
+ if (mMasterClobLoaded) {
+ return masterClob;
+ }
+
+ S master = mMasterStorage.prepare();
+ mReplica.copyPrimaryKeyProperties(master);
+
+ try {
+ // FIXME: handle missing master with resync
+ master.load();
+
+ if (master.isPropertySupported(mClobName)) {
+ masterClob = (Clob) master.getPropertyValue(mClobName);
+ if (masterClob == null) {
+ // FIXME: perform resync, but still throw exception
+ throw new PersistNoneException("Master Clob is null: " + mClobName);
+ }
+ }
+
+ mMasterClob = masterClob;
+ mMasterClobLoaded = true;
+
+ return masterClob;
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ }
+ }
+ }
+
+ private static class Copier extends Writer {
+ private final Writer mReplicaOut;
+ private final Writer mMasterOut;
+
+ Copier(Writer master, Writer replica) {
+ mMasterOut = master;
+ mReplicaOut = replica;
+ }
+
+ public void write(int c) throws IOException {
+ mMasterOut.write(c);
+ mReplicaOut.write(c);
+ }
+
+ public void write(char[] c, int off, int len) throws IOException {
+ mMasterOut.write(c, off, len);
+ mReplicaOut.write(c, off, len);
+ }
+
+ public void flush() throws IOException {
+ mMasterOut.flush();
+ mReplicaOut.flush();
+ }
+
+ public void close() throws IOException {
+ mMasterOut.close();
+ mReplicaOut.close();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index c3adeed..996a9f8 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -57,10 +57,22 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
mRepository = repository;
mReplicaStorage = replicaStorage;
mMasterStorage = masterStorage;
+
// Use TriggerManager to locally disable trigger execution during
// resync and repairs.
mTriggerManager = new TriggerManager<S>();
mTriggerManager.addTrigger(this);
+
+ BlobReplicationTrigger<S> blobTrigger = BlobReplicationTrigger.create(masterStorage);
+ if (blobTrigger != null) {
+ mTriggerManager.addTrigger(blobTrigger);
+ }
+
+ ClobReplicationTrigger<S> clobTrigger = ClobReplicationTrigger.create(masterStorage);
+ if (clobTrigger != null) {
+ mTriggerManager.addTrigger(clobTrigger);
+ }
+
replicaStorage.addTrigger(mTriggerManager);
}
@@ -377,6 +389,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
tm.locallyDisableInsert();
tm.locallyDisableUpdate();
tm.locallyDisableDelete();
+ tm.locallyDisableLoad();
}
void setReplicationEnabled() {
@@ -384,5 +397,6 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
tm.locallyEnableInsert();
tm.locallyEnableUpdate();
tm.locallyEnableDelete();
+ tm.locallyEnableLoad();
}
}
diff --git a/src/main/java/com/amazon/carbonado/spi/LobEngine.java b/src/main/java/com/amazon/carbonado/spi/LobEngine.java
index 8cc1bf9..e6fcb07 100644
--- a/src/main/java/com/amazon/carbonado/spi/LobEngine.java
+++ b/src/main/java/com/amazon/carbonado/spi/LobEngine.java
@@ -155,7 +155,8 @@ public class LobEngine {
if (lob == null) {
return 0;
}
- return ((LobImpl) lob).getLocator();
+ Long locator = (Long) lob.getLocator();
+ return locator == null ? 0 : locator;
}
/**
@@ -468,12 +469,8 @@ public class LobEngine {
return trigger;
}
- private interface LobImpl extends Lob {
- long getLocator();
- }
-
- private class BlobImpl extends AbstractBlob implements LobImpl {
- final long mLocator;
+ private class BlobImpl extends AbstractBlob implements Lob {
+ final Long mLocator;
final StoredLob mStoredLob;
BlobImpl(long locator) {
@@ -648,9 +645,13 @@ public class LobEngine {
}
}
+ public Long getLocator() {
+ return mLocator;
+ }
+
@Override
public int hashCode() {
- return ((int) (mLocator >> 32)) ^ ((int) mLocator);
+ return mLocator.hashCode();
}
@Override
@@ -660,7 +661,7 @@ public class LobEngine {
}
if (obj instanceof BlobImpl) {
BlobImpl other = (BlobImpl) obj;
- return LobEngine.this == other.getEnclosing() && mLocator == other.mLocator;
+ return LobEngine.this == other.getEnclosing() && mLocator.equals(other.mLocator);
}
return false;
}
@@ -670,16 +671,12 @@ public class LobEngine {
return "Blob@" + getLocator();
}
- public long getLocator() {
- return mLocator;
- }
-
LobEngine getEnclosing() {
return LobEngine.this;
}
}
- private class ClobImpl extends BlobClob implements LobImpl {
+ private class ClobImpl extends BlobClob implements Lob {
ClobImpl(long locator) {
super(new BlobImpl(locator));
}
@@ -688,6 +685,10 @@ public class LobEngine {
super(new BlobImpl(lob));
}
+ public Long getLocator() {
+ return ((BlobImpl) super.getWrappedBlob()).getLocator();
+ }
+
@Override
public int hashCode() {
return super.getWrappedBlob().hashCode();
@@ -709,10 +710,6 @@ public class LobEngine {
return "Clob@" + getLocator();
}
- public long getLocator() {
- return ((BlobImpl) super.getWrappedBlob()).getLocator();
- }
-
// Override to gain permission.
protected BlobImpl getWrappedBlob() {
return (BlobImpl) super.getWrappedBlob();