From 121886bc0c92389610408e3b415abb992ad8a212 Mon Sep 17 00:00:00 2001
From: "Brian S. O'Neill" <bronee@gmail.com>
Date: Wed, 4 May 2011 00:20:02 +0000
Subject: Add support for Query controller and timeouts; remove vestigial
 support for interrupts.

---
 src/main/java/com/amazon/carbonado/Query.java      | 310 ++++++++++++++++++++-
 .../amazon/carbonado/cursor/ControllerCursor.java  |  99 +++++++
 .../amazon/carbonado/cursor/FilteredCursor.java    |  13 +-
 .../com/amazon/carbonado/cursor/GroupedCursor.java |  13 +-
 .../amazon/carbonado/cursor/MergeSortBuffer.java   |  84 +++++-
 .../carbonado/cursor/MultiTransformedCursor.java   |  11 -
 .../com/amazon/carbonado/cursor/SortedCursor.java  |  11 -
 .../amazon/carbonado/cursor/TransformedCursor.java |  12 +-
 .../com/amazon/carbonado/qe/AbstractQuery.java     |  41 +++
 .../amazon/carbonado/qe/AbstractQueryExecutor.java |  41 +++
 .../carbonado/qe/DelegatedQueryExecutor.java       |  17 ++
 .../java/com/amazon/carbonado/qe/EmptyQuery.java   |  55 ++++
 .../amazon/carbonado/qe/FilteredQueryExecutor.java |   7 +
 .../amazon/carbonado/qe/FullScanQueryExecutor.java |  31 +++
 .../amazon/carbonado/qe/IndexedQueryExecutor.java  |  59 +++-
 .../amazon/carbonado/qe/IterableQueryExecutor.java |   6 +
 .../amazon/carbonado/qe/JoinedQueryExecutor.java   |  33 ++-
 .../com/amazon/carbonado/qe/KeyQueryExecutor.java  |  20 ++
 .../com/amazon/carbonado/qe/QueryExecutor.java     |  24 ++
 .../amazon/carbonado/qe/SortedQueryExecutor.java   |  41 +++
 .../com/amazon/carbonado/qe/StandardQuery.java     |  48 +++-
 .../amazon/carbonado/qe/UnionQueryExecutor.java    |   9 +-
 .../carbonado/raw/GenericStorableCodecFactory.java |   3 +-
 .../carbonado/repo/indexed/IndexedStorage.java     |  44 +++
 .../carbonado/repo/indexed/ManagedIndex.java       |  18 +-
 .../repo/jdbc/H2ExceptionTransformer.java          |   8 +-
 .../repo/jdbc/JDBCExceptionTransformer.java        |  20 ++
 .../amazon/carbonado/repo/jdbc/JDBCStorage.java    |  92 +++++-
 .../repo/jdbc/OracleExceptionTransformer.java      |   7 +
 .../carbonado/repo/logging/LoggingQuery.java       |  94 +++++++
 .../com/amazon/carbonado/repo/map/MapStorage.java  |  49 ++++
 .../carbonado/repo/sleepycat/BDBStorage.java       |  56 +++-
 32 files changed, 1273 insertions(+), 103 deletions(-)
 create mode 100644 src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java

(limited to 'src/main/java/com')

diff --git a/src/main/java/com/amazon/carbonado/Query.java b/src/main/java/com/amazon/carbonado/Query.java
index a660c1a..1780ebd 100644
--- a/src/main/java/com/amazon/carbonado/Query.java
+++ b/src/main/java/com/amazon/carbonado/Query.java
@@ -18,7 +18,13 @@
 
 package com.amazon.carbonado;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import com.amazon.carbonado.filter.Filter;
 import com.amazon.carbonado.filter.FilterValues;
