From 121886bc0c92389610408e3b415abb992ad8a212 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Wed, 4 May 2011 00:20:02 +0000 Subject: Add support for Query controller and timeouts; remove vestigial support for interrupts. --- src/main/java/com/amazon/carbonado/Query.java | 310 ++++++++++++++++++++- .../amazon/carbonado/cursor/ControllerCursor.java | 99 +++++++ .../amazon/carbonado/cursor/FilteredCursor.java | 13 +- .../com/amazon/carbonado/cursor/GroupedCursor.java | 13 +- .../amazon/carbonado/cursor/MergeSortBuffer.java | 84 +++++- .../carbonado/cursor/MultiTransformedCursor.java | 11 - .../com/amazon/carbonado/cursor/SortedCursor.java | 11 - .../amazon/carbonado/cursor/TransformedCursor.java | 12 +- .../com/amazon/carbonado/qe/AbstractQuery.java | 41 +++ .../amazon/carbonado/qe/AbstractQueryExecutor.java | 41 +++ .../carbonado/qe/DelegatedQueryExecutor.java | 17 ++ .../java/com/amazon/carbonado/qe/EmptyQuery.java | 55 ++++ .../amazon/carbonado/qe/FilteredQueryExecutor.java | 7 + .../amazon/carbonado/qe/FullScanQueryExecutor.java | 31 +++ .../amazon/carbonado/qe/IndexedQueryExecutor.java | 59 +++- .../amazon/carbonado/qe/IterableQueryExecutor.java | 6 + .../amazon/carbonado/qe/JoinedQueryExecutor.java | 33 ++- .../com/amazon/carbonado/qe/KeyQueryExecutor.java | 20 ++ .../com/amazon/carbonado/qe/QueryExecutor.java | 24 ++ .../amazon/carbonado/qe/SortedQueryExecutor.java | 41 +++ .../com/amazon/carbonado/qe/StandardQuery.java | 48 +++- .../amazon/carbonado/qe/UnionQueryExecutor.java | 9 +- .../carbonado/raw/GenericStorableCodecFactory.java | 3 +- .../carbonado/repo/indexed/IndexedStorage.java | 44 +++ .../carbonado/repo/indexed/ManagedIndex.java | 18 +- .../repo/jdbc/H2ExceptionTransformer.java | 8 +- .../repo/jdbc/JDBCExceptionTransformer.java | 20 ++ .../amazon/carbonado/repo/jdbc/JDBCStorage.java | 92 +++++- .../repo/jdbc/OracleExceptionTransformer.java | 7 + .../carbonado/repo/logging/LoggingQuery.java | 94 +++++++ .../com/amazon/carbonado/repo/map/MapStorage.java | 49 ++++ .../carbonado/repo/sleepycat/BDBStorage.java | 56 +++- 32 files changed, 1273 insertions(+), 103 deletions(-) create mode 100644 src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java diff --git a/src/main/java/com/amazon/carbonado/Query.java b/src/main/java/com/amazon/carbonado/Query.java index a660c1a..1780ebd 100644 --- a/src/main/java/com/amazon/carbonado/Query.java +++ b/src/main/java/com/amazon/carbonado/Query.java @@ -18,7 +18,13 @@ package com.amazon.carbonado; +import java.io.Closeable; import java.io.IOException; +import java.io.Serializable; + +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import com.amazon.carbonado.filter.Filter; import com.amazon.carbonado.filter.FilterValues; @@ -286,6 +292,20 @@ public interface Query { */ Cursor fetch() throws FetchException; + /** + * Fetches results for this query. If any updates or deletes might be + * performed on the results, consider enclosing the fetch in a + * transaction. This allows the isolation level and "for update" mode to be + * adjusted. Some repositories might otherwise deadlock. + * + * @param controller optional controller which can abort query operation + * @return fetch results + * @throws IllegalStateException if any blank parameters in this query + * @throws FetchException if storage layer throws an exception + * @see Repository#enterTransaction(IsolationLevel) + */ + Cursor fetch(Controller controller) throws FetchException; + /** * Fetches a slice of results for this query, as defined by a numerical * range. A slice can be used to limit the number of results from a @@ -303,6 +323,24 @@ public interface Query { */ Cursor fetchSlice(long from, Long to) throws FetchException; + /** + * Fetches a slice of results for this query, as defined by a numerical + * range. A slice can be used to limit the number of results from a + * query. It is strongly recommended that the query be given a total {@link + * #orderBy ordering} in order for the slice results to be deterministic. + * + * @param from zero-based {@code from} record number, inclusive + * @param to optional zero-based {@code to} record number, exclusive + * @param controller optional controller which can abort query operation + * @return fetch results + * @throws IllegalStateException if any blank parameters in this query + * @throws IllegalArgumentException if {@code from} is negative or if + * {@code from} is more than {@code to} + * @throws FetchException if storage layer throws an exception + * @since 1.2 + */ + Cursor fetchSlice(long from, Long to, Controller controller) throws FetchException; + /** * Fetches results for this query after a given starting point, which is * useful for re-opening a cursor. This is only effective when query has @@ -325,6 +363,29 @@ public interface Query { */ Cursor fetchAfter(T start) throws FetchException; + /** + * Fetches results for this query after a given starting point, which is + * useful for re-opening a cursor. This is only effective when query has + * been given an explicit {@link #orderBy ordering}. If not a total + * ordering, then cursor may start at an earlier position. + * + *

Note: This method can be very expensive to call repeatedly, if the + * query needs to perform a sort operation. Ideally, the query ordering + * should match the natural ordering of an index or key. + * + *

Calling {@code fetchAfter(s)} is equivalent to calling {@code + * after(s).fetch()}. + * + * @param start storable to attempt to start after; if null, fetch all results + * @param controller optional controller which can abort query operation + * @return fetch results + * @throws IllegalStateException if any blank parameters in this query + * @throws FetchException if storage layer throws an exception + * @see Repository#enterTransaction(IsolationLevel) + * @see #after + */ + Cursor fetchAfter(T start, Controller controller) throws FetchException; + /** * Attempts to load exactly one matching object. If the number of matching * records is zero or exceeds one, then an exception is thrown instead. @@ -338,8 +399,21 @@ public interface Query { S loadOne() throws FetchException; /** - * May return null if nothing found. Throws exception if record count is - * more than one. + * Attempts to load exactly one matching object. If the number of matching + * records is zero or exceeds one, then an exception is thrown instead. + * + * @param controller optional controller which can abort query operation + * @return a single fetched object + * @throws IllegalStateException if any blank parameters in this query + * @throws FetchNoneException if no matching record found + * @throws FetchMultipleException if more than one matching record found + * @throws FetchException if storage layer throws an exception + */ + S loadOne(Controller controller) throws FetchException; + + /** + * Tries to load one record, but returns null if nothing was found. Throws + * exception if record count is more than one. * * @return null or a single fetched object * @throws IllegalStateException if any blank parameters in this query @@ -348,6 +422,18 @@ public interface Query { */ S tryLoadOne() throws FetchException; + /** + * Tries to load one record, but returns null if nothing was found. Throws + * exception if record count is more than one. + * + * @param controller optional controller which can abort query operation + * @return null or a single fetched object + * @throws IllegalStateException if any blank parameters in this query + * @throws FetchMultipleException if more than one matching record found + * @throws FetchException if storage layer throws an exception + */ + S tryLoadOne(Controller controller) throws FetchException; + /** * Deletes one matching object. If the number of matching records is zero or * exceeds one, then no delete occurs, and an exception is thrown instead. @@ -359,6 +445,18 @@ public interface Query { */ void deleteOne() throws PersistException; + /** + * Deletes one matching object. If the number of matching records is zero or + * exceeds one, then no delete occurs, and an exception is thrown instead. + * + * @param controller optional controller which can abort query operation + * @throws IllegalStateException if any blank parameters in this query + * @throws PersistNoneException if no matching record found + * @throws PersistMultipleException if more than one record matches + * @throws PersistException if storage layer throws an exception + */ + void deleteOne(Controller controller) throws PersistException; + /** * Deletes zero or one matching objects. If the number of matching records * exceeds one, then no delete occurs, and an exception is thrown instead. @@ -370,6 +468,18 @@ public interface Query { */ boolean tryDeleteOne() throws PersistException; + /** + * Deletes zero or one matching objects. If the number of matching records + * exceeds one, then no delete occurs, and an exception is thrown instead. + * + * @param controller optional controller which can abort query operation + * @return true if record existed and was deleted, or false if no match + * @throws IllegalStateException if any blank parameters in this query + * @throws PersistMultipleException if more than one record matches + * @throws PersistException if storage layer throws an exception + */ + boolean tryDeleteOne(Controller controller) throws PersistException; + /** * Deletes zero or more matching objects. There is no guarantee that * deleteAll is an atomic operation. If atomic behavior is desired, wrap @@ -380,6 +490,17 @@ public interface Query { */ void deleteAll() throws PersistException; + /** + * Deletes zero or more matching objects. There is no guarantee that + * deleteAll is an atomic operation. If atomic behavior is desired, wrap + * the call in a transaction scope. + * + * @param controller optional controller which can abort query operation + * @throws IllegalStateException if any blank parameters in this query + * @throws PersistException if storage layer throws an exception + */ + void deleteAll(Controller controller) throws PersistException; + /** * Returns a count of all results matched by this query. Even though no * results are explicitly fetched, this method may still be expensive to @@ -391,6 +512,18 @@ public interface Query { */ long count() throws FetchException; + /** + * Returns a count of all results matched by this query. Even though no + * results are explicitly fetched, this method may still be expensive to + * call. The actual performance will vary by repository and available indexes. + * + * @param controller optional controller which can abort query operation + * @return count of matches + * @throws IllegalStateException if any blank parameters in this query + * @throws FetchException if storage layer throws an exception + */ + long count(Controller controller) throws FetchException; + /** * Returns true if any results are matched by this query. * @@ -401,6 +534,17 @@ public interface Query { */ boolean exists() throws FetchException; + /** + * Returns true if any results are matched by this query. + * + * @param controller optional controller which can abort query operation + * @return true if any matches + * @throws IllegalStateException if any blank parameters in this query + * @throws FetchException if storage layer throws an exception + * @since 1.2 + */ + boolean exists(Controller controller) throws FetchException; + /** * Print the native query to standard out, which is useful for performance * analysis. Not all repositories have a native query format. An example @@ -469,4 +613,166 @@ public interface Query { * Returns a description of the query filter and any other arguments. */ String toString(); + + /** + * Controller instance can be used to abort query operations. + * + *

Example:

+     * Storage<UserInfo> users = ...
+     * long count = users.query("name = ?").count(Query.Timeout.seconds(10));
+     * 
+ */ + public static interface Controller extends Serializable, Closeable { + /** + * Returns a non-negative value if controller imposes an absolute upper + * bound on query execution time. + */ + public long getTimeout(); + + /** + * Returns the unit for the timeout, if applicable. + */ + public TimeUnit getTimeoutUnit(); + + /** + * Called by query when it begins, possibly multiple times. Implementation + * is required to be idempotent and ignore multiple invocations. + */ + public void begin(); + + /** + * Periodically called by query to determine if it should continue. + */ + public void continueCheck() throws FetchException; + + /** + * Always called by query when finished, even when it fails. Implementation + * is required to be idempotent and ignore multiple invocations. + */ + public void close(); + } + + /** + * Timeout controller, for aborting long running queries. One instance is + * good for one timeout. The instance can be shared by multiple queries, if + * they are part of a single logical operation. + * + *

The timeout applies to the entire duration of fetching results, not + * just the time spent between individual fetches. A caller which is slowly + * processing results can timeout. More sophisticated timeouts can be + * implemented using custom Controller implementations. + */ + public static final class Timeout implements Controller { + private static final long serialVersionUID = 1; + + private static final AtomicLongFieldUpdater endUpdater = + AtomicLongFieldUpdater.newUpdater(Timeout.class, "mEndNanos"); + + /** + * Return a new Timeout in nanoseconds. + */ + public static Timeout nanos(long timeout) { + return new Timeout(timeout, TimeUnit.NANOSECONDS); + } + + /** + * Return a new Timeout in microseconds. + */ + public static Timeout micros(long timeout) { + return new Timeout(timeout, TimeUnit.MICROSECONDS); + } + + /** + * Return a new Timeout in milliseconds. + */ + public static Timeout millis(long timeout) { + return new Timeout(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Return a new Timeout in seconds. + */ + public static Timeout seconds(long timeout) { + return new Timeout(timeout, TimeUnit.SECONDS); + } + + /** + * Return a new Timeout in minutes. + */ + + public static Timeout minutes(long timeout) { + return new Timeout(timeout, TimeUnit.MINUTES); + } + + /** + * Return a new Timeout in hours. + */ + public static Timeout hours(long timeout) { + return new Timeout(timeout, TimeUnit.HOURS); + } + + private final long mTimeout; + private final TimeUnit mUnit; + + private volatile transient long mEndNanos; + + public Timeout(long timeout, TimeUnit unit) { + if (timeout < 0) { + throw new IllegalArgumentException("Timeout cannot be negative: " + timeout); + } + if (unit == null && timeout != 0) { + throw new IllegalArgumentException + ("TimeUnit cannot be null if timeout is non-zero: " + timeout); + } + mTimeout = timeout; + mUnit = unit; + } + + public long getTimeout() { + return mTimeout; + } + + public TimeUnit getTimeoutUnit() { + return mUnit; + } + + @Override + public void begin() { + long end = System.nanoTime() + mUnit.toNanos(mTimeout); + if (end == 0) { + // Handle rare case to ensure atomic compare and set always + // works the first time, supporting idempotent calls to this + // method. + end = 1; + } + endUpdater.compareAndSet(this, 0, end); + } + + @Override + public void continueCheck() throws FetchTimeoutException { + long end = mEndNanos; + + if (end == 0) { + // Begin was not called, in violation of how the Controller + // must be used. Be lenient and begin now. + begin(); + end = mEndNanos; + } + + // Subtract to support modulo comparison. + if ((System.nanoTime() - end) >= 0) { + throw new FetchTimeoutException("Timed out: " + mTimeout + ' ' + mUnit); + } + } + + @Override + public void close() { + // Nothing to do. + } + + @Override + public String toString() { + return "Query.Timeout {timeout=" + mTimeout + ", unit=" + mUnit + '}'; + } + } } diff --git a/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java new file mode 100644 index 0000000..da682d2 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java @@ -0,0 +1,99 @@ +/* + * Copyright 2011 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; + +/** + * Wraps another cursor and periodically calls a {@link com.amazon.carbonado.Query.Controller controller}. + * + * @author Brian S O'Neill + */ +public class ControllerCursor extends AbstractCursor { + /** + * Returns a ControllerCursor depending on whether a controller instance is + * passed in or not. + * + * @param controller optional controller which can abort query operation + * @throws IllegalArgumentException if source is null + */ + public static Cursor apply(Cursor source, Query.Controller controller) { + return controller == null ? source : new ControllerCursor(source, controller); + } + + private final Cursor mSource; + private final Query.Controller mController; + + private byte mCount; + + /** + * @param controller required controller which can abort query operation + * @throws IllegalArgumentException if either argument is null + */ + private ControllerCursor(Cursor source, Query.Controller controller) { + if (source == null) { + throw new IllegalArgumentException("Source is null"); + } + if (controller == null) { + throw new IllegalArgumentException("Controller is null"); + } + mSource = source; + mController = controller; + controller.begin(); + } + + public boolean hasNext() throws FetchException { + if (mSource.hasNext()) { + continueCheck(); + return true; + } + return false; + } + + public S next() throws FetchException { + S next = mSource.next(); + continueCheck(); + return next; + } + + public void close() throws FetchException { + try { + mSource.close(); + } finally { + mController.close(); + } + } + + private void continueCheck() throws FetchException { + if (++mCount == 0) { + try { + mController.continueCheck(); + } catch (FetchException e) { + try { + close(); + } catch (FetchException e2) { + // Ignore. + } + throw e; + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index ad21667..4cf8c21 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -114,14 +113,12 @@ public abstract class FilteredCursor extends AbstractCursor { return true; } try { - int count = 0; while (mCursor.hasNext()) { S next = mCursor.next(); if (isAllowed(next)) { mNext = next; return true; } - interruptCheck(++count); } } catch (NoSuchElementException e) { } catch (FetchException e) { @@ -165,10 +162,9 @@ public abstract class FilteredCursor extends AbstractCursor { try { int count = 0; while (--amount >= 0 && hasNext()) { - interruptCheck(++count); + ++count; mNext = null; } - return count; } catch (FetchException e) { try { @@ -179,11 +175,4 @@ public abstract class FilteredCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 49ca84e..3053cd5 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -23,7 +23,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; /** * Abstract cursor for aggregation and finding distinct data. The source cursor @@ -124,7 +123,6 @@ public abstract class GroupedCursor extends AbstractCursor { } try { - int count = 0; if (mCursor.hasNext()) { if (mGroupLeader == null) { beginGroup(mGroupLeader = mCursor.next()); @@ -143,8 +141,6 @@ public abstract class GroupedCursor extends AbstractCursor { return true; } } - - interruptCheck(++count); } G aggregate = finishGroup(); @@ -206,7 +202,7 @@ public abstract class GroupedCursor extends AbstractCursor { try { int count = 0; while (--amount >= 0 && hasNext()) { - interruptCheck(++count); + ++count; mNextAggregate = null; } @@ -220,11 +216,4 @@ public abstract class GroupedCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java index aae5243..0231012 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -38,7 +38,9 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.PriorityQueue; +import com.amazon.carbonado.FetchException; import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.Storage; import com.amazon.carbonado.SupportException; @@ -122,6 +124,7 @@ public class MergeSortBuffer extends AbstractCollection private final String mTempDir; private final int mMaxArrayCapacity; + private final Query.Controller mController; private Preparer mPreparer; @@ -143,6 +146,15 @@ public class MergeSortBuffer extends AbstractCollection this(null, TEMP_DIR, MAX_ARRAY_CAPACITY); } + /** + * @since 1.2 + * + * @param controller optional controller which can abort query operation + */ + public MergeSortBuffer(Query.Controller controller) { + this(null, TEMP_DIR, MAX_ARRAY_CAPACITY, controller); + } + /** * @param storage storage for elements; if null use first Storable to * prepare reloaded Storables @@ -151,6 +163,15 @@ public class MergeSortBuffer extends AbstractCollection this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY); } + /** + * @param storage storage for elements; if null use first Storable to + * prepare reloaded Storables + * @param controller optional controller which can abort query operation + */ + public MergeSortBuffer(Storage storage, Query.Controller controller) { + this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY, controller); + } + /** * @param storage storage for elements; if null use first Storable to * prepare reloaded Storables @@ -170,6 +191,22 @@ public class MergeSortBuffer extends AbstractCollection */ @SuppressWarnings("unchecked") public MergeSortBuffer(Storage storage, String tempDir, int maxArrayCapacity) { + this(storage, tempDir, maxArrayCapacity, null); + } + + /** + * @param storage storage for elements; if null use first Storable to + * prepare reloaded Storables + * @param tempDir directory to store temp files for merging, or null for default + * @param maxArrayCapacity maximum amount of storables to keep in an array + * before serializing to a file + * @param controller optional controller which can abort query operation + * @throws IllegalArgumentException if storage is null + */ + @SuppressWarnings("unchecked") + public MergeSortBuffer(Storage storage, String tempDir, int maxArrayCapacity, + Query.Controller controller) + { mTempDir = tempDir; mMaxArrayCapacity = maxArrayCapacity; @@ -179,6 +216,10 @@ public class MergeSortBuffer extends AbstractCollection int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity); mElements = (S[]) new Storable[cap]; + + if ((mController = controller) != null) { + controller.begin(); + } } public void prepare(Comparator comparator) { @@ -231,10 +272,10 @@ public class MergeSortBuffer extends AbstractCollection if (mFilesInUse.size() < (MAX_OPEN_FILE_COUNT - 1)) { mFilesInUse.add(raf); - int count = 0; + byte count = 0; for (S element : mElements) { - // Check every so often if interrupted. - interruptCheck(++count); + // Check every so often if should continue. + continueCheck(++count); element.writeTo(out); } } else { @@ -274,11 +315,11 @@ public class MergeSortBuffer extends AbstractCollection // as well as error out earlier, should the disk be full. raf.setLength(mergedLength); - int count = 0; + byte count = 0; Iterator it = iterator(filesToMerge); while (it.hasNext()) { - // Check every so often if interrupted. - interruptCheck(++count); + // Check every so often if should continue. + continueCheck(++count); S element = it.next(); element.writeTo(out); } @@ -368,9 +409,16 @@ public class MergeSortBuffer extends AbstractCollection } public void close() { - clear(); - if (mWorkFilePool != null) { - mWorkFilePool.unregisterWorkFileUser(this); + try { + clear(); + if (mWorkFilePool != null) { + mWorkFilePool.unregisterWorkFileUser(this); + } + } finally { + Query.Controller controller = mController; + if (controller != null) { + controller.close(); + } } } @@ -386,10 +434,20 @@ public class MergeSortBuffer extends AbstractCollection return comparator; } - private void interruptCheck(int count) { - if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) { - close(); - throw new UndeclaredThrowableException(new FetchInterruptedException()); + private void continueCheck(byte count) { + if (count == 0) { + try { + Query.Controller controller = mController; + if (controller != null) { + controller.continueCheck(); + } + if (mStop) { + throw new FetchInterruptedException("Shutting down"); + } + } catch (FetchException e) { + close(); + throw new UndeclaredThrowableException(e); + } } } diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 8a1bd48..814a3ba 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; /** * Abstract cursor which wraps another cursor and transforms each storable @@ -72,7 +71,6 @@ public abstract class MultiTransformedCursor extends AbstractCursor { mNextCursor.close(); mNextCursor = null; } - int count = 0; while (mCursor.hasNext()) { Cursor nextCursor = transform(mCursor.next()); if (nextCursor != null) { @@ -82,7 +80,6 @@ public abstract class MultiTransformedCursor extends AbstractCursor { } nextCursor.close(); } - interruptCheck(++count); } } catch (NoSuchElementException e) { } catch (FetchException e) { @@ -129,7 +126,6 @@ public abstract class MultiTransformedCursor extends AbstractCursor { if ((amount -= chunk) <= 0) { break; } - interruptCheck(count); } return count; @@ -142,11 +138,4 @@ public abstract class MultiTransformedCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 5b5f9b5..e054137 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -32,7 +32,6 @@ import org.cojen.util.BeanProperty; import org.cojen.classfile.TypeDesc; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.Storable; @@ -388,12 +387,7 @@ public class SortedCursor extends AbstractCursor { fill: { if (matcher == null) { // Buffer up entire results and sort. - int count = 0; while (cursor.hasNext()) { - // Check every so often if interrupted. - if ((++count & ~0xff) == 0 && Thread.interrupted()) { - throw new FetchInterruptedException(); - } buffer.add(cursor.next()); } break fill; @@ -413,13 +407,8 @@ public class SortedCursor extends AbstractCursor { } buffer.add(chunkStart); - int count = 1; while (cursor.hasNext()) { - // Check every so often if interrupted. - if ((++count & ~0xff) == 0 && Thread.interrupted()) { - throw new FetchInterruptedException(); - } S next = cursor.next(); if (matcher.compare(chunkStart, next) != 0) { // Save for reading next chunk later. diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index c05e0fc..bd41cef 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; /** * Abstract cursor which wraps another cursor and transforms each storable @@ -64,14 +63,12 @@ public abstract class TransformedCursor extends AbstractCursor { return true; } try { - int count = 0; while (mCursor.hasNext()) { T next = transform(mCursor.next()); if (next != null) { mNext = next; return true; } - interruptCheck(++count); } } catch (NoSuchElementException e) { } catch (FetchException e) { @@ -115,7 +112,7 @@ public abstract class TransformedCursor extends AbstractCursor { try { int count = 0; while (--amount >= 0 && hasNext()) { - interruptCheck(++count); + ++count; mNext = null; } @@ -129,11 +126,4 @@ public abstract class TransformedCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java index 7e857c3..44735ae 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java @@ -60,6 +60,13 @@ public abstract class AbstractQuery implements Query, App return after(start).fetch(); } + @Override + public Cursor fetchAfter(T start, Controller controller) + throws FetchException + { + return after(start).fetch(controller); + } + @Override public S loadOne() throws FetchException { S obj = tryLoadOne(); @@ -69,6 +76,15 @@ public abstract class AbstractQuery implements Query, App return obj; } + @Override + public S loadOne(Controller controller) throws FetchException { + S obj = tryLoadOne(controller); + if (obj == null) { + throw new FetchNoneException(toString()); + } + return obj; + } + @Override public S tryLoadOne() throws FetchException { Cursor cursor = fetch(); @@ -87,6 +103,24 @@ public abstract class AbstractQuery implements Query, App } } + @Override + public S tryLoadOne(Controller controller) throws FetchException { + Cursor cursor = fetch(controller); + try { + if (cursor.hasNext()) { + S obj = cursor.next(); + if (cursor.hasNext()) { + throw new FetchMultipleException(toString()); + } + return obj; + } else { + return null; + } + } finally { + cursor.close(); + } + } + @Override public void deleteOne() throws PersistException { if (!tryDeleteOne()) { @@ -94,6 +128,13 @@ public abstract class AbstractQuery implements Query, App } } + @Override + public void deleteOne(Controller controller) throws PersistException { + if (!tryDeleteOne(controller)) { + throw new PersistNoneException(toString()); + } + } + @Override public boolean printNative() { try { diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java index e35ec7a..e1921d2 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.LimitCursor; @@ -56,6 +57,26 @@ public abstract class AbstractQueryExecutor implements Query return cursor; } + /** + * Produces a slice via skip and limit cursors. Subclasses are encouraged + * to override with a more efficient implementation. + * + * @since 1.2 + */ + public Cursor fetchSlice(FilterValues values, long from, Long to, + Query.Controller controller) + throws FetchException + { + Cursor cursor = fetch(values, controller); + if (from > 0) { + cursor = new SkipCursor(cursor, from); + } + if (to != null) { + cursor = new LimitCursor(cursor, to - from); + } + return cursor; + } + /** * Counts results by opening a cursor and skipping entries. Subclasses are * encouraged to override with a more efficient implementation. @@ -76,6 +97,26 @@ public abstract class AbstractQueryExecutor implements Query } } + /** + * Counts results by opening a cursor and skipping entries. Subclasses are + * encouraged to override with a more efficient implementation. + */ + public long count(FilterValues values, Query.Controller controller) throws FetchException { + Cursor cursor = fetch(values, controller); + try { + long count = cursor.skipNext(Integer.MAX_VALUE); + if (count == Integer.MAX_VALUE) { + int amt; + while ((amt = cursor.skipNext(Integer.MAX_VALUE)) > 0) { + count += amt; + } + } + return count; + } finally { + cursor.close(); + } + } + /** * Does nothing and returns false. */ diff --git a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java index f9be52a..fa8d9e5 100644 --- a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java @@ -95,14 +95,31 @@ public class DelegatedQueryExecutor implements QueryExecutor return applyFilterValues(values).fetch(); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return applyFilterValues(values).fetch(controller); + } + public Cursor fetchSlice(FilterValues values, long from, Long to) throws FetchException { return applyFilterValues(values).fetchSlice(from, to); } + public Cursor fetchSlice(FilterValues values, long from, Long to, + Query.Controller controller) + throws FetchException + { + return applyFilterValues(values).fetchSlice(from, to, controller); + } + public long count(FilterValues values) throws FetchException { return applyFilterValues(values).count(); } + public long count(FilterValues values, Query.Controller controller) throws FetchException { + return applyFilterValues(values).count(controller); + } + public Filter getFilter() { return mFilter; } diff --git a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java index 62418a1..fc7ea0c 100644 --- a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java @@ -237,6 +237,14 @@ public final class EmptyQuery extends AbstractQuery { return EmptyCursor.the(); } + /** + * Always returns an {@link EmptyCursor}. + */ + @Override + public Cursor fetch(Controller controller) { + return EmptyCursor.the(); + } + /** * Always returns an {@link EmptyCursor}. */ @@ -246,6 +254,14 @@ public final class EmptyQuery extends AbstractQuery { return EmptyCursor.the(); } + /** + * Always returns an {@link EmptyCursor}. + */ + @Override + public Cursor fetchSlice(long from, Long to, Controller controller) { + return fetchSlice(from, to); + } + /** * Always throws {@link PersistNoneException}. */ @@ -254,6 +270,14 @@ public final class EmptyQuery extends AbstractQuery { throw new PersistNoneException(); } + /** + * Always throws {@link PersistNoneException}. + */ + @Override + public void deleteOne(Controller controller) throws PersistNoneException { + throw new PersistNoneException(); + } + /** * Always returns false. */ @@ -262,6 +286,14 @@ public final class EmptyQuery extends AbstractQuery { return false; } + /** + * Always returns false. + */ + @Override + public boolean tryDeleteOne(Controller controller) { + return false; + } + /** * Does nothing. */ @@ -269,6 +301,13 @@ public final class EmptyQuery extends AbstractQuery { public void deleteAll() { } + /** + * Does nothing. + */ + @Override + public void deleteAll(Controller controller) { + } + /** * Always returns zero. */ @@ -277,6 +316,14 @@ public final class EmptyQuery extends AbstractQuery { return 0; } + /** + * Always returns zero. + */ + @Override + public long count(Controller controller) { + return 0; + } + /** * Always returns false. */ @@ -285,6 +332,14 @@ public final class EmptyQuery extends AbstractQuery { return false; } + /** + * Always returns false. + */ + @Override + public boolean exists(Controller controller) { + return false; + } + @Override public void appendTo(Appendable app) throws IOException { app.append("Query {type="); diff --git a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java index 770c1bc..17f3105 100644 --- a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.FilteredCursor; @@ -64,6 +65,12 @@ public class FilteredQueryExecutor extends AbstractQueryExec return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values)); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values, controller)); + } + /** * Returns the combined filter of the wrapped executor and the extra filter. */ diff --git a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java index 9c7fd05..9a41fda 100644 --- a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -58,6 +59,15 @@ public class FullScanQueryExecutor extends AbstractQueryExec return count; } + @Override + public long count(FilterValues values, Query.Controller controller) throws FetchException { + long count = mSupport.countAll(controller); + if (count == -1) { + count = super.count(values, controller); + } + return count; + } + /** * Returns an open filter. */ @@ -69,6 +79,12 @@ public class FullScanQueryExecutor extends AbstractQueryExec return mSupport.fetchAll(); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return mSupport.fetchAll(controller); + } + /** * Returns an empty list. */ @@ -98,9 +114,24 @@ public class FullScanQueryExecutor extends AbstractQueryExec */ long countAll() throws FetchException; + /** + * Counts all Storables. Implementation may return -1 to indicate that + * default count algorithm should be used. + * + * @param controller optional controller which can abort query operation + */ + long countAll(Query.Controller controller) throws FetchException; + /** * Perform a full scan of all Storables. */ Cursor fetchAll() throws FetchException; + + /** + * Perform a full scan of all Storables. + * + * @param controller optional controller which can abort query operation + */ + Cursor fetchAll(Query.Controller controller) throws FetchException; } } diff --git a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java index c3853d2..b8a39b2 100644 --- a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java @@ -120,6 +120,12 @@ public class IndexedQueryExecutor extends AbstractQueryExecu } public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { Object[] identityValues = null; Object rangeStartValue = null; Object rangeEndValue = null; @@ -182,7 +188,8 @@ public class IndexedQueryExecutor extends AbstractQueryExecu rangeStartBoundary, rangeStartValue, rangeEndBoundary, rangeEndValue, mReverseRange, - mReverseOrder); + mReverseOrder, + controller); } else { indexEntryQuery = indexEntryQuery.withValues(identityValues); if (rangeStartBoundary != BoundaryType.OPEN) { @@ -194,7 +201,7 @@ public class IndexedQueryExecutor extends AbstractQueryExecu if (mCoveringFilter != null && values != null) { indexEntryQuery = indexEntryQuery.withValues(values.getValuesFor(mCoveringFilter)); } - return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery); + return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery, controller); } } @@ -414,6 +421,20 @@ public class IndexedQueryExecutor extends AbstractQueryExecu Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery) throws FetchException; + /** + * Fetch Storables referenced by the given index entry query. This + * method is only called if index supports query access. + * + * @param index index to open + * @param indexEntryQuery query with no blank parameters, derived from + * the query returned by indexEntryQuery + * @param controller optional controller which can abort query operation + * @since 1.2 + */ + Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery, + Query.Controller controller) + throws FetchException; + /** * Perform an index scan of a subset of Storables referenced by an * index. The identity values are aligned with the index properties at @@ -445,5 +466,39 @@ public class IndexedQueryExecutor extends AbstractQueryExecu boolean reverseRange, boolean reverseOrder) throws FetchException; + + /** + * Perform an index scan of a subset of Storables referenced by an + * index. The identity values are aligned with the index properties at + * property 0. An optional range start or range end aligns with the index + * property following the last of the identity values. + * + *

This method is only called if no index entry query was provided + * for the given index. + * + * @param index index to open, which may be a primary key index + * @param identityValues optional list of exactly matching values to apply to index + * @param rangeStartBoundary start boundary type + * @param rangeStartValue value to start at if boundary is not open + * @param rangeEndBoundary end boundary type + * @param rangeEndValue value to end at if boundary is not open + * @param reverseRange indicates that range operates on a property whose + * natural order is descending. Only the code that opens the physical + * cursor should examine this parameter. If true, then the range start and + * end parameter pairs need to be swapped. + * @param reverseOrder when true, iteration should be reversed from its + * natural order + * @param controller optional controller which can abort query operation + */ + Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder, + Query.Controller controller) + throws FetchException; } } diff --git a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java index 9e04a32..2623bb3 100644 --- a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.util.concurrent.locks.Lock; import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; import com.amazon.carbonado.filter.FilterValues; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.IteratorCursor; /** @@ -76,6 +78,10 @@ public class IterableQueryExecutor extends AbstractQueryExec return new IteratorCursor(mIterable, mLock); } + public Cursor fetch(FilterValues values, Query.Controller controller) { + return ControllerCursor.apply(new IteratorCursor(mIterable, mLock), controller); + } + /** * Returns an empty list. */ diff --git a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java index 9a77f26..4e98df4 100644 --- a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java @@ -36,6 +36,7 @@ import org.cojen.util.ClassInjector; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.MultiTransformedCursor; @@ -190,6 +191,7 @@ public class JoinedQueryExecutor private static final String INNER_LOOP_EX_FIELD_NAME = "innerLoopExecutor"; private static final String INNER_LOOP_FV_FIELD_NAME = "innerLoopFilterValues"; + private static final String INNER_LOOP_CONTROLLER_FIELD_NAME = "innerLoopController"; private static final String ACTIVE_SOURCE_FIELD_NAME = "active"; private static final SoftValuedCache cJoinerCursorClassCache; @@ -240,11 +242,14 @@ public class JoinedQueryExecutor final TypeDesc cursorType = TypeDesc.forClass(Cursor.class); final TypeDesc queryExecutorType = TypeDesc.forClass(QueryExecutor.class); final TypeDesc filterValuesType = TypeDesc.forClass(FilterValues.class); + final TypeDesc controllerType = TypeDesc.forClass(Query.Controller.class); // Define fields for inner loop executor and filter values, which are // passed into the constructor. cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_EX_FIELD_NAME, queryExecutorType); cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_FV_FIELD_NAME, filterValuesType); + cf.addField(Modifiers.PRIVATE.toFinal(true), + INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType); // If target storable can set a reference to the joined source // storable, then stash a copy of it as we go. This way, when user of @@ -259,7 +264,7 @@ public class JoinedQueryExecutor // Define constructor. { - TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType}; + TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType, controllerType}; MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params); CodeBuilder b = new CodeBuilder(mi); @@ -276,6 +281,10 @@ public class JoinedQueryExecutor b.loadLocal(b.getParameter(2)); b.storeField(INNER_LOOP_FV_FIELD_NAME, filterValuesType); + b.loadThis(); + b.loadLocal(b.getParameter(3)); + b.storeField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType); + b.returnVoid(); } @@ -317,9 +326,12 @@ public class JoinedQueryExecutor new TypeDesc[] {bindType}); } + b.loadThis(); + b.loadField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType); + // Now fetch and return. b.invokeInterface(queryExecutorType, "fetch", cursorType, - new TypeDesc[] {filterValuesType}); + new TypeDesc[] {filterValuesType, controllerType}); b.returnValue(cursorType); } @@ -570,6 +582,12 @@ public class JoinedQueryExecutor } public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { FilterValues innerLoopFilterValues = mInnerLoopFilterValues; if (mTargetFilter != null) { @@ -578,10 +596,14 @@ public class JoinedQueryExecutor .withValues(values.getValuesFor(mTargetFilter)); } - Cursor outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values)); + if (controller != null) { + controller.begin(); + } + + Cursor outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values), controller); return mJoinerFactory.newJoinedCursor - (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues); + (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues, controller); } public Filter getFilter() { @@ -628,7 +650,8 @@ public class JoinedQueryExecutor public static interface Factory { Cursor newJoinedCursor(Cursor outerLoopCursor, QueryExecutor innerLoopExecutor, - FilterValues innerLoopFilterValues); + FilterValues innerLoopFilterValues, + Query.Controller innerLoopController); } } } diff --git a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java index 1cc0b59..dacda80 100644 --- a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -72,6 +73,12 @@ public class KeyQueryExecutor extends AbstractQueryExecutor< return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter)); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter), controller); + } + public Filter getFilter() { return mKeyFilter; } @@ -115,5 +122,18 @@ public class KeyQueryExecutor extends AbstractQueryExecutor< */ Cursor fetchOne(StorableIndex index, Object[] identityValues) throws FetchException; + + /** + * Select at most one Storable referenced by an index. The identity + * values fully specify all elements of the index, and the index is + * unique. + * + * @param controller optional controller which can abort query operation + * @param index index to open, which may be a primary key index + * @param identityValues of exactly matching values to apply to index + */ + Cursor fetchOne(StorableIndex index, Object[] identityValues, + Query.Controller controller) + throws FetchException; } } diff --git a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java index ab310b0..9ba259b 100644 --- a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -45,6 +46,13 @@ public interface QueryExecutor { */ Cursor fetch(FilterValues values) throws FetchException; + /** + * Returns a new cursor using the given filter values. + * + * @param controller optional controller which can abort query operation + */ + Cursor fetch(FilterValues values, Query.Controller controller) throws FetchException; + /** * Returns a new cursor using the given filter values and slice. * @@ -52,11 +60,27 @@ public interface QueryExecutor { */ Cursor fetchSlice(FilterValues values, long from, Long to) throws FetchException; + /** + * Returns a new cursor using the given filter values and slice. + * + * @param controller optional controller which can abort query operation + * @since 1.2 + */ + Cursor fetchSlice(FilterValues values, long from, Long to, Query.Controller controller) + throws FetchException; + /** * Counts the query results using the given filter values. */ long count(FilterValues values) throws FetchException; + /** + * Counts the query results using the given filter values. + * + * @param controller optional controller which can abort query operation + */ + long count(FilterValues values, Query.Controller controller) throws FetchException; + /** * Returns the filter used by this QueryExecutor. * diff --git a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java index 82d0ef6..67f739d 100644 --- a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java @@ -24,9 +24,11 @@ import java.util.Comparator; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.ArraySortBuffer; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.MergeSortBuffer; import com.amazon.carbonado.cursor.SortBuffer; import com.amazon.carbonado.cursor.SortedCursor; @@ -106,11 +108,28 @@ public class SortedQueryExecutor extends AbstractQueryExecut return new SortedCursor(cursor, buffer, mHandledComparator, mFinisherComparator); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + Cursor cursor = mExecutor.fetch(values, controller); + SortBuffer buffer = mSupport.createSortBuffer(controller); + // Apply the controller around the cursor to ensure timeouts are + // honored even when the caller is slowly iterating over the cursor. + return ControllerCursor.apply + (new SortedCursor(cursor, buffer, mHandledComparator, mFinisherComparator), + controller); + } + @Override public long count(FilterValues values) throws FetchException { return mExecutor.count(values); } + @Override + public long count(FilterValues values, Query.Controller controller) throws FetchException { + return mExecutor.count(values, controller); + } + public Filter getFilter() { return mExecutor.getFilter(); } @@ -152,6 +171,13 @@ public class SortedQueryExecutor extends AbstractQueryExecut * Implementation must return an empty buffer for sorting. */ SortBuffer createSortBuffer(); + + /** + * Implementation must return an empty buffer for sorting. + * + * @param controller optional controller which can abort query operation + */ + SortBuffer createSortBuffer(Query.Controller controller); } /** @@ -164,6 +190,14 @@ public class SortedQueryExecutor extends AbstractQueryExecut public SortBuffer createSortBuffer() { return new ArraySortBuffer(); } + + /** + * Returns a new ArraySortBuffer. + */ + public SortBuffer createSortBuffer(Query.Controller controller) { + // Java sort utility doesn't support any kind of controller. + return new ArraySortBuffer(); + } } /** @@ -176,5 +210,12 @@ public class SortedQueryExecutor extends AbstractQueryExecut public SortBuffer createSortBuffer() { return new MergeSortBuffer(); } + + /** + * Returns a new MergeSortBuffer. + */ + public SortBuffer createSortBuffer(Query.Controller controller) { + return new MergeSortBuffer(controller); + } } } diff --git a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java index 0349ae2..bce32ed 100644 --- a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java @@ -287,15 +287,29 @@ public abstract class StandardQuery extends AbstractQuery } } + @Override + public Cursor fetch(Controller controller) throws FetchException { + try { + return executor().fetch(mValues, controller); + } catch (RepositoryException e) { + throw e.toFetchException(); + } + } + @Override public Cursor fetchSlice(long from, Long to) throws FetchException { + return fetchSlice(from, to, null); + } + + @Override + public Cursor fetchSlice(long from, Long to, Controller controller) throws FetchException { if (!checkSliceArguments(from, to)) { - return fetch(); + return fetch(controller); } try { QueryHints hints = QueryHints.emptyHints().with(QueryHint.CONSUME_SLICE); return executorFactory().executor(mFilter, mOrdering, hints) - .fetchSlice(mValues, from, to); + .fetchSlice(mValues, from, to, controller); } catch (RepositoryException e) { throw e.toFetchException(); } @@ -303,9 +317,14 @@ public abstract class StandardQuery extends AbstractQuery @Override public boolean tryDeleteOne() throws PersistException { + return tryDeleteOne(null); + } + + @Override + public boolean tryDeleteOne(Controller controller) throws PersistException { Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED); try { - Cursor cursor = fetch(); + Cursor cursor = fetch(controller); boolean result; try { if (cursor.hasNext()) { @@ -335,9 +354,14 @@ public abstract class StandardQuery extends AbstractQuery @Override public void deleteAll() throws PersistException { + deleteAll(null); + } + + @Override + public void deleteAll(Controller controller) throws PersistException { Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED); try { - Cursor cursor = fetch(); + Cursor cursor = fetch(controller); try { while (cursor.hasNext()) { cursor.next().tryDelete(); @@ -366,9 +390,23 @@ public abstract class StandardQuery extends AbstractQuery } } + @Override + public long count(Controller controller) throws FetchException { + try { + return executor().count(mValues, controller); + } catch (RepositoryException e) { + throw e.toFetchException(); + } + } + @Override public boolean exists() throws FetchException { - Cursor cursor = fetchSlice(0L, 1L); + return exists(null); + } + + @Override + public boolean exists(Controller controller) throws FetchException { + Cursor cursor = fetchSlice(0L, 1L, controller); try { return cursor.skipNext(1) > 0; } finally { diff --git a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java index 09a3174..48152bb 100644 --- a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java @@ -26,6 +26,7 @@ import java.util.List; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.SortedCursor; @@ -97,9 +98,15 @@ public class UnionQueryExecutor extends AbstractQueryExecuto } public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { Cursor cursor = null; for (QueryExecutor executor : mExecutors) { - Cursor subCursor = executor.fetch(values); + Cursor subCursor = executor.fetch(values, controller); cursor = (cursor == null) ? subCursor : new UnionCursor(cursor, subCursor, mOrderComparator); } diff --git a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java index 6c9363f..2b15294 100644 --- a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java +++ b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java @@ -89,8 +89,9 @@ public class GenericStorableCodecFactory implements StorableCodecFactory { RawSupport support) throws SupportException { + LayoutOptions options = layout == null ? getLayoutOptions(type) : layout.getOptions(); return GenericStorableCodec.getInstance - (this, createStrategy(type, pkIndex, null), isMaster, layout, support); + (this, createStrategy(type, pkIndex, options), isMaster, layout, support); } /** diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 4635f2c..d115a3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -191,14 +191,26 @@ class IndexedStorage implements Storage, StorageAccess return new MergeSortBuffer(); } + public SortBuffer createSortBuffer(Query.Controller controller) { + return new MergeSortBuffer(controller); + } + public long countAll() throws FetchException { return mMasterStorage.query().count(); } + public long countAll(Query.Controller controller) throws FetchException { + return mMasterStorage.query().count(controller); + } + public Cursor fetchAll() throws FetchException { return mMasterStorage.query().fetch(); } + public Cursor fetchAll(Query.Controller controller) throws FetchException { + return mMasterStorage.query().fetch(controller); + } + public Cursor fetchOne(StorableIndex index, Object[] identityValues) throws FetchException @@ -207,6 +219,15 @@ class IndexedStorage implements Storage, StorageAccess return indexInfo.fetchOne(this, identityValues); } + public Cursor fetchOne(StorableIndex index, + Object[] identityValues, + Query.Controller controller) + throws FetchException + { + ManagedIndex indexInfo = (ManagedIndex) mAllIndexInfoMap.get(index); + return indexInfo.fetchOne(this, identityValues, controller); + } + public Query indexEntryQuery(StorableIndex index) throws FetchException { @@ -221,6 +242,14 @@ class IndexedStorage implements Storage, StorageAccess return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery); } + public Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery, + Query.Controller controller) + throws FetchException + { + ManagedIndex indexInfo = (ManagedIndex) mAllIndexInfoMap.get(index); + return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery, controller); + } + public Cursor fetchSubset(StorableIndex index, Object[] identityValues, BoundaryType rangeStartBoundary, @@ -235,6 +264,21 @@ class IndexedStorage implements Storage, StorageAccess throw new UnsupportedOperationException(); } + public Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder, + Query.Controller controller) + throws FetchException + { + // This method should never be called since a query was returned by indexEntryQuery. + throw new UnsupportedOperationException(); + } + private void registerIndex(ManagedIndex managedIndex) throws RepositoryException { diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 2bb2f49..6c28619 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -172,6 +172,13 @@ class ManagedIndex implements IndexEntryAccessor { Cursor fetchOne(IndexedStorage storage, Object[] identityValues) throws FetchException + { + return fetchOne(storage, identityValues, null); + } + + Cursor fetchOne(IndexedStorage storage, Object[] identityValues, + Query.Controller controller) + throws FetchException { Query query = mSingleMatchQuery; @@ -184,13 +191,20 @@ class ManagedIndex implements IndexEntryAccessor { mSingleMatchQuery = query = mIndexEntryStorage.query(filter); } - return fetchFromIndexEntryQuery(storage, query.withValues(identityValues)); + return fetchFromIndexEntryQuery(storage, query.withValues(identityValues), controller); } Cursor fetchFromIndexEntryQuery(IndexedStorage storage, Query indexEntryQuery) throws FetchException { - return new IndexedCursor(indexEntryQuery.fetch(), storage, mAccessor); + return fetchFromIndexEntryQuery(storage, indexEntryQuery, null); + } + + Cursor fetchFromIndexEntryQuery(IndexedStorage storage, Query indexEntryQuery, + Query.Controller controller) + throws FetchException + { + return new IndexedCursor(indexEntryQuery.fetch(controller), storage, mAccessor); } @Override diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java index e760f58..d10856a 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java @@ -28,9 +28,15 @@ import java.sql.SQLException; */ class H2ExceptionTransformer extends JDBCExceptionTransformer { public static int DUPLICATE_KEY = 23001; + public static int PROCESSING_CANCELED = 90051; @Override public boolean isUniqueConstraintError(SQLException e) { - return DUPLICATE_KEY == e.getErrorCode(); + return super.isUniqueConstraintError(e) || DUPLICATE_KEY == e.getErrorCode(); + } + + @Override + public boolean isTimeoutError(SQLException e) { + return super.isTimeoutError(e) || PROCESSING_CANCELED == e.getErrorCode(); } } diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java index f72c717..4780140 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java @@ -23,9 +23,11 @@ import java.sql.SQLException; import com.amazon.carbonado.ConstraintException; import com.amazon.carbonado.FetchDeadlockException; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchTimeoutException; import com.amazon.carbonado.PersistDeadlockException; import com.amazon.carbonado.PersistDeniedException; import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistTimeoutException; import com.amazon.carbonado.UniqueConstraintException; import com.amazon.carbonado.spi.ExceptionTransformer; @@ -59,6 +61,8 @@ class JDBCExceptionTransformer extends ExceptionTransformer { */ public static String SQLSTATE_DEADLOCK_WITH_ROLLBACK = "40001"; + public static String SQLSTATE_PROCESSING_CANCELED = "57014"; + /** * Examines the SQLSTATE code of the given SQL exception and determines if * it is a generic constaint violation. @@ -103,6 +107,16 @@ class JDBCExceptionTransformer extends ExceptionTransformer { return false; } + public boolean isTimeoutError(SQLException e) { + if (e != null) { + String sqlstate = e.getSQLState(); + if (sqlstate != null) { + return SQLSTATE_PROCESSING_CANCELED.equals(sqlstate); + } + } + return false; + } + JDBCExceptionTransformer() { } @@ -117,6 +131,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer { if (isDeadlockError(se)) { return new FetchDeadlockException(e); } + if (isTimeoutError(se)) { + return new FetchTimeoutException(e); + } } return null; } @@ -141,6 +158,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer { if (isDeadlockError(se)) { return new PersistDeadlockException(e); } + if (isTimeoutError(se)) { + return new PersistTimeoutException(e); + } } return null; } diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java index a49a150..bbe5589 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java @@ -27,6 +27,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.LogFactory; @@ -34,6 +35,7 @@ import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Repository; import com.amazon.carbonado.RepositoryException; import com.amazon.carbonado.Storable; @@ -42,6 +44,7 @@ import com.amazon.carbonado.SupportException; import com.amazon.carbonado.Transaction; import com.amazon.carbonado.Trigger; import com.amazon.carbonado.capability.IndexInfo; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.EmptyCursor; import com.amazon.carbonado.cursor.LimitCursor; import com.amazon.carbonado.filter.AndFilter; @@ -329,6 +332,28 @@ class JDBCStorage extends StandardQueryFactory return new JDBCQuery(filter, values, ordering, hints); } + static PreparedStatement prepareStatement(Connection con, String sql, + Query.Controller controller) + throws SQLException + { + PreparedStatement ps = con.prepareStatement(sql); + + if (controller != null) { + long timeout = controller.getTimeout(); + if (timeout >= 0) { + TimeUnit unit = controller.getTimeoutUnit(); + if (unit != null) { + long seconds = unit.toSeconds(timeout); + int intSeconds = seconds <= 0 ? 1 : + (seconds <= Integer.MAX_VALUE ? ((int) seconds) : 0); + ps.setQueryTimeout(intSeconds); + } + } + } + + return ps; + } + public S instantiate(ResultSet rs) throws SQLException { return (S) mInstanceFactory.instantiate(this, rs, FIRST_RESULT_INDEX); } @@ -668,12 +693,21 @@ class JDBCStorage extends StandardQueryFactory } } + @Override public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + @Override + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { TransactionScope scope = mRepository.localTransactionScope(); boolean forUpdate = scope.isForUpdate(); Connection con = getConnection(); try { - PreparedStatement ps = con.prepareStatement(prepareSelect(values, forUpdate)); + PreparedStatement ps = + prepareStatement(con, prepareSelect(values, forUpdate), controller); Integer fetchSize = mRepository.getFetchSize(); if (fetchSize != null) { ps.setFetchSize(fetchSize); @@ -681,7 +715,8 @@ class JDBCStorage extends StandardQueryFactory try { setParameters(ps, values); - return new JDBCCursor(JDBCStorage.this, scope, con, ps); + return ControllerCursor.apply + (new JDBCCursor(JDBCStorage.this, scope, con, ps), controller); } catch (Exception e) { // in case of exception, close statement try { @@ -705,6 +740,14 @@ class JDBCStorage extends StandardQueryFactory @Override public Cursor fetchSlice(FilterValues values, long from, Long to) throws FetchException + { + return fetchSlice(values, from, to, null); + } + + @Override + public Cursor fetchSlice(FilterValues values, long from, Long to, + Query.Controller controller) + throws FetchException { if (to != null && (to - from) <= 0) { return EmptyCursor.the(); @@ -716,17 +759,17 @@ class JDBCStorage extends StandardQueryFactory switch (option) { case NOT_SUPPORTED: default: - return super.fetchSlice(values, from, to); + return super.fetchSlice(values, from, to, controller); case LIMIT_ONLY: if (from > 0 || to == null) { - return super.fetchSlice(values, from, to); + return super.fetchSlice(values, from, to, controller); } select = prepareSelect(values, false); select = mSupportStrategy.buildSelectWithSlice(select, false, true); break; case OFFSET_ONLY: if (from <= 0) { - return super.fetchSlice(values, from, to); + return super.fetchSlice(values, from, to, controller); } select = prepareSelect(values, false); select = mSupportStrategy.buildSelectWithSlice(select, true, false); @@ -746,7 +789,7 @@ class JDBCStorage extends StandardQueryFactory Connection con = getConnection(); try { - PreparedStatement ps = con.prepareStatement(select); + PreparedStatement ps = prepareStatement(con, select, controller); Integer fetchSize = mRepository.getFetchSize(); if (fetchSize != null) { ps.setFetchSize(fetchSize); @@ -760,7 +803,10 @@ class JDBCStorage extends StandardQueryFactory switch (option) { case OFFSET_ONLY: ps.setLong(psOrdinal, from); - Cursor c = new JDBCCursor(JDBCStorage.this, scope, con, ps); + Cursor c = + ControllerCursor.apply + (new JDBCCursor(JDBCStorage.this, scope, con, ps), + controller); return new LimitCursor(c, to - from); case LIMIT_AND_OFFSET: ps.setLong(psOrdinal, to - from); @@ -782,7 +828,8 @@ class JDBCStorage extends StandardQueryFactory ps.setLong(psOrdinal, to); } - return new JDBCCursor(JDBCStorage.this, scope, con, ps); + return ControllerCursor.apply + (new JDBCCursor(JDBCStorage.this, scope, con, ps), controller); } catch (Exception e) { // in case of exception, close statement try { @@ -805,9 +852,17 @@ class JDBCStorage extends StandardQueryFactory @Override public long count(FilterValues values) throws FetchException { + return count(values, null); + } + + @Override + public long count(FilterValues values, Query.Controller controller) + throws FetchException + { Connection con = getConnection(); try { - PreparedStatement ps = con.prepareStatement(prepareCount(values)); + PreparedStatement ps = prepareStatement(con, prepareCount(values), controller); + try { setParameters(ps, values); ResultSet rs = ps.executeQuery(); @@ -827,10 +882,12 @@ class JDBCStorage extends StandardQueryFactory } } + @Override public Filter getFilter() { return mFilter; } + @Override public OrderingList getOrdering() { return mOrdering; } @@ -846,6 +903,7 @@ class JDBCStorage extends StandardQueryFactory return true; } + @Override public boolean printPlan(Appendable app, int indentLevel, FilterValues values) throws IOException { @@ -862,7 +920,9 @@ class JDBCStorage extends StandardQueryFactory /** * Delete operation is included in cursor factory for ease of implementation. */ - int executeDelete(FilterValues filterValues) throws PersistException { + int executeDelete(FilterValues filterValues, Query.Controller controller) + throws PersistException + { Connection con; try { con = getConnection(); @@ -870,7 +930,8 @@ class JDBCStorage extends StandardQueryFactory throw e.toPersistException(); } try { - PreparedStatement ps = con.prepareStatement(prepareDelete(filterValues)); + PreparedStatement ps = + prepareStatement(con, prepareDelete(filterValues), controller); try { setParameters(ps, filterValues); return ps.executeUpdate(); @@ -976,13 +1037,18 @@ class JDBCStorage extends StandardQueryFactory @Override public void deleteAll() throws PersistException { + deleteAll(null); + } + + @Override + public void deleteAll(Controller controller) throws PersistException { if (mTriggerManager.getDeleteTrigger() != null) { // Super implementation loads one at time and calls // delete. This allows delete trigger to be invoked on each. - super.deleteAll(); + super.deleteAll(controller); } else { try { - ((Executor) executor()).executeDelete(getFilterValues()); + ((Executor) executor()).executeDelete(getFilterValues(), controller); } catch (RepositoryException e) { throw e.toPersistException(); } diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java index ad63941..ebaafd7 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java @@ -32,6 +32,8 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer { public static int DEADLOCK_DETECTED = 60; + public static int PROCESSING_CANCELED = 1013; + @Override public boolean isUniqueConstraintError(SQLException e) { if (isConstraintError(e)) { @@ -63,4 +65,9 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer { } return false; } + + @Override + public boolean isTimeoutError(SQLException e) { + return super.isTimeoutError(e) || e != null && PROCESSING_CANCELED == e.getErrorCode(); + } } diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java index 08be6e3..5dc4d3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java +++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java @@ -164,6 +164,15 @@ class LoggingQuery implements Query { return mQuery.fetch(); } + @Override + public Cursor fetch(Controller controller) throws FetchException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.fetch(controller) on " + this + ", controller: " + controller); + } + return mQuery.fetch(controller); + } + @Override public Cursor fetchSlice(long from, Long to) throws FetchException { Log log = mStorage.mLog; @@ -174,6 +183,16 @@ class LoggingQuery implements Query { return mQuery.fetchSlice(from, to); } + @Override + public Cursor fetchSlice(long from, Long to, Controller controller) throws FetchException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.fetchSlice(start, to, controller) on " + this + + ", from: " + from + ", to: " + to + ", controller: " + controller); + } + return mQuery.fetchSlice(from, to, controller); + } + @Override public Cursor fetchAfter(T start) throws FetchException { Log log = mStorage.mLog; @@ -183,6 +202,18 @@ class LoggingQuery implements Query { return mQuery.fetchAfter(start); } + @Override + public Cursor fetchAfter(T start, Controller controller) + throws FetchException + { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.fetchAfter(start, controller) on " + this + ", start: " + start + + ", controller: " + controller); + } + return mQuery.fetchAfter(start, controller); + } + @Override public S loadOne() throws FetchException { Log log = mStorage.mLog; @@ -192,6 +223,15 @@ class LoggingQuery implements Query { return mQuery.loadOne(); } + @Override + public S loadOne(Controller controller) throws FetchException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.loadOne() on " + this + ", controller: " + controller); + } + return mQuery.loadOne(controller); + } + @Override public S tryLoadOne() throws FetchException { Log log = mStorage.mLog; @@ -201,6 +241,15 @@ class LoggingQuery implements Query { return mQuery.tryLoadOne(); } + @Override + public S tryLoadOne(Controller controller) throws FetchException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.tryLoadOne(controller) on " + this + ", controller: " + controller); + } + return mQuery.tryLoadOne(controller); + } + @Override public void deleteOne() throws PersistException { Log log = mStorage.mLog; @@ -210,6 +259,15 @@ class LoggingQuery implements Query { mQuery.deleteOne(); } + @Override + public void deleteOne(Controller controller) throws PersistException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.deleteOne(controller) on " + this + ", controller: " + controller); + } + mQuery.deleteOne(controller); + } + @Override public boolean tryDeleteOne() throws PersistException { Log log = mStorage.mLog; @@ -219,6 +277,15 @@ class LoggingQuery implements Query { return mQuery.tryDeleteOne(); } + @Override + public boolean tryDeleteOne(Controller controller) throws PersistException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.tryDeleteOne(controller) on " + this + ", controller: " + controller); + } + return mQuery.tryDeleteOne(controller); + } + @Override public void deleteAll() throws PersistException { Log log = mStorage.mLog; @@ -228,6 +295,15 @@ class LoggingQuery implements Query { mQuery.deleteAll(); } + @Override + public void deleteAll(Controller controller) throws PersistException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.deleteAll(controller) on " + this + ", controller: " + controller); + } + mQuery.deleteAll(controller); + } + @Override public long count() throws FetchException { Log log = mStorage.mLog; @@ -237,6 +313,15 @@ class LoggingQuery implements Query { return mQuery.count(); } + @Override + public long count(Controller controller) throws FetchException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.count(controller) on " + this + ", controller: " + controller); + } + return mQuery.count(controller); + } + @Override public boolean exists() throws FetchException { Log log = mStorage.mLog; @@ -246,6 +331,15 @@ class LoggingQuery implements Query { return mQuery.exists(); } + @Override + public boolean exists(Controller controller) throws FetchException { + Log log = mStorage.mLog; + if (log.isEnabled()) { + log.write("Query.exists(controller) on " + this + ", controller: " + controller); + } + return mQuery.exists(controller); + } + @Override public boolean printNative() { return mQuery.printNative(); diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java index c84d8e3..c9c17b6 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java @@ -47,6 +47,7 @@ import com.amazon.carbonado.Trigger; import com.amazon.carbonado.capability.IndexInfo; import com.amazon.carbonado.cursor.ArraySortBuffer; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.EmptyCursor; import com.amazon.carbonado.cursor.FilteredCursor; import com.amazon.carbonado.cursor.SingletonCursor; @@ -540,6 +541,10 @@ class MapStorage } public long countAll() throws FetchException { + return countAll(null); + } + + public long countAll(Query.Controller controller) throws FetchException { try { TransactionScope scope = mRepo.localTransactionScope(); MapTransaction txn = scope.getTxn(); @@ -578,8 +583,19 @@ class MapStorage } } + public Cursor fetchAll(Query.Controller controller) throws FetchException { + return ControllerCursor.apply(fetchAll(), controller); + } + public Cursor fetchOne(StorableIndex index, Object[] identityValues) throws FetchException + { + return fetchOne(index, identityValues, null); + } + + public Cursor fetchOne(StorableIndex index, Object[] identityValues, + Query.Controller controller) + throws FetchException { try { S key = prepare(); @@ -643,6 +659,12 @@ class MapStorage return null; } + public Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery, + Query.Controller controller) + { + return null; + } + public Cursor fetchSubset(StorableIndex index, Object[] identityValues, BoundaryType rangeStartBoundary, @@ -770,6 +792,28 @@ class MapStorage return cursor; } + public Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder, + Query.Controller controller) + throws FetchException + { + return ControllerCursor.apply(fetchSubset(index, + identityValues, + rangeStartBoundary, + rangeStartValue, + rangeEndBoundary, + rangeEndValue, + reverseRange, + reverseOrder), + controller); + } + private List> createPkPropList() { return new ArrayList>(mInfo.getPrimaryKey().getProperties()); } @@ -806,6 +850,11 @@ class MapStorage return new ArraySortBuffer(); } + public SortBuffer createSortBuffer(Query.Controller controller) { + // ArraySortBuffer doesn't support controller. + return new ArraySortBuffer(); + } + public static interface InstanceFactory { Storable instantiate(DelegateSupport support); } diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index f4d8f24..c0f882e 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -45,6 +45,7 @@ import com.amazon.carbonado.UniqueConstraintException; import com.amazon.carbonado.capability.IndexInfo; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.EmptyCursor; import com.amazon.carbonado.cursor.MergeSortBuffer; import com.amazon.carbonado.cursor.SingletonCursor; @@ -263,22 +264,45 @@ abstract class BDBStorage implements Storage, Storag return new MergeSortBuffer(); } + public SortBuffer createSortBuffer(Query.Controller controller) { + return new MergeSortBuffer(controller); + } + public long countAll() throws FetchException { // Return -1 to indicate default algorithm should be used. return -1; } + public long countAll(Query.Controller controller) throws FetchException { + // Return -1 to indicate default algorithm should be used. + return -1; + } + public Cursor fetchAll() throws FetchException { + return fetchAll(null); + } + + public Cursor fetchAll(Query.Controller controller) throws FetchException { return fetchSubset(null, null, BoundaryType.OPEN, null, BoundaryType.OPEN, null, - false, false); + false, false, + controller); } public Cursor fetchOne(StorableIndex index, Object[] identityValues) throws FetchException { + return fetchOne(index, identityValues, null); + } + + public Cursor fetchOne(StorableIndex index, + Object[] identityValues, + Query.Controller controller) + throws FetchException + { + // Note: Controller is never called. byte[] key = mStorableCodec.encodePrimaryKey(identityValues); byte[] value = mRawSupport.tryLoad(null, key); if (value == null) { @@ -296,6 +320,13 @@ abstract class BDBStorage implements Storage, Storag throw new UnsupportedOperationException(); } + public Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery, + Query.Controller controller) + { + // This method should never be called since null was returned by indexEntryQuery. + throw new UnsupportedOperationException(); + } + public Cursor fetchSubset(StorableIndex index, Object[] identityValues, BoundaryType rangeStartBoundary, @@ -378,6 +409,7 @@ abstract class BDBStorage implements Storage, Storag getPrimaryDatabase()); cursor.open(); + return cursor; } catch (Exception e) { throw toFetchException(e); @@ -387,6 +419,28 @@ abstract class BDBStorage implements Storage, Storag } } + public Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder, + Query.Controller controller) + throws FetchException + { + return ControllerCursor.apply(fetchSubset(index, + identityValues, + rangeStartBoundary, + rangeStartValue, + rangeEndBoundary, + rangeEndValue, + reverseRange, + reverseOrder), + controller); + } + private byte[] createBound(Object[] exactValues, byte[] exactKey, Object rangeValue, StorableCodec codec) { Object[] values = {rangeValue}; -- cgit v1.2.3