summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java80
1 files changed, 67 insertions, 13 deletions
diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
index 1057d01..b02f9bf 100644
--- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
+++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
@@ -120,10 +120,11 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
TEMP_DIR = tempDir;
}
- private final Storage<S> mStorage;
private final String mTempDir;
private final int mMaxArrayCapacity;
+ private Preparer<S> mPreparer;
+
private S[] mElements;
private int mSize;
private int mTotalSize;
@@ -136,14 +137,23 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
private volatile boolean mStop;
/**
- * @param storage storage type of elements
+ * @since 1.2
+ */
+ public MergeSortBuffer() {
+ this(null, TEMP_DIR, MAX_ARRAY_CAPACITY);
+ }
+
+ /**
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
*/
public MergeSortBuffer(Storage<S> storage) {
this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY);
}
/**
- * @param storage storage type of elements
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
* @param tempDir directory to store temp files for merging, or null for default
*/
public MergeSortBuffer(Storage<S> storage, String tempDir) {
@@ -151,7 +161,8 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
}
/**
- * @param storage storage type of elements
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
* @param tempDir directory to store temp files for merging, or null for default
* @param maxArrayCapacity maximum amount of storables to keep in an array
* before serializing to a file
@@ -159,12 +170,13 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
*/
@SuppressWarnings("unchecked")
public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
- if (storage == null) {
- throw new IllegalArgumentException();
- }
- mStorage = storage;
mTempDir = tempDir;
mMaxArrayCapacity = maxArrayCapacity;
+
+ if (storage != null) {
+ mPreparer = new FromStorage(storage);
+ }
+
int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
mElements = (S[]) new Storable[cap];
}
@@ -178,6 +190,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
}
public boolean add(S storable) {
+ if (mPreparer == null) {
+ mPreparer = new FromStorable(storable);
+ }
+
Comparator<S> comparator = comparator();
arrayPrep:
@@ -320,13 +336,17 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
InputStream in = new BufferedInputStream(new RAFInputStream(raf));
- pq.add(new InputIter<S>(comparator, mStorage, in));
+ pq.add(new InputIter<S>(comparator, mPreparer, in));
}
return new Merger<S>(pq);
}
public void clear() {
+ if (mPreparer instanceof FromStorable) {
+ mPreparer = null;
+ }
+
if (mTotalSize > 0) {
mSize = 0;
mTotalSize = 0;
@@ -369,6 +389,40 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
}
}
+ private static interface Preparer<S extends Storable> {
+ S prepare();
+ }
+
+ private static class FromStorage<S extends Storable> implements Preparer<S> {
+ private final Storage<S> mStorage;
+
+ FromStorage(Storage<S> storage) {
+ if (storage == null) {
+ throw new IllegalArgumentException();
+ }
+ mStorage = storage;
+ }
+
+ public S prepare() {
+ return mStorage.prepare();
+ }
+ }
+
+ private static class FromStorable<S extends Storable> implements Preparer<S> {
+ private final S mStorable;
+
+ FromStorable(S storable) {
+ if (storable == null) {
+ throw new IllegalArgumentException();
+ }
+ mStorable = (S) storable.prepare();
+ }
+
+ public S prepare() {
+ return (S) mStorable.prepare();
+ }
+ }
+
/**
* Simple interator interface that supports peeking at next element.
*/
@@ -443,14 +497,14 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
* Iterator that reads from an input stream of serialized Storables.
*/
private static class InputIter<S extends Storable> extends Iter<S> {
- private Storage<S> mStorage;
+ private final Preparer<S> mPreparer;
private InputStream mIn;
private S mNext;
- InputIter(Comparator<S> comparator, Storage<S> storage, InputStream in) {
+ InputIter(Comparator<S> comparator, Preparer<S> preparer, InputStream in) {
super(comparator);
- mStorage = storage;
+ mPreparer = preparer;
mIn = in;
}
@@ -460,7 +514,7 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
}
if (mIn != null) {
try {
- S next = mStorage.prepare();
+ S next = mPreparer.prepare();
next.readFrom(mIn);
mNext = next;
} catch (EOFException e) {