From ab635e23ad5fb9cd9edb61665a3654ee3e4b372a Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Wed, 30 Aug 2006 01:55:10 +0000 Subject: Add cursor implementations --- .../amazon/carbonado/cursor/AbstractCursor.java | 83 +++ .../amazon/carbonado/cursor/ArraySortBuffer.java | 62 ++ .../amazon/carbonado/cursor/DifferenceCursor.java | 120 ++++ .../com/amazon/carbonado/cursor/EmptyCursor.java | 103 ++++ .../amazon/carbonado/cursor/FilteredCursor.java | 148 +++++ .../carbonado/cursor/FilteredCursorGenerator.java | 665 +++++++++++++++++++++ .../com/amazon/carbonado/cursor/GroupedCursor.java | 205 +++++++ .../carbonado/cursor/IntersectionCursor.java | 118 ++++ .../amazon/carbonado/cursor/IteratorCursor.java | 62 ++ .../carbonado/cursor/JoinedCursorFactory.java | 459 ++++++++++++++ .../amazon/carbonado/cursor/MergeSortBuffer.java | 498 +++++++++++++++ .../carbonado/cursor/MultiTransformedCursor.java | 132 ++++ .../com/amazon/carbonado/cursor/SortBuffer.java | 53 ++ .../com/amazon/carbonado/cursor/SortedCursor.java | 332 ++++++++++ .../cursor/SymmetricDifferenceCursor.java | 123 ++++ .../amazon/carbonado/cursor/ThrottledCursor.java | 101 ++++ .../amazon/carbonado/cursor/TransformedCursor.java | 119 ++++ .../com/amazon/carbonado/cursor/UnionCursor.java | 106 ++++ .../com/amazon/carbonado/cursor/WorkFilePool.java | 192 ++++++ .../com/amazon/carbonado/cursor/package-info.java | 23 + 20 files changed, 3704 insertions(+) create mode 100644 src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/SortBuffer.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/SortedCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/UnionCursor.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java create mode 100644 src/main/java/com/amazon/carbonado/cursor/package-info.java diff --git a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java new file mode 100644 index 0000000..c71393a --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java @@ -0,0 +1,83 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; + +/** + * AbstractCursor implements a small set of common Cursor methods. + * + * @author Brian S O'Neill + */ +public abstract class AbstractCursor implements Cursor { + // Note: Since constructor takes no parameters, this class is called + // Abstract instead of Base. + protected AbstractCursor() { + } + + public int copyInto(Collection c) throws FetchException { + int originalSize = c.size(); + while (hasNext()) { + c.add(next()); + } + return c.size() - originalSize; + } + + public int copyInto(Collection c, int limit) throws FetchException { + int originalSize = c.size(); + while (--limit >= 0 && hasNext()) { + c.add(next()); + } + return c.size() - originalSize; + } + + public List toList() throws FetchException { + List list = new ArrayList(); + copyInto(list); + return list; + } + + public List toList(int limit) throws FetchException { + List list = new ArrayList(); + copyInto(list, limit); + return list; + } + + public synchronized int skipNext(int amount) throws FetchException { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (--amount >= 0 && hasNext()) { + next(); + count++; + } + + return count; + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java new file mode 100644 index 0000000..db74134 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java @@ -0,0 +1,62 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; + +/** + * Sort buffer implementation backed by an ArrayList. + * + * @author Brian S O'Neill + * @see SortedCursor + */ +public class ArraySortBuffer extends ArrayList implements SortBuffer { + private static final long serialVersionUID = -5622302375191321452L; + + private Comparator mComparator; + + public ArraySortBuffer() { + super(); + } + + public ArraySortBuffer(int initialCapacity) { + super(initialCapacity); + } + + public void prepare(Comparator comparator) { + if (comparator == null) { + throw new IllegalArgumentException(); + } + clear(); + mComparator = comparator; + } + + public void sort() { + if (mComparator == null) { + throw new IllegalStateException("Buffer was not prepared"); + } + Collections.sort(this, mComparator); + } + + public void close() { + clear(); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java new file mode 100644 index 0000000..39455c8 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java @@ -0,0 +1,120 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Comparator; +import java.util.NoSuchElementException; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Cursor; + +/** + * Wraps two Cursors and performs an asymmetric set difference + * operation. + * + *

Both cursors must return results in the same order. Ordering is preserved + * by the difference. + * + * @author Brian S O'Neill + * @see UnionCursor + * @see IntersectionCursor + * @see SymmetricDifferenceCursor + */ +public class DifferenceCursor extends AbstractCursor { + private final Cursor mLeftCursor; + private final Cursor mRightCursor; + private final Comparator mOrder; + + private S mNext; + private S mNextRight; + + /** + * @param left cursor to wrap + * @param right cursor to wrap whose results are completely discarded + * @param order describes sort ordering of wrapped cursors, which must be + * a total ordering + */ + public DifferenceCursor(Cursor left, Cursor right, Comparator order) { + if (left == null || right == null || order == null) { + throw new IllegalArgumentException(); + } + mLeftCursor = left; + mRightCursor = right; + mOrder = order; + } + + public synchronized void close() throws FetchException { + mLeftCursor.close(); + mRightCursor.close(); + mNext = null; + mNextRight = null; + } + + public synchronized boolean hasNext() throws FetchException { + if (mNext != null) { + return true; + } + + S nextLeft; + + while (true) { + if (mLeftCursor.hasNext()) { + nextLeft = mLeftCursor.next(); + } else { + close(); + return false; + } + if (mNextRight == null) { + if (mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } else { + mNext = nextLeft; + return true; + } + } + + while (true) { + int result = mOrder.compare(nextLeft, mNextRight); + if (result < 0) { + mNext = nextLeft; + return true; + } else if (result > 0) { + if (mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } else { + mNextRight = null; + mNext = nextLeft; + return true; + } + } else { + break; + } + } + } + } + + public synchronized S next() throws FetchException { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + throw new NoSuchElementException(); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java new file mode 100644 index 0000000..db3778b --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java @@ -0,0 +1,103 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import com.amazon.carbonado.Cursor; + +/** + * Special cursor implementation that is empty. + * + * @author Brian S O'Neill + */ +public class EmptyCursor implements Cursor { + private static final Cursor EMPTY_CURSOR = new EmptyCursor(); + + /** + * Returns the singleton empty cursor instance. + */ + @SuppressWarnings("unchecked") + public static Cursor getEmptyCursor() { + return EMPTY_CURSOR; + } + + // Package-private, to be used by test suite + EmptyCursor() { + } + + /** + * Does nothing. + */ + public void close() { + } + + /** + * Always returns false. + */ + public boolean hasNext() { + return false; + } + + /** + * Always throws NoSuchElementException. + */ + public S next() { + throw new NoSuchElementException(); + } + + /** + * Always returns 0. + */ + public int skipNext(int amount) { + return 0; + } + + /** + * Performs no copy and always returns 0. + */ + public int copyInto(Collection c) { + return 0; + } + + /** + * Performs no copy and always returns 0. + */ + public int copyInto(Collection c, int limit) { + return 0; + } + + /** + * Always returns an empty list. + */ + public List toList() { + return Collections.emptyList(); + } + + /** + * Always returns an empty list. + */ + public List toList(int limit) { + return Collections.emptyList(); + } +} + diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java new file mode 100644 index 0000000..60b3c81 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -0,0 +1,148 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.NoSuchElementException; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.Storable; + +import com.amazon.carbonado.filter.ClosedFilter; +import com.amazon.carbonado.filter.Filter; +import com.amazon.carbonado.filter.FilterValues; +import com.amazon.carbonado.filter.OpenFilter; + +/** + * Wraps another cursor and applies custom filtering to reduce the set of + * results. + * + * @author Brian S O'Neill + */ +public abstract class FilteredCursor extends AbstractCursor { + /** + * Returns a Cursor that is filtered by the given Filter and FilterValues. + * The given Filter must be composed only of the same PropertyFilter + * instances as used to construct the FilterValues. An + * IllegalStateException will result otherwise. + * + * @param filter filter to apply + * @param filterValues values for filter + * @param cursor cursor to wrap + * @return wrapped cursor which filters results + * @throws IllegalStateException if any values are not specified + * @throws IllegalArgumentException if filter is closed + */ + public static Cursor applyFilter(Filter filter, + FilterValues filterValues, + Cursor cursor) + { + if (filter instanceof OpenFilter) { + return cursor; + } + if (filter instanceof ClosedFilter) { + throw new IllegalArgumentException(); + } + + return FilteredCursorGenerator.getFactory(filter) + .newFilteredCursor(cursor, filterValues.getValuesFor(filter)); + } + + private final Cursor mCursor; + + private S mNext; + + protected FilteredCursor(Cursor cursor) { + if (cursor == null) { + throw new IllegalArgumentException(); + } + mCursor = cursor; + } + + /** + * @return false if object should not be in results + */ + protected abstract boolean isAllowed(S storable); + + public void close() throws FetchException { + synchronized (mCursor) { + mCursor.close(); + mNext = null; + } + } + + public boolean hasNext() throws FetchException { + synchronized (mCursor) { + if (mNext != null) { + return true; + } + try { + int count = 0; + while (mCursor.hasNext()) { + S next = mCursor.next(); + if (isAllowed(next)) { + mNext = next; + return true; + } + interruptCheck(++count); + } + } catch (NoSuchElementException e) { + } + return false; + } + } + + public S next() throws FetchException { + synchronized (mCursor) { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + throw new NoSuchElementException(); + } + } + + public int skipNext(int amount) throws FetchException { + synchronized (mCursor) { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (--amount >= 0 && hasNext()) { + interruptCheck(++count); + mNext = null; + } + + return count; + } + } + + private void interruptCheck(int count) throws FetchException { + if ((count & ~0xff) == 0 && Thread.interrupted()) { + close(); + throw new FetchInterruptedException(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java new file mode 100644 index 0000000..ea1baa8 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java @@ -0,0 +1,665 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Map; +import java.util.Stack; + +import org.cojen.classfile.ClassFile; +import org.cojen.classfile.CodeAssembler; +import org.cojen.classfile.CodeBuilder; +import org.cojen.classfile.Label; +import org.cojen.classfile.LocalVariable; +import org.cojen.classfile.MethodInfo; +import org.cojen.classfile.Modifiers; +import org.cojen.classfile.Opcode; +import org.cojen.classfile.TypeDesc; +import static org.cojen.classfile.TypeDesc.*; + +import org.cojen.util.ClassInjector; +import org.cojen.util.WeakIdentityMap; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.Storable; + +import com.amazon.carbonado.filter.AndFilter; +import com.amazon.carbonado.filter.OrFilter; +import com.amazon.carbonado.filter.Filter; +import com.amazon.carbonado.filter.PropertyFilter; +import com.amazon.carbonado.filter.RelOp; +import com.amazon.carbonado.filter.Visitor; + +import com.amazon.carbonado.info.ChainedProperty; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.util.QuickConstructorGenerator; + + +/** + * Generates Cursor implementations that wrap another Cursor, applying a + * property matching filter on each result. + * + * @author Brian S O'Neill + * @see FilteredCursor + */ +class FilteredCursorGenerator { + private static Map cCache = new WeakIdentityMap(); + + /** + * Returns a factory for creating new filtered cursor instances. + * + * @param filter filter specification + * @throws IllegalArgumentException if filter is null + * @throws UnsupportedOperationException if filter is not supported + */ + @SuppressWarnings("unchecked") + static Factory getFactory(Filter filter) { + if (filter == null) { + throw new IllegalArgumentException(); + } + synchronized (cCache) { + Factory factory = (Factory) cCache.get(filter); + if (factory != null) { + return factory; + } + Class> clazz = generateClass(filter); + factory = QuickConstructorGenerator.getInstance(clazz, Factory.class); + cCache.put(filter, factory); + return factory; + } + } + + @SuppressWarnings("unchecked") + private static Class> generateClass(Filter filter) { + filter.accept(new Validator(), null); + + Class type = filter.getStorableType(); + + String packageName; + { + String name = type.getName(); + int index = name.lastIndexOf('.'); + if (index >= 0) { + packageName = name.substring(0, index); + } else { + packageName = ""; + } + } + + // Try to generate against the same loader as the storable type. This + // allows the generated class access to it, preventing a + // NoClassDefFoundError. + ClassLoader loader = type.getClassLoader(); + + ClassInjector ci = ClassInjector.create(packageName + ".FilteredCursor", loader); + ClassFile cf = new ClassFile(ci.getClassName(), FilteredCursor.class); + cf.markSynthetic(); + cf.setSourceFile(FilteredCursorGenerator.class.getName()); + cf.setTarget("1.5"); + + // Begin constructor definition. + CodeBuilder ctorBuilder; + { + TypeDesc cursorType = TypeDesc.forClass(Cursor.class); + TypeDesc objectArrayType = TypeDesc.forClass(Object[].class); + TypeDesc[] params = {cursorType, objectArrayType}; + MethodInfo ctor = cf.addConstructor(Modifiers.PUBLIC, params); + ctorBuilder = new CodeBuilder(ctor); + ctorBuilder.loadThis(); + ctorBuilder.loadLocal(ctorBuilder.getParameter(0)); + ctorBuilder.invokeSuperConstructor(new TypeDesc[] {cursorType}); + } + + // Begin isAllowed method definition. + CodeBuilder isAllowedBuilder; + LocalVariable storableVar; + { + TypeDesc[] params = {TypeDesc.OBJECT}; + MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "isAllowed", BOOLEAN, params); + isAllowedBuilder = new CodeBuilder(mi); + + storableVar = isAllowedBuilder.getParameter(0); + + // Filter out any instances of null. Shouldn't happen though. + isAllowedBuilder.loadLocal(storableVar); + Label notNull = isAllowedBuilder.createLabel(); + isAllowedBuilder.ifNullBranch(notNull, false); + isAllowedBuilder.loadConstant(false); + isAllowedBuilder.returnValue(BOOLEAN); + notNull.setLocation(); + + // Cast the parameter to the expected type. + isAllowedBuilder.loadLocal(storableVar); + TypeDesc storableType = TypeDesc.forClass(type); + isAllowedBuilder.checkCast(storableType); + storableVar = isAllowedBuilder.createLocalVariable("storable", storableType); + isAllowedBuilder.storeLocal(storableVar); + } + + filter.accept(new CodeGen(cf, ctorBuilder, isAllowedBuilder, storableVar), null); + + // Finish constructor. + ctorBuilder.returnVoid(); + + return (Class>) ci.defineClass(cf); + } + + public static interface Factory { + /** + * @param cursor cursor to wrap and filter + * @param filterValues values corresponding to original filter used to create this factory + */ + Cursor newFilteredCursor(Cursor cursor, Object... filterValues); + } + + /** + * Ensures properties can be read and that relational operation is + * supported. + */ + private static class Validator extends Visitor { + Validator() { + } + + public Object visit(PropertyFilter filter, Object param) { + ChainedProperty chained = filter.getChainedProperty(); + + switch (filter.getOperator()) { + case EQ: case NE: + // Use equals() method instead of comparator. + break; + default: + TypeDesc typeDesc = TypeDesc.forClass(chained.getType()).toObjectType(); + + if (!Comparable.class.isAssignableFrom(typeDesc.toClass())) { + throw new UnsupportedOperationException + ("Property \"" + chained + "\" does not implement Comparable"); + } + break; + } + + // Follow the chain, verifying that each property has an accessible + // read method. + + checkForReadMethod(chained, chained.getPrimeProperty()); + for (int i=0; i chained, + StorableProperty property) { + if (property.getReadMethod() == null) { + String msg; + if (chained.getChainCount() == 0) { + msg = "Property \"" + property.getName() + "\" cannot be read"; + } else { + msg = "Property \"" + property.getName() + "\" of \"" + chained + + "\" cannot be read"; + } + throw new UnsupportedOperationException(msg); + } + } + } + + private static class CodeGen extends Visitor { + private static String FIELD_PREFIX = "value$"; + + private final ClassFile mClassFile; + private final CodeBuilder mCtorBuilder; + private final CodeBuilder mIsAllowedBuilder; + private final LocalVariable mStorableVar; + + private final Stack mScopeStack; + + private int mPropertyOrdinal; + + CodeGen(ClassFile cf, + CodeBuilder ctorBuilder, + CodeBuilder isAllowedBuilder, LocalVariable storableVar) { + mClassFile = cf; + mCtorBuilder = ctorBuilder; + mIsAllowedBuilder = isAllowedBuilder; + mStorableVar = storableVar; + mScopeStack = new Stack(); + mScopeStack.push(new Scope(null, null)); + } + + public Object visit(OrFilter filter, Object param) { + Label failLocation = mIsAllowedBuilder.createLabel(); + // Inherit success location to short-circuit if 'or' test succeeds. + mScopeStack.push(new Scope(failLocation, getScope().mSuccessLocation)); + filter.getLeftFilter().accept(this, param); + failLocation.setLocation(); + mScopeStack.pop(); + filter.getRightFilter().accept(this, param); + return null; + } + + public Object visit(AndFilter filter, Object param) { + Label successLocation = mIsAllowedBuilder.createLabel(); + // Inherit fail location to short-circuit if 'and' test fails. + mScopeStack.push(new Scope(getScope().mFailLocation, successLocation)); + filter.getLeftFilter().accept(this, param); + successLocation.setLocation(); + mScopeStack.pop(); + filter.getRightFilter().accept(this, param); + return null; + } + + public Object visit(PropertyFilter filter, Object param) { + ChainedProperty chained = filter.getChainedProperty(); + TypeDesc type = TypeDesc.forClass(chained.getType()); + TypeDesc fieldType = actualFieldType(type); + String fieldName = FIELD_PREFIX + mPropertyOrdinal; + + // Define storage field. + mClassFile.addField(Modifiers.PRIVATE.toFinal(true), fieldName, fieldType); + + // Add code to constructor to store value into field. + CodeBuilder b = mCtorBuilder; + b.loadThis(); + b.loadLocal(b.getParameter(1)); + b.loadConstant(mPropertyOrdinal); + b.loadFromArray(OBJECT); + b.checkCast(type.toObjectType()); + b.convert(type.toObjectType(), fieldType, CodeAssembler.CONVERT_FP_BITS); + b.storeField(fieldName, fieldType); + + // Add code to load property value to stack. + b = mIsAllowedBuilder; + b.loadLocal(mStorableVar); + loadProperty(b, chained.getPrimeProperty()); + for (int i=0; i property) { + Method readMethod = property.getReadMethod(); + if (readMethod == null) { + // Invoke synthetic package accessible method. + String readName = property.getReadMethodName(); + try { + readMethod = property.getEnclosingType().getDeclaredMethod(readName); + } catch (NoSuchMethodException e) { + // This shouldn't happen since it was checked for earlier. + throw new IllegalArgumentException + ("Property \"" + property.getName() + "\" cannot be read"); + } + } + b.invoke(readMethod); + } + + private void addPropertyFilter(CodeBuilder b, TypeDesc type, RelOp relOp) { + TypeDesc fieldType = actualFieldType(type); + String fieldName = FIELD_PREFIX + mPropertyOrdinal; + + if (type.getTypeCode() == OBJECT_CODE) { + // Check if actual property being examined is null. + + LocalVariable actualValue = b.createLocalVariable("temp", type); + b.storeLocal(actualValue); + b.loadLocal(actualValue); + Label notNull = b.createLabel(); + + switch (relOp) { + case EQ: default: // actual = ? + // If actual value is null, so test value must be null also + // to succeed. + case LE: // actual <= ? + // If actual value is null, and since null is high, test + // value must be null also to succeed. + b.ifNullBranch(notNull, false); + b.loadThis(); + b.loadField(fieldName, fieldType); + getScope().successIfNullElseFail(b, true); + break; + case NE: // actual != ? + // If actual value is null, so test value must be non-null + // to succeed. + case GT: // actual > ? + // If actual value is null, and since null is high, test + // value must be non-null to succeed. + b.ifNullBranch(notNull, false); + b.loadThis(); + b.loadField(fieldName, fieldType); + getScope().successIfNullElseFail(b, false); + break; + case LT: // actual < ? + // Since null is high, always fail if actual value is + // null. Don't need to examine test value. + getScope().failIfNull(b, true); + break; + case GE: // actual >= ? + // Since null is high, always succeed if actual value is + // null. Don't need to examine test value. + getScope().successIfNull(b, true); + break; + } + + notNull.setLocation(); + + // At this point, actual property value is known to not be null. + + // Check if property being tested for is null. + b.loadThis(); + b.loadField(fieldName, fieldType); + + switch (relOp) { + case EQ: default: // non-null actual = ? + // If test value is null, fail. + case GT: // non-null actual > ? + // If test value is null, and since null is high, fail. + case GE: // non-null actual >= ? + // If test value is null, and since null is high, fail. + getScope().failIfNull(b, true); + break; + case NE: // non-null actual != ? + // If test value is null, succeed + case LE: // non-null actual <= ? + // If test value is null, and since null is high, succeed. + case LT: // non-null actual < ? + // If test value is null, and since null is high, succeed. + getScope().successIfNull(b, true); + break; + } + + b.loadLocal(actualValue); + } + + // When this point is reached, actual property value is on the + // operand stack, and it is non-null. Property value to test + // against is not on the stack, but it is known to be non-null. + + TypeDesc primitiveType = type.toPrimitiveType(); + + if (primitiveType == null) { + b.loadThis(); + b.loadField(fieldName, fieldType); + + // Do object comparison + switch (relOp) { + case EQ: case NE: default: + if (fieldType.isArray()) { + TypeDesc arraysDesc = TypeDesc.forClass(Arrays.class); + TypeDesc componentType = fieldType.getComponentType(); + TypeDesc arrayType = fieldType; + String methodName; + if (componentType.isArray()) { + methodName = "deepEquals"; + arrayType = OBJECT.toArrayType(); + } else { + methodName = "equals"; + if (!componentType.isPrimitive()) { + arrayType = OBJECT.toArrayType(); + } + } + b.invokeStatic(arraysDesc, methodName, BOOLEAN, + new TypeDesc[] {arrayType, arrayType}); + } else { + b.invokeVirtual(OBJECT, "equals", BOOLEAN, new TypeDesc[] {OBJECT}); + } + // Success if boolean value is non-zero for EQ, zero for NE. + getScope().successIfZeroComparisonElseFail(b, relOp.reverse()); + break; + + case GT: case GE: case LE: case LT: + // Compare method exists because it was checked for earlier + b.invokeInterface(TypeDesc.forClass(Comparable.class), "compareTo", + INT, new TypeDesc[] {OBJECT}); + getScope().successIfZeroComparisonElseFail(b, relOp); + break; + } + } else { + if (!type.isPrimitive()) { + // Extract primitive value out of wrapper. + b.convert(type, primitiveType); + } + + // Floating point values are compared based on actual + // bits. This allows NaN to be considered in the comparison. + if (primitiveType == FLOAT) { + b.convert(primitiveType, INT, CodeBuilder.CONVERT_FP_BITS); + primitiveType = INT; + } else if (primitiveType == DOUBLE) { + b.convert(primitiveType, LONG, CodeBuilder.CONVERT_FP_BITS); + primitiveType = LONG; + } + + b.loadThis(); + b.loadField(fieldName, fieldType); + // No need to do anything special for floating point since it + // has been pre-converted to bits. + b.convert(fieldType, primitiveType); + + switch (primitiveType.getTypeCode()) { + case INT_CODE: default: + getScope().successIfComparisonElseFail(b, relOp); + break; + + case LONG_CODE: + b.math(Opcode.LCMP); + getScope().successIfZeroComparisonElseFail(b, relOp); + break; + } + } + } + + /** + * Returns the actual field type used to store the given property + * type. Floating point values are represented in their "bit" form, in + * order to compare against NaN. + */ + private static TypeDesc actualFieldType(TypeDesc type) { + if (type.toPrimitiveType() == FLOAT) { + if (type.isPrimitive()) { + type = INT; + } else { + type = INT.toObjectType(); + } + } else if (type.toPrimitiveType() == DOUBLE) { + if (type.isPrimitive()) { + type = LONG; + } else { + type = LONG.toObjectType(); + } + } + return type; + } + + /** + * Defines boolean logic branching scope. + */ + private static class Scope { + // If null, return false on test failure. + final Label mFailLocation; + // If null, return true on test success. + final Label mSuccessLocation; + + Scope(Label failLocation, Label successLocation) { + mFailLocation = failLocation; + mSuccessLocation = successLocation; + } + + void fail(CodeBuilder b) { + if (mFailLocation != null) { + b.branch(mFailLocation); + } else { + b.loadConstant(false); + b.returnValue(BOOLEAN); + } + } + + /** + * Branch to the fail location if the value on the stack is + * null. When choice is false, fail on non-null. + * + * @param choice if true, fail when null, else fail when not null + */ + void failIfNull(CodeBuilder b, boolean choice) { + if (mFailLocation != null) { + b.ifNullBranch(mFailLocation, choice); + } else { + Label pass = b.createLabel(); + b.ifNullBranch(pass, !choice); + b.loadConstant(false); + b.returnValue(BOOLEAN); + pass.setLocation(); + } + } + + void success(CodeBuilder b) { + if (mSuccessLocation != null) { + b.branch(mSuccessLocation); + } else { + b.loadConstant(true); + b.returnValue(BOOLEAN); + } + } + + /** + * Branch to the success location if the value on the stack is + * null. When choice is false, success on non-null. + * + * @param choice if true, success when null, else success when not null + */ + void successIfNull(CodeBuilder b, boolean choice) { + if (mSuccessLocation != null) { + b.ifNullBranch(mSuccessLocation, choice); + } else { + Label pass = b.createLabel(); + b.ifNullBranch(pass, !choice); + b.loadConstant(true); + b.returnValue(BOOLEAN); + pass.setLocation(); + } + } + + /** + * Branch to the success location if the value on the stack is + * null. When choice is false, success on non-null. If the success + * condition is not met, the fail location is branched to. + * + * @param choice if true, success when null, else success when not null + */ + void successIfNullElseFail(CodeBuilder b, boolean choice) { + if (mSuccessLocation != null) { + b.ifNullBranch(mSuccessLocation, choice); + fail(b); + } else if (mFailLocation != null) { + b.ifNullBranch(mFailLocation, !choice); + success(b); + } else { + Label success = b.createLabel(); + b.ifNullBranch(success, choice); + b.loadConstant(false); + b.returnValue(BOOLEAN); + success.setLocation(); + b.loadConstant(true); + b.returnValue(BOOLEAN); + } + } + + void successIfZeroComparisonElseFail(CodeBuilder b, RelOp relOp) { + if (mSuccessLocation != null) { + b.ifZeroComparisonBranch(mSuccessLocation, relOpToChoice(relOp)); + fail(b); + } else if (mFailLocation != null) { + b.ifZeroComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse())); + success(b); + } else { + if (relOp == RelOp.NE) { + b.returnValue(BOOLEAN); + } else if (relOp == RelOp.EQ) { + b.loadConstant(1); + b.math(Opcode.IAND); + b.loadConstant(1); + b.math(Opcode.IXOR); + b.returnValue(BOOLEAN); + } else { + Label success = b.createLabel(); + b.ifZeroComparisonBranch(success, relOpToChoice(relOp)); + b.loadConstant(false); + b.returnValue(BOOLEAN); + success.setLocation(); + b.loadConstant(true); + b.returnValue(BOOLEAN); + } + } + } + + void successIfComparisonElseFail(CodeBuilder b, RelOp relOp) { + if (mSuccessLocation != null) { + b.ifComparisonBranch(mSuccessLocation, relOpToChoice(relOp)); + fail(b); + } else if (mFailLocation != null) { + b.ifComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse())); + success(b); + } else { + Label success = b.createLabel(); + b.ifComparisonBranch(success, relOpToChoice(relOp)); + b.loadConstant(false); + b.returnValue(BOOLEAN); + success.setLocation(); + b.loadConstant(true); + b.returnValue(BOOLEAN); + } + } + + private String relOpToChoice(RelOp relOp) { + switch (relOp) { + case EQ: default: + return "=="; + case NE: + return "!="; + case LT: + return "<"; + case GE: + return ">="; + case GT: + return ">"; + case LE: + return "<="; + } + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java new file mode 100644 index 0000000..0b17e50 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -0,0 +1,205 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Comparator; +import java.util.NoSuchElementException; + +import org.cojen.util.BeanComparator; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; + +/** + * Abstract cursor for aggregation and finding distinct data. The source cursor + * must be ordered in some fashion by the grouping properties. The sequence of + * properties must match, but it does not matter if they are ascending or + * descending. + * + * @author Brian S O'Neill + * @see SortedCursor + */ +public abstract class GroupedCursor extends AbstractCursor { + private final Cursor mCursor; + private final Comparator mGroupComparator; + + private S mGroupLeader; + private G mNextAggregate; + + /** + * Create a GroupedCursor with an existing group comparator. The comparator + * defines the ordering of the source cursor, and it should be a partial + * odering. If group comparator defines a total ordering, then all groups + * have one member. + * + * @param cursor source of elements which must be ordered properly + * @param groupComparator comparator which defines ordering of source cursor + */ + protected GroupedCursor(Cursor cursor, Comparator groupComparator) { + if (cursor == null || groupComparator == null) { + throw new IllegalArgumentException(); + } + mCursor = cursor; + mGroupComparator = groupComparator; + } + + /** + * Create a GroupedCursor using properties to define the group + * comparator. The set of properties defines the ordering of the source + * cursor, and it should be a partial ordering. If properties define a + * total ordering, then all groups have one member. + * + * @param cursor source of elements which must be ordered properly + * @param type type of storable to create cursor for + * @param groupProperties list of properties to group by + * @throws IllegalArgumentException if any property is null or not a member + * of storable type + */ + protected GroupedCursor(Cursor cursor, Class type, String... groupProperties) { + if (cursor == null) { + throw new IllegalArgumentException(); + } + mCursor = cursor; + mGroupComparator = SortedCursor.createComparator(type, groupProperties); + } + + /** + * Returns the comparator used to identify group boundaries. + */ + public Comparator comparator() { + return mGroupComparator; + } + + /** + * This method is called for the first entry in a group. This method is not + * called again until after finishGroup is called. + * + * @param groupLeader first entry in group + */ + protected abstract void beginGroup(S groupLeader) throws FetchException; + + /** + * This method is called when more entries are found for the current + * group. This method is not called until after beginGroup has been + * called. It may called multiple times until finishGroup is called. + * + * @param groupMember additional entry in group + */ + protected abstract void addToGroup(S groupMember) throws FetchException; + + /** + * This method is called when a group is finished, and it can return an + * aggregate. Simply return null if aggregate should be filtered out. + * + * @return aggregate, or null to filter it out + */ + protected abstract G finishGroup() throws FetchException; + + public void close() throws FetchException { + synchronized (mCursor) { + mCursor.close(); + mGroupLeader = null; + mNextAggregate = null; + } + } + + public boolean hasNext() throws FetchException { + synchronized (mCursor) { + if (mNextAggregate != null) { + return true; + } + + try { + int count = 0; + if (mCursor.hasNext()) { + if (mGroupLeader == null) { + beginGroup(mGroupLeader = mCursor.next()); + } + + while (mCursor.hasNext()) { + S groupMember = mCursor.next(); + + if (mGroupComparator.compare(mGroupLeader, groupMember) == 0) { + addToGroup(groupMember); + } else { + G aggregate = finishGroup(); + + beginGroup(mGroupLeader = groupMember); + + if (aggregate != null) { + mNextAggregate = aggregate; + return true; + } + } + + interruptCheck(++count); + } + + G aggregate = finishGroup(); + + if (aggregate != null) { + mNextAggregate = aggregate; + return true; + } + } + } catch (NoSuchElementException e) { + } + + return false; + } + } + + public G next() throws FetchException { + synchronized (mCursor) { + if (hasNext()) { + G next = mNextAggregate; + mNextAggregate = null; + return next; + } + throw new NoSuchElementException(); + } + } + + public int skipNext(int amount) throws FetchException { + synchronized (mCursor) { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (--amount >= 0 && hasNext()) { + interruptCheck(++count); + mNextAggregate = null; + } + + return count; + } + } + + private void interruptCheck(int count) throws FetchException { + if ((count & ~0xff) == 0 && Thread.interrupted()) { + close(); + throw new FetchInterruptedException(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java new file mode 100644 index 0000000..9d11b2f --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java @@ -0,0 +1,118 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Comparator; +import java.util.NoSuchElementException; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Cursor; + +/** + * Wraps two Cursors and performs a set intersection operation. In + * boolean logic, this is an and operation. + * + *

Both cursors must return results in the same order. Ordering is preserved + * by the intersection. + * + * @author Brian S O'Neill + * @see UnionCursor + * @see DifferenceCursor + * @see SymmetricDifferenceCursor + */ +public class IntersectionCursor extends AbstractCursor { + private final Cursor mLeftCursor; + private final Cursor mRightCursor; + private final Comparator mOrder; + + private S mNext; + + /** + * @param left cursor to wrap + * @param right cursor to wrap + * @param order describes sort ordering of wrapped cursors, which must be + * a total ordering + */ + public IntersectionCursor(Cursor left, Cursor right, Comparator order) { + if (left == null || right == null || order == null) { + throw new IllegalArgumentException(); + } + mLeftCursor = left; + mRightCursor = right; + mOrder = order; + } + + public synchronized void close() throws FetchException { + mLeftCursor.close(); + mRightCursor.close(); + mNext = null; + } + + public synchronized boolean hasNext() throws FetchException { + if (mNext != null) { + return true; + } + + S nextLeft, nextRight; + + if (mLeftCursor.hasNext()) { + nextLeft = mLeftCursor.next(); + } else { + close(); + return false; + } + if (mRightCursor.hasNext()) { + nextRight = mRightCursor.next(); + } else { + close(); + return false; + } + + while (true) { + int result = mOrder.compare(nextLeft, nextRight); + if (result < 0) { + if (mLeftCursor.hasNext()) { + nextLeft = mLeftCursor.next(); + } else { + close(); + return false; + } + } else if (result > 0) { + if (mRightCursor.hasNext()) { + nextRight = mRightCursor.next(); + } else { + close(); + return false; + } + } else { + mNext = nextLeft; + return true; + } + } + } + + public synchronized S next() throws FetchException { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + throw new NoSuchElementException(); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java new file mode 100644 index 0000000..0330629 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java @@ -0,0 +1,62 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Adapts an Iterator into a Cursor. + * + * @author Brian S O'Neill + */ +public class IteratorCursor extends AbstractCursor { + private Iterator mIterator; + + /** + * @param iterable collection to iterate over, or null for empty cursor + */ + public IteratorCursor(Iterable iterable) { + this(iterable == null ? (Iterator) null : iterable.iterator()); + } + + /** + * @param iterator iterator to wrap, or null for empty cursor + */ + public IteratorCursor(Iterator iterator) { + mIterator = iterator; + } + + public void close() { + mIterator = null; + } + + public boolean hasNext() { + Iterator it = mIterator; + return it != null && it.hasNext(); + } + + public S next() { + Iterator it = mIterator; + if (it == null) { + throw new NoSuchElementException(); + } + return it.next(); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java new file mode 100644 index 0000000..ae98a95 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java @@ -0,0 +1,459 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.lang.reflect.Field; + +import java.util.Map; + +import org.cojen.classfile.ClassFile; +import org.cojen.classfile.CodeBuilder; +import org.cojen.classfile.FieldInfo; +import org.cojen.classfile.Label; +import org.cojen.classfile.LocalVariable; +import org.cojen.classfile.MethodInfo; +import org.cojen.classfile.Modifiers; +import org.cojen.classfile.TypeDesc; + +import org.cojen.util.ClassInjector; +import org.cojen.util.KeyFactory; +import org.cojen.util.SoftValuedHashMap; +import org.cojen.util.WeakIdentityMap; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Query; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.SupportException; + +import com.amazon.carbonado.info.ChainedProperty; +import com.amazon.carbonado.info.StorableInfo; +import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.util.QuickConstructorGenerator; + +import com.amazon.carbonado.spi.CodeBuilderUtil; +import static com.amazon.carbonado.spi.CommonMethodNames.*; + +/** + * Given two joined types A and B, this factory converts a cursor + * over type A into a cursor over type B. For example, consider + * two storable types, Employer and Person. A query filter for persons with a + * given employer might look like: {@code "employer.name = ?" } The join can be + * manually implemented by querying for employers and then using this factory + * to produce persons: + * + *

+ * JoinedCursorFactory<Employer, Person> factory = new JoinedCursorFactory<Employer, Person>
+ *     (repo, Person.class, "employer", Employer.class);
+ *
+ * Cursor<Employer> employerCursor = repo.storageFor(Employer.class)
+ *     .query("name = ?").with(...).fetch();
+ *
+ * Cursor<Person> personCursor = factory.join(employerCursor);
+ * 
+ * + * Chained properties are supported as well. A query filter for persons with an + * employer in a given state might look like: {@code "employer.address.state = ?" } + * The join can be manually implemented as: + * + *
+ * JoinedCursorFactory<Address, Person> factory = new JoinedCursorFactory<Address, Person>
+ *     (repo, Person.class, "employer.address", Address.class);
+ *
+ * Cursor<Address> addressCursor = repo.storageFor(Address.class)
+ *     .query("state = ?").with(...).fetch();
+ *
+ * Cursor<Person> personCursor = factory.join(addressCursor);
+ * 
+ * + * @author Brian S O'Neill + */ +public class JoinedCursorFactory { + private static final String STORAGE_FIELD_NAME = "storage"; + private static final String QUERY_FIELD_NAME = "query"; + private static final String QUERY_FILTER_FIELD_NAME = "queryFilter"; + private static final String ACTIVE_A_FIELD_NAME = "active"; + + private static final Map cJoinerCursorClassCache; + + static { + cJoinerCursorClassCache = new SoftValuedHashMap(); + } + + private static synchronized Joiner + newBasicJoiner(StorableProperty bToAProperty, Storage bStorage) + throws FetchException + { + Class aType = bToAProperty.getType(); + + final Object key = KeyFactory.createKey + (new Object[] {aType, + bToAProperty.getEnclosingType(), + bToAProperty.getName()}); + + Class clazz = cJoinerCursorClassCache.get(key); + + if (clazz == null) { + clazz = generateBasicJoinerCursor(aType, bToAProperty); + cJoinerCursorClassCache.put(key, clazz); + } + + // Transforming cursor class may need a Query to operate on. + Query bQuery = null; + try { + String filter = (String) clazz.getField(QUERY_FILTER_FIELD_NAME).get(null); + bQuery = bStorage.query(filter); + } catch (NoSuchFieldException e) { + } catch (IllegalAccessException e) { + } + + BasicJoiner.Factory factory = (BasicJoiner.Factory) QuickConstructorGenerator + .getInstance(clazz, BasicJoiner.Factory.class); + + return new BasicJoiner(factory, bStorage, bQuery); + } + + private static Class> + generateBasicJoinerCursor(Class aType, StorableProperty bToAProperty) + { + final int propCount = bToAProperty.getJoinElementCount(); + + // Determine if join is one-to-one, in which case slightly more optimal + // code can be generated. + boolean isOneToOne = true; + for (int i=0; i bType = bToAProperty.getEnclosingType(); + + String packageName; + { + String name = bType.getName(); + int index = name.lastIndexOf('.'); + if (index >= 0) { + packageName = name.substring(0, index); + } else { + packageName = ""; + } + } + + ClassLoader loader = bType.getClassLoader(); + + ClassInjector ci = ClassInjector.create(packageName + ".JoinedCursor", loader); + Class superclass = isOneToOne ? TransformedCursor.class : MultiTransformedCursor.class; + ClassFile cf = new ClassFile(ci.getClassName(), superclass); + cf.markSynthetic(); + cf.setSourceFile(JoinedCursorFactory.class.getName()); + cf.setTarget("1.5"); + + final TypeDesc queryType = TypeDesc.forClass(Query.class); + final TypeDesc cursorType = TypeDesc.forClass(Cursor.class); + final TypeDesc storageType = TypeDesc.forClass(Storage.class); + final TypeDesc storableType = TypeDesc.forClass(Storable.class); + + if (isOneToOne) { + cf.addField(Modifiers.PRIVATE.toFinal(true), STORAGE_FIELD_NAME, storageType); + } else { + // Field to hold query which fetches type B. + cf.addField(Modifiers.PRIVATE.toFinal(true), QUERY_FIELD_NAME, queryType); + } + + boolean canSetAReference = bToAProperty.getWriteMethod() != null; + + if (canSetAReference && !isOneToOne) { + // Field to hold active A storable. + cf.addField(Modifiers.PRIVATE, ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType)); + } + + // Constructor accepts a Storage and Query, but Storage is only used + // for one-to-one, and Query is only used for one-to-many. + { + MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC, + new TypeDesc[] {cursorType, storageType, queryType}); + CodeBuilder b = new CodeBuilder(mi); + + b.loadThis(); + b.loadLocal(b.getParameter(0)); // pass A cursor to superclass + b.invokeSuperConstructor(new TypeDesc[] {cursorType}); + + if (isOneToOne) { + b.loadThis(); + b.loadLocal(b.getParameter(1)); // push B storage to stack + b.storeField(STORAGE_FIELD_NAME, storageType); + } else { + b.loadThis(); + b.loadLocal(b.getParameter(2)); // push B query to stack + b.storeField(QUERY_FIELD_NAME, queryType); + } + + b.returnVoid(); + } + + // For one-to-many, a query is needed. Save the query filter in a + // public static field to be grabbed later. + if (!isOneToOne) { + StringBuilder queryBuilder = new StringBuilder(); + + for (int i=0; i 0) { + queryBuilder.append(" & "); + } + queryBuilder.append(bToAProperty.getInternalJoinElement(i).getName()); + queryBuilder.append(" = ?"); + } + + FieldInfo fi = cf.addField(Modifiers.PUBLIC.toStatic(true).toFinal(true), + QUERY_FILTER_FIELD_NAME, TypeDesc.STRING); + fi.setConstantValue(queryBuilder.toString()); + } + + // Implement the transform method. + if (isOneToOne) { + MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", TypeDesc.OBJECT, + new TypeDesc[] {TypeDesc.OBJECT}); + mi.addException(TypeDesc.forClass(FetchException.class)); + CodeBuilder b = new CodeBuilder(mi); + + LocalVariable aVar = b.createLocalVariable(null, storableType); + b.loadLocal(b.getParameter(0)); + b.checkCast(TypeDesc.forClass(aType)); + b.storeLocal(aVar); + + // Prepare B storable. + b.loadThis(); + b.loadField(STORAGE_FIELD_NAME, storageType); + b.invokeInterface(storageType, PREPARE_METHOD_NAME, storableType, null); + LocalVariable bVar = b.createLocalVariable(null, storableType); + b.checkCast(TypeDesc.forClass(bType)); + b.storeLocal(bVar); + + // Copy pk property values from A to B. + for (int i=0; i internal = bToAProperty.getInternalJoinElement(i); + StorableProperty external = bToAProperty.getExternalJoinElement(i); + + b.loadLocal(bVar); + b.loadLocal(aVar); + b.invoke(external.getReadMethod()); + b.invoke(internal.getWriteMethod()); + } + + // tryLoad b. + b.loadLocal(bVar); + b.invokeInterface(storableType, TRY_LOAD_METHOD_NAME, TypeDesc.BOOLEAN, null); + Label wasLoaded = b.createLabel(); + b.ifZeroComparisonBranch(wasLoaded, "!="); + + b.loadNull(); + b.returnValue(storableType); + + wasLoaded.setLocation(); + + if (canSetAReference) { + b.loadLocal(bVar); + b.loadLocal(aVar); + b.invoke(bToAProperty.getWriteMethod()); + } + + b.loadLocal(bVar); + b.returnValue(storableType); + } else { + MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", cursorType, + new TypeDesc[] {TypeDesc.OBJECT}); + mi.addException(TypeDesc.forClass(FetchException.class)); + CodeBuilder b = new CodeBuilder(mi); + + LocalVariable aVar = b.createLocalVariable(null, storableType); + b.loadLocal(b.getParameter(0)); + b.checkCast(TypeDesc.forClass(aType)); + b.storeLocal(aVar); + + if (canSetAReference) { + b.loadThis(); + b.loadLocal(aVar); + b.storeField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType)); + } + + // Populate query parameters. + b.loadThis(); + b.loadField(QUERY_FIELD_NAME, queryType); + + for (int i=0; i external = bToAProperty.getExternalJoinElement(i); + b.loadLocal(aVar); + b.invoke(external.getReadMethod()); + + TypeDesc bindType = CodeBuilderUtil.bindQueryParam(external.getType()); + CodeBuilderUtil.convertValue(b, external.getType(), bindType.toClass()); + b.invokeInterface(queryType, WITH_METHOD_NAME, queryType, + new TypeDesc[] {bindType}); + } + + // Now fetch and return. + b.invokeInterface(queryType, FETCH_METHOD_NAME, cursorType, null); + b.returnValue(cursorType); + } + + if (canSetAReference && !isOneToOne) { + // Override the "next" method to set A object on B. + MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "next", TypeDesc.OBJECT, null); + mi.addException(TypeDesc.forClass(FetchException.class)); + CodeBuilder b = new CodeBuilder(mi); + + b.loadThis(); + b.invokeSuper(TypeDesc.forClass(MultiTransformedCursor.class), + "next", TypeDesc.OBJECT, null); + b.checkCast(TypeDesc.forClass(bType)); + b.dup(); + + b.loadThis(); + b.loadField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType)); + b.invoke(bToAProperty.getWriteMethod()); + + b.returnValue(storableType); + } + + return (Class>) ci.defineClass(cf); + } + + private final Joiner mJoiner; + + /** + * @param repo access to storage instances for properties + * @param bType type of B instances + * @param bToAProperty property of B type which maps to instances of + * A type. + * @param aType type of A instances + * @throws IllegalArgumentException if property type is not A + */ + public JoinedCursorFactory(Repository repo, + Class bType, + String bToAProperty, + Class aType) + throws SupportException, FetchException, RepositoryException + { + this(repo, + ChainedProperty.parse(StorableIntrospector.examine(bType), bToAProperty), + aType); + } + + /** + * @param repo access to storage instances for properties + * @param bToAProperty property of B type which maps to instances of + * A type. + * @param aType type of A instances + * @throws IllegalArgumentException if property type is not A + */ + public JoinedCursorFactory(Repository repo, + ChainedProperty bToAProperty, + Class aType) + throws SupportException, FetchException, RepositoryException + { + if (bToAProperty.getType() != aType) { + throw new IllegalArgumentException + ("Property is not of type \"" + aType.getName() + "\": " + + bToAProperty); + } + + StorableProperty primeB = bToAProperty.getPrimeProperty(); + Storage primeBStorage = repo.storageFor(primeB.getEnclosingType()); + + Joiner joiner = newBasicJoiner(primeB, primeBStorage); + + int chainCount = bToAProperty.getChainCount(); + for (int i=0; i) joiner; + } + + /** + * Given a cursor over A, returns a new cursor over joined property + * of type B. + */ + public Cursor join(Cursor cursor) { + return mJoiner.join(cursor); + } + + private static interface Joiner { + Cursor join(Cursor cursor); + } + + /** + * Support for joins without an intermediate hop. + */ + private static class BasicJoiner implements Joiner { + private final Factory mJoinerFactory; + private final Storage mBStorage; + private final Query mBQuery; + + BasicJoiner(Factory factory, Storage bStorage, Query bQuery) { + mJoinerFactory = factory; + mBStorage = bStorage; + mBQuery = bQuery; + } + + public Cursor join(Cursor cursor) { + return mJoinerFactory.newJoinedCursor(cursor, mBStorage, mBQuery); + } + + /** + * Needs to be public for {@link QuickConstructorGenerator}. + */ + public static interface Factory { + Cursor newJoinedCursor(Cursor cursor, Storage bStorage, Query bQuery); + } + } + + /** + * Support for joins with an intermediate hop -- multi-way joins. + */ + private static class MultiJoiner + implements Joiner + { + private final Joiner mAToMid; + private final Joiner mMidToB; + + MultiJoiner(Joiner aToMidJoiner, Joiner midToBJoiner) { + mAToMid = aToMidJoiner; + mMidToB = midToBJoiner; + } + + public Cursor join(Cursor cursor) { + return mMidToB.join(mAToMid.join(cursor)); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java new file mode 100644 index 0000000..0094183 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -0,0 +1,498 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.RandomAccessFile; + +import java.lang.reflect.UndeclaredThrowableException; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; + +import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.SupportException; + +import com.amazon.carbonado.spi.RAFInputStream; +import com.amazon.carbonado.spi.RAFOutputStream; +import com.amazon.carbonado.spi.StorableSerializer; + +/** + * Sort buffer implemented via a merge sort algorithm. If there are too many + * storables to fit in the reserved memory buffer, they are sorted and + * serialized to temporary files. + * + * @author Brian S O'Neill + * @see SortedCursor + */ +public class MergeSortBuffer extends AbstractCollection + implements SortBuffer +{ + private static final int MIN_ARRAY_CAPACITY = 64; + + // Bigger means better performance, but more memory is used. + private static final int MAX_ARRAY_CAPACITY = 8192; + + // Bigger means better performance, but more file handles may be used. + private static final int MAX_FILE_COUNT = 100; + + // Bigger may improve write performance, but not by much. + private static final int OUTPUT_BUFFER_SIZE = 10000; + + private final Storage mStorage; + private final String mTempDir; + private final int mMaxArrayCapacity; + + private S[] mElements; + private int mSize; + private int mTotalSize; + + private StorableSerializer mSerializer; + private WorkFilePool mWorkFilePool; + private List mFilesInUse; + + private Comparator mComparator; + + private volatile boolean mStop; + + /** + * @param storage storage type of elements + */ + public MergeSortBuffer(Storage storage) { + this(storage, null, MAX_ARRAY_CAPACITY); + } + + /** + * @param storage storage type of elements + * @param tempDir directory to store temp files for merging, or null for default + */ + public MergeSortBuffer(Storage storage, String tempDir) { + this(storage, tempDir, MAX_ARRAY_CAPACITY); + } + + /** + * @param storage storage type of elements + * @param tempDir directory to store temp files for merging, or null for default + * @param maxArrayCapacity maximum amount of storables to keep in an array + * before serializing to a file + */ + @SuppressWarnings("unchecked") + public MergeSortBuffer(Storage storage, String tempDir, int maxArrayCapacity) { + mStorage = storage; + mTempDir = tempDir; + mMaxArrayCapacity = maxArrayCapacity; + int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity); + mElements = (S[]) new Storable[cap]; + } + + public void prepare(Comparator comparator) { + if (comparator == null) { + throw new IllegalArgumentException(); + } + clear(); + mComparator = comparator; + } + + public boolean add(S storable) { + arrayPrep: + if (mSize >= mElements.length) { + if (mElements.length < mMaxArrayCapacity) { + // Increase array capacity. + int newCap = mElements.length * 2; + if (newCap > mMaxArrayCapacity) { + newCap = mMaxArrayCapacity; + } + S[] newElements = (S[]) new Storable[newCap]; + System.arraycopy(mElements, 0, newElements, 0, mElements.length); + mElements = newElements; + break arrayPrep; + } + + // Sort current in-memory results and serialize to a temp file. + + // Make sure everything is set up to use temp files. + { + if (mSerializer == null) { + try { + mSerializer = StorableSerializer.forType(mStorage.getStorableType()); + } catch (SupportException e) { + throw new UndeclaredThrowableException(e); + } + mWorkFilePool = WorkFilePool.getInstance(mTempDir); + mFilesInUse = new ArrayList(); + } + } + + Arrays.sort(mElements, mComparator); + + RandomAccessFile raf; + try { + raf = mWorkFilePool.acquireWorkFile(this); + OutputStream out = + new BufferedOutputStream(new RAFOutputStream(raf), OUTPUT_BUFFER_SIZE); + + StorableSerializer serializer = mSerializer; + + if (mFilesInUse.size() < (MAX_FILE_COUNT - 1)) { + mFilesInUse.add(raf); + int count = 0; + for (S element : mElements) { + // Check every so often if interrupted. + interruptCheck(++count); + serializer.write(element, out); + } + } else { + // Merge files together. + + // Determine the average length per file in use. + long totalLength = 0; + int fileCount = mFilesInUse.size(); + for (int i=0; i filesToExclude = new ArrayList(); + List filesToMerge = new ArrayList(); + + long mergedLength = 0; + for (int i=0; i averageLength) { + filesToExclude.add(fileInUse); + } else { + filesToMerge.add(fileInUse); + mergedLength += fileLength; + } + } + + mFilesInUse.add(raf); + + // Pre-allocate space, in an attempt to improve performance + // as well as error out earlier, should the disk be full. + raf.setLength(mergedLength); + + int count = 0; + Iterator it = iterator(filesToMerge); + while (it.hasNext()) { + // Check every so often if interrupted. + interruptCheck(++count); + S element = it.next(); + serializer.write(element, out); + } + + mWorkFilePool.releaseWorkFiles(filesToMerge); + mFilesInUse = filesToExclude; + mFilesInUse.add(raf); + } + + out.flush(); + + // Truncate any data from last time file was used. + raf.setLength(raf.getFilePointer()); + // Reset to start of file in preparation for reading later. + raf.seek(0); + } catch (IOException e) { + throw new UndeclaredThrowableException(e); + } + + mSize = 0; + } + + mElements[mSize++] = storable; + mTotalSize++; + return true; + } + + public int size() { + return mTotalSize; + } + + public Iterator iterator() { + return iterator(mFilesInUse); + } + + private Iterator iterator(List filesToMerge) { + if (mSerializer == null) { + return new ObjectArrayIterator(mElements, 0, mSize); + } + + // Merge with the files. Use a priority queue to decide which is the + // next buffer to pull an element from. + + PriorityQueue> pq = new PriorityQueue>(1 + mFilesInUse.size()); + pq.add(new ArrayIter(mComparator, mElements, mSize)); + for (RandomAccessFile raf : filesToMerge) { + try { + raf.seek(0); + } catch (IOException e) { + throw new UndeclaredThrowableException(e); + } + + InputStream in = new BufferedInputStream(new RAFInputStream(raf)); + + pq.add(new InputIter(mComparator, mSerializer, mStorage, in)); + } + + return new Merger(pq); + } + + public void clear() { + if (mTotalSize > 0) { + mSize = 0; + mTotalSize = 0; + if (mWorkFilePool != null && mFilesInUse != null) { + mWorkFilePool.releaseWorkFiles(mFilesInUse); + mFilesInUse.clear(); + } + } + } + + public void sort() { + // Sort current in-memory results. Anything residing in files has + // already been sorted. + if (mComparator == null) { + throw new IllegalStateException("Buffer was not prepared"); + } + Arrays.sort(mElements, 0, mSize, mComparator); + } + + public void close() { + clear(); + if (mWorkFilePool != null) { + mWorkFilePool.unregisterWorkFileUser(this); + } + } + + void stop() { + mStop = true; + } + + private void interruptCheck(int count) { + if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) { + close(); + throw new UndeclaredThrowableException(new FetchInterruptedException()); + } + } + + /** + * Simple interator interface that supports peeking at next element. + */ + private abstract static class Iter implements Comparable> { + private final Comparator mComparator; + + protected Iter(Comparator comparator) { + mComparator = comparator; + } + + /** + * Returns null if iterator is exhausted. + */ + abstract S peek(); + + /** + * Returns null if iterator is exhausted. + */ + abstract S next(); + + public int compareTo(Iter iter) { + S thisPeek = peek(); + S thatPeek = iter.peek(); + if (thisPeek == null) { + if (thatPeek == null) { + return 0; + } + // Null is low in order to rise to top of priority queue. This + // Iter will then be tossed out of the priority queue. + return -1; + } else if (thatPeek == null) { + return 1; + } + return mComparator.compare(thisPeek, thatPeek); + } + } + + /** + * Iterator that reads from an array. + */ + private static class ArrayIter extends Iter { + private final S[] mArray; + private final int mSize; + private int mPos; + + ArrayIter(Comparator comparator, S[] array, int size) { + super(comparator); + mArray = array; + mSize = size; + } + + S peek() { + int pos = mPos; + if (pos >= mSize) { + return null; + } + return mArray[pos]; + } + + S next() { + int pos = mPos; + if (pos >= mSize) { + return null; + } + S next = mArray[pos]; + mPos = pos + 1; + return next; + } + } + + /** + * Iterator that reads from an input stream of serialized Storables. + */ + private static class InputIter extends Iter { + private StorableSerializer mSerializer; + private Storage mStorage; + private InputStream mIn; + + private S mNext; + + InputIter(Comparator comparator, + StorableSerializer serializer, Storage storage, InputStream in) + { + super(comparator); + mSerializer = serializer; + mStorage = storage; + mIn = in; + } + + S peek() { + if (mNext != null) { + return mNext; + } + if (mIn != null) { + try { + mNext = mSerializer.read(mStorage, mIn); + // TODO: Serializer is unable to determine state of + // properties, and so they are lost. Since these storables + // came directly from a cursor, we know they are clean. + mNext.markAllPropertiesClean(); + } catch (EOFException e) { + mIn = null; + } catch (IOException e) { + throw new UndeclaredThrowableException(e); + } + } + return mNext; + } + + S next() { + S next = peek(); + mNext = null; + return next; + } + } + + private static class Merger implements Iterator { + private final PriorityQueue> mPQ; + + private S mNext; + + Merger(PriorityQueue> pq) { + mPQ = pq; + } + + public boolean hasNext() { + if (mNext == null) { + while (true) { + Iter iter = mPQ.poll(); + if (iter == null) { + return false; + } + if ((mNext = iter.next()) != null) { + // Iter is not exhausted, so put it back in to be used + // again. Adding it back causes it to be inserted in + // the proper order, based on the next element it has + // to offer. + mPQ.add(iter); + return true; + } + } + } + return true; + } + + public S next() { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + throw new NoSuchElementException(); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private static class ObjectArrayIterator implements Iterator { + private final E[] mElements; + private final int mEnd; + private int mIndex; + + public ObjectArrayIterator(E[] elements, int start, int end) { + mElements = elements; + mEnd = end; + mIndex = start; + } + + public boolean hasNext() { + return mIndex < mEnd; + } + + public E next() { + if (mIndex >= mEnd) { + throw new NoSuchElementException(); + } + return mElements[mIndex++]; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java new file mode 100644 index 0000000..0cfd197 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -0,0 +1,132 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.NoSuchElementException; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; + +/** + * Abstract cursor which wraps another cursor and transforms each storable + * result into a set of target storables. This class can be used for + * implementing one-to-many joins. Use {@link TransformedCursor} for one-to-one + * joins. + * + * @author Brian S O'Neill + */ +public abstract class MultiTransformedCursor extends AbstractCursor { + private final Cursor mCursor; + + private Cursor mNextCursor; + + protected MultiTransformedCursor(Cursor cursor) { + if (cursor == null) { + throw new IllegalArgumentException(); + } + mCursor = cursor; + } + + /** + * This method must be implemented to transform storables. If the storable + * cannot be transformed, either throw a FetchException or return null. If + * null is returned, the storable is simply filtered out. + * + * @return transformed storables, or null to filter it out + */ + protected abstract Cursor transform(S storable) throws FetchException; + + public void close() throws FetchException { + synchronized (mCursor) { + mCursor.close(); + if (mNextCursor != null) { + mNextCursor.close(); + mNextCursor = null; + } + } + } + + public boolean hasNext() throws FetchException { + synchronized (mCursor) { + if (mNextCursor != null) { + if (mNextCursor.hasNext()) { + return true; + } + mNextCursor.close(); + mNextCursor = null; + } + try { + int count = 0; + while (mCursor.hasNext()) { + Cursor nextCursor = transform(mCursor.next()); + if (nextCursor != null) { + if (nextCursor.hasNext()) { + mNextCursor = nextCursor; + return true; + } + nextCursor.close(); + } + interruptCheck(++count); + } + } catch (NoSuchElementException e) { + } + return false; + } + } + + public T next() throws FetchException { + synchronized (mCursor) { + if (hasNext()) { + return mNextCursor.next(); + } + throw new NoSuchElementException(); + } + } + + public int skipNext(int amount) throws FetchException { + synchronized (mCursor) { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (hasNext()) { + int chunk = mNextCursor.skipNext(amount); + count += chunk; + if ((amount -= chunk) <= 0) { + break; + } + interruptCheck(count); + } + + return count; + } + } + + private void interruptCheck(int count) throws FetchException { + if ((count & ~0xff) == 0 && Thread.interrupted()) { + close(); + throw new FetchInterruptedException(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java new file mode 100644 index 0000000..089488c --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Comparator; +import java.util.Collection; + +import com.amazon.carbonado.FetchException; + +/** + * Buffers up Storable instances allowing them to be sorted. Should any method + * need to throw an undeclared exception, wrap it with an + * UndeclaredThrowableException. + * + * @author Brian S O'Neill + * @see SortedCursor + */ +public interface SortBuffer extends Collection { + /** + * Clears buffer and assigns a comparator for sorting. + * + * @throws IllegalArgumentException if comparator is null + */ + void prepare(Comparator comparator); + + /** + * Finish sorting buffer. + * + * @throws IllegalStateException if prepare was never called + */ + void sort() throws FetchException; + + /** + * Clear and close buffer. + */ + void close() throws FetchException; +} diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java new file mode 100644 index 0000000..7a728bb --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -0,0 +1,332 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.cojen.util.BeanComparator; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.Storable; + +import com.amazon.carbonado.info.Direction; +import com.amazon.carbonado.info.OrderedProperty; + +/** + * Wraps another Cursor and ensures the results are sorted. If the elements in + * the source cursor are already partially sorted, a handled comparator can be + * passed in which specifies the partial ordering. Elements are then processed + * in smaller chunks rather than sorting the entire set. The handled comparator + * can represent ascending or descending order of source elements. + * + * @author Brian S O'Neill + */ +public class SortedCursor extends AbstractCursor { + /** + * Convenience method to create a comparator which orders storables by the + * given order-by properties. The property names may be prefixed with '+' + * or '-' to indicate ascending or descending order. If the prefix is + * omitted, ascending order is assumed. + * + * @param type type of storable to create comparator for + * @param orderProperties list of properties to order by + * @throws IllegalArgumentException if any property is null or not a member + * of storable type + */ + public static Comparator createComparator(Class type, String... orderProperties) { + BeanComparator bc = BeanComparator.forClass(type); + for (String property : orderProperties) { + bc = bc.orderBy(property); + bc = bc.caseSensitive(); + } + return bc; + } + + /** + * Convenience method to create a comparator which orders storables by the + * given properties. + * + * @param properties list of properties to order by + * @throws IllegalArgumentException if no properties or if any property is null + */ + public static Comparator + createComparator(OrderedProperty... properties) + { + if (properties == null || properties.length == 0 || properties[0] == null) { + throw new IllegalArgumentException(); + } + + Class type = properties[0].getChainedProperty().getPrimeProperty().getEnclosingType(); + + BeanComparator bc = BeanComparator.forClass(type); + + for (OrderedProperty property : properties) { + if (property == null) { + throw new IllegalArgumentException(); + } + bc = bc.orderBy(property.getChainedProperty().toString()); + bc = bc.caseSensitive(); + if (property.getDirection() == Direction.DESCENDING) { + bc = bc.reverse(); + } + } + + return bc; + } + + /** + * Convenience method to create a comparator which orders storables by the + * given properties. + * + * @param properties list of properties to order by + * @throws IllegalArgumentException if no properties or if any property is null + */ + public static Comparator + createComparator(List> properties) + { + if (properties == null || properties.size() == 0 || properties.get(0) == null) { + throw new IllegalArgumentException(); + } + + Class type = + properties.get(0).getChainedProperty().getPrimeProperty().getEnclosingType(); + + BeanComparator bc = BeanComparator.forClass(type); + + for (OrderedProperty property : properties) { + if (property == null) { + throw new IllegalArgumentException(); + } + bc = bc.orderBy(property.getChainedProperty().toString()); + bc = bc.caseSensitive(); + if (property.getDirection() == Direction.DESCENDING) { + bc = bc.reverse(); + } + } + + return bc; + } + + /** Wrapped cursor */ + private final Cursor mCursor; + /** Buffer to store and sort results */ + private final SortBuffer mChunkBuffer; + /** Optional comparator which matches ordering already handled by wrapped cursor */ + private final Comparator mChunkMatcher; + /** Comparator to use for sorting chunks */ + private final Comparator mChunkSorter; + + /** Iteration over current contents in mChunkBuffer */ + private Iterator mChunkIterator; + + /** + * First record in a chunk, according to chunk matcher. In order to tell if + * the next chunk has been reached, a record has to be read from the + * wrapped cursor. The record is "pushed back" here for use when the next + * chunk is ready to be processed. + */ + private S mNextChunkStart; + + /** + * @param cursor cursor to wrap + * @param buffer required buffer to hold results + * @param handled optional comparator which represents how the results are + * already sorted + * @param finisher required comparator which finishes the sort + */ + public SortedCursor(Cursor cursor, SortBuffer buffer, + Comparator handled, Comparator finisher) { + if (cursor == null || finisher == null) { + throw new IllegalArgumentException(); + } + mCursor = cursor; + mChunkBuffer = buffer; + mChunkMatcher = handled; + mChunkSorter = finisher; + } + + /** + * @param cursor cursor to wrap + * @param buffer required buffer to hold results + * @param type type of storable to create cursor for + * @param orderProperties list of properties to order by + * @throws IllegalArgumentException if any property is null or not a member + * of storable type + */ + public SortedCursor(Cursor cursor, SortBuffer buffer, + Class type, String... orderProperties) { + this(cursor, buffer, null, createComparator(type, orderProperties)); + } + + /** + * Returns a comparator representing the effective sort order of this cursor. + */ + public Comparator comparator() { + if (mChunkMatcher == null) { + return mChunkSorter; + } + return new Comparator() { + public int compare(S a, S b) { + int result = mChunkMatcher.compare(a, b); + if (result == 0) { + result = mChunkSorter.compare(a, b); + } + return result; + } + }; + } + + public synchronized void close() throws FetchException { + mCursor.close(); + mChunkIterator = null; + mChunkBuffer.close(); + } + + public synchronized boolean hasNext() throws FetchException { + prepareNextChunk(); + try { + if (mChunkIterator.hasNext()) { + return true; + } + } catch (UndeclaredThrowableException e) { + throw toFetchException(e); + } + close(); + return false; + } + + public synchronized S next() throws FetchException { + prepareNextChunk(); + try { + return mChunkIterator.next(); + } catch (UndeclaredThrowableException e) { + throw toFetchException(e); + } catch (NoSuchElementException e) { + try { + close(); + } catch (FetchException e2) { + // Don't care. + } + throw e; + } + } + + public synchronized int skipNext(int amount) throws FetchException { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (--amount >= 0 && hasNext()) { + next(); + count++; + } + + return count; + } + + private void prepareNextChunk() throws FetchException { + if (mChunkIterator != null && (mChunkMatcher == null || mChunkIterator.hasNext())) { + // Ready to go. + return; + } + + Cursor cursor = mCursor; + SortBuffer buffer = mChunkBuffer; + Comparator matcher = mChunkMatcher; + + try { + mChunkIterator = null; + buffer.prepare(mChunkSorter); + + fill: { + if (matcher == null) { + // Buffer up entire results and sort. + int count = 0; + while (cursor.hasNext()) { + // Check every so often if interrupted. + if ((++count & ~0xff) == 0 && Thread.interrupted()) { + throw new FetchInterruptedException(); + } + buffer.add(cursor.next()); + } + break fill; + } + + // Read a chunk into the buffer. First record is compared against + // subsequent records, to determine when the chunk end is reached. + + S chunkStart; + if (mNextChunkStart != null) { + chunkStart = mNextChunkStart; + mNextChunkStart = null; + } else if (cursor.hasNext()) { + chunkStart = cursor.next(); + } else { + break fill; + } + + buffer.add(chunkStart); + int count = 1; + + while (cursor.hasNext()) { + // Check every so often if interrupted. + if ((++count & ~0xff) == 0 && Thread.interrupted()) { + throw new FetchInterruptedException(); + } + S next = cursor.next(); + if (matcher.compare(chunkStart, next) != 0) { + // Save for reading next chunk later. + mNextChunkStart = next; + break; + } + buffer.add(next); + } + } + + if (buffer.size() > 1) { + buffer.sort(); + } + + mChunkIterator = buffer.iterator(); + } catch (UndeclaredThrowableException e) { + throw toFetchException(e); + } + } + + private FetchException toFetchException(UndeclaredThrowableException e) { + Throwable cause = e.getCause(); + if (cause == null) { + cause = e; + } + if (cause instanceof FetchException) { + return (FetchException) cause; + } + return new FetchException(null, cause); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java new file mode 100644 index 0000000..6851464 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java @@ -0,0 +1,123 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Comparator; +import java.util.NoSuchElementException; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Cursor; + +/** + * Wraps two Cursors and performs a symmetric set difference + * operation. In boolean logic, this is an exclusive or operation. + * + *

Both cursors must return results in the same order. Ordering is preserved + * by the difference. + * + * @author Brian S O'Neill + * @see UnionCursor + * @see IntersectionCursor + * @see DifferenceCursor + */ +public class SymmetricDifferenceCursor extends AbstractCursor { + private final Cursor mLeftCursor; + private final Cursor mRightCursor; + private final Comparator mOrder; + + private S mNextLeft; + private S mNextRight; + private int mCompareResult; + + /** + * @param left cursor to wrap + * @param right cursor to wrap + * @param order describes sort ordering of wrapped cursors, which must be + * a total ordering + */ + public SymmetricDifferenceCursor(Cursor left, Cursor right, Comparator order) { + if (left == null || right == null || order == null) { + throw new IllegalArgumentException(); + } + mLeftCursor = left; + mRightCursor = right; + mOrder = order; + } + + public synchronized void close() throws FetchException { + mLeftCursor.close(); + mRightCursor.close(); + mNextLeft = null; + mNextRight = null; + } + + public boolean hasNext() throws FetchException { + return compareNext() != 0; + } + + /** + * Returns 0 if no next element available, <0 if next element is from + * left source cursor, and >0 if next element is from right source + * cursor. + */ + public synchronized int compareNext() throws FetchException { + if (mCompareResult != 0) { + return mCompareResult; + } + + while (true) { + if (mNextLeft == null && mLeftCursor.hasNext()) { + mNextLeft = mLeftCursor.next(); + } + if (mNextRight == null && mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } + + if (mNextLeft == null) { + return mNextRight != null ? 1 : 0; + } + if (mNextRight == null) { + return -1; + } + + if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) { + mNextLeft = null; + mNextRight = null; + } else { + return mCompareResult; + } + } + } + + public synchronized S next() throws FetchException { + S next; + int result = compareNext(); + if (result < 0) { + next = mNextLeft; + mNextLeft = null; + } else if (result > 0) { + next = mNextRight; + mNextRight = null; + } else { + throw new NoSuchElementException(); + } + mCompareResult = 0; + return next; + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java new file mode 100644 index 0000000..2f4c5a0 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java @@ -0,0 +1,101 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; + +import com.amazon.carbonado.util.Throttle; + +/** + * Wraps another cursor and fetches results at a reduced speed. + * + * @author Brian S O'Neill + */ +public class ThrottledCursor extends AbstractCursor { + private static final int WINDOW_SIZE = 10; + private static final int SLEEP_PRECISION_MILLIS = 50; + + private final Cursor mCursor; + private final Throttle mThrottle; + private final double mDesiredSpeed; + + /** + * @param cursor cursor to wrap + * @param throttle 1.0 = fetch at full speed, 0.5 = fetch at half speed, + * 0.1 = fetch at one tenth speed, etc. + */ + public ThrottledCursor(Cursor cursor, double throttle) { + mCursor = cursor; + if (throttle < 1.0) { + if (throttle < 0.0) { + throttle = 0.0; + } + mThrottle = new Throttle(WINDOW_SIZE); + mDesiredSpeed = throttle; + } else { + mThrottle = null; + mDesiredSpeed = 1.0; + } + } + + public void close() throws FetchException { + mCursor.close(); + } + + public boolean hasNext() throws FetchException { + return mCursor.hasNext(); + } + + public S next() throws FetchException { + throttle(); + return mCursor.next(); + } + + public int skipNext(int amount) throws FetchException { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (--amount >= 0) { + throttle(); + if (skipNext(1) <= 0) { + break; + } + count++; + } + + return count; + } + + private void throttle() throws FetchInterruptedException { + if (mThrottle != null) { + try { + mThrottle.throttle(mDesiredSpeed, SLEEP_PRECISION_MILLIS); + } catch (InterruptedException e) { + throw new FetchInterruptedException(e); + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java new file mode 100644 index 0000000..0860cf0 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -0,0 +1,119 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.NoSuchElementException; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; + +/** + * Abstract cursor which wraps another cursor and transforms each storable + * result into a target storable. This class can be used for implementing + * one-to-one joins. Use {@link MultiTransformedCursor} for one-to-many joins. + * + * @author Brian S O'Neill + */ +public abstract class TransformedCursor extends AbstractCursor { + private final Cursor mCursor; + + private T mNext; + + protected TransformedCursor(Cursor cursor) { + if (cursor == null) { + throw new IllegalArgumentException(); + } + mCursor = cursor; + } + + /** + * This method must be implemented to transform storables. If the storable + * cannot be transformed, either throw a FetchException or return null. If + * null is returned, the storable is simply filtered out. + * + * @return transformed storable, or null to filter it out + */ + protected abstract T transform(S storable) throws FetchException; + + public void close() throws FetchException { + synchronized (mCursor) { + mCursor.close(); + mNext = null; + } + } + + public boolean hasNext() throws FetchException { + synchronized (mCursor) { + if (mNext != null) { + return true; + } + try { + int count = 0; + while (mCursor.hasNext()) { + T next = transform(mCursor.next()); + if (next != null) { + mNext = next; + return true; + } + interruptCheck(++count); + } + } catch (NoSuchElementException e) { + } + return false; + } + } + + public T next() throws FetchException { + synchronized (mCursor) { + if (hasNext()) { + T next = mNext; + mNext = null; + return next; + } + throw new NoSuchElementException(); + } + } + + public int skipNext(int amount) throws FetchException { + synchronized (mCursor) { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + int count = 0; + while (--amount >= 0 && hasNext()) { + interruptCheck(++count); + mNext = null; + } + + return count; + } + } + + private void interruptCheck(int count) throws FetchException { + if ((count & ~0xff) == 0 && Thread.interrupted()) { + close(); + throw new FetchInterruptedException(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java new file mode 100644 index 0000000..7a62678 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java @@ -0,0 +1,106 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.util.Comparator; +import java.util.NoSuchElementException; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.Cursor; + +/** + * Wraps two Cursors and performs a set union operation. In boolean + * logic, this is an or operation. + * + *

Both cursors must return results in the same order. Ordering is preserved + * by the union. + * + * @author Brian S O'Neill + * @see IntersectionCursor + * @see DifferenceCursor + * @see SymmetricDifferenceCursor + */ +public class UnionCursor extends AbstractCursor { + private final Cursor mLeftCursor; + private final Cursor mRightCursor; + private final Comparator mOrder; + + private S mNextLeft; + private S mNextRight; + + /** + * @param left cursor to wrap + * @param right cursor to wrap + * @param order describes sort ordering of wrapped cursors, which must be + * a total ordering + */ + public UnionCursor(Cursor left, Cursor right, Comparator order) { + if (left == null || right == null || order == null) { + throw new IllegalArgumentException(); + } + mLeftCursor = left; + mRightCursor = right; + mOrder = order; + } + + public synchronized void close() throws FetchException { + mLeftCursor.close(); + mRightCursor.close(); + mNextLeft = null; + mNextRight = null; + } + + public synchronized boolean hasNext() throws FetchException { + if (mNextLeft == null && mLeftCursor.hasNext()) { + mNextLeft = mLeftCursor.next(); + } + if (mNextRight == null && mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } + return mNextLeft != null || mNextRight != null; + } + + public synchronized S next() throws FetchException { + if (hasNext()) { + S next; + if (mNextLeft == null) { + next = mNextRight; + mNextRight = null; + } else if (mNextRight == null) { + next = mNextLeft; + mNextLeft = null; + } else { + int result = mOrder.compare(mNextLeft, mNextRight); + if (result < 0) { + next = mNextLeft; + mNextLeft = null; + } else if (result > 0) { + next = mNextRight; + mNextRight = null; + } else { + next = mNextLeft; + mNextLeft = null; + mNextRight = null; + } + } + return next; + } + throw new NoSuchElementException(); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java new file mode 100644 index 0000000..94c7b84 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java @@ -0,0 +1,192 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.cursor; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Used internally by {@link MergeSortBuffer}. + * + * @author Brian S O'Neill + */ +class WorkFilePool { + private static final String cTempDir = System.getProperty("java.io.tmpdir"); + + private static final ConcurrentMap cPools = + new ConcurrentHashMap(); + + /** + * @param tempDir directory to store temp files for merging, or null for default + */ + static WorkFilePool getInstance(String tempDir) { + // This method uses ConcurrentMap features to eliminate the need to + // ever synchronize access, since this method may be called frequently. + + if (tempDir == null) { + tempDir = cTempDir; + } + + WorkFilePool pool = cPools.get(tempDir); + + if (pool != null) { + return pool; + } + + String canonical; + try { + canonical = new File(tempDir).getCanonicalPath(); + } catch (IOException e) { + canonical = new File(tempDir).getAbsolutePath(); + } + + if (!canonical.equals(tempDir)) { + pool = getInstance(canonical); + cPools.putIfAbsent(tempDir, pool); + return pool; + } + + pool = new WorkFilePool(new File(tempDir)); + + WorkFilePool existing = cPools.putIfAbsent(tempDir, pool); + + if (existing == null) { + // New pool is the winner, so finish off initialization. Pool can + // be used concurrently by another thread without shutdown hook. + pool.addShutdownHook(tempDir); + } else { + pool = existing; + } + + return pool; + } + + private final File mTempDir; + + // Work files not currently being used by any MergeSortBuffer. + private final List mWorkFilePool; + // Instances of MergeSortBuffer, to be notified on shutdown that they + // should close their work files. + private final Set> mWorkFileUsers; + + private Thread mShutdownHook; + + /** + * @param tempDir directory to store temp files for merging, or null for default + */ + private WorkFilePool(File tempDir) { + mTempDir = tempDir; + mWorkFilePool = new ArrayList(); + mWorkFileUsers = new HashSet>(); + } + + RandomAccessFile acquireWorkFile(MergeSortBuffer buffer) throws IOException { + synchronized (mWorkFileUsers) { + mWorkFileUsers.add(buffer); + } + synchronized (mWorkFilePool) { + if (mWorkFilePool.size() > 0) { + return mWorkFilePool.remove(mWorkFilePool.size() - 1); + } + } + File file = File.createTempFile("carbonado-mergesort-", null, mTempDir); + file.deleteOnExit(); + return new RandomAccessFile(file, "rw"); + } + + void releaseWorkFiles(List files) { + synchronized (mWorkFilePool) { + for (RandomAccessFile raf : files) { + try { + raf.seek(0); + // Return space to file system. + raf.setLength(0); + mWorkFilePool.add(raf); + } catch (IOException e) { + // Work file is broken, discard it. + try { + raf.close(); + } catch (IOException e2) { + } + } + } + } + } + + void unregisterWorkFileUser(MergeSortBuffer buffer) { + synchronized (mWorkFileUsers) { + mWorkFileUsers.remove(buffer); + mWorkFileUsers.notify(); + } + } + + private synchronized void addShutdownHook(String tempDir) { + if (mShutdownHook != null) { + return; + } + + // Add shutdown hook to close work files so that they can be deleted. + String threadName = "MergeSortBuffer shutdown (" + tempDir + ')'; + + mShutdownHook = new Thread(threadName) { + public void run() { + // Notify users of work files and wait for them to close. + synchronized (mWorkFileUsers) { + for (MergeSortBuffer buffer : mWorkFileUsers) { + buffer.stop(); + } + final long timeout = 10000; + final long giveup = System.currentTimeMillis() + timeout; + try { + while (mWorkFileUsers.size() > 0) { + long now = System.currentTimeMillis(); + if (now < giveup) { + mWorkFileUsers.wait(giveup - now); + } else { + break; + } + } + } catch (InterruptedException e) { + } + } + + synchronized (mWorkFilePool) { + for (RandomAccessFile raf : mWorkFilePool) { + try { + raf.close(); + } catch (IOException e) { + } + } + mWorkFilePool.clear(); + } + } + }; + + Runtime.getRuntime().addShutdownHook(mShutdownHook); + } +} diff --git a/src/main/java/com/amazon/carbonado/cursor/package-info.java b/src/main/java/com/amazon/carbonado/cursor/package-info.java new file mode 100644 index 0000000..3553d1a --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Support for advanced processing of cursor results, including basic set + * theory operations. + */ +package com.amazon.carbonado.cursor; -- cgit v1.2.3