diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2011-05-04 00:20:02 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2011-05-04 00:20:02 +0000 |
commit | 121886bc0c92389610408e3b415abb992ad8a212 (patch) | |
tree | ccd7bcada5efd29b9106e2150734bee375fe1163 /src/main/java/com/amazon | |
parent | 5be9a7ea0f9aad9e97c4d70cb82ce8a22f2d412a (diff) |
Add support for Query controller and timeouts; remove vestigial support for interrupts.
Diffstat (limited to 'src/main/java/com/amazon')
32 files changed, 1273 insertions, 103 deletions
diff --git a/src/main/java/com/amazon/carbonado/Query.java b/src/main/java/com/amazon/carbonado/Query.java index a660c1a..1780ebd 100644 --- a/src/main/java/com/amazon/carbonado/Query.java +++ b/src/main/java/com/amazon/carbonado/Query.java @@ -18,7 +18,13 @@ package com.amazon.carbonado;
+import java.io.Closeable;
import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.amazon.carbonado.filter.Filter;
import com.amazon.carbonado.filter.FilterValues;
@@ -287,6 +293,20 @@ public interface Query<S extends Storable> { Cursor<S> fetch() throws FetchException;
/**
+ * Fetches results for this query. If any updates or deletes might be
+ * performed on the results, consider enclosing the fetch in a
+ * transaction. This allows the isolation level and "for update" mode to be
+ * adjusted. Some repositories might otherwise deadlock.
+ *
+ * @param controller optional controller which can abort query operation
+ * @return fetch results
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws FetchException if storage layer throws an exception
+ * @see Repository#enterTransaction(IsolationLevel)
+ */
+ Cursor<S> fetch(Controller controller) throws FetchException;
+
+ /**
* Fetches a slice of results for this query, as defined by a numerical
* range. A slice can be used to limit the number of results from a
* query. It is strongly recommended that the query be given a total {@link
@@ -304,6 +324,24 @@ public interface Query<S extends Storable> { Cursor<S> fetchSlice(long from, Long to) throws FetchException;
/**
+ * Fetches a slice of results for this query, as defined by a numerical
+ * range. A slice can be used to limit the number of results from a
+ * query. It is strongly recommended that the query be given a total {@link
+ * #orderBy ordering} in order for the slice results to be deterministic.
+ *
+ * @param from zero-based {@code from} record number, inclusive
+ * @param to optional zero-based {@code to} record number, exclusive
+ * @param controller optional controller which can abort query operation
+ * @return fetch results
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws IllegalArgumentException if {@code from} is negative or if
+ * {@code from} is more than {@code to}
+ * @throws FetchException if storage layer throws an exception
+ * @since 1.2
+ */
+ Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException;
+
+ /**
* Fetches results for this query after a given starting point, which is
* useful for re-opening a cursor. This is only effective when query has
* been given an explicit {@link #orderBy ordering}. If not a total
@@ -326,6 +364,29 @@ public interface Query<S extends Storable> { <T extends S> Cursor<S> fetchAfter(T start) throws FetchException;
/**
+ * Fetches results for this query after a given starting point, which is
+ * useful for re-opening a cursor. This is only effective when query has
+ * been given an explicit {@link #orderBy ordering}. If not a total
+ * ordering, then cursor may start at an earlier position.
+ *
+ * <p>Note: This method can be very expensive to call repeatedly, if the
+ * query needs to perform a sort operation. Ideally, the query ordering
+ * should match the natural ordering of an index or key.
+ *
+ * <p>Calling {@code fetchAfter(s)} is equivalent to calling {@code
+ * after(s).fetch()}.
+ *
+ * @param start storable to attempt to start after; if null, fetch all results
+ * @param controller optional controller which can abort query operation
+ * @return fetch results
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws FetchException if storage layer throws an exception
+ * @see Repository#enterTransaction(IsolationLevel)
+ * @see #after
+ */
+ <T extends S> Cursor<S> fetchAfter(T start, Controller controller) throws FetchException;
+
+ /**
* Attempts to load exactly one matching object. If the number of matching
* records is zero or exceeds one, then an exception is thrown instead.
*
@@ -338,8 +399,21 @@ public interface Query<S extends Storable> { S loadOne() throws FetchException;
/**
- * May return null if nothing found. Throws exception if record count is
- * more than one.
+ * Attempts to load exactly one matching object. If the number of matching
+ * records is zero or exceeds one, then an exception is thrown instead.
+ *
+ * @param controller optional controller which can abort query operation
+ * @return a single fetched object
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws FetchNoneException if no matching record found
+ * @throws FetchMultipleException if more than one matching record found
+ * @throws FetchException if storage layer throws an exception
+ */
+ S loadOne(Controller controller) throws FetchException;
+
+ /**
+ * Tries to load one record, but returns null if nothing was found. Throws
+ * exception if record count is more than one.
*
* @return null or a single fetched object
* @throws IllegalStateException if any blank parameters in this query
@@ -349,6 +423,18 @@ public interface Query<S extends Storable> { S tryLoadOne() throws FetchException;
/**
+ * Tries to load one record, but returns null if nothing was found. Throws
+ * exception if record count is more than one.
+ *
+ * @param controller optional controller which can abort query operation
+ * @return null or a single fetched object
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws FetchMultipleException if more than one matching record found
+ * @throws FetchException if storage layer throws an exception
+ */
+ S tryLoadOne(Controller controller) throws FetchException;
+
+ /**
* Deletes one matching object. If the number of matching records is zero or
* exceeds one, then no delete occurs, and an exception is thrown instead.
*
@@ -360,6 +446,18 @@ public interface Query<S extends Storable> { void deleteOne() throws PersistException;
/**
+ * Deletes one matching object. If the number of matching records is zero or
+ * exceeds one, then no delete occurs, and an exception is thrown instead.
+ *
+ * @param controller optional controller which can abort query operation
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws PersistNoneException if no matching record found
+ * @throws PersistMultipleException if more than one record matches
+ * @throws PersistException if storage layer throws an exception
+ */
+ void deleteOne(Controller controller) throws PersistException;
+
+ /**
* Deletes zero or one matching objects. If the number of matching records
* exceeds one, then no delete occurs, and an exception is thrown instead.
*
@@ -371,6 +469,18 @@ public interface Query<S extends Storable> { boolean tryDeleteOne() throws PersistException;
/**
+ * Deletes zero or one matching objects. If the number of matching records
+ * exceeds one, then no delete occurs, and an exception is thrown instead.
+ *
+ * @param controller optional controller which can abort query operation
+ * @return true if record existed and was deleted, or false if no match
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws PersistMultipleException if more than one record matches
+ * @throws PersistException if storage layer throws an exception
+ */
+ boolean tryDeleteOne(Controller controller) throws PersistException;
+
+ /**
* Deletes zero or more matching objects. There is no guarantee that
* deleteAll is an atomic operation. If atomic behavior is desired, wrap
* the call in a transaction scope.
@@ -381,6 +491,17 @@ public interface Query<S extends Storable> { void deleteAll() throws PersistException;
/**
+ * Deletes zero or more matching objects. There is no guarantee that
+ * deleteAll is an atomic operation. If atomic behavior is desired, wrap
+ * the call in a transaction scope.
+ *
+ * @param controller optional controller which can abort query operation
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws PersistException if storage layer throws an exception
+ */
+ void deleteAll(Controller controller) throws PersistException;
+
+ /**
* Returns a count of all results matched by this query. Even though no
* results are explicitly fetched, this method may still be expensive to
* call. The actual performance will vary by repository and available indexes.
@@ -392,6 +513,18 @@ public interface Query<S extends Storable> { long count() throws FetchException;
/**
+ * Returns a count of all results matched by this query. Even though no
+ * results are explicitly fetched, this method may still be expensive to
+ * call. The actual performance will vary by repository and available indexes.
+ *
+ * @param controller optional controller which can abort query operation
+ * @return count of matches
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws FetchException if storage layer throws an exception
+ */
+ long count(Controller controller) throws FetchException;
+
+ /**
* Returns true if any results are matched by this query.
*
* @return true if any matches
@@ -402,6 +535,17 @@ public interface Query<S extends Storable> { boolean exists() throws FetchException;
/**
+ * Returns true if any results are matched by this query.
+ *
+ * @param controller optional controller which can abort query operation
+ * @return true if any matches
+ * @throws IllegalStateException if any blank parameters in this query
+ * @throws FetchException if storage layer throws an exception
+ * @since 1.2
+ */
+ boolean exists(Controller controller) throws FetchException;
+
+ /**
* Print the native query to standard out, which is useful for performance
* analysis. Not all repositories have a native query format. An example
* native format is SQL.
@@ -469,4 +613,166 @@ public interface Query<S extends Storable> { * Returns a description of the query filter and any other arguments.
*/
String toString();
+
+ /**
+ * Controller instance can be used to abort query operations.
+ *
+ * <p>Example:<pre>
+ * Storage<UserInfo> users = ...
+ * long count = users.query("name = ?").count(Query.Timeout.seconds(10));
+ * </pre>
+ */
+ public static interface Controller extends Serializable, Closeable {
+ /**
+ * Returns a non-negative value if controller imposes an absolute upper
+ * bound on query execution time.
+ */
+ public long getTimeout();
+
+ /**
+ * Returns the unit for the timeout, if applicable.
+ */
+ public TimeUnit getTimeoutUnit();
+
+ /**
+ * Called by query when it begins, possibly multiple times. Implementation
+ * is required to be idempotent and ignore multiple invocations.
+ */
+ public void begin();
+
+ /**
+ * Periodically called by query to determine if it should continue.
+ */
+ public void continueCheck() throws FetchException;
+
+ /**
+ * Always called by query when finished, even when it fails. Implementation
+ * is required to be idempotent and ignore multiple invocations.
+ */
+ public void close();
+ }
+
+ /**
+ * Timeout controller, for aborting long running queries. One instance is
+ * good for one timeout. The instance can be shared by multiple queries, if
+ * they are part of a single logical operation.
+ *
+ * <p>The timeout applies to the entire duration of fetching results, not
+ * just the time spent between individual fetches. A caller which is slowly
+ * processing results can timeout. More sophisticated timeouts can be
+ * implemented using custom Controller implementations.
+ */
+ public static final class Timeout implements Controller {
+ private static final long serialVersionUID = 1;
+
+ private static final AtomicLongFieldUpdater<Timeout> endUpdater =
+ AtomicLongFieldUpdater.newUpdater(Timeout.class, "mEndNanos");
+
+ /**
+ * Return a new Timeout in nanoseconds.
+ */
+ public static Timeout nanos(long timeout) {
+ return new Timeout(timeout, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * Return a new Timeout in microseconds.
+ */
+ public static Timeout micros(long timeout) {
+ return new Timeout(timeout, TimeUnit.MICROSECONDS);
+ }
+
+ /**
+ * Return a new Timeout in milliseconds.
+ */
+ public static Timeout millis(long timeout) {
+ return new Timeout(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Return a new Timeout in seconds.
+ */
+ public static Timeout seconds(long timeout) {
+ return new Timeout(timeout, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Return a new Timeout in minutes.
+ */
+
+ public static Timeout minutes(long timeout) {
+ return new Timeout(timeout, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Return a new Timeout in hours.
+ */
+ public static Timeout hours(long timeout) {
+ return new Timeout(timeout, TimeUnit.HOURS);
+ }
+
+ private final long mTimeout;
+ private final TimeUnit mUnit;
+
+ private volatile transient long mEndNanos;
+
+ public Timeout(long timeout, TimeUnit unit) {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("Timeout cannot be negative: " + timeout);
+ }
+ if (unit == null && timeout != 0) {
+ throw new IllegalArgumentException
+ ("TimeUnit cannot be null if timeout is non-zero: " + timeout);
+ }
+ mTimeout = timeout;
+ mUnit = unit;
+ }
+
+ public long getTimeout() {
+ return mTimeout;
+ }
+
+ public TimeUnit getTimeoutUnit() {
+ return mUnit;
+ }
+
+ @Override
+ public void begin() {
+ long end = System.nanoTime() + mUnit.toNanos(mTimeout);
+ if (end == 0) {
+ // Handle rare case to ensure atomic compare and set always
+ // works the first time, supporting idempotent calls to this
+ // method.
+ end = 1;
+ }
+ endUpdater.compareAndSet(this, 0, end);
+ }
+
+ @Override
+ public void continueCheck() throws FetchTimeoutException {
+ long end = mEndNanos;
+
+ if (end == 0) {
+ // Begin was not called, in violation of how the Controller
+ // must be used. Be lenient and begin now.
+ begin();
+ end = mEndNanos;
+ }
+
+ // Subtract to support modulo comparison.
+ if ((System.nanoTime() - end) >= 0) {
+ throw new FetchTimeoutException("Timed out: " + mTimeout + ' ' + mUnit);
+ }
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do.
+ }
+
+ @Override
+ public String toString() {
+ return "Query.Timeout {timeout=" + mTimeout + ", unit=" + mUnit + '}';
+ }
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java new file mode 100644 index 0000000..da682d2 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java @@ -0,0 +1,99 @@ +/*
+ * Copyright 2011 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
+
+/**
+ * Wraps another cursor and periodically calls a {@link com.amazon.carbonado.Query.Controller controller}.
+ *
+ * @author Brian S O'Neill
+ */
+public class ControllerCursor<S> extends AbstractCursor<S> {
+ /**
+ * Returns a ControllerCursor depending on whether a controller instance is
+ * passed in or not.
+ *
+ * @param controller optional controller which can abort query operation
+ * @throws IllegalArgumentException if source is null
+ */
+ public static <S> Cursor<S> apply(Cursor<S> source, Query.Controller controller) {
+ return controller == null ? source : new ControllerCursor<S>(source, controller);
+ }
+
+ private final Cursor<S> mSource;
+ private final Query.Controller mController;
+
+ private byte mCount;
+
+ /**
+ * @param controller required controller which can abort query operation
+ * @throws IllegalArgumentException if either argument is null
+ */
+ private ControllerCursor(Cursor<S> source, Query.Controller controller) {
+ if (source == null) {
+ throw new IllegalArgumentException("Source is null");
+ }
+ if (controller == null) {
+ throw new IllegalArgumentException("Controller is null");
+ }
+ mSource = source;
+ mController = controller;
+ controller.begin();
+ }
+
+ public boolean hasNext() throws FetchException {
+ if (mSource.hasNext()) {
+ continueCheck();
+ return true;
+ }
+ return false;
+ }
+
+ public S next() throws FetchException {
+ S next = mSource.next();
+ continueCheck();
+ return next;
+ }
+
+ public void close() throws FetchException {
+ try {
+ mSource.close();
+ } finally {
+ mController.close();
+ }
+ }
+
+ private void continueCheck() throws FetchException {
+ if (++mCount == 0) {
+ try {
+ mController.continueCheck();
+ } catch (FetchException e) {
+ try {
+ close();
+ } catch (FetchException e2) {
+ // Ignore.
+ }
+ throw e;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index ad21667..4cf8c21 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.filter.Filter;
@@ -114,14 +113,12 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> { return true;
}
try {
- int count = 0;
while (mCursor.hasNext()) {
S next = mCursor.next();
if (isAllowed(next)) {
mNext = next;
return true;
}
- interruptCheck(++count);
}
} catch (NoSuchElementException e) {
} catch (FetchException e) {
@@ -165,10 +162,9 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> { try {
int count = 0;
while (--amount >= 0 && hasNext()) {
- interruptCheck(++count);
+ ++count;
mNext = null;
}
-
return count;
} catch (FetchException e) {
try {
@@ -179,11 +175,4 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 49ca84e..3053cd5 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -23,7 +23,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
/**
* Abstract cursor for aggregation and finding distinct data. The source cursor
@@ -124,7 +123,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { }
try {
- int count = 0;
if (mCursor.hasNext()) {
if (mGroupLeader == null) {
beginGroup(mGroupLeader = mCursor.next());
@@ -143,8 +141,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { return true;
}
}
-
- interruptCheck(++count);
}
G aggregate = finishGroup();
@@ -206,7 +202,7 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { try {
int count = 0;
while (--amount >= 0 && hasNext()) {
- interruptCheck(++count);
+ ++count;
mNextAggregate = null;
}
@@ -220,11 +216,4 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java index aae5243..0231012 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -38,7 +38,9 @@ import java.util.List; import java.util.NoSuchElementException;
import java.util.PriorityQueue;
+import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.Storage;
import com.amazon.carbonado.SupportException;
@@ -122,6 +124,7 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> private final String mTempDir;
private final int mMaxArrayCapacity;
+ private final Query.Controller mController;
private Preparer<S> mPreparer;
@@ -144,6 +147,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> }
/**
+ * @since 1.2
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ public MergeSortBuffer(Query.Controller controller) {
+ this(null, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
+ }
+
+ /**
* @param storage storage for elements; if null use first Storable to
* prepare reloaded Storables
*/
@@ -154,6 +166,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> /**
* @param storage storage for elements; if null use first Storable to
* prepare reloaded Storables
+ * @param controller optional controller which can abort query operation
+ */
+ public MergeSortBuffer(Storage<S> storage, Query.Controller controller) {
+ this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
+ }
+
+ /**
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
* @param tempDir directory to store temp files for merging, or null for default
*/
public MergeSortBuffer(Storage<S> storage, String tempDir) {
@@ -170,6 +191,22 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> */
@SuppressWarnings("unchecked")
public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
+ this(storage, tempDir, maxArrayCapacity, null);
+ }
+
+ /**
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
+ * @param tempDir directory to store temp files for merging, or null for default
+ * @param maxArrayCapacity maximum amount of storables to keep in an array
+ * before serializing to a file
+ * @param controller optional controller which can abort query operation
+ * @throws IllegalArgumentException if storage is null
+ */
+ @SuppressWarnings("unchecked")
+ public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity,
+ Query.Controller controller)
+ {
mTempDir = tempDir;
mMaxArrayCapacity = maxArrayCapacity;
@@ -179,6 +216,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
mElements = (S[]) new Storable[cap];
+
+ if ((mController = controller) != null) {
+ controller.begin();
+ }
}
public void prepare(Comparator<S> comparator) {
@@ -231,10 +272,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> if (mFilesInUse.size() < (MAX_OPEN_FILE_COUNT - 1)) {
mFilesInUse.add(raf);
- int count = 0;
+ byte count = 0;
for (S element : mElements) {
- // Check every so often if interrupted.
- interruptCheck(++count);
+ // Check every so often if should continue.
+ continueCheck(++count);
element.writeTo(out);
}
} else {
@@ -274,11 +315,11 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> // as well as error out earlier, should the disk be full.
raf.setLength(mergedLength);
- int count = 0;
+ byte count = 0;
Iterator<S> it = iterator(filesToMerge);
while (it.hasNext()) {
- // Check every so often if interrupted.
- interruptCheck(++count);
+ // Check every so often if should continue.
+ continueCheck(++count);
S element = it.next();
element.writeTo(out);
}
@@ -368,9 +409,16 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> }
public void close() {
- clear();
- if (mWorkFilePool != null) {
- mWorkFilePool.unregisterWorkFileUser(this);
+ try {
+ clear();
+ if (mWorkFilePool != null) {
+ mWorkFilePool.unregisterWorkFileUser(this);
+ }
+ } finally {
+ Query.Controller controller = mController;
+ if (controller != null) {
+ controller.close();
+ }
}
}
@@ -386,10 +434,20 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> return comparator;
}
- private void interruptCheck(int count) {
- if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
- close();
- throw new UndeclaredThrowableException(new FetchInterruptedException());
+ private void continueCheck(byte count) {
+ if (count == 0) {
+ try {
+ Query.Controller controller = mController;
+ if (controller != null) {
+ controller.continueCheck();
+ }
+ if (mStop) {
+ throw new FetchInterruptedException("Shutting down");
+ }
+ } catch (FetchException e) {
+ close();
+ throw new UndeclaredThrowableException(e);
+ }
}
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 8a1bd48..814a3ba 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
/**
* Abstract cursor which wraps another cursor and transforms each storable
@@ -72,7 +71,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { mNextCursor.close();
mNextCursor = null;
}
- int count = 0;
while (mCursor.hasNext()) {
Cursor<T> nextCursor = transform(mCursor.next());
if (nextCursor != null) {
@@ -82,7 +80,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { }
nextCursor.close();
}
- interruptCheck(++count);
}
} catch (NoSuchElementException e) {
} catch (FetchException e) {
@@ -129,7 +126,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { if ((amount -= chunk) <= 0) {
break;
}
- interruptCheck(count);
}
return count;
@@ -142,11 +138,4 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 5b5f9b5..e054137 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -32,7 +32,6 @@ import org.cojen.util.BeanProperty; import org.cojen.classfile.TypeDesc;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.Storable;
@@ -388,12 +387,7 @@ public class SortedCursor<S> extends AbstractCursor<S> { fill: {
if (matcher == null) {
// Buffer up entire results and sort.
- int count = 0;
while (cursor.hasNext()) {
- // Check every so often if interrupted.
- if ((++count & ~0xff) == 0 && Thread.interrupted()) {
- throw new FetchInterruptedException();
- }
buffer.add(cursor.next());
}
break fill;
@@ -413,13 +407,8 @@ public class SortedCursor<S> extends AbstractCursor<S> { }
buffer.add(chunkStart);
- int count = 1;
while (cursor.hasNext()) {
- // Check every so often if interrupted.
- if ((++count & ~0xff) == 0 && Thread.interrupted()) {
- throw new FetchInterruptedException();
- }
S next = cursor.next();
if (matcher.compare(chunkStart, next) != 0) {
// Save for reading next chunk later.
diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index c05e0fc..bd41cef 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
/**
* Abstract cursor which wraps another cursor and transforms each storable
@@ -64,14 +63,12 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> { return true;
}
try {
- int count = 0;
while (mCursor.hasNext()) {
T next = transform(mCursor.next());
if (next != null) {
mNext = next;
return true;
}
- interruptCheck(++count);
}
} catch (NoSuchElementException e) {
} catch (FetchException e) {
@@ -115,7 +112,7 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> { try {
int count = 0;
while (--amount >= 0 && hasNext()) {
- interruptCheck(++count);
+ ++count;
mNext = null;
}
@@ -129,11 +126,4 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java index 7e857c3..44735ae 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQuery.java @@ -61,6 +61,13 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App }
@Override
+ public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
+ throws FetchException
+ {
+ return after(start).fetch(controller);
+ }
+
+ @Override
public S loadOne() throws FetchException {
S obj = tryLoadOne();
if (obj == null) {
@@ -70,6 +77,15 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App }
@Override
+ public S loadOne(Controller controller) throws FetchException {
+ S obj = tryLoadOne(controller);
+ if (obj == null) {
+ throw new FetchNoneException(toString());
+ }
+ return obj;
+ }
+
+ @Override
public S tryLoadOne() throws FetchException {
Cursor<S> cursor = fetch();
try {
@@ -88,6 +104,24 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App }
@Override
+ public S tryLoadOne(Controller controller) throws FetchException {
+ Cursor<S> cursor = fetch(controller);
+ try {
+ if (cursor.hasNext()) {
+ S obj = cursor.next();
+ if (cursor.hasNext()) {
+ throw new FetchMultipleException(toString());
+ }
+ return obj;
+ } else {
+ return null;
+ }
+ } finally {
+ cursor.close();
+ }
+ }
+
+ @Override
public void deleteOne() throws PersistException {
if (!tryDeleteOne()) {
throw new PersistNoneException(toString());
@@ -95,6 +129,13 @@ public abstract class AbstractQuery<S extends Storable> implements Query<S>, App }
@Override
+ public void deleteOne(Controller controller) throws PersistException {
+ if (!tryDeleteOne(controller)) {
+ throw new PersistNoneException(toString());
+ }
+ }
+
+ @Override
public boolean printNative() {
try {
return printNative(System.out);
diff --git a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java index e35ec7a..e1921d2 100644 --- a/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/AbstractQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.cursor.LimitCursor;
@@ -57,6 +58,26 @@ public abstract class AbstractQueryExecutor<S extends Storable> implements Query }
/**
+ * Produces a slice via skip and limit cursors. Subclasses are encouraged
+ * to override with a more efficient implementation.
+ *
+ * @since 1.2
+ */
+ public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+ Query.Controller controller)
+ throws FetchException
+ {
+ Cursor<S> cursor = fetch(values, controller);
+ if (from > 0) {
+ cursor = new SkipCursor<S>(cursor, from);
+ }
+ if (to != null) {
+ cursor = new LimitCursor<S>(cursor, to - from);
+ }
+ return cursor;
+ }
+
+ /**
* Counts results by opening a cursor and skipping entries. Subclasses are
* encouraged to override with a more efficient implementation.
*/
@@ -77,6 +98,26 @@ public abstract class AbstractQueryExecutor<S extends Storable> implements Query }
/**
+ * Counts results by opening a cursor and skipping entries. Subclasses are
+ * encouraged to override with a more efficient implementation.
+ */
+ public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+ Cursor<S> cursor = fetch(values, controller);
+ try {
+ long count = cursor.skipNext(Integer.MAX_VALUE);
+ if (count == Integer.MAX_VALUE) {
+ int amt;
+ while ((amt = cursor.skipNext(Integer.MAX_VALUE)) > 0) {
+ count += amt;
+ }
+ }
+ return count;
+ } finally {
+ cursor.close();
+ }
+ }
+
+ /**
* Does nothing and returns false.
*/
public boolean printNative(Appendable app, int indentLevel, FilterValues<S> values)
diff --git a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java index f9be52a..fa8d9e5 100644 --- a/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/DelegatedQueryExecutor.java @@ -95,14 +95,31 @@ public class DelegatedQueryExecutor<S extends Storable> implements QueryExecutor return applyFilterValues(values).fetch();
}
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
+ return applyFilterValues(values).fetch(controller);
+ }
+
public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to) throws FetchException {
return applyFilterValues(values).fetchSlice(from, to);
}
+ public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return applyFilterValues(values).fetchSlice(from, to, controller);
+ }
+
public long count(FilterValues<S> values) throws FetchException {
return applyFilterValues(values).count();
}
+ public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+ return applyFilterValues(values).count(controller);
+ }
+
public Filter<S> getFilter() {
return mFilter;
}
diff --git a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java index 62418a1..fc7ea0c 100644 --- a/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/EmptyQuery.java @@ -241,12 +241,28 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> { * Always returns an {@link EmptyCursor}.
*/
@Override
+ public Cursor<S> fetch(Controller controller) {
+ return EmptyCursor.the();
+ }
+
+ /**
+ * Always returns an {@link EmptyCursor}.
+ */
+ @Override
public Cursor<S> fetchSlice(long from, Long to) {
checkSliceArguments(from, to);
return EmptyCursor.the();
}
/**
+ * Always returns an {@link EmptyCursor}.
+ */
+ @Override
+ public Cursor<S> fetchSlice(long from, Long to, Controller controller) {
+ return fetchSlice(from, to);
+ }
+
+ /**
* Always throws {@link PersistNoneException}.
*/
@Override
@@ -255,6 +271,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> { }
/**
+ * Always throws {@link PersistNoneException}.
+ */
+ @Override
+ public void deleteOne(Controller controller) throws PersistNoneException {
+ throw new PersistNoneException();
+ }
+
+ /**
* Always returns false.
*/
@Override
@@ -263,6 +287,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> { }
/**
+ * Always returns false.
+ */
+ @Override
+ public boolean tryDeleteOne(Controller controller) {
+ return false;
+ }
+
+ /**
* Does nothing.
*/
@Override
@@ -270,6 +302,13 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> { }
/**
+ * Does nothing.
+ */
+ @Override
+ public void deleteAll(Controller controller) {
+ }
+
+ /**
* Always returns zero.
*/
@Override
@@ -278,6 +317,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> { }
/**
+ * Always returns zero.
+ */
+ @Override
+ public long count(Controller controller) {
+ return 0;
+ }
+
+ /**
* Always returns false.
*/
@Override
@@ -285,6 +332,14 @@ public final class EmptyQuery<S extends Storable> extends AbstractQuery<S> { return false;
}
+ /**
+ * Always returns false.
+ */
+ @Override
+ public boolean exists(Controller controller) {
+ return false;
+ }
+
@Override
public void appendTo(Appendable app) throws IOException {
app.append("Query {type=");
diff --git a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java index 770c1bc..17f3105 100644 --- a/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FilteredQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.cursor.FilteredCursor;
@@ -64,6 +65,12 @@ public class FilteredQueryExecutor<S extends Storable> extends AbstractQueryExec return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values));
}
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
+ return FilteredCursor.applyFilter(mFilter, values, mExecutor.fetch(values, controller));
+ }
+
/**
* Returns the combined filter of the wrapped executor and the extra filter.
*/
diff --git a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java index 9c7fd05..9a41fda 100644 --- a/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/FullScanQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.filter.Filter;
@@ -58,6 +59,15 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec return count;
}
+ @Override
+ public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+ long count = mSupport.countAll(controller);
+ if (count == -1) {
+ count = super.count(values, controller);
+ }
+ return count;
+ }
+
/**
* Returns an open filter.
*/
@@ -69,6 +79,12 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec return mSupport.fetchAll();
}
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
+ return mSupport.fetchAll(controller);
+ }
+
/**
* Returns an empty list.
*/
@@ -99,8 +115,23 @@ public class FullScanQueryExecutor<S extends Storable> extends AbstractQueryExec long countAll() throws FetchException;
/**
+ * Counts all Storables. Implementation may return -1 to indicate that
+ * default count algorithm should be used.
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ long countAll(Query.Controller controller) throws FetchException;
+
+ /**
* Perform a full scan of all Storables.
*/
Cursor<S> fetchAll() throws FetchException;
+
+ /**
+ * Perform a full scan of all Storables.
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ Cursor<S> fetchAll(Query.Controller controller) throws FetchException;
}
}
diff --git a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java index c3853d2..b8a39b2 100644 --- a/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IndexedQueryExecutor.java @@ -120,6 +120,12 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu }
public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+ return fetch(values, null);
+ }
+
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
Object[] identityValues = null;
Object rangeStartValue = null;
Object rangeEndValue = null;
@@ -182,7 +188,8 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu rangeStartBoundary, rangeStartValue,
rangeEndBoundary, rangeEndValue,
mReverseRange,
- mReverseOrder);
+ mReverseOrder,
+ controller);
} else {
indexEntryQuery = indexEntryQuery.withValues(identityValues);
if (rangeStartBoundary != BoundaryType.OPEN) {
@@ -194,7 +201,7 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu if (mCoveringFilter != null && values != null) {
indexEntryQuery = indexEntryQuery.withValues(values.getValuesFor(mCoveringFilter));
}
- return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery);
+ return mSupport.fetchFromIndexEntryQuery(mIndex, indexEntryQuery, controller);
}
}
@@ -415,6 +422,20 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu throws FetchException;
/**
+ * Fetch Storables referenced by the given index entry query. This
+ * method is only called if index supports query access.
+ *
+ * @param index index to open
+ * @param indexEntryQuery query with no blank parameters, derived from
+ * the query returned by indexEntryQuery
+ * @param controller optional controller which can abort query operation
+ * @since 1.2
+ */
+ Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ throws FetchException;
+
+ /**
* Perform an index scan of a subset of Storables referenced by an
* index. The identity values are aligned with the index properties at
* property 0. An optional range start or range end aligns with the index
@@ -445,5 +466,39 @@ public class IndexedQueryExecutor<S extends Storable> extends AbstractQueryExecu boolean reverseRange,
boolean reverseOrder)
throws FetchException;
+
+ /**
+ * Perform an index scan of a subset of Storables referenced by an
+ * index. The identity values are aligned with the index properties at
+ * property 0. An optional range start or range end aligns with the index
+ * property following the last of the identity values.
+ *
+ * <p>This method is only called if no index entry query was provided
+ * for the given index.
+ *
+ * @param index index to open, which may be a primary key index
+ * @param identityValues optional list of exactly matching values to apply to index
+ * @param rangeStartBoundary start boundary type
+ * @param rangeStartValue value to start at if boundary is not open
+ * @param rangeEndBoundary end boundary type
+ * @param rangeEndValue value to end at if boundary is not open
+ * @param reverseRange indicates that range operates on a property whose
+ * natural order is descending. Only the code that opens the physical
+ * cursor should examine this parameter. If true, then the range start and
+ * end parameter pairs need to be swapped.
+ * @param reverseOrder when true, iteration should be reversed from its
+ * natural order
+ * @param controller optional controller which can abort query operation
+ */
+ Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException;
}
}
diff --git a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java index 9e04a32..2623bb3 100644 --- a/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/IterableQueryExecutor.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.util.concurrent.locks.Lock;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.filter.Filter;
import com.amazon.carbonado.filter.FilterValues;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.IteratorCursor;
/**
@@ -76,6 +78,10 @@ public class IterableQueryExecutor<S extends Storable> extends AbstractQueryExec return new IteratorCursor<S>(mIterable, mLock);
}
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller) {
+ return ControllerCursor.apply(new IteratorCursor<S>(mIterable, mLock), controller);
+ }
+
/**
* Returns an empty list.
*/
diff --git a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java index 9a77f26..4e98df4 100644 --- a/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/JoinedQueryExecutor.java @@ -36,6 +36,7 @@ import org.cojen.util.ClassInjector; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.cursor.MultiTransformedCursor;
@@ -190,6 +191,7 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> private static final String INNER_LOOP_EX_FIELD_NAME = "innerLoopExecutor";
private static final String INNER_LOOP_FV_FIELD_NAME = "innerLoopFilterValues";
+ private static final String INNER_LOOP_CONTROLLER_FIELD_NAME = "innerLoopController";
private static final String ACTIVE_SOURCE_FIELD_NAME = "active";
private static final SoftValuedCache<StorableProperty, Class> cJoinerCursorClassCache;
@@ -240,11 +242,14 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> final TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
final TypeDesc queryExecutorType = TypeDesc.forClass(QueryExecutor.class);
final TypeDesc filterValuesType = TypeDesc.forClass(FilterValues.class);
+ final TypeDesc controllerType = TypeDesc.forClass(Query.Controller.class);
// Define fields for inner loop executor and filter values, which are
// passed into the constructor.
cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_EX_FIELD_NAME, queryExecutorType);
cf.addField(Modifiers.PRIVATE.toFinal(true), INNER_LOOP_FV_FIELD_NAME, filterValuesType);
+ cf.addField(Modifiers.PRIVATE.toFinal(true),
+ INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
// If target storable can set a reference to the joined source
// storable, then stash a copy of it as we go. This way, when user of
@@ -259,7 +264,7 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> // Define constructor.
{
- TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType};
+ TypeDesc[] params = {cursorType, queryExecutorType, filterValuesType, controllerType};
MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, params);
CodeBuilder b = new CodeBuilder(mi);
@@ -276,6 +281,10 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> b.loadLocal(b.getParameter(2));
b.storeField(INNER_LOOP_FV_FIELD_NAME, filterValuesType);
+ b.loadThis();
+ b.loadLocal(b.getParameter(3));
+ b.storeField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
+
b.returnVoid();
}
@@ -317,9 +326,12 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> new TypeDesc[] {bindType});
}
+ b.loadThis();
+ b.loadField(INNER_LOOP_CONTROLLER_FIELD_NAME, controllerType);
+
// Now fetch and return.
b.invokeInterface(queryExecutorType, "fetch", cursorType,
- new TypeDesc[] {filterValuesType});
+ new TypeDesc[] {filterValuesType, controllerType});
b.returnValue(cursorType);
}
@@ -570,6 +582,12 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> }
public Cursor<T> fetch(FilterValues<T> values) throws FetchException {
+ return fetch(values, null);
+ }
+
+ public Cursor<T> fetch(FilterValues<T> values, Query.Controller controller)
+ throws FetchException
+ {
FilterValues<T> innerLoopFilterValues = mInnerLoopFilterValues;
if (mTargetFilter != null) {
@@ -578,10 +596,14 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> .withValues(values.getValuesFor(mTargetFilter));
}
- Cursor<S> outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values));
+ if (controller != null) {
+ controller.begin();
+ }
+
+ Cursor<S> outerLoopCursor = mOuterLoopExecutor.fetch(transferValues(values), controller);
return mJoinerFactory.newJoinedCursor
- (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues);
+ (outerLoopCursor, mInnerLoopExecutor, innerLoopFilterValues, controller);
}
public Filter<T> getFilter() {
@@ -628,7 +650,8 @@ public class JoinedQueryExecutor<S extends Storable, T extends Storable> public static interface Factory<S, T extends Storable> {
Cursor<T> newJoinedCursor(Cursor<S> outerLoopCursor,
QueryExecutor<T> innerLoopExecutor,
- FilterValues<T> innerLoopFilterValues);
+ FilterValues<T> innerLoopFilterValues,
+ Query.Controller innerLoopController);
}
}
}
diff --git a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java index 1cc0b59..dacda80 100644 --- a/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/KeyQueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.filter.Filter;
@@ -72,6 +73,12 @@ public class KeyQueryExecutor<S extends Storable> extends AbstractQueryExecutor< return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter));
}
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
+ return mSupport.fetchOne(mIndex, values.getValuesFor(mKeyFilter), controller);
+ }
+
public Filter<S> getFilter() {
return mKeyFilter;
}
@@ -115,5 +122,18 @@ public class KeyQueryExecutor<S extends Storable> extends AbstractQueryExecutor< */
Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
throws FetchException;
+
+ /**
+ * Select at most one Storable referenced by an index. The identity
+ * values fully specify all elements of the index, and the index is
+ * unique.
+ *
+ * @param controller optional controller which can abort query operation
+ * @param index index to open, which may be a primary key index
+ * @param identityValues of exactly matching values to apply to index
+ */
+ Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException;
}
}
diff --git a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java index ab310b0..9ba259b 100644 --- a/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/QueryExecutor.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.filter.Filter;
@@ -46,6 +47,13 @@ public interface QueryExecutor<S extends Storable> { Cursor<S> fetch(FilterValues<S> values) throws FetchException;
/**
+ * Returns a new cursor using the given filter values.
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ Cursor<S> fetch(FilterValues<S> values, Query.Controller controller) throws FetchException;
+
+ /**
* Returns a new cursor using the given filter values and slice.
*
* @since 1.2
@@ -53,11 +61,27 @@ public interface QueryExecutor<S extends Storable> { Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to) throws FetchException;
/**
+ * Returns a new cursor using the given filter values and slice.
+ *
+ * @param controller optional controller which can abort query operation
+ * @since 1.2
+ */
+ Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to, Query.Controller controller)
+ throws FetchException;
+
+ /**
* Counts the query results using the given filter values.
*/
long count(FilterValues<S> values) throws FetchException;
/**
+ * Counts the query results using the given filter values.
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ long count(FilterValues<S> values, Query.Controller controller) throws FetchException;
+
+ /**
* Returns the filter used by this QueryExecutor.
*
* @return query filter, never null
diff --git a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java index 82d0ef6..67f739d 100644 --- a/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/SortedQueryExecutor.java @@ -24,9 +24,11 @@ import java.util.Comparator; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.cursor.ArraySortBuffer;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.MergeSortBuffer;
import com.amazon.carbonado.cursor.SortBuffer;
import com.amazon.carbonado.cursor.SortedCursor;
@@ -106,11 +108,28 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut return new SortedCursor<S>(cursor, buffer, mHandledComparator, mFinisherComparator);
}
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
+ Cursor<S> cursor = mExecutor.fetch(values, controller);
+ SortBuffer<S> buffer = mSupport.createSortBuffer(controller);
+ // Apply the controller around the cursor to ensure timeouts are
+ // honored even when the caller is slowly iterating over the cursor.
+ return ControllerCursor.apply
+ (new SortedCursor<S>(cursor, buffer, mHandledComparator, mFinisherComparator),
+ controller);
+ }
+
@Override
public long count(FilterValues<S> values) throws FetchException {
return mExecutor.count(values);
}
+ @Override
+ public long count(FilterValues<S> values, Query.Controller controller) throws FetchException {
+ return mExecutor.count(values, controller);
+ }
+
public Filter<S> getFilter() {
return mExecutor.getFilter();
}
@@ -152,6 +171,13 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut * Implementation must return an empty buffer for sorting.
*/
SortBuffer<S> createSortBuffer();
+
+ /**
+ * Implementation must return an empty buffer for sorting.
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ SortBuffer<S> createSortBuffer(Query.Controller controller);
}
/**
@@ -164,6 +190,14 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut public SortBuffer<S> createSortBuffer() {
return new ArraySortBuffer<S>();
}
+
+ /**
+ * Returns a new ArraySortBuffer.
+ */
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ // Java sort utility doesn't support any kind of controller.
+ return new ArraySortBuffer<S>();
+ }
}
/**
@@ -176,5 +210,12 @@ public class SortedQueryExecutor<S extends Storable> extends AbstractQueryExecut public SortBuffer<S> createSortBuffer() {
return new MergeSortBuffer<S>();
}
+
+ /**
+ * Returns a new MergeSortBuffer.
+ */
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ return new MergeSortBuffer<S>(controller);
+ }
}
}
diff --git a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java index 0349ae2..bce32ed 100644 --- a/src/main/java/com/amazon/carbonado/qe/StandardQuery.java +++ b/src/main/java/com/amazon/carbonado/qe/StandardQuery.java @@ -288,14 +288,28 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S> }
@Override
+ public Cursor<S> fetch(Controller controller) throws FetchException {
+ try {
+ return executor().fetch(mValues, controller);
+ } catch (RepositoryException e) {
+ throw e.toFetchException();
+ }
+ }
+
+ @Override
public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
+ return fetchSlice(from, to, null);
+ }
+
+ @Override
+ public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
if (!checkSliceArguments(from, to)) {
- return fetch();
+ return fetch(controller);
}
try {
QueryHints hints = QueryHints.emptyHints().with(QueryHint.CONSUME_SLICE);
return executorFactory().executor(mFilter, mOrdering, hints)
- .fetchSlice(mValues, from, to);
+ .fetchSlice(mValues, from, to, controller);
} catch (RepositoryException e) {
throw e.toFetchException();
}
@@ -303,9 +317,14 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S> @Override
public boolean tryDeleteOne() throws PersistException {
+ return tryDeleteOne(null);
+ }
+
+ @Override
+ public boolean tryDeleteOne(Controller controller) throws PersistException {
Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED);
try {
- Cursor<S> cursor = fetch();
+ Cursor<S> cursor = fetch(controller);
boolean result;
try {
if (cursor.hasNext()) {
@@ -335,9 +354,14 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S> @Override
public void deleteAll() throws PersistException {
+ deleteAll(null);
+ }
+
+ @Override
+ public void deleteAll(Controller controller) throws PersistException {
Transaction txn = enterTransaction(IsolationLevel.READ_COMMITTED);
try {
- Cursor<S> cursor = fetch();
+ Cursor<S> cursor = fetch(controller);
try {
while (cursor.hasNext()) {
cursor.next().tryDelete();
@@ -367,8 +391,22 @@ public abstract class StandardQuery<S extends Storable> extends AbstractQuery<S> }
@Override
+ public long count(Controller controller) throws FetchException {
+ try {
+ return executor().count(mValues, controller);
+ } catch (RepositoryException e) {
+ throw e.toFetchException();
+ }
+ }
+
+ @Override
public boolean exists() throws FetchException {
- Cursor<S> cursor = fetchSlice(0L, 1L);
+ return exists(null);
+ }
+
+ @Override
+ public boolean exists(Controller controller) throws FetchException {
+ Cursor<S> cursor = fetchSlice(0L, 1L, controller);
try {
return cursor.skipNext(1) > 0;
} finally {
diff --git a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java index 09a3174..48152bb 100644 --- a/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java +++ b/src/main/java/com/amazon/carbonado/qe/UnionQueryExecutor.java @@ -26,6 +26,7 @@ import java.util.List; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.cursor.SortedCursor;
@@ -97,9 +98,15 @@ public class UnionQueryExecutor<S extends Storable> extends AbstractQueryExecuto }
public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+ return fetch(values, null);
+ }
+
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
Cursor<S> cursor = null;
for (QueryExecutor<S> executor : mExecutors) {
- Cursor<S> subCursor = executor.fetch(values);
+ Cursor<S> subCursor = executor.fetch(values, controller);
cursor = (cursor == null) ? subCursor
: new UnionCursor<S>(cursor, subCursor, mOrderComparator);
}
diff --git a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java index 6c9363f..2b15294 100644 --- a/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java +++ b/src/main/java/com/amazon/carbonado/raw/GenericStorableCodecFactory.java @@ -89,8 +89,9 @@ public class GenericStorableCodecFactory implements StorableCodecFactory { RawSupport support)
throws SupportException
{
+ LayoutOptions options = layout == null ? getLayoutOptions(type) : layout.getOptions();
return GenericStorableCodec.getInstance
- (this, createStrategy(type, pkIndex, null), isMaster, layout, support);
+ (this, createStrategy(type, pkIndex, options), isMaster, layout, support);
}
/**
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 4635f2c..d115a3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -191,14 +191,26 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> return new MergeSortBuffer<S>();
}
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ return new MergeSortBuffer<S>(controller);
+ }
+
public long countAll() throws FetchException {
return mMasterStorage.query().count();
}
+ public long countAll(Query.Controller controller) throws FetchException {
+ return mMasterStorage.query().count(controller);
+ }
+
public Cursor<S> fetchAll() throws FetchException {
return mMasterStorage.query().fetch();
}
+ public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
+ return mMasterStorage.query().fetch(controller);
+ }
+
public Cursor<S> fetchOne(StorableIndex<S> index,
Object[] identityValues)
throws FetchException
@@ -207,6 +219,15 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> return indexInfo.fetchOne(this, identityValues);
}
+ public Cursor<S> fetchOne(StorableIndex<S> index,
+ Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
+ ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
+ return indexInfo.fetchOne(this, identityValues, controller);
+ }
+
public Query<?> indexEntryQuery(StorableIndex<S> index)
throws FetchException
{
@@ -221,6 +242,14 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery);
}
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ throws FetchException
+ {
+ ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
+ return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery, controller);
+ }
+
public Cursor<S> fetchSubset(StorableIndex<S> index,
Object[] identityValues,
BoundaryType rangeStartBoundary,
@@ -235,6 +264,21 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> throw new UnsupportedOperationException();
}
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException
+ {
+ // This method should never be called since a query was returned by indexEntryQuery.
+ throw new UnsupportedOperationException();
+ }
+
private void registerIndex(ManagedIndex<S> managedIndex)
throws RepositoryException
{
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 2bb2f49..6c28619 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -173,6 +173,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues)
throws FetchException
{
+ return fetchOne(storage, identityValues, null);
+ }
+
+ Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
Query<?> query = mSingleMatchQuery;
if (query == null) {
@@ -184,13 +191,20 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { mSingleMatchQuery = query = mIndexEntryStorage.query(filter);
}
- return fetchFromIndexEntryQuery(storage, query.withValues(identityValues));
+ return fetchFromIndexEntryQuery(storage, query.withValues(identityValues), controller);
}
Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery)
throws FetchException
{
- return new IndexedCursor<S>(indexEntryQuery.fetch(), storage, mAccessor);
+ return fetchFromIndexEntryQuery(storage, indexEntryQuery, null);
+ }
+
+ Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return new IndexedCursor<S>(indexEntryQuery.fetch(controller), storage, mAccessor);
}
@Override
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java index e760f58..d10856a 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java @@ -28,9 +28,15 @@ import java.sql.SQLException; */
class H2ExceptionTransformer extends JDBCExceptionTransformer {
public static int DUPLICATE_KEY = 23001;
+ public static int PROCESSING_CANCELED = 90051;
@Override
public boolean isUniqueConstraintError(SQLException e) {
- return DUPLICATE_KEY == e.getErrorCode();
+ return super.isUniqueConstraintError(e) || DUPLICATE_KEY == e.getErrorCode();
+ }
+
+ @Override
+ public boolean isTimeoutError(SQLException e) {
+ return super.isTimeoutError(e) || PROCESSING_CANCELED == e.getErrorCode();
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java index f72c717..4780140 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java @@ -23,9 +23,11 @@ import java.sql.SQLException; import com.amazon.carbonado.ConstraintException;
import com.amazon.carbonado.FetchDeadlockException;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistDeniedException;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.UniqueConstraintException;
import com.amazon.carbonado.spi.ExceptionTransformer;
@@ -59,6 +61,8 @@ class JDBCExceptionTransformer extends ExceptionTransformer { */
public static String SQLSTATE_DEADLOCK_WITH_ROLLBACK = "40001";
+ public static String SQLSTATE_PROCESSING_CANCELED = "57014";
+
/**
* Examines the SQLSTATE code of the given SQL exception and determines if
* it is a generic constaint violation.
@@ -103,6 +107,16 @@ class JDBCExceptionTransformer extends ExceptionTransformer { return false;
}
+ public boolean isTimeoutError(SQLException e) {
+ if (e != null) {
+ String sqlstate = e.getSQLState();
+ if (sqlstate != null) {
+ return SQLSTATE_PROCESSING_CANCELED.equals(sqlstate);
+ }
+ }
+ return false;
+ }
+
JDBCExceptionTransformer() {
}
@@ -117,6 +131,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer { if (isDeadlockError(se)) {
return new FetchDeadlockException(e);
}
+ if (isTimeoutError(se)) {
+ return new FetchTimeoutException(e);
+ }
}
return null;
}
@@ -141,6 +158,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer { if (isDeadlockError(se)) {
return new PersistDeadlockException(e);
}
+ if (isTimeoutError(se)) {
+ return new PersistTimeoutException(e);
+ }
}
return null;
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java index a49a150..bbe5589 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java @@ -27,6 +27,7 @@ import java.sql.ResultSet; import java.sql.SQLException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@ import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
@@ -42,6 +44,7 @@ import com.amazon.carbonado.SupportException; import com.amazon.carbonado.Transaction;
import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.EmptyCursor;
import com.amazon.carbonado.cursor.LimitCursor;
import com.amazon.carbonado.filter.AndFilter;
@@ -329,6 +332,28 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> return new JDBCQuery(filter, values, ordering, hints);
}
+ static PreparedStatement prepareStatement(Connection con, String sql,
+ Query.Controller controller)
+ throws SQLException
+ {
+ PreparedStatement ps = con.prepareStatement(sql);
+
+ if (controller != null) {
+ long timeout = controller.getTimeout();
+ if (timeout >= 0) {
+ TimeUnit unit = controller.getTimeoutUnit();
+ if (unit != null) {
+ long seconds = unit.toSeconds(timeout);
+ int intSeconds = seconds <= 0 ? 1 :
+ (seconds <= Integer.MAX_VALUE ? ((int) seconds) : 0);
+ ps.setQueryTimeout(intSeconds);
+ }
+ }
+ }
+
+ return ps;
+ }
+
public S instantiate(ResultSet rs) throws SQLException {
return (S) mInstanceFactory.instantiate(this, rs, FIRST_RESULT_INDEX);
}
@@ -668,12 +693,21 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> }
}
+ @Override
public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+ return fetch(values, null);
+ }
+
+ @Override
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
TransactionScope<JDBCTransaction> scope = mRepository.localTransactionScope();
boolean forUpdate = scope.isForUpdate();
Connection con = getConnection();
try {
- PreparedStatement ps = con.prepareStatement(prepareSelect(values, forUpdate));
+ PreparedStatement ps =
+ prepareStatement(con, prepareSelect(values, forUpdate), controller);
Integer fetchSize = mRepository.getFetchSize();
if (fetchSize != null) {
ps.setFetchSize(fetchSize);
@@ -681,7 +715,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> try {
setParameters(ps, values);
- return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+ return ControllerCursor.apply
+ (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
} catch (Exception e) {
// in case of exception, close statement
try {
@@ -706,6 +741,14 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to)
throws FetchException
{
+ return fetchSlice(values, from, to, null);
+ }
+
+ @Override
+ public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+ Query.Controller controller)
+ throws FetchException
+ {
if (to != null && (to - from) <= 0) {
return EmptyCursor.the();
}
@@ -716,17 +759,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> switch (option) {
case NOT_SUPPORTED: default:
- return super.fetchSlice(values, from, to);
+ return super.fetchSlice(values, from, to, controller);
case LIMIT_ONLY:
if (from > 0 || to == null) {
- return super.fetchSlice(values, from, to);
+ return super.fetchSlice(values, from, to, controller);
}
select = prepareSelect(values, false);
select = mSupportStrategy.buildSelectWithSlice(select, false, true);
break;
case OFFSET_ONLY:
if (from <= 0) {
- return super.fetchSlice(values, from, to);
+ return super.fetchSlice(values, from, to, controller);
}
select = prepareSelect(values, false);
select = mSupportStrategy.buildSelectWithSlice(select, true, false);
@@ -746,7 +789,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> Connection con = getConnection();
try {
- PreparedStatement ps = con.prepareStatement(select);
+ PreparedStatement ps = prepareStatement(con, select, controller);
Integer fetchSize = mRepository.getFetchSize();
if (fetchSize != null) {
ps.setFetchSize(fetchSize);
@@ -760,7 +803,10 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> switch (option) {
case OFFSET_ONLY:
ps.setLong(psOrdinal, from);
- Cursor<S> c = new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+ Cursor<S> c =
+ ControllerCursor.apply
+ (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps),
+ controller);
return new LimitCursor<S>(c, to - from);
case LIMIT_AND_OFFSET:
ps.setLong(psOrdinal, to - from);
@@ -782,7 +828,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> ps.setLong(psOrdinal, to);
}
- return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+ return ControllerCursor.apply
+ (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
} catch (Exception e) {
// in case of exception, close statement
try {
@@ -805,9 +852,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> @Override
public long count(FilterValues<S> values) throws FetchException {
+ return count(values, null);
+ }
+
+ @Override
+ public long count(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
Connection con = getConnection();
try {
- PreparedStatement ps = con.prepareStatement(prepareCount(values));
+ PreparedStatement ps = prepareStatement(con, prepareCount(values), controller);
+
try {
setParameters(ps, values);
ResultSet rs = ps.executeQuery();
@@ -827,10 +882,12 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> }
}
+ @Override
public Filter<S> getFilter() {
return mFilter;
}
+ @Override
public OrderingList<S> getOrdering() {
return mOrdering;
}
@@ -846,6 +903,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> return true;
}
+ @Override
public boolean printPlan(Appendable app, int indentLevel, FilterValues<S> values)
throws IOException
{
@@ -862,7 +920,9 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> /**
* Delete operation is included in cursor factory for ease of implementation.
*/
- int executeDelete(FilterValues<S> filterValues) throws PersistException {
+ int executeDelete(FilterValues<S> filterValues, Query.Controller controller)
+ throws PersistException
+ {
Connection con;
try {
con = getConnection();
@@ -870,7 +930,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> throw e.toPersistException();
}
try {
- PreparedStatement ps = con.prepareStatement(prepareDelete(filterValues));
+ PreparedStatement ps =
+ prepareStatement(con, prepareDelete(filterValues), controller);
try {
setParameters(ps, filterValues);
return ps.executeUpdate();
@@ -976,13 +1037,18 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S> @Override
public void deleteAll() throws PersistException {
+ deleteAll(null);
+ }
+
+ @Override
+ public void deleteAll(Controller controller) throws PersistException {
if (mTriggerManager.getDeleteTrigger() != null) {
// Super implementation loads one at time and calls
// delete. This allows delete trigger to be invoked on each.
- super.deleteAll();
+ super.deleteAll(controller);
} else {
try {
- ((Executor) executor()).executeDelete(getFilterValues());
+ ((Executor) executor()).executeDelete(getFilterValues(), controller);
} catch (RepositoryException e) {
throw e.toPersistException();
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java index ad63941..ebaafd7 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java @@ -32,6 +32,8 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer { public static int DEADLOCK_DETECTED = 60;
+ public static int PROCESSING_CANCELED = 1013;
+
@Override
public boolean isUniqueConstraintError(SQLException e) {
if (isConstraintError(e)) {
@@ -63,4 +65,9 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer { }
return false;
}
+
+ @Override
+ public boolean isTimeoutError(SQLException e) {
+ return super.isTimeoutError(e) || e != null && PROCESSING_CANCELED == e.getErrorCode();
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java index 08be6e3..5dc4d3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java +++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java @@ -165,6 +165,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public Cursor<S> fetch(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.fetch(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.fetch(controller);
+ }
+
+ @Override
public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -175,6 +184,16 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.fetchSlice(start, to, controller) on " + this +
+ ", from: " + from + ", to: " + to + ", controller: " + controller);
+ }
+ return mQuery.fetchSlice(from, to, controller);
+ }
+
+ @Override
public <T extends S> Cursor<S> fetchAfter(T start) throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -184,6 +203,18 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
+ throws FetchException
+ {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.fetchAfter(start, controller) on " + this + ", start: " + start
+ + ", controller: " + controller);
+ }
+ return mQuery.fetchAfter(start, controller);
+ }
+
+ @Override
public S loadOne() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -193,6 +224,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public S loadOne(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.loadOne() on " + this + ", controller: " + controller);
+ }
+ return mQuery.loadOne(controller);
+ }
+
+ @Override
public S tryLoadOne() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -202,6 +242,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public S tryLoadOne(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.tryLoadOne(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.tryLoadOne(controller);
+ }
+
+ @Override
public void deleteOne() throws PersistException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -211,6 +260,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public void deleteOne(Controller controller) throws PersistException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.deleteOne(controller) on " + this + ", controller: " + controller);
+ }
+ mQuery.deleteOne(controller);
+ }
+
+ @Override
public boolean tryDeleteOne() throws PersistException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -220,6 +278,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public boolean tryDeleteOne(Controller controller) throws PersistException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.tryDeleteOne(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.tryDeleteOne(controller);
+ }
+
+ @Override
public void deleteAll() throws PersistException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -229,6 +296,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public void deleteAll(Controller controller) throws PersistException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.deleteAll(controller) on " + this + ", controller: " + controller);
+ }
+ mQuery.deleteAll(controller);
+ }
+
+ @Override
public long count() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -238,6 +314,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public long count(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.count(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.count(controller);
+ }
+
+ @Override
public boolean exists() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -247,6 +332,15 @@ class LoggingQuery<S extends Storable> implements Query<S> { }
@Override
+ public boolean exists(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.exists(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.exists(controller);
+ }
+
+ @Override
public boolean printNative() {
return mQuery.printNative();
}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java index c84d8e3..c9c17b6 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java @@ -47,6 +47,7 @@ import com.amazon.carbonado.Trigger; import com.amazon.carbonado.capability.IndexInfo;
import com.amazon.carbonado.cursor.ArraySortBuffer;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.EmptyCursor;
import com.amazon.carbonado.cursor.FilteredCursor;
import com.amazon.carbonado.cursor.SingletonCursor;
@@ -540,6 +541,10 @@ class MapStorage<S extends Storable> }
public long countAll() throws FetchException {
+ return countAll(null);
+ }
+
+ public long countAll(Query.Controller controller) throws FetchException {
try {
TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
MapTransaction txn = scope.getTxn();
@@ -578,9 +583,20 @@ class MapStorage<S extends Storable> }
}
+ public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
+ return ControllerCursor.apply(fetchAll(), controller);
+ }
+
public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
throws FetchException
{
+ return fetchOne(index, identityValues, null);
+ }
+
+ public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
try {
S key = prepare();
for (int i=0; i<identityValues.length; i++) {
@@ -643,6 +659,12 @@ class MapStorage<S extends Storable> return null;
}
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ {
+ return null;
+ }
+
public Cursor<S> fetchSubset(StorableIndex<S> index,
Object[] identityValues,
BoundaryType rangeStartBoundary,
@@ -770,6 +792,28 @@ class MapStorage<S extends Storable> return cursor;
}
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return ControllerCursor.apply(fetchSubset(index,
+ identityValues,
+ rangeStartBoundary,
+ rangeStartValue,
+ rangeEndBoundary,
+ rangeEndValue,
+ reverseRange,
+ reverseOrder),
+ controller);
+ }
+
private List<OrderedProperty<S>> createPkPropList() {
return new ArrayList<OrderedProperty<S>>(mInfo.getPrimaryKey().getProperties());
}
@@ -806,6 +850,11 @@ class MapStorage<S extends Storable> return new ArraySortBuffer<S>();
}
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ // ArraySortBuffer doesn't support controller.
+ return new ArraySortBuffer<S>();
+ }
+
public static interface InstanceFactory {
Storable instantiate(DelegateSupport support);
}
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index f4d8f24..c0f882e 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -45,6 +45,7 @@ import com.amazon.carbonado.UniqueConstraintException; import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.EmptyCursor;
import com.amazon.carbonado.cursor.MergeSortBuffer;
import com.amazon.carbonado.cursor.SingletonCursor;
@@ -263,22 +264,45 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag return new MergeSortBuffer<S>();
}
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ return new MergeSortBuffer<S>(controller);
+ }
+
public long countAll() throws FetchException {
// Return -1 to indicate default algorithm should be used.
return -1;
}
+ public long countAll(Query.Controller controller) throws FetchException {
+ // Return -1 to indicate default algorithm should be used.
+ return -1;
+ }
+
public Cursor<S> fetchAll() throws FetchException {
+ return fetchAll(null);
+ }
+
+ public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
return fetchSubset(null, null,
BoundaryType.OPEN, null,
BoundaryType.OPEN, null,
- false, false);
+ false, false,
+ controller);
}
public Cursor<S> fetchOne(StorableIndex<S> index,
Object[] identityValues)
throws FetchException
{
+ return fetchOne(index, identityValues, null);
+ }
+
+ public Cursor<S> fetchOne(StorableIndex<S> index,
+ Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
+ // Note: Controller is never called.
byte[] key = mStorableCodec.encodePrimaryKey(identityValues);
byte[] value = mRawSupport.tryLoad(null, key);
if (value == null) {
@@ -296,6 +320,13 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag throw new UnsupportedOperationException();
}
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ {
+ // This method should never be called since null was returned by indexEntryQuery.
+ throw new UnsupportedOperationException();
+ }
+
public Cursor<S> fetchSubset(StorableIndex<S> index,
Object[] identityValues,
BoundaryType rangeStartBoundary,
@@ -378,6 +409,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag getPrimaryDatabase());
cursor.open();
+
return cursor;
} catch (Exception e) {
throw toFetchException(e);
@@ -387,6 +419,28 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
}
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return ControllerCursor.apply(fetchSubset(index,
+ identityValues,
+ rangeStartBoundary,
+ rangeStartValue,
+ rangeEndBoundary,
+ rangeEndValue,
+ reverseRange,
+ reverseOrder),
+ controller);
+ }
+
private byte[] createBound(Object[] exactValues, byte[] exactKey, Object rangeValue,
StorableCodec<S> codec) {
Object[] values = {rangeValue};
|