From ab635e23ad5fb9cd9edb61665a3654ee3e4b372a Mon Sep 17 00:00:00 2001
From: "Brian S. O'Neill" <bronee@gmail.com>
Date: Wed, 30 Aug 2006 01:55:10 +0000
Subject: Add cursor implementations

---
 .../amazon/carbonado/cursor/AbstractCursor.java    |  83 +++
 .../amazon/carbonado/cursor/ArraySortBuffer.java   |  62 ++
 .../amazon/carbonado/cursor/DifferenceCursor.java  | 120 ++++
 .../com/amazon/carbonado/cursor/EmptyCursor.java   | 103 ++++
 .../amazon/carbonado/cursor/FilteredCursor.java    | 148 +++++
 .../carbonado/cursor/FilteredCursorGenerator.java  | 665 +++++++++++++++++++++
 .../com/amazon/carbonado/cursor/GroupedCursor.java | 205 +++++++
 .../carbonado/cursor/IntersectionCursor.java       | 118 ++++
 .../amazon/carbonado/cursor/IteratorCursor.java    |  62 ++
 .../carbonado/cursor/JoinedCursorFactory.java      | 459 ++++++++++++++
 .../amazon/carbonado/cursor/MergeSortBuffer.java   | 498 +++++++++++++++
 .../carbonado/cursor/MultiTransformedCursor.java   | 132 ++++
 .../com/amazon/carbonado/cursor/SortBuffer.java    |  53 ++
 .../com/amazon/carbonado/cursor/SortedCursor.java  | 332 ++++++++++
 .../cursor/SymmetricDifferenceCursor.java          | 123 ++++
 .../amazon/carbonado/cursor/ThrottledCursor.java   | 101 ++++
 .../amazon/carbonado/cursor/TransformedCursor.java | 119 ++++
 .../com/amazon/carbonado/cursor/UnionCursor.java   | 106 ++++
 .../com/amazon/carbonado/cursor/WorkFilePool.java  | 192 ++++++
 .../com/amazon/carbonado/cursor/package-info.java  |  23 +
 20 files changed, 3704 insertions(+)
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/SortBuffer.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/UnionCursor.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/package-info.java

(limited to 'src/main/java/com/amazon/carbonado/cursor')