@@ -286,6 +292,20 @@ public interface Query<S extends Storable> {
      */
     Cursor<S> fetch() throws FetchException;
 
+    /**
+     * Fetches results for this query. If any updates or deletes might be
+     * performed on the results, consider enclosing the fetch in a
+     * transaction. This allows the isolation level and "for update" mode to be
+     * adjusted. Some repositories might otherwise deadlock.
+     *
+     * @param controller optional controller which can abort query operation
+     * @return fetch results
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws FetchException if storage layer throws an exception
+     * @see Repository#enterTransaction(IsolationLevel)
+     */
+    Cursor<S> fetch(Controller controller) throws FetchException;
+
     /**
      * Fetches a slice of results for this query, as defined by a numerical
      * range. A slice can be used to limit the number of results from a
@@ -303,6 +323,24 @@ public interface Query<S extends Storable> {
      */
     Cursor<S> fetchSlice(long from, Long to) throws FetchException;
 
+    /**
+     * Fetches a slice of results for this query, as defined by a numerical
+     * range. A slice can be used to limit the number of results from a
+     * query. It is strongly recommended that the query be given a total {@link
+     * #orderBy ordering} in order for the slice results to be deterministic.
+     *
+     * @param from zero-based {@code from} record number, inclusive
+     * @param to optional zero-based {@code to} record number, exclusive
+     * @param controller optional controller which can abort query operation
+     * @return fetch results
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws IllegalArgumentException if {@code from} is negative or if
+     * {@code from} is more than {@code to}
+     * @throws FetchException if storage layer throws an exception
+     * @since 1.2
+     */
+    Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException;
+
     /**
      * Fetches results for this query after a given starting point, which is
      * useful for re-opening a cursor. This is only effective when query has
@@ -325,6 +363,29 @@ public interface Query<S extends Storable> {
      */
     <T extends S> Cursor<S> fetchAfter(T start) throws FetchException;
 
+    /**
+     * Fetches results for this query after a given starting point, which is
+     * useful for re-opening a cursor. This is only effective when query has
+     * been given an explicit {@link #orderBy ordering}. If not a total
+     * ordering, then cursor may start at an earlier position.
+     *
+     * <p>Note: This method can be very expensive to call repeatedly, if the
+     * query needs to perform a sort operation. Ideally, the query ordering
+     * should match the natural ordering of an index or key.
+     *
+     * <p>Calling {@code fetchAfter(s)} is equivalent to calling {@code
+     * after(s).fetch()}.
+     *
+     * @param start storable to attempt to start after; if null, fetch all results
+     * @param controller optional controller which can abort query operation
+     * @return fetch results
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws FetchException if storage layer throws an exception
+     * @see Repository#enterTransaction(IsolationLevel)
+     * @see #after
+     */
+    <T extends S> Cursor<S> fetchAfter(T start, Controller controller) throws FetchException;
+
     /**
      * Attempts to load exactly one matching object. If the number of matching
      * records is zero or exceeds one, then an exception is thrown instead.
@@ -338,8 +399,21 @@ public interface Query<S extends Storable> {
     S loadOne() throws FetchException;
 
     /**
-     * May return null if nothing found. Throws exception if record count is
-     * more than one.
+     * Attempts to load exactly one matching object. If the number of matching
+     * records is zero or exceeds one, then an exception is thrown instead.
+     *
+     * @param controller optional controller which can abort query operation
+     * @return a single fetched object
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws FetchNoneException if no matching record found
+     * @throws FetchMultipleException if more than one matching record found
+     * @throws FetchException if storage layer throws an exception
+     */
+    S loadOne(Controller controller) throws FetchException;
+
+    /**
+     * Tries to load one record, but returns null if nothing was found. Throws
+     * exception if record count is more than one.
      *
      * @return null or a single fetched object
      * @throws IllegalStateException if any blank parameters in this query
@@ -348,6 +422,18 @@ public interface Query<S extends Storable> {
      */
     S tryLoadOne() throws FetchException;
 
+    /**
+     * Tries to load one record, but returns null if nothing was found. Throws
+     * exception if record count is more than one.
+     *
+     * @param controller optional controller which can abort query operation
+     * @return null or a single fetched object
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws FetchMultipleException if more than one matching record found
+     * @throws FetchException if storage layer throws an exception
+     */
+    S tryLoadOne(Controller controller) throws FetchException;
+
     /**
      * Deletes one matching object. If the number of matching records is zero or
      * exceeds one, then no delete occurs, and an exception is thrown instead.
@@ -359,6 +445,18 @@ public interface Query<S extends Storable> {
      */
     void deleteOne() throws PersistException;
 
+    /**
+     * Deletes one matching object. If the number of matching records is zero or
+     * exceeds one, then no delete occurs, and an exception is thrown instead.
+     *
+     * @param controller optional controller which can abort query operation
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws PersistNoneException if no matching record found
+     * @throws PersistMultipleException if more than one record matches
+     * @throws PersistException if storage layer throws an exception
+     */
+    void deleteOne(Controller controller) throws PersistException;
+
     /**
      * Deletes zero or one matching objects. If the number of matching records
      * exceeds one, then no delete occurs, and an exception is thrown instead.
@@ -370,6 +468,18 @@ public interface Query<S extends Storable> {
      */
     boolean tryDeleteOne() throws PersistException;
 
+    /**
+     * Deletes zero or one matching objects. If the number of matching records
+     * exceeds one, then no delete occurs, and an exception is thrown instead.
+     *
+     * @param controller optional controller which can abort query operation
+     * @return true if record existed and was deleted, or false if no match
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws PersistMultipleException if more than one record matches
+     * @throws PersistException if storage layer throws an exception
+     */
+    boolean tryDeleteOne(Controller controller) throws PersistException;
+
     /**
      * Deletes zero or more matching objects. There is no guarantee that
      * deleteAll is an atomic operation. If atomic behavior is desired, wrap
@@ -380,6 +490,17 @@ public interface Query<S extends Storable> {
      */
     void deleteAll() throws PersistException;
 
+    /**
+     * Deletes zero or more matching objects. There is no guarantee that
+     * deleteAll is an atomic operation. If atomic behavior is desired, wrap
+     * the call in a transaction scope.
+     *
+     * @param controller optional controller which can abort query operation
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws PersistException if storage layer throws an exception
+     */
+    void deleteAll(Controller controller) throws PersistException;
+
     /**
      * Returns a count of all results matched by this query. Even though no
      * results are explicitly fetched, this method may still be expensive to
@@ -391,6 +512,18 @@ public interface Query<S extends Storable> {
      */
     long count() throws FetchException;
 
+    /**
+     * Returns a count of all results matched by this query. Even though no
+     * results are explicitly fetched, this method may still be expensive to
+     * call. The actual performance will vary by repository and available indexes.
+     *
+     * @param controller optional controller which can abort query operation
+     * @return count of matches
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws FetchException if storage layer throws an exception
+     */
+    long count(Controller controller) throws FetchException;
+
     /**
      * Returns true if any results are matched by this query.
      *
@@ -401,6 +534,17 @@ public interface Query<S extends Storable> {
      */
     boolean exists() throws FetchException;
 
+    /**
+     * Returns true if any results are matched by this query.
+     *
+     * @param controller optional controller which can abort query operation
+     * @return true if any matches
+     * @throws IllegalStateException if any blank parameters in this query
+     * @throws FetchException if storage layer throws an exception
+     * @since 1.2
+     */
+    boolean exists(Controller controller) throws FetchException;
+
     /**
      * Print the native query to standard out, which is useful for performance
      * analysis. Not all repositories have a native query format. An example
@@ -469,4 +613,166 @@ public interface Query<S extends Storable> {
      * Returns a description of the query filter and any other arguments.
      */
     String toString();
+
+    /**
+     * Controller instance can be used to abort query operations.
+     *
+     * <p>Example:<pre>
+     * Storage&lt;UserInfo&gt; users = ...
+     * long count = users.query("name = ?").count(Query.Timeout.seconds(10));
+     * </pre>
+     */
+    public static interface Controller extends Serializable, Closeable {
+        /**
+         * Returns a non-negative value if controller imposes an absolute upper
+         * bound on query execution time.
+         */
+        public long getTimeout();
+
+        /**
+         * Returns the unit for the timeout, if applicable.
+         */
+        public TimeUnit getTimeoutUnit();
+
+        /**
+         * Called by query when it begins, possibly multiple times. Implementation
+         * is required to be idempotent and ignore multiple invocations.
+         */
+        public void begin();
+ 
+        /**
+         * Periodically called by query to determine if it should continue.
+         */
+        public void continueCheck() throws FetchException;
+ 
+        /**
+         * Always called by query when finished, even when it fails. Implementation
+         * is required to be idempotent and ignore multiple invocations.
+         */
+        public void close();        
+    }
+
+    /**
+     * Timeout controller, for aborting long running queries. One instance is
+     * good for one timeout. The instance can be shared by multiple queries, if
+     * they are part of a single logical operation.
+     *
+     * <p>The timeout applies to the entire duration of fetching results, not
+     * just the time spent between individual fetches. A caller which is slowly
+     * processing results can timeout. More sophisticated timeouts can be
+     * implemented using custom Controller implementations.
+     */
+    public static final class Timeout implements Controller {
+        private static final long serialVersionUID = 1;
+
+        private static final AtomicLongFieldUpdater<Timeout> endUpdater =
+            AtomicLongFieldUpdater.newUpdater(Timeout.class, "mEndNanos");
+
+        /**
+         * Return a new Timeout in nanoseconds.
+         */
+        public static Timeout nanos(long timeout) {
+            return new Timeout(timeout, TimeUnit.NANOSECONDS);
+        }
+
+        /**
+         * Return a new Timeout in microseconds.
+         */
+        public static Timeout micros(long timeout) {
+            return new Timeout(timeout, TimeUnit.MICROSECONDS);
+        }
+
+        /**
+         * Return a new Timeout in milliseconds.
+         */
+        public static Timeout millis(long timeout) {
+            return new Timeout(timeout, TimeUnit.MILLISECONDS);
+        }
+ 
+        /**
+         * Return a new Timeout in seconds.
+         */
+        public static Timeout seconds(long timeout) {
+            return new Timeout(timeout, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Return a new Timeout in minutes.
+         */
+ 
+        public static Timeout minutes(long timeout) {
+            return new Timeout(timeout, TimeUnit.MINUTES);
+        }
+
+        /**
+         * Return a new Timeout in hours.
+         */
+        public static Timeout hours(long timeout) {
+            return new Timeout(timeout, TimeUnit.HOURS);
+        }
+
+        private final long mTimeout;
+        private final TimeUnit mUnit;
+
+        private volatile transient long mEndNanos;
+
+        public Timeout(long timeout, TimeUnit unit) {
+            if (timeout < 0) {
+                throw new IllegalArgumentException("Timeout cannot be negative: " + timeout);
+            }
+            if (unit == null && timeout != 0) {
+                throw new IllegalArgumentException
+                    ("TimeUnit cannot be null if timeout is non-zero: " + timeout);
+            }
+            mTimeout = timeout;
+            mUnit = unit;
+        }
+ 
+        public long getTimeout() {
+            return mTimeout;
+        }
+
+        public TimeUnit getTimeoutUnit() {
+            return mUnit;
+        }
+
+        @Override
+        public void begin() {
+            long end = System.nanoTime() + mUnit.toNanos(mTimeout);
+            if (end == 0) {
+                // Handle rare case to ensure atomic compare and set always
+                // works the first time, supporting idempotent calls to this
+                // method.
+                end = 1;
+            }
+            endUpdater.compareAndSet(this, 0, end);
+        }
+
+        @Override
+        public void continueCheck() throws FetchTimeoutException {
+            long end = mEndNanos;
+
+            if (end == 0) {
+                // Begin was not called, in violation of how the Controller
+                // must be used. Be lenient and begin now.
+                begin();
+                end = mEndNanos;
+            }
+
+            // Subtract to support modulo comparison.
+            if ((System.nanoTime() - end) >= 0) {
+                throw new FetchTimeoutException("Timed out: " + mTimeout + ' ' + mUnit);
+            }
+        }
+ 
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+
+        @Override
+        public String toString() {
+            return "Query.Timeout {timeout=" + mTimeout + ", unit=" + mUnit + '}';
+        }
+    }
 }
diff --git a/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java
new file mode 100644
index 0000000..da682d2
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2011 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
+
+/**
+ * Wraps another cursor and periodically calls a {@link com.amazon.carbonado.Query.Controller controller}.
+ *
+ * @author Brian S O'Neill
+ */
+public class ControllerCursor<S> extends AbstractCursor<S> {
+    /**
+     * Returns a ControllerCursor depending on whether a controller instance is
+     * passed in or not.
+     *
+     * @param controller optional controller which can abort query operation
+     * @throws IllegalArgumentException if source is null
+     */
+    public static <S> Cursor<S> apply(Cursor<S> source, Query.Controller controller) {
+        return controller == null ? source : new ControllerCursor<S>(source, controller);
+    }
+
+    private final Cursor<S> mSource;
+    private final Query.Controller mController;
+
+    private byte mCount;
+
+    /**
+     * @param controller required controller which can abort query operation
+     * @throws IllegalArgumentException if either argument is null
+     */
+    private ControllerCursor(Cursor<S> source, Query.Controller controller) {
+        if (source == null) {
+            throw new IllegalArgumentException("Source is null");
+        }
+        if (controller == null) {
+            throw new IllegalArgumentException("Controller is null");
+        }
+        mSource = source;
+        mController = controller;
+        controller.begin();
+    }
+
+    public boolean hasNext() throws FetchException {
+        if (mSource.hasNext()) {
+            continueCheck();
+            return true;
+        }
+        return false;
+    }
+
+    public S next() throws FetchException {
+        S next = mSource.next();
+        continueCheck();
+        return next;
+    }
+
+    public void close() throws FetchException {
+        try {
+            mSource.close();
+        } finally {
+            mController.close();
+        }
+    }
+
+    private void continueCheck() throws FetchException {
+        if (++mCount == 0) {
+            try {
+                mController.continueCheck();
+            } catch (FetchException e) {
+                try {
+                    close();
+                } catch (FetchException e2) {
+                    // Ignore.
+                }
+                throw e;
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
index ad21667..4cf8c21 100644
--- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
+++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java
@@ -22,7 +22,6 @@ import java.util.NoSuchElementException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.filter.Filter;
@@ -114,14 +113,12 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {
             return true;
         }
         try {
-            int count = 0;
             while (mCursor.hasNext()) {
                 S next = mCursor.next();
                 if (isAllowed(next)) {
                     mNext = next;
                     return true;
                 }
-                interruptCheck(++count);
             }
         } catch (NoSuchElementException e) {
         } catch (FetchException e) {
@@ -165,10 +162,9 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {
         try {
             int count = 0;
             while (--amount >= 0 && hasNext()) {
-                interruptCheck(++count);
+                ++count;
                 mNext = null;
             }
-
             return count;
         } catch (FetchException e) {
             try {
@@ -179,11 +175,4 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {
             throw e;
         }
     }
-
-    private void interruptCheck(int count) throws FetchException {
-        if ((count & ~0xff) == 0 && Thread.interrupted()) {
-            close();
-            throw new FetchInterruptedException();
-        }
-    }
 }
diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
index 49ca84e..3053cd5 100644
--- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
+++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java
@@ -23,7 +23,6 @@ import java.util.NoSuchElementException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
 
 /**
  * Abstract cursor for aggregation and finding distinct data. The source cursor
@@ -124,7 +123,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
         }
 
         try {
-            int count = 0;
             if (mCursor.hasNext()) {
                 if (mGroupLeader == null) {
                     beginGroup(mGroupLeader = mCursor.next());
@@ -143,8 +141,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
                             return true;
                         }
                     }
-
-                    interruptCheck(++count);
                 }
 
                 G aggregate = finishGroup();
@@ -206,7 +202,7 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
         try {
             int count = 0;
             while (--amount >= 0 && hasNext()) {
-                interruptCheck(++count);
+                ++count;
                 mNextAggregate = null;
             }
 
@@ -220,11 +216,4 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
             throw e;
         }
     }
-
-    private void interruptCheck(int count) throws FetchException {
-        if ((count & ~0xff) == 0 && Thread.interrupted()) {
-            close();
-            throw new FetchInterruptedException();
-        }
-    }
 }
diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
index aae5243..0231012 100644
--- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
+++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java
@@ -38,7 +38,9 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 
+import com.amazon.carbonado.FetchException;
 import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 import com.amazon.carbonado.Storage;
 import com.amazon.carbonado.SupportException;
@@ -122,6 +124,7 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
 
     private final String mTempDir;
     private final int mMaxArrayCapacity;
+    private final Query.Controller mController;
 
     private Preparer<S> mPreparer;
 
@@ -143,6 +146,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
         this(null, TEMP_DIR, MAX_ARRAY_CAPACITY);
     }
 
+    /**
+     * @since 1.2
+     *
+     * @param controller optional controller which can abort query operation
+     */
+    public MergeSortBuffer(Query.Controller controller) {
+        this(null, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
+    }
+
     /**
      * @param storage storage for elements; if null use first Storable to
      * prepare reloaded Storables
@@ -151,6 +163,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
         this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY);
     }
 
+    /**
+     * @param storage storage for elements; if null use first Storable to
+     * prepare reloaded Storables
+     * @param controller optional controller which can abort query operation
+     */
+    public MergeSortBuffer(Storage<S> storage, Query.Controller controller) {
+        this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
+    }
+
     /**
      * @param storage storage for elements; if null use first Storable to
      * prepare reloaded Storables
@@ -170,6 +191,22 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
      */
     @SuppressWarnings("unchecked")
     public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
+        this(storage, tempDir, maxArrayCapacity, null);
+    }
+
+    /**
+     * @param storage storage for elements; if null use first Storable to
+     * prepare reloaded Storables
+     * @param tempDir directory to store temp files for merging, or null for default
+     * @param maxArrayCapacity maximum amount of storables to keep in an array
+     * before serializing to a file
+     * @param controller optional controller which can abort query operation
+     * @throws IllegalArgumentException if storage is null
+     */
+    @SuppressWarnings("unchecked")
+    public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity,
+                           Query.Controller controller)
+    {
         mTempDir = tempDir;
         mMaxArrayCapacity = maxArrayCapacity;
 
@@ -179,6 +216,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
 
         int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
         mElements = (S[]) new Storable[cap];
+
+        if ((mController = controller) != null) {
+            controller.begin();
+        }
     }
 
     public void prepare(Comparator<S> comparator) {
@@ -231,10 +272,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
 
                 if (mFilesInUse.size() < (MAX_OPEN_FILE_COUNT - 1)) {
                     mFilesInUse.add(raf);
-                    int count = 0;
+                    byte count = 0;
                     for (S element : mElements) {
-                        // Check every so often if interrupted.
-                        interruptCheck(++count);
+                        // Check every so often if should continue.
+                        continueCheck(++count);
                         element.writeTo(out);
                     }
                 } else {
@@ -274,11 +315,11 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
                     // as well as error out earlier, should the disk be full.
                     raf.setLength(mergedLength);
 
-                    int count = 0;
+                    byte count = 0;
                     Iterator<S> it = iterator(filesToMerge);
                     while (it.hasNext()) {
-                        // Check every so often if interrupted.
-                        interruptCheck(++count);
+                        // Check every so often if should continue.
+                        continueCheck(++count);
                         S element = it.next();
                         element.writeTo(out);
                     }
@@ -368,9 +409,16 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
     }
 
     public void close() {
-        clear();
-        if (mWorkFilePool != null) {
-            mWorkFilePool.unregisterWorkFileUser(this);
+        try {
+            clear();
+            if (mWorkFilePool != null) {
+                mWorkFilePool.unregisterWorkFileUser(this);
+            }
+        } finally {
+            Query.Controller controller = mController;
+            if (controller != null) {
+                controller.close();
+            }
         }
     }
 
@@ -386,10 +434,20 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
         return comparator;
     }
 
-    private void interruptCheck(int count) {
-        if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
-            close();
-            throw new UndeclaredThrowableException(new FetchInterruptedException());
+    private void continueCheck(byte count) {
+        if (count == 0) {
+            try {
+                Query.Controller controller = mController;
+                if (controller != null) {
+                    controller.continueCheck();
+                }
+                if (mStop) {
+                    throw new FetchInterruptedException("Shutting down");
+                }
+            } catch (FetchException e) {
+                close();
+                throw new UndeclaredThrowableException(e);
+            }
         }
     }
 
diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
index 8a1bd48..814a3ba 100644
--- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
+++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java
@@ -22,7 +22,6 @@ import java.util.NoSuchElementException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
 
 /**
  * Abstract cursor which wraps another cursor and transforms each storable
@@ -72,7 +71,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
                 mNextCursor.close();
                 mNextCursor = null;
             }
-            int count = 0;
             while (mCursor.hasNext()) {
                 Cursor<T> nextCursor = transform(mCursor.next());
                 if (nextCursor != null) {
@@ -82,7 +80,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
                     }
                     nextCursor.close();
                 }
-                interruptCheck(++count);
             }
         } catch (NoSuchElementException e) {
         } catch (FetchException e) {
@@ -129,7 +126,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
                 if ((amount -= chunk) <= 0) {
                     break;
                 }
-                interruptCheck(count);
             }
 
             return count;
@@ -142,11 +138,4 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
             throw e;
         }
     }
-
-    private void interruptCheck(int count) throws FetchException {
-        if ((count & ~0xff) == 0 && Thread.interrupted()) {
-            close();
-            throw new FetchInterruptedException();
-        }
-    }
 }
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
index 5b5f9b5..e054137 100644
--- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
+++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java
@@ -32,7 +32,6 @@ import org.cojen.util.BeanProperty;
 import org.cojen.classfile.TypeDesc;
 
 import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.Storable;
 
@@ -388,12 +387,7 @@ public class SortedCursor<S> extends AbstractCursor<S> {
             fill: {
                 if (matcher == null) {
                     // Buffer up entire results and sort.
-                    int count = 0;
                     while (cursor.hasNext()) {
-                        // Check every so often if interrupted.
-                        if ((++count & ~0xff) == 0 && Thread.interrupted()) {
-                            throw new FetchInterruptedException();
-                        }
                         buffer.add(cursor.next());
                     }
                     break fill;
@@ -413,13 +407,8 @@ public class SortedCursor<S> extends AbstractCursor<S> {
                 }
 
                 buffer.add(chunkStart);
-                int count = 1;
 
                 while (cursor.hasNext()) {
-                    // Check every so often if interrupted.
-                    if ((++count & ~0xff) == 0 && Thread.interrupted()) {
-                        throw new FetchInterruptedException();
-                    }
                     S next = cursor.next();
                     if (matcher.compare(chunkStart, next) != 0) {
                         // Save for reading next chunk later.
diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
index c05e0fc..bd41cef 100644
--- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
+++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java
@@ -22,7 +22,6 @@ import java.util.NoSuchElementException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
 
 /**
  * Abstract cursor which wraps another cursor and transforms each storable
@@ -64,14 +63,12 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {
             return true;
         }
         try {
-            int count = 0;
             while (mCursor.hasNext()) {
                 T next = transform(mCursor.next());
                 if (next != null) {
                     mNext = next;
                     return true;
                 }
-                interruptCheck(++count);
             }
         } catch (NoSuchElementException e) {
         } catch (FetchException e) {
@@ -115,7 +112,7 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {
         try {
             int count = 0;
             while (--amount >= 0 && hasNext()) {
-                interruptCheck(++count);
+                ++count;
                 mNext = null;
             }
 
@@ -129,11 +126,4 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {
             throw e;
         }
     }
-
-    private void interruptCheck(int count) throws FetchException {
-        if ((count & ~0xff) == 0 && Thread.interrupted()) {
-            close();
-            throw new FetchInterruptedException();
-        }
-    }
 }
diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java
index 7e857c3..44735ae 100644
--- a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java
+++ b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java
@@ -60,6 +60,13 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App
         return after(start).fetch();
     }
 
+    @Override
+    public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
+        throws FetchException
+    {
+        return after(start).fetch(controller);
+    }
+
     @Override
     public S loadOne() throws FetchException {
         S obj = tryLoadOne();
@@ -69,6 +76,15 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App
         return obj;
     }
 
+    @Override
+    public S loadOne(Controller controller) throws FetchException {
+        S obj = tryLoadOne(controller);
+        if (obj == null) {
+            throw new FetchNoneException(toString());
+        }
+        return obj;
+    }
+
     @Override
     public S tryLoadOne() throws FetchException {
         Cursor<S> cursor = fetch();
@@ -87,6 +103,24 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App
         }
     }
 
+    @Override
+    public S tryLoadOne(Controller controller) throws FetchException {
+        Cursor<S> cursor = fetch(controller);
+        try {
+            if (cursor.hasNext()) {
+                S obj = cursor.next();
+                if (cursor.hasNext()) {
+                    throw new FetchMultipleException(toString());
+                }
+                return obj;
+            } else {
+                return null;
+            }
+        } finally {
+            cursor.close();
+        }
+    }
+
     @Override
     public void deleteOne() throws PersistException {
         if (!tryDeleteOne()) {
@@ -94,6 +128,13 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App
         }
     }
 
+    @Override
+    public void deleteOne(Controller controller) throws PersistException {
+        if (!tryDeleteOne(controller)) {
+            throw new PersistNoneException(toString());
+        }
+    }
+
     @Override
     public boolean printNative() {
         try {
diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java
index e35ec7a..e1921d2 100644
--- a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.cursor.LimitCursor;
@@ -56,6 +57,26 @@ public abstract class AbstractQueryExecutor<S extends Storable> implements Query
         return cursor;
     }
 
+    /**
+     * Produces a slice via skip and limit cursors. Subclasses are encouraged
+     * to override with a more efficient implementation.
+     *
+     * @since 1.2
+     */
+    public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+                                Query.Controller controller)
+        throws FetchException
+    {
+        Cursor<S> cursor = fetch(values, controller);
+        if (from > 0) {
+            cursor = new SkipCursor<S>(cursor, from);
+        }
+        if (to != null) {
+            cursor = new LimitCursor<S>(cursor, to - from);
+        }
+        return cursor;
+    }
+
     /**
      * Counts results by opening a cursor and skipping entries. Subclasses are
      * encouraged to override with a more efficient implementation.
@@ -76,6 +97,26 @@ public abstract class AbstractQueryExecutor<S extends Storable> implements Query
         }
     }
 
+    /**
+     * Counts results by opening a cursor and skipping entries. Subclasses are
+     * encouraged to override with a more efficient implementation.
+     */
+    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+        Cursor<S> cursor = fetch(values, controller);
+        try {
+            long count = cursor.skipNext(Integer.MAX_VALUE);
+            if (count == Integer.MAX_VALUE) {
+                int amt;
+                while ((amt = cursor.skipNext(Integer.MAX_VALUE)) > 0) {
+                    count += amt;
+                }
+            }
+            return count;
+        } finally {
+            cursor.close();
+        }
+    }
+
     /**
      * Does nothing and returns false.
      */
diff --git a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java
index f9be52a..fa8d9e5 100644
--- a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java
@@ -95,14 +95,31 @@ public class DelegatedQueryExecutor<S extends Storable> implements QueryExecutor
         return applyFilterValues(values).fetch();
     }
 
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
+        return applyFilterValues(values).fetch(controller);
+    }
+
     public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to) throws FetchException {
         return applyFilterValues(values).fetchSlice(from, to);
     }
 
+    public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+                                Query.Controller controller)
+        throws FetchException
+    {
+        return applyFilterValues(values).fetchSlice(from, to, controller);
+    }
+
     public long count(FilterValues<S> values) throws FetchException {
         return applyFilterValues(values).count();
     }
 
+    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+        return applyFilterValues(values).count(controller);
+    }
+
     public Filter<S> getFilter() {
         return mFilter;
     }
diff --git a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java
index 62418a1..fc7ea0c 100644
--- a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java
+++ b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java
@@ -237,6 +237,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
         return EmptyCursor.the();
     }
 
+    /**
+     * Always returns an {@link EmptyCursor}.
+     */
+    @Override
+    public Cursor<S> fetch(Controller controller) {
+        return EmptyCursor.the();
+    }
+
     /**
      * Always returns an {@link EmptyCursor}.
      */
@@ -246,6 +254,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
         return EmptyCursor.the();
     }
 
