From 121886bc0c92389610408e3b415abb992ad8a212 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Wed, 4 May 2011 00:20:02 +0000 Subject: Add support for Query controller and timeouts; remove vestigial support for interrupts. --- .../com/amazon/carbonado/qe/AbstractQuery.java | 41 +++++++++++++++ .../amazon/carbonado/qe/AbstractQueryExecutor.java | 41 +++++++++++++++ .../carbonado/qe/DelegatedQueryExecutor.java | 17 +++++++ .../java/com/amazon/carbonado/qe/EmptyQuery.java | 55 ++++++++++++++++++++ .../amazon/carbonado/qe/FilteredQueryExecutor.java | 7 +++ .../amazon/carbonado/qe/FullScanQueryExecutor.java | 31 ++++++++++++ .../amazon/carbonado/qe/IndexedQueryExecutor.java | 59 +++++++++++++++++++++- .../amazon/carbonado/qe/IterableQueryExecutor.java | 6 +++ .../amazon/carbonado/qe/JoinedQueryExecutor.java | 33 ++++++++++-- .../com/amazon/carbonado/qe/KeyQueryExecutor.java | 20 ++++++++ .../com/amazon/carbonado/qe/QueryExecutor.java | 24 +++++++++ .../amazon/carbonado/qe/SortedQueryExecutor.java | 41 +++++++++++++++ .../com/amazon/carbonado/qe/StandardQuery.java | 48 ++++++++++++++++-- .../amazon/carbonado/qe/UnionQueryExecutor.java | 9 +++- 14 files changed, 419 insertions(+), 13 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/qe') diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java index 7e857c3..44735ae 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java @@ -60,6 +60,13 @@ public abstract class AbstractQuery implements Query, App return after(start).fetch(); } + @Override + public Cursor fetchAfter(T start, Controller controller) + throws FetchException + { + return after(start).fetch(controller); + } + @Override public S loadOne() throws FetchException { S obj = tryLoadOne(); @@ -69,6 +76,15 @@ public abstract class AbstractQuery implements Query, App return obj; } + @Override + public S loadOne(Controller controller) throws FetchException { + S obj = tryLoadOne(controller); + if (obj == null) { + throw new FetchNoneException(toString()); + } + return obj; + } + @Override public S tryLoadOne() throws FetchException { Cursor cursor = fetch(); @@ -87,6 +103,24 @@ public abstract class AbstractQuery implements Query, App } } + @Override + public S tryLoadOne(Controller controller) throws FetchException { + Cursor cursor = fetch(controller); + try { + if (cursor.hasNext()) { + S obj = cursor.next(); + if (cursor.hasNext()) { + throw new FetchMultipleException(toString()); + } + return obj; + } else { + return null; + } + } finally { + cursor.close(); + } + } + @Override public void deleteOne() throws PersistException { if (!tryDeleteOne()) { @@ -94,6 +128,13 @@ public abstract class AbstractQuery implements Query, App } } + @Override + public void deleteOne(Controller controller) throws PersistException { + if (!tryDeleteOne(controller)) { + throw new PersistNoneException(toString()); + } + } + @Override public boolean printNative() { try { diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java index e35ec7a..e1921d2 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.LimitCursor; @@ -56,6 +57,26 @@ public abstract class AbstractQueryExecutor implements Query return cursor; } + /** + * Produces a slice via skip and limit cursors. Subclasses are encouraged + * to override with a more efficient implementation. + * + * @since 1.2 + */ + public Cursor fetchSlice(FilterValues values, long from, Long to, + Query.Controller controller) + throws FetchException + { + Cursor cursor = fetch(values, controller); + if (from > 0) { + cursor = new SkipCursor(cursor, from); + } + if (to != null) { + cursor = new LimitCursor(cursor, to - from); + } + return cursor; + } + /** * Counts results by opening a cursor and skipping entries. Subclasses are * encouraged to override with a more efficient implementation. @@ -76,6 +97,26 @@ public abstract class AbstractQueryExecutor implements Query } } + /** + * Counts results by opening a cursor and skipping entries. Subclasses are + * encouraged to override with a more efficient implementation. + */ + public long count(FilterValues values, Query.Controller controller) throws FetchException { + Cursor cursor = fetch(values, controller); + try { + long count = cursor.skipNext(Integer.MAX_VALUE); + if (count == Integer.MAX_VALUE) { + int amt; + while ((amt = cursor.skipNext(Integer.MAX_VALUE)) > 0) { + count += amt; + } + } + return count; + } finally { + cursor.close(); + } + } + /** * Does nothing and returns false. */ diff --git a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java index f9be52a..fa8d9e5 100644 --- a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java @@ -95,14 +95,31 @@ public class DelegatedQueryExecutor implements QueryExecutor return applyFilterValues(values).fetch(); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return applyFilterValues(values).fetch(controller); + } + public Cursor fetchSlice(FilterValues values, long from, Long to) throws FetchException { return applyFilterValues(values).fetchSlice(from, to); } + public Cursor fetchSlice(FilterValues values, long from, Long to, + Query.Controller controller) + throws FetchException + { + return applyFilterValues(values).fetchSlice(from, to, controller); + } + public long count(FilterValues values) throws FetchException { return applyFilterValues(values).count(); } + public long count(FilterValues values, Query.Controller controller) throws FetchException { + return applyFilterValues(values).count(controller); + } + public Filter getFilter() { return mFilter; } diff --git a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java index 62418a1..fc7ea0c 100644 --- a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java @@ -237,6 +237,14 @@ public final class EmptyQuery extends AbstractQuery { return EmptyCursor.the(); } + /** + * Always returns an {@link EmptyCursor}. + */ + @Override + public Cursor fetch(Controller controller) { + return EmptyCursor.the(); + } + /** * Always returns an {@link EmptyCursor}. */ @@ -246,6 +254,14 @@ public final class EmptyQuery extends AbstractQuery { return EmptyCursor.the(); } + /** + * Always returns an {@link EmptyCursor}. + */ + @Override + public Cursor fetchSlice(long from, Long to, Controller controller) { + return fetchSlice(from, to); + } + /** * Always throws {@link PersistNoneException}. */ @@ -254,6 +270,14 @@ public final class EmptyQuery extends AbstractQuery { throw new PersistNoneException(); } + /** + * Always throws {@link PersistNoneException}. + */ + @Override + public void deleteOne(Controller controller) throws PersistNoneException { + throw new PersistNoneException(); + } + /** * Always returns false. */ @@ -262,6 +286,14 @@ public final class EmptyQuery extends AbstractQuery { return false; } + /** + * Always returns false. + */ + @Override + public boolean tryDeleteOne(Controller controller) { + return false; + } + /** * Does nothing. */ @@ -269,6 +301,13 @@ public final class EmptyQuery extends AbstractQuery { public void deleteAll() { } + /** + * Does nothing. + */ + @Override + public void deleteAll(Controller controller) { + } + /** * Always returns zero. */ @@ -277,6 +316,14 @@ public final class EmptyQuery extends AbstractQuery { return 0; } + /** + * Always returns zero. + */ + @Override + public long count(Controller controller) { + return 0; + } + /** * Always returns false. */ @@ -285,6 +332,14 @@ public final class EmptyQuery extends AbstractQuery { return false; } + /** + * Always returns false. + */ + @Override + public boolean exists(Controller controller) { + return false; + } + @Override public void appendTo(Appendable app) throws IOException { app.append("Query {type="); diff --git a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java index 770c1bc..17f3105 100644 --- a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.FilteredCursor; @@ -64,6 +65,12 @@ public class FilteredQueryExecutor extends AbstractQueryExec return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values)); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values, controller)); + } + /** * Returns the combined filter of the wrapped executor and the extra filter. */ diff --git a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java index 9c7fd05..9a41fda 100644 --- a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -58,6 +59,15 @@ public class FullScanQueryExecutor extends AbstractQueryExec return count; } + @Override + public long count(FilterValues values, Query.Controller controller) throws FetchException { + long count = mSupport.countAll(controller); + if (count == -1) { + count = super.count(values, controller); + } + return count; + } + /** * Returns an open filter. */ @@ -69,6 +79,12 @@ public class FullScanQueryExecutor extends AbstractQueryExec return mSupport.fetchAll(); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return mSupport.fetchAll(controller); + } + /** * Returns an empty list. */ @@ -98,9 +114,24 @@ public class FullScanQueryExecutor extends AbstractQueryExec */ long countAll() throws FetchException; + /** + * Counts all Storables. Implementation may return -1 to indicate that + * default count algorithm should be used. + * + * @param controller optional controller which can abort query operation + */ + long countAll(Query.Controller controller) throws FetchException; + /** * Perform a full scan of all Storables. */ Cursor fetchAll() throws FetchException; + + /** + * Perform a full scan of all Storables. + * + * @param controller optional controller which can abort query operation + */ + Cursor fetchAll(Query.Controller controller) throws FetchException; } } diff --git a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java index c3853d2..b8a39b2 100644 --- a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java @@ -120,6 +120,12 @@ public class IndexedQueryExecutor extends AbstractQueryExecu } public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { Object[] identityValues = null; Object rangeStartValue = null; Object rangeEndValue = null; @@ -182,7 +188,8 @@ public class IndexedQueryExecutor extends AbstractQueryExecu rangeStartBoundary, rangeStartValue, rangeEndBoundary, rangeEndValue, mReverseRange, - mReverseOrder); + mReverseOrder, + controller); } else { indexEntryQuery = indexEntryQuery.withValues(identityValues); if (rangeStartBoundary != BoundaryType.OPEN) { @@ -194,7 +201,7 @@ public class IndexedQueryExecutor extends AbstractQueryExecu if (mCoveringFilter != null && values != null) { indexEntryQuery = indexEntryQuery.withValues(values.getValuesFor(mCoveringFilter)); } - return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery); + return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery, controller); } } @@ -414,6 +421,20 @@ public class IndexedQueryExecutor extends AbstractQueryExecu Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery) throws FetchException; + /** + * Fetch Storables referenced by the given index entry query. This + * method is only called if index supports query access. + * + * @param index index to open + * @param indexEntryQuery query with no blank parameters, derived from + * the query returned by indexEntryQuery + * @param controller optional controller which can abort query operation + * @since 1.2 + */ + Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery, + Query.Controller controller) + throws FetchException; + /** * Perform an index scan of a subset of Storables referenced by an * index. The identity values are aligned with the index properties at @@ -445,5 +466,39 @@ public class IndexedQueryExecutor extends AbstractQueryExecu boolean reverseRange, boolean reverseOrder) throws FetchException; + + /** + * Perform an index scan of a subset of Storables referenced by an + * index. The identity values are aligned with the index properties at + * property 0. An optional range start or range end aligns with the index + * property following the last of the identity values. + * + *

