From 121886bc0c92389610408e3b415abb992ad8a212 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Wed, 4 May 2011 00:20:02 +0000 Subject: Add support for Query controller and timeouts; remove vestigial support for interrupts. --- .../amazon/carbonado/cursor/ControllerCursor.java | 99 ++++++++++++++++++++++ .../amazon/carbonado/cursor/FilteredCursor.java | 13 +-- .../com/amazon/carbonado/cursor/GroupedCursor.java | 13 +-- .../amazon/carbonado/cursor/MergeSortBuffer.java | 84 +++++++++++++++--- .../carbonado/cursor/MultiTransformedCursor.java | 11 --- .../com/amazon/carbonado/cursor/SortedCursor.java | 11 --- .../amazon/carbonado/cursor/TransformedCursor.java | 12 +-- 7 files changed, 173 insertions(+), 70 deletions(-) create mode 100644 src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java (limited to 'src/main/java/com/amazon/carbonado/cursor') diff --git a/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java new file mode 100644 index 0000000..da682d2 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ControllerCursor.java @@ -0,0 +1,99 @@ +/* + * Copyright 2011 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; + +/** + * Wraps another cursor and periodically calls a {@link com.amazon.carbonado.Query.Controller controller}. + * + * @author Brian S O'Neill + */ +public class ControllerCursor extends AbstractCursor { + /** + * Returns a ControllerCursor depending on whether a controller instance is + * passed in or not. + * + * @param controller optional controller which can abort query operation + * @throws IllegalArgumentException if source is null + */ + public static Cursor apply(Cursor source, Query.Controller controller) { + return controller == null ? source : new ControllerCursor(source, controller); + } + + private final Cursor mSource; + private final Query.Controller mController; + + private byte mCount; + + /** + * @param controller required controller which can abort query operation + * @throws IllegalArgumentException if either argument is null + */ + private ControllerCursor(Cursor source, Query.Controller controller) { + if (source == null) { + throw new IllegalArgumentException("Source is null"); + } + if (controller == null) { + throw new IllegalArgumentException("Controller is null"); + } + mSource = source; + mController = controller; + controller.begin(); + } + + public boolean hasNext() throws FetchException { + if (mSource.hasNext()) { + continueCheck(); + return true; + } + return false; + } + + public S next() throws FetchException { + S next = mSource.next(); + continueCheck(); + return next; + } + + public void close() throws FetchException { + try { + mSource.close(); + } finally { + mController.close(); + } + } + + private void continueCheck() throws FetchException { + if (++mCount == 0) { + try { + mController.continueCheck(); + } catch (FetchException e) { + try { + close(); + } catch (FetchException e2) { + // Ignore. + } + throw e; + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index ad21667..4cf8c21 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; import com.amazon.carbonado.Storable; import com.amazon.carbonado.filter.Filter; @@ -114,14 +113,12 @@ public abstract class FilteredCursor extends AbstractCursor { return true; } try { - int count = 0; while (mCursor.hasNext()) { S next = mCursor.next(); if (isAllowed(next)) { mNext = next; return true; } - interruptCheck(++count); } } catch (NoSuchElementException e) { } catch (FetchException e) { @@ -165,10 +162,9 @@ public abstract class FilteredCursor extends AbstractCursor { try { int count = 0; while (--amount >= 0 && hasNext()) { - interruptCheck(++count); + ++count; mNext = null; } - return count; } catch (FetchException e) { try { @@ -179,11 +175,4 @@ public abstract class FilteredCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 49ca84e..3053cd5 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -23,7 +23,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; /** * Abstract cursor for aggregation and finding distinct data. The source cursor @@ -124,7 +123,6 @@ public abstract class GroupedCursor extends AbstractCursor { } try { - int count = 0; if (mCursor.hasNext()) { if (mGroupLeader == null) { beginGroup(mGroupLeader = mCursor.next()); @@ -143,8 +141,6 @@ public abstract class GroupedCursor extends AbstractCursor { return true; } } - - interruptCheck(++count); } G aggregate = finishGroup(); @@ -206,7 +202,7 @@ public abstract class GroupedCursor extends AbstractCursor { try { int count = 0; while (--amount >= 0 && hasNext()) { - interruptCheck(++count); + ++count; mNextAggregate = null; } @@ -220,11 +216,4 @@ public abstract class GroupedCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java index aae5243..0231012 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -38,7 +38,9 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.PriorityQueue; +import com.amazon.carbonado.FetchException; import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.Query; import com.amazon.carbonado.Storable; import com.amazon.carbonado.Storage; import com.amazon.carbonado.SupportException; @@ -122,6 +124,7 @@ public class MergeSortBuffer extends AbstractCollection private final String mTempDir; private final int mMaxArrayCapacity; + private final Query.Controller mController; private Preparer mPreparer; @@ -143,6 +146,15 @@ public class MergeSortBuffer extends AbstractCollection this(null, TEMP_DIR, MAX_ARRAY_CAPACITY); } + /** + * @since 1.2 + * + * @param controller optional controller which can abort query operation + */ + public MergeSortBuffer(Query.Controller controller) { + this(null, TEMP_DIR, MAX_ARRAY_CAPACITY, controller); + } + /** * @param storage storage for elements; if null use first Storable to * prepare reloaded Storables @@ -151,6 +163,15 @@ public class MergeSortBuffer extends AbstractCollection this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY); } + /** + * @param storage storage for elements; if null use first Storable to + * prepare reloaded Storables + * @param controller optional controller which can abort query operation + */ + public MergeSortBuffer(Storage storage, Query.Controller controller) { + this(storage, TEMP_DIR, MAX_ARRAY_CAPACITY, controller); + } + /** * @param storage storage for elements; if null use first Storable to * prepare reloaded Storables @@ -170,6 +191,22 @@ public class MergeSortBuffer extends AbstractCollection */ @SuppressWarnings("unchecked") public MergeSortBuffer(Storage storage, String tempDir, int maxArrayCapacity) { + this(storage, tempDir, maxArrayCapacity, null); + } + + /** + * @param storage storage for elements; if null use first Storable to + * prepare reloaded Storables + * @param tempDir directory to store temp files for merging, or null for default + * @param maxArrayCapacity maximum amount of storables to keep in an array + * before serializing to a file + * @param controller optional controller which can abort query operation + * @throws IllegalArgumentException if storage is null + */ + @SuppressWarnings("unchecked") + public MergeSortBuffer(Storage storage, String tempDir, int maxArrayCapacity, + Query.Controller controller) + { mTempDir = tempDir; mMaxArrayCapacity = maxArrayCapacity; @@ -179,6 +216,10 @@ public class MergeSortBuffer extends AbstractCollection int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity); mElements = (S[]) new Storable[cap]; + + if ((mController = controller) != null) { + controller.begin(); + } } public void prepare(Comparator comparator) { @@ -231,10 +272,10 @@ public class MergeSortBuffer extends AbstractCollection if (mFilesInUse.size() < (MAX_OPEN_FILE_COUNT - 1)) { mFilesInUse.add(raf); - int count = 0; + byte count = 0; for (S element : mElements) { - // Check every so often if interrupted. - interruptCheck(++count); + // Check every so often if should continue. + continueCheck(++count); element.writeTo(out); } } else { @@ -274,11 +315,11 @@ public class MergeSortBuffer extends AbstractCollection // as well as error out earlier, should the disk be full. raf.setLength(mergedLength); - int count = 0; + byte count = 0; Iterator it = iterator(filesToMerge); while (it.hasNext()) { - // Check every so often if interrupted. - interruptCheck(++count); + // Check every so often if should continue. + continueCheck(++count); S element = it.next(); element.writeTo(out); } @@ -368,9 +409,16 @@ public class MergeSortBuffer extends AbstractCollection } public void close() { - clear(); - if (mWorkFilePool != null) { - mWorkFilePool.unregisterWorkFileUser(this); + try { + clear(); + if (mWorkFilePool != null) { + mWorkFilePool.unregisterWorkFileUser(this); + } + } finally { + Query.Controller controller = mController; + if (controller != null) { + controller.close(); + } } } @@ -386,10 +434,20 @@ public class MergeSortBuffer extends AbstractCollection return comparator; } - private void interruptCheck(int count) { - if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) { - close(); - throw new UndeclaredThrowableException(new FetchInterruptedException()); + private void continueCheck(byte count) { + if (count == 0) { + try { + Query.Controller controller = mController; + if (controller != null) { + controller.continueCheck(); + } + if (mStop) { + throw new FetchInterruptedException("Shutting down"); + } + } catch (FetchException e) { + close(); + throw new UndeclaredThrowableException(e); + } } } diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 8a1bd48..814a3ba 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; /** * Abstract cursor which wraps another cursor and transforms each storable @@ -72,7 +71,6 @@ public abstract class MultiTransformedCursor extends AbstractCursor { mNextCursor.close(); mNextCursor = null; } - int count = 0; while (mCursor.hasNext()) { Cursor nextCursor = transform(mCursor.next()); if (nextCursor != null) { @@ -82,7 +80,6 @@ public abstract class MultiTransformedCursor extends AbstractCursor { } nextCursor.close(); } - interruptCheck(++count); } } catch (NoSuchElementException e) { } catch (FetchException e) { @@ -129,7 +126,6 @@ public abstract class MultiTransformedCursor extends AbstractCursor { if ((amount -= chunk) <= 0) { break; } - interruptCheck(count); } return count; @@ -142,11 +138,4 @@ public abstract class MultiTransformedCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 5b5f9b5..e054137 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -32,7 +32,6 @@ import org.cojen.util.BeanProperty; import org.cojen.classfile.TypeDesc; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.Storable; @@ -388,12 +387,7 @@ public class SortedCursor extends AbstractCursor { fill: { if (matcher == null) { // Buffer up entire results and sort. - int count = 0; while (cursor.hasNext()) { - // Check every so often if interrupted. - if ((++count & ~0xff) == 0 && Thread.interrupted()) { - throw new FetchInterruptedException(); - } buffer.add(cursor.next()); } break fill; @@ -413,13 +407,8 @@ public class SortedCursor extends AbstractCursor { } buffer.add(chunkStart); - int count = 1; while (cursor.hasNext()) { - // Check every so often if interrupted. - if ((++count & ~0xff) == 0 && Thread.interrupted()) { - throw new FetchInterruptedException(); - } S next = cursor.next(); if (matcher.compare(chunkStart, next) != 0) { // Save for reading next chunk later. diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index c05e0fc..bd41cef 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -22,7 +22,6 @@ import java.util.NoSuchElementException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.FetchInterruptedException; /** * Abstract cursor which wraps another cursor and transforms each storable @@ -64,14 +63,12 @@ public abstract class TransformedCursor extends AbstractCursor { return true; } try { - int count = 0; while (mCursor.hasNext()) { T next = transform(mCursor.next()); if (next != null) { mNext = next; return true; } - interruptCheck(++count); } } catch (NoSuchElementException e) { } catch (FetchException e) { @@ -115,7 +112,7 @@ public abstract class TransformedCursor extends AbstractCursor { try { int count = 0; while (--amount >= 0 && hasNext()) { - interruptCheck(++count); + ++count; mNext = null; } @@ -129,11 +126,4 @@ public abstract class TransformedCursor extends AbstractCursor { throw e; } } - - private void interruptCheck(int count) throws FetchException { - if ((count & ~0xff) == 0 && Thread.interrupted()) { - close(); - throw new FetchInterruptedException(); - } - } } -- cgit v1.2.3