+    /**
+     * Always returns an {@link EmptyCursor}.
+     */
+    @Override
+    public Cursor<S> fetchSlice(long from, Long to, Controller controller) {
+        return fetchSlice(from, to);
+    }
+
     /**
      * Always throws {@link PersistNoneException}.
      */
@@ -254,6 +270,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
         throw new PersistNoneException();
     }
 
+    /**
+     * Always throws {@link PersistNoneException}.
+     */
+    @Override
+    public void deleteOne(Controller controller) throws PersistNoneException {
+        throw new PersistNoneException();
+    }
+
     /**
      * Always returns false.
      */
@@ -262,6 +286,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
         return false;
     }
 
+    /**
+     * Always returns false.
+     */
+    @Override
+    public boolean tryDeleteOne(Controller controller) {
+        return false;
+    }
+
     /**
      * Does nothing.
      */
@@ -269,6 +301,13 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
     public void deleteAll() {
     }
 
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void deleteAll(Controller controller) {
+    }
+
     /**
      * Always returns zero.
      */
@@ -277,6 +316,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
         return 0;
     }
 
+    /**
+     * Always returns zero.
+     */
+    @Override
+    public long count(Controller controller) {
+        return 0;
+    }
+
     /**
      * Always returns false.
      */
@@ -285,6 +332,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {
         return false;
     }
 
