diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2011-05-04 00:20:02 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2011-05-04 00:20:02 +0000 |
commit | 121886bc0c92389610408e3b415abb992ad8a212 (patch) | |
tree | ccd7bcada5efd29b9106e2150734bee375fe1163 /src/main/java/com/amazon/carbonado/cursor | |
parent | 5be9a7ea0f9aad9e97c4d70cb82ce8a22f2d412a (diff) |
Add support for Query controller and timeouts; remove vestigial support for interrupts.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/cursor')
7 files changed, 173 insertions, 70 deletions
diff --git a/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java new file mode 100644 index 0000000..da682d2 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java @@ -0,0 +1,99 @@ +/*
+ * Copyright 2011 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.cursor;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.Query;
+
+/**
+ * Wraps another cursor and periodically calls a {@link com.amazon.carbonado.Query.Controller controller}.
+ *
+ * @author Brian S O'Neill
+ */
+public class ControllerCursor<S> extends AbstractCursor<S> {
+ /**
+ * Returns a ControllerCursor depending on whether a controller instance is
+ * passed in or not.
+ *
+ * @param controller optional controller which can abort query operation
+ * @throws IllegalArgumentException if source is null
+ */
+ public static <S> Cursor<S> apply(Cursor<S> source, Query.Controller controller) {
+ return controller == null ? source : new ControllerCursor<S>(source, controller);
+ }
+
+ private final Cursor<S> mSource;
+ private final Query.Controller mController;
+
+ private byte mCount;
+
+ /**
+ * @param controller required controller which can abort query operation
+ * @throws IllegalArgumentException if either argument is null
+ */
+ private ControllerCursor(Cursor<S> source, Query.Controller controller) {
+ if (source == null) {
+ throw new IllegalArgumentException("Source is null");
+ }
+ if (controller == null) {
+ throw new IllegalArgumentException("Controller is null");
+ }
+ mSource = source;
+ mController = controller;
+ controller.begin();
+ }
+
+ public boolean hasNext() throws FetchException {
+ if (mSource.hasNext()) {
+ continueCheck();
+ return true;
+ }
+ return false;
+ }
+
+ public S next() throws FetchException {
+ S next = mSource.next();
+ continueCheck();
+ return next;
+ }
+
+ public void close() throws FetchException {
+ try {
+ mSource.close();
+ } finally {
+ mController.close();
+ }
+ }
+
+ private void continueCheck() throws FetchException {
+ if (++mCount == 0) {
+ try {
+ mController.continueCheck();
+ } catch (FetchException e) {
+ try {
+ close();
+ } catch (FetchException e2) {
+ // Ignore.
+ }
+ throw e;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index ad21667..4cf8c21 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.filter.Filter;
@@ -114,14 +113,12 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> { return true;
}
try {
- int count = 0;
while (mCursor.hasNext()) {
S next = mCursor.next();
if (isAllowed(next)) {
mNext = next;
return true;
}
- interruptCheck(++count);
}
} catch (NoSuchElementException e) {
} catch (FetchException e) {
@@ -165,10 +162,9 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> { try {
int count = 0;
while (--amount >= 0 && hasNext()) {
- interruptCheck(++count);
+ ++count;
mNext = null;
}
-
return count;
} catch (FetchException e) {
try {
@@ -179,11 +175,4 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 49ca84e..3053cd5 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -23,7 +23,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
/**
* Abstract cursor for aggregation and finding distinct data. The source cursor
@@ -124,7 +123,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { }
try {
- int count = 0;
if (mCursor.hasNext()) {
if (mGroupLeader == null) {
beginGroup(mGroupLeader = mCursor.next());
@@ -143,8 +141,6 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { return true;
}
}
-
- interruptCheck(++count);
}
G aggregate = finishGroup();
@@ -206,7 +202,7 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { try {
int count = 0;
while (--amount >= 0 && hasNext()) {
- interruptCheck(++count);
+ ++count;
mNextAggregate = null;
}
@@ -220,11 +216,4 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java index aae5243..0231012 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -38,7 +38,9 @@ import java.util.List; import java.util.NoSuchElementException;
import java.util.PriorityQueue;
+import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.Storage;
import com.amazon.carbonado.SupportException;
@@ -122,6 +124,7 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> private final String mTempDir;
private final int mMaxArrayCapacity;
+ private final Query.Controller mController;
private Preparer<S> mPreparer;
@@ -144,6 +147,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> }
/**
+ * @since 1.2
+ *
+ * @param controller optional controller which can abort query operation
+ */
+ public MergeSortBuffer(Query.Controller controller) {
+ this(null, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
+ }
+
+ /**
* @param storage storage for elements; if null use first Storable to
* prepare reloaded Storables
*/
@@ -154,6 +166,15 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> /**
* @param storage storage for elements; if null use first Storable to
* prepare reloaded Storables
+ * @param controller optional controller which can abort query operation
+ */
+ public MergeSortBuffer(Storage<S> storage, Query.Controller controller) {
+ this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY, controller);
+ }
+
+ /**
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
* @param tempDir directory to store temp files for merging, or null for default
*/
public MergeSortBuffer(Storage<S> storage, String tempDir) {
@@ -170,6 +191,22 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> */
@SuppressWarnings("unchecked")
public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
+ this(storage, tempDir, maxArrayCapacity, null);
+ }
+
+ /**
+ * @param storage storage for elements; if null use first Storable to
+ * prepare reloaded Storables
+ * @param tempDir directory to store temp files for merging, or null for default
+ * @param maxArrayCapacity maximum amount of storables to keep in an array
+ * before serializing to a file
+ * @param controller optional controller which can abort query operation
+ * @throws IllegalArgumentException if storage is null
+ */
+ @SuppressWarnings("unchecked")
+ public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity,
+ Query.Controller controller)
+ {
mTempDir = tempDir;
mMaxArrayCapacity = maxArrayCapacity;
@@ -179,6 +216,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
mElements = (S[]) new Storable[cap];
+
+ if ((mController = controller) != null) {
+ controller.begin();
+ }
}
public void prepare(Comparator<S> comparator) {
@@ -231,10 +272,10 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> if (mFilesInUse.size() < (MAX_OPEN_FILE_COUNT - 1)) {
mFilesInUse.add(raf);
- int count = 0;
+ byte count = 0;
for (S element : mElements) {
- // Check every so often if interrupted.
- interruptCheck(++count);
+ // Check every so often if should continue.
+ continueCheck(++count);
element.writeTo(out);
}
} else {
@@ -274,11 +315,11 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> // as well as error out earlier, should the disk be full.
raf.setLength(mergedLength);
- int count = 0;
+ byte count = 0;
Iterator<S> it = iterator(filesToMerge);
while (it.hasNext()) {
- // Check every so often if interrupted.
- interruptCheck(++count);
+ // Check every so often if should continue.
+ continueCheck(++count);
S element = it.next();
element.writeTo(out);
}
@@ -368,9 +409,16 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> }
public void close() {
- clear();
- if (mWorkFilePool != null) {
- mWorkFilePool.unregisterWorkFileUser(this);
+ try {
+ clear();
+ if (mWorkFilePool != null) {
+ mWorkFilePool.unregisterWorkFileUser(this);
+ }
+ } finally {
+ Query.Controller controller = mController;
+ if (controller != null) {
+ controller.close();
+ }
}
}
@@ -386,10 +434,20 @@ public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S> return comparator;
}
- private void interruptCheck(int count) {
- if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
- close();
- throw new UndeclaredThrowableException(new FetchInterruptedException());
+ private void continueCheck(byte count) {
+ if (count == 0) {
+ try {
+ Query.Controller controller = mController;
+ if (controller != null) {
+ controller.continueCheck();
+ }
+ if (mStop) {
+ throw new FetchInterruptedException("Shutting down");
+ }
+ } catch (FetchException e) {
+ close();
+ throw new UndeclaredThrowableException(e);
+ }
}
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 8a1bd48..814a3ba 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
/**
* Abstract cursor which wraps another cursor and transforms each storable
@@ -72,7 +71,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { mNextCursor.close();
mNextCursor = null;
}
- int count = 0;
while (mCursor.hasNext()) {
Cursor<T> nextCursor = transform(mCursor.next());
if (nextCursor != null) {
@@ -82,7 +80,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { }
nextCursor.close();
}
- interruptCheck(++count);
}
} catch (NoSuchElementException e) {
} catch (FetchException e) {
@@ -129,7 +126,6 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { if ((amount -= chunk) <= 0) {
break;
}
- interruptCheck(count);
}
return count;
@@ -142,11 +138,4 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 5b5f9b5..e054137 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -32,7 +32,6 @@ import org.cojen.util.BeanProperty; import org.cojen.classfile.TypeDesc;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.Storable;
@@ -388,12 +387,7 @@ public class SortedCursor<S> extends AbstractCursor<S> { fill: {
if (matcher == null) {
// Buffer up entire results and sort.
- int count = 0;
while (cursor.hasNext()) {
- // Check every so often if interrupted.
- if ((++count & ~0xff) == 0 && Thread.interrupted()) {
- throw new FetchInterruptedException();
- }
buffer.add(cursor.next());
}
break fill;
@@ -413,13 +407,8 @@ public class SortedCursor<S> extends AbstractCursor<S> { }
buffer.add(chunkStart);
- int count = 1;
while (cursor.hasNext()) {
- // Check every so often if interrupted.
- if ((++count & ~0xff) == 0 && Thread.interrupted()) {
- throw new FetchInterruptedException();
- }
S next = cursor.next();
if (matcher.compare(chunkStart, next) != 0) {
// Save for reading next chunk later.
diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index c05e0fc..bd41cef 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
-import com.amazon.carbonado.FetchInterruptedException;
/**
* Abstract cursor which wraps another cursor and transforms each storable
@@ -64,14 +63,12 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> { return true;
}
try {
- int count = 0;
while (mCursor.hasNext()) {
T next = transform(mCursor.next());
if (next != null) {
mNext = next;
return true;
}
- interruptCheck(++count);
}
} catch (NoSuchElementException e) {
} catch (FetchException e) {
@@ -115,7 +112,7 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> { try {
int count = 0;
while (--amount >= 0 && hasNext()) {
- interruptCheck(++count);
+ ++count;
mNext = null;
}
@@ -129,11 +126,4 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> { throw e;
}
}
-
- private void interruptCheck(int count) throws FetchException {
- if ((count & ~0xff) == 0 && Thread.interrupted()) {
- close();
- throw new FetchInterruptedException();
- }
- }
}
|