This method is only called if no index entry query was provided + * for the given index. + * + * @param index index to open, which may be a primary key index + * @param identityValues optional list of exactly matching values to apply to index + * @param rangeStartBoundary start boundary type + * @param rangeStartValue value to start at if boundary is not open + * @param rangeEndBoundary end boundary type + * @param rangeEndValue value to end at if boundary is not open + * @param reverseRange indicates that range operates on a property whose + * natural order is descending. Only the code that opens the physical + * cursor should examine this parameter. If true, then the range start and + * end parameter pairs need to be swapped. + * @param reverseOrder when true, iteration should be reversed from its + * natural order + * @param controller optional controller which can abort query operation + */ + Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder, + Query.Controller controller) + throws FetchException; } } diff --git a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java index 9e04a32..2623bb3 100644 --- a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.util.concurrent.locks.Lock; import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; import com.amazon.carbonado.filter.FilterValues; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.IteratorCursor; /** @@ -76,6 +78,10 @@ public class IterableQueryExecutor extends AbstractQueryExec return new IteratorCursor(mIterable, mLock); } + public Cursor fetch(FilterValues values, Query.Controller controller) { + return ControllerCursor.apply(new IteratorCursor(mIterable, mLock), controller); + } + /** * Returns an empty list. */ diff --git a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java index 9a77f26..4e98df4 100644 --- a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java @@ -36,6 +36,7 @@ import org.cojen.util.ClassInjector; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.MultiTransformedCursor; @@ -190,6 +191,7 @@ public class JoinedQueryExecutor private static final String INNER_LOOP_EX_FIELD_NAME = "innerLoopExecutor"; private static final String INNER_LOOP_FV_FIELD_NAME = "innerLoopFilterValues"; + private static final String INNER_LOOP_CONTROLLER_FIELD_NAME = "innerLoopController"; private static final String ACTIVE_SOURCE_FIELD_NAME = "active"; private static final SoftValuedCache cJoinerCursorClassCache; @@ -240,11 +242,14 @@ public class JoinedQueryExecutor final TypeDesc cursorType = TypeDesc.forClass(Cursor.class); final TypeDesc queryExecutorType = TypeDesc.forClass(QueryExecutor.class); final TypeDesc filterValuesType = TypeDesc.forClass(FilterValues.class); + final TypeDesc controllerType = TypeDesc.forClass(Query.Controller.class); // Define fields for inner loop executor and filter values, which are // passed into the constructor. cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_EX_FIELD_NAME, queryExecutorType); cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_FV_FIELD_NAME, filterValuesType); + cf.addField(Modifiers.PRIVATE.toFinal(true), + INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType); // If target storable can set a reference to the joined source // storable, then stash a copy of it as we go. This way, when user of @@ -259,7 +264,7 @@ public class JoinedQueryExecutor // Define constructor. { - TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType}; + TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType, controllerType}; MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params); CodeBuilder b = new CodeBuilder(mi); @@ -276,6 +281,10 @@ public class JoinedQueryExecutor b.loadLocal(b.getParameter(2)); b.storeField(INNER_LOOP_FV_FIELD_NAME, filterValuesType); + b.loadThis(); + b.loadLocal(b.getParameter(3)); + b.storeField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType); + b.returnVoid(); } @@ -317,9 +326,12 @@ public class JoinedQueryExecutor new TypeDesc[] {bindType}); } + b.loadThis(); + b.loadField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType); + // Now fetch and return. b.invokeInterface(queryExecutorType, "fetch", cursorType, - new TypeDesc[] {filterValuesType}); + new TypeDesc[] {filterValuesType, controllerType}); b.returnValue(cursorType); } @@ -570,6 +582,12 @@ public class JoinedQueryExecutor } public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { FilterValues innerLoopFilterValues = mInnerLoopFilterValues; if (mTargetFilter != null) { @@ -578,10 +596,14 @@ public class JoinedQueryExecutor .withValues(values.getValuesFor(mTargetFilter)); } - Cursor outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values)); + if (controller != null) { + controller.begin(); + } + + Cursor outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values), controller); return mJoinerFactory.newJoinedCursor - (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues); + (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues, controller); } public Filter getFilter() { @@ -628,7 +650,8 @@ public class JoinedQueryExecutor public static interface Factory { Cursor newJoinedCursor(Cursor outerLoopCursor, QueryExecutor innerLoopExecutor, - FilterValues innerLoopFilterValues); + FilterValues innerLoopFilterValues, + Query.Controller innerLoopController); } } } diff --git a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java index 1cc0b59..dacda80 100644 --- a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -72,6 +73,12 @@ public class KeyQueryExecutor extends AbstractQueryExecutor< return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter)); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter), controller); + } + public Filter getFilter() { return mKeyFilter; } @@ -115,5 +122,18 @@ public class KeyQueryExecutor extends AbstractQueryExecutor< */ Cursor fetchOne(StorableIndex index, Object[] identityValues) throws FetchException; + + /** + * Select at most one Storable referenced by an index. The identity + * values fully specify all elements of the index, and the index is + * unique. + * + * @param controller optional controller which can abort query operation + * @param index index to open, which may be a primary key index + * @param identityValues of exactly matching values to apply to index + */ + Cursor fetchOne(StorableIndex index, Object[] identityValues, + Query.Controller controller) + throws FetchException; } } diff --git a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java index ab310b0..9ba259b 100644 --- a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -45,6 +46,13 @@ public interface QueryExecutor { */ Cursor fetch(FilterValues values) throws FetchException; + /** + * Returns a new cursor using the given filter values. + * + * @param controller optional controller which can abort query operation + */ + Cursor fetch(FilterValues values, Query.Controller controller) throws FetchException; + /** * Returns a new cursor using the given filter values and slice. * @@ -52,11 +60,27 @@ public interface QueryExecutor { */ Cursor fetchSlice(FilterValues values, long from, Long to) throws FetchException; + /** + * Returns a new cursor using the given filter values and slice. + * + * @param controller optional controller which can abort query operation + * @since 1.2 + */ + Cursor fetchSlice(FilterValues values, long from, Long to, Query.Controller controller) + throws FetchException; + /** * Counts the query results using the given filter values. */ long count(FilterValues values) throws FetchException; + /** + * Counts the query results using the given filter values. + * + * @param controller optional controller which can abort query operation + */ + long count(FilterValues values, Query.Controller controller) throws FetchException; + /** * Returns the filter used by this QueryExecutor. * diff --git a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java index 82d0ef6..67f739d 100644 --- a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java @@ -24,9 +24,11 @@ import java.util.Comparator; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.ArraySortBuffer; +import com.amazon.carbonado.cursor.ControllerCursor; import com.amazon.carbonado.cursor.MergeSortBuffer; import com.amazon.carbonado.cursor.SortBuffer; import com.amazon.carbonado.cursor.SortedCursor; @@ -106,11 +108,28 @@ public class SortedQueryExecutor extends AbstractQueryExecut return new SortedCursor(cursor, buffer, mHandledComparator, mFinisherComparator); } + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { + Cursor cursor = mExecutor.fetch(values, controller); + SortBuffer buffer = mSupport.createSortBuffer(controller); + // Apply the controller around the cursor to ensure timeouts are + // honored even when the caller is slowly iterating over the cursor. + return ControllerCursor.apply + (new SortedCursor(cursor, buffer, mHandledComparator, mFinisherComparator), + controller); + } + @Override public long count(FilterValues values) throws FetchException { return mExecutor.count(values); } + @Override + public long count(FilterValues values, Query.Controller controller) throws FetchException { + return mExecutor.count(values, controller); + } + public Filter getFilter() { return mExecutor.getFilter(); } @@ -152,6 +171,13 @@ public class SortedQueryExecutor extends AbstractQueryExecut * Implementation must return an empty buffer for sorting. */ SortBuffer createSortBuffer(); + + /** + * Implementation must return an empty buffer for sorting. + * + * @param controller optional controller which can abort query operation + */ + SortBuffer createSortBuffer(Query.Controller controller); } /** @@ -164,6 +190,14 @@ public class SortedQueryExecutor extends AbstractQueryExecut public SortBuffer createSortBuffer() { return new ArraySortBuffer(); } + + /** + * Returns a new ArraySortBuffer. + */ + public SortBuffer createSortBuffer(Query.Controller controller) { + // Java sort utility doesn't support any kind of controller. + return new ArraySortBuffer(); + } } /** @@ -176,5 +210,12 @@ public class SortedQueryExecutor extends AbstractQueryExecut public SortBuffer createSortBuffer() { return new MergeSortBuffer(); } + + /** + * Returns a new MergeSortBuffer. + */ + public SortBuffer createSortBuffer(Query.Controller controller) { + return new MergeSortBuffer(controller); + } } } diff --git a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java index 0349ae2..bce32ed 100644 --- a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java @@ -287,15 +287,29 @@ public abstract class StandardQuery extends AbstractQuery } } + @Override + public Cursor fetch(Controller controller) throws FetchException { + try { + return executor().fetch(mValues, controller); + } catch (RepositoryException e) { + throw e.toFetchException(); + } + } + @Override public Cursor fetchSlice(long from, Long to) throws FetchException { + return fetchSlice(from, to, null); + } + + @Override + public Cursor fetchSlice(long from, Long to, Controller controller) throws FetchException { if (!checkSliceArguments(from, to)) { - return fetch(); + return fetch(controller); } try { QueryHints hints = QueryHints.emptyHints().with(QueryHint.CONSUME_SLICE); return executorFactory().executor(mFilter, mOrdering, hints) - .fetchSlice(mValues, from, to); + .fetchSlice(mValues, from, to, controller); } catch (RepositoryException e) { throw e.toFetchException(); } @@ -303,9 +317,14 @@ public abstract class StandardQuery extends AbstractQuery @Override public boolean tryDeleteOne() throws PersistException { + return tryDeleteOne(null); + } + + @Override + public boolean tryDeleteOne(Controller controller) throws PersistException { Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED); try { - Cursor cursor = fetch(); + Cursor cursor = fetch(controller); boolean result; try { if (cursor.hasNext()) { @@ -335,9 +354,14 @@ public abstract class StandardQuery extends AbstractQuery @Override public void deleteAll() throws PersistException { + deleteAll(null); + } + + @Override + public void deleteAll(Controller controller) throws PersistException { Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED); try { - Cursor cursor = fetch(); + Cursor cursor = fetch(controller); try { while (cursor.hasNext()) { cursor.next().tryDelete(); @@ -366,9 +390,23 @@ public abstract class StandardQuery extends AbstractQuery } } + @Override + public long count(Controller controller) throws FetchException { + try { + return executor().count(mValues, controller); + } catch (RepositoryException e) { + throw e.toFetchException(); + } + } + @Override public boolean exists() throws FetchException { - Cursor cursor = fetchSlice(0L, 1L); + return exists(null); + } + + @Override + public boolean exists(Controller controller) throws FetchException { + Cursor cursor = fetchSlice(0L, 1L, controller); try { return cursor.skipNext(1) > 0; } finally { diff --git a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java index 09a3174..48152bb 100644 --- a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java @@ -26,6 +26,7 @@ import java.util.List; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.SortedCursor; @@ -97,9 +98,15 @@ public class UnionQueryExecutor extends AbstractQueryExecuto } public Cursor fetch(FilterValues values) throws FetchException { + return fetch(values, null); + } + + public Cursor fetch(FilterValues values, Query.Controller controller) + throws FetchException + { Cursor cursor = null; for (QueryExecutor executor : mExecutors) { - Cursor subCursor = executor.fetch(values); + Cursor subCursor = executor.fetch(values, controller); cursor = (cursor == null) ? subCursor : new UnionCursor(cursor, subCursor, mOrderComparator); } -- cgit v1.2.3