+    /**
+     * Always returns false.
+     */
+    @Override
+    public boolean exists(Controller controller) {
+        return false;
+    }
+
     @Override
     public void appendTo(Appendable app) throws IOException {
         app.append("Query {type=");
diff --git a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java
index 770c1bc..17f3105 100644
--- a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.cursor.FilteredCursor;
@@ -64,6 +65,12 @@ public class FilteredQueryExecutor<S extends Storable> extends AbstractQueryExec
         return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values));
     }
 
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
+        return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values, controller));
+    }
+
     /**
      * Returns the combined filter of the wrapped executor and the extra filter.
      */
diff --git a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java
index 9c7fd05..9a41fda 100644
--- a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.filter.Filter;
@@ -58,6 +59,15 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec
         return count;
     }
 
+    @Override
+    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+        long count = mSupport.countAll(controller);
+        if (count == -1) {
+            count = super.count(values, controller);
+        }
+        return count;
+    }
+
     /**
      * Returns an open filter.
      */
@@ -69,6 +79,12 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec
         return mSupport.fetchAll();
     }
 
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
+        return mSupport.fetchAll(controller);
+    }
+
     /**
      * Returns an empty list.
      */
@@ -98,9 +114,24 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec
          */
         long countAll() throws FetchException;
 
+        /**
+         * Counts all Storables. Implementation may return -1 to indicate that
+         * default count algorithm should be used.
+         *
+         * @param controller optional controller which can abort query operation
+         */
+        long countAll(Query.Controller controller) throws FetchException;
+
         /**
          * Perform a full scan of all Storables.
          */
         Cursor<S> fetchAll() throws FetchException;
+
+        /**
+         * Perform a full scan of all Storables.
+         *
+         * @param controller optional controller which can abort query operation
+         */
+        Cursor<S> fetchAll(Query.Controller controller) throws FetchException;
     }
 }
diff --git a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java
index c3853d2..b8a39b2 100644
--- a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java
@@ -120,6 +120,12 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu
     }
 
     public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+        return fetch(values, null);
+    }
+
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
         Object[] identityValues = null;
         Object rangeStartValue = null;
         Object rangeEndValue = null;
@@ -182,7 +188,8 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu
                                         rangeStartBoundary, rangeStartValue,
                                         rangeEndBoundary, rangeEndValue,
                                         mReverseRange,
-                                        mReverseOrder);
+                                        mReverseOrder,
+                                        controller);
         } else {
             indexEntryQuery = indexEntryQuery.withValues(identityValues);
             if (rangeStartBoundary != BoundaryType.OPEN) {
@@ -194,7 +201,7 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu
             if (mCoveringFilter != null && values != null) {
                 indexEntryQuery = indexEntryQuery.withValues(values.getValuesFor(mCoveringFilter));
             }
-            return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery);
+            return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery, controller);
         }
     }
 
@@ -414,6 +421,20 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu
         Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery)
             throws FetchException;
 
