summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/cursor
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2006-08-30 01:55:10 +0000
committerBrian S. O'Neill <bronee@gmail.com>2006-08-30 01:55:10 +0000
commitab635e23ad5fb9cd9edb61665a3654ee3e4b372a (patch)
tree7feecd04c2a7305db1b4367fdf4513249c619d5b /src/main/java/com/amazon/carbonado/cursor
parentedcd84a1cb3c2009b0596939f30cee35028afcfc (diff)
Add cursor implementations
Diffstat (limited to 'src/main/java/com/amazon/carbonado/cursor')
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java83
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java62
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java120
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java103
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java148
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java665
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java205
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java118
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java62
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java459
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java498
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java132
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/SortBuffer.java53
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/SortedCursor.java332
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java123
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java101
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java119
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/UnionCursor.java106
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java192
-rw-r--r--src/main/java/com/amazon/carbonado/cursor/package-info.java23
20 files changed, 3704 insertions, 0 deletions
diff --git a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java
new file mode 100644
index 0000000..c71393a
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+
+/**
+ * AbstractCursor implements a small set of common Cursor methods.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class AbstractCursor<S> implements Cursor<S> {
+ // Note: Since constructor takes no parameters, this class is called
+ // Abstract instead of Base.
+ protected AbstractCursor() {
+ }
+
+ public int copyInto(Collection<? super S> c) throws FetchException {
+ int originalSize = c.size();
+ while (hasNext()) {
+ c.add(next());
+ }
+ return c.size() - originalSize;
+ }
+
+ public int copyInto(Collection<? super S> c, int limit) throws FetchException {
+ int originalSize = c.size();
+ while (--limit >= 0 && hasNext()) {
+ c.add(next());
+ }
+ return c.size() - originalSize;
+ }
+
+ public List<S> toList() throws FetchException {
+ List<S> list = new ArrayList<S>();
+ copyInto(list);
+ return list;
+ }
+
+ public List<S> toList(int limit) throws FetchException {
+ List<S> list = new ArrayList<S>();
+ copyInto(list, limit);
+ return list;
+ }
+
+ public synchronized int skipNext(int amount) throws FetchException {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (--amount >= 0 && hasNext()) {
+ next();
+ count++;
+ }
+
+ return count;
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java
new file mode 100644
index 0000000..db74134
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * Sort buffer implementation backed by an ArrayList.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public class ArraySortBuffer<S> extends ArrayList<S> implements SortBuffer<S> {
+ private static final long serialVersionUID = -5622302375191321452L;
+
+ private Comparator<S> mComparator;
+
+ public ArraySortBuffer() {
+ super();
+ }
+
+ public ArraySortBuffer(int initialCapacity) {
+ super(initialCapacity);
+ }
+
+ public void prepare(Comparator<S> comparator) {
+ if (comparator == null) {
+ throw new IllegalArgumentException();
+ }
+ clear();
+ mComparator = comparator;
+ }
+
+ public void sort() {
+ if (mComparator == null) {
+ throw new IllegalStateException("Buffer was not prepared");
+ }
+ Collections.sort(this, mComparator);
+ }
+
+ public void close() {
+ clear();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java
new file mode 100644
index 0000000..39455c8
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs an <i>asymmetric set difference</i>
+ * operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the difference.
+ *
+ * @author Brian S O'Neill
+ * @see UnionCursor
+ * @see IntersectionCursor
+ * @see SymmetricDifferenceCursor
+ */
+public class DifferenceCursor<S> extends AbstractCursor<S> {
+ private final Cursor<S> mLeftCursor;
+ private final Cursor<S> mRightCursor;
+ private final Comparator<S> mOrder;
+
+ private S mNext;
+ private S mNextRight;
+
+ /**
+ * @param left cursor to wrap
+ * @param right cursor to wrap whose results are completely discarded
+ * @param order describes sort ordering of wrapped cursors, which must be
+ * a total ordering
+ */
+ public DifferenceCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+ if (left == null || right == null || order == null) {
+ throw new IllegalArgumentException();
+ }
+ mLeftCursor = left;
+ mRightCursor = right;
+ mOrder = order;
+ }
+
+ public synchronized void close() throws FetchException {
+ mLeftCursor.close();
+ mRightCursor.close();
+ mNext = null;
+ mNextRight = null;
+ }
+
+ public synchronized boolean hasNext() throws FetchException {
+ if (mNext != null) {
+ return true;
+ }
+
+ S nextLeft;
+
+ while (true) {
+ if (mLeftCursor.hasNext()) {
+ nextLeft = mLeftCursor.next();
+ } else {
+ close();
+ return false;
+ }
+ if (mNextRight == null) {
+ if (mRightCursor.hasNext()) {
+ mNextRight = mRightCursor.next();
+ } else {
+ mNext = nextLeft;
+ return true;
+ }
+ }
+
+ while (true) {
+ int result = mOrder.compare(nextLeft, mNextRight);
+ if (result < 0) {
+ mNext = nextLeft;
+ return true;
+ } else if (result > 0) {
+ if (mRightCursor.hasNext()) {
+ mNextRight = mRightCursor.next();
+ } else {
+ mNextRight = null;
+ mNext = nextLeft;
+ return true;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ public synchronized S next() throws FetchException {
+ if (hasNext()) {
+ S next = mNext;
+ mNext = null;
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java
new file mode 100644
index 0000000..db3778b
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Special cursor implementation that is empty.
+ *
+ * @author Brian S O'Neill
+ */
+public class EmptyCursor<S> implements Cursor<S> {
+ private static final Cursor EMPTY_CURSOR = new EmptyCursor();
+
+ /**
+ * Returns the singleton empty cursor instance.
+ */
+ @SuppressWarnings("unchecked")
+ public static <S> Cursor<S> getEmptyCursor() {
+ return EMPTY_CURSOR;
+ }
+
+ // Package-private, to be used by test suite
+ EmptyCursor() {
+ }
+
+ /**
+ * Does nothing.
+ */
+ public void close() {
+ }
+
+ /**
+ * Always returns false.
+ */
+ public boolean hasNext() {
+ return false;
+ }
+
+ /**
+ * Always throws NoSuchElementException.
+ */
+ public S next() {
+ throw new NoSuchElementException();
+ }
+
+ /**
+ * Always returns 0.
+ */
+ public int skipNext(int amount) {
+ return 0;
+ }
+
+ /**
+ * Performs no copy and always returns 0.
+ */
+ public int copyInto(Collection<? super S> c) {
+ return 0;
+ }
+
+ /**
+ * Performs no copy and always returns 0.
+ */
+ public int copyInto(Collection<? super S> c, int limit) {
+ return 0;
+ }
+
+ /**
+ * Always returns an empty list.
+ */
+ public List<S> toList() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Always returns an empty list.
+ */
+ public List<S> toList(int limit) {
+ return Collections.emptyList();
+ }
+}
+
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
new file mode 100644
index 0000000..60b3c81
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.filter.ClosedFilter;
+import com.amazon.carbonado.filter.Filter;
+import com.amazon.carbonado.filter.FilterValues;
+import com.amazon.carbonado.filter.OpenFilter;
+
+/**
+ * Wraps another cursor and applies custom filtering to reduce the set of
+ * results.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class FilteredCursor<S> extends AbstractCursor<S> {
+ /**
+ * Returns a Cursor that is filtered by the given Filter and FilterValues.
+ * The given Filter must be composed only of the same PropertyFilter
+ * instances as used to construct the FilterValues. An
+ * IllegalStateException will result otherwise.
+ *
+ * @param filter filter to apply
+ * @param filterValues values for filter
+ * @param cursor cursor to wrap
+ * @return wrapped cursor which filters results
+ * @throws IllegalStateException if any values are not specified
+ * @throws IllegalArgumentException if filter is closed
+ */
+ public static <S extends Storable> Cursor<S> applyFilter(Filter<S> filter,
+ FilterValues<S> filterValues,
+ Cursor<S> cursor)
+ {
+ if (filter instanceof OpenFilter) {
+ return cursor;
+ }
+ if (filter instanceof ClosedFilter) {
+ throw new IllegalArgumentException();
+ }
+
+ return FilteredCursorGenerator.getFactory(filter)
+ .newFilteredCursor(cursor, filterValues.getValuesFor(filter));
+ }
+
+ private final Cursor<S> mCursor;
+
+ private S mNext;
+
+ protected FilteredCursor(Cursor<S> cursor) {
+ if (cursor == null) {
+ throw new IllegalArgumentException();
+ }
+ mCursor = cursor;
+ }
+
+ /**
+ * @return false if object should not be in results
+ */
+ protected abstract boolean isAllowed(S storable);
+
+ public void close() throws FetchException {
+ synchronized (mCursor) {
+ mCursor.close();
+ mNext = null;
+ }
+ }
+
+ public boolean hasNext() throws FetchException {
+ synchronized (mCursor) {
+ if (mNext != null) {
+ return true;
+ }
+ try {
+ int count = 0;
+ while (mCursor.hasNext()) {
+ S next = mCursor.next();
+ if (isAllowed(next)) {
+ mNext = next;
+ return true;
+ }
+ interruptCheck(++count);
+ }
+ } catch (NoSuchElementException e) {
+ }
+ return false;
+ }
+ }
+
+ public S next() throws FetchException {
+ synchronized (mCursor) {
+ if (hasNext()) {
+ S next = mNext;
+ mNext = null;
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+ }
+
+ public int skipNext(int amount) throws FetchException {
+ synchronized (mCursor) {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (--amount >= 0 && hasNext()) {
+ interruptCheck(++count);
+ mNext = null;
+ }
+
+ return count;
+ }
+ }
+
+ private void interruptCheck(int count) throws FetchException {
+ if ((count & ~0xff) == 0 && Thread.interrupted()) {
+ close();
+ throw new FetchInterruptedException();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java
new file mode 100644
index 0000000..ea1baa8
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java
@@ -0,0 +1,665 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Stack;
+
+import org.cojen.classfile.ClassFile;
+import org.cojen.classfile.CodeAssembler;
+import org.cojen.classfile.CodeBuilder;
+import org.cojen.classfile.Label;
+import org.cojen.classfile.LocalVariable;
+import org.cojen.classfile.MethodInfo;
+import org.cojen.classfile.Modifiers;
+import org.cojen.classfile.Opcode;
+import org.cojen.classfile.TypeDesc;
+import static org.cojen.classfile.TypeDesc.*;
+
+import org.cojen.util.ClassInjector;
+import org.cojen.util.WeakIdentityMap;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.filter.AndFilter;
+import com.amazon.carbonado.filter.OrFilter;
+import com.amazon.carbonado.filter.Filter;
+import com.amazon.carbonado.filter.PropertyFilter;
+import com.amazon.carbonado.filter.RelOp;
+import com.amazon.carbonado.filter.Visitor;
+
+import com.amazon.carbonado.info.ChainedProperty;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.util.QuickConstructorGenerator;
+
+
+/**
+ * Generates Cursor implementations that wrap another Cursor, applying a
+ * property matching filter on each result.
+ *
+ * @author Brian S O'Neill
+ * @see FilteredCursor
+ */
+class FilteredCursorGenerator {
+ private static Map cCache = new WeakIdentityMap();
+
+ /**
+ * Returns a factory for creating new filtered cursor instances.
+ *
+ * @param filter filter specification
+ * @throws IllegalArgumentException if filter is null
+ * @throws UnsupportedOperationException if filter is not supported
+ */
+ @SuppressWarnings("unchecked")
+ static <S extends Storable> Factory<S> getFactory(Filter<S> filter) {
+ if (filter == null) {
+ throw new IllegalArgumentException();
+ }
+ synchronized (cCache) {
+ Factory<S> factory = (Factory<S>) cCache.get(filter);
+ if (factory != null) {
+ return factory;
+ }
+ Class<Cursor<S>> clazz = generateClass(filter);
+ factory = QuickConstructorGenerator.getInstance(clazz, Factory.class);
+ cCache.put(filter, factory);
+ return factory;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <S extends Storable> Class<Cursor<S>> generateClass(Filter<S> filter) {
+ filter.accept(new Validator<S>(), null);
+
+ Class<S> type = filter.getStorableType();
+
+ String packageName;
+ {
+ String name = type.getName();
+ int index = name.lastIndexOf('.');
+ if (index >= 0) {
+ packageName = name.substring(0, index);
+ } else {
+ packageName = "";
+ }
+ }
+
+ // Try to generate against the same loader as the storable type. This
+ // allows the generated class access to it, preventing a
+ // NoClassDefFoundError.
+ ClassLoader loader = type.getClassLoader();
+
+ ClassInjector ci = ClassInjector.create(packageName + ".FilteredCursor", loader);
+ ClassFile cf = new ClassFile(ci.getClassName(), FilteredCursor.class);
+ cf.markSynthetic();
+ cf.setSourceFile(FilteredCursorGenerator.class.getName());
+ cf.setTarget("1.5");
+
+ // Begin constructor definition.
+ CodeBuilder ctorBuilder;
+ {
+ TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
+ TypeDesc objectArrayType = TypeDesc.forClass(Object[].class);
+ TypeDesc[] params = {cursorType, objectArrayType};
+ MethodInfo ctor = cf.addConstructor(Modifiers.PUBLIC, params);
+ ctorBuilder = new CodeBuilder(ctor);
+ ctorBuilder.loadThis();
+ ctorBuilder.loadLocal(ctorBuilder.getParameter(0));
+ ctorBuilder.invokeSuperConstructor(new TypeDesc[] {cursorType});
+ }
+
+ // Begin isAllowed method definition.
+ CodeBuilder isAllowedBuilder;
+ LocalVariable storableVar;
+ {
+ TypeDesc[] params = {TypeDesc.OBJECT};
+ MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "isAllowed", BOOLEAN, params);
+ isAllowedBuilder = new CodeBuilder(mi);
+
+ storableVar = isAllowedBuilder.getParameter(0);
+
+ // Filter out any instances of null. Shouldn't happen though.
+ isAllowedBuilder.loadLocal(storableVar);
+ Label notNull = isAllowedBuilder.createLabel();
+ isAllowedBuilder.ifNullBranch(notNull, false);
+ isAllowedBuilder.loadConstant(false);
+ isAllowedBuilder.returnValue(BOOLEAN);
+ notNull.setLocation();
+
+ // Cast the parameter to the expected type.
+ isAllowedBuilder.loadLocal(storableVar);
+ TypeDesc storableType = TypeDesc.forClass(type);
+ isAllowedBuilder.checkCast(storableType);
+ storableVar = isAllowedBuilder.createLocalVariable("storable", storableType);
+ isAllowedBuilder.storeLocal(storableVar);
+ }
+
+ filter.accept(new CodeGen<S>(cf, ctorBuilder, isAllowedBuilder, storableVar), null);
+
+ // Finish constructor.
+ ctorBuilder.returnVoid();
+
+ return (Class<Cursor<S>>) ci.defineClass(cf);
+ }
+
+ public static interface Factory<S extends Storable> {
+ /**
+ * @param cursor cursor to wrap and filter
+ * @param filterValues values corresponding to original filter used to create this factory
+ */
+ Cursor<S> newFilteredCursor(Cursor<S> cursor, Object... filterValues);
+ }
+
+ /**
+ * Ensures properties can be read and that relational operation is
+ * supported.
+ */
+ private static class Validator<S extends Storable> extends Visitor<S, Object, Object> {
+ Validator() {
+ }
+
+ public Object visit(PropertyFilter<S> filter, Object param) {
+ ChainedProperty<S> chained = filter.getChainedProperty();
+
+ switch (filter.getOperator()) {
+ case EQ: case NE:
+ // Use equals() method instead of comparator.
+ break;
+ default:
+ TypeDesc typeDesc = TypeDesc.forClass(chained.getType()).toObjectType();
+
+ if (!Comparable.class.isAssignableFrom(typeDesc.toClass())) {
+ throw new UnsupportedOperationException
+ ("Property \"" + chained + "\" does not implement Comparable");
+ }
+ break;
+ }
+
+ // Follow the chain, verifying that each property has an accessible
+ // read method.
+
+ checkForReadMethod(chained, chained.getPrimeProperty());
+ for (int i=0; i<chained.getChainCount(); i++) {
+ checkForReadMethod(chained, chained.getChainedProperty(i));
+ }
+
+ return null;
+ }
+
+ private void checkForReadMethod(ChainedProperty<S> chained,
+ StorableProperty<?> property) {
+ if (property.getReadMethod() == null) {
+ String msg;
+ if (chained.getChainCount() == 0) {
+ msg = "Property \"" + property.getName() + "\" cannot be read";
+ } else {
+ msg = "Property \"" + property.getName() + "\" of \"" + chained +
+ "\" cannot be read";
+ }
+ throw new UnsupportedOperationException(msg);
+ }
+ }
+ }
+
+ private static class CodeGen<S extends Storable> extends Visitor<S, Object, Object> {
+ private static String FIELD_PREFIX = "value$";
+
+ private final ClassFile mClassFile;
+ private final CodeBuilder mCtorBuilder;
+ private final CodeBuilder mIsAllowedBuilder;
+ private final LocalVariable mStorableVar;
+
+ private final Stack<Scope> mScopeStack;
+
+ private int mPropertyOrdinal;
+
+ CodeGen(ClassFile cf,
+ CodeBuilder ctorBuilder,
+ CodeBuilder isAllowedBuilder, LocalVariable storableVar) {
+ mClassFile = cf;
+ mCtorBuilder = ctorBuilder;
+ mIsAllowedBuilder = isAllowedBuilder;
+ mStorableVar = storableVar;
+ mScopeStack = new Stack<Scope>();
+ mScopeStack.push(new Scope(null, null));
+ }
+
+ public Object visit(OrFilter<S> filter, Object param) {
+ Label failLocation = mIsAllowedBuilder.createLabel();
+ // Inherit success location to short-circuit if 'or' test succeeds.
+ mScopeStack.push(new Scope(failLocation, getScope().mSuccessLocation));
+ filter.getLeftFilter().accept(this, param);
+ failLocation.setLocation();
+ mScopeStack.pop();
+ filter.getRightFilter().accept(this, param);
+ return null;
+ }
+
+ public Object visit(AndFilter<S> filter, Object param) {
+ Label successLocation = mIsAllowedBuilder.createLabel();
+ // Inherit fail location to short-circuit if 'and' test fails.
+ mScopeStack.push(new Scope(getScope().mFailLocation, successLocation));
+ filter.getLeftFilter().accept(this, param);
+ successLocation.setLocation();
+ mScopeStack.pop();
+ filter.getRightFilter().accept(this, param);
+ return null;
+ }
+
+ public Object visit(PropertyFilter<S> filter, Object param) {
+ ChainedProperty<S> chained = filter.getChainedProperty();
+ TypeDesc type = TypeDesc.forClass(chained.getType());
+ TypeDesc fieldType = actualFieldType(type);
+ String fieldName = FIELD_PREFIX + mPropertyOrdinal;
+
+ // Define storage field.
+ mClassFile.addField(Modifiers.PRIVATE.toFinal(true), fieldName, fieldType);
+
+ // Add code to constructor to store value into field.
+ CodeBuilder b = mCtorBuilder;
+ b.loadThis();
+ b.loadLocal(b.getParameter(1));
+ b.loadConstant(mPropertyOrdinal);
+ b.loadFromArray(OBJECT);
+ b.checkCast(type.toObjectType());
+ b.convert(type.toObjectType(), fieldType, CodeAssembler.CONVERT_FP_BITS);
+ b.storeField(fieldName, fieldType);
+
+ // Add code to load property value to stack.
+ b = mIsAllowedBuilder;
+ b.loadLocal(mStorableVar);
+ loadProperty(b, chained.getPrimeProperty());
+ for (int i=0; i<chained.getChainCount(); i++) {
+ // Check if last loaded property was null, and fail if so.
+ b.dup();
+ Label notNull = b.createLabel();
+ b.ifNullBranch(notNull, false);
+ b.pop();
+ getScope().fail(b);
+ notNull.setLocation();
+
+ // Now load next property in chain.
+ loadProperty(b, chained.getChainedProperty(i));
+ }
+
+ addPropertyFilter(b, type, filter.getOperator());
+
+ mPropertyOrdinal++;
+ return null;
+ }
+
+ private Scope getScope() {
+ return mScopeStack.peek();
+ }
+
+ private void loadProperty(CodeBuilder b, StorableProperty<?> property) {
+ Method readMethod = property.getReadMethod();
+ if (readMethod == null) {
+ // Invoke synthetic package accessible method.
+ String readName = property.getReadMethodName();
+ try {
+ readMethod = property.getEnclosingType().getDeclaredMethod(readName);
+ } catch (NoSuchMethodException e) {
+ // This shouldn't happen since it was checked for earlier.
+ throw new IllegalArgumentException
+ ("Property \"" + property.getName() + "\" cannot be read");
+ }
+ }
+ b.invoke(readMethod);
+ }
+
+ private void addPropertyFilter(CodeBuilder b, TypeDesc type, RelOp relOp) {
+ TypeDesc fieldType = actualFieldType(type);
+ String fieldName = FIELD_PREFIX + mPropertyOrdinal;
+
+ if (type.getTypeCode() == OBJECT_CODE) {
+ // Check if actual property being examined is null.
+
+ LocalVariable actualValue = b.createLocalVariable("temp", type);
+ b.storeLocal(actualValue);
+ b.loadLocal(actualValue);
+ Label notNull = b.createLabel();
+
+ switch (relOp) {
+ case EQ: default: // actual = ?
+ // If actual value is null, so test value must be null also
+ // to succeed.
+ case LE: // actual <= ?
+ // If actual value is null, and since null is high, test
+ // value must be null also to succeed.
+ b.ifNullBranch(notNull, false);
+ b.loadThis();
+ b.loadField(fieldName, fieldType);
+ getScope().successIfNullElseFail(b, true);
+ break;
+ case NE: // actual != ?
+ // If actual value is null, so test value must be non-null
+ // to succeed.
+ case GT: // actual > ?
+ // If actual value is null, and since null is high, test
+ // value must be non-null to succeed.
+ b.ifNullBranch(notNull, false);
+ b.loadThis();
+ b.loadField(fieldName, fieldType);
+ getScope().successIfNullElseFail(b, false);
+ break;
+ case LT: // actual < ?
+ // Since null is high, always fail if actual value is
+ // null. Don't need to examine test value.
+ getScope().failIfNull(b, true);
+ break;
+ case GE: // actual >= ?
+ // Since null is high, always succeed if actual value is
+ // null. Don't need to examine test value.
+ getScope().successIfNull(b, true);
+ break;
+ }
+
+ notNull.setLocation();
+
+ // At this point, actual property value is known to not be null.
+
+ // Check if property being tested for is null.
+ b.loadThis();
+ b.loadField(fieldName, fieldType);
+
+ switch (relOp) {
+ case EQ: default: // non-null actual = ?
+ // If test value is null, fail.
+ case GT: // non-null actual > ?
+ // If test value is null, and since null is high, fail.
+ case GE: // non-null actual >= ?
+ // If test value is null, and since null is high, fail.
+ getScope().failIfNull(b, true);
+ break;
+ case NE: // non-null actual != ?
+ // If test value is null, succeed
+ case LE: // non-null actual <= ?
+ // If test value is null, and since null is high, succeed.
+ case LT: // non-null actual < ?
+ // If test value is null, and since null is high, succeed.
+ getScope().successIfNull(b, true);
+ break;
+ }
+
+ b.loadLocal(actualValue);
+ }
+
+ // When this point is reached, actual property value is on the
+ // operand stack, and it is non-null. Property value to test
+ // against is not on the stack, but it is known to be non-null.
+
+ TypeDesc primitiveType = type.toPrimitiveType();
+
+ if (primitiveType == null) {
+ b.loadThis();
+ b.loadField(fieldName, fieldType);
+
+ // Do object comparison
+ switch (relOp) {
+ case EQ: case NE: default:
+ if (fieldType.isArray()) {
+ TypeDesc arraysDesc = TypeDesc.forClass(Arrays.class);
+ TypeDesc componentType = fieldType.getComponentType();
+ TypeDesc arrayType = fieldType;
+ String methodName;
+ if (componentType.isArray()) {
+ methodName = "deepEquals";
+ arrayType = OBJECT.toArrayType();
+ } else {
+ methodName = "equals";
+ if (!componentType.isPrimitive()) {
+ arrayType = OBJECT.toArrayType();
+ }
+ }
+ b.invokeStatic(arraysDesc, methodName, BOOLEAN,
+ new TypeDesc[] {arrayType, arrayType});
+ } else {
+ b.invokeVirtual(OBJECT, "equals", BOOLEAN, new TypeDesc[] {OBJECT});
+ }
+ // Success if boolean value is non-zero for EQ, zero for NE.
+ getScope().successIfZeroComparisonElseFail(b, relOp.reverse());
+ break;
+
+ case GT: case GE: case LE: case LT:
+ // Compare method exists because it was checked for earlier
+ b.invokeInterface(TypeDesc.forClass(Comparable.class), "compareTo",
+ INT, new TypeDesc[] {OBJECT});
+ getScope().successIfZeroComparisonElseFail(b, relOp);
+ break;
+ }
+ } else {
+ if (!type.isPrimitive()) {
+ // Extract primitive value out of wrapper.
+ b.convert(type, primitiveType);
+ }
+
+ // Floating point values are compared based on actual
+ // bits. This allows NaN to be considered in the comparison.
+ if (primitiveType == FLOAT) {
+ b.convert(primitiveType, INT, CodeBuilder.CONVERT_FP_BITS);
+ primitiveType = INT;
+ } else if (primitiveType == DOUBLE) {
+ b.convert(primitiveType, LONG, CodeBuilder.CONVERT_FP_BITS);
+ primitiveType = LONG;
+ }
+
+ b.loadThis();
+ b.loadField(fieldName, fieldType);
+ // No need to do anything special for floating point since it
+ // has been pre-converted to bits.
+ b.convert(fieldType, primitiveType);
+
+ switch (primitiveType.getTypeCode()) {
+ case INT_CODE: default:
+ getScope().successIfComparisonElseFail(b, relOp);
+ break;
+
+ case LONG_CODE:
+ b.math(Opcode.LCMP);
+ getScope().successIfZeroComparisonElseFail(b, relOp);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Returns the actual field type used to store the given property
+ * type. Floating point values are represented in their "bit" form, in
+ * order to compare against NaN.
+ */
+ private static TypeDesc actualFieldType(TypeDesc type) {
+ if (type.toPrimitiveType() == FLOAT) {
+ if (type.isPrimitive()) {
+ type = INT;
+ } else {
+ type = INT.toObjectType();
+ }
+ } else if (type.toPrimitiveType() == DOUBLE) {
+ if (type.isPrimitive()) {
+ type = LONG;
+ } else {
+ type = LONG.toObjectType();
+ }
+ }
+ return type;
+ }
+
+ /**
+ * Defines boolean logic branching scope.
+ */
+ private static class Scope {
+ // If null, return false on test failure.
+ final Label mFailLocation;
+ // If null, return true on test success.
+ final Label mSuccessLocation;
+
+ Scope(Label failLocation, Label successLocation) {
+ mFailLocation = failLocation;
+ mSuccessLocation = successLocation;
+ }
+
+ void fail(CodeBuilder b) {
+ if (mFailLocation != null) {
+ b.branch(mFailLocation);
+ } else {
+ b.loadConstant(false);
+ b.returnValue(BOOLEAN);
+ }
+ }
+
+ /**
+ * Branch to the fail location if the value on the stack is
+ * null. When choice is false, fail on non-null.
+ *
+ * @param choice if true, fail when null, else fail when not null
+ */
+ void failIfNull(CodeBuilder b, boolean choice) {
+ if (mFailLocation != null) {
+ b.ifNullBranch(mFailLocation, choice);
+ } else {
+ Label pass = b.createLabel();
+ b.ifNullBranch(pass, !choice);
+ b.loadConstant(false);
+ b.returnValue(BOOLEAN);
+ pass.setLocation();
+ }
+ }
+
+ void success(CodeBuilder b) {
+ if (mSuccessLocation != null) {
+ b.branch(mSuccessLocation);
+ } else {
+ b.loadConstant(true);
+ b.returnValue(BOOLEAN);
+ }
+ }
+
+ /**
+ * Branch to the success location if the value on the stack is
+ * null. When choice is false, success on non-null.
+ *
+ * @param choice if true, success when null, else success when not null
+ */
+ void successIfNull(CodeBuilder b, boolean choice) {
+ if (mSuccessLocation != null) {
+ b.ifNullBranch(mSuccessLocation, choice);
+ } else {
+ Label pass = b.createLabel();
+ b.ifNullBranch(pass, !choice);
+ b.loadConstant(true);
+ b.returnValue(BOOLEAN);
+ pass.setLocation();
+ }
+ }
+
+ /**
+ * Branch to the success location if the value on the stack is
+ * null. When choice is false, success on non-null. If the success
+ * condition is not met, the fail location is branched to.
+ *
+ * @param choice if true, success when null, else success when not null
+ */
+ void successIfNullElseFail(CodeBuilder b, boolean choice) {
+ if (mSuccessLocation != null) {
+ b.ifNullBranch(mSuccessLocation, choice);
+ fail(b);
+ } else if (mFailLocation != null) {
+ b.ifNullBranch(mFailLocation, !choice);
+ success(b);
+ } else {
+ Label success = b.createLabel();
+ b.ifNullBranch(success, choice);
+ b.loadConstant(false);
+ b.returnValue(BOOLEAN);
+ success.setLocation();
+ b.loadConstant(true);
+ b.returnValue(BOOLEAN);
+ }
+ }
+
+ void successIfZeroComparisonElseFail(CodeBuilder b, RelOp relOp) {
+ if (mSuccessLocation != null) {
+ b.ifZeroComparisonBranch(mSuccessLocation, relOpToChoice(relOp));
+ fail(b);
+ } else if (mFailLocation != null) {
+ b.ifZeroComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse()));
+ success(b);
+ } else {
+ if (relOp == RelOp.NE) {
+ b.returnValue(BOOLEAN);
+ } else if (relOp == RelOp.EQ) {
+ b.loadConstant(1);
+ b.math(Opcode.IAND);
+ b.loadConstant(1);
+ b.math(Opcode.IXOR);
+ b.returnValue(BOOLEAN);
+ } else {
+ Label success = b.createLabel();
+ b.ifZeroComparisonBranch(success, relOpToChoice(relOp));
+ b.loadConstant(false);
+ b.returnValue(BOOLEAN);
+ success.setLocation();
+ b.loadConstant(true);
+ b.returnValue(BOOLEAN);
+ }
+ }
+ }
+
+ void successIfComparisonElseFail(CodeBuilder b, RelOp relOp) {
+ if (mSuccessLocation != null) {
+ b.ifComparisonBranch(mSuccessLocation, relOpToChoice(relOp));
+ fail(b);
+ } else if (mFailLocation != null) {
+ b.ifComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse()));
+ success(b);
+ } else {
+ Label success = b.createLabel();
+ b.ifComparisonBranch(success, relOpToChoice(relOp));
+ b.loadConstant(false);
+ b.returnValue(BOOLEAN);
+ success.setLocation();
+ b.loadConstant(true);
+ b.returnValue(BOOLEAN);
+ }
+ }
+
+ private String relOpToChoice(RelOp relOp) {
+ switch (relOp) {
+ case EQ: default:
+ return "==";
+ case NE:
+ return "!=";
+ case LT:
+ return "<";
+ case GE:
+ return ">=";
+ case GT:
+ return ">";
+ case LE:
+ return "<=";
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
new file mode 100644
index 0000000..0b17e50
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import org.cojen.util.BeanComparator;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+/**
+ * Abstract cursor for aggregation and finding distinct data. The source cursor
+ * must be ordered in some fashion by the grouping properties. The sequence of
+ * properties must match, but it does not matter if they are ascending or
+ * descending.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
+ private final Cursor<S> mCursor;
+ private final Comparator<S> mGroupComparator;
+
+ private S mGroupLeader;
+ private G mNextAggregate;
+
+ /**
+ * Create a GroupedCursor with an existing group comparator. The comparator
+ * defines the ordering of the source cursor, and it should be a partial
+ * odering. If group comparator defines a total ordering, then all groups
+ * have one member.
+ *
+ * @param cursor source of elements which must be ordered properly
+ * @param groupComparator comparator which defines ordering of source cursor
+ */
+ protected GroupedCursor(Cursor<S> cursor, Comparator<S> groupComparator) {
+ if (cursor == null || groupComparator == null) {
+ throw new IllegalArgumentException();
+ }
+ mCursor = cursor;
+ mGroupComparator = groupComparator;
+ }
+
+ /**
+ * Create a GroupedCursor using properties to define the group
+ * comparator. The set of properties defines the ordering of the source
+ * cursor, and it should be a partial ordering. If properties define a
+ * total ordering, then all groups have one member.
+ *
+ * @param cursor source of elements which must be ordered properly
+ * @param type type of storable to create cursor for
+ * @param groupProperties list of properties to group by
+ * @throws IllegalArgumentException if any property is null or not a member
+ * of storable type
+ */
+ protected GroupedCursor(Cursor<S> cursor, Class<S> type, String... groupProperties) {
+ if (cursor == null) {
+ throw new IllegalArgumentException();
+ }
+ mCursor = cursor;
+ mGroupComparator = SortedCursor.createComparator(type, groupProperties);
+ }
+
+ /**
+ * Returns the comparator used to identify group boundaries.
+ */
+ public Comparator<S> comparator() {
+ return mGroupComparator;
+ }
+
+ /**
+ * This method is called for the first entry in a group. This method is not
+ * called again until after finishGroup is called.
+ *
+ * @param groupLeader first entry in group
+ */
+ protected abstract void beginGroup(S groupLeader) throws FetchException;
+
+ /**
+ * This method is called when more entries are found for the current
+ * group. This method is not called until after beginGroup has been
+ * called. It may called multiple times until finishGroup is called.
+ *
+ * @param groupMember additional entry in group
+ */
+ protected abstract void addToGroup(S groupMember) throws FetchException;
+
+ /**
+ * This method is called when a group is finished, and it can return an
+ * aggregate. Simply return null if aggregate should be filtered out.
+ *
+ * @return aggregate, or null to filter it out
+ */
+ protected abstract G finishGroup() throws FetchException;
+
+ public void close() throws FetchException {
+ synchronized (mCursor) {
+ mCursor.close();
+ mGroupLeader = null;
+ mNextAggregate = null;
+ }
+ }
+
+ public boolean hasNext() throws FetchException {
+ synchronized (mCursor) {
+ if (mNextAggregate != null) {
+ return true;
+ }
+
+ try {
+ int count = 0;
+ if (mCursor.hasNext()) {
+ if (mGroupLeader == null) {
+ beginGroup(mGroupLeader = mCursor.next());
+ }
+
+ while (mCursor.hasNext()) {
+ S groupMember = mCursor.next();
+
+ if (mGroupComparator.compare(mGroupLeader, groupMember) == 0) {
+ addToGroup(groupMember);
+ } else {
+ G aggregate = finishGroup();
+
+ beginGroup(mGroupLeader = groupMember);
+
+ if (aggregate != null) {
+ mNextAggregate = aggregate;
+ return true;
+ }
+ }
+
+ interruptCheck(++count);
+ }
+
+ G aggregate = finishGroup();
+
+ if (aggregate != null) {
+ mNextAggregate = aggregate;
+ return true;
+ }
+ }
+ } catch (NoSuchElementException e) {
+ }
+
+ return false;
+ }
+ }
+
+ public G next() throws FetchException {
+ synchronized (mCursor) {
+ if (hasNext()) {
+ G next = mNextAggregate;
+ mNextAggregate = null;
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+ }
+
+ public int skipNext(int amount) throws FetchException {
+ synchronized (mCursor) {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (--amount >= 0 && hasNext()) {
+ interruptCheck(++count);
+ mNextAggregate = null;
+ }
+
+ return count;
+ }
+ }
+
+ private void interruptCheck(int count) throws FetchException {
+ if ((count & ~0xff) == 0 && Thread.interrupted()) {
+ close();
+ throw new FetchInterruptedException();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java
new file mode 100644
index 0000000..9d11b2f
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs a <i>set intersection</i> operation. In
+ * boolean logic, this is an <i>and</i> operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the intersection.
+ *
+ * @author Brian S O'Neill
+ * @see UnionCursor
+ * @see DifferenceCursor
+ * @see SymmetricDifferenceCursor
+ */
+public class IntersectionCursor<S> extends AbstractCursor<S> {
+ private final Cursor<S> mLeftCursor;
+ private final Cursor<S> mRightCursor;
+ private final Comparator<S> mOrder;
+
+ private S mNext;
+
+ /**
+ * @param left cursor to wrap
+ * @param right cursor to wrap
+ * @param order describes sort ordering of wrapped cursors, which must be
+ * a total ordering
+ */
+ public IntersectionCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+ if (left == null || right == null || order == null) {
+ throw new IllegalArgumentException();
+ }
+ mLeftCursor = left;
+ mRightCursor = right;
+ mOrder = order;
+ }
+
+ public synchronized void close() throws FetchException {
+ mLeftCursor.close();
+ mRightCursor.close();
+ mNext = null;
+ }
+
+ public synchronized boolean hasNext() throws FetchException {
+ if (mNext != null) {
+ return true;
+ }
+
+ S nextLeft, nextRight;
+
+ if (mLeftCursor.hasNext()) {
+ nextLeft = mLeftCursor.next();
+ } else {
+ close();
+ return false;
+ }
+ if (mRightCursor.hasNext()) {
+ nextRight = mRightCursor.next();
+ } else {
+ close();
+ return false;
+ }
+
+ while (true) {
+ int result = mOrder.compare(nextLeft, nextRight);
+ if (result < 0) {
+ if (mLeftCursor.hasNext()) {
+ nextLeft = mLeftCursor.next();
+ } else {
+ close();
+ return false;
+ }
+ } else if (result > 0) {
+ if (mRightCursor.hasNext()) {
+ nextRight = mRightCursor.next();
+ } else {
+ close();
+ return false;
+ }
+ } else {
+ mNext = nextLeft;
+ return true;
+ }
+ }
+ }
+
+ public synchronized S next() throws FetchException {
+ if (hasNext()) {
+ S next = mNext;
+ mNext = null;
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java
new file mode 100644
index 0000000..0330629
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapts an Iterator into a Cursor.
+ *
+ * @author Brian S O'Neill
+ */
+public class IteratorCursor<S> extends AbstractCursor<S> {
+ private Iterator<S> mIterator;
+
+ /**
+ * @param iterable collection to iterate over, or null for empty cursor
+ */
+ public IteratorCursor(Iterable<S> iterable) {
+ this(iterable == null ? (Iterator<S>) null : iterable.iterator());
+ }
+
+ /**
+ * @param iterator iterator to wrap, or null for empty cursor
+ */
+ public IteratorCursor(Iterator<S> iterator) {
+ mIterator = iterator;
+ }
+
+ public void close() {
+ mIterator = null;
+ }
+
+ public boolean hasNext() {
+ Iterator it = mIterator;
+ return it != null && it.hasNext();
+ }
+
+ public S next() {
+ Iterator<S> it = mIterator;
+ if (it == null) {
+ throw new NoSuchElementException();
+ }
+ return it.next();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java
new file mode 100644
index 0000000..ae98a95
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java
@@ -0,0 +1,459 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.lang.reflect.Field;
+
+import java.util.Map;
+
+import org.cojen.classfile.ClassFile;
+import org.cojen.classfile.CodeBuilder;
+import org.cojen.classfile.FieldInfo;
+import org.cojen.classfile.Label;
+import org.cojen.classfile.LocalVariable;
+import org.cojen.classfile.MethodInfo;
+import org.cojen.classfile.Modifiers;
+import org.cojen.classfile.TypeDesc;
+
+import org.cojen.util.ClassInjector;
+import org.cojen.util.KeyFactory;
+import org.cojen.util.SoftValuedHashMap;
+import org.cojen.util.WeakIdentityMap;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.SupportException;
+
+import com.amazon.carbonado.info.ChainedProperty;
+import com.amazon.carbonado.info.StorableInfo;
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.util.QuickConstructorGenerator;
+
+import com.amazon.carbonado.spi.CodeBuilderUtil;
+import static com.amazon.carbonado.spi.CommonMethodNames.*;
+
+/**
+ * Given two joined types <i>A</i> and <i>B</i>, this factory converts a cursor
+ * over type <i>A</i> into a cursor over type <i>B</i>. For example, consider
+ * two storable types, Employer and Person. A query filter for persons with a
+ * given employer might look like: {@code "employer.name = ?" } The join can be
+ * manually implemented by querying for employers and then using this factory
+ * to produce persons:
+ *
+ * <pre>
+ * JoinedCursorFactory&lt;Employer, Person&gt; factory = new JoinedCursorFactory&lt;Employer, Person&gt;
+ * (repo, Person.class, "employer", Employer.class);
+ *
+ * Cursor&lt;Employer&gt; employerCursor = repo.storageFor(Employer.class)
+ * .query("name = ?").with(...).fetch();
+ *
+ * Cursor&lt;Person&gt; personCursor = factory.join(employerCursor);
+ * </pre>
+ *
+ * Chained properties are supported as well. A query filter for persons with an
+ * employer in a given state might look like: {@code "employer.address.state = ?" }
+ * The join can be manually implemented as:
+ *
+ * <pre>
+ * JoinedCursorFactory&lt;Address, Person&gt; factory = new JoinedCursorFactory&lt;Address, Person&gt;
+ * (repo, Person.class, "employer.address", Address.class);
+ *
+ * Cursor&lt;Address&gt; addressCursor = repo.storageFor(Address.class)
+ * .query("state = ?").with(...).fetch();
+ *
+ * Cursor&lt;Person&gt; personCursor = factory.join(addressCursor);
+ * </pre>
+ *
+ * @author Brian S O'Neill
+ */
+public class JoinedCursorFactory<A, B extends Storable> {
+ private static final String STORAGE_FIELD_NAME = "storage";
+ private static final String QUERY_FIELD_NAME = "query";
+ private static final String QUERY_FILTER_FIELD_NAME = "queryFilter";
+ private static final String ACTIVE_A_FIELD_NAME = "active";
+
+ private static final Map<Object, Class> cJoinerCursorClassCache;
+
+ static {
+ cJoinerCursorClassCache = new SoftValuedHashMap();
+ }
+
+ private static synchronized <B extends Storable> Joiner<?, B>
+ newBasicJoiner(StorableProperty<B> bToAProperty, Storage<B> bStorage)
+ throws FetchException
+ {
+ Class<?> aType = bToAProperty.getType();
+
+ final Object key = KeyFactory.createKey
+ (new Object[] {aType,
+ bToAProperty.getEnclosingType(),
+ bToAProperty.getName()});
+
+ Class clazz = cJoinerCursorClassCache.get(key);
+
+ if (clazz == null) {
+ clazz = generateBasicJoinerCursor(aType, bToAProperty);
+ cJoinerCursorClassCache.put(key, clazz);
+ }
+
+ // Transforming cursor class may need a Query to operate on.
+ Query<B> bQuery = null;
+ try {
+ String filter = (String) clazz.getField(QUERY_FILTER_FIELD_NAME).get(null);
+ bQuery = bStorage.query(filter);
+ } catch (NoSuchFieldException e) {
+ } catch (IllegalAccessException e) {
+ }
+
+ BasicJoiner.Factory<?, B> factory = (BasicJoiner.Factory<?, B>) QuickConstructorGenerator
+ .getInstance(clazz, BasicJoiner.Factory.class);
+
+ return new BasicJoiner(factory, bStorage, bQuery);
+ }
+
+ private static <B extends Storable> Class<Cursor<B>>
+ generateBasicJoinerCursor(Class<?> aType, StorableProperty<B> bToAProperty)
+ {
+ final int propCount = bToAProperty.getJoinElementCount();
+
+ // Determine if join is one-to-one, in which case slightly more optimal
+ // code can be generated.
+ boolean isOneToOne = true;
+ for (int i=0; i<propCount; i++) {
+ if (!bToAProperty.getInternalJoinElement(i).isPrimaryKeyMember()) {
+ isOneToOne = false;
+ break;
+ }
+ if (!bToAProperty.getExternalJoinElement(i).isPrimaryKeyMember()) {
+ isOneToOne = false;
+ break;
+ }
+ }
+
+ Class<B> bType = bToAProperty.getEnclosingType();
+
+ String packageName;
+ {
+ String name = bType.getName();
+ int index = name.lastIndexOf('.');
+ if (index >= 0) {
+ packageName = name.substring(0, index);
+ } else {
+ packageName = "";
+ }
+ }
+
+ ClassLoader loader = bType.getClassLoader();
+
+ ClassInjector ci = ClassInjector.create(packageName + ".JoinedCursor", loader);
+ Class superclass = isOneToOne ? TransformedCursor.class : MultiTransformedCursor.class;
+ ClassFile cf = new ClassFile(ci.getClassName(), superclass);
+ cf.markSynthetic();
+ cf.setSourceFile(JoinedCursorFactory.class.getName());
+ cf.setTarget("1.5");
+
+ final TypeDesc queryType = TypeDesc.forClass(Query.class);
+ final TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
+ final TypeDesc storageType = TypeDesc.forClass(Storage.class);
+ final TypeDesc storableType = TypeDesc.forClass(Storable.class);
+
+ if (isOneToOne) {
+ cf.addField(Modifiers.PRIVATE.toFinal(true), STORAGE_FIELD_NAME, storageType);
+ } else {
+ // Field to hold query which fetches type B.
+ cf.addField(Modifiers.PRIVATE.toFinal(true), QUERY_FIELD_NAME, queryType);
+ }
+
+ boolean canSetAReference = bToAProperty.getWriteMethod() != null;
+
+ if (canSetAReference && !isOneToOne) {
+ // Field to hold active A storable.
+ cf.addField(Modifiers.PRIVATE, ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
+ }
+
+ // Constructor accepts a Storage and Query, but Storage is only used
+ // for one-to-one, and Query is only used for one-to-many.
+ {
+ MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC,
+ new TypeDesc[] {cursorType, storageType, queryType});
+ CodeBuilder b = new CodeBuilder(mi);
+
+ b.loadThis();
+ b.loadLocal(b.getParameter(0)); // pass A cursor to superclass
+ b.invokeSuperConstructor(new TypeDesc[] {cursorType});
+
+ if (isOneToOne) {
+ b.loadThis();
+ b.loadLocal(b.getParameter(1)); // push B storage to stack
+ b.storeField(STORAGE_FIELD_NAME, storageType);
+ } else {
+ b.loadThis();
+ b.loadLocal(b.getParameter(2)); // push B query to stack
+ b.storeField(QUERY_FIELD_NAME, queryType);
+ }
+
+ b.returnVoid();
+ }
+
+ // For one-to-many, a query is needed. Save the query filter in a
+ // public static field to be grabbed later.
+ if (!isOneToOne) {
+ StringBuilder queryBuilder = new StringBuilder();
+
+ for (int i=0; i<propCount; i++) {
+ if (i > 0) {
+ queryBuilder.append(" & ");
+ }
+ queryBuilder.append(bToAProperty.getInternalJoinElement(i).getName());
+ queryBuilder.append(" = ?");
+ }
+
+ FieldInfo fi = cf.addField(Modifiers.PUBLIC.toStatic(true).toFinal(true),
+ QUERY_FILTER_FIELD_NAME, TypeDesc.STRING);
+ fi.setConstantValue(queryBuilder.toString());
+ }
+
+ // Implement the transform method.
+ if (isOneToOne) {
+ MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", TypeDesc.OBJECT,
+ new TypeDesc[] {TypeDesc.OBJECT});
+ mi.addException(TypeDesc.forClass(FetchException.class));
+ CodeBuilder b = new CodeBuilder(mi);
+
+ LocalVariable aVar = b.createLocalVariable(null, storableType);
+ b.loadLocal(b.getParameter(0));
+ b.checkCast(TypeDesc.forClass(aType));
+ b.storeLocal(aVar);
+
+ // Prepare B storable.
+ b.loadThis();
+ b.loadField(STORAGE_FIELD_NAME, storageType);
+ b.invokeInterface(storageType, PREPARE_METHOD_NAME, storableType, null);
+ LocalVariable bVar = b.createLocalVariable(null, storableType);
+ b.checkCast(TypeDesc.forClass(bType));
+ b.storeLocal(bVar);
+
+ // Copy pk property values from A to B.
+ for (int i=0; i<propCount; i++) {
+ StorableProperty<B> internal = bToAProperty.getInternalJoinElement(i);
+ StorableProperty<?> external = bToAProperty.getExternalJoinElement(i);
+
+ b.loadLocal(bVar);
+ b.loadLocal(aVar);
+ b.invoke(external.getReadMethod());
+ b.invoke(internal.getWriteMethod());
+ }
+
+ // tryLoad b.
+ b.loadLocal(bVar);
+ b.invokeInterface(storableType, TRY_LOAD_METHOD_NAME, TypeDesc.BOOLEAN, null);
+ Label wasLoaded = b.createLabel();
+ b.ifZeroComparisonBranch(wasLoaded, "!=");
+
+ b.loadNull();
+ b.returnValue(storableType);
+
+ wasLoaded.setLocation();
+
+ if (canSetAReference) {
+ b.loadLocal(bVar);
+ b.loadLocal(aVar);
+ b.invoke(bToAProperty.getWriteMethod());
+ }
+
+ b.loadLocal(bVar);
+ b.returnValue(storableType);
+ } else {
+ MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", cursorType,
+ new TypeDesc[] {TypeDesc.OBJECT});
+ mi.addException(TypeDesc.forClass(FetchException.class));
+ CodeBuilder b = new CodeBuilder(mi);
+
+ LocalVariable aVar = b.createLocalVariable(null, storableType);
+ b.loadLocal(b.getParameter(0));
+ b.checkCast(TypeDesc.forClass(aType));
+ b.storeLocal(aVar);
+
+ if (canSetAReference) {
+ b.loadThis();
+ b.loadLocal(aVar);
+ b.storeField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
+ }
+
+ // Populate query parameters.
+ b.loadThis();
+ b.loadField(QUERY_FIELD_NAME, queryType);
+
+ for (int i=0; i<propCount; i++) {
+ StorableProperty<?> external = bToAProperty.getExternalJoinElement(i);
+ b.loadLocal(aVar);
+ b.invoke(external.getReadMethod());
+
+ TypeDesc bindType = CodeBuilderUtil.bindQueryParam(external.getType());
+ CodeBuilderUtil.convertValue(b, external.getType(), bindType.toClass());
+ b.invokeInterface(queryType, WITH_METHOD_NAME, queryType,
+ new TypeDesc[] {bindType});
+ }
+
+ // Now fetch and return.
+ b.invokeInterface(queryType, FETCH_METHOD_NAME, cursorType, null);
+ b.returnValue(cursorType);
+ }
+
+ if (canSetAReference && !isOneToOne) {
+ // Override the "next" method to set A object on B.
+ MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "next", TypeDesc.OBJECT, null);
+ mi.addException(TypeDesc.forClass(FetchException.class));
+ CodeBuilder b = new CodeBuilder(mi);
+
+ b.loadThis();
+ b.invokeSuper(TypeDesc.forClass(MultiTransformedCursor.class),
+ "next", TypeDesc.OBJECT, null);
+ b.checkCast(TypeDesc.forClass(bType));
+ b.dup();
+
+ b.loadThis();
+ b.loadField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
+ b.invoke(bToAProperty.getWriteMethod());
+
+ b.returnValue(storableType);
+ }
+
+ return (Class<Cursor<B>>) ci.defineClass(cf);
+ }
+
+ private final Joiner<A, B> mJoiner;
+
+ /**
+ * @param repo access to storage instances for properties
+ * @param bType type of <i>B</i> instances
+ * @param bToAProperty property of <i>B</i> type which maps to instances of
+ * <i>A</i> type.
+ * @param aType type of <i>A</i> instances
+ * @throws IllegalArgumentException if property type is not <i>A</i>
+ */
+ public JoinedCursorFactory(Repository repo,
+ Class<B> bType,
+ String bToAProperty,
+ Class<A> aType)
+ throws SupportException, FetchException, RepositoryException
+ {
+ this(repo,
+ ChainedProperty.parse(StorableIntrospector.examine(bType), bToAProperty),
+ aType);
+ }
+
+ /**
+ * @param repo access to storage instances for properties
+ * @param bToAProperty property of <i>B</i> type which maps to instances of
+ * <i>A</i> type.
+ * @param aType type of <i>A</i> instances
+ * @throws IllegalArgumentException if property type is not <i>A</i>
+ */
+ public JoinedCursorFactory(Repository repo,
+ ChainedProperty<B> bToAProperty,
+ Class<A> aType)
+ throws SupportException, FetchException, RepositoryException
+ {
+ if (bToAProperty.getType() != aType) {
+ throw new IllegalArgumentException
+ ("Property is not of type \"" + aType.getName() + "\": " +
+ bToAProperty);
+ }
+
+ StorableProperty<B> primeB = bToAProperty.getPrimeProperty();
+ Storage<B> primeBStorage = repo.storageFor(primeB.getEnclosingType());
+
+ Joiner joiner = newBasicJoiner(primeB, primeBStorage);
+
+ int chainCount = bToAProperty.getChainCount();
+ for (int i=0; i<chainCount; i++) {
+ StorableProperty prop = bToAProperty.getChainedProperty(i);
+ Storage storage = repo.storageFor(prop.getEnclosingType());
+
+ joiner = new MultiJoiner(newBasicJoiner(prop, storage), joiner);
+ }
+
+ mJoiner = (Joiner<A, B>) joiner;
+ }
+
+ /**
+ * Given a cursor over <i>A</i>, returns a new cursor over joined property
+ * of type <i>B</i>.
+ */
+ public Cursor<B> join(Cursor<A> cursor) {
+ return mJoiner.join(cursor);
+ }
+
+ private static interface Joiner<A, B extends Storable> {
+ Cursor<B> join(Cursor<A> cursor);
+ }
+
+ /**
+ * Support for joins without an intermediate hop.
+ */
+ private static class BasicJoiner<A, B extends Storable> implements Joiner<A, B> {
+ private final Factory<A, B> mJoinerFactory;
+ private final Storage<B> mBStorage;
+ private final Query<B> mBQuery;
+
+ BasicJoiner(Factory<A, B> factory, Storage<B> bStorage, Query<B> bQuery) {
+ mJoinerFactory = factory;
+ mBStorage = bStorage;
+ mBQuery = bQuery;
+ }
+
+ public Cursor<B> join(Cursor<A> cursor) {
+ return mJoinerFactory.newJoinedCursor(cursor, mBStorage, mBQuery);
+ }
+
+ /**
+ * Needs to be public for {@link QuickConstructorGenerator}.
+ */
+ public static interface Factory<A, B extends Storable> {
+ Cursor<B> newJoinedCursor(Cursor<A> cursor, Storage<B> bStorage, Query<B> bQuery);
+ }
+ }
+
+ /**
+ * Support for joins with an intermediate hop -- multi-way joins.
+ */
+ private static class MultiJoiner<A, X extends Storable, B extends Storable>
+ implements Joiner<A, B>
+ {
+ private final Joiner<A, X> mAToMid;
+ private final Joiner<X, B> mMidToB;
+
+ MultiJoiner(Joiner<A, X> aToMidJoiner, Joiner<X, B> midToBJoiner) {
+ mAToMid = aToMidJoiner;
+ mMidToB = midToBJoiner;
+ }
+
+ public Cursor<B> join(Cursor<A> cursor) {
+ return mMidToB.join(mAToMid.join(cursor));
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
new file mode 100644
index 0000000..0094183
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
@@ -0,0 +1,498 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.lang.reflect.UndeclaredThrowableException;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.SupportException;
+
+import com.amazon.carbonado.spi.RAFInputStream;
+import com.amazon.carbonado.spi.RAFOutputStream;
+import com.amazon.carbonado.spi.StorableSerializer;
+
+/**
+ * Sort buffer implemented via a merge sort algorithm. If there are too many
+ * storables to fit in the reserved memory buffer, they are sorted and
+ * serialized to temporary files.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
+ implements SortBuffer<S>
+{
+ private static final int MIN_ARRAY_CAPACITY = 64;
+
+ // Bigger means better performance, but more memory is used.
+ private static final int MAX_ARRAY_CAPACITY = 8192;
+
+ // Bigger means better performance, but more file handles may be used.
+ private static final int MAX_FILE_COUNT = 100;
+
+ // Bigger may improve write performance, but not by much.
+ private static final int OUTPUT_BUFFER_SIZE = 10000;
+
+ private final Storage<S> mStorage;
+ private final String mTempDir;
+ private final int mMaxArrayCapacity;
+
+ private S[] mElements;
+ private int mSize;
+ private int mTotalSize;
+
+ private StorableSerializer<S> mSerializer;
+ private WorkFilePool mWorkFilePool;
+ private List<RandomAccessFile> mFilesInUse;
+
+ private Comparator<S> mComparator;
+
+ private volatile boolean mStop;
+
+ /**
+ * @param storage storage type of elements
+ */
+ public MergeSortBuffer(Storage<S> storage) {
+ this(storage, null, MAX_ARRAY_CAPACITY);
+ }
+
+ /**
+ * @param storage storage type of elements
+ * @param tempDir directory to store temp files for merging, or null for default
+ */
+ public MergeSortBuffer(Storage<S> storage, String tempDir) {
+ this(storage, tempDir, MAX_ARRAY_CAPACITY);
+ }
+
+ /**
+ * @param storage storage type of elements
+ * @param tempDir directory to store temp files for merging, or null for default
+ * @param maxArrayCapacity maximum amount of storables to keep in an array
+ * before serializing to a file
+ */
+ @SuppressWarnings("unchecked")
+ public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
+ mStorage = storage;
+ mTempDir = tempDir;
+ mMaxArrayCapacity = maxArrayCapacity;
+ int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
+ mElements = (S[]) new Storable[cap];
+ }
+
+ public void prepare(Comparator<S> comparator) {
+ if (comparator == null) {
+ throw new IllegalArgumentException();
+ }
+ clear();
+ mComparator = comparator;
+ }
+
+ public boolean add(S storable) {
+ arrayPrep:
+ if (mSize >= mElements.length) {
+ if (mElements.length < mMaxArrayCapacity) {
+ // Increase array capacity.
+ int newCap = mElements.length * 2;
+ if (newCap > mMaxArrayCapacity) {
+ newCap = mMaxArrayCapacity;
+ }
+ S[] newElements = (S[]) new Storable[newCap];
+ System.arraycopy(mElements, 0, newElements, 0, mElements.length);
+ mElements = newElements;
+ break arrayPrep;
+ }
+
+ // Sort current in-memory results and serialize to a temp file.
+
+ // Make sure everything is set up to use temp files.
+ {
+ if (mSerializer == null) {
+ try {
+ mSerializer = StorableSerializer.forType(mStorage.getStorableType());
+ } catch (SupportException e) {
+ throw new UndeclaredThrowableException(e);
+ }
+ mWorkFilePool = WorkFilePool.getInstance(mTempDir);
+ mFilesInUse = new ArrayList<RandomAccessFile>();
+ }
+ }
+
+ Arrays.sort(mElements, mComparator);
+
+ RandomAccessFile raf;
+ try {
+ raf = mWorkFilePool.acquireWorkFile(this);
+ OutputStream out =
+ new BufferedOutputStream(new RAFOutputStream(raf), OUTPUT_BUFFER_SIZE);
+
+ StorableSerializer<S> serializer = mSerializer;
+
+ if (mFilesInUse.size() < (MAX_FILE_COUNT - 1)) {
+ mFilesInUse.add(raf);
+ int count = 0;
+ for (S element : mElements) {
+ // Check every so often if interrupted.
+ interruptCheck(++count);
+ serializer.write(element, out);
+ }
+ } else {
+ // Merge files together.
+
+ // Determine the average length per file in use.
+ long totalLength = 0;
+ int fileCount = mFilesInUse.size();
+ for (int i=0; i<fileCount; i++) {
+ totalLength += mFilesInUse.get(i).length();
+ }
+
+ // Compute average with ceiling rounding mode.
+ long averageLength = (totalLength + fileCount) / fileCount;
+
+ // For any file whose length is above average, don't merge
+ // it. The goal is to evenly distribute file growth.
+
+ List<RandomAccessFile> filesToExclude = new ArrayList<RandomAccessFile>();
+ List<RandomAccessFile> filesToMerge = new ArrayList<RandomAccessFile>();
+
+ long mergedLength = 0;
+ for (int i=0; i<fileCount; i++) {
+ RandomAccessFile fileInUse = mFilesInUse.get(i);
+ long fileLength = fileInUse.length();
+ if (fileLength > averageLength) {
+ filesToExclude.add(fileInUse);
+ } else {
+ filesToMerge.add(fileInUse);
+ mergedLength += fileLength;
+ }
+ }
+
+ mFilesInUse.add(raf);
+
+ // Pre-allocate space, in an attempt to improve performance
+ // as well as error out earlier, should the disk be full.
+ raf.setLength(mergedLength);
+
+ int count = 0;
+ Iterator<S> it = iterator(filesToMerge);
+ while (it.hasNext()) {
+ // Check every so often if interrupted.
+ interruptCheck(++count);
+ S element = it.next();
+ serializer.write(element, out);
+ }
+
+ mWorkFilePool.releaseWorkFiles(filesToMerge);
+ mFilesInUse = filesToExclude;
+ mFilesInUse.add(raf);
+ }
+
+ out.flush();
+
+ // Truncate any data from last time file was used.
+ raf.setLength(raf.getFilePointer());
+ // Reset to start of file in preparation for reading later.
+ raf.seek(0);
+ } catch (IOException e) {
+ throw new UndeclaredThrowableException(e);
+ }
+
+ mSize = 0;
+ }
+
+ mElements[mSize++] = storable;
+ mTotalSize++;
+ return true;
+ }
+
+ public int size() {
+ return mTotalSize;
+ }
+
+ public Iterator<S> iterator() {
+ return iterator(mFilesInUse);
+ }
+
+ private Iterator<S> iterator(List<RandomAccessFile> filesToMerge) {
+ if (mSerializer == null) {
+ return new ObjectArrayIterator<S>(mElements, 0, mSize);
+ }
+
+ // Merge with the files. Use a priority queue to decide which is the
+ // next buffer to pull an element from.
+
+ PriorityQueue<Iter<S>> pq = new PriorityQueue<Iter<S>>(1 + mFilesInUse.size());
+ pq.add(new ArrayIter<S>(mComparator, mElements, mSize));
+ for (RandomAccessFile raf : filesToMerge) {
+ try {
+ raf.seek(0);
+ } catch (IOException e) {
+ throw new UndeclaredThrowableException(e);
+ }
+
+ InputStream in = new BufferedInputStream(new RAFInputStream(raf));
+
+ pq.add(new InputIter<S>(mComparator, mSerializer, mStorage, in));
+ }
+
+ return new Merger<S>(pq);
+ }
+
+ public void clear() {
+ if (mTotalSize > 0) {
+ mSize = 0;
+ mTotalSize = 0;
+ if (mWorkFilePool != null && mFilesInUse != null) {
+ mWorkFilePool.releaseWorkFiles(mFilesInUse);
+ mFilesInUse.clear();
+ }
+ }
+ }
+
+ public void sort() {
+ // Sort current in-memory results. Anything residing in files has
+ // already been sorted.
+ if (mComparator == null) {
+ throw new IllegalStateException("Buffer was not prepared");
+ }
+ Arrays.sort(mElements, 0, mSize, mComparator);
+ }
+
+ public void close() {
+ clear();
+ if (mWorkFilePool != null) {
+ mWorkFilePool.unregisterWorkFileUser(this);
+ }
+ }
+
+ void stop() {
+ mStop = true;
+ }
+
+ private void interruptCheck(int count) {
+ if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
+ close();
+ throw new UndeclaredThrowableException(new FetchInterruptedException());
+ }
+ }
+
+ /**
+ * Simple interator interface that supports peeking at next element.
+ */
+ private abstract static class Iter<S extends Storable> implements Comparable<Iter<S>> {
+ private final Comparator<S> mComparator;
+
+ protected Iter(Comparator<S> comparator) {
+ mComparator = comparator;
+ }
+
+ /**
+ * Returns null if iterator is exhausted.
+ */
+ abstract S peek();
+
+ /**
+ * Returns null if iterator is exhausted.
+ */
+ abstract S next();
+
+ public int compareTo(Iter<S> iter) {
+ S thisPeek = peek();
+ S thatPeek = iter.peek();
+ if (thisPeek == null) {
+ if (thatPeek == null) {
+ return 0;
+ }
+ // Null is low in order to rise to top of priority queue. This
+ // Iter will then be tossed out of the priority queue.
+ return -1;
+ } else if (thatPeek == null) {
+ return 1;
+ }
+ return mComparator.compare(thisPeek, thatPeek);
+ }
+ }
+
+ /**
+ * Iterator that reads from an array.
+ */
+ private static class ArrayIter<S extends Storable> extends Iter<S> {
+ private final S[] mArray;
+ private final int mSize;
+ private int mPos;
+
+ ArrayIter(Comparator<S> comparator, S[] array, int size) {
+ super(comparator);
+ mArray = array;
+ mSize = size;
+ }
+
+ S peek() {
+ int pos = mPos;
+ if (pos >= mSize) {
+ return null;
+ }
+ return mArray[pos];
+ }
+
+ S next() {
+ int pos = mPos;
+ if (pos >= mSize) {
+ return null;
+ }
+ S next = mArray[pos];
+ mPos = pos + 1;
+ return next;
+ }
+ }
+
+ /**
+ * Iterator that reads from an input stream of serialized Storables.
+ */
+ private static class InputIter<S extends Storable> extends Iter<S> {
+ private StorableSerializer<S> mSerializer;
+ private Storage<S> mStorage;
+ private InputStream mIn;
+
+ private S mNext;
+
+ InputIter(Comparator<S> comparator,
+ StorableSerializer<S> serializer, Storage<S> storage, InputStream in)
+ {
+ super(comparator);
+ mSerializer = serializer;
+ mStorage = storage;
+ mIn = in;
+ }
+
+ S peek() {
+ if (mNext != null) {
+ return mNext;
+ }
+ if (mIn != null) {
+ try {
+ mNext = mSerializer.read(mStorage, mIn);
+ // TODO: Serializer is unable to determine state of
+ // properties, and so they are lost. Since these storables
+ // came directly from a cursor, we know they are clean.
+ mNext.markAllPropertiesClean();
+ } catch (EOFException e) {
+ mIn = null;
+ } catch (IOException e) {
+ throw new UndeclaredThrowableException(e);
+ }
+ }
+ return mNext;
+ }
+
+ S next() {
+ S next = peek();
+ mNext = null;
+ return next;
+ }
+ }
+
+ private static class Merger<S extends Storable> implements Iterator<S> {
+ private final PriorityQueue<Iter<S>> mPQ;
+
+ private S mNext;
+
+ Merger(PriorityQueue<Iter<S>> pq) {
+ mPQ = pq;
+ }
+
+ public boolean hasNext() {
+ if (mNext == null) {
+ while (true) {
+ Iter<S> iter = mPQ.poll();
+ if (iter == null) {
+ return false;
+ }
+ if ((mNext = iter.next()) != null) {
+ // Iter is not exhausted, so put it back in to be used
+ // again. Adding it back causes it to be inserted in
+ // the proper order, based on the next element it has
+ // to offer.
+ mPQ.add(iter);
+ return true;
+ }
+ }
+ }
+ return true;
+ }
+
+ public S next() {
+ if (hasNext()) {
+ S next = mNext;
+ mNext = null;
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class ObjectArrayIterator<E> implements Iterator<E> {
+ private final E[] mElements;
+ private final int mEnd;
+ private int mIndex;
+
+ public ObjectArrayIterator(E[] elements, int start, int end) {
+ mElements = elements;
+ mEnd = end;
+ mIndex = start;
+ }
+
+ public boolean hasNext() {
+ return mIndex < mEnd;
+ }
+
+ public E next() {
+ if (mIndex >= mEnd) {
+ throw new NoSuchElementException();
+ }
+ return mElements[mIndex++];
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
new file mode 100644
index 0000000..0cfd197
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+/**
+ * Abstract cursor which wraps another cursor and transforms each storable
+ * result into a set of target storables. This class can be used for
+ * implementing one-to-many joins. Use {@link TransformedCursor} for one-to-one
+ * joins.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
+ private final Cursor<S> mCursor;
+
+ private Cursor<T> mNextCursor;
+
+ protected MultiTransformedCursor(Cursor<S> cursor) {
+ if (cursor == null) {
+ throw new IllegalArgumentException();
+ }
+ mCursor = cursor;
+ }
+
+ /**
+ * This method must be implemented to transform storables. If the storable
+ * cannot be transformed, either throw a FetchException or return null. If
+ * null is returned, the storable is simply filtered out.
+ *
+ * @return transformed storables, or null to filter it out
+ */
+ protected abstract Cursor<T> transform(S storable) throws FetchException;
+
+ public void close() throws FetchException {
+ synchronized (mCursor) {
+ mCursor.close();
+ if (mNextCursor != null) {
+ mNextCursor.close();
+ mNextCursor = null;
+ }
+ }
+ }
+
+ public boolean hasNext() throws FetchException {
+ synchronized (mCursor) {
+ if (mNextCursor != null) {
+ if (mNextCursor.hasNext()) {
+ return true;
+ }
+ mNextCursor.close();
+ mNextCursor = null;
+ }
+ try {
+ int count = 0;
+ while (mCursor.hasNext()) {
+ Cursor<T> nextCursor = transform(mCursor.next());
+ if (nextCursor != null) {
+ if (nextCursor.hasNext()) {
+ mNextCursor = nextCursor;
+ return true;
+ }
+ nextCursor.close();
+ }
+ interruptCheck(++count);
+ }
+ } catch (NoSuchElementException e) {
+ }
+ return false;
+ }
+ }
+
+ public T next() throws FetchException {
+ synchronized (mCursor) {
+ if (hasNext()) {
+ return mNextCursor.next();
+ }
+ throw new NoSuchElementException();
+ }
+ }
+
+ public int skipNext(int amount) throws FetchException {
+ synchronized (mCursor) {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (hasNext()) {
+ int chunk = mNextCursor.skipNext(amount);
+ count += chunk;
+ if ((amount -= chunk) <= 0) {
+ break;
+ }
+ interruptCheck(count);
+ }
+
+ return count;
+ }
+ }
+
+ private void interruptCheck(int count) throws FetchException {
+ if ((count & ~0xff) == 0 && Thread.interrupted()) {
+ close();
+ throw new FetchInterruptedException();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java
new file mode 100644
index 0000000..089488c
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.Collection;
+
+import com.amazon.carbonado.FetchException;
+
+/**
+ * Buffers up Storable instances allowing them to be sorted. Should any method
+ * need to throw an undeclared exception, wrap it with an
+ * UndeclaredThrowableException.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public interface SortBuffer<S> extends Collection<S> {
+ /**
+ * Clears buffer and assigns a comparator for sorting.
+ *
+ * @throws IllegalArgumentException if comparator is null
+ */
+ void prepare(Comparator<S> comparator);
+
+ /**
+ * Finish sorting buffer.
+ *
+ * @throws IllegalStateException if prepare was never called
+ */
+ void sort() throws FetchException;
+
+ /**
+ * Clear and close buffer.
+ */
+ void close() throws FetchException;
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
new file mode 100644
index 0000000..7a728bb
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.cojen.util.BeanComparator;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.info.Direction;
+import com.amazon.carbonado.info.OrderedProperty;
+
+/**
+ * Wraps another Cursor and ensures the results are sorted. If the elements in
+ * the source cursor are already partially sorted, a handled comparator can be
+ * passed in which specifies the partial ordering. Elements are then processed
+ * in smaller chunks rather than sorting the entire set. The handled comparator
+ * can represent ascending or descending order of source elements.
+ *
+ * @author Brian S O'Neill
+ */
+public class SortedCursor<S> extends AbstractCursor<S> {
+ /**
+ * Convenience method to create a comparator which orders storables by the
+ * given order-by properties. The property names may be prefixed with '+'
+ * or '-' to indicate ascending or descending order. If the prefix is
+ * omitted, ascending order is assumed.
+ *
+ * @param type type of storable to create comparator for
+ * @param orderProperties list of properties to order by
+ * @throws IllegalArgumentException if any property is null or not a member
+ * of storable type
+ */
+ public static <S> Comparator<S> createComparator(Class<S> type, String... orderProperties) {
+ BeanComparator bc = BeanComparator.forClass(type);
+ for (String property : orderProperties) {
+ bc = bc.orderBy(property);
+ bc = bc.caseSensitive();
+ }
+ return bc;
+ }
+
+ /**
+ * Convenience method to create a comparator which orders storables by the
+ * given properties.
+ *
+ * @param properties list of properties to order by
+ * @throws IllegalArgumentException if no properties or if any property is null
+ */
+ public static <S extends Storable> Comparator<S>
+ createComparator(OrderedProperty<S>... properties)
+ {
+ if (properties == null || properties.length == 0 || properties[0] == null) {
+ throw new IllegalArgumentException();
+ }
+
+ Class<S> type = properties[0].getChainedProperty().getPrimeProperty().getEnclosingType();
+
+ BeanComparator bc = BeanComparator.forClass(type);
+
+ for (OrderedProperty<S> property : properties) {
+ if (property == null) {
+ throw new IllegalArgumentException();
+ }
+ bc = bc.orderBy(property.getChainedProperty().toString());
+ bc = bc.caseSensitive();
+ if (property.getDirection() == Direction.DESCENDING) {
+ bc = bc.reverse();
+ }
+ }
+
+ return bc;
+ }
+
+ /**
+ * Convenience method to create a comparator which orders storables by the
+ * given properties.
+ *
+ * @param properties list of properties to order by
+ * @throws IllegalArgumentException if no properties or if any property is null
+ */
+ public static <S extends Storable> Comparator<S>
+ createComparator(List<OrderedProperty<S>> properties)
+ {
+ if (properties == null || properties.size() == 0 || properties.get(0) == null) {
+ throw new IllegalArgumentException();
+ }
+
+ Class<S> type =
+ properties.get(0).getChainedProperty().getPrimeProperty().getEnclosingType();
+
+ BeanComparator bc = BeanComparator.forClass(type);
+
+ for (OrderedProperty<S> property : properties) {
+ if (property == null) {
+ throw new IllegalArgumentException();
+ }
+ bc = bc.orderBy(property.getChainedProperty().toString());
+ bc = bc.caseSensitive();
+ if (property.getDirection() == Direction.DESCENDING) {
+ bc = bc.reverse();
+ }
+ }
+
+ return bc;
+ }
+
+ /** Wrapped cursor */
+ private final Cursor<S> mCursor;
+ /** Buffer to store and sort results */
+ private final SortBuffer<S> mChunkBuffer;
+ /** Optional comparator which matches ordering already handled by wrapped cursor */
+ private final Comparator<S> mChunkMatcher;
+ /** Comparator to use for sorting chunks */
+ private final Comparator<S> mChunkSorter;
+
+ /** Iteration over current contents in mChunkBuffer */
+ private Iterator<S> mChunkIterator;
+
+ /**
+ * First record in a chunk, according to chunk matcher. In order to tell if
+ * the next chunk has been reached, a record has to be read from the
+ * wrapped cursor. The record is "pushed back" here for use when the next
+ * chunk is ready to be processed.
+ */
+ private S mNextChunkStart;
+
+ /**
+ * @param cursor cursor to wrap
+ * @param buffer required buffer to hold results
+ * @param handled optional comparator which represents how the results are
+ * already sorted
+ * @param finisher required comparator which finishes the sort
+ */
+ public SortedCursor(Cursor<S> cursor, SortBuffer<S> buffer,
+ Comparator<S> handled, Comparator<S> finisher) {
+ if (cursor == null || finisher == null) {
+ throw new IllegalArgumentException();
+ }
+ mCursor = cursor;
+ mChunkBuffer = buffer;
+ mChunkMatcher = handled;
+ mChunkSorter = finisher;
+ }
+
+ /**
+ * @param cursor cursor to wrap
+ * @param buffer required buffer to hold results
+ * @param type type of storable to create cursor for
+ * @param orderProperties list of properties to order by
+ * @throws IllegalArgumentException if any property is null or not a member
+ * of storable type
+ */
+ public SortedCursor(Cursor<S> cursor, SortBuffer<S> buffer,
+ Class<S> type, String... orderProperties) {
+ this(cursor, buffer, null, createComparator(type, orderProperties));
+ }
+
+ /**
+ * Returns a comparator representing the effective sort order of this cursor.
+ */
+ public Comparator<S> comparator() {
+ if (mChunkMatcher == null) {
+ return mChunkSorter;
+ }
+ return new Comparator<S>() {
+ public int compare(S a, S b) {
+ int result = mChunkMatcher.compare(a, b);
+ if (result == 0) {
+ result = mChunkSorter.compare(a, b);
+ }
+ return result;
+ }
+ };
+ }
+
+ public synchronized void close() throws FetchException {
+ mCursor.close();
+ mChunkIterator = null;
+ mChunkBuffer.close();
+ }
+
+ public synchronized boolean hasNext() throws FetchException {
+ prepareNextChunk();
+ try {
+ if (mChunkIterator.hasNext()) {
+ return true;
+ }
+ } catch (UndeclaredThrowableException e) {
+ throw toFetchException(e);
+ }
+ close();
+ return false;
+ }
+
+ public synchronized S next() throws FetchException {
+ prepareNextChunk();
+ try {
+ return mChunkIterator.next();
+ } catch (UndeclaredThrowableException e) {
+ throw toFetchException(e);
+ } catch (NoSuchElementException e) {
+ try {
+ close();
+ } catch (FetchException e2) {
+ // Don't care.
+ }
+ throw e;
+ }
+ }
+
+ public synchronized int skipNext(int amount) throws FetchException {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (--amount >= 0 && hasNext()) {
+ next();
+ count++;
+ }
+
+ return count;
+ }
+
+ private void prepareNextChunk() throws FetchException {
+ if (mChunkIterator != null && (mChunkMatcher == null || mChunkIterator.hasNext())) {
+ // Ready to go.
+ return;
+ }
+
+ Cursor<S> cursor = mCursor;
+ SortBuffer<S> buffer = mChunkBuffer;
+ Comparator<S> matcher = mChunkMatcher;
+
+ try {
+ mChunkIterator = null;
+ buffer.prepare(mChunkSorter);
+
+ fill: {
+ if (matcher == null) {
+ // Buffer up entire results and sort.
+ int count = 0;
+ while (cursor.hasNext()) {
+ // Check every so often if interrupted.
+ if ((++count & ~0xff) == 0 && Thread.interrupted()) {
+ throw new FetchInterruptedException();
+ }
+ buffer.add(cursor.next());
+ }
+ break fill;
+ }
+
+ // Read a chunk into the buffer. First record is compared against
+ // subsequent records, to determine when the chunk end is reached.
+
+ S chunkStart;
+ if (mNextChunkStart != null) {
+ chunkStart = mNextChunkStart;
+ mNextChunkStart = null;
+ } else if (cursor.hasNext()) {
+ chunkStart = cursor.next();
+ } else {
+ break fill;
+ }
+
+ buffer.add(chunkStart);
+ int count = 1;
+
+ while (cursor.hasNext()) {
+ // Check every so often if interrupted.
+ if ((++count & ~0xff) == 0 && Thread.interrupted()) {
+ throw new FetchInterruptedException();
+ }
+ S next = cursor.next();
+ if (matcher.compare(chunkStart, next) != 0) {
+ // Save for reading next chunk later.
+ mNextChunkStart = next;
+ break;
+ }
+ buffer.add(next);
+ }
+ }
+
+ if (buffer.size() > 1) {
+ buffer.sort();
+ }
+
+ mChunkIterator = buffer.iterator();
+ } catch (UndeclaredThrowableException e) {
+ throw toFetchException(e);
+ }
+ }
+
+ private FetchException toFetchException(UndeclaredThrowableException e) {
+ Throwable cause = e.getCause();
+ if (cause == null) {
+ cause = e;
+ }
+ if (cause instanceof FetchException) {
+ return (FetchException) cause;
+ }
+ return new FetchException(null, cause);
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java
new file mode 100644
index 0000000..6851464
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs a <i>symmetric set difference</i>
+ * operation. In boolean logic, this is an <i>exclusive or</i> operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the difference.
+ *
+ * @author Brian S O'Neill
+ * @see UnionCursor
+ * @see IntersectionCursor
+ * @see DifferenceCursor
+ */
+public class SymmetricDifferenceCursor<S> extends AbstractCursor<S> {
+ private final Cursor<S> mLeftCursor;
+ private final Cursor<S> mRightCursor;
+ private final Comparator<S> mOrder;
+
+ private S mNextLeft;
+ private S mNextRight;
+ private int mCompareResult;
+
+ /**
+ * @param left cursor to wrap
+ * @param right cursor to wrap
+ * @param order describes sort ordering of wrapped cursors, which must be
+ * a total ordering
+ */
+ public SymmetricDifferenceCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+ if (left == null || right == null || order == null) {
+ throw new IllegalArgumentException();
+ }
+ mLeftCursor = left;
+ mRightCursor = right;
+ mOrder = order;
+ }
+
+ public synchronized void close() throws FetchException {
+ mLeftCursor.close();
+ mRightCursor.close();
+ mNextLeft = null;
+ mNextRight = null;
+ }
+
+ public boolean hasNext() throws FetchException {
+ return compareNext() != 0;
+ }
+
+ /**
+ * Returns 0 if no next element available, &lt;0 if next element is from
+ * left source cursor, and &gt;0 if next element is from right source
+ * cursor.
+ */
+ public synchronized int compareNext() throws FetchException {
+ if (mCompareResult != 0) {
+ return mCompareResult;
+ }
+
+ while (true) {
+ if (mNextLeft == null && mLeftCursor.hasNext()) {
+ mNextLeft = mLeftCursor.next();
+ }
+ if (mNextRight == null && mRightCursor.hasNext()) {
+ mNextRight = mRightCursor.next();
+ }
+
+ if (mNextLeft == null) {
+ return mNextRight != null ? 1 : 0;
+ }
+ if (mNextRight == null) {
+ return -1;
+ }
+
+ if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) {
+ mNextLeft = null;
+ mNextRight = null;
+ } else {
+ return mCompareResult;
+ }
+ }
+ }
+
+ public synchronized S next() throws FetchException {
+ S next;
+ int result = compareNext();
+ if (result < 0) {
+ next = mNextLeft;
+ mNextLeft = null;
+ } else if (result > 0) {
+ next = mNextRight;
+ mNextRight = null;
+ } else {
+ throw new NoSuchElementException();
+ }
+ mCompareResult = 0;
+ return next;
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java
new file mode 100644
index 0000000..2f4c5a0
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+import com.amazon.carbonado.util.Throttle;
+
+/**
+ * Wraps another cursor and fetches results at a reduced speed.
+ *
+ * @author Brian S O'Neill
+ */
+public class ThrottledCursor<S> extends AbstractCursor<S> {
+ private static final int WINDOW_SIZE = 10;
+ private static final int SLEEP_PRECISION_MILLIS = 50;
+
+ private final Cursor<S> mCursor;
+ private final Throttle mThrottle;
+ private final double mDesiredSpeed;
+
+ /**
+ * @param cursor cursor to wrap
+ * @param throttle 1.0 = fetch at full speed, 0.5 = fetch at half speed,
+ * 0.1 = fetch at one tenth speed, etc.
+ */
+ public ThrottledCursor(Cursor<S> cursor, double throttle) {
+ mCursor = cursor;
+ if (throttle < 1.0) {
+ if (throttle < 0.0) {
+ throttle = 0.0;
+ }
+ mThrottle = new Throttle(WINDOW_SIZE);
+ mDesiredSpeed = throttle;
+ } else {
+ mThrottle = null;
+ mDesiredSpeed = 1.0;
+ }
+ }
+
+ public void close() throws FetchException {
+ mCursor.close();
+ }
+
+ public boolean hasNext() throws FetchException {
+ return mCursor.hasNext();
+ }
+
+ public S next() throws FetchException {
+ throttle();
+ return mCursor.next();
+ }
+
+ public int skipNext(int amount) throws FetchException {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (--amount >= 0) {
+ throttle();
+ if (skipNext(1) <= 0) {
+ break;
+ }
+ count++;
+ }
+
+ return count;
+ }
+
+ private void throttle() throws FetchInterruptedException {
+ if (mThrottle != null) {
+ try {
+ mThrottle.throttle(mDesiredSpeed, SLEEP_PRECISION_MILLIS);
+ } catch (InterruptedException e) {
+ throw new FetchInterruptedException(e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
new file mode 100644
index 0000000..0860cf0
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+/**
+ * Abstract cursor which wraps another cursor and transforms each storable
+ * result into a target storable. This class can be used for implementing
+ * one-to-one joins. Use {@link MultiTransformedCursor} for one-to-many joins.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {
+ private final Cursor<S> mCursor;
+
+ private T mNext;
+
+ protected TransformedCursor(Cursor<S> cursor) {
+ if (cursor == null) {
+ throw new IllegalArgumentException();
+ }
+ mCursor = cursor;
+ }
+
+ /**
+ * This method must be implemented to transform storables. If the storable
+ * cannot be transformed, either throw a FetchException or return null. If
+ * null is returned, the storable is simply filtered out.
+ *
+ * @return transformed storable, or null to filter it out
+ */
+ protected abstract T transform(S storable) throws FetchException;
+
+ public void close() throws FetchException {
+ synchronized (mCursor) {
+ mCursor.close();
+ mNext = null;
+ }
+ }
+
+ public boolean hasNext() throws FetchException {
+ synchronized (mCursor) {
+ if (mNext != null) {
+ return true;
+ }
+ try {
+ int count = 0;
+ while (mCursor.hasNext()) {
+ T next = transform(mCursor.next());
+ if (next != null) {
+ mNext = next;
+ return true;
+ }
+ interruptCheck(++count);
+ }
+ } catch (NoSuchElementException e) {
+ }
+ return false;
+ }
+ }
+
+ public T next() throws FetchException {
+ synchronized (mCursor) {
+ if (hasNext()) {
+ T next = mNext;
+ mNext = null;
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+ }
+
+ public int skipNext(int amount) throws FetchException {
+ synchronized (mCursor) {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ int count = 0;
+ while (--amount >= 0 && hasNext()) {
+ interruptCheck(++count);
+ mNext = null;
+ }
+
+ return count;
+ }
+ }
+
+ private void interruptCheck(int count) throws FetchException {
+ if ((count & ~0xff) == 0 && Thread.interrupted()) {
+ close();
+ throw new FetchInterruptedException();
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java
new file mode 100644
index 0000000..7a62678
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs a <i>set union</i> operation. In boolean
+ * logic, this is an <i>or</i> operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the union.
+ *
+ * @author Brian S O'Neill
+ * @see IntersectionCursor
+ * @see DifferenceCursor
+ * @see SymmetricDifferenceCursor
+ */
+public class UnionCursor<S> extends AbstractCursor<S> {
+ private final Cursor<S> mLeftCursor;
+ private final Cursor<S> mRightCursor;
+ private final Comparator<S> mOrder;
+
+ private S mNextLeft;
+ private S mNextRight;
+
+ /**
+ * @param left cursor to wrap
+ * @param right cursor to wrap
+ * @param order describes sort ordering of wrapped cursors, which must be
+ * a total ordering
+ */
+ public UnionCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+ if (left == null || right == null || order == null) {
+ throw new IllegalArgumentException();
+ }
+ mLeftCursor = left;
+ mRightCursor = right;
+ mOrder = order;
+ }
+
+ public synchronized void close() throws FetchException {
+ mLeftCursor.close();
+ mRightCursor.close();
+ mNextLeft = null;
+ mNextRight = null;
+ }
+
+ public synchronized boolean hasNext() throws FetchException {
+ if (mNextLeft == null && mLeftCursor.hasNext()) {
+ mNextLeft = mLeftCursor.next();
+ }
+ if (mNextRight == null && mRightCursor.hasNext()) {
+ mNextRight = mRightCursor.next();
+ }
+ return mNextLeft != null || mNextRight != null;
+ }
+
+ public synchronized S next() throws FetchException {
+ if (hasNext()) {
+ S next;
+ if (mNextLeft == null) {
+ next = mNextRight;
+ mNextRight = null;
+ } else if (mNextRight == null) {
+ next = mNextLeft;
+ mNextLeft = null;
+ } else {
+ int result = mOrder.compare(mNextLeft, mNextRight);
+ if (result < 0) {
+ next = mNextLeft;
+ mNextLeft = null;
+ } else if (result > 0) {
+ next = mNextRight;
+ mNextRight = null;
+ } else {
+ next = mNextLeft;
+ mNextLeft = null;
+ mNextRight = null;
+ }
+ }
+ return next;
+ }
+ throw new NoSuchElementException();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java
new file mode 100644
index 0000000..94c7b84
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Used internally by {@link MergeSortBuffer}.
+ *
+ * @author Brian S O'Neill
+ */
+class WorkFilePool {
+ private static final String cTempDir = System.getProperty("java.io.tmpdir");
+
+ private static final ConcurrentMap<String, WorkFilePool> cPools =
+ new ConcurrentHashMap<String, WorkFilePool>();
+
+ /**
+ * @param tempDir directory to store temp files for merging, or null for default
+ */
+ static WorkFilePool getInstance(String tempDir) {
+ // This method uses ConcurrentMap features to eliminate the need to
+ // ever synchronize access, since this method may be called frequently.
+
+ if (tempDir == null) {
+ tempDir = cTempDir;
+ }
+
+ WorkFilePool pool = cPools.get(tempDir);
+
+ if (pool != null) {
+ return pool;
+ }
+
+ String canonical;
+ try {
+ canonical = new File(tempDir).getCanonicalPath();
+ } catch (IOException e) {
+ canonical = new File(tempDir).getAbsolutePath();
+ }
+
+ if (!canonical.equals(tempDir)) {
+ pool = getInstance(canonical);
+ cPools.putIfAbsent(tempDir, pool);
+ return pool;
+ }
+
+ pool = new WorkFilePool(new File(tempDir));
+
+ WorkFilePool existing = cPools.putIfAbsent(tempDir, pool);
+
+ if (existing == null) {
+ // New pool is the winner, so finish off initialization. Pool can
+ // be used concurrently by another thread without shutdown hook.
+ pool.addShutdownHook(tempDir);
+ } else {
+ pool = existing;
+ }
+
+ return pool;
+ }
+
+ private final File mTempDir;
+
+ // Work files not currently being used by any MergeSortBuffer.
+ private final List<RandomAccessFile> mWorkFilePool;
+ // Instances of MergeSortBuffer, to be notified on shutdown that they
+ // should close their work files.
+ private final Set<MergeSortBuffer<?>> mWorkFileUsers;
+
+ private Thread mShutdownHook;
+
+ /**
+ * @param tempDir directory to store temp files for merging, or null for default
+ */
+ private WorkFilePool(File tempDir) {
+ mTempDir = tempDir;
+ mWorkFilePool = new ArrayList<RandomAccessFile>();
+ mWorkFileUsers = new HashSet<MergeSortBuffer<?>>();
+ }
+
+ RandomAccessFile acquireWorkFile(MergeSortBuffer<?> buffer) throws IOException {
+ synchronized (mWorkFileUsers) {
+ mWorkFileUsers.add(buffer);
+ }
+ synchronized (mWorkFilePool) {
+ if (mWorkFilePool.size() > 0) {
+ return mWorkFilePool.remove(mWorkFilePool.size() - 1);
+ }
+ }
+ File file = File.createTempFile("carbonado-mergesort-", null, mTempDir);
+ file.deleteOnExit();
+ return new RandomAccessFile(file, "rw");
+ }
+
+ void releaseWorkFiles(List<RandomAccessFile> files) {
+ synchronized (mWorkFilePool) {
+ for (RandomAccessFile raf : files) {
+ try {
+ raf.seek(0);
+ // Return space to file system.
+ raf.setLength(0);
+ mWorkFilePool.add(raf);
+ } catch (IOException e) {
+ // Work file is broken, discard it.
+ try {
+ raf.close();
+ } catch (IOException e2) {
+ }
+ }
+ }
+ }
+ }
+
+ void unregisterWorkFileUser(MergeSortBuffer<?> buffer) {
+ synchronized (mWorkFileUsers) {
+ mWorkFileUsers.remove(buffer);
+ mWorkFileUsers.notify();
+ }
+ }
+
+ private synchronized void addShutdownHook(String tempDir) {
+ if (mShutdownHook != null) {
+ return;
+ }
+
+ // Add shutdown hook to close work files so that they can be deleted.
+ String threadName = "MergeSortBuffer shutdown (" + tempDir + ')';
+
+ mShutdownHook = new Thread(threadName) {
+ public void run() {
+ // Notify users of work files and wait for them to close.
+ synchronized (mWorkFileUsers) {
+ for (MergeSortBuffer<?> buffer : mWorkFileUsers) {
+ buffer.stop();
+ }
+ final long timeout = 10000;
+ final long giveup = System.currentTimeMillis() + timeout;
+ try {
+ while (mWorkFileUsers.size() > 0) {
+ long now = System.currentTimeMillis();
+ if (now < giveup) {
+ mWorkFileUsers.wait(giveup - now);
+ } else {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ synchronized (mWorkFilePool) {
+ for (RandomAccessFile raf : mWorkFilePool) {
+ try {
+ raf.close();
+ } catch (IOException e) {
+ }
+ }
+ mWorkFilePool.clear();
+ }
+ }
+ };
+
+ Runtime.getRuntime().addShutdownHook(mShutdownHook);
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/package-info.java b/src/main/java/com/amazon/carbonado/cursor/package-info.java
new file mode 100644
index 0000000..3553d1a
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for advanced processing of cursor results, including basic set
+ * theory operations.
+ */
+package com.amazon.carbonado.cursor;