diff --git a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java
new file mode 100644
index 0000000..c71393a
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+
+/**
+ * AbstractCursor implements a small set of common Cursor methods.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class AbstractCursor<S> implements Cursor<S> {
+    // Note: Since constructor takes no parameters, this class is called
+    // Abstract instead of Base.
+    protected AbstractCursor() {
+    }
+
+    public int copyInto(Collection<? super S> c) throws FetchException {
+        int originalSize = c.size();
+        while (hasNext()) {
+            c.add(next());
+        }
+        return c.size() - originalSize;
+    }
+
+    public int copyInto(Collection<? super S> c, int limit) throws FetchException {
+        int originalSize = c.size();
+        while (--limit >= 0 && hasNext()) {
+            c.add(next());
+        }
+        return c.size() - originalSize;
+    }
+
+    public List<S> toList() throws FetchException {
+        List<S> list = new ArrayList<S>();
+        copyInto(list);
+        return list;
+    }
+
+    public List<S> toList(int limit) throws FetchException {
+        List<S> list = new ArrayList<S>();
+        copyInto(list, limit);
+        return list;
+    }
+
+    public synchronized int skipNext(int amount) throws FetchException {
+        if (amount <= 0) {
+            if (amount < 0) {
+                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+            }
+            return 0;
+        }
+
+        int count = 0;
+        while (--amount >= 0 && hasNext()) {
+            next();
+            count++;
+        }
+
+        return count;
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java
new file mode 100644
index 0000000..db74134
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * Sort buffer implementation backed by an ArrayList.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public class ArraySortBuffer<S> extends ArrayList<S> implements SortBuffer<S> {
+    private static final long serialVersionUID = -5622302375191321452L;
+
+    private Comparator<S> mComparator;
+
+    public ArraySortBuffer() {
+        super();
+    }
+
+    public ArraySortBuffer(int initialCapacity) {
+        super(initialCapacity);
+    }
+
+    public void prepare(Comparator<S> comparator) {
+        if (comparator == null) {
+            throw new IllegalArgumentException();
+        }
+        clear();
+        mComparator = comparator;
+    }
+
+    public void sort() {
+        if (mComparator == null) {
+            throw new IllegalStateException("Buffer was not prepared");
+        }
+        Collections.sort(this, mComparator);
+    }
+
+    public void close() {
+        clear();
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java
new file mode 100644
index 0000000..39455c8
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs an <i>asymmetric set difference</i>
+ * operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the difference.
+ *
+ * @author Brian S O'Neill
+ * @see UnionCursor
+ * @see IntersectionCursor
+ * @see SymmetricDifferenceCursor
+ */
+public class DifferenceCursor<S> extends AbstractCursor<S> {
+    private final Cursor<S> mLeftCursor;
+    private final Cursor<S> mRightCursor;
+    private final Comparator<S> mOrder;
+
+    private S mNext;
+    private S mNextRight;
+
+    /**
+     * @param left cursor to wrap
+     * @param right cursor to wrap whose results are completely discarded
+     * @param order describes sort ordering of wrapped cursors, which must be
+     * a total ordering
+     */
+    public DifferenceCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+        if (left == null || right == null || order == null) {
+            throw new IllegalArgumentException();
+        }
+        mLeftCursor = left;
+        mRightCursor = right;
+        mOrder = order;
+    }
+
+    public synchronized void close() throws FetchException {
+        mLeftCursor.close();
+        mRightCursor.close();
+        mNext = null;
+        mNextRight = null;
+    }
+
+    public synchronized boolean hasNext() throws FetchException {
+        if (mNext != null) {
+            return true;
+        }
+
+        S nextLeft;
+
+        while (true) {
+            if (mLeftCursor.hasNext()) {
+                nextLeft = mLeftCursor.next();
+            } else {
+                close();
+                return false;
+            }
+            if (mNextRight == null) {
+                if (mRightCursor.hasNext()) {
+                    mNextRight = mRightCursor.next();
+                } else {
+                    mNext = nextLeft;
+                    return true;
+                }
+            }
+
+            while (true) {
+                int result = mOrder.compare(nextLeft, mNextRight);
+                if (result < 0) {
+                    mNext = nextLeft;
+                    return true;
+                } else if (result > 0) {
+                    if (mRightCursor.hasNext()) {
+                        mNextRight = mRightCursor.next();
+                    } else {
+                        mNextRight = null;
+                        mNext = nextLeft;
+                        return true;
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    public synchronized S next() throws FetchException {
+        if (hasNext()) {
+            S next = mNext;
+            mNext = null;
+            return next;
+        }
+        throw new NoSuchElementException();
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java
new file mode 100644
index 0000000..db3778b
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Special cursor implementation that is empty.
+ *
+ * @author Brian S O'Neill
+ */
+public class EmptyCursor<S> implements Cursor<S> {
+    private static final Cursor EMPTY_CURSOR = new EmptyCursor();
+
+    /**
+     * Returns the singleton empty cursor instance.
+     */
+    @SuppressWarnings("unchecked")
+    public static <S> Cursor<S> getEmptyCursor() {
+        return EMPTY_CURSOR;
+    }
+
+    // Package-private, to be used by test suite
+    EmptyCursor() {
+    }
+
+    /**
+     * Does nothing.
+     */
+    public void close() {
+    }
+
+    /**
+     * Always returns false.
+     */
+    public boolean hasNext() {
+        return false;
+    }
+
+    /**
+     * Always throws NoSuchElementException.
+     */
+    public S next() {
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Always returns 0.
+     */
+    public int skipNext(int amount) {
+        return 0;
+    }
+
+    /**
+     * Performs no copy and always returns 0.
+     */
+    public int copyInto(Collection<? super S> c) {
+        return 0;
+    }
+
+    /**
+     * Performs no copy and always returns 0.
+     */
+    public int copyInto(Collection<? super S> c, int limit) {
+        return 0;
+    }
+
+    /**
+     * Always returns an empty list.
+     */
+    public List<S> toList() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Always returns an empty list.
+     */
+    public List<S> toList(int limit) {
+        return Collections.emptyList();
+    }
+}
+
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
new file mode 100644
index 0000000..60b3c81
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.filter.ClosedFilter;
+import com.amazon.carbonado.filter.Filter;
+import com.amazon.carbonado.filter.FilterValues;
+import com.amazon.carbonado.filter.OpenFilter;
+
+/**
+ * Wraps another cursor and applies custom filtering to reduce the set of
+ * results.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class FilteredCursor<S> extends AbstractCursor<S> {
+    /**
+     * Returns a Cursor that is filtered by the given Filter and FilterValues.
+     * The given Filter must be composed only of the same PropertyFilter
+     * instances as used to construct the FilterValues. An
+     * IllegalStateException will result otherwise.
+     *
+     * @param filter filter to apply
+     * @param filterValues values for filter
+     * @param cursor cursor to wrap
+     * @return wrapped cursor which filters results
+     * @throws IllegalStateException if any values are not specified
+     * @throws IllegalArgumentException if filter is closed
+     */
+    public static <S extends Storable> Cursor<S> applyFilter(Filter<S> filter,
+                                                             FilterValues<S> filterValues,
+                                                             Cursor<S> cursor)
+    {
+        if (filter instanceof OpenFilter) {
+            return cursor;
+        }
+        if (filter instanceof ClosedFilter) {
+            throw new IllegalArgumentException();
+        }
+
+        return FilteredCursorGenerator.getFactory(filter)
+            .newFilteredCursor(cursor, filterValues.getValuesFor(filter));
+    }
+
+    private final Cursor<S> mCursor;
+
+    private S mNext;
+
+    protected FilteredCursor(Cursor<S> cursor) {
+        if (cursor == null) {
+            throw new IllegalArgumentException();
+        }
+        mCursor = cursor;
+    }
+
+    /**
+     * @return false if object should not be in results
+     */
+    protected abstract boolean isAllowed(S storable);
+
+    public void close() throws FetchException {
+        synchronized (mCursor) {
+            mCursor.close();
+            mNext = null;
+        }
+    }
+
+    public boolean hasNext() throws FetchException {
+        synchronized (mCursor) {
+            if (mNext != null) {
+                return true;
+            }
+            try {
+                int count = 0;
+                while (mCursor.hasNext()) {
+                    S next = mCursor.next();
+                    if (isAllowed(next)) {
+                        mNext = next;
+                        return true;
+                    }
+                    interruptCheck(++count);
+                }
+            } catch (NoSuchElementException e) {
+            }
+            return false;
+        }
+    }
+
+    public S next() throws FetchException {
+        synchronized (mCursor) {
+            if (hasNext()) {
+                S next = mNext;
+                mNext = null;
+                return next;
+            }
+            throw new NoSuchElementException();
+        }
+    }
+
+    public int skipNext(int amount) throws FetchException {
+        synchronized (mCursor) {
+            if (amount <= 0) {
+                if (amount < 0) {
+                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+                }
+                return 0;
+            }
+
+            int count = 0;
+            while (--amount >= 0 && hasNext()) {
+                interruptCheck(++count);
+                mNext = null;
+            }
+
+            return count;
+        }
+    }
+
+    private void interruptCheck(int count) throws FetchException {
+        if ((count & ~0xff) == 0 && Thread.interrupted()) {
+            close();
+            throw new FetchInterruptedException();
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java
new file mode 100644
index 0000000..ea1baa8
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java
@@ -0,0 +1,665 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Stack;
+
+import org.cojen.classfile.ClassFile;
+import org.cojen.classfile.CodeAssembler;
+import org.cojen.classfile.CodeBuilder;
+import org.cojen.classfile.Label;
+import org.cojen.classfile.LocalVariable;
+import org.cojen.classfile.MethodInfo;
+import org.cojen.classfile.Modifiers;
+import org.cojen.classfile.Opcode;
+import org.cojen.classfile.TypeDesc;
+import static org.cojen.classfile.TypeDesc.*;
+
+import org.cojen.util.ClassInjector;
+import org.cojen.util.WeakIdentityMap;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.filter.AndFilter;
+import com.amazon.carbonado.filter.OrFilter;
+import com.amazon.carbonado.filter.Filter;
+import com.amazon.carbonado.filter.PropertyFilter;
+import com.amazon.carbonado.filter.RelOp;
+import com.amazon.carbonado.filter.Visitor;
+
+import com.amazon.carbonado.info.ChainedProperty;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.util.QuickConstructorGenerator;
+
+
+/**
+ * Generates Cursor implementations that wrap another Cursor, applying a
+ * property matching filter on each result.
+ *
+ * @author Brian S O'Neill
+ * @see FilteredCursor
+ */
+class FilteredCursorGenerator {
+    private static Map cCache = new WeakIdentityMap();
+
+    /**
+     * Returns a factory for creating new filtered cursor instances.
+     *
+     * @param filter filter specification
+     * @throws IllegalArgumentException if filter is null
+     * @throws UnsupportedOperationException if filter is not supported
+     */
+    @SuppressWarnings("unchecked")
+    static <S extends Storable> Factory<S> getFactory(Filter<S> filter) {
+        if (filter == null) {
+            throw new IllegalArgumentException();
+        }
+        synchronized (cCache) {
+            Factory<S> factory = (Factory<S>) cCache.get(filter);
+            if (factory != null) {
+                return factory;
+            }
+            Class<Cursor<S>> clazz = generateClass(filter);
+            factory = QuickConstructorGenerator.getInstance(clazz, Factory.class);
+            cCache.put(filter, factory);
+            return factory;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <S extends Storable> Class<Cursor<S>> generateClass(Filter<S> filter) {
+        filter.accept(new Validator<S>(), null);
+
+        Class<S> type = filter.getStorableType();
+
+        String packageName;
+        {
+            String name = type.getName();
+            int index = name.lastIndexOf('.');
+            if (index >= 0) {
+                packageName = name.substring(0, index);
+            } else {
+                packageName = "";
+            }
+        }
+
+        // Try to generate against the same loader as the storable type. This
+        // allows the generated class access to it, preventing a
+        // NoClassDefFoundError.
+        ClassLoader loader = type.getClassLoader();
+
+        ClassInjector ci = ClassInjector.create(packageName + ".FilteredCursor", loader);
+        ClassFile cf = new ClassFile(ci.getClassName(), FilteredCursor.class);
+        cf.markSynthetic();
+        cf.setSourceFile(FilteredCursorGenerator.class.getName());
+        cf.setTarget("1.5");
+
+        // Begin constructor definition.
+        CodeBuilder ctorBuilder;
+        {
+            TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
+            TypeDesc objectArrayType = TypeDesc.forClass(Object[].class);
+            TypeDesc[] params = {cursorType, objectArrayType};
+            MethodInfo ctor = cf.addConstructor(Modifiers.PUBLIC, params);
+            ctorBuilder = new CodeBuilder(ctor);
+            ctorBuilder.loadThis();
+            ctorBuilder.loadLocal(ctorBuilder.getParameter(0));
+            ctorBuilder.invokeSuperConstructor(new TypeDesc[] {cursorType});
+        }
+
+        // Begin isAllowed method definition.
+        CodeBuilder isAllowedBuilder;
+        LocalVariable storableVar;
+        {
+            TypeDesc[] params = {TypeDesc.OBJECT};
+            MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "isAllowed", BOOLEAN, params);
+            isAllowedBuilder = new CodeBuilder(mi);
+
+            storableVar = isAllowedBuilder.getParameter(0);
+
+            // Filter out any instances of null. Shouldn't happen though.
+            isAllowedBuilder.loadLocal(storableVar);
+            Label notNull = isAllowedBuilder.createLabel();
+            isAllowedBuilder.ifNullBranch(notNull, false);
+            isAllowedBuilder.loadConstant(false);
+            isAllowedBuilder.returnValue(BOOLEAN);
+            notNull.setLocation();
+
+            // Cast the parameter to the expected type.
+            isAllowedBuilder.loadLocal(storableVar);
+            TypeDesc storableType = TypeDesc.forClass(type);
+            isAllowedBuilder.checkCast(storableType);
+            storableVar = isAllowedBuilder.createLocalVariable("storable", storableType);
+            isAllowedBuilder.storeLocal(storableVar);
+        }
+
+        filter.accept(new CodeGen<S>(cf, ctorBuilder, isAllowedBuilder, storableVar), null);
+
+        // Finish constructor.
+        ctorBuilder.returnVoid();
+
+        return (Class<Cursor<S>>) ci.defineClass(cf);
+    }
+
+    public static interface Factory<S extends Storable> {
+        /**
+         * @param cursor cursor to wrap and filter
+         * @param filterValues values corresponding to original filter used to create this factory
+         */
+        Cursor<S> newFilteredCursor(Cursor<S> cursor, Object... filterValues);
+    }
+
+    /**
+     * Ensures properties can be read and that relational operation is
+     * supported.
+     */
+    private static class Validator<S extends Storable> extends Visitor<S, Object, Object> {
+        Validator() {
+        }
+
+        public Object visit(PropertyFilter<S> filter, Object param) {
+            ChainedProperty<S> chained = filter.getChainedProperty();
+
+            switch (filter.getOperator()) {
+            case EQ: case NE:
+                // Use equals() method instead of comparator.
+                break;
+            default:
+                TypeDesc typeDesc = TypeDesc.forClass(chained.getType()).toObjectType();
+
+                if (!Comparable.class.isAssignableFrom(typeDesc.toClass())) {
+                    throw new UnsupportedOperationException
+                        ("Property \"" + chained + "\" does not implement Comparable");
+                }
+                break;
+            }
+
+            // Follow the chain, verifying that each property has an accessible
+            // read method.
+
+            checkForReadMethod(chained, chained.getPrimeProperty());
+            for (int i=0; i<chained.getChainCount(); i++) {
+                checkForReadMethod(chained, chained.getChainedProperty(i));
+            }
+
+            return null;
+        }
+
+        private void checkForReadMethod(ChainedProperty<S> chained,
+                                        StorableProperty<?> property) {
+            if (property.getReadMethod() == null) {
+                String msg;
+                if (chained.getChainCount() == 0) {
+                    msg = "Property \"" + property.getName() + "\" cannot be read";
+                } else {
+                    msg = "Property \"" + property.getName() + "\" of \"" + chained +
+                        "\" cannot be read";
+                }
+                throw new UnsupportedOperationException(msg);
+            }
+        }
+    }
+
+    private static class CodeGen<S extends Storable> extends Visitor<S, Object, Object> {
+        private static String FIELD_PREFIX = "value$";
+
+        private final ClassFile mClassFile;
+        private final CodeBuilder mCtorBuilder;
+        private final CodeBuilder mIsAllowedBuilder;
+        private final LocalVariable mStorableVar;
+
+        private final Stack<Scope> mScopeStack;
+
+        private int mPropertyOrdinal;
+
+        CodeGen(ClassFile cf,
+                CodeBuilder ctorBuilder,
+                CodeBuilder isAllowedBuilder, LocalVariable storableVar) {
+            mClassFile = cf;
+            mCtorBuilder = ctorBuilder;
+            mIsAllowedBuilder = isAllowedBuilder;
+            mStorableVar = storableVar;
+            mScopeStack = new Stack<Scope>();
+            mScopeStack.push(new Scope(null, null));
+        }
+
+        public Object visit(OrFilter<S> filter, Object param) {
+            Label failLocation = mIsAllowedBuilder.createLabel();
+            // Inherit success location to short-circuit if 'or' test succeeds.
+            mScopeStack.push(new Scope(failLocation, getScope().mSuccessLocation));
+            filter.getLeftFilter().accept(this, param);
+            failLocation.setLocation();
+            mScopeStack.pop();
+            filter.getRightFilter().accept(this, param);
+            return null;
+        }
+
+        public Object visit(AndFilter<S> filter, Object param) {
+            Label successLocation = mIsAllowedBuilder.createLabel();
+            // Inherit fail location to short-circuit if 'and' test fails.
+            mScopeStack.push(new Scope(getScope().mFailLocation, successLocation));
+            filter.getLeftFilter().accept(this, param);
+            successLocation.setLocation();
+            mScopeStack.pop();
+            filter.getRightFilter().accept(this, param);
+            return null;
+        }
+
+        public Object visit(PropertyFilter<S> filter, Object param) {
+            ChainedProperty<S> chained = filter.getChainedProperty();
+            TypeDesc type = TypeDesc.forClass(chained.getType());
+            TypeDesc fieldType = actualFieldType(type);
+            String fieldName = FIELD_PREFIX + mPropertyOrdinal;
+
+            // Define storage field.
+            mClassFile.addField(Modifiers.PRIVATE.toFinal(true), fieldName, fieldType);
+
+            // Add code to constructor to store value into field.
+            CodeBuilder b = mCtorBuilder;
+            b.loadThis();
+            b.loadLocal(b.getParameter(1));
+            b.loadConstant(mPropertyOrdinal);
+            b.loadFromArray(OBJECT);
+            b.checkCast(type.toObjectType());
+            b.convert(type.toObjectType(), fieldType, CodeAssembler.CONVERT_FP_BITS);
+            b.storeField(fieldName, fieldType);
+
+            // Add code to load property value to stack.
+            b = mIsAllowedBuilder;
+            b.loadLocal(mStorableVar);
+            loadProperty(b, chained.getPrimeProperty());
+            for (int i=0; i<chained.getChainCount(); i++) {
+                // Check if last loaded property was null, and fail if so.
+                b.dup();
+                Label notNull = b.createLabel();
+                b.ifNullBranch(notNull, false);
+                b.pop();
+                getScope().fail(b);
+                notNull.setLocation();
+
+                // Now load next property in chain.
+                loadProperty(b, chained.getChainedProperty(i));
+            }
+
+            addPropertyFilter(b, type, filter.getOperator());
+
+            mPropertyOrdinal++;
+            return null;
+        }
+
+        private Scope getScope() {
+            return mScopeStack.peek();
+        }
+
+        private void loadProperty(CodeBuilder b, StorableProperty<?> property) {
+            Method readMethod = property.getReadMethod();
+            if (readMethod == null) {
+                // Invoke synthetic package accessible method.
+                String readName = property.getReadMethodName();
+                try {
+                    readMethod = property.getEnclosingType().getDeclaredMethod(readName);
+                } catch (NoSuchMethodException e) {
+                    // This shouldn't happen since it was checked for earlier.
+                    throw new IllegalArgumentException
+                        ("Property \"" + property.getName() + "\" cannot be read");
+                }
+            }
+            b.invoke(readMethod);
+        }
+
+        private void addPropertyFilter(CodeBuilder b, TypeDesc type, RelOp relOp) {
+            TypeDesc fieldType = actualFieldType(type);
+            String fieldName = FIELD_PREFIX + mPropertyOrdinal;
+
+            if (type.getTypeCode() == OBJECT_CODE) {
+                // Check if actual property being examined is null.
+
+                LocalVariable actualValue = b.createLocalVariable("temp", type);
+                b.storeLocal(actualValue);
+                b.loadLocal(actualValue);
+                Label notNull = b.createLabel();
+
+                switch (relOp) {
+                case EQ: default: // actual = ?
+                    // If actual value is null, so test value must be null also
+                    // to succeed.
+                case LE: // actual <= ?
+                    // If actual value is null, and since null is high, test
+                    // value must be null also to succeed.
+                    b.ifNullBranch(notNull, false);
+                    b.loadThis();
+                    b.loadField(fieldName, fieldType);
+                    getScope().successIfNullElseFail(b, true);
+                    break;
+                case NE: // actual != ?
+                    // If actual value is null, so test value must be non-null
+                    // to succeed.
+                case GT: // actual > ?
+                    // If actual value is null, and since null is high, test
+                    // value must be non-null to succeed.
+                    b.ifNullBranch(notNull, false);
+                    b.loadThis();
+                    b.loadField(fieldName, fieldType);
+                    getScope().successIfNullElseFail(b, false);
+                    break;
+                case LT: // actual < ?
+                    // Since null is high, always fail if actual value is
+                    // null. Don't need to examine test value.
+                    getScope().failIfNull(b, true);
+                    break;
+                case GE: // actual >= ?
+                    // Since null is high, always succeed if actual value is
+                    // null. Don't need to examine test value.
+                    getScope().successIfNull(b, true);
+                    break;
+                }
+
+                notNull.setLocation();
+
+                // At this point, actual property value is known to not be null.
+
+                // Check if property being tested for is null.
+                b.loadThis();
+                b.loadField(fieldName, fieldType);
+
+                switch (relOp) {
+                case EQ: default: // non-null actual = ?
+                    // If test value is null, fail.
+                case GT: // non-null actual > ?
+                    // If test value is null, and since null is high, fail.
+                case GE: // non-null actual >= ?
+                    // If test value is null, and since null is high, fail.
+                    getScope().failIfNull(b, true);
+                    break;
+                case NE: // non-null actual != ?
+                    // If test value is null, succeed
+                case LE: // non-null actual <= ?
+                    // If test value is null, and since null is high, succeed.
+                case LT: // non-null actual < ?
+                    // If test value is null, and since null is high, succeed.
+                    getScope().successIfNull(b, true);
+                    break;
+                }
+
+                b.loadLocal(actualValue);
+            }
+
+            // When this point is reached, actual property value is on the
+            // operand stack, and it is non-null. Property value to test
+            // against is not on the stack, but it is known to be non-null.
+
+            TypeDesc primitiveType = type.toPrimitiveType();
+
+            if (primitiveType == null) {
+                b.loadThis();
+                b.loadField(fieldName, fieldType);
+
+                // Do object comparison
+                switch (relOp) {
+                case EQ: case NE: default:
+                    if (fieldType.isArray()) {
+                        TypeDesc arraysDesc = TypeDesc.forClass(Arrays.class);
+                        TypeDesc componentType = fieldType.getComponentType();
+                        TypeDesc arrayType = fieldType;
+                        String methodName;
+                        if (componentType.isArray()) {
+                            methodName = "deepEquals";
+                            arrayType = OBJECT.toArrayType();
+                        } else {
+                            methodName = "equals";
+                            if (!componentType.isPrimitive()) {
+                                arrayType = OBJECT.toArrayType();
+                            }
+                        }
+                        b.invokeStatic(arraysDesc, methodName, BOOLEAN,
+                                       new TypeDesc[] {arrayType, arrayType});
+                    } else {
+                        b.invokeVirtual(OBJECT, "equals", BOOLEAN, new TypeDesc[] {OBJECT});
+                    }
+                    // Success if boolean value is non-zero for EQ, zero for NE.
+                    getScope().successIfZeroComparisonElseFail(b, relOp.reverse());
+                    break;
+
+                case GT: case GE: case LE: case LT:
+                    // Compare method exists because it was checked for earlier
+                    b.invokeInterface(TypeDesc.forClass(Comparable.class), "compareTo",
+                                      INT, new TypeDesc[] {OBJECT});
+                    getScope().successIfZeroComparisonElseFail(b, relOp);
+                    break;
+                }
+            } else {
+                if (!type.isPrimitive()) {
+                    // Extract primitive value out of wrapper.
+                    b.convert(type, primitiveType);
+                }
+
+                // Floating point values are compared based on actual
+                // bits. This allows NaN to be considered in the comparison.
+                if (primitiveType == FLOAT) {
+                    b.convert(primitiveType, INT, CodeBuilder.CONVERT_FP_BITS);
+                    primitiveType = INT;
+                } else if (primitiveType == DOUBLE) {
+                    b.convert(primitiveType, LONG, CodeBuilder.CONVERT_FP_BITS);
+                    primitiveType = LONG;
+                }
+
+                b.loadThis();
+                b.loadField(fieldName, fieldType);
+                // No need to do anything special for floating point since it
+                // has been pre-converted to bits.
+                b.convert(fieldType, primitiveType);
+
+                switch (primitiveType.getTypeCode()) {
+                case INT_CODE: default:
+                    getScope().successIfComparisonElseFail(b, relOp);
+                    break;
+
+                case LONG_CODE:
+                    b.math(Opcode.LCMP);
+                    getScope().successIfZeroComparisonElseFail(b, relOp);
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Returns the actual field type used to store the given property
+         * type. Floating point values are represented in their "bit" form, in
+         * order to compare against NaN.
+         */
+        private static TypeDesc actualFieldType(TypeDesc type) {
+            if (type.toPrimitiveType() == FLOAT) {
+                if (type.isPrimitive()) {
+                    type = INT;
+                } else {
+                    type = INT.toObjectType();
+                }
+            } else if (type.toPrimitiveType() == DOUBLE) {
+                if (type.isPrimitive()) {
+                    type = LONG;
+                } else {
+                    type = LONG.toObjectType();
+                }
+            }
+            return type;
+        }
+
+        /**
+         * Defines boolean logic branching scope.
+         */
+        private static class Scope {
+            // If null, return false on test failure.
+            final Label mFailLocation;
+            // If null, return true on test success.
+            final Label mSuccessLocation;
+
+            Scope(Label failLocation, Label successLocation) {
+                mFailLocation = failLocation;
+                mSuccessLocation = successLocation;
+            }
+
+            void fail(CodeBuilder b) {
+                if (mFailLocation != null) {
+                    b.branch(mFailLocation);
+                } else {
+                    b.loadConstant(false);
+                    b.returnValue(BOOLEAN);
+                }
+            }
+
+            /**
+             * Branch to the fail location if the value on the stack is
+             * null. When choice is false, fail on non-null.
+             *
+             * @param choice if true, fail when null, else fail when not null
+             */
+            void failIfNull(CodeBuilder b, boolean choice) {
+                if (mFailLocation != null) {
+                    b.ifNullBranch(mFailLocation, choice);
+                } else {
+                    Label pass = b.createLabel();
+                    b.ifNullBranch(pass, !choice);
+                    b.loadConstant(false);
+                    b.returnValue(BOOLEAN);
+                    pass.setLocation();
+                }
+            }
+
+            void success(CodeBuilder b) {
+                if (mSuccessLocation != null) {
+                    b.branch(mSuccessLocation);
+                } else {
+                    b.loadConstant(true);
+                    b.returnValue(BOOLEAN);
+                }
+            }
+
+            /**
+             * Branch to the success location if the value on the stack is
+             * null. When choice is false, success on non-null.
+             *
+             * @param choice if true, success when null, else success when not null
+             */
+            void successIfNull(CodeBuilder b, boolean choice) {
+                if (mSuccessLocation != null) {
+                    b.ifNullBranch(mSuccessLocation, choice);
+                } else {
+                    Label pass = b.createLabel();
+                    b.ifNullBranch(pass, !choice);
+                    b.loadConstant(true);
+                    b.returnValue(BOOLEAN);
+                    pass.setLocation();
+                }
+            }
+
+            /**
+             * Branch to the success location if the value on the stack is
+             * null. When choice is false, success on non-null. If the success
+             * condition is not met, the fail location is branched to.
+             *
+             * @param choice if true, success when null, else success when not null
+             */
+            void successIfNullElseFail(CodeBuilder b, boolean choice) {
+                if (mSuccessLocation != null) {
+                    b.ifNullBranch(mSuccessLocation, choice);
+                    fail(b);
+                } else if (mFailLocation != null) {
+                    b.ifNullBranch(mFailLocation, !choice);
+                    success(b);
+                } else {
+                    Label success = b.createLabel();
+                    b.ifNullBranch(success, choice);
+                    b.loadConstant(false);
+                    b.returnValue(BOOLEAN);
+                    success.setLocation();
+                    b.loadConstant(true);
+                    b.returnValue(BOOLEAN);
+                }
+            }
+
+            void successIfZeroComparisonElseFail(CodeBuilder b, RelOp relOp) {
+                if (mSuccessLocation != null) {
+                    b.ifZeroComparisonBranch(mSuccessLocation, relOpToChoice(relOp));
+                    fail(b);
+                } else if (mFailLocation != null) {
+                    b.ifZeroComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse()));
+                    success(b);
+                } else {
+                    if (relOp == RelOp.NE) {
+                        b.returnValue(BOOLEAN);
+                    } else if (relOp == RelOp.EQ) {
+                        b.loadConstant(1);
+                        b.math(Opcode.IAND);
+                        b.loadConstant(1);
+                        b.math(Opcode.IXOR);
+                        b.returnValue(BOOLEAN);
+                    } else {
+                        Label success = b.createLabel();
+                        b.ifZeroComparisonBranch(success, relOpToChoice(relOp));
+                        b.loadConstant(false);
+                        b.returnValue(BOOLEAN);
+                        success.setLocation();
+                        b.loadConstant(true);
+                        b.returnValue(BOOLEAN);
+                    }
+                }
+            }
+
+            void successIfComparisonElseFail(CodeBuilder b, RelOp relOp) {
+                if (mSuccessLocation != null) {
+                    b.ifComparisonBranch(mSuccessLocation, relOpToChoice(relOp));
+                    fail(b);
+                } else if (mFailLocation != null) {
+                    b.ifComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse()));
+                    success(b);
+                } else {
+                    Label success = b.createLabel();
+                    b.ifComparisonBranch(success, relOpToChoice(relOp));
+                    b.loadConstant(false);
+                    b.returnValue(BOOLEAN);
+                    success.setLocation();
+                    b.loadConstant(true);
+                    b.returnValue(BOOLEAN);
+                }
+            }
+
+            private String relOpToChoice(RelOp relOp) {
+                switch (relOp) {
+                case EQ: default:
+                    return "==";
+                case NE:
+                    return "!=";
+                case LT:
+                    return "<";
+                case GE:
+                    return ">=";
+                case GT:
+                    return ">";
+                case LE:
+                    return "<=";
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
new file mode 100644
index 0000000..0b17e50
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import org.cojen.util.BeanComparator;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+/**
+ * Abstract cursor for aggregation and finding distinct data. The source cursor
+ * must be ordered in some fashion by the grouping properties. The sequence of
+ * properties must match, but it does not matter if they are ascending or
+ * descending.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
+    private final Cursor<S> mCursor;
+    private final Comparator<S> mGroupComparator;
+
+    private S mGroupLeader;
+    private G mNextAggregate;
+
+    /**
+     * Create a GroupedCursor with an existing group comparator. The comparator
+     * defines the ordering of the source cursor, and it should be a partial
+     * odering. If group comparator defines a total ordering, then all groups
+     * have one member.
+     *
+     * @param cursor source of elements which must be ordered properly
+     * @param groupComparator comparator which defines ordering of source cursor
+     */
+    protected GroupedCursor(Cursor<S> cursor, Comparator<S> groupComparator) {
+        if (cursor == null || groupComparator == null) {
+            throw new IllegalArgumentException();
+        }
+        mCursor = cursor;
+        mGroupComparator = groupComparator;
+    }
+
+    /**
+     * Create a GroupedCursor using properties to define the group
+     * comparator. The set of properties defines the ordering of the source
+     * cursor, and it should be a partial ordering. If properties define a
+     * total ordering, then all groups have one member.
+     *
+     * @param cursor source of elements which must be ordered properly
+     * @param type type of storable to create cursor for
+     * @param groupProperties list of properties to group by
+     * @throws IllegalArgumentException if any property is null or not a member
+     * of storable type
+     */
+    protected GroupedCursor(Cursor<S> cursor, Class<S> type, String... groupProperties) {
+        if (cursor == null) {
+            throw new IllegalArgumentException();
+        }
+        mCursor = cursor;
+        mGroupComparator = SortedCursor.createComparator(type, groupProperties);
+    }
+
+    /**
+     * Returns the comparator used to identify group boundaries.
+     */
+    public Comparator<S> comparator() {
+        return mGroupComparator;
+    }
+
+    /**
+     * This method is called for the first entry in a group. This method is not
+     * called again until after finishGroup is called.
+     *
+     * @param groupLeader first entry in group
+     */
+    protected abstract void beginGroup(S groupLeader) throws FetchException;
+
+    /**
+     * This method is called when more entries are found for the current
+     * group. This method is not called until after beginGroup has been
+     * called. It may called multiple times until finishGroup is called.
+     *
+     * @param groupMember additional entry in group
+     */
+    protected abstract void addToGroup(S groupMember) throws FetchException;
+
+    /**
+     * This method is called when a group is finished, and it can return an
+     * aggregate. Simply return null if aggregate should be filtered out.
+     *
+     * @return aggregate, or null to filter it out
+     */
+    protected abstract G finishGroup() throws FetchException;
+
+    public void close() throws FetchException {
+        synchronized (mCursor) {
+            mCursor.close();
+            mGroupLeader = null;
+            mNextAggregate = null;
+        }
+    }
+
+    public boolean hasNext() throws FetchException {
+        synchronized (mCursor) {
+            if (mNextAggregate != null) {
+                return true;
+            }
+
+            try {
+                int count = 0;
+                if (mCursor.hasNext()) {
+                    if (mGroupLeader == null) {
+                        beginGroup(mGroupLeader = mCursor.next());
+                    }
+
+                    while (mCursor.hasNext()) {
+                        S groupMember = mCursor.next();
+
+                        if (mGroupComparator.compare(mGroupLeader, groupMember) == 0) {
+                            addToGroup(groupMember);
+                        } else {
+                            G aggregate = finishGroup();
+
+                            beginGroup(mGroupLeader = groupMember);
+
+                            if (aggregate != null) {
+                                mNextAggregate = aggregate;
+                                return true;
+                            }
+                        }
+
+                        interruptCheck(++count);
+                    }
+
+                    G aggregate = finishGroup();
+
+                    if (aggregate != null) {
+                        mNextAggregate = aggregate;
+                        return true;
+                    }
+                }
+            } catch (NoSuchElementException e) {
+            }
+
+            return false;
+        }
+    }
+
+    public G next() throws FetchException {
+        synchronized (mCursor) {
+            if (hasNext()) {
+                G next = mNextAggregate;
+                mNextAggregate = null;
+                return next;
+            }
+            throw new NoSuchElementException();
+        }
+    }
+
+    public int skipNext(int amount) throws FetchException {
+        synchronized (mCursor) {
+            if (amount <= 0) {
+                if (amount < 0) {
+                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+                }
+                return 0;
+            }
+
+            int count = 0;
+            while (--amount >= 0 && hasNext()) {
+                interruptCheck(++count);
+                mNextAggregate = null;
+            }
+
+            return count;
+        }
+    }
+
+    private void interruptCheck(int count) throws FetchException {
+        if ((count & ~0xff) == 0 && Thread.interrupted()) {
+            close();
+            throw new FetchInterruptedException();
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java
new file mode 100644
index 0000000..9d11b2f
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs a <i>set intersection</i> operation. In
+ * boolean logic, this is an <i>and</i> operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the intersection.
+ *
+ * @author Brian S O'Neill
+ * @see UnionCursor
+ * @see DifferenceCursor
+ * @see SymmetricDifferenceCursor
+ */
+public class IntersectionCursor<S> extends AbstractCursor<S> {
+    private final Cursor<S> mLeftCursor;
+    private final Cursor<S> mRightCursor;
+    private final Comparator<S> mOrder;
+
+    private S mNext;
+
+    /**
+     * @param left cursor to wrap
+     * @param right cursor to wrap
+     * @param order describes sort ordering of wrapped cursors, which must be
+     * a total ordering
+     */
+    public IntersectionCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+        if (left == null || right == null || order == null) {
+            throw new IllegalArgumentException();
+        }
+        mLeftCursor = left;
+        mRightCursor = right;
+        mOrder = order;
+    }
+
+    public synchronized void close() throws FetchException {
+        mLeftCursor.close();
+        mRightCursor.close();
+        mNext = null;
+    }
+
+    public synchronized boolean hasNext() throws FetchException {
+        if (mNext != null) {
+            return true;
+        }
+
+        S nextLeft, nextRight;
+
+        if (mLeftCursor.hasNext()) {
+            nextLeft = mLeftCursor.next();
+        } else {
+            close();
+            return false;
+        }
+        if (mRightCursor.hasNext()) {
+            nextRight = mRightCursor.next();
+        } else {
+            close();
+            return false;
+        }
+
+        while (true) {
+            int result = mOrder.compare(nextLeft, nextRight);
+            if (result < 0) {
+                if (mLeftCursor.hasNext()) {
+                    nextLeft = mLeftCursor.next();
+                } else {
+                    close();
+                    return false;
+                }
+            } else if (result > 0) {
+                if (mRightCursor.hasNext()) {
+                    nextRight = mRightCursor.next();
+                } else {
+                    close();
+                    return false;
+                }
+            } else {
+                mNext = nextLeft;
+                return true;
+            }
+        }
+    }
+
+    public synchronized S next() throws FetchException {
+        if (hasNext()) {
+            S next = mNext;
+            mNext = null;
+            return next;
+        }
+        throw new NoSuchElementException();
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java
new file mode 100644
index 0000000..0330629
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapts an Iterator into a Cursor.
+ *
+ * @author Brian S O'Neill
+ */
+public class IteratorCursor<S> extends AbstractCursor<S> {
+    private Iterator<S> mIterator;
+
+    /**
+     * @param iterable collection to iterate over, or null for empty cursor
+     */
+    public IteratorCursor(Iterable<S> iterable) {
+        this(iterable == null ? (Iterator<S>) null : iterable.iterator());
+    }
+
+    /**
+     * @param iterator iterator to wrap, or null for empty cursor
+     */
+    public IteratorCursor(Iterator<S> iterator) {
+        mIterator = iterator;
+    }
+
+    public void close() {
+        mIterator = null;
+    }
+
+    public boolean hasNext() {
+        Iterator it = mIterator;
+        return it != null && it.hasNext();
+    }
+
+    public S next() {
+        Iterator<S> it = mIterator;
+        if (it == null) {
+            throw new NoSuchElementException();
+        }
+        return it.next();
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java
new file mode 100644
index 0000000..ae98a95
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java
@@ -0,0 +1,459 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.lang.reflect.Field;
+
+import java.util.Map;
+
+import org.cojen.classfile.ClassFile;
+import org.cojen.classfile.CodeBuilder;
+import org.cojen.classfile.FieldInfo;
+import org.cojen.classfile.Label;
+import org.cojen.classfile.LocalVariable;
+import org.cojen.classfile.MethodInfo;
+import org.cojen.classfile.Modifiers;
+import org.cojen.classfile.TypeDesc;
+
+import org.cojen.util.ClassInjector;
+import org.cojen.util.KeyFactory;
+import org.cojen.util.SoftValuedHashMap;
+import org.cojen.util.WeakIdentityMap;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.SupportException;
+
+import com.amazon.carbonado.info.ChainedProperty;
+import com.amazon.carbonado.info.StorableInfo;
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.util.QuickConstructorGenerator;
+
+import com.amazon.carbonado.spi.CodeBuilderUtil;
+import static com.amazon.carbonado.spi.CommonMethodNames.*;
+
+/**
+ * Given two joined types <i>A</i> and <i>B</i>, this factory converts a cursor
+ * over type <i>A</i> into a cursor over type <i>B</i>. For example, consider
+ * two storable types, Employer and Person. A query filter for persons with a
+ * given employer might look like: {@code "employer.name = ?" }  The join can be
+ * manually implemented by querying for employers and then using this factory
+ * to produce persons:
+ *
+ * <pre>
+ * JoinedCursorFactory&lt;Employer, Person&gt; factory = new JoinedCursorFactory&lt;Employer, Person&gt;
+ *     (repo, Person.class, "employer", Employer.class);
+ *
+ * Cursor&lt;Employer&gt; employerCursor = repo.storageFor(Employer.class)
+ *     .query("name = ?").with(...).fetch();
+ *
+ * Cursor&lt;Person&gt; personCursor = factory.join(employerCursor);
+ * </pre>
+ *
+ * Chained properties are supported as well. A query filter for persons with an
+ * employer in a given state might look like: {@code "employer.address.state = ?" }
+ * The join can be manually implemented as:
+ *
+ * <pre>
+ * JoinedCursorFactory&lt;Address, Person&gt; factory = new JoinedCursorFactory&lt;Address, Person&gt;
+ *     (repo, Person.class, "employer.address", Address.class);
+ *
+ * Cursor&lt;Address&gt; addressCursor = repo.storageFor(Address.class)
+ *     .query("state = ?").with(...).fetch();
+ *
+ * Cursor&lt;Person&gt; personCursor = factory.join(addressCursor);
+ * </pre>
+ *
+ * @author Brian S O'Neill
+ */
+public class JoinedCursorFactory<A, B extends Storable> {
+    private static final String STORAGE_FIELD_NAME = "storage";
+    private static final String QUERY_FIELD_NAME = "query";
+    private static final String QUERY_FILTER_FIELD_NAME = "queryFilter";
+    private static final String ACTIVE_A_FIELD_NAME = "active";
+
+    private static final Map<Object, Class> cJoinerCursorClassCache;
+
+    static {
+        cJoinerCursorClassCache = new SoftValuedHashMap();
+    }
+
+    private static synchronized <B extends Storable> Joiner<?, B>
+        newBasicJoiner(StorableProperty<B> bToAProperty, Storage<B> bStorage)
+        throws FetchException
+    {
+        Class<?> aType = bToAProperty.getType();
+
+        final Object key = KeyFactory.createKey
+            (new Object[] {aType,
+                           bToAProperty.getEnclosingType(),
+                           bToAProperty.getName()});
+
+        Class clazz = cJoinerCursorClassCache.get(key);
+
+        if (clazz == null) {
+            clazz = generateBasicJoinerCursor(aType, bToAProperty);
+            cJoinerCursorClassCache.put(key, clazz);
+        }
+
+        // Transforming cursor class may need a Query to operate on.
+        Query<B> bQuery = null;
+        try {
+            String filter = (String) clazz.getField(QUERY_FILTER_FIELD_NAME).get(null);
+            bQuery = bStorage.query(filter);
+        } catch (NoSuchFieldException e) {
+        } catch (IllegalAccessException e) {
+        }
+
+        BasicJoiner.Factory<?, B> factory = (BasicJoiner.Factory<?, B>) QuickConstructorGenerator
+            .getInstance(clazz, BasicJoiner.Factory.class);
+
+        return new BasicJoiner(factory, bStorage, bQuery);
+    }
+
+    private static <B extends Storable> Class<Cursor<B>>
+        generateBasicJoinerCursor(Class<?> aType, StorableProperty<B> bToAProperty)
+    {
+        final int propCount = bToAProperty.getJoinElementCount();
+
+        // Determine if join is one-to-one, in which case slightly more optimal
+        // code can be generated.
+        boolean isOneToOne = true;
+        for (int i=0; i<propCount; i++) {
+            if (!bToAProperty.getInternalJoinElement(i).isPrimaryKeyMember()) {
+                isOneToOne = false;
+                break;
+            }
+            if (!bToAProperty.getExternalJoinElement(i).isPrimaryKeyMember()) {
+                isOneToOne = false;
+                break;
+            }
+        }
+
+        Class<B> bType = bToAProperty.getEnclosingType();
+
+        String packageName;
+        {
+            String name = bType.getName();
+            int index = name.lastIndexOf('.');
+            if (index >= 0) {
+                packageName = name.substring(0, index);
+            } else {
+                packageName = "";
+            }
+        }
+
+        ClassLoader loader = bType.getClassLoader();
+
+        ClassInjector ci = ClassInjector.create(packageName + ".JoinedCursor", loader);
+        Class superclass = isOneToOne ? TransformedCursor.class : MultiTransformedCursor.class;
+        ClassFile cf = new ClassFile(ci.getClassName(), superclass);
+        cf.markSynthetic();
+        cf.setSourceFile(JoinedCursorFactory.class.getName());
+        cf.setTarget("1.5");
+
+        final TypeDesc queryType = TypeDesc.forClass(Query.class);
+        final TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
+        final TypeDesc storageType = TypeDesc.forClass(Storage.class);
+        final TypeDesc storableType = TypeDesc.forClass(Storable.class);
+
+        if (isOneToOne) {
+            cf.addField(Modifiers.PRIVATE.toFinal(true), STORAGE_FIELD_NAME, storageType);
+        } else {
+            // Field to hold query which fetches type B.
+            cf.addField(Modifiers.PRIVATE.toFinal(true), QUERY_FIELD_NAME, queryType);
+        }
+
+        boolean canSetAReference = bToAProperty.getWriteMethod() != null;
+
+        if (canSetAReference && !isOneToOne) {
+            // Field to hold active A storable.
+            cf.addField(Modifiers.PRIVATE, ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
+        }
+
+        // Constructor accepts a Storage and Query, but Storage is only used
+        // for one-to-one, and Query is only used for one-to-many.
+        {
+            MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC,
+                                              new TypeDesc[] {cursorType, storageType, queryType});
+            CodeBuilder b = new CodeBuilder(mi);
+
+            b.loadThis();
+            b.loadLocal(b.getParameter(0)); // pass A cursor to superclass
+            b.invokeSuperConstructor(new TypeDesc[] {cursorType});
+
+            if (isOneToOne) {
+                b.loadThis();
+                b.loadLocal(b.getParameter(1)); // push B storage to stack
+                b.storeField(STORAGE_FIELD_NAME, storageType);
+            } else {
+                b.loadThis();
+                b.loadLocal(b.getParameter(2)); // push B query to stack
+                b.storeField(QUERY_FIELD_NAME, queryType);
+            }
+
+            b.returnVoid();
+        }
+
+        // For one-to-many, a query is needed. Save the query filter in a
+        // public static field to be grabbed later.
+        if (!isOneToOne) {
+            StringBuilder queryBuilder = new StringBuilder();
+
+            for (int i=0; i<propCount; i++) {
+                if (i > 0) {
+                    queryBuilder.append(" & ");
+                }
+                queryBuilder.append(bToAProperty.getInternalJoinElement(i).getName());
+                queryBuilder.append(" = ?");
+            }
+
+            FieldInfo fi = cf.addField(Modifiers.PUBLIC.toStatic(true).toFinal(true),
+                                       QUERY_FILTER_FIELD_NAME, TypeDesc.STRING);
+            fi.setConstantValue(queryBuilder.toString());
+        }
+
+        // Implement the transform method.
+        if (isOneToOne) {
+            MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", TypeDesc.OBJECT,
+                                         new TypeDesc[] {TypeDesc.OBJECT});
+            mi.addException(TypeDesc.forClass(FetchException.class));
+            CodeBuilder b = new CodeBuilder(mi);
+
+            LocalVariable aVar = b.createLocalVariable(null, storableType);
+            b.loadLocal(b.getParameter(0));
+            b.checkCast(TypeDesc.forClass(aType));
+            b.storeLocal(aVar);
+
+            // Prepare B storable.
+            b.loadThis();
+            b.loadField(STORAGE_FIELD_NAME, storageType);
+            b.invokeInterface(storageType, PREPARE_METHOD_NAME, storableType, null);
+            LocalVariable bVar = b.createLocalVariable(null, storableType);
+            b.checkCast(TypeDesc.forClass(bType));
+            b.storeLocal(bVar);
+
+            // Copy pk property values from A to B.
+            for (int i=0; i<propCount; i++) {
+                StorableProperty<B> internal = bToAProperty.getInternalJoinElement(i);
+                StorableProperty<?> external = bToAProperty.getExternalJoinElement(i);
+
+                b.loadLocal(bVar);
+                b.loadLocal(aVar);
+                b.invoke(external.getReadMethod());
+                b.invoke(internal.getWriteMethod());
+            }
+
+            // tryLoad b.
+            b.loadLocal(bVar);
+            b.invokeInterface(storableType, TRY_LOAD_METHOD_NAME, TypeDesc.BOOLEAN, null);
+            Label wasLoaded = b.createLabel();
+            b.ifZeroComparisonBranch(wasLoaded, "!=");
+
+            b.loadNull();
+            b.returnValue(storableType);
+
+            wasLoaded.setLocation();
+
+            if (canSetAReference) {
+                b.loadLocal(bVar);
+                b.loadLocal(aVar);
+                b.invoke(bToAProperty.getWriteMethod());
+            }
+
+            b.loadLocal(bVar);
+            b.returnValue(storableType);
+        } else {
+            MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", cursorType,
+                                         new TypeDesc[] {TypeDesc.OBJECT});
+            mi.addException(TypeDesc.forClass(FetchException.class));
+            CodeBuilder b = new CodeBuilder(mi);
+
+            LocalVariable aVar = b.createLocalVariable(null, storableType);
+            b.loadLocal(b.getParameter(0));
+            b.checkCast(TypeDesc.forClass(aType));
+            b.storeLocal(aVar);
+
+            if (canSetAReference) {
+                b.loadThis();
+                b.loadLocal(aVar);
+                b.storeField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
+            }
+
+            // Populate query parameters.
+            b.loadThis();
+            b.loadField(QUERY_FIELD_NAME, queryType);
+
+            for (int i=0; i<propCount; i++) {
+                StorableProperty<?> external = bToAProperty.getExternalJoinElement(i);
+                b.loadLocal(aVar);
+                b.invoke(external.getReadMethod());
+
+                TypeDesc bindType = CodeBuilderUtil.bindQueryParam(external.getType());
+                CodeBuilderUtil.convertValue(b, external.getType(), bindType.toClass());
+                b.invokeInterface(queryType, WITH_METHOD_NAME, queryType,
+                                  new TypeDesc[] {bindType});
+            }
+
+            // Now fetch and return.
+            b.invokeInterface(queryType, FETCH_METHOD_NAME, cursorType, null);
+            b.returnValue(cursorType);
+        }
+
+        if (canSetAReference && !isOneToOne) {
+            // Override the "next" method to set A object on B.
+            MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "next", TypeDesc.OBJECT, null);
+            mi.addException(TypeDesc.forClass(FetchException.class));
+            CodeBuilder b = new CodeBuilder(mi);
+
+            b.loadThis();
+            b.invokeSuper(TypeDesc.forClass(MultiTransformedCursor.class),
+                          "next", TypeDesc.OBJECT, null);
+            b.checkCast(TypeDesc.forClass(bType));
+            b.dup();
+
+            b.loadThis();
+            b.loadField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
+            b.invoke(bToAProperty.getWriteMethod());
+
+            b.returnValue(storableType);
+        }
+
+        return (Class<Cursor<B>>) ci.defineClass(cf);
+    }
+
+    private final Joiner<A, B> mJoiner;
+
+    /**
+     * @param repo access to storage instances for properties
+     * @param bType type of <i>B</i> instances
+     * @param bToAProperty property of <i>B</i> type which maps to instances of
+     * <i>A</i> type.
+     * @param aType type of <i>A</i> instances
+     * @throws IllegalArgumentException if property type is not <i>A</i>
+     */
+    public JoinedCursorFactory(Repository repo,
+                               Class<B> bType,
+                               String bToAProperty,
+                               Class<A> aType)
+        throws SupportException, FetchException, RepositoryException
+    {
+        this(repo,
+             ChainedProperty.parse(StorableIntrospector.examine(bType), bToAProperty),
+             aType);
+    }
+
+    /**
+     * @param repo access to storage instances for properties
+     * @param bToAProperty property of <i>B</i> type which maps to instances of
+     * <i>A</i> type.
+     * @param aType type of <i>A</i> instances
+     * @throws IllegalArgumentException if property type is not <i>A</i>
+     */
+    public JoinedCursorFactory(Repository repo,
+                               ChainedProperty<B> bToAProperty,
+                               Class<A> aType)
+        throws SupportException, FetchException, RepositoryException
+    {
+        if (bToAProperty.getType() != aType) {
+            throw new IllegalArgumentException
+                ("Property is not of type \"" + aType.getName() + "\": " +
+                 bToAProperty);
+        }
+
+        StorableProperty<B> primeB = bToAProperty.getPrimeProperty();
+        Storage<B> primeBStorage = repo.storageFor(primeB.getEnclosingType());
+
+        Joiner joiner = newBasicJoiner(primeB, primeBStorage);
+
+        int chainCount = bToAProperty.getChainCount();
+        for (int i=0; i<chainCount; i++) {
+            StorableProperty prop = bToAProperty.getChainedProperty(i);
+            Storage storage = repo.storageFor(prop.getEnclosingType());
+
+            joiner = new MultiJoiner(newBasicJoiner(prop, storage), joiner);
+        }
+
+        mJoiner = (Joiner<A, B>) joiner;
+    }
+
+    /**
+     * Given a cursor over <i>A</i>, returns a new cursor over joined property
+     * of type <i>B</i>.
+     */
+    public Cursor<B> join(Cursor<A> cursor) {
+        return mJoiner.join(cursor);
+    }
+
+    private static interface Joiner<A, B extends Storable> {
+        Cursor<B> join(Cursor<A> cursor);
+    }
+
+    /**
+     * Support for joins without an intermediate hop.
+     */
+    private static class BasicJoiner<A, B extends Storable> implements Joiner<A, B> {
+        private final Factory<A, B> mJoinerFactory;
+        private final Storage<B> mBStorage;
+        private final Query<B> mBQuery;
+
+        BasicJoiner(Factory<A, B> factory, Storage<B> bStorage, Query<B> bQuery) {
+            mJoinerFactory = factory;
+            mBStorage = bStorage;
+            mBQuery = bQuery;
+        }
+
+        public Cursor<B> join(Cursor<A> cursor) {
+            return mJoinerFactory.newJoinedCursor(cursor, mBStorage, mBQuery);
+        }
+
+        /**
+         * Needs to be public for {@link QuickConstructorGenerator}.
+         */
+        public static interface Factory<A, B extends Storable> {
+            Cursor<B> newJoinedCursor(Cursor<A> cursor, Storage<B> bStorage, Query<B> bQuery);
+        }
+    }
+
+    /**
+     * Support for joins with an intermediate hop -- multi-way joins.
+     */
+    private static class MultiJoiner<A, X extends Storable, B extends Storable>
+        implements Joiner<A, B>
+    {
+        private final Joiner<A, X> mAToMid;
+        private final Joiner<X, B> mMidToB;
+
+        MultiJoiner(Joiner<A, X> aToMidJoiner, Joiner<X, B> midToBJoiner) {
+            mAToMid = aToMidJoiner;
+            mMidToB = midToBJoiner;
+        }
+
+        public Cursor<B> join(Cursor<A> cursor) {
+            return mMidToB.join(mAToMid.join(cursor));
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
new file mode 100644
index 0000000..0094183
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
@@ -0,0 +1,498 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.lang.reflect.UndeclaredThrowableException;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.SupportException;
+
+import com.amazon.carbonado.spi.RAFInputStream;
+import com.amazon.carbonado.spi.RAFOutputStream;
+import com.amazon.carbonado.spi.StorableSerializer;
+
+/**
+ * Sort buffer implemented via a merge sort algorithm. If there are too many
+ * storables to fit in the reserved memory buffer, they are sorted and
+ * serialized to temporary files.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
+    implements SortBuffer<S>
+{
+    private static final int MIN_ARRAY_CAPACITY = 64;
+
+    // Bigger means better performance, but more memory is used.
+    private static final int MAX_ARRAY_CAPACITY = 8192;
+
+    // Bigger means better performance, but more file handles may be used.
+    private static final int MAX_FILE_COUNT = 100;
+
+    // Bigger may improve write performance, but not by much.
+    private static final int OUTPUT_BUFFER_SIZE = 10000;
+
+    private final Storage<S> mStorage;
+    private final String mTempDir;
+    private final int mMaxArrayCapacity;
+
+    private S[] mElements;
+    private int mSize;
+    private int mTotalSize;
+
+    private StorableSerializer<S> mSerializer;
+    private WorkFilePool mWorkFilePool;
+    private List<RandomAccessFile> mFilesInUse;
+
+    private Comparator<S> mComparator;
+
+    private volatile boolean mStop;
+
+    /**
+     * @param storage storage type of elements
+     */
+    public MergeSortBuffer(Storage<S> storage) {
+        this(storage, null, MAX_ARRAY_CAPACITY);
+    }
+
+    /**
+     * @param storage storage type of elements
+     * @param tempDir directory to store temp files for merging, or null for default
+     */
+    public MergeSortBuffer(Storage<S> storage, String tempDir) {
+        this(storage, tempDir, MAX_ARRAY_CAPACITY);
+    }
+
+    /**
+     * @param storage storage type of elements
+     * @param tempDir directory to store temp files for merging, or null for default
+     * @param maxArrayCapacity maximum amount of storables to keep in an array
+     * before serializing to a file
+     */
+    @SuppressWarnings("unchecked")
+    public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
+        mStorage = storage;
+        mTempDir = tempDir;
+        mMaxArrayCapacity = maxArrayCapacity;
+        int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
+        mElements = (S[]) new Storable[cap];
+    }
+
+    public void prepare(Comparator<S> comparator) {
+        if (comparator == null) {
+            throw new IllegalArgumentException();
+        }
+        clear();
+        mComparator = comparator;
+    }
+
+    public boolean add(S storable) {
+        arrayPrep:
+        if (mSize >= mElements.length) {
+            if (mElements.length < mMaxArrayCapacity) {
+                // Increase array capacity.
+                int newCap = mElements.length * 2;
+                if (newCap > mMaxArrayCapacity) {
+                    newCap = mMaxArrayCapacity;
+                }
+                S[] newElements = (S[]) new Storable[newCap];
+                System.arraycopy(mElements, 0, newElements, 0, mElements.length);
+                mElements = newElements;
+                break arrayPrep;
+            }
+
+            // Sort current in-memory results and serialize to a temp file.
+
+            // Make sure everything is set up to use temp files.
+            {
+                if (mSerializer == null) {
+                    try {
+                        mSerializer = StorableSerializer.forType(mStorage.getStorableType());
+                    } catch (SupportException e) {
+                        throw new UndeclaredThrowableException(e);
+                    }
+                    mWorkFilePool = WorkFilePool.getInstance(mTempDir);
+                    mFilesInUse = new ArrayList<RandomAccessFile>();
+                }
+            }
+
+            Arrays.sort(mElements, mComparator);
+
+            RandomAccessFile raf;
+            try {
+                raf = mWorkFilePool.acquireWorkFile(this);
+                OutputStream out =
+                    new BufferedOutputStream(new RAFOutputStream(raf), OUTPUT_BUFFER_SIZE);
+
+                StorableSerializer<S> serializer = mSerializer;
+
+                if (mFilesInUse.size() < (MAX_FILE_COUNT - 1)) {
+                    mFilesInUse.add(raf);
+                    int count = 0;
+                    for (S element : mElements) {
+                        // Check every so often if interrupted.
+                        interruptCheck(++count);
+                        serializer.write(element, out);
+                    }
+                } else {
+                    // Merge files together.
+
+                    // Determine the average length per file in use.
+                    long totalLength = 0;
+                    int fileCount = mFilesInUse.size();
+                    for (int i=0; i<fileCount; i++) {
+                        totalLength += mFilesInUse.get(i).length();
+                    }
+
+                    // Compute average with ceiling rounding mode.
+                    long averageLength = (totalLength + fileCount) / fileCount;
+
+                    // For any file whose length is above average, don't merge
+                    // it. The goal is to evenly distribute file growth.
+
+                    List<RandomAccessFile> filesToExclude = new ArrayList<RandomAccessFile>();
+                    List<RandomAccessFile> filesToMerge = new ArrayList<RandomAccessFile>();
+
+                    long mergedLength = 0;
+                    for (int i=0; i<fileCount; i++) {
+                        RandomAccessFile fileInUse = mFilesInUse.get(i);
+                        long fileLength = fileInUse.length();
+                        if (fileLength > averageLength) {
+                            filesToExclude.add(fileInUse);
+                        } else {
+                            filesToMerge.add(fileInUse);
+                            mergedLength += fileLength;
+                        }
+                    }
+
+                    mFilesInUse.add(raf);
+
+                    // Pre-allocate space, in an attempt to improve performance
+                    // as well as error out earlier, should the disk be full.
+                    raf.setLength(mergedLength);
+
+                    int count = 0;
+                    Iterator<S> it = iterator(filesToMerge);
+                    while (it.hasNext()) {
+                        // Check every so often if interrupted.
+                        interruptCheck(++count);
+                        S element = it.next();
+                        serializer.write(element, out);
+                    }
+
+                    mWorkFilePool.releaseWorkFiles(filesToMerge);
+                    mFilesInUse = filesToExclude;
+                    mFilesInUse.add(raf);
+                }
+
+                out.flush();
+
+                // Truncate any data from last time file was used.
+                raf.setLength(raf.getFilePointer());
+                // Reset to start of file in preparation for reading later.
+                raf.seek(0);
+            } catch (IOException e) {
+                throw new UndeclaredThrowableException(e);
+            }
+
+            mSize = 0;
+        }
+
+        mElements[mSize++] = storable;
+        mTotalSize++;
+        return true;
+    }
+
+    public int size() {
+        return mTotalSize;
+    }
+
+    public Iterator<S> iterator() {
+        return iterator(mFilesInUse);
+    }
+
+    private Iterator<S> iterator(List<RandomAccessFile> filesToMerge) {
+        if (mSerializer == null) {
+            return new ObjectArrayIterator<S>(mElements, 0, mSize);
+        }
+
+        // Merge with the files. Use a priority queue to decide which is the
+        // next buffer to pull an element from.
+
+        PriorityQueue<Iter<S>> pq = new PriorityQueue<Iter<S>>(1 + mFilesInUse.size());
+        pq.add(new ArrayIter<S>(mComparator, mElements, mSize));
+        for (RandomAccessFile raf : filesToMerge) {
+            try {
+                raf.seek(0);
+            } catch (IOException e) {
+                throw new UndeclaredThrowableException(e);
+            }
+
+            InputStream in = new BufferedInputStream(new RAFInputStream(raf));
+
+            pq.add(new InputIter<S>(mComparator, mSerializer, mStorage, in));
+        }
+
+        return new Merger<S>(pq);
+    }
+
+    public void clear() {
+        if (mTotalSize > 0) {
+            mSize = 0;
+            mTotalSize = 0;
+            if (mWorkFilePool != null && mFilesInUse != null) {
+                mWorkFilePool.releaseWorkFiles(mFilesInUse);
+                mFilesInUse.clear();
+            }
+        }
+    }
+
+    public void sort() {
+        // Sort current in-memory results. Anything residing in files has
+        // already been sorted.
+        if (mComparator == null) {
+            throw new IllegalStateException("Buffer was not prepared");
+        }
+        Arrays.sort(mElements, 0, mSize, mComparator);
+    }
+
+    public void close() {
+        clear();
+        if (mWorkFilePool != null) {
+            mWorkFilePool.unregisterWorkFileUser(this);
+        }
+    }
+
+    void stop() {
+        mStop = true;
+    }
+
+    private void interruptCheck(int count) {
+        if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
+            close();
+            throw new UndeclaredThrowableException(new FetchInterruptedException());
+        }
+    }
+
+    /**
+     * Simple interator interface that supports peeking at next element.
+     */
+    private abstract static class Iter<S extends Storable> implements Comparable<Iter<S>> {
+        private final Comparator<S> mComparator;
+
+        protected Iter(Comparator<S> comparator) {
+            mComparator = comparator;
+        }
+
+        /**
+         * Returns null if iterator is exhausted.
+         */
+        abstract S peek();
+
+        /**
+         * Returns null if iterator is exhausted.
+         */
+        abstract S next();
+
+        public int compareTo(Iter<S> iter) {
+            S thisPeek = peek();
+            S thatPeek = iter.peek();
+            if (thisPeek == null) {
+                if (thatPeek == null) {
+                    return 0;
+                }
+                // Null is low in order to rise to top of priority queue. This
+                // Iter will then be tossed out of the priority queue.
+                return -1;
+            } else if (thatPeek == null) {
+                return 1;
+            }
+            return mComparator.compare(thisPeek, thatPeek);
+        }
+    }
+
+    /**
+     * Iterator that reads from an array.
+     */
+    private static class ArrayIter<S extends Storable> extends Iter<S> {
+        private final S[] mArray;
+        private final int mSize;
+        private int mPos;
+
+        ArrayIter(Comparator<S> comparator, S[] array, int size) {
+            super(comparator);
+            mArray = array;
+            mSize = size;
+        }
+
+        S peek() {
+            int pos = mPos;
+            if (pos >= mSize) {
+                return null;
+            }
+            return mArray[pos];
+        }
+
+        S next() {
+            int pos = mPos;
+            if (pos >= mSize) {
+                return null;
+            }
+            S next = mArray[pos];
+            mPos = pos + 1;
+            return next;
+        }
+    }
+
+    /**
+     * Iterator that reads from an input stream of serialized Storables.
+     */
+    private static class InputIter<S extends Storable> extends Iter<S> {
+        private StorableSerializer<S> mSerializer;
+        private Storage<S> mStorage;
+        private InputStream mIn;
+
+        private S mNext;
+
+        InputIter(Comparator<S> comparator,
+                  StorableSerializer<S> serializer, Storage<S> storage, InputStream in)
+        {
+            super(comparator);
+            mSerializer = serializer;
+            mStorage = storage;
+            mIn = in;
+        }
+
+        S peek() {
+            if (mNext != null) {
+                return mNext;
+            }
+            if (mIn != null) {
+                try {
+                    mNext = mSerializer.read(mStorage, mIn);
+                    // TODO: Serializer is unable to determine state of
+                    // properties, and so they are lost. Since these storables
+                    // came directly from a cursor, we know they are clean.
+                    mNext.markAllPropertiesClean();
+                } catch (EOFException e) {
+                    mIn = null;
+                } catch (IOException e) {
+                    throw new UndeclaredThrowableException(e);
+                }
+            }
+            return mNext;
+        }
+
+        S next() {
+            S next = peek();
+            mNext = null;
+            return next;
+        }
+    }
+
+    private static class Merger<S extends Storable> implements Iterator<S> {
+        private final PriorityQueue<Iter<S>> mPQ;
+
+        private S mNext;
+
+        Merger(PriorityQueue<Iter<S>> pq) {
+            mPQ = pq;
+        }
+
+        public boolean hasNext() {
+            if (mNext == null) {
+                while (true) {
+                    Iter<S> iter = mPQ.poll();
+                    if (iter == null) {
+                        return false;
+                    }
+                    if ((mNext = iter.next()) != null) {
+                        // Iter is not exhausted, so put it back in to be used
+                        // again. Adding it back causes it to be inserted in
+                        // the proper order, based on the next element it has
+                        // to offer.
+                        mPQ.add(iter);
+                        return true;
+                    }
+                }
+            }
+            return true;
+        }
+
+        public S next() {
+            if (hasNext()) {
+                S next = mNext;
+                mNext = null;
+                return next;
+            }
+            throw new NoSuchElementException();
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class ObjectArrayIterator<E> implements Iterator<E> {
+        private final E[] mElements;
+        private final int mEnd;
+        private int mIndex;
+
+        public ObjectArrayIterator(E[] elements, int start, int end) {
+            mElements = elements;
+            mEnd = end;
+            mIndex = start;
+        }
+
+        public boolean hasNext() {
+            return mIndex < mEnd;
+        }
+
+        public E next() {
+            if (mIndex >= mEnd) {
+                throw new NoSuchElementException();
+            }
+            return mElements[mIndex++];
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
new file mode 100644
index 0000000..0cfd197
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+/**
+ * Abstract cursor which wraps another cursor and transforms each storable
+ * result into a set of target storables. This class can be used for
+ * implementing one-to-many joins. Use {@link TransformedCursor} for one-to-one
+ * joins.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
+    private final Cursor<S> mCursor;
+
+    private Cursor<T> mNextCursor;
+
+    protected MultiTransformedCursor(Cursor<S> cursor) {
+        if (cursor == null) {
+            throw new IllegalArgumentException();
+        }
+        mCursor = cursor;
+    }
+
+    /**
+     * This method must be implemented to transform storables. If the storable
+     * cannot be transformed, either throw a FetchException or return null. If
+     * null is returned, the storable is simply filtered out.
+     *
+     * @return transformed storables, or null to filter it out
+     */
+    protected abstract Cursor<T> transform(S storable) throws FetchException;
+
+    public void close() throws FetchException {
+        synchronized (mCursor) {
+            mCursor.close();
+            if (mNextCursor != null) {
+                mNextCursor.close();
+                mNextCursor = null;
+            }
+        }
+    }
+
+    public boolean hasNext() throws FetchException {
+        synchronized (mCursor) {
+            if (mNextCursor != null) {
+                if (mNextCursor.hasNext()) {
+                    return true;
+                }
+                mNextCursor.close();
+                mNextCursor = null;
+            }
+            try {
+                int count = 0;
+                while (mCursor.hasNext()) {
+                    Cursor<T> nextCursor = transform(mCursor.next());
+                    if (nextCursor != null) {
+                        if (nextCursor.hasNext()) {
+                            mNextCursor = nextCursor;
+                            return true;
+                        }
+                        nextCursor.close();
+                    }
+                    interruptCheck(++count);
+                }
+            } catch (NoSuchElementException e) {
+            }
+            return false;
+        }
+    }
+
+    public T next() throws FetchException {
+        synchronized (mCursor) {
+            if (hasNext()) {
+                return mNextCursor.next();
+            }
+            throw new NoSuchElementException();
+        }
+    }
+
+    public int skipNext(int amount) throws FetchException {
+        synchronized (mCursor) {
+            if (amount <= 0) {
+                if (amount < 0) {
+                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+                }
+                return 0;
+            }
+
+            int count = 0;
+            while (hasNext()) {
+                int chunk = mNextCursor.skipNext(amount);
+                count += chunk;
+                if ((amount -= chunk) <= 0) {
+                    break;
+                }
+                interruptCheck(count);
+            }
+
+            return count;
+        }
+    }
+
+    private void interruptCheck(int count) throws FetchException {
+        if ((count & ~0xff) == 0 && Thread.interrupted()) {
+            close();
+            throw new FetchInterruptedException();
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java
new file mode 100644
index 0000000..089488c
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.Collection;
+
+import com.amazon.carbonado.FetchException;
+
+/**
+ * Buffers up Storable instances allowing them to be sorted. Should any method
+ * need to throw an undeclared exception, wrap it with an
+ * UndeclaredThrowableException.
+ *
+ * @author Brian S O'Neill
+ * @see SortedCursor
+ */
+public interface SortBuffer<S> extends Collection<S> {
+    /**
+     * Clears buffer and assigns a comparator for sorting.
+     *
+     * @throws IllegalArgumentException if comparator is null
+     */
+    void prepare(Comparator<S> comparator);
+
+    /**
+     * Finish sorting buffer.
+     *
+     * @throws IllegalStateException if prepare was never called
+     */
+    void sort() throws FetchException;
+
+    /**
+     * Clear and close buffer.
+     */
+    void close() throws FetchException;
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
new file mode 100644
index 0000000..7a728bb
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.cojen.util.BeanComparator;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.info.Direction;
+import com.amazon.carbonado.info.OrderedProperty;
+
+/**
+ * Wraps another Cursor and ensures the results are sorted. If the elements in
+ * the source cursor are already partially sorted, a handled comparator can be
+ * passed in which specifies the partial ordering. Elements are then processed
+ * in smaller chunks rather than sorting the entire set. The handled comparator
+ * can represent ascending or descending order of source elements.
+ *
+ * @author Brian S O'Neill
+ */
+public class SortedCursor<S> extends AbstractCursor<S> {
+    /**
+     * Convenience method to create a comparator which orders storables by the
+     * given order-by properties. The property names may be prefixed with '+'
+     * or '-' to indicate ascending or descending order. If the prefix is
+     * omitted, ascending order is assumed.
+     *
+     * @param type type of storable to create comparator for
+     * @param orderProperties list of properties to order by
+     * @throws IllegalArgumentException if any property is null or not a member
+     * of storable type
+     */
+    public static <S> Comparator<S> createComparator(Class<S> type, String... orderProperties) {
+        BeanComparator bc = BeanComparator.forClass(type);
+        for (String property : orderProperties) {
+            bc = bc.orderBy(property);
+            bc = bc.caseSensitive();
+        }
+        return bc;
+    }
+
+    /**
+     * Convenience method to create a comparator which orders storables by the
+     * given properties.
+     *
+     * @param properties list of properties to order by
+     * @throws IllegalArgumentException if no properties or if any property is null
+     */
+    public static <S extends Storable> Comparator<S>
+        createComparator(OrderedProperty<S>... properties)
+    {
+        if (properties == null || properties.length == 0 || properties[0] == null) {
+            throw new IllegalArgumentException();
+        }
+
+        Class<S> type = properties[0].getChainedProperty().getPrimeProperty().getEnclosingType();
+
+        BeanComparator bc = BeanComparator.forClass(type);
+
+        for (OrderedProperty<S> property : properties) {
+            if (property == null) {
+                throw new IllegalArgumentException();
+            }
+            bc = bc.orderBy(property.getChainedProperty().toString());
+            bc = bc.caseSensitive();
+            if (property.getDirection() == Direction.DESCENDING) {
+                bc = bc.reverse();
+            }
+        }
+
+        return bc;
+    }
+
+    /**
+     * Convenience method to create a comparator which orders storables by the
+     * given properties.
+     *
+     * @param properties list of properties to order by
+     * @throws IllegalArgumentException if no properties or if any property is null
+     */
+    public static <S extends Storable> Comparator<S>
+        createComparator(List<OrderedProperty<S>> properties)
+    {
+        if (properties == null || properties.size() == 0 || properties.get(0) == null) {
+            throw new IllegalArgumentException();
+        }
+
+        Class<S> type =
+            properties.get(0).getChainedProperty().getPrimeProperty().getEnclosingType();
+
+        BeanComparator bc = BeanComparator.forClass(type);
+
+        for (OrderedProperty<S> property : properties) {
+            if (property == null) {
+                throw new IllegalArgumentException();
+            }
+            bc = bc.orderBy(property.getChainedProperty().toString());
+            bc = bc.caseSensitive();
+            if (property.getDirection() == Direction.DESCENDING) {
+                bc = bc.reverse();
+            }
+        }
+
+        return bc;
+    }
+
+    /** Wrapped cursor */
+    private final Cursor<S> mCursor;
+    /** Buffer to store and sort results */
+    private final SortBuffer<S> mChunkBuffer;
+    /** Optional comparator which matches ordering already handled by wrapped cursor */
+    private final Comparator<S> mChunkMatcher;
+    /** Comparator to use for sorting chunks */
+    private final Comparator<S> mChunkSorter;
+
+    /** Iteration over current contents in mChunkBuffer */
+    private Iterator<S> mChunkIterator;
+
+    /**
+     * First record in a chunk, according to chunk matcher. In order to tell if
+     * the next chunk has been reached, a record has to be read from the
+     * wrapped cursor. The record is "pushed back" here for use when the next
+     * chunk is ready to be processed.
+     */
+    private S mNextChunkStart;
+
+    /**
+     * @param cursor cursor to wrap
+     * @param buffer required buffer to hold results
+     * @param handled optional comparator which represents how the results are
+     * already sorted
+     * @param finisher required comparator which finishes the sort
+     */
+    public SortedCursor(Cursor<S> cursor, SortBuffer<S> buffer,
+                        Comparator<S> handled, Comparator<S> finisher) {
+        if (cursor == null || finisher == null) {
+            throw new IllegalArgumentException();
+        }
+        mCursor = cursor;
+        mChunkBuffer =  buffer;
+        mChunkMatcher = handled;
+        mChunkSorter = finisher;
+    }
+
+    /**
+     * @param cursor cursor to wrap
+     * @param buffer required buffer to hold results
+     * @param type type of storable to create cursor for
+     * @param orderProperties list of properties to order by
+     * @throws IllegalArgumentException if any property is null or not a member
+     * of storable type
+     */
+    public SortedCursor(Cursor<S> cursor, SortBuffer<S> buffer,
+                        Class<S> type, String... orderProperties) {
+        this(cursor, buffer, null, createComparator(type, orderProperties));
+    }
+
+    /**
+     * Returns a comparator representing the effective sort order of this cursor.
+     */
+    public Comparator<S> comparator() {
+        if (mChunkMatcher == null) {
+            return mChunkSorter;
+        }
+        return new Comparator<S>() {
+            public int compare(S a, S b) {
+                int result = mChunkMatcher.compare(a, b);
+                if (result == 0) {
+                    result = mChunkSorter.compare(a, b);
+                }
+                return result;
+            }
+        };
+    }
+
+    public synchronized void close() throws FetchException {
+        mCursor.close();
+        mChunkIterator = null;
+        mChunkBuffer.close();
+    }
+
+    public synchronized boolean hasNext() throws FetchException {
+        prepareNextChunk();
+        try {
+            if (mChunkIterator.hasNext()) {
+                return true;
+            }
+        } catch (UndeclaredThrowableException e) {
+            throw toFetchException(e);
+        }
+        close();
+        return false;
+    }
+
+    public synchronized S next() throws FetchException {
+        prepareNextChunk();
+        try {
+            return mChunkIterator.next();
+        } catch (UndeclaredThrowableException e) {
+            throw toFetchException(e);
+        } catch (NoSuchElementException e) {
+            try {
+                close();
+            } catch (FetchException e2) {
+                // Don't care.
+            }
+            throw e;
+        }
+    }
+
+    public synchronized int skipNext(int amount) throws FetchException {
+        if (amount <= 0) {
+            if (amount < 0) {
+                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+            }
+            return 0;
+        }
+
+        int count = 0;
+        while (--amount >= 0 && hasNext()) {
+            next();
+            count++;
+        }
+
+        return count;
+    }
+
+    private void prepareNextChunk() throws FetchException {
+        if (mChunkIterator != null && (mChunkMatcher == null || mChunkIterator.hasNext())) {
+            // Ready to go.
+            return;
+        }
+
+        Cursor<S> cursor = mCursor;
+        SortBuffer<S> buffer = mChunkBuffer;
+        Comparator<S> matcher = mChunkMatcher;
+
+        try {
+            mChunkIterator = null;
+            buffer.prepare(mChunkSorter);
+
+            fill: {
+                if (matcher == null) {
+                    // Buffer up entire results and sort.
+                    int count = 0;
+                    while (cursor.hasNext()) {
+                        // Check every so often if interrupted.
+                        if ((++count & ~0xff) == 0 && Thread.interrupted()) {
+                            throw new FetchInterruptedException();
+                        }
+                        buffer.add(cursor.next());
+                    }
+                    break fill;
+                }
+
+                // Read a chunk into the buffer. First record is compared against
+                // subsequent records, to determine when the chunk end is reached.
+
+                S chunkStart;
+                if (mNextChunkStart != null) {
+                    chunkStart = mNextChunkStart;
+                    mNextChunkStart = null;
+                } else if (cursor.hasNext()) {
+                    chunkStart = cursor.next();
+                } else {
+                    break fill;
+                }
+
+                buffer.add(chunkStart);
+                int count = 1;
+
+                while (cursor.hasNext()) {
+                    // Check every so often if interrupted.
+                    if ((++count & ~0xff) == 0 && Thread.interrupted()) {
+                        throw new FetchInterruptedException();
+                    }
+                    S next = cursor.next();
+                    if (matcher.compare(chunkStart, next) != 0) {
+                        // Save for reading next chunk later.
+                        mNextChunkStart = next;
+                        break;
+                    }
+                    buffer.add(next);
+                }
+            }
+
+            if (buffer.size() > 1) {
+                buffer.sort();
+            }
+
+            mChunkIterator = buffer.iterator();
+        } catch (UndeclaredThrowableException e) {
+            throw toFetchException(e);
+        }
+    }
+
+    private FetchException toFetchException(UndeclaredThrowableException e) {
+        Throwable cause = e.getCause();
+        if (cause == null) {
+            cause = e;
+        }
+        if (cause instanceof FetchException) {
+            return (FetchException) cause;
+        }
+        return new FetchException(null, cause);
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java
new file mode 100644
index 0000000..6851464
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs a <i>symmetric set difference</i>
+ * operation. In boolean logic, this is an <i>exclusive or</i> operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the difference.
+ *
+ * @author Brian S O'Neill
+ * @see UnionCursor
+ * @see IntersectionCursor
+ * @see DifferenceCursor
+ */
+public class SymmetricDifferenceCursor<S> extends AbstractCursor<S> {
+    private final Cursor<S> mLeftCursor;
+    private final Cursor<S> mRightCursor;
+    private final Comparator<S> mOrder;
+
+    private S mNextLeft;
+    private S mNextRight;
+    private int mCompareResult;
+
+    /**
+     * @param left cursor to wrap
+     * @param right cursor to wrap
+     * @param order describes sort ordering of wrapped cursors, which must be
+     * a total ordering
+     */
+    public SymmetricDifferenceCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+        if (left == null || right == null || order == null) {
+            throw new IllegalArgumentException();
+        }
+        mLeftCursor = left;
+        mRightCursor = right;
+        mOrder = order;
+    }
+
+    public synchronized void close() throws FetchException {
+        mLeftCursor.close();
+        mRightCursor.close();
+        mNextLeft = null;
+        mNextRight = null;
+    }
+
+    public boolean hasNext() throws FetchException {
+        return compareNext() != 0;
+    }
+
+    /**
+     * Returns 0 if no next element available, &lt;0 if next element is from
+     * left source cursor, and &gt;0 if next element is from right source
+     * cursor.
+     */
+    public synchronized int compareNext() throws FetchException {
+        if (mCompareResult != 0) {
+            return mCompareResult;
+        }
+
+        while (true) {
+            if (mNextLeft == null && mLeftCursor.hasNext()) {
+                mNextLeft = mLeftCursor.next();
+            }
+            if (mNextRight == null && mRightCursor.hasNext()) {
+                mNextRight = mRightCursor.next();
+            }
+
+            if (mNextLeft == null) {
+                return mNextRight != null ? 1 : 0;
+            }
+            if (mNextRight == null) {
+                return -1;
+            }
+
+            if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) {
+                mNextLeft = null;
+                mNextRight = null;
+            } else {
+                return mCompareResult;
+            }
+        }
+    }
+
+    public synchronized S next() throws FetchException {
+        S next;
+        int result = compareNext();
+        if (result < 0) {
+            next = mNextLeft;
+            mNextLeft = null;
+        } else if (result > 0) {
+            next = mNextRight;
+            mNextRight = null;
+        } else {
+            throw new NoSuchElementException();
+        }
+        mCompareResult = 0;
+        return next;
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java
new file mode 100644
index 0000000..2f4c5a0
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+import com.amazon.carbonado.util.Throttle;
+
+/**
+ * Wraps another cursor and fetches results at a reduced speed.
+ *
+ * @author Brian S O'Neill
+ */
+public class ThrottledCursor<S> extends AbstractCursor<S> {
+    private static final int WINDOW_SIZE = 10;
+    private static final int SLEEP_PRECISION_MILLIS = 50;
+
+    private final Cursor<S> mCursor;
+    private final Throttle mThrottle;
+    private final double mDesiredSpeed;
+
+    /**
+     * @param cursor cursor to wrap
+     * @param throttle 1.0 = fetch at full speed, 0.5 = fetch at half speed,
+     * 0.1 = fetch at one tenth speed, etc.
+     */
+    public ThrottledCursor(Cursor<S> cursor, double throttle) {
+        mCursor = cursor;
+        if (throttle < 1.0) {
+            if (throttle < 0.0) {
+                throttle = 0.0;
+            }
+            mThrottle = new Throttle(WINDOW_SIZE);
+            mDesiredSpeed = throttle;
+        } else {
+            mThrottle = null;
+            mDesiredSpeed = 1.0;
+        }
+    }
+
+    public void close() throws FetchException {
+        mCursor.close();
+    }
+
+    public boolean hasNext() throws FetchException {
+        return mCursor.hasNext();
+    }
+
+    public S next() throws FetchException {
+        throttle();
+        return mCursor.next();
+    }
+
+    public int skipNext(int amount) throws FetchException {
+        if (amount <= 0) {
+            if (amount < 0) {
+                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+            }
+            return 0;
+        }
+
+        int count = 0;
+        while (--amount >= 0) {
+            throttle();
+            if (skipNext(1) <= 0) {
+                break;
+            }
+            count++;
+        }
+
+        return count;
+    }
+
+    private void throttle() throws FetchInterruptedException {
+        if (mThrottle != null) {
+            try {
+                mThrottle.throttle(mDesiredSpeed, SLEEP_PRECISION_MILLIS);
+            } catch (InterruptedException e) {
+                throw new FetchInterruptedException(e);
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
new file mode 100644
index 0000000..0860cf0
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+
+/**
+ * Abstract cursor which wraps another cursor and transforms each storable
+ * result into a target storable. This class can be used for implementing
+ * one-to-one joins. Use {@link MultiTransformedCursor} for one-to-many joins.
+ *
+ * @author Brian S O'Neill
+ */
+public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {
+    private final Cursor<S> mCursor;
+
+    private T mNext;
+
+    protected TransformedCursor(Cursor<S> cursor) {
+        if (cursor == null) {
+            throw new IllegalArgumentException();
+        }
+        mCursor = cursor;
+    }
+
+    /**
+     * This method must be implemented to transform storables. If the storable
+     * cannot be transformed, either throw a FetchException or return null. If
+     * null is returned, the storable is simply filtered out.
+     *
+     * @return transformed storable, or null to filter it out
+     */
+    protected abstract T transform(S storable) throws FetchException;
+
+    public void close() throws FetchException {
+        synchronized (mCursor) {
+            mCursor.close();
+            mNext = null;
+        }
+    }
+
+    public boolean hasNext() throws FetchException {
+        synchronized (mCursor) {
+            if (mNext != null) {
+                return true;
+            }
+            try {
+                int count = 0;
+                while (mCursor.hasNext()) {
+                    T next = transform(mCursor.next());
+                    if (next != null) {
+                        mNext = next;
+                        return true;
+                    }
+                    interruptCheck(++count);
+                }
+            } catch (NoSuchElementException e) {
+            }
+            return false;
+        }
+    }
+
+    public T next() throws FetchException {
+        synchronized (mCursor) {
+            if (hasNext()) {
+                T next = mNext;
+                mNext = null;
+                return next;
+            }
+            throw new NoSuchElementException();
+        }
+    }
+
+    public int skipNext(int amount) throws FetchException {
+        synchronized (mCursor) {
+            if (amount <= 0) {
+                if (amount < 0) {
+                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+                }
+                return 0;
+            }
+
+            int count = 0;
+            while (--amount >= 0 && hasNext()) {
+                interruptCheck(++count);
+                mNext = null;
+            }
+
+            return count;
+        }
+    }
+
+    private void interruptCheck(int count) throws FetchException {
+        if ((count & ~0xff) == 0 && Thread.interrupted()) {
+            close();
+            throw new FetchInterruptedException();
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java
new file mode 100644
index 0000000..7a62678
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Cursor;
+
+/**
+ * Wraps two Cursors and performs a <i>set union</i> operation. In boolean
+ * logic, this is an <i>or</i> operation.
+ *
+ * <p>Both cursors must return results in the same order. Ordering is preserved
+ * by the union.
+ *
+ * @author Brian S O'Neill
+ * @see IntersectionCursor
+ * @see DifferenceCursor
+ * @see SymmetricDifferenceCursor
+ */
+public class UnionCursor<S> extends AbstractCursor<S> {
+    private final Cursor<S> mLeftCursor;
+    private final Cursor<S> mRightCursor;
+    private final Comparator<S> mOrder;
+
+    private S mNextLeft;
+    private S mNextRight;
+
+    /**
+     * @param left cursor to wrap
+     * @param right cursor to wrap
+     * @param order describes sort ordering of wrapped cursors, which must be
+     * a total ordering
+     */
+    public UnionCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
+        if (left == null || right == null || order == null) {
+            throw new IllegalArgumentException();
+        }
+        mLeftCursor = left;
+        mRightCursor = right;
+        mOrder = order;
+    }
+
+    public synchronized void close() throws FetchException {
+        mLeftCursor.close();
+        mRightCursor.close();
+        mNextLeft = null;
+        mNextRight = null;
+    }
+
+    public synchronized boolean hasNext() throws FetchException {
+        if (mNextLeft == null && mLeftCursor.hasNext()) {
+            mNextLeft = mLeftCursor.next();
+        }
+        if (mNextRight == null && mRightCursor.hasNext()) {
+            mNextRight = mRightCursor.next();
+        }
+        return mNextLeft != null || mNextRight != null;
+    }
+
+    public synchronized S next() throws FetchException {
+        if (hasNext()) {
+            S next;
+            if (mNextLeft == null) {
+                next = mNextRight;
+                mNextRight = null;
+            } else if (mNextRight == null) {
+                next = mNextLeft;
+                mNextLeft = null;
+            } else {
+                int result = mOrder.compare(mNextLeft, mNextRight);
+                if (result < 0) {
+                    next = mNextLeft;
+                    mNextLeft = null;
+                } else if (result > 0) {
+                    next = mNextRight;
+                    mNextRight = null;
+                } else {
+                    next = mNextLeft;
+                    mNextLeft = null;
+                    mNextRight = null;
+                }
+            }
+            return next;
+        }
+        throw new NoSuchElementException();
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java
new file mode 100644
index 0000000..94c7b84
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Used internally by {@link MergeSortBuffer}.
+ *
+ * @author Brian S O'Neill
+ */
+class WorkFilePool {
+    private static final String cTempDir = System.getProperty("java.io.tmpdir");
+
+    private static final ConcurrentMap<String, WorkFilePool> cPools =
+        new ConcurrentHashMap<String, WorkFilePool>();
+
+    /**
+     * @param tempDir directory to store temp files for merging, or null for default
+     */
+    static WorkFilePool getInstance(String tempDir) {
+        // This method uses ConcurrentMap features to eliminate the need to
+        // ever synchronize access, since this method may be called frequently.
+
+        if (tempDir == null) {
+            tempDir = cTempDir;
+        }
+
+        WorkFilePool pool = cPools.get(tempDir);
+
+        if (pool != null) {
+            return pool;
+        }
+
+        String canonical;
+        try {
+            canonical = new File(tempDir).getCanonicalPath();
+        } catch (IOException e) {
+            canonical = new File(tempDir).getAbsolutePath();
+        }
+
+        if (!canonical.equals(tempDir)) {
+            pool = getInstance(canonical);
+            cPools.putIfAbsent(tempDir, pool);
+            return pool;
+        }
+
+        pool = new WorkFilePool(new File(tempDir));
+
+        WorkFilePool existing = cPools.putIfAbsent(tempDir, pool);
+
+        if (existing == null) {
+            // New pool is the winner, so finish off initialization. Pool can
+            // be used concurrently by another thread without shutdown hook.
+            pool.addShutdownHook(tempDir);
+        } else {
+            pool = existing;
+        }
+
+        return pool;
+    }
+
+    private final File mTempDir;
+
+    // Work files not currently being used by any MergeSortBuffer.
+    private final List<RandomAccessFile> mWorkFilePool;
+    // Instances of MergeSortBuffer, to be notified on shutdown that they
+    // should close their work files.
+    private final Set<MergeSortBuffer<?>> mWorkFileUsers;
+
+    private Thread mShutdownHook;
+
+    /**
+     * @param tempDir directory to store temp files for merging, or null for default
+     */
+    private WorkFilePool(File tempDir) {
+        mTempDir = tempDir;
+        mWorkFilePool = new ArrayList<RandomAccessFile>();
+        mWorkFileUsers = new HashSet<MergeSortBuffer<?>>();
+    }
+
+    RandomAccessFile acquireWorkFile(MergeSortBuffer<?> buffer) throws IOException {
+        synchronized (mWorkFileUsers) {
+            mWorkFileUsers.add(buffer);
+        }
+        synchronized (mWorkFilePool) {
+            if (mWorkFilePool.size() > 0) {
+                return mWorkFilePool.remove(mWorkFilePool.size() - 1);
+            }
+        }
+        File file = File.createTempFile("carbonado-mergesort-", null, mTempDir);
+        file.deleteOnExit();
+        return new RandomAccessFile(file, "rw");
+    }
+
+    void releaseWorkFiles(List<RandomAccessFile> files) {
+        synchronized (mWorkFilePool) {
+            for (RandomAccessFile raf : files) {
+                try {
+                    raf.seek(0);
+                    // Return space to file system.
+                    raf.setLength(0);
+                    mWorkFilePool.add(raf);
+                } catch (IOException e) {
+                    // Work file is broken, discard it.
+                    try {
+                        raf.close();
+                    } catch (IOException e2) {
+                    }
+                }
+            }
+        }
+    }
+
+    void unregisterWorkFileUser(MergeSortBuffer<?> buffer) {
+        synchronized (mWorkFileUsers) {
+            mWorkFileUsers.remove(buffer);
+            mWorkFileUsers.notify();
+        }
+    }
+
+    private synchronized void addShutdownHook(String tempDir) {
+        if (mShutdownHook != null) {
+            return;
+        }
+
+        // Add shutdown hook to close work files so that they can be deleted.
+        String threadName = "MergeSortBuffer shutdown (" + tempDir + ')';
+
+        mShutdownHook = new Thread(threadName) {
+            public void run() {
+                // Notify users of work files and wait for them to close.
+                synchronized (mWorkFileUsers) {
+                    for (MergeSortBuffer<?> buffer : mWorkFileUsers) {
+                        buffer.stop();
+                    }
+                    final long timeout = 10000;
+                    final long giveup = System.currentTimeMillis() + timeout;
+                    try {
+                        while (mWorkFileUsers.size() > 0) {
+                            long now = System.currentTimeMillis();
+                            if (now < giveup) {
+                                mWorkFileUsers.wait(giveup - now);
+                            } else {
+                                break;
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                }
+
+                synchronized (mWorkFilePool) {
+                    for (RandomAccessFile raf : mWorkFilePool) {
+                        try {
+                            raf.close();
+                        } catch (IOException e) {
+                        }
+                    }
+                    mWorkFilePool.clear();
+                }
+            }
+        };
+
+        Runtime.getRuntime().addShutdownHook(mShutdownHook);
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/package-info.java b/src/main/java/com/amazon/carbonado/cursor/package-info.java
new file mode 100644
index 0000000..3553d1a
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for advanced processing of cursor results, including basic set
+ * theory operations.
+ */
+package com.amazon.carbonado.cursor;
-- 
cgit v1.2.3