+        /**
+         * Fetch Storables referenced by the given index entry query. This
+         * method is only called if index supports query access.
+         *
+         * @param index index to open
+         * @param indexEntryQuery query with no blank parameters, derived from
+         * the query returned by indexEntryQuery
+         * @param controller optional controller which can abort query operation
+         * @since 1.2
+         */
+        Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+                                           Query.Controller controller)
+            throws FetchException;
+
         /**
          * Perform an index scan of a subset of Storables referenced by an
          * index. The identity values are aligned with the index properties at
@@ -445,5 +466,39 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu
                               boolean reverseRange,
                               boolean reverseOrder)
             throws FetchException;
+
+        /**
+         * Perform an index scan of a subset of Storables referenced by an
+         * index. The identity values are aligned with the index properties at
+         * property 0. An optional range start or range end aligns with the index
+         * property following the last of the identity values.
+         *
+         * <p>This method is only called if no index entry query was provided
+         * for the given index.
+         *
+         * @param index index to open, which may be a primary key index
+         * @param identityValues optional list of exactly matching values to apply to index
+         * @param rangeStartBoundary start boundary type
+         * @param rangeStartValue value to start at if boundary is not open
+         * @param rangeEndBoundary end boundary type
+         * @param rangeEndValue value to end at if boundary is not open
+         * @param reverseRange indicates that range operates on a property whose
+         * natural order is descending. Only the code that opens the physical
+         * cursor should examine this parameter. If true, then the range start and
+         * end parameter pairs need to be swapped.
+         * @param reverseOrder when true, iteration should be reversed from its
+         * natural order
+         * @param controller optional controller which can abort query operation
+         */
+        Cursor<S> fetchSubset(StorableIndex<S> index,
+                              Object[] identityValues,
+                              BoundaryType rangeStartBoundary,
+                              Object rangeStartValue,
+                              BoundaryType rangeEndBoundary,
+                              Object rangeEndValue,
+                              boolean reverseRange,
+                              boolean reverseOrder,
+                              Query.Controller controller)
+            throws FetchException;
     }
 }
diff --git a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java
index 9e04a32..2623bb3 100644
--- a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.util.concurrent.locks.Lock;
 
 import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.filter.Filter;
 import com.amazon.carbonado.filter.FilterValues;
 
+import com.amazon.carbonado.cursor.ControllerCursor;
 import com.amazon.carbonado.cursor.IteratorCursor;
 
 /**
@@ -76,6 +78,10 @@ public class IterableQueryExecutor<S extends Storable> extends AbstractQueryExec
         return new IteratorCursor<S>(mIterable, mLock);
     }
 
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller) {
+        return ControllerCursor.apply(new IteratorCursor<S>(mIterable, mLock), controller);
+    }
+
     /**
      * Returns an empty list.
      */
diff --git a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java
index 9a77f26..4e98df4 100644
--- a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java
@@ -36,6 +36,7 @@ import org.cojen.util.ClassInjector;
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
 import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 import com.amazon.carbonado.cursor.MultiTransformedCursor;
 
@@ -190,6 +191,7 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
 
     private static final String INNER_LOOP_EX_FIELD_NAME = "innerLoopExecutor";
     private static final String INNER_LOOP_FV_FIELD_NAME = "innerLoopFilterValues";
+    private static final String INNER_LOOP_CONTROLLER_FIELD_NAME = "innerLoopController";
     private static final String ACTIVE_SOURCE_FIELD_NAME = "active";
 
     private static final SoftValuedCache<StorableProperty, Class> cJoinerCursorClassCache;
@@ -240,11 +242,14 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
         final TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
         final TypeDesc queryExecutorType = TypeDesc.forClass(QueryExecutor.class);
         final TypeDesc filterValuesType = TypeDesc.forClass(FilterValues.class);
+        final TypeDesc controllerType = TypeDesc.forClass(Query.Controller.class);
 
         // Define fields for inner loop executor and filter values, which are
         // passed into the constructor.
         cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_EX_FIELD_NAME, queryExecutorType);
         cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_FV_FIELD_NAME, filterValuesType);
+        cf.addField(Modifiers.PRIVATE.toFinal(true),
+                    INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
 
         // If target storable can set a reference to the joined source
         // storable, then stash a copy of it as we go. This way, when user of
@@ -259,7 +264,7 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
 
         // Define constructor.
         {
-            TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType};
+            TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType, controllerType};
 
             MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
             CodeBuilder b = new CodeBuilder(mi);
@@ -276,6 +281,10 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
             b.loadLocal(b.getParameter(2));
             b.storeField(INNER_LOOP_FV_FIELD_NAME, filterValuesType);
 
+            b.loadThis();
+            b.loadLocal(b.getParameter(3));
+            b.storeField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
+
             b.returnVoid();
         }
 
@@ -317,9 +326,12 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
                                 new TypeDesc[] {bindType});
             }
 
+            b.loadThis();
+            b.loadField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
+
             // Now fetch and return.
             b.invokeInterface(queryExecutorType, "fetch", cursorType,
-                              new TypeDesc[] {filterValuesType});
+                              new TypeDesc[] {filterValuesType, controllerType});
             b.returnValue(cursorType);
         }
 
@@ -570,6 +582,12 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
     }
 
     public Cursor<T> fetch(FilterValues<T> values) throws FetchException {
+        return fetch(values, null);
+    }
+
+    public Cursor<T> fetch(FilterValues<T> values, Query.Controller controller)
+        throws FetchException
+    {
         FilterValues<T> innerLoopFilterValues = mInnerLoopFilterValues;
 
         if (mTargetFilter != null) {
@@ -578,10 +596,14 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
                 .withValues(values.getValuesFor(mTargetFilter));
         }
 
-        Cursor<S> outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values));
+        if (controller != null) {
+            controller.begin();
+        }
+
+        Cursor<S> outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values), controller);
 
         return mJoinerFactory.newJoinedCursor
-            (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues);
+            (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues, controller);
     }
 
     public Filter<T> getFilter() {
@@ -628,7 +650,8 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>
         public static interface Factory<S, T extends Storable> {
             Cursor<T> newJoinedCursor(Cursor<S> outerLoopCursor,
                                       QueryExecutor<T> innerLoopExecutor,
-                                      FilterValues<T> innerLoopFilterValues);
+                                      FilterValues<T> innerLoopFilterValues,
+                                      Query.Controller innerLoopController);
         }
     }
 }
diff --git a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java
index 1cc0b59..dacda80 100644
--- a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.filter.Filter;
@@ -72,6 +73,12 @@ public class KeyQueryExecutor<S extends Storable> extends AbstractQueryExecutor<
         return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter));
     }
 
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
+        return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter), controller);
+    }
+
     public Filter<S> getFilter() {
         return mKeyFilter;
     }
@@ -115,5 +122,18 @@ public class KeyQueryExecutor<S extends Storable> extends AbstractQueryExecutor<
          */
         Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
             throws FetchException;
+
+        /**
+         * Select at most one Storable referenced by an index. The identity
+         * values fully specify all elements of the index, and the index is
+         * unique.
+         *
+         * @param controller optional controller which can abort query operation
+         * @param index index to open, which may be a primary key index
+         * @param identityValues of exactly matching values to apply to index
+         */
+        Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
+                           Query.Controller controller)
+            throws FetchException;
     }
 }
diff --git a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java
index ab310b0..9ba259b 100644
--- a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.filter.Filter;
@@ -45,6 +46,13 @@ public interface QueryExecutor<S extends Storable> {
      */
     Cursor<S> fetch(FilterValues<S> values) throws FetchException;
 
+    /**
+     * Returns a new cursor using the given filter values.
+     *
+     * @param controller optional controller which can abort query operation
+     */
+    Cursor<S> fetch(FilterValues<S> values, Query.Controller controller) throws FetchException;
+
     /**
      * Returns a new cursor using the given filter values and slice.
      *
@@ -52,11 +60,27 @@ public interface QueryExecutor<S extends Storable> {
      */
     Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to) throws FetchException;
 
+    /**
+     * Returns a new cursor using the given filter values and slice.
+     *
+     * @param controller optional controller which can abort query operation
+     * @since 1.2
+     */
+    Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to, Query.Controller controller)
+        throws FetchException;
+
     /**
      * Counts the query results using the given filter values.
      */
     long count(FilterValues<S> values) throws FetchException;
 
+    /**
+     * Counts the query results using the given filter values.
+     *
+     * @param controller optional controller which can abort query operation
+     */
+    long count(FilterValues<S> values, Query.Controller controller) throws FetchException;
+
     /**
      * Returns the filter used by this QueryExecutor.
      *
diff --git a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java
index 82d0ef6..67f739d 100644
--- a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java
@@ -24,9 +24,11 @@ import java.util.Comparator;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.cursor.ArraySortBuffer;
+import com.amazon.carbonado.cursor.ControllerCursor;
 import com.amazon.carbonado.cursor.MergeSortBuffer;
 import com.amazon.carbonado.cursor.SortBuffer;
 import com.amazon.carbonado.cursor.SortedCursor;
@@ -106,11 +108,28 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut
         return new SortedCursor<S>(cursor, buffer, mHandledComparator, mFinisherComparator);
     }
 
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
+        Cursor<S> cursor = mExecutor.fetch(values, controller);
+        SortBuffer<S> buffer = mSupport.createSortBuffer(controller);
+        // Apply the controller around the cursor to ensure timeouts are
+        // honored even when the caller is slowly iterating over the cursor.
+        return ControllerCursor.apply
+            (new SortedCursor<S>(cursor, buffer, mHandledComparator, mFinisherComparator),
+             controller);
+    }
+
     @Override
     public long count(FilterValues<S> values) throws FetchException {
         return mExecutor.count(values);
     }
 
+    @Override
+    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+        return mExecutor.count(values, controller);
+    }
+
     public Filter<S> getFilter() {
         return mExecutor.getFilter();
     }
@@ -152,6 +171,13 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut
          * Implementation must return an empty buffer for sorting.
          */
         SortBuffer<S> createSortBuffer();
