diff options
Diffstat (limited to 'src')
32 files changed, 1273 insertions, 103 deletions
| diff --git a/src/main/java/com/amazon/carbonado/Query.java b/src/main/java/com/amazon/carbonado/Query.java index a660c1a..1780ebd 100644 --- a/src/main/java/com/amazon/carbonado/Query.java +++ b/src/main/java/com/amazon/carbonado/Query.java @@ -18,7 +18,13 @@  package com.amazon.carbonado;
 +import java.io.Closeable;
  import java.io.IOException;
 +import java.io.Serializable;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import java.util.concurrent.atomic.AtomicLongFieldUpdater;
  import com.amazon.carbonado.filter.Filter;
  import com.amazon.carbonado.filter.FilterValues;
 @@ -287,6 +293,20 @@ public interface Query<S extends Storable> {      Cursor<S> fetch() throws FetchException;
      /**
 +     * Fetches results for this query. If any updates or deletes might be
 +     * performed on the results, consider enclosing the fetch in a
 +     * transaction. This allows the isolation level and "for update" mode to be
 +     * adjusted. Some repositories might otherwise deadlock.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @return fetch results
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws FetchException if storage layer throws an exception
 +     * @see Repository#enterTransaction(IsolationLevel)
 +     */
 +    Cursor<S> fetch(Controller controller) throws FetchException;
 +
 +    /**
       * Fetches a slice of results for this query, as defined by a numerical
       * range. A slice can be used to limit the number of results from a
       * query. It is strongly recommended that the query be given a total {@link
 @@ -304,6 +324,24 @@ public interface Query<S extends Storable> {      Cursor<S> fetchSlice(long from, Long to) throws FetchException;
      /**
 +     * Fetches a slice of results for this query, as defined by a numerical
 +     * range. A slice can be used to limit the number of results from a
 +     * query. It is strongly recommended that the query be given a total {@link
 +     * #orderBy ordering} in order for the slice results to be deterministic.
 +     *
 +     * @param from zero-based {@code from} record number, inclusive
 +     * @param to optional zero-based {@code to} record number, exclusive
 +     * @param controller optional controller which can abort query operation
 +     * @return fetch results
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws IllegalArgumentException if {@code from} is negative or if
 +     * {@code from} is more than {@code to}
 +     * @throws FetchException if storage layer throws an exception
 +     * @since 1.2
 +     */
 +    Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException;
 +
 +    /**
       * Fetches results for this query after a given starting point, which is
       * useful for re-opening a cursor. This is only effective when query has
       * been given an explicit {@link #orderBy ordering}. If not a total
 @@ -326,6 +364,29 @@ public interface Query<S extends Storable> {      <T extends S> Cursor<S> fetchAfter(T start) throws FetchException;
      /**
 +     * Fetches results for this query after a given starting point, which is
 +     * useful for re-opening a cursor. This is only effective when query has
 +     * been given an explicit {@link #orderBy ordering}. If not a total
 +     * ordering, then cursor may start at an earlier position.
 +     *
 +     * <p>Note: This method can be very expensive to call repeatedly, if the
 +     * query needs to perform a sort operation. Ideally, the query ordering
 +     * should match the natural ordering of an index or key.
 +     *
 +     * <p>Calling {@code fetchAfter(s)} is equivalent to calling {@code
 +     * after(s).fetch()}.
 +     *
 +     * @param start storable to attempt to start after; if null, fetch all results
 +     * @param controller optional controller which can abort query operation
 +     * @return fetch results
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws FetchException if storage layer throws an exception
 +     * @see Repository#enterTransaction(IsolationLevel)
 +     * @see #after
 +     */
 +    <T extends S> Cursor<S> fetchAfter(T start, Controller controller) throws FetchException;
 +
 +    /**
       * Attempts to load exactly one matching object. If the number of matching
       * records is zero or exceeds one, then an exception is thrown instead.
       *
 @@ -338,8 +399,21 @@ public interface Query<S extends Storable> {      S loadOne() throws FetchException;
      /**
 -     * May return null if nothing found. Throws exception if record count is
 -     * more than one.
 +     * Attempts to load exactly one matching object. If the number of matching
 +     * records is zero or exceeds one, then an exception is thrown instead.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @return a single fetched object
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws FetchNoneException if no matching record found
 +     * @throws FetchMultipleException if more than one matching record found
 +     * @throws FetchException if storage layer throws an exception
 +     */
 +    S loadOne(Controller controller) throws FetchException;
 +
 +    /**
 +     * Tries to load one record, but returns null if nothing was found. Throws
 +     * exception if record count is more than one.
       *
       * @return null or a single fetched object
       * @throws IllegalStateException if any blank parameters in this query
 @@ -349,6 +423,18 @@ public interface Query<S extends Storable> {      S tryLoadOne() throws FetchException;
      /**
 +     * Tries to load one record, but returns null if nothing was found. Throws
 +     * exception if record count is more than one.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @return null or a single fetched object
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws FetchMultipleException if more than one matching record found
 +     * @throws FetchException if storage layer throws an exception
 +     */
 +    S tryLoadOne(Controller controller) throws FetchException;
 +
 +    /**
       * Deletes one matching object. If the number of matching records is zero or
       * exceeds one, then no delete occurs, and an exception is thrown instead.
       *
 @@ -360,6 +446,18 @@ public interface Query<S extends Storable> {      void deleteOne() throws PersistException;
      /**
 +     * Deletes one matching object. If the number of matching records is zero or
 +     * exceeds one, then no delete occurs, and an exception is thrown instead.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws PersistNoneException if no matching record found
 +     * @throws PersistMultipleException if more than one record matches
 +     * @throws PersistException if storage layer throws an exception
 +     */
 +    void deleteOne(Controller controller) throws PersistException;
 +
 +    /**
       * Deletes zero or one matching objects. If the number of matching records
       * exceeds one, then no delete occurs, and an exception is thrown instead.
       *
 @@ -371,6 +469,18 @@ public interface Query<S extends Storable> {      boolean tryDeleteOne() throws PersistException;
      /**
 +     * Deletes zero or one matching objects. If the number of matching records
 +     * exceeds one, then no delete occurs, and an exception is thrown instead.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @return true if record existed and was deleted, or false if no match
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws PersistMultipleException if more than one record matches
 +     * @throws PersistException if storage layer throws an exception
 +     */
 +    boolean tryDeleteOne(Controller controller) throws PersistException;
 +
 +    /**
       * Deletes zero or more matching objects. There is no guarantee that
       * deleteAll is an atomic operation. If atomic behavior is desired, wrap
       * the call in a transaction scope.
 @@ -381,6 +491,17 @@ public interface Query<S extends Storable> {      void deleteAll() throws PersistException;
      /**
 +     * Deletes zero or more matching objects. There is no guarantee that
 +     * deleteAll is an atomic operation. If atomic behavior is desired, wrap
 +     * the call in a transaction scope.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws PersistException if storage layer throws an exception
 +     */
 +    void deleteAll(Controller controller) throws PersistException;
 +
 +    /**
       * Returns a count of all results matched by this query. Even though no
       * results are explicitly fetched, this method may still be expensive to
       * call. The actual performance will vary by repository and available indexes.
 @@ -392,6 +513,18 @@ public interface Query<S extends Storable> {      long count() throws FetchException;
      /**
 +     * Returns a count of all results matched by this query. Even though no
 +     * results are explicitly fetched, this method may still be expensive to
 +     * call. The actual performance will vary by repository and available indexes.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @return count of matches
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws FetchException if storage layer throws an exception
 +     */
 +    long count(Controller controller) throws FetchException;
 +
 +    /**
       * Returns true if any results are matched by this query.
       *
       * @return true if any matches
 @@ -402,6 +535,17 @@ public interface Query<S extends Storable> {      boolean exists() throws FetchException;
      /**
 +     * Returns true if any results are matched by this query.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @return true if any matches
 +     * @throws IllegalStateException if any blank parameters in this query
 +     * @throws FetchException if storage layer throws an exception
 +     * @since 1.2
 +     */
 +    boolean exists(Controller controller) throws FetchException;
 +
 +    /**
       * Print the native query to standard out, which is useful for performance
       * analysis. Not all repositories have a native query format. An example
       * native format is SQL.
 @@ -469,4 +613,166 @@ public interface Query<S extends Storable> {       * Returns a description of the query filter and any other arguments.
       */
      String toString();
 +
 +    /**
 +     * Controller instance can be used to abort query operations.
 +     *
 +     * <p>Example:<pre>
 +     * Storage<UserInfo> users = ...
 +     * long count = users.query("name = ?").count(Query.Timeout.seconds(10));
 +     * </pre>
 +     */
 +    public static interface Controller extends Serializable, Closeable {
 +        /**
 +         * Returns a non-negative value if controller imposes an absolute upper
 +         * bound on query execution time.
 +         */
 +        public long getTimeout();
 +
 +        /**
 +         * Returns the unit for the timeout, if applicable.
 +         */
 +        public TimeUnit getTimeoutUnit();
 +
 +        /**
 +         * Called by query when it begins, possibly multiple times. Implementation
 +         * is required to be idempotent and ignore multiple invocations.
 +         */
 +        public void begin();
 + 
 +        /**
 +         * Periodically called by query to determine if it should continue.
 +         */
 +        public void continueCheck() throws FetchException;
 + 
 +        /**
 +         * Always called by query when finished, even when it fails. Implementation
 +         * is required to be idempotent and ignore multiple invocations.
 +         */
 +        public void close();        
 +    }
 +
 +    /**
 +     * Timeout controller, for aborting long running queries. One instance is
 +     * good for one timeout. The instance can be shared by multiple queries, if
 +     * they are part of a single logical operation.
 +     *
 +     * <p>The timeout applies to the entire duration of fetching results, not
 +     * just the time spent between individual fetches. A caller which is slowly
 +     * processing results can timeout. More sophisticated timeouts can be
 +     * implemented using custom Controller implementations.
 +     */
 +    public static final class Timeout implements Controller {
 +        private static final long serialVersionUID = 1;
 +
 +        private static final AtomicLongFieldUpdater<Timeout> endUpdater =
 +            AtomicLongFieldUpdater.newUpdater(Timeout.class, "mEndNanos");
 +
 +        /**
 +         * Return a new Timeout in nanoseconds.
 +         */
 +        public static Timeout nanos(long timeout) {
 +            return new Timeout(timeout, TimeUnit.NANOSECONDS);
 +        }
 +
 +        /**
 +         * Return a new Timeout in microseconds.
 +         */
 +        public static Timeout micros(long timeout) {
 +            return new Timeout(timeout, TimeUnit.MICROSECONDS);
 +        }
 +
 +        /**
 +         * Return a new Timeout in milliseconds.
 +         */
 +        public static Timeout millis(long timeout) {
 +            return new Timeout(timeout, TimeUnit.MILLISECONDS);
 +        }
 + 
 +        /**
 +         * Return a new Timeout in seconds.
 +         */
 +        public static Timeout seconds(long timeout) {
 +            return new Timeout(timeout, TimeUnit.SECONDS);
 +        }
 +
 +        /**
 +         * Return a new Timeout in minutes.
 +         */
 + 
 +        public static Timeout minutes(long timeout) {
 +            return new Timeout(timeout, TimeUnit.MINUTES);
 +        }
 +
 +        /**
 +         * Return a new Timeout in hours.
 +         */
 +        public static Timeout hours(long timeout) {
 +            return new Timeout(timeout, TimeUnit.HOURS);
 +        }
 +
 +        private final long mTimeout;
 +        private final TimeUnit mUnit;
 +
 +        private volatile transient long mEndNanos;
 +
 +        public Timeout(long timeout, TimeUnit unit) {
 +            if (timeout < 0) {
 +                throw new IllegalArgumentException("Timeout cannot be negative: " + timeout);
 +            }
 +            if (unit == null && timeout != 0) {
 +                throw new IllegalArgumentException
 +                    ("TimeUnit cannot be null if timeout is non-zero: " + timeout);
 +            }
 +            mTimeout = timeout;
 +            mUnit = unit;
 +        }
 + 
 +        public long getTimeout() {
 +            return mTimeout;
 +        }
 +
 +        public TimeUnit getTimeoutUnit() {
 +            return mUnit;
 +        }
 +
 +        @Override
 +        public void begin() {
 +            long end = System.nanoTime() + mUnit.toNanos(mTimeout);
 +            if (end == 0) {
 +                // Handle rare case to ensure atomic compare and set always
 +                // works the first time, supporting idempotent calls to this
 +                // method.
 +                end = 1;
 +            }
 +            endUpdater.compareAndSet(this, 0, end);
 +        }
 +
 +        @Override
 +        public void continueCheck() throws FetchTimeoutException {
 +            long end = mEndNanos;
 +
 +            if (end == 0) {
 +                // Begin was not called, in violation of how the Controller
 +                // must be used. Be lenient and begin now.
 +                begin();
 +                end = mEndNanos;
 +            }
 +
 +            // Subtract to support modulo comparison.
 +            if ((System.nanoTime() - end) >= 0) {
 +                throw new FetchTimeoutException("Timed out: " + mTimeout + ' ' + mUnit);
 +            }
 +        }
 + 
 +        @Override
 +        public void close() {
 +            // Nothing to do.
 +        }
 +
 +        @Override
 +        public String toString() {
 +            return "Query.Timeout {timeout=" + mTimeout + ", unit=" + mUnit + '}';
 +        }
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java new file mode 100644 index 0000000..da682d2 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java @@ -0,0 +1,99 @@ +/*
 + * Copyright 2011 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
 +
 +/**
 + * Wraps another cursor and periodically calls a {@link com.amazon.carbonado.Query.Controller controller}.
 + *
 + * @author Brian S O'Neill
 + */
 +public class ControllerCursor<S> extends AbstractCursor<S> {
 +    /**
 +     * Returns a ControllerCursor depending on whether a controller instance is
 +     * passed in or not.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @throws IllegalArgumentException if source is null
 +     */
 +    public static <S> Cursor<S> apply(Cursor<S> source, Query.Controller controller) {
 +        return controller == null ? source : new ControllerCursor<S>(source, controller);
 +    }
 +
 +    private final Cursor<S> mSource;
 +    private final Query.Controller mController;
 +
 +    private byte mCount;
 +
 +    /**
 +     * @param controller required controller which can abort query operation
 +     * @throws IllegalArgumentException if either argument is null
 +     */
 +    private ControllerCursor(Cursor<S> source, Query.Controller controller) {
 +        if (source == null) {
 +            throw new IllegalArgumentException("Source is null");
 +        }
 +        if (controller == null) {
 +            throw new IllegalArgumentException("Controller is null");
 +        }
 +        mSource = source;
 +        mController = controller;
 +        controller.begin();
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        if (mSource.hasNext()) {
 +            continueCheck();
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    public S next() throws FetchException {
 +        S next = mSource.next();
 +        continueCheck();
 +        return next;
 +    }
 +
 +    public void close() throws FetchException {
 +        try {
 +            mSource.close();
 +        } finally {
 +            mController.close();
 +        }
 +    }
 +
 +    private void continueCheck() throws FetchException {
 +        if (++mCount == 0) {
 +            try {
 +                mController.continueCheck();
 +            } catch (FetchException e) {
 +                try {
 +                    close();
 +                } catch (FetchException e2) {
 +                    // Ignore.
 +                }
 +                throw e;
 +            }
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index ad21667..4cf8c21 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 -import com.amazon.carbonado.FetchInterruptedException;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.filter.Filter;
 @@ -114,14 +113,12 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {              return true;
          }
          try {
 -            int count = 0;
              while (mCursor.hasNext()) {
                  S next = mCursor.next();
                  if (isAllowed(next)) {
                      mNext = next;
                      return true;
                  }
 -                interruptCheck(++count);
              }
          } catch (NoSuchElementException e) {
          } catch (FetchException e) {
 @@ -165,10 +162,9 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {          try {
              int count = 0;
              while (--amount >= 0 && hasNext()) {
 -                interruptCheck(++count);
 +                ++count;
                  mNext = null;
              }
 -
              return count;
          } catch (FetchException e) {
              try {
 @@ -179,11 +175,4 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {              throw e;
          }
      }
 -
 -    private void interruptCheck(int count) throws FetchException {
 -        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 -            close();
 -            throw new FetchInterruptedException();
 -        }
 -    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 49ca84e..3053cd5 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -23,7 +23,6 @@ import java.util.NoSuchElementException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 -import com.amazon.carbonado.FetchInterruptedException;
  /**
   * Abstract cursor for aggregation and finding distinct data. The source cursor
 @@ -124,7 +123,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {          }
          try {
 -            int count = 0;
              if (mCursor.hasNext()) {
                  if (mGroupLeader == null) {
                      beginGroup(mGroupLeader = mCursor.next());
 @@ -143,8 +141,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {                              return true;
                          }
                      }
 -
 -                    interruptCheck(++count);
                  }
                  G aggregate = finishGroup();
 @@ -206,7 +202,7 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {          try {
              int count = 0;
              while (--amount >= 0 && hasNext()) {
 -                interruptCheck(++count);
 +                ++count;
                  mNextAggregate = null;
              }
 @@ -220,11 +216,4 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {              throw e;
          }
      }
 -
 -    private void interruptCheck(int count) throws FetchException {
 -        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 -            close();
 -            throw new FetchInterruptedException();
 -        }
 -    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java index aae5243..0231012 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -38,7 +38,9 @@ import java.util.List;  import java.util.NoSuchElementException;
  import java.util.PriorityQueue;
 +import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.FetchInterruptedException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.Storage;
  import com.amazon.carbonado.SupportException;
 @@ -122,6 +124,7 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>      private final String mTempDir;
      private final int mMaxArrayCapacity;
 +    private final Query.Controller mController;
      private Preparer<S> mPreparer;
 @@ -144,6 +147,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>      }
      /**
 +     * @since 1.2
 +     *
 +     * @param controller optional controller which can abort query operation
 +     */
 +    public MergeSortBuffer(Query.Controller controller) {
 +        this(null, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
 +    }
 +
 +    /**
       * @param storage storage for elements; if null use first Storable to
       * prepare reloaded Storables
       */
 @@ -154,6 +166,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>      /**
       * @param storage storage for elements; if null use first Storable to
       * prepare reloaded Storables
 +     * @param controller optional controller which can abort query operation
 +     */
 +    public MergeSortBuffer(Storage<S> storage, Query.Controller controller) {
 +        this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
 +    }
 +
 +    /**
 +     * @param storage storage for elements; if null use first Storable to
 +     * prepare reloaded Storables
       * @param tempDir directory to store temp files for merging, or null for default
       */
      public MergeSortBuffer(Storage<S> storage, String tempDir) {
 @@ -170,6 +191,22 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>       */
      @SuppressWarnings("unchecked")
      public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
 +        this(storage, tempDir, maxArrayCapacity, null);
 +    }
 +
 +    /**
 +     * @param storage storage for elements; if null use first Storable to
 +     * prepare reloaded Storables
 +     * @param tempDir directory to store temp files for merging, or null for default
 +     * @param maxArrayCapacity maximum amount of storables to keep in an array
 +     * before serializing to a file
 +     * @param controller optional controller which can abort query operation
 +     * @throws IllegalArgumentException if storage is null
 +     */
 +    @SuppressWarnings("unchecked")
 +    public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity,
 +                           Query.Controller controller)
 +    {
          mTempDir = tempDir;
          mMaxArrayCapacity = maxArrayCapacity;
 @@ -179,6 +216,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>          int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
          mElements = (S[]) new Storable[cap];
 +
 +        if ((mController = controller) != null) {
 +            controller.begin();
 +        }
      }
      public void prepare(Comparator<S> comparator) {
 @@ -231,10 +272,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>                  if (mFilesInUse.size() < (MAX_OPEN_FILE_COUNT - 1)) {
                      mFilesInUse.add(raf);
 -                    int count = 0;
 +                    byte count = 0;
                      for (S element : mElements) {
 -                        // Check every so often if interrupted.
 -                        interruptCheck(++count);
 +                        // Check every so often if should continue.
 +                        continueCheck(++count);
                          element.writeTo(out);
                      }
                  } else {
 @@ -274,11 +315,11 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>                      // as well as error out earlier, should the disk be full.
                      raf.setLength(mergedLength);
 -                    int count = 0;
 +                    byte count = 0;
                      Iterator<S> it = iterator(filesToMerge);
                      while (it.hasNext()) {
 -                        // Check every so often if interrupted.
 -                        interruptCheck(++count);
 +                        // Check every so often if should continue.
 +                        continueCheck(++count);
                          S element = it.next();
                          element.writeTo(out);
                      }
 @@ -368,9 +409,16 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>      }
      public void close() {
 -        clear();
 -        if (mWorkFilePool != null) {
 -            mWorkFilePool.unregisterWorkFileUser(this);
 +        try {
 +            clear();
 +            if (mWorkFilePool != null) {
 +                mWorkFilePool.unregisterWorkFileUser(this);
 +            }
 +        } finally {
 +            Query.Controller controller = mController;
 +            if (controller != null) {
 +                controller.close();
 +            }
          }
      }
 @@ -386,10 +434,20 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>          return comparator;
      }
 -    private void interruptCheck(int count) {
 -        if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
 -            close();
 -            throw new UndeclaredThrowableException(new FetchInterruptedException());
 +    private void continueCheck(byte count) {
 +        if (count == 0) {
 +            try {
 +                Query.Controller controller = mController;
 +                if (controller != null) {
 +                    controller.continueCheck();
 +                }
 +                if (mStop) {
 +                    throw new FetchInterruptedException("Shutting down");
 +                }
 +            } catch (FetchException e) {
 +                close();
 +                throw new UndeclaredThrowableException(e);
 +            }
          }
      }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 8a1bd48..814a3ba 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 -import com.amazon.carbonado.FetchInterruptedException;
  /**
   * Abstract cursor which wraps another cursor and transforms each storable
 @@ -72,7 +71,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {                  mNextCursor.close();
                  mNextCursor = null;
              }
 -            int count = 0;
              while (mCursor.hasNext()) {
                  Cursor<T> nextCursor = transform(mCursor.next());
                  if (nextCursor != null) {
 @@ -82,7 +80,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {                      }
                      nextCursor.close();
                  }
 -                interruptCheck(++count);
              }
          } catch (NoSuchElementException e) {
          } catch (FetchException e) {
 @@ -129,7 +126,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {                  if ((amount -= chunk) <= 0) {
                      break;
                  }
 -                interruptCheck(count);
              }
              return count;
 @@ -142,11 +138,4 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {              throw e;
          }
      }
 -
 -    private void interruptCheck(int count) throws FetchException {
 -        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 -            close();
 -            throw new FetchInterruptedException();
 -        }
 -    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 5b5f9b5..e054137 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -32,7 +32,6 @@ import org.cojen.util.BeanProperty;  import org.cojen.classfile.TypeDesc;
  import com.amazon.carbonado.FetchException;
 -import com.amazon.carbonado.FetchInterruptedException;
  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.Storable;
 @@ -388,12 +387,7 @@ public class SortedCursor<S> extends AbstractCursor<S> {              fill: {
                  if (matcher == null) {
                      // Buffer up entire results and sort.
 -                    int count = 0;
                      while (cursor.hasNext()) {
 -                        // Check every so often if interrupted.
 -                        if ((++count & ~0xff) == 0 && Thread.interrupted()) {
 -                            throw new FetchInterruptedException();
 -                        }
                          buffer.add(cursor.next());
                      }
                      break fill;
 @@ -413,13 +407,8 @@ public class SortedCursor<S> extends AbstractCursor<S> {                  }
                  buffer.add(chunkStart);
 -                int count = 1;
                  while (cursor.hasNext()) {
 -                    // Check every so often if interrupted.
 -                    if ((++count & ~0xff) == 0 && Thread.interrupted()) {
 -                        throw new FetchInterruptedException();
 -                    }
                      S next = cursor.next();
                      if (matcher.compare(chunkStart, next) != 0) {
                          // Save for reading next chunk later.
 diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index c05e0fc..bd41cef 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 -import com.amazon.carbonado.FetchInterruptedException;
  /**
   * Abstract cursor which wraps another cursor and transforms each storable
 @@ -64,14 +63,12 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {              return true;
          }
          try {
 -            int count = 0;
              while (mCursor.hasNext()) {
                  T next = transform(mCursor.next());
                  if (next != null) {
                      mNext = next;
                      return true;
                  }
 -                interruptCheck(++count);
              }
          } catch (NoSuchElementException e) {
          } catch (FetchException e) {
 @@ -115,7 +112,7 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {          try {
              int count = 0;
              while (--amount >= 0 && hasNext()) {
 -                interruptCheck(++count);
 +                ++count;
                  mNext = null;
              }
 @@ -129,11 +126,4 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {              throw e;
          }
      }
 -
 -    private void interruptCheck(int count) throws FetchException {
 -        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 -            close();
 -            throw new FetchInterruptedException();
 -        }
 -    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java index 7e857c3..44735ae 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java @@ -61,6 +61,13 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App      }
      @Override
 +    public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
 +        throws FetchException
 +    {
 +        return after(start).fetch(controller);
 +    }
 +
 +    @Override
      public S loadOne() throws FetchException {
          S obj = tryLoadOne();
          if (obj == null) {
 @@ -70,6 +77,15 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App      }
      @Override
 +    public S loadOne(Controller controller) throws FetchException {
 +        S obj = tryLoadOne(controller);
 +        if (obj == null) {
 +            throw new FetchNoneException(toString());
 +        }
 +        return obj;
 +    }
 +
 +    @Override
      public S tryLoadOne() throws FetchException {
          Cursor<S> cursor = fetch();
          try {
 @@ -88,6 +104,24 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App      }
      @Override
 +    public S tryLoadOne(Controller controller) throws FetchException {
 +        Cursor<S> cursor = fetch(controller);
 +        try {
 +            if (cursor.hasNext()) {
 +                S obj = cursor.next();
 +                if (cursor.hasNext()) {
 +                    throw new FetchMultipleException(toString());
 +                }
 +                return obj;
 +            } else {
 +                return null;
 +            }
 +        } finally {
 +            cursor.close();
 +        }
 +    }
 +
 +    @Override
      public void deleteOne() throws PersistException {
          if (!tryDeleteOne()) {
              throw new PersistNoneException(toString());
 @@ -95,6 +129,13 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App      }
      @Override
 +    public void deleteOne(Controller controller) throws PersistException {
 +        if (!tryDeleteOne(controller)) {
 +            throw new PersistNoneException(toString());
 +        }
 +    }
 +
 +    @Override
      public boolean printNative() {
          try {
              return printNative(System.out);
 diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java index e35ec7a..e1921d2 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.cursor.LimitCursor;
 @@ -57,6 +58,26 @@ public abstract class AbstractQueryExecutor<S extends Storable> implements Query      }
      /**
 +     * Produces a slice via skip and limit cursors. Subclasses are encouraged
 +     * to override with a more efficient implementation.
 +     *
 +     * @since 1.2
 +     */
 +    public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
 +                                Query.Controller controller)
 +        throws FetchException
 +    {
 +        Cursor<S> cursor = fetch(values, controller);
 +        if (from > 0) {
 +            cursor = new SkipCursor<S>(cursor, from);
 +        }
 +        if (to != null) {
 +            cursor = new LimitCursor<S>(cursor, to - from);
 +        }
 +        return cursor;
 +    }
 +
 +    /**
       * Counts results by opening a cursor and skipping entries. Subclasses are
       * encouraged to override with a more efficient implementation.
       */
 @@ -77,6 +98,26 @@ public abstract class AbstractQueryExecutor<S extends Storable> implements Query      }
      /**
 +     * Counts results by opening a cursor and skipping entries. Subclasses are
 +     * encouraged to override with a more efficient implementation.
 +     */
 +    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
 +        Cursor<S> cursor = fetch(values, controller);
 +        try {
 +            long count = cursor.skipNext(Integer.MAX_VALUE);
 +            if (count == Integer.MAX_VALUE) {
 +                int amt;
 +                while ((amt = cursor.skipNext(Integer.MAX_VALUE)) > 0) {
 +                    count += amt;
 +                }
 +            }
 +            return count;
 +        } finally {
 +            cursor.close();
 +        }
 +    }
 +
 +    /**
       * Does nothing and returns false.
       */
      public boolean printNative(Appendable app, int indentLevel, FilterValues<S> values)
 diff --git a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java index f9be52a..fa8d9e5 100644 --- a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java @@ -95,14 +95,31 @@ public class DelegatedQueryExecutor<S extends Storable> implements QueryExecutor          return applyFilterValues(values).fetch();
      }
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
 +        return applyFilterValues(values).fetch(controller);
 +    }
 +
      public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to) throws FetchException {
          return applyFilterValues(values).fetchSlice(from, to);
      }
 +    public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
 +                                Query.Controller controller)
 +        throws FetchException
 +    {
 +        return applyFilterValues(values).fetchSlice(from, to, controller);
 +    }
 +
      public long count(FilterValues<S> values) throws FetchException {
          return applyFilterValues(values).count();
      }
 +    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
 +        return applyFilterValues(values).count(controller);
 +    }
 +
      public Filter<S> getFilter() {
          return mFilter;
      }
 diff --git a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java index 62418a1..fc7ea0c 100644 --- a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java @@ -241,12 +241,28 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {       * Always returns an {@link EmptyCursor}.
       */
      @Override
 +    public Cursor<S> fetch(Controller controller) {
 +        return EmptyCursor.the();
 +    }
 +
 +    /**
 +     * Always returns an {@link EmptyCursor}.
 +     */
 +    @Override
      public Cursor<S> fetchSlice(long from, Long to) {
          checkSliceArguments(from, to);
          return EmptyCursor.the();
      }
      /**
 +     * Always returns an {@link EmptyCursor}.
 +     */
 +    @Override
 +    public Cursor<S> fetchSlice(long from, Long to, Controller controller) {
 +        return fetchSlice(from, to);
 +    }
 +
 +    /**
       * Always throws {@link PersistNoneException}.
       */
      @Override
 @@ -255,6 +271,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {      }
      /**
 +     * Always throws {@link PersistNoneException}.
 +     */
 +    @Override
 +    public void deleteOne(Controller controller) throws PersistNoneException {
 +        throw new PersistNoneException();
 +    }
 +
 +    /**
       * Always returns false.
       */
      @Override
 @@ -263,6 +287,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {      }
      /**
 +     * Always returns false.
 +     */
 +    @Override
 +    public boolean tryDeleteOne(Controller controller) {
 +        return false;
 +    }
 +
 +    /**
       * Does nothing.
       */
      @Override
 @@ -270,6 +302,13 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {      }
      /**
 +     * Does nothing.
 +     */
 +    @Override
 +    public void deleteAll(Controller controller) {
 +    }
 +
 +    /**
       * Always returns zero.
       */
      @Override
 @@ -278,6 +317,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {      }
      /**
 +     * Always returns zero.
 +     */
 +    @Override
 +    public long count(Controller controller) {
 +        return 0;
 +    }
 +
 +    /**
       * Always returns false.
       */
      @Override
 @@ -285,6 +332,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> {          return false;
      }
 +    /**
 +     * Always returns false.
 +     */
 +    @Override
 +    public boolean exists(Controller controller) {
 +        return false;
 +    }
 +
      @Override
      public void appendTo(Appendable app) throws IOException {
          app.append("Query {type=");
 diff --git a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java index 770c1bc..17f3105 100644 --- a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.cursor.FilteredCursor;
 @@ -64,6 +65,12 @@ public class FilteredQueryExecutor<S extends Storable> extends AbstractQueryExec          return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values));
      }
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
 +        return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values, controller));
 +    }
 +
      /**
       * Returns the combined filter of the wrapped executor and the extra filter.
       */
 diff --git a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java index 9c7fd05..9a41fda 100644 --- a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.filter.Filter;
 @@ -58,6 +59,15 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec          return count;
      }
 +    @Override
 +    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
 +        long count = mSupport.countAll(controller);
 +        if (count == -1) {
 +            count = super.count(values, controller);
 +        }
 +        return count;
 +    }
 +
      /**
       * Returns an open filter.
       */
 @@ -69,6 +79,12 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec          return mSupport.fetchAll();
      }
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
 +        return mSupport.fetchAll(controller);
 +    }
 +
      /**
       * Returns an empty list.
       */
 @@ -99,8 +115,23 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec          long countAll() throws FetchException;
          /**
 +         * Counts all Storables. Implementation may return -1 to indicate that
 +         * default count algorithm should be used.
 +         *
 +         * @param controller optional controller which can abort query operation
 +         */
 +        long countAll(Query.Controller controller) throws FetchException;
 +
 +        /**
           * Perform a full scan of all Storables.
           */
          Cursor<S> fetchAll() throws FetchException;
 +
 +        /**
 +         * Perform a full scan of all Storables.
 +         *
 +         * @param controller optional controller which can abort query operation
 +         */
 +        Cursor<S> fetchAll(Query.Controller controller) throws FetchException;
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java index c3853d2..b8a39b2 100644 --- a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java @@ -120,6 +120,12 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu      }
      public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
 +        return fetch(values, null);
 +    }
 +
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
          Object[] identityValues = null;
          Object rangeStartValue = null;
          Object rangeEndValue = null;
 @@ -182,7 +188,8 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu                                          rangeStartBoundary, rangeStartValue,
                                          rangeEndBoundary, rangeEndValue,
                                          mReverseRange,
 -                                        mReverseOrder);
 +                                        mReverseOrder,
 +                                        controller);
          } else {
              indexEntryQuery = indexEntryQuery.withValues(identityValues);
              if (rangeStartBoundary != BoundaryType.OPEN) {
 @@ -194,7 +201,7 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu              if (mCoveringFilter != null && values != null) {
                  indexEntryQuery = indexEntryQuery.withValues(values.getValuesFor(mCoveringFilter));
              }
 -            return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery);
 +            return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery, controller);
          }
      }
 @@ -415,6 +422,20 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu              throws FetchException;
          /**
 +         * Fetch Storables referenced by the given index entry query. This
 +         * method is only called if index supports query access.
 +         *
 +         * @param index index to open
 +         * @param indexEntryQuery query with no blank parameters, derived from
 +         * the query returned by indexEntryQuery
 +         * @param controller optional controller which can abort query operation
 +         * @since 1.2
 +         */
 +        Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
 +                                           Query.Controller controller)
 +            throws FetchException;
 +
 +        /**
           * Perform an index scan of a subset of Storables referenced by an
           * index. The identity values are aligned with the index properties at
           * property 0. An optional range start or range end aligns with the index
 @@ -445,5 +466,39 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu                                boolean reverseRange,
                                boolean reverseOrder)
              throws FetchException;
 +
 +        /**
 +         * Perform an index scan of a subset of Storables referenced by an
 +         * index. The identity values are aligned with the index properties at
 +         * property 0. An optional range start or range end aligns with the index
 +         * property following the last of the identity values.
 +         *
 +         * <p>This method is only called if no index entry query was provided
 +         * for the given index.
 +         *
 +         * @param index index to open, which may be a primary key index
 +         * @param identityValues optional list of exactly matching values to apply to index
 +         * @param rangeStartBoundary start boundary type
 +         * @param rangeStartValue value to start at if boundary is not open
 +         * @param rangeEndBoundary end boundary type
 +         * @param rangeEndValue value to end at if boundary is not open
 +         * @param reverseRange indicates that range operates on a property whose
 +         * natural order is descending. Only the code that opens the physical
 +         * cursor should examine this parameter. If true, then the range start and
 +         * end parameter pairs need to be swapped.
 +         * @param reverseOrder when true, iteration should be reversed from its
 +         * natural order
 +         * @param controller optional controller which can abort query operation
 +         */
 +        Cursor<S> fetchSubset(StorableIndex<S> index,
 +                              Object[] identityValues,
 +                              BoundaryType rangeStartBoundary,
 +                              Object rangeStartValue,
 +                              BoundaryType rangeEndBoundary,
 +                              Object rangeEndValue,
 +                              boolean reverseRange,
 +                              boolean reverseOrder,
 +                              Query.Controller controller)
 +            throws FetchException;
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java index 9e04a32..2623bb3 100644 --- a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java @@ -23,11 +23,13 @@ import java.io.IOException;  import java.util.concurrent.locks.Lock;
  import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.filter.Filter;
  import com.amazon.carbonado.filter.FilterValues;
 +import com.amazon.carbonado.cursor.ControllerCursor;
  import com.amazon.carbonado.cursor.IteratorCursor;
  /**
 @@ -76,6 +78,10 @@ public class IterableQueryExecutor<S extends Storable> extends AbstractQueryExec          return new IteratorCursor<S>(mIterable, mLock);
      }
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller) {
 +        return ControllerCursor.apply(new IteratorCursor<S>(mIterable, mLock), controller);
 +    }
 +
      /**
       * Returns an empty list.
       */
 diff --git a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java index 9a77f26..4e98df4 100644 --- a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java @@ -36,6 +36,7 @@ import org.cojen.util.ClassInjector;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.RepositoryException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.cursor.MultiTransformedCursor;
 @@ -190,6 +191,7 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>      private static final String INNER_LOOP_EX_FIELD_NAME = "innerLoopExecutor";
      private static final String INNER_LOOP_FV_FIELD_NAME = "innerLoopFilterValues";
 +    private static final String INNER_LOOP_CONTROLLER_FIELD_NAME = "innerLoopController";
      private static final String ACTIVE_SOURCE_FIELD_NAME = "active";
      private static final SoftValuedCache<StorableProperty, Class> cJoinerCursorClassCache;
 @@ -240,11 +242,14 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>          final TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
          final TypeDesc queryExecutorType = TypeDesc.forClass(QueryExecutor.class);
          final TypeDesc filterValuesType = TypeDesc.forClass(FilterValues.class);
 +        final TypeDesc controllerType = TypeDesc.forClass(Query.Controller.class);
          // Define fields for inner loop executor and filter values, which are
          // passed into the constructor.
          cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_EX_FIELD_NAME, queryExecutorType);
          cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_FV_FIELD_NAME, filterValuesType);
 +        cf.addField(Modifiers.PRIVATE.toFinal(true),
 +                    INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
          // If target storable can set a reference to the joined source
          // storable, then stash a copy of it as we go. This way, when user of
 @@ -259,7 +264,7 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>          // Define constructor.
          {
 -            TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType};
 +            TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType, controllerType};
              MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
              CodeBuilder b = new CodeBuilder(mi);
 @@ -276,6 +281,10 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>              b.loadLocal(b.getParameter(2));
              b.storeField(INNER_LOOP_FV_FIELD_NAME, filterValuesType);
 +            b.loadThis();
 +            b.loadLocal(b.getParameter(3));
 +            b.storeField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
 +
              b.returnVoid();
          }
 @@ -317,9 +326,12 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>                                  new TypeDesc[] {bindType});
              }
 +            b.loadThis();
 +            b.loadField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
 +
              // Now fetch and return.
              b.invokeInterface(queryExecutorType, "fetch", cursorType,
 -                              new TypeDesc[] {filterValuesType});
 +                              new TypeDesc[] {filterValuesType, controllerType});
              b.returnValue(cursorType);
          }
 @@ -570,6 +582,12 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>      }
      public Cursor<T> fetch(FilterValues<T> values) throws FetchException {
 +        return fetch(values, null);
 +    }
 +
 +    public Cursor<T> fetch(FilterValues<T> values, Query.Controller controller)
 +        throws FetchException
 +    {
          FilterValues<T> innerLoopFilterValues = mInnerLoopFilterValues;
          if (mTargetFilter != null) {
 @@ -578,10 +596,14 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>                  .withValues(values.getValuesFor(mTargetFilter));
          }
 -        Cursor<S> outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values));
 +        if (controller != null) {
 +            controller.begin();
 +        }
 +
 +        Cursor<S> outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values), controller);
          return mJoinerFactory.newJoinedCursor
 -            (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues);
 +            (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues, controller);
      }
      public Filter<T> getFilter() {
 @@ -628,7 +650,8 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable>          public static interface Factory<S, T extends Storable> {
              Cursor<T> newJoinedCursor(Cursor<S> outerLoopCursor,
                                        QueryExecutor<T> innerLoopExecutor,
 -                                      FilterValues<T> innerLoopFilterValues);
 +                                      FilterValues<T> innerLoopFilterValues,
 +                                      Query.Controller innerLoopController);
          }
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java index 1cc0b59..dacda80 100644 --- a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.filter.Filter;
 @@ -72,6 +73,12 @@ public class KeyQueryExecutor<S extends Storable> extends AbstractQueryExecutor<          return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter));
      }
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
 +        return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter), controller);
 +    }
 +
      public Filter<S> getFilter() {
          return mKeyFilter;
      }
 @@ -115,5 +122,18 @@ public class KeyQueryExecutor<S extends Storable> extends AbstractQueryExecutor<           */
          Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
              throws FetchException;
 +
 +        /**
 +         * Select at most one Storable referenced by an index. The identity
 +         * values fully specify all elements of the index, and the index is
 +         * unique.
 +         *
 +         * @param controller optional controller which can abort query operation
 +         * @param index index to open, which may be a primary key index
 +         * @param identityValues of exactly matching values to apply to index
 +         */
 +        Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
 +                           Query.Controller controller)
 +            throws FetchException;
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java index ab310b0..9ba259b 100644 --- a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.filter.Filter;
 @@ -46,6 +47,13 @@ public interface QueryExecutor<S extends Storable> {      Cursor<S> fetch(FilterValues<S> values) throws FetchException;
      /**
 +     * Returns a new cursor using the given filter values.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     */
 +    Cursor<S> fetch(FilterValues<S> values, Query.Controller controller) throws FetchException;
 +
 +    /**
       * Returns a new cursor using the given filter values and slice.
       *
       * @since 1.2
 @@ -53,11 +61,27 @@ public interface QueryExecutor<S extends Storable> {      Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to) throws FetchException;
      /**
 +     * Returns a new cursor using the given filter values and slice.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     * @since 1.2
 +     */
 +    Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to, Query.Controller controller)
 +        throws FetchException;
 +
 +    /**
       * Counts the query results using the given filter values.
       */
      long count(FilterValues<S> values) throws FetchException;
      /**
 +     * Counts the query results using the given filter values.
 +     *
 +     * @param controller optional controller which can abort query operation
 +     */
 +    long count(FilterValues<S> values, Query.Controller controller) throws FetchException;
 +
 +    /**
       * Returns the filter used by this QueryExecutor.
       *
       * @return query filter, never null
 diff --git a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java index 82d0ef6..67f739d 100644 --- a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java @@ -24,9 +24,11 @@ import java.util.Comparator;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.cursor.ArraySortBuffer;
 +import com.amazon.carbonado.cursor.ControllerCursor;
  import com.amazon.carbonado.cursor.MergeSortBuffer;
  import com.amazon.carbonado.cursor.SortBuffer;
  import com.amazon.carbonado.cursor.SortedCursor;
 @@ -106,11 +108,28 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut          return new SortedCursor<S>(cursor, buffer, mHandledComparator, mFinisherComparator);
      }
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
 +        Cursor<S> cursor = mExecutor.fetch(values, controller);
 +        SortBuffer<S> buffer = mSupport.createSortBuffer(controller);
 +        // Apply the controller around the cursor to ensure timeouts are
 +        // honored even when the caller is slowly iterating over the cursor.
 +        return ControllerCursor.apply
 +            (new SortedCursor<S>(cursor, buffer, mHandledComparator, mFinisherComparator),
 +             controller);
 +    }
 +
      @Override
      public long count(FilterValues<S> values) throws FetchException {
          return mExecutor.count(values);
      }
 +    @Override
 +    public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
 +        return mExecutor.count(values, controller);
 +    }
 +
      public Filter<S> getFilter() {
          return mExecutor.getFilter();
      }
 @@ -152,6 +171,13 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut           * Implementation must return an empty buffer for sorting.
           */
          SortBuffer<S> createSortBuffer();
 +
 +        /**
 +         * Implementation must return an empty buffer for sorting.
 +         *
 +         * @param controller optional controller which can abort query operation
 +         */
 +        SortBuffer<S> createSortBuffer(Query.Controller controller);
      }
      /**
 @@ -164,6 +190,14 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut          public SortBuffer<S> createSortBuffer() {
              return new ArraySortBuffer<S>();
          }
 +
 +        /**
 +         * Returns a new ArraySortBuffer.
 +         */
 +        public SortBuffer<S> createSortBuffer(Query.Controller controller) {
 +            // Java sort utility doesn't support any kind of controller.
 +            return new ArraySortBuffer<S>();
 +        }
      }
      /**
 @@ -176,5 +210,12 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut          public SortBuffer<S> createSortBuffer() {
              return new MergeSortBuffer<S>();
          }
 +
 +        /**
 +         * Returns a new MergeSortBuffer.
 +         */
 +        public SortBuffer<S> createSortBuffer(Query.Controller controller) {
 +            return new MergeSortBuffer<S>(controller);
 +        }
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java index 0349ae2..bce32ed 100644 --- a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java @@ -288,14 +288,28 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>      }
      @Override
 +    public Cursor<S> fetch(Controller controller) throws FetchException {
 +        try {
 +            return executor().fetch(mValues, controller);
 +        } catch (RepositoryException e) {
 +            throw e.toFetchException();
 +        }
 +    }
 +
 +    @Override
      public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
 +        return fetchSlice(from, to, null);
 +    }
 +
 +    @Override
 +    public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
          if (!checkSliceArguments(from, to)) {
 -            return fetch();
 +            return fetch(controller);
          }
          try {
              QueryHints hints = QueryHints.emptyHints().with(QueryHint.CONSUME_SLICE);
              return executorFactory().executor(mFilter, mOrdering, hints)
 -                .fetchSlice(mValues, from, to);
 +                .fetchSlice(mValues, from, to, controller);
          } catch (RepositoryException e) {
              throw e.toFetchException();
          }
 @@ -303,9 +317,14 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>      @Override
      public boolean tryDeleteOne() throws PersistException {
 +        return tryDeleteOne(null);
 +    }
 +
 +    @Override
 +    public boolean tryDeleteOne(Controller controller) throws PersistException {
          Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED);
          try {
 -            Cursor<S> cursor = fetch();
 +            Cursor<S> cursor = fetch(controller);
              boolean result;
              try {
                  if (cursor.hasNext()) {
 @@ -335,9 +354,14 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>      @Override
      public void deleteAll() throws PersistException {
 +        deleteAll(null);
 +    }
 +
 +    @Override
 +    public void deleteAll(Controller controller) throws PersistException {
          Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED);
          try {
 -            Cursor<S> cursor = fetch();
 +            Cursor<S> cursor = fetch(controller);
              try {
                  while (cursor.hasNext()) {
                      cursor.next().tryDelete();
 @@ -367,8 +391,22 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S>      }
      @Override
 +    public long count(Controller controller) throws FetchException {
 +        try {
 +            return executor().count(mValues, controller);
 +        } catch (RepositoryException e) {
 +            throw e.toFetchException();
 +        }
 +    }
 +
 +    @Override
      public boolean exists() throws FetchException {
 -        Cursor<S> cursor = fetchSlice(0L, 1L);
 +        return exists(null);
 +    }
 +
 +    @Override
 +    public boolean exists(Controller controller) throws FetchException {
 +        Cursor<S> cursor = fetchSlice(0L, 1L, controller);
          try {
              return cursor.skipNext(1) > 0;
          } finally {
 diff --git a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java index 09a3174..48152bb 100644 --- a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java @@ -26,6 +26,7 @@ import java.util.List;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.cursor.SortedCursor;
 @@ -97,9 +98,15 @@ public class UnionQueryExecutor<S extends Storable> extends AbstractQueryExecuto      }
      public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
 +        return fetch(values, null);
 +    }
 +
 +    public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +        throws FetchException
 +    {
          Cursor<S> cursor = null;
          for (QueryExecutor<S> executor : mExecutors) {
 -            Cursor<S> subCursor = executor.fetch(values);
 +            Cursor<S> subCursor = executor.fetch(values, controller);
              cursor = (cursor == null) ? subCursor
                  : new UnionCursor<S>(cursor, subCursor, mOrderComparator);
          }
 diff --git a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java index 6c9363f..2b15294 100644 --- a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java +++ b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java @@ -89,8 +89,9 @@ public class GenericStorableCodecFactory implements StorableCodecFactory {                                                                      RawSupport support)
          throws SupportException
      {
 +        LayoutOptions options = layout == null ? getLayoutOptions(type) : layout.getOptions();
          return GenericStorableCodec.getInstance
 -            (this, createStrategy(type, pkIndex, null), isMaster, layout, support);
 +            (this, createStrategy(type, pkIndex, options), isMaster, layout, support);
      }
      /**
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 4635f2c..d115a3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -191,14 +191,26 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          return new MergeSortBuffer<S>();
      }
 +    public SortBuffer<S> createSortBuffer(Query.Controller controller) {
 +        return new MergeSortBuffer<S>(controller);
 +    }
 +
      public long countAll() throws FetchException {
          return mMasterStorage.query().count();
      }
 +    public long countAll(Query.Controller controller) throws FetchException {
 +        return mMasterStorage.query().count(controller);
 +    }
 +
      public Cursor<S> fetchAll() throws FetchException {
          return mMasterStorage.query().fetch();
      }
 +    public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
 +        return mMasterStorage.query().fetch(controller);
 +    }
 +
      public Cursor<S> fetchOne(StorableIndex<S> index,
                                Object[] identityValues)
          throws FetchException
 @@ -207,6 +219,15 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          return indexInfo.fetchOne(this, identityValues);
      }
 +    public Cursor<S> fetchOne(StorableIndex<S> index,
 +                              Object[] identityValues,
 +                              Query.Controller controller)
 +        throws FetchException
 +    {
 +        ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
 +        return indexInfo.fetchOne(this, identityValues, controller);
 +    }
 +
      public Query<?> indexEntryQuery(StorableIndex<S> index)
          throws FetchException
      {
 @@ -221,6 +242,14 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery);
      }
 +    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
 +                                              Query.Controller controller)
 +        throws FetchException
 +    {
 +        ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
 +        return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery, controller);
 +    }
 +
      public Cursor<S> fetchSubset(StorableIndex<S> index,
                                   Object[] identityValues,
                                   BoundaryType rangeStartBoundary,
 @@ -235,6 +264,21 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          throw new UnsupportedOperationException();
      }
 +    public Cursor<S> fetchSubset(StorableIndex<S> index,
 +                                 Object[] identityValues,
 +                                 BoundaryType rangeStartBoundary,
 +                                 Object rangeStartValue,
 +                                 BoundaryType rangeEndBoundary,
 +                                 Object rangeEndValue,
 +                                 boolean reverseRange,
 +                                 boolean reverseOrder,
 +                                 Query.Controller controller)
 +        throws FetchException
 +    {
 +        // This method should never be called since a query was returned by indexEntryQuery.
 +        throw new UnsupportedOperationException();
 +    }
 +
      private void registerIndex(ManagedIndex<S> managedIndex)
          throws RepositoryException
      {
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 2bb2f49..6c28619 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -173,6 +173,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues)
          throws FetchException
      {
 +        return fetchOne(storage, identityValues, null);
 +    }
 +
 +    Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues,
 +                       Query.Controller controller)
 +        throws FetchException
 +    {
          Query<?> query = mSingleMatchQuery;
          if (query == null) {
 @@ -184,13 +191,20 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              mSingleMatchQuery = query = mIndexEntryStorage.query(filter);
          }
 -        return fetchFromIndexEntryQuery(storage, query.withValues(identityValues));
 +        return fetchFromIndexEntryQuery(storage, query.withValues(identityValues), controller);
      }
      Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery)
          throws FetchException
      {
 -        return new IndexedCursor<S>(indexEntryQuery.fetch(), storage, mAccessor);
 +        return fetchFromIndexEntryQuery(storage, indexEntryQuery, null);
 +    }
 +
 +    Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery,
 +                                       Query.Controller controller)
 +        throws FetchException
 +    {
 +        return new IndexedCursor<S>(indexEntryQuery.fetch(controller), storage, mAccessor);
      }
      @Override
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java index e760f58..d10856a 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java @@ -28,9 +28,15 @@ import java.sql.SQLException;   */
  class H2ExceptionTransformer extends JDBCExceptionTransformer {
      public static int DUPLICATE_KEY = 23001;
 +    public static int PROCESSING_CANCELED = 90051;
      @Override
      public boolean isUniqueConstraintError(SQLException e) {
 -        return DUPLICATE_KEY == e.getErrorCode();
 +        return super.isUniqueConstraintError(e) || DUPLICATE_KEY == e.getErrorCode();
 +    }
 +
 +    @Override
 +    public boolean isTimeoutError(SQLException e) {
 +        return super.isTimeoutError(e) || PROCESSING_CANCELED == e.getErrorCode();
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java index f72c717..4780140 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java @@ -23,9 +23,11 @@ import java.sql.SQLException;  import com.amazon.carbonado.ConstraintException;
  import com.amazon.carbonado.FetchDeadlockException;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchTimeoutException;
  import com.amazon.carbonado.PersistDeadlockException;
  import com.amazon.carbonado.PersistDeniedException;
  import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistTimeoutException;
  import com.amazon.carbonado.UniqueConstraintException;
  import com.amazon.carbonado.spi.ExceptionTransformer;
 @@ -59,6 +61,8 @@ class JDBCExceptionTransformer extends ExceptionTransformer {       */
      public static String SQLSTATE_DEADLOCK_WITH_ROLLBACK = "40001";
 +    public static String SQLSTATE_PROCESSING_CANCELED = "57014";
 +
      /**
       * Examines the SQLSTATE code of the given SQL exception and determines if
       * it is a generic constaint violation.
 @@ -103,6 +107,16 @@ class JDBCExceptionTransformer extends ExceptionTransformer {          return false;
      }
 +    public boolean isTimeoutError(SQLException e) {
 +        if (e != null) {
 +            String sqlstate = e.getSQLState();
 +            if (sqlstate != null) {
 +                return SQLSTATE_PROCESSING_CANCELED.equals(sqlstate);
 +            }
 +        }
 +        return false;
 +    }
 +
      JDBCExceptionTransformer() {
      }
 @@ -117,6 +131,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer {              if (isDeadlockError(se)) {
                  return new FetchDeadlockException(e);
              }
 +            if (isTimeoutError(se)) {
 +                return new FetchTimeoutException(e);
 +            }
          }
          return null;
      }
 @@ -141,6 +158,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer {              if (isDeadlockError(se)) {
                  return new PersistDeadlockException(e);
              }
 +            if (isTimeoutError(se)) {
 +                return new PersistTimeoutException(e);
 +            }
          }
          return null;
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java index a49a150..bbe5589 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java @@ -27,6 +27,7 @@ import java.sql.ResultSet;  import java.sql.SQLException;
  import java.util.List;
  import java.util.Map;
 +import java.util.concurrent.TimeUnit;
  import org.apache.commons.logging.LogFactory;
 @@ -34,6 +35,7 @@ import com.amazon.carbonado.Cursor;  import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Repository;
  import com.amazon.carbonado.RepositoryException;
  import com.amazon.carbonado.Storable;
 @@ -42,6 +44,7 @@ import com.amazon.carbonado.SupportException;  import com.amazon.carbonado.Transaction;
  import com.amazon.carbonado.Trigger;
  import com.amazon.carbonado.capability.IndexInfo;
 +import com.amazon.carbonado.cursor.ControllerCursor;
  import com.amazon.carbonado.cursor.EmptyCursor;
  import com.amazon.carbonado.cursor.LimitCursor;
  import com.amazon.carbonado.filter.AndFilter;
 @@ -329,6 +332,28 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          return new JDBCQuery(filter, values, ordering, hints);
      }
 +    static PreparedStatement prepareStatement(Connection con, String sql,
 +                                              Query.Controller controller)
 +        throws SQLException
 +    {
 +        PreparedStatement ps = con.prepareStatement(sql);
 +
 +        if (controller != null) {
 +            long timeout = controller.getTimeout();
 +            if (timeout >= 0) {
 +                TimeUnit unit = controller.getTimeoutUnit();
 +                if (unit != null) {
 +                    long seconds = unit.toSeconds(timeout);
 +                    int intSeconds = seconds <= 0 ? 1 :
 +                        (seconds <= Integer.MAX_VALUE ? ((int) seconds) : 0);
 +                    ps.setQueryTimeout(intSeconds);
 +                }
 +            }
 +        }
 +
 +        return ps;
 +    }
 +
      public S instantiate(ResultSet rs) throws SQLException {
          return (S) mInstanceFactory.instantiate(this, rs, FIRST_RESULT_INDEX);
      }
 @@ -668,12 +693,21 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              }
          }
 +        @Override
          public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
 +            return fetch(values, null);
 +        }
 +
 +        @Override
 +        public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
 +            throws FetchException
 +        {
              TransactionScope<JDBCTransaction> scope = mRepository.localTransactionScope();
              boolean forUpdate = scope.isForUpdate();
              Connection con = getConnection();
              try {
 -                PreparedStatement ps = con.prepareStatement(prepareSelect(values, forUpdate));
 +                PreparedStatement ps =
 +                    prepareStatement(con, prepareSelect(values, forUpdate), controller);
                  Integer fetchSize = mRepository.getFetchSize();
                  if (fetchSize != null) {
                      ps.setFetchSize(fetchSize);
 @@ -681,7 +715,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>                  try {
                      setParameters(ps, values);
 -                    return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
 +                    return ControllerCursor.apply
 +                        (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
                  } catch (Exception e) {
                      // in case of exception, close statement
                      try {
 @@ -706,6 +741,14 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to)
              throws FetchException
          {
 +            return fetchSlice(values, from, to, null);
 +        }
 +
 +        @Override
 +        public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
 +                                    Query.Controller controller)
 +            throws FetchException
 +        {
              if (to != null && (to - from) <= 0) {
                  return EmptyCursor.the();
              }
 @@ -716,17 +759,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              switch (option) {
              case NOT_SUPPORTED: default:
 -                return super.fetchSlice(values, from, to);
 +                return super.fetchSlice(values, from, to, controller);
              case LIMIT_ONLY:
                  if (from > 0 || to == null) {
 -                    return super.fetchSlice(values, from, to);
 +                    return super.fetchSlice(values, from, to, controller);
                  }
                  select = prepareSelect(values, false);
                  select = mSupportStrategy.buildSelectWithSlice(select, false, true);
                  break;
              case OFFSET_ONLY:
                  if (from <= 0) {
 -                    return super.fetchSlice(values, from, to);
 +                    return super.fetchSlice(values, from, to, controller);
                  }
                  select = prepareSelect(values, false);
                  select = mSupportStrategy.buildSelectWithSlice(select, true, false);
 @@ -746,7 +789,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              Connection con = getConnection();
              try {
 -                PreparedStatement ps = con.prepareStatement(select);
 +                PreparedStatement ps = prepareStatement(con, select, controller);
                  Integer fetchSize = mRepository.getFetchSize();
                  if (fetchSize != null) {
                      ps.setFetchSize(fetchSize);
 @@ -760,7 +803,10 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>                              switch (option) {
                              case OFFSET_ONLY:
                                  ps.setLong(psOrdinal, from);
 -                                Cursor<S> c = new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
 +                                Cursor<S> c =
 +                                    ControllerCursor.apply
 +                                    (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps),
 +                                     controller);
                                  return new LimitCursor<S>(c, to - from);
                              case LIMIT_AND_OFFSET:
                                  ps.setLong(psOrdinal, to - from);
 @@ -782,7 +828,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>                          ps.setLong(psOrdinal, to);
                      }
 -                    return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
 +                    return ControllerCursor.apply
 +                        (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
                  } catch (Exception e) {
                      // in case of exception, close statement
                      try {
 @@ -805,9 +852,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          @Override
          public long count(FilterValues<S> values) throws FetchException {
 +            return count(values, null);
 +        }
 +
 +        @Override
 +        public long count(FilterValues<S> values, Query.Controller controller)
 +            throws FetchException
 +        {
              Connection con = getConnection();
              try {
 -                PreparedStatement ps = con.prepareStatement(prepareCount(values));
 +                PreparedStatement ps = prepareStatement(con, prepareCount(values), controller);
 +
                  try {
                      setParameters(ps, values);
                      ResultSet rs = ps.executeQuery();
 @@ -827,10 +882,12 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              }
          }
 +        @Override
          public Filter<S> getFilter() {
              return mFilter;
          }
 +        @Override
          public OrderingList<S> getOrdering() {
              return mOrdering;
          }
 @@ -846,6 +903,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              return true;
          }
 +        @Override
          public boolean printPlan(Appendable app, int indentLevel, FilterValues<S> values)
              throws IOException
          {
 @@ -862,7 +920,9 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          /**
           * Delete operation is included in cursor factory for ease of implementation.
           */
 -        int executeDelete(FilterValues<S> filterValues) throws PersistException {
 +        int executeDelete(FilterValues<S> filterValues, Query.Controller controller)
 +            throws PersistException
 +        {
              Connection con;
              try {
                  con = getConnection();
 @@ -870,7 +930,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>                  throw e.toPersistException();
              }
              try {
 -                PreparedStatement ps = con.prepareStatement(prepareDelete(filterValues));
 +                PreparedStatement ps =
 +                    prepareStatement(con, prepareDelete(filterValues), controller);
                  try {
                      setParameters(ps, filterValues);
                      return ps.executeUpdate();
 @@ -976,13 +1037,18 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          @Override
          public void deleteAll() throws PersistException {
 +            deleteAll(null);
 +        }
 +
 +        @Override
 +        public void deleteAll(Controller controller) throws PersistException {
              if (mTriggerManager.getDeleteTrigger() != null) {
                  // Super implementation loads one at time and calls
                  // delete. This allows delete trigger to be invoked on each.
 -                super.deleteAll();
 +                super.deleteAll(controller);
              } else {
                  try {
 -                    ((Executor) executor()).executeDelete(getFilterValues());
 +                    ((Executor) executor()).executeDelete(getFilterValues(), controller);
                  } catch (RepositoryException e) {
                      throw e.toPersistException();
                  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java index ad63941..ebaafd7 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java @@ -32,6 +32,8 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer {      public static int DEADLOCK_DETECTED = 60;
 +    public static int PROCESSING_CANCELED = 1013;
 +
      @Override
      public boolean isUniqueConstraintError(SQLException e) {
          if (isConstraintError(e)) {
 @@ -63,4 +65,9 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer {          }
          return false;
      }
 +
 +    @Override
 +    public boolean isTimeoutError(SQLException e) {
 +        return super.isTimeoutError(e) || e != null && PROCESSING_CANCELED == e.getErrorCode();
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java index 08be6e3..5dc4d3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java +++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java @@ -165,6 +165,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public Cursor<S> fetch(Controller controller) throws FetchException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.fetch(controller) on " + this + ", controller: " + controller);
 +        }
 +        return mQuery.fetch(controller);
 +    }
 +
 +    @Override
      public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -175,6 +184,16 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.fetchSlice(start, to, controller) on " + this +
 +                      ", from: " + from + ", to: " + to + ", controller: " + controller);
 +        }
 +        return mQuery.fetchSlice(from, to, controller);
 +    }
 +
 +    @Override
      public <T extends S> Cursor<S> fetchAfter(T start) throws FetchException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -184,6 +203,18 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
 +        throws FetchException
 +    {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.fetchAfter(start, controller) on " + this + ", start: " + start
 +                      + ", controller: " + controller);
 +        }
 +        return mQuery.fetchAfter(start, controller);
 +    }
 +
 +    @Override
      public S loadOne() throws FetchException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -193,6 +224,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public S loadOne(Controller controller) throws FetchException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.loadOne() on " + this + ", controller: " + controller);
 +        }
 +        return mQuery.loadOne(controller);
 +    }
 +
 +    @Override
      public S tryLoadOne() throws FetchException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -202,6 +242,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public S tryLoadOne(Controller controller) throws FetchException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.tryLoadOne(controller) on " + this + ", controller: " + controller);
 +        }
 +        return mQuery.tryLoadOne(controller);
 +    }
 +
 +    @Override
      public void deleteOne() throws PersistException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -211,6 +260,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public void deleteOne(Controller controller) throws PersistException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.deleteOne(controller) on " + this + ", controller: " + controller);
 +        }
 +        mQuery.deleteOne(controller);
 +    }
 +
 +    @Override
      public boolean tryDeleteOne() throws PersistException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -220,6 +278,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public boolean tryDeleteOne(Controller controller) throws PersistException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.tryDeleteOne(controller) on " + this + ", controller: " + controller);
 +        }
 +        return mQuery.tryDeleteOne(controller);
 +    }
 +
 +    @Override
      public void deleteAll() throws PersistException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -229,6 +296,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public void deleteAll(Controller controller) throws PersistException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.deleteAll(controller) on " + this + ", controller: " + controller);
 +        }
 +        mQuery.deleteAll(controller);
 +    }
 +
 +    @Override
      public long count() throws FetchException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -238,6 +314,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public long count(Controller controller) throws FetchException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.count(controller) on " + this + ", controller: " + controller);
 +        }
 +        return mQuery.count(controller);
 +    }
 +
 +    @Override
      public boolean exists() throws FetchException {
          Log log = mStorage.mLog;
          if (log.isEnabled()) {
 @@ -247,6 +332,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {      }
      @Override
 +    public boolean exists(Controller controller) throws FetchException {
 +        Log log = mStorage.mLog;
 +        if (log.isEnabled()) {
 +            log.write("Query.exists(controller) on " + this + ", controller: " + controller);
 +        }
 +        return mQuery.exists(controller);
 +    }
 +
 +    @Override
      public boolean printNative() {
          return mQuery.printNative();
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java index c84d8e3..c9c17b6 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java @@ -47,6 +47,7 @@ import com.amazon.carbonado.Trigger;  import com.amazon.carbonado.capability.IndexInfo;
  import com.amazon.carbonado.cursor.ArraySortBuffer;
 +import com.amazon.carbonado.cursor.ControllerCursor;
  import com.amazon.carbonado.cursor.EmptyCursor;
  import com.amazon.carbonado.cursor.FilteredCursor;
  import com.amazon.carbonado.cursor.SingletonCursor;
 @@ -540,6 +541,10 @@ class MapStorage<S extends Storable>      }
      public long countAll() throws FetchException {
 +        return countAll(null);
 +    }
 +
 +    public long countAll(Query.Controller controller) throws FetchException {
          try {
              TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
              MapTransaction txn = scope.getTxn();
 @@ -578,9 +583,20 @@ class MapStorage<S extends Storable>          }
      }
 +    public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
 +        return ControllerCursor.apply(fetchAll(), controller);
 +    }
 +
      public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
          throws FetchException
      {
 +        return fetchOne(index, identityValues, null);
 +    }
 +
 +    public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
 +                              Query.Controller controller)
 +        throws FetchException
 +    {
          try {
              S key = prepare();
              for (int i=0; i<identityValues.length; i++) {
 @@ -643,6 +659,12 @@ class MapStorage<S extends Storable>          return null;
      }
 +    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
 +                                              Query.Controller controller)
 +    {
 +        return null;
 +    }
 +
      public Cursor<S> fetchSubset(StorableIndex<S> index,
                                   Object[] identityValues,
                                   BoundaryType rangeStartBoundary,
 @@ -770,6 +792,28 @@ class MapStorage<S extends Storable>          return cursor;
      }
 +    public Cursor<S> fetchSubset(StorableIndex<S> index,
 +                                 Object[] identityValues,
 +                                 BoundaryType rangeStartBoundary,
 +                                 Object rangeStartValue,
 +                                 BoundaryType rangeEndBoundary,
 +                                 Object rangeEndValue,
 +                                 boolean reverseRange,
 +                                 boolean reverseOrder,
 +                                 Query.Controller controller)
 +        throws FetchException
 +    {
 +        return ControllerCursor.apply(fetchSubset(index,
 +                                                  identityValues,
 +                                                  rangeStartBoundary,
 +                                                  rangeStartValue,
 +                                                  rangeEndBoundary,
 +                                                  rangeEndValue,
 +                                                  reverseRange,
 +                                                  reverseOrder),
 +                                      controller);
 +    }
 +
      private List<OrderedProperty<S>> createPkPropList() {
          return new ArrayList<OrderedProperty<S>>(mInfo.getPrimaryKey().getProperties());
      }
 @@ -806,6 +850,11 @@ class MapStorage<S extends Storable>          return new ArraySortBuffer<S>();
      }
 +    public SortBuffer<S> createSortBuffer(Query.Controller controller) {
 +        // ArraySortBuffer doesn't support controller.
 +        return new ArraySortBuffer<S>();
 +    }
 +
      public static interface InstanceFactory {
          Storable instantiate(DelegateSupport support);
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index f4d8f24..c0f882e 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -45,6 +45,7 @@ import com.amazon.carbonado.UniqueConstraintException;  import com.amazon.carbonado.capability.IndexInfo;
 +import com.amazon.carbonado.cursor.ControllerCursor;
  import com.amazon.carbonado.cursor.EmptyCursor;
  import com.amazon.carbonado.cursor.MergeSortBuffer;
  import com.amazon.carbonado.cursor.SingletonCursor;
 @@ -263,22 +264,45 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          return new MergeSortBuffer<S>();
      }
 +    public SortBuffer<S> createSortBuffer(Query.Controller controller) {
 +        return new MergeSortBuffer<S>(controller);
 +    }
 +
      public long countAll() throws FetchException {
          // Return -1 to indicate default algorithm should be used.
          return -1;
      }
 +    public long countAll(Query.Controller controller) throws FetchException {
 +        // Return -1 to indicate default algorithm should be used.
 +        return -1;
 +    }
 +
      public Cursor<S> fetchAll() throws FetchException {
 +        return fetchAll(null);
 +    }
 +
 +    public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
          return fetchSubset(null, null,
                             BoundaryType.OPEN, null,
                             BoundaryType.OPEN, null,
 -                           false, false);
 +                           false, false,
 +                           controller);
      }
      public Cursor<S> fetchOne(StorableIndex<S> index,
                                Object[] identityValues)
          throws FetchException
      {
 +        return fetchOne(index, identityValues, null);
 +    }
 +
 +    public Cursor<S> fetchOne(StorableIndex<S> index,
 +                              Object[] identityValues,
 +                              Query.Controller controller)
 +        throws FetchException
 +    {
 +        // Note: Controller is never called.
          byte[] key = mStorableCodec.encodePrimaryKey(identityValues);
          byte[] value = mRawSupport.tryLoad(null, key);
          if (value == null) {
 @@ -296,6 +320,13 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          throw new UnsupportedOperationException();
      }
 +    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
 +                                              Query.Controller controller)
 +    {
 +        // This method should never be called since null was returned by indexEntryQuery.
 +        throw new UnsupportedOperationException();
 +    }
 +
      public Cursor<S> fetchSubset(StorableIndex<S> index,
                                   Object[] identityValues,
                                   BoundaryType rangeStartBoundary,
 @@ -378,6 +409,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag                       getPrimaryDatabase());
                  cursor.open();
 +
                  return cursor;
              } catch (Exception e) {
                  throw toFetchException(e);
 @@ -387,6 +419,28 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
      }
 +    public Cursor<S> fetchSubset(StorableIndex<S> index,
 +                                 Object[] identityValues,
 +                                 BoundaryType rangeStartBoundary,
 +                                 Object rangeStartValue,
 +                                 BoundaryType rangeEndBoundary,
 +                                 Object rangeEndValue,
 +                                 boolean reverseRange,
 +                                 boolean reverseOrder,
 +                                 Query.Controller controller)
 +        throws FetchException
 +    {
 +        return ControllerCursor.apply(fetchSubset(index,
 +                                                  identityValues,
 +                                                  rangeStartBoundary,
 +                                                  rangeStartValue,
 +                                                  rangeEndBoundary,
 +                                                  rangeEndValue,
 +                                                  reverseRange,
 +                                                  reverseOrder),
 +                                      controller);
 +    }
 +
      private byte[] createBound(Object[] exactValues, byte[] exactKey, Object rangeValue,
                                 StorableCodec<S> codec) {
          Object[] values = {rangeValue};
 | 