+
+        /**
+         * Implementation must return an empty buffer for sorting.
+         *
+         * @param controller optional controller which can abort query operation
+         */
+        SortBuffer<S> createSortBuffer(Query.Controller controller);
     }
 
     /**
@@ -164,6 +190,14 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut
         public SortBuffer<S> createSortBuffer() {
             return new ArraySortBuffer<S>();
         }
+
+        /**
+         * Returns a new ArraySortBuffer.
+         */
+        public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+            // Java sort utility doesn't support any kind of controller.
+            return new ArraySortBuffer<S>();
+        }
     }
 
     /**
@@ -176,5 +210,12 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut
         public SortBuffer<S> createSortBuffer() {
             return new MergeSortBuffer<S>();
         }
+
+        /**
+         * Returns a new MergeSortBuffer.
+         */
+        public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+            return new MergeSortBuffer<S>(controller);
+        }
     }
 }
diff --git a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java
index 0349ae2..bce32ed 100644
--- a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java
+++ b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java
@@ -287,15 +287,29 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>
         }
     }
 
+    @Override
+    public Cursor<S> fetch(Controller controller) throws FetchException {
+        try {
+            return executor().fetch(mValues, controller);
+        } catch (RepositoryException e) {
+            throw e.toFetchException();
+        }
+    }
+
     @Override
     public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
+        return fetchSlice(from, to, null);
+    }
+
+    @Override
+    public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
         if (!checkSliceArguments(from, to)) {
-            return fetch();
+            return fetch(controller);
         }
         try {
             QueryHints hints = QueryHints.emptyHints().with(QueryHint.CONSUME_SLICE);
             return executorFactory().executor(mFilter, mOrdering, hints)
-                .fetchSlice(mValues, from, to);
+                .fetchSlice(mValues, from, to, controller);
         } catch (RepositoryException e) {
             throw e.toFetchException();
         }
@@ -303,9 +317,14 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>
 
     @Override
     public boolean tryDeleteOne() throws PersistException {
+        return tryDeleteOne(null);
+    }
+
+    @Override
+    public boolean tryDeleteOne(Controller controller) throws PersistException {
         Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED);
         try {
-            Cursor<S> cursor = fetch();
+            Cursor<S> cursor = fetch(controller);
             boolean result;
             try {
                 if (cursor.hasNext()) {
@@ -335,9 +354,14 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>
 
     @Override
     public void deleteAll() throws PersistException {
+        deleteAll(null);
+    }
+
+    @Override
+    public void deleteAll(Controller controller) throws PersistException {
         Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED);
         try {
-            Cursor<S> cursor = fetch();
+            Cursor<S> cursor = fetch(controller);
             try {
                 while (cursor.hasNext()) {
                     cursor.next().tryDelete();
@@ -366,9 +390,23 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>
         }
     }
 
+    @Override
+    public long count(Controller controller) throws FetchException {
+        try {
+            return executor().count(mValues, controller);
+        } catch (RepositoryException e) {
+            throw e.toFetchException();
+        }
+    }
+
     @Override
     public boolean exists() throws FetchException {
-        Cursor<S> cursor = fetchSlice(0L, 1L);
+        return exists(null);
+    }
+
+    @Override
+    public boolean exists(Controller controller) throws FetchException {
+        Cursor<S> cursor = fetchSlice(0L, 1L, controller);
         try {
             return cursor.skipNext(1) > 0;
         } finally {
diff --git a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java
index 09a3174..48152bb 100644
--- a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java
+++ b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Storable;
 
 import com.amazon.carbonado.cursor.SortedCursor;
@@ -97,9 +98,15 @@ public class UnionQueryExecutor<S extends Storable> extends AbstractQueryExecuto
     }
 
     public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+        return fetch(values, null);
+    }
+
+    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+        throws FetchException
+    {
         Cursor<S> cursor = null;
         for (QueryExecutor<S> executor : mExecutors) {
-            Cursor<S> subCursor = executor.fetch(values);
+            Cursor<S> subCursor = executor.fetch(values, controller);
             cursor = (cursor == null) ? subCursor
                 : new UnionCursor<S>(cursor, subCursor, mOrderComparator);
         }
diff --git a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java
index 6c9363f..2b15294 100644
--- a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java
+++ b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java
@@ -89,8 +89,9 @@ public class GenericStorableCodecFactory implements StorableCodecFactory {
                                                                     RawSupport support)
         throws SupportException
     {
+        LayoutOptions options = layout == null ? getLayoutOptions(type) : layout.getOptions();
         return GenericStorableCodec.getInstance
-            (this, createStrategy(type, pkIndex, null), isMaster, layout, support);
+            (this, createStrategy(type, pkIndex, options), isMaster, layout, support);
     }
 
     /**
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index 4635f2c..d115a3b 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -191,14 +191,26 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
         return new MergeSortBuffer<S>();
     }
 
+    public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+        return new MergeSortBuffer<S>(controller);
+    }
+
     public long countAll() throws FetchException {
         return mMasterStorage.query().count();
     }
 
+    public long countAll(Query.Controller controller) throws FetchException {
+        return mMasterStorage.query().count(controller);
+    }
+
     public Cursor<S> fetchAll() throws FetchException {
         return mMasterStorage.query().fetch();
     }
 
+    public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
+        return mMasterStorage.query().fetch(controller);
+    }
+
     public Cursor<S> fetchOne(StorableIndex<S> index,
                               Object[] identityValues)
         throws FetchException
@@ -207,6 +219,15 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
         return indexInfo.fetchOne(this, identityValues);
     }
 
+    public Cursor<S> fetchOne(StorableIndex<S> index,
+                              Object[] identityValues,
+                              Query.Controller controller)
+        throws FetchException
+    {
+        ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
+        return indexInfo.fetchOne(this, identityValues, controller);
+    }
+
     public Query<?> indexEntryQuery(StorableIndex<S> index)
         throws FetchException
     {
@@ -221,6 +242,14 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
         return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery);
     }
 
+    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+                                              Query.Controller controller)
+        throws FetchException
+    {
+        ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
+        return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery, controller);
+    }
+
     public Cursor<S> fetchSubset(StorableIndex<S> index,
                                  Object[] identityValues,
                                  BoundaryType rangeStartBoundary,
@@ -235,6 +264,21 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
         throw new UnsupportedOperationException();
     }
 
+    public Cursor<S> fetchSubset(StorableIndex<S> index,
+                                 Object[] identityValues,
+                                 BoundaryType rangeStartBoundary,
+                                 Object rangeStartValue,
+                                 BoundaryType rangeEndBoundary,
+                                 Object rangeEndValue,
+                                 boolean reverseRange,
+                                 boolean reverseOrder,
+                                 Query.Controller controller)
+        throws FetchException
+    {
+        // This method should never be called since a query was returned by indexEntryQuery.
+        throw new UnsupportedOperationException();
+    }
+
     private void registerIndex(ManagedIndex<S> managedIndex)
         throws RepositoryException
     {
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 2bb2f49..6c28619 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -172,6 +172,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
 
     Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues)
         throws FetchException
+    {
+        return fetchOne(storage, identityValues, null);
+    }
+
+    Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues,
+                       Query.Controller controller)
+        throws FetchException
     {
         Query<?> query = mSingleMatchQuery;
 
@@ -184,13 +191,20 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
             mSingleMatchQuery = query = mIndexEntryStorage.query(filter);
         }
 
-        return fetchFromIndexEntryQuery(storage, query.withValues(identityValues));
+        return fetchFromIndexEntryQuery(storage, query.withValues(identityValues), controller);
     }
 
     Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery)
         throws FetchException
     {
-        return new IndexedCursor<S>(indexEntryQuery.fetch(), storage, mAccessor);
+        return fetchFromIndexEntryQuery(storage, indexEntryQuery, null);
+    }
+
+    Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery,
+                                       Query.Controller controller)
+        throws FetchException
+    {
+        return new IndexedCursor<S>(indexEntryQuery.fetch(controller), storage, mAccessor);
     }
 
     @Override
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java
index e760f58..d10856a 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java
@@ -28,9 +28,15 @@ import java.sql.SQLException;
  */
 class H2ExceptionTransformer extends JDBCExceptionTransformer {
     public static int DUPLICATE_KEY = 23001;
+    public static int PROCESSING_CANCELED = 90051;
 
     @Override
     public boolean isUniqueConstraintError(SQLException e) {
-        return DUPLICATE_KEY == e.getErrorCode();
+        return super.isUniqueConstraintError(e) || DUPLICATE_KEY == e.getErrorCode();
+    }
+
+    @Override
+    public boolean isTimeoutError(SQLException e) {
+        return super.isTimeoutError(e) || PROCESSING_CANCELED == e.getErrorCode();
     }
 }
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java
index f72c717..4780140 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java
@@ -23,9 +23,11 @@ import java.sql.SQLException;
 import com.amazon.carbonado.ConstraintException;
 import com.amazon.carbonado.FetchDeadlockException;
 import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
 import com.amazon.carbonado.PersistDeadlockException;
 import com.amazon.carbonado.PersistDeniedException;
 import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistTimeoutException;
 import com.amazon.carbonado.UniqueConstraintException;
 import com.amazon.carbonado.spi.ExceptionTransformer;
 
@@ -59,6 +61,8 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
      */
     public static String SQLSTATE_DEADLOCK_WITH_ROLLBACK = "40001";
 
+    public static String SQLSTATE_PROCESSING_CANCELED = "57014";
+
     /**
      * Examines the SQLSTATE code of the given SQL exception and determines if
      * it is a generic constaint violation.
@@ -103,6 +107,16 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
         return false;
     }
 
+    public boolean isTimeoutError(SQLException e) {
+        if (e != null) {
+            String sqlstate = e.getSQLState();
+            if (sqlstate != null) {
+                return SQLSTATE_PROCESSING_CANCELED.equals(sqlstate);
+            }
+        }
+        return false;
+    }
+
     JDBCExceptionTransformer() {
     }
 
@@ -117,6 +131,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
             if (isDeadlockError(se)) {
                 return new FetchDeadlockException(e);
             }
+            if (isTimeoutError(se)) {
+                return new FetchTimeoutException(e);
+            }
         }
         return null;
     }
@@ -141,6 +158,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
             if (isDeadlockError(se)) {
                 return new PersistDeadlockException(e);
             }
+            if (isTimeoutError(se)) {
+                return new PersistTimeoutException(e);
+            }
         }
         return null;
     }
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java
index a49a150..bbe5589 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java
@@ -27,6 +27,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.LogFactory;
 
@@ -34,6 +35,7 @@ import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
 import com.amazon.carbonado.IsolationLevel;
 import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Repository;
 import com.amazon.carbonado.RepositoryException;
 import com.amazon.carbonado.Storable;
@@ -42,6 +44,7 @@ import com.amazon.carbonado.SupportException;
 import com.amazon.carbonado.Transaction;
 import com.amazon.carbonado.Trigger;
 import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.cursor.ControllerCursor;
 import com.amazon.carbonado.cursor.EmptyCursor;
 import com.amazon.carbonado.cursor.LimitCursor;
 import com.amazon.carbonado.filter.AndFilter;
@@ -329,6 +332,28 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
         return new JDBCQuery(filter, values, ordering, hints);
     }
 
+    static PreparedStatement prepareStatement(Connection con, String sql,
+                                              Query.Controller controller)
+        throws SQLException
+    {
+        PreparedStatement ps = con.prepareStatement(sql);
+
+        if (controller != null) {
+            long timeout = controller.getTimeout();
+            if (timeout >= 0) {
+                TimeUnit unit = controller.getTimeoutUnit();
+                if (unit != null) {
+                    long seconds = unit.toSeconds(timeout);
+                    int intSeconds = seconds <= 0 ? 1 :
+                        (seconds <= Integer.MAX_VALUE ? ((int) seconds) : 0);
+                    ps.setQueryTimeout(intSeconds);
+                }
+            }
+        }
+
+        return ps;
+    }
+
     public S instantiate(ResultSet rs) throws SQLException {
         return (S) mInstanceFactory.instantiate(this, rs, FIRST_RESULT_INDEX);
     }
@@ -668,12 +693,21 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
             }
         }
 
+        @Override
         public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+            return fetch(values, null);
+        }
+
+        @Override
+        public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+            throws FetchException
+        {
             TransactionScope<JDBCTransaction> scope = mRepository.localTransactionScope();
             boolean forUpdate = scope.isForUpdate();
             Connection con = getConnection();
             try {
-                PreparedStatement ps = con.prepareStatement(prepareSelect(values, forUpdate));
+                PreparedStatement ps =
+                    prepareStatement(con, prepareSelect(values, forUpdate), controller);
                 Integer fetchSize = mRepository.getFetchSize();
                 if (fetchSize != null) {
                     ps.setFetchSize(fetchSize);
@@ -681,7 +715,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
 
                 try {
                     setParameters(ps, values);
-                    return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+                    return ControllerCursor.apply
+                        (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
                 } catch (Exception e) {
                     // in case of exception, close statement
                     try {
@@ -705,6 +740,14 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
         @Override
         public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to)
             throws FetchException
+        {
+            return fetchSlice(values, from, to, null);
+        }
+
+        @Override
+        public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+                                    Query.Controller controller)
+            throws FetchException
         {
             if (to != null && (to - from) <= 0) {
                 return EmptyCursor.the();
@@ -716,17 +759,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
 
             switch (option) {
             case NOT_SUPPORTED: default:
-                return super.fetchSlice(values, from, to);
+                return super.fetchSlice(values, from, to, controller);
             case LIMIT_ONLY:
                 if (from > 0 || to == null) {
-                    return super.fetchSlice(values, from, to);
+                    return super.fetchSlice(values, from, to, controller);
                 }
                 select = prepareSelect(values, false);
                 select = mSupportStrategy.buildSelectWithSlice(select, false, true);
                 break;
             case OFFSET_ONLY:
                 if (from <= 0) {
-                    return super.fetchSlice(values, from, to);
+                    return super.fetchSlice(values, from, to, controller);
                 }
                 select = prepareSelect(values, false);
                 select = mSupportStrategy.buildSelectWithSlice(select, true, false);
@@ -746,7 +789,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
 
             Connection con = getConnection();
             try {
-                PreparedStatement ps = con.prepareStatement(select);
+                PreparedStatement ps = prepareStatement(con, select, controller);
                 Integer fetchSize = mRepository.getFetchSize();
                 if (fetchSize != null) {
                     ps.setFetchSize(fetchSize);
@@ -760,7 +803,10 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
                             switch (option) {
                             case OFFSET_ONLY:
                                 ps.setLong(psOrdinal, from);
-                                Cursor<S> c = new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+                                Cursor<S> c =
+                                    ControllerCursor.apply
+                                    (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps),
+                                     controller);
                                 return new LimitCursor<S>(c, to - from);
                             case LIMIT_AND_OFFSET:
                                 ps.setLong(psOrdinal, to - from);
@@ -782,7 +828,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
                         ps.setLong(psOrdinal, to);
                     }
 
-                    return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+                    return ControllerCursor.apply
+                        (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
                 } catch (Exception e) {
                     // in case of exception, close statement
                     try {
@@ -805,9 +852,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
 
         @Override
         public long count(FilterValues<S> values) throws FetchException {
+            return count(values, null);
+        }
+
+        @Override
+        public long count(FilterValues<S> values, Query.Controller controller)
+            throws FetchException
+        {
             Connection con = getConnection();
             try {
-                PreparedStatement ps = con.prepareStatement(prepareCount(values));
+                PreparedStatement ps = prepareStatement(con, prepareCount(values), controller);
+
                 try {
                     setParameters(ps, values);
                     ResultSet rs = ps.executeQuery();
@@ -827,10 +882,12 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
             }
         }
 
+        @Override
         public Filter<S> getFilter() {
             return mFilter;
         }
 
+        @Override
         public OrderingList<S> getOrdering() {
             return mOrdering;
         }
@@ -846,6 +903,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
             return true;
         }
 
+        @Override
         public boolean printPlan(Appendable app, int indentLevel, FilterValues<S> values)
             throws IOException
         {
@@ -862,7 +920,9 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
         /**
          * Delete operation is included in cursor factory for ease of implementation.
          */
-        int executeDelete(FilterValues<S> filterValues) throws PersistException {
+        int executeDelete(FilterValues<S> filterValues, Query.Controller controller)
+            throws PersistException
+        {
             Connection con;
             try {
                 con = getConnection();
@@ -870,7 +930,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
                 throw e.toPersistException();
             }
             try {
-                PreparedStatement ps = con.prepareStatement(prepareDelete(filterValues));
+                PreparedStatement ps =
+                    prepareStatement(con, prepareDelete(filterValues), controller);
                 try {
                     setParameters(ps, filterValues);
                     return ps.executeUpdate();
@@ -976,13 +1037,18 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
 
         @Override
         public void deleteAll() throws PersistException {
+            deleteAll(null);
+        }
+
+        @Override
+        public void deleteAll(Controller controller) throws PersistException {
             if (mTriggerManager.getDeleteTrigger() != null) {
                 // Super implementation loads one at time and calls
                 // delete. This allows delete trigger to be invoked on each.
-                super.deleteAll();
+                super.deleteAll(controller);
             } else {
                 try {
-                    ((Executor) executor()).executeDelete(getFilterValues());
+                    ((Executor) executor()).executeDelete(getFilterValues(), controller);
                 } catch (RepositoryException e) {
                     throw e.toPersistException();
                 }
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java
index ad63941..ebaafd7 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java
@@ -32,6 +32,8 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer {
 
     public static int DEADLOCK_DETECTED = 60;
 
+    public static int PROCESSING_CANCELED = 1013;
+
     @Override
     public boolean isUniqueConstraintError(SQLException e) {
         if (isConstraintError(e)) {
@@ -63,4 +65,9 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer {
         }
         return false;
     }
+
+    @Override
+    public boolean isTimeoutError(SQLException e) {
+        return super.isTimeoutError(e) || e != null && PROCESSING_CANCELED == e.getErrorCode();
+    }
 }
diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java
index 08be6e3..5dc4d3b 100644
--- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java
+++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java
@@ -164,6 +164,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.fetch();
     }
 
+    @Override
+    public Cursor<S> fetch(Controller controller) throws FetchException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.fetch(controller) on " + this + ", controller: " + controller);
+        }
+        return mQuery.fetch(controller);
+    }
+
     @Override
     public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
         Log log = mStorage.mLog;
@@ -174,6 +183,16 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.fetchSlice(from, to);
     }
 
+    @Override
+    public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.fetchSlice(start, to, controller) on " + this +
+                      ", from: " + from + ", to: " + to + ", controller: " + controller);
+        }
+        return mQuery.fetchSlice(from, to, controller);
+    }
+
     @Override
     public <T extends S> Cursor<S> fetchAfter(T start) throws FetchException {
         Log log = mStorage.mLog;
@@ -183,6 +202,18 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.fetchAfter(start);
     }
 
+    @Override
+    public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
+        throws FetchException
+    {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.fetchAfter(start, controller) on " + this + ", start: " + start
+                      + ", controller: " + controller);
+        }
+        return mQuery.fetchAfter(start, controller);
+    }
+
     @Override
     public S loadOne() throws FetchException {
         Log log = mStorage.mLog;
@@ -192,6 +223,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.loadOne();
     }
 
+    @Override
+    public S loadOne(Controller controller) throws FetchException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.loadOne() on " + this + ", controller: " + controller);
+        }
+        return mQuery.loadOne(controller);
+    }
+
     @Override
     public S tryLoadOne() throws FetchException {
         Log log = mStorage.mLog;
@@ -201,6 +241,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.tryLoadOne();
     }
 
+    @Override
+    public S tryLoadOne(Controller controller) throws FetchException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.tryLoadOne(controller) on " + this + ", controller: " + controller);
+        }
+        return mQuery.tryLoadOne(controller);
+    }
+
     @Override
     public void deleteOne() throws PersistException {
         Log log = mStorage.mLog;
@@ -210,6 +259,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         mQuery.deleteOne();
     }
 
+    @Override
+    public void deleteOne(Controller controller) throws PersistException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.deleteOne(controller) on " + this + ", controller: " + controller);
+        }
+        mQuery.deleteOne(controller);
+    }
+
     @Override
     public boolean tryDeleteOne() throws PersistException {
         Log log = mStorage.mLog;
@@ -219,6 +277,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.tryDeleteOne();
     }
 
+    @Override
+    public boolean tryDeleteOne(Controller controller) throws PersistException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.tryDeleteOne(controller) on " + this + ", controller: " + controller);
+        }
+        return mQuery.tryDeleteOne(controller);
+    }
+
     @Override
     public void deleteAll() throws PersistException {
         Log log = mStorage.mLog;
@@ -228,6 +295,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         mQuery.deleteAll();
     }
 
+    @Override
+    public void deleteAll(Controller controller) throws PersistException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.deleteAll(controller) on " + this + ", controller: " + controller);
+        }
+        mQuery.deleteAll(controller);
+    }
+
     @Override
     public long count() throws FetchException {
         Log log = mStorage.mLog;
@@ -237,6 +313,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.count();
     }
 
+    @Override
+    public long count(Controller controller) throws FetchException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.count(controller) on " + this + ", controller: " + controller);
+        }
+        return mQuery.count(controller);
+    }
+
     @Override
     public boolean exists() throws FetchException {
         Log log = mStorage.mLog;
@@ -246,6 +331,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
         return mQuery.exists();
     }
 
+    @Override
+    public boolean exists(Controller controller) throws FetchException {
+        Log log = mStorage.mLog;
+        if (log.isEnabled()) {
+            log.write("Query.exists(controller) on " + this + ", controller: " + controller);
+        }
+        return mQuery.exists(controller);
+    }
+
     @Override
     public boolean printNative() {
         return mQuery.printNative();
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
index c84d8e3..c9c17b6 100644
--- a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
@@ -47,6 +47,7 @@ import com.amazon.carbonado.Trigger;
 import com.amazon.carbonado.capability.IndexInfo;
 
 import com.amazon.carbonado.cursor.ArraySortBuffer;
+import com.amazon.carbonado.cursor.ControllerCursor;
 import com.amazon.carbonado.cursor.EmptyCursor;
 import com.amazon.carbonado.cursor.FilteredCursor;
 import com.amazon.carbonado.cursor.SingletonCursor;
@@ -540,6 +541,10 @@ class MapStorage<S extends Storable>
     }
 
     public long countAll() throws FetchException {
+        return countAll(null);
+    }
+
+    public long countAll(Query.Controller controller) throws FetchException {
         try {
             TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
             MapTransaction txn = scope.getTxn();
@@ -578,8 +583,19 @@ class MapStorage<S extends Storable>
         }
     }
 
+    public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
+        return ControllerCursor.apply(fetchAll(), controller);
+    }
+
     public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
         throws FetchException
+    {
+        return fetchOne(index, identityValues, null);
+    }
+
+    public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
+                              Query.Controller controller)
+        throws FetchException
     {
         try {
             S key = prepare();
@@ -643,6 +659,12 @@ class MapStorage<S extends Storable>
         return null;
     }
 
+    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+                                              Query.Controller controller)
+    {
+        return null;
+    }
+
     public Cursor<S> fetchSubset(StorableIndex<S> index,
                                  Object[] identityValues,
                                  BoundaryType rangeStartBoundary,
@@ -770,6 +792,28 @@ class MapStorage<S extends Storable>
         return cursor;
     }
 
+    public Cursor<S> fetchSubset(StorableIndex<S> index,
+                                 Object[] identityValues,
+                                 BoundaryType rangeStartBoundary,
+                                 Object rangeStartValue,
+                                 BoundaryType rangeEndBoundary,
+                                 Object rangeEndValue,
+                                 boolean reverseRange,
+                                 boolean reverseOrder,
+                                 Query.Controller controller)
+        throws FetchException
+    {
+        return ControllerCursor.apply(fetchSubset(index,
+                                                  identityValues,
+                                                  rangeStartBoundary,
+                                                  rangeStartValue,
+                                                  rangeEndBoundary,
+                                                  rangeEndValue,
+                                                  reverseRange,
+                                                  reverseOrder),
+                                      controller);
+    }
+
     private List<OrderedProperty<S>> createPkPropList() {
         return new ArrayList<OrderedProperty<S>>(mInfo.getPrimaryKey().getProperties());
     }
@@ -806,6 +850,11 @@ class MapStorage<S extends Storable>
         return new ArraySortBuffer<S>();
     }
 
+    public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+        // ArraySortBuffer doesn't support controller.
+        return new ArraySortBuffer<S>();
+    }
+
     public static interface InstanceFactory {
         Storable instantiate(DelegateSupport support);
     }
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
index f4d8f24..c0f882e 100644
--- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
@@ -45,6 +45,7 @@ import com.amazon.carbonado.UniqueConstraintException;
 
 import com.amazon.carbonado.capability.IndexInfo;
 
+import com.amazon.carbonado.cursor.ControllerCursor;
 import com.amazon.carbonado.cursor.EmptyCursor;
 import com.amazon.carbonado.cursor.MergeSortBuffer;
 import com.amazon.carbonado.cursor.SingletonCursor;
@@ -263,22 +264,45 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
         return new MergeSortBuffer<S>();
     }
 
+    public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+        return new MergeSortBuffer<S>(controller);
+    }
+
     public long countAll() throws FetchException {
         // Return -1 to indicate default algorithm should be used.
         return -1;
     }
 
+    public long countAll(Query.Controller controller) throws FetchException {
+        // Return -1 to indicate default algorithm should be used.
+        return -1;
+    }
+
     public Cursor<S> fetchAll() throws FetchException {
+        return fetchAll(null);
+    }
+
+    public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
         return fetchSubset(null, null,
                            BoundaryType.OPEN, null,
                            BoundaryType.OPEN, null,
-                           false, false);
+                           false, false,
+                           controller);
     }
 
     public Cursor<S> fetchOne(StorableIndex<S> index,
                               Object[] identityValues)
         throws FetchException
     {
+        return fetchOne(index, identityValues, null);
+    }
+
+    public Cursor<S> fetchOne(StorableIndex<S> index,
+                              Object[] identityValues,
+                              Query.Controller controller)
+        throws FetchException
+    {
+        // Note: Controller is never called.
         byte[] key = mStorableCodec.encodePrimaryKey(identityValues);
         byte[] value = mRawSupport.tryLoad(null, key);
         if (value == null) {
@@ -296,6 +320,13 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
         throw new UnsupportedOperationException();
     }
 
+    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+                                              Query.Controller controller)
+    {
+        // This method should never be called since null was returned by indexEntryQuery.
+        throw new UnsupportedOperationException();
+    }
+
     public Cursor<S> fetchSubset(StorableIndex<S> index,
                                  Object[] identityValues,
                                  BoundaryType rangeStartBoundary,
@@ -378,6 +409,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
                      getPrimaryDatabase());
 
                 cursor.open();
+
                 return cursor;
             } catch (Exception e) {
                 throw toFetchException(e);
@@ -387,6 +419,28 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
         }
     }
 
+    public Cursor<S> fetchSubset(StorableIndex<S> index,
+                                 Object[] identityValues,
+                                 BoundaryType rangeStartBoundary,
+                                 Object rangeStartValue,
+                                 BoundaryType rangeEndBoundary,
+                                 Object rangeEndValue,
+                                 boolean reverseRange,
+                                 boolean reverseOrder,
+                                 Query.Controller controller)
+        throws FetchException
+    {
+        return ControllerCursor.apply(fetchSubset(index,
+                                                  identityValues,
+                                                  rangeStartBoundary,
+                                                  rangeStartValue,
+                                                  rangeEndBoundary,
+                                                  rangeEndValue,
+                                                  reverseRange,
+                                                  reverseOrder),
+                                      controller);
+    }
+
     private byte[] createBound(Object[] exactValues, byte[] exactKey, Object rangeValue,
                                StorableCodec<S> codec) {
         Object[] values = {rangeValue};
-- 
cgit v1.2.3