diff options
| author | Brian S. O'Neill <bronee@gmail.com> | 2006-08-30 01:55:10 +0000 | 
|---|---|---|
| committer | Brian S. O'Neill <bronee@gmail.com> | 2006-08-30 01:55:10 +0000 | 
| commit | ab635e23ad5fb9cd9edb61665a3654ee3e4b372a (patch) | |
| tree | 7feecd04c2a7305db1b4367fdf4513249c619d5b | |
| parent | edcd84a1cb3c2009b0596939f30cee35028afcfc (diff) | |
Add cursor implementations
20 files changed, 3704 insertions, 0 deletions
| diff --git a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java new file mode 100644 index 0000000..c71393a --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java @@ -0,0 +1,83 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +
 +/**
 + * AbstractCursor implements a small set of common Cursor methods.
 + *
 + * @author Brian S O'Neill
 + */
 +public abstract class AbstractCursor<S> implements Cursor<S> {
 +    // Note: Since constructor takes no parameters, this class is called
 +    // Abstract instead of Base.
 +    protected AbstractCursor() {
 +    }
 +
 +    public int copyInto(Collection<? super S> c) throws FetchException {
 +        int originalSize = c.size();
 +        while (hasNext()) {
 +            c.add(next());
 +        }
 +        return c.size() - originalSize;
 +    }
 +
 +    public int copyInto(Collection<? super S> c, int limit) throws FetchException {
 +        int originalSize = c.size();
 +        while (--limit >= 0 && hasNext()) {
 +            c.add(next());
 +        }
 +        return c.size() - originalSize;
 +    }
 +
 +    public List<S> toList() throws FetchException {
 +        List<S> list = new ArrayList<S>();
 +        copyInto(list);
 +        return list;
 +    }
 +
 +    public List<S> toList(int limit) throws FetchException {
 +        List<S> list = new ArrayList<S>();
 +        copyInto(list, limit);
 +        return list;
 +    }
 +
 +    public synchronized int skipNext(int amount) throws FetchException {
 +        if (amount <= 0) {
 +            if (amount < 0) {
 +                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +            }
 +            return 0;
 +        }
 +
 +        int count = 0;
 +        while (--amount >= 0 && hasNext()) {
 +            next();
 +            count++;
 +        }
 +
 +        return count;
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java new file mode 100644 index 0000000..db74134 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ArraySortBuffer.java @@ -0,0 +1,62 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Comparator;
 +
 +/**
 + * Sort buffer implementation backed by an ArrayList.
 + *
 + * @author Brian S O'Neill
 + * @see SortedCursor
 + */
 +public class ArraySortBuffer<S> extends ArrayList<S> implements SortBuffer<S> {
 +    private static final long serialVersionUID = -5622302375191321452L;
 +
 +    private Comparator<S> mComparator;
 +
 +    public ArraySortBuffer() {
 +        super();
 +    }
 +
 +    public ArraySortBuffer(int initialCapacity) {
 +        super(initialCapacity);
 +    }
 +
 +    public void prepare(Comparator<S> comparator) {
 +        if (comparator == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        clear();
 +        mComparator = comparator;
 +    }
 +
 +    public void sort() {
 +        if (mComparator == null) {
 +            throw new IllegalStateException("Buffer was not prepared");
 +        }
 +        Collections.sort(this, mComparator);
 +    }
 +
 +    public void close() {
 +        clear();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java new file mode 100644 index 0000000..39455c8 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java @@ -0,0 +1,120 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Comparator;
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Cursor;
 +
 +/**
 + * Wraps two Cursors and performs an <i>asymmetric set difference</i>
 + * operation.
 + *
 + * <p>Both cursors must return results in the same order. Ordering is preserved
 + * by the difference.
 + *
 + * @author Brian S O'Neill
 + * @see UnionCursor
 + * @see IntersectionCursor
 + * @see SymmetricDifferenceCursor
 + */
 +public class DifferenceCursor<S> extends AbstractCursor<S> {
 +    private final Cursor<S> mLeftCursor;
 +    private final Cursor<S> mRightCursor;
 +    private final Comparator<S> mOrder;
 +
 +    private S mNext;
 +    private S mNextRight;
 +
 +    /**
 +     * @param left cursor to wrap
 +     * @param right cursor to wrap whose results are completely discarded
 +     * @param order describes sort ordering of wrapped cursors, which must be
 +     * a total ordering
 +     */
 +    public DifferenceCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
 +        if (left == null || right == null || order == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mLeftCursor = left;
 +        mRightCursor = right;
 +        mOrder = order;
 +    }
 +
 +    public synchronized void close() throws FetchException {
 +        mLeftCursor.close();
 +        mRightCursor.close();
 +        mNext = null;
 +        mNextRight = null;
 +    }
 +
 +    public synchronized boolean hasNext() throws FetchException {
 +        if (mNext != null) {
 +            return true;
 +        }
 +
 +        S nextLeft;
 +
 +        while (true) {
 +            if (mLeftCursor.hasNext()) {
 +                nextLeft = mLeftCursor.next();
 +            } else {
 +                close();
 +                return false;
 +            }
 +            if (mNextRight == null) {
 +                if (mRightCursor.hasNext()) {
 +                    mNextRight = mRightCursor.next();
 +                } else {
 +                    mNext = nextLeft;
 +                    return true;
 +                }
 +            }
 +
 +            while (true) {
 +                int result = mOrder.compare(nextLeft, mNextRight);
 +                if (result < 0) {
 +                    mNext = nextLeft;
 +                    return true;
 +                } else if (result > 0) {
 +                    if (mRightCursor.hasNext()) {
 +                        mNextRight = mRightCursor.next();
 +                    } else {
 +                        mNextRight = null;
 +                        mNext = nextLeft;
 +                        return true;
 +                    }
 +                } else {
 +                    break;
 +                }
 +            }
 +        }
 +    }
 +
 +    public synchronized S next() throws FetchException {
 +        if (hasNext()) {
 +            S next = mNext;
 +            mNext = null;
 +            return next;
 +        }
 +        throw new NoSuchElementException();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java new file mode 100644 index 0000000..db3778b --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/EmptyCursor.java @@ -0,0 +1,103 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.Cursor;
 +
 +/**
 + * Special cursor implementation that is empty.
 + *
 + * @author Brian S O'Neill
 + */
 +public class EmptyCursor<S> implements Cursor<S> {
 +    private static final Cursor EMPTY_CURSOR = new EmptyCursor();
 +
 +    /**
 +     * Returns the singleton empty cursor instance.
 +     */
 +    @SuppressWarnings("unchecked")
 +    public static <S> Cursor<S> getEmptyCursor() {
 +        return EMPTY_CURSOR;
 +    }
 +
 +    // Package-private, to be used by test suite
 +    EmptyCursor() {
 +    }
 +
 +    /**
 +     * Does nothing.
 +     */
 +    public void close() {
 +    }
 +
 +    /**
 +     * Always returns false.
 +     */
 +    public boolean hasNext() {
 +        return false;
 +    }
 +
 +    /**
 +     * Always throws NoSuchElementException.
 +     */
 +    public S next() {
 +        throw new NoSuchElementException();
 +    }
 +
 +    /**
 +     * Always returns 0.
 +     */
 +    public int skipNext(int amount) {
 +        return 0;
 +    }
 +
 +    /**
 +     * Performs no copy and always returns 0.
 +     */
 +    public int copyInto(Collection<? super S> c) {
 +        return 0;
 +    }
 +
 +    /**
 +     * Performs no copy and always returns 0.
 +     */
 +    public int copyInto(Collection<? super S> c, int limit) {
 +        return 0;
 +    }
 +
 +    /**
 +     * Always returns an empty list.
 +     */
 +    public List<S> toList() {
 +        return Collections.emptyList();
 +    }
 +
 +    /**
 +     * Always returns an empty list.
 +     */
 +    public List<S> toList(int limit) {
 +        return Collections.emptyList();
 +    }
 +}
 +
 diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java new file mode 100644 index 0000000..60b3c81 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -0,0 +1,148 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +import com.amazon.carbonado.Storable;
 +
 +import com.amazon.carbonado.filter.ClosedFilter;
 +import com.amazon.carbonado.filter.Filter;
 +import com.amazon.carbonado.filter.FilterValues;
 +import com.amazon.carbonado.filter.OpenFilter;
 +
 +/**
 + * Wraps another cursor and applies custom filtering to reduce the set of
 + * results.
 + *
 + * @author Brian S O'Neill
 + */
 +public abstract class FilteredCursor<S> extends AbstractCursor<S> {
 +    /**
 +     * Returns a Cursor that is filtered by the given Filter and FilterValues.
 +     * The given Filter must be composed only of the same PropertyFilter
 +     * instances as used to construct the FilterValues. An
 +     * IllegalStateException will result otherwise.
 +     *
 +     * @param filter filter to apply
 +     * @param filterValues values for filter
 +     * @param cursor cursor to wrap
 +     * @return wrapped cursor which filters results
 +     * @throws IllegalStateException if any values are not specified
 +     * @throws IllegalArgumentException if filter is closed
 +     */
 +    public static <S extends Storable> Cursor<S> applyFilter(Filter<S> filter,
 +                                                             FilterValues<S> filterValues,
 +                                                             Cursor<S> cursor)
 +    {
 +        if (filter instanceof OpenFilter) {
 +            return cursor;
 +        }
 +        if (filter instanceof ClosedFilter) {
 +            throw new IllegalArgumentException();
 +        }
 +
 +        return FilteredCursorGenerator.getFactory(filter)
 +            .newFilteredCursor(cursor, filterValues.getValuesFor(filter));
 +    }
 +
 +    private final Cursor<S> mCursor;
 +
 +    private S mNext;
 +
 +    protected FilteredCursor(Cursor<S> cursor) {
 +        if (cursor == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mCursor = cursor;
 +    }
 +
 +    /**
 +     * @return false if object should not be in results
 +     */
 +    protected abstract boolean isAllowed(S storable);
 +
 +    public void close() throws FetchException {
 +        synchronized (mCursor) {
 +            mCursor.close();
 +            mNext = null;
 +        }
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        synchronized (mCursor) {
 +            if (mNext != null) {
 +                return true;
 +            }
 +            try {
 +                int count = 0;
 +                while (mCursor.hasNext()) {
 +                    S next = mCursor.next();
 +                    if (isAllowed(next)) {
 +                        mNext = next;
 +                        return true;
 +                    }
 +                    interruptCheck(++count);
 +                }
 +            } catch (NoSuchElementException e) {
 +            }
 +            return false;
 +        }
 +    }
 +
 +    public S next() throws FetchException {
 +        synchronized (mCursor) {
 +            if (hasNext()) {
 +                S next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +            throw new NoSuchElementException();
 +        }
 +    }
 +
 +    public int skipNext(int amount) throws FetchException {
 +        synchronized (mCursor) {
 +            if (amount <= 0) {
 +                if (amount < 0) {
 +                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +                }
 +                return 0;
 +            }
 +
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                interruptCheck(++count);
 +                mNext = null;
 +            }
 +
 +            return count;
 +        }
 +    }
 +
 +    private void interruptCheck(int count) throws FetchException {
 +        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 +            close();
 +            throw new FetchInterruptedException();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java new file mode 100644 index 0000000..ea1baa8 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursorGenerator.java @@ -0,0 +1,665 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.lang.reflect.Method;
 +import java.util.Arrays;
 +import java.util.Map;
 +import java.util.Stack;
 +
 +import org.cojen.classfile.ClassFile;
 +import org.cojen.classfile.CodeAssembler;
 +import org.cojen.classfile.CodeBuilder;
 +import org.cojen.classfile.Label;
 +import org.cojen.classfile.LocalVariable;
 +import org.cojen.classfile.MethodInfo;
 +import org.cojen.classfile.Modifiers;
 +import org.cojen.classfile.Opcode;
 +import org.cojen.classfile.TypeDesc;
 +import static org.cojen.classfile.TypeDesc.*;
 +
 +import org.cojen.util.ClassInjector;
 +import org.cojen.util.WeakIdentityMap;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.Storable;
 +
 +import com.amazon.carbonado.filter.AndFilter;
 +import com.amazon.carbonado.filter.OrFilter;
 +import com.amazon.carbonado.filter.Filter;
 +import com.amazon.carbonado.filter.PropertyFilter;
 +import com.amazon.carbonado.filter.RelOp;
 +import com.amazon.carbonado.filter.Visitor;
 +
 +import com.amazon.carbonado.info.ChainedProperty;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.util.QuickConstructorGenerator;
 +
 +
 +/**
 + * Generates Cursor implementations that wrap another Cursor, applying a
 + * property matching filter on each result.
 + *
 + * @author Brian S O'Neill
 + * @see FilteredCursor
 + */
 +class FilteredCursorGenerator {
 +    private static Map cCache = new WeakIdentityMap();
 +
 +    /**
 +     * Returns a factory for creating new filtered cursor instances.
 +     *
 +     * @param filter filter specification
 +     * @throws IllegalArgumentException if filter is null
 +     * @throws UnsupportedOperationException if filter is not supported
 +     */
 +    @SuppressWarnings("unchecked")
 +    static <S extends Storable> Factory<S> getFactory(Filter<S> filter) {
 +        if (filter == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        synchronized (cCache) {
 +            Factory<S> factory = (Factory<S>) cCache.get(filter);
 +            if (factory != null) {
 +                return factory;
 +            }
 +            Class<Cursor<S>> clazz = generateClass(filter);
 +            factory = QuickConstructorGenerator.getInstance(clazz, Factory.class);
 +            cCache.put(filter, factory);
 +            return factory;
 +        }
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    private static <S extends Storable> Class<Cursor<S>> generateClass(Filter<S> filter) {
 +        filter.accept(new Validator<S>(), null);
 +
 +        Class<S> type = filter.getStorableType();
 +
 +        String packageName;
 +        {
 +            String name = type.getName();
 +            int index = name.lastIndexOf('.');
 +            if (index >= 0) {
 +                packageName = name.substring(0, index);
 +            } else {
 +                packageName = "";
 +            }
 +        }
 +
 +        // Try to generate against the same loader as the storable type. This
 +        // allows the generated class access to it, preventing a
 +        // NoClassDefFoundError.
 +        ClassLoader loader = type.getClassLoader();
 +
 +        ClassInjector ci = ClassInjector.create(packageName + ".FilteredCursor", loader);
 +        ClassFile cf = new ClassFile(ci.getClassName(), FilteredCursor.class);
 +        cf.markSynthetic();
 +        cf.setSourceFile(FilteredCursorGenerator.class.getName());
 +        cf.setTarget("1.5");
 +
 +        // Begin constructor definition.
 +        CodeBuilder ctorBuilder;
 +        {
 +            TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
 +            TypeDesc objectArrayType = TypeDesc.forClass(Object[].class);
 +            TypeDesc[] params = {cursorType, objectArrayType};
 +            MethodInfo ctor = cf.addConstructor(Modifiers.PUBLIC, params);
 +            ctorBuilder = new CodeBuilder(ctor);
 +            ctorBuilder.loadThis();
 +            ctorBuilder.loadLocal(ctorBuilder.getParameter(0));
 +            ctorBuilder.invokeSuperConstructor(new TypeDesc[] {cursorType});
 +        }
 +
 +        // Begin isAllowed method definition.
 +        CodeBuilder isAllowedBuilder;
 +        LocalVariable storableVar;
 +        {
 +            TypeDesc[] params = {TypeDesc.OBJECT};
 +            MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "isAllowed", BOOLEAN, params);
 +            isAllowedBuilder = new CodeBuilder(mi);
 +
 +            storableVar = isAllowedBuilder.getParameter(0);
 +
 +            // Filter out any instances of null. Shouldn't happen though.
 +            isAllowedBuilder.loadLocal(storableVar);
 +            Label notNull = isAllowedBuilder.createLabel();
 +            isAllowedBuilder.ifNullBranch(notNull, false);
 +            isAllowedBuilder.loadConstant(false);
 +            isAllowedBuilder.returnValue(BOOLEAN);
 +            notNull.setLocation();
 +
 +            // Cast the parameter to the expected type.
 +            isAllowedBuilder.loadLocal(storableVar);
 +            TypeDesc storableType = TypeDesc.forClass(type);
 +            isAllowedBuilder.checkCast(storableType);
 +            storableVar = isAllowedBuilder.createLocalVariable("storable", storableType);
 +            isAllowedBuilder.storeLocal(storableVar);
 +        }
 +
 +        filter.accept(new CodeGen<S>(cf, ctorBuilder, isAllowedBuilder, storableVar), null);
 +
 +        // Finish constructor.
 +        ctorBuilder.returnVoid();
 +
 +        return (Class<Cursor<S>>) ci.defineClass(cf);
 +    }
 +
 +    public static interface Factory<S extends Storable> {
 +        /**
 +         * @param cursor cursor to wrap and filter
 +         * @param filterValues values corresponding to original filter used to create this factory
 +         */
 +        Cursor<S> newFilteredCursor(Cursor<S> cursor, Object... filterValues);
 +    }
 +
 +    /**
 +     * Ensures properties can be read and that relational operation is
 +     * supported.
 +     */
 +    private static class Validator<S extends Storable> extends Visitor<S, Object, Object> {
 +        Validator() {
 +        }
 +
 +        public Object visit(PropertyFilter<S> filter, Object param) {
 +            ChainedProperty<S> chained = filter.getChainedProperty();
 +
 +            switch (filter.getOperator()) {
 +            case EQ: case NE:
 +                // Use equals() method instead of comparator.
 +                break;
 +            default:
 +                TypeDesc typeDesc = TypeDesc.forClass(chained.getType()).toObjectType();
 +
 +                if (!Comparable.class.isAssignableFrom(typeDesc.toClass())) {
 +                    throw new UnsupportedOperationException
 +                        ("Property \"" + chained + "\" does not implement Comparable");
 +                }
 +                break;
 +            }
 +
 +            // Follow the chain, verifying that each property has an accessible
 +            // read method.
 +
 +            checkForReadMethod(chained, chained.getPrimeProperty());
 +            for (int i=0; i<chained.getChainCount(); i++) {
 +                checkForReadMethod(chained, chained.getChainedProperty(i));
 +            }
 +
 +            return null;
 +        }
 +
 +        private void checkForReadMethod(ChainedProperty<S> chained,
 +                                        StorableProperty<?> property) {
 +            if (property.getReadMethod() == null) {
 +                String msg;
 +                if (chained.getChainCount() == 0) {
 +                    msg = "Property \"" + property.getName() + "\" cannot be read";
 +                } else {
 +                    msg = "Property \"" + property.getName() + "\" of \"" + chained +
 +                        "\" cannot be read";
 +                }
 +                throw new UnsupportedOperationException(msg);
 +            }
 +        }
 +    }
 +
 +    private static class CodeGen<S extends Storable> extends Visitor<S, Object, Object> {
 +        private static String FIELD_PREFIX = "value$";
 +
 +        private final ClassFile mClassFile;
 +        private final CodeBuilder mCtorBuilder;
 +        private final CodeBuilder mIsAllowedBuilder;
 +        private final LocalVariable mStorableVar;
 +
 +        private final Stack<Scope> mScopeStack;
 +
 +        private int mPropertyOrdinal;
 +
 +        CodeGen(ClassFile cf,
 +                CodeBuilder ctorBuilder,
 +                CodeBuilder isAllowedBuilder, LocalVariable storableVar) {
 +            mClassFile = cf;
 +            mCtorBuilder = ctorBuilder;
 +            mIsAllowedBuilder = isAllowedBuilder;
 +            mStorableVar = storableVar;
 +            mScopeStack = new Stack<Scope>();
 +            mScopeStack.push(new Scope(null, null));
 +        }
 +
 +        public Object visit(OrFilter<S> filter, Object param) {
 +            Label failLocation = mIsAllowedBuilder.createLabel();
 +            // Inherit success location to short-circuit if 'or' test succeeds.
 +            mScopeStack.push(new Scope(failLocation, getScope().mSuccessLocation));
 +            filter.getLeftFilter().accept(this, param);
 +            failLocation.setLocation();
 +            mScopeStack.pop();
 +            filter.getRightFilter().accept(this, param);
 +            return null;
 +        }
 +
 +        public Object visit(AndFilter<S> filter, Object param) {
 +            Label successLocation = mIsAllowedBuilder.createLabel();
 +            // Inherit fail location to short-circuit if 'and' test fails.
 +            mScopeStack.push(new Scope(getScope().mFailLocation, successLocation));
 +            filter.getLeftFilter().accept(this, param);
 +            successLocation.setLocation();
 +            mScopeStack.pop();
 +            filter.getRightFilter().accept(this, param);
 +            return null;
 +        }
 +
 +        public Object visit(PropertyFilter<S> filter, Object param) {
 +            ChainedProperty<S> chained = filter.getChainedProperty();
 +            TypeDesc type = TypeDesc.forClass(chained.getType());
 +            TypeDesc fieldType = actualFieldType(type);
 +            String fieldName = FIELD_PREFIX + mPropertyOrdinal;
 +
 +            // Define storage field.
 +            mClassFile.addField(Modifiers.PRIVATE.toFinal(true), fieldName, fieldType);
 +
 +            // Add code to constructor to store value into field.
 +            CodeBuilder b = mCtorBuilder;
 +            b.loadThis();
 +            b.loadLocal(b.getParameter(1));
 +            b.loadConstant(mPropertyOrdinal);
 +            b.loadFromArray(OBJECT);
 +            b.checkCast(type.toObjectType());
 +            b.convert(type.toObjectType(), fieldType, CodeAssembler.CONVERT_FP_BITS);
 +            b.storeField(fieldName, fieldType);
 +
 +            // Add code to load property value to stack.
 +            b = mIsAllowedBuilder;
 +            b.loadLocal(mStorableVar);
 +            loadProperty(b, chained.getPrimeProperty());
 +            for (int i=0; i<chained.getChainCount(); i++) {
 +                // Check if last loaded property was null, and fail if so.
 +                b.dup();
 +                Label notNull = b.createLabel();
 +                b.ifNullBranch(notNull, false);
 +                b.pop();
 +                getScope().fail(b);
 +                notNull.setLocation();
 +
 +                // Now load next property in chain.
 +                loadProperty(b, chained.getChainedProperty(i));
 +            }
 +
 +            addPropertyFilter(b, type, filter.getOperator());
 +
 +            mPropertyOrdinal++;
 +            return null;
 +        }
 +
 +        private Scope getScope() {
 +            return mScopeStack.peek();
 +        }
 +
 +        private void loadProperty(CodeBuilder b, StorableProperty<?> property) {
 +            Method readMethod = property.getReadMethod();
 +            if (readMethod == null) {
 +                // Invoke synthetic package accessible method.
 +                String readName = property.getReadMethodName();
 +                try {
 +                    readMethod = property.getEnclosingType().getDeclaredMethod(readName);
 +                } catch (NoSuchMethodException e) {
 +                    // This shouldn't happen since it was checked for earlier.
 +                    throw new IllegalArgumentException
 +                        ("Property \"" + property.getName() + "\" cannot be read");
 +                }
 +            }
 +            b.invoke(readMethod);
 +        }
 +
 +        private void addPropertyFilter(CodeBuilder b, TypeDesc type, RelOp relOp) {
 +            TypeDesc fieldType = actualFieldType(type);
 +            String fieldName = FIELD_PREFIX + mPropertyOrdinal;
 +
 +            if (type.getTypeCode() == OBJECT_CODE) {
 +                // Check if actual property being examined is null.
 +
 +                LocalVariable actualValue = b.createLocalVariable("temp", type);
 +                b.storeLocal(actualValue);
 +                b.loadLocal(actualValue);
 +                Label notNull = b.createLabel();
 +
 +                switch (relOp) {
 +                case EQ: default: // actual = ?
 +                    // If actual value is null, so test value must be null also
 +                    // to succeed.
 +                case LE: // actual <= ?
 +                    // If actual value is null, and since null is high, test
 +                    // value must be null also to succeed.
 +                    b.ifNullBranch(notNull, false);
 +                    b.loadThis();
 +                    b.loadField(fieldName, fieldType);
 +                    getScope().successIfNullElseFail(b, true);
 +                    break;
 +                case NE: // actual != ?
 +                    // If actual value is null, so test value must be non-null
 +                    // to succeed.
 +                case GT: // actual > ?
 +                    // If actual value is null, and since null is high, test
 +                    // value must be non-null to succeed.
 +                    b.ifNullBranch(notNull, false);
 +                    b.loadThis();
 +                    b.loadField(fieldName, fieldType);
 +                    getScope().successIfNullElseFail(b, false);
 +                    break;
 +                case LT: // actual < ?
 +                    // Since null is high, always fail if actual value is
 +                    // null. Don't need to examine test value.
 +                    getScope().failIfNull(b, true);
 +                    break;
 +                case GE: // actual >= ?
 +                    // Since null is high, always succeed if actual value is
 +                    // null. Don't need to examine test value.
 +                    getScope().successIfNull(b, true);
 +                    break;
 +                }
 +
 +                notNull.setLocation();
 +
 +                // At this point, actual property value is known to not be null.
 +
 +                // Check if property being tested for is null.
 +                b.loadThis();
 +                b.loadField(fieldName, fieldType);
 +
 +                switch (relOp) {
 +                case EQ: default: // non-null actual = ?
 +                    // If test value is null, fail.
 +                case GT: // non-null actual > ?
 +                    // If test value is null, and since null is high, fail.
 +                case GE: // non-null actual >= ?
 +                    // If test value is null, and since null is high, fail.
 +                    getScope().failIfNull(b, true);
 +                    break;
 +                case NE: // non-null actual != ?
 +                    // If test value is null, succeed
 +                case LE: // non-null actual <= ?
 +                    // If test value is null, and since null is high, succeed.
 +                case LT: // non-null actual < ?
 +                    // If test value is null, and since null is high, succeed.
 +                    getScope().successIfNull(b, true);
 +                    break;
 +                }
 +
 +                b.loadLocal(actualValue);
 +            }
 +
 +            // When this point is reached, actual property value is on the
 +            // operand stack, and it is non-null. Property value to test
 +            // against is not on the stack, but it is known to be non-null.
 +
 +            TypeDesc primitiveType = type.toPrimitiveType();
 +
 +            if (primitiveType == null) {
 +                b.loadThis();
 +                b.loadField(fieldName, fieldType);
 +
 +                // Do object comparison
 +                switch (relOp) {
 +                case EQ: case NE: default:
 +                    if (fieldType.isArray()) {
 +                        TypeDesc arraysDesc = TypeDesc.forClass(Arrays.class);
 +                        TypeDesc componentType = fieldType.getComponentType();
 +                        TypeDesc arrayType = fieldType;
 +                        String methodName;
 +                        if (componentType.isArray()) {
 +                            methodName = "deepEquals";
 +                            arrayType = OBJECT.toArrayType();
 +                        } else {
 +                            methodName = "equals";
 +                            if (!componentType.isPrimitive()) {
 +                                arrayType = OBJECT.toArrayType();
 +                            }
 +                        }
 +                        b.invokeStatic(arraysDesc, methodName, BOOLEAN,
 +                                       new TypeDesc[] {arrayType, arrayType});
 +                    } else {
 +                        b.invokeVirtual(OBJECT, "equals", BOOLEAN, new TypeDesc[] {OBJECT});
 +                    }
 +                    // Success if boolean value is non-zero for EQ, zero for NE.
 +                    getScope().successIfZeroComparisonElseFail(b, relOp.reverse());
 +                    break;
 +
 +                case GT: case GE: case LE: case LT:
 +                    // Compare method exists because it was checked for earlier
 +                    b.invokeInterface(TypeDesc.forClass(Comparable.class), "compareTo",
 +                                      INT, new TypeDesc[] {OBJECT});
 +                    getScope().successIfZeroComparisonElseFail(b, relOp);
 +                    break;
 +                }
 +            } else {
 +                if (!type.isPrimitive()) {
 +                    // Extract primitive value out of wrapper.
 +                    b.convert(type, primitiveType);
 +                }
 +
 +                // Floating point values are compared based on actual
 +                // bits. This allows NaN to be considered in the comparison.
 +                if (primitiveType == FLOAT) {
 +                    b.convert(primitiveType, INT, CodeBuilder.CONVERT_FP_BITS);
 +                    primitiveType = INT;
 +                } else if (primitiveType == DOUBLE) {
 +                    b.convert(primitiveType, LONG, CodeBuilder.CONVERT_FP_BITS);
 +                    primitiveType = LONG;
 +                }
 +
 +                b.loadThis();
 +                b.loadField(fieldName, fieldType);
 +                // No need to do anything special for floating point since it
 +                // has been pre-converted to bits.
 +                b.convert(fieldType, primitiveType);
 +
 +                switch (primitiveType.getTypeCode()) {
 +                case INT_CODE: default:
 +                    getScope().successIfComparisonElseFail(b, relOp);
 +                    break;
 +
 +                case LONG_CODE:
 +                    b.math(Opcode.LCMP);
 +                    getScope().successIfZeroComparisonElseFail(b, relOp);
 +                    break;
 +                }
 +            }
 +        }
 +
 +        /**
 +         * Returns the actual field type used to store the given property
 +         * type. Floating point values are represented in their "bit" form, in
 +         * order to compare against NaN.
 +         */
 +        private static TypeDesc actualFieldType(TypeDesc type) {
 +            if (type.toPrimitiveType() == FLOAT) {
 +                if (type.isPrimitive()) {
 +                    type = INT;
 +                } else {
 +                    type = INT.toObjectType();
 +                }
 +            } else if (type.toPrimitiveType() == DOUBLE) {
 +                if (type.isPrimitive()) {
 +                    type = LONG;
 +                } else {
 +                    type = LONG.toObjectType();
 +                }
 +            }
 +            return type;
 +        }
 +
 +        /**
 +         * Defines boolean logic branching scope.
 +         */
 +        private static class Scope {
 +            // If null, return false on test failure.
 +            final Label mFailLocation;
 +            // If null, return true on test success.
 +            final Label mSuccessLocation;
 +
 +            Scope(Label failLocation, Label successLocation) {
 +                mFailLocation = failLocation;
 +                mSuccessLocation = successLocation;
 +            }
 +
 +            void fail(CodeBuilder b) {
 +                if (mFailLocation != null) {
 +                    b.branch(mFailLocation);
 +                } else {
 +                    b.loadConstant(false);
 +                    b.returnValue(BOOLEAN);
 +                }
 +            }
 +
 +            /**
 +             * Branch to the fail location if the value on the stack is
 +             * null. When choice is false, fail on non-null.
 +             *
 +             * @param choice if true, fail when null, else fail when not null
 +             */
 +            void failIfNull(CodeBuilder b, boolean choice) {
 +                if (mFailLocation != null) {
 +                    b.ifNullBranch(mFailLocation, choice);
 +                } else {
 +                    Label pass = b.createLabel();
 +                    b.ifNullBranch(pass, !choice);
 +                    b.loadConstant(false);
 +                    b.returnValue(BOOLEAN);
 +                    pass.setLocation();
 +                }
 +            }
 +
 +            void success(CodeBuilder b) {
 +                if (mSuccessLocation != null) {
 +                    b.branch(mSuccessLocation);
 +                } else {
 +                    b.loadConstant(true);
 +                    b.returnValue(BOOLEAN);
 +                }
 +            }
 +
 +            /**
 +             * Branch to the success location if the value on the stack is
 +             * null. When choice is false, success on non-null.
 +             *
 +             * @param choice if true, success when null, else success when not null
 +             */
 +            void successIfNull(CodeBuilder b, boolean choice) {
 +                if (mSuccessLocation != null) {
 +                    b.ifNullBranch(mSuccessLocation, choice);
 +                } else {
 +                    Label pass = b.createLabel();
 +                    b.ifNullBranch(pass, !choice);
 +                    b.loadConstant(true);
 +                    b.returnValue(BOOLEAN);
 +                    pass.setLocation();
 +                }
 +            }
 +
 +            /**
 +             * Branch to the success location if the value on the stack is
 +             * null. When choice is false, success on non-null. If the success
 +             * condition is not met, the fail location is branched to.
 +             *
 +             * @param choice if true, success when null, else success when not null
 +             */
 +            void successIfNullElseFail(CodeBuilder b, boolean choice) {
 +                if (mSuccessLocation != null) {
 +                    b.ifNullBranch(mSuccessLocation, choice);
 +                    fail(b);
 +                } else if (mFailLocation != null) {
 +                    b.ifNullBranch(mFailLocation, !choice);
 +                    success(b);
 +                } else {
 +                    Label success = b.createLabel();
 +                    b.ifNullBranch(success, choice);
 +                    b.loadConstant(false);
 +                    b.returnValue(BOOLEAN);
 +                    success.setLocation();
 +                    b.loadConstant(true);
 +                    b.returnValue(BOOLEAN);
 +                }
 +            }
 +
 +            void successIfZeroComparisonElseFail(CodeBuilder b, RelOp relOp) {
 +                if (mSuccessLocation != null) {
 +                    b.ifZeroComparisonBranch(mSuccessLocation, relOpToChoice(relOp));
 +                    fail(b);
 +                } else if (mFailLocation != null) {
 +                    b.ifZeroComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse()));
 +                    success(b);
 +                } else {
 +                    if (relOp == RelOp.NE) {
 +                        b.returnValue(BOOLEAN);
 +                    } else if (relOp == RelOp.EQ) {
 +                        b.loadConstant(1);
 +                        b.math(Opcode.IAND);
 +                        b.loadConstant(1);
 +                        b.math(Opcode.IXOR);
 +                        b.returnValue(BOOLEAN);
 +                    } else {
 +                        Label success = b.createLabel();
 +                        b.ifZeroComparisonBranch(success, relOpToChoice(relOp));
 +                        b.loadConstant(false);
 +                        b.returnValue(BOOLEAN);
 +                        success.setLocation();
 +                        b.loadConstant(true);
 +                        b.returnValue(BOOLEAN);
 +                    }
 +                }
 +            }
 +
 +            void successIfComparisonElseFail(CodeBuilder b, RelOp relOp) {
 +                if (mSuccessLocation != null) {
 +                    b.ifComparisonBranch(mSuccessLocation, relOpToChoice(relOp));
 +                    fail(b);
 +                } else if (mFailLocation != null) {
 +                    b.ifComparisonBranch(mFailLocation, relOpToChoice(relOp.reverse()));
 +                    success(b);
 +                } else {
 +                    Label success = b.createLabel();
 +                    b.ifComparisonBranch(success, relOpToChoice(relOp));
 +                    b.loadConstant(false);
 +                    b.returnValue(BOOLEAN);
 +                    success.setLocation();
 +                    b.loadConstant(true);
 +                    b.returnValue(BOOLEAN);
 +                }
 +            }
 +
 +            private String relOpToChoice(RelOp relOp) {
 +                switch (relOp) {
 +                case EQ: default:
 +                    return "==";
 +                case NE:
 +                    return "!=";
 +                case LT:
 +                    return "<";
 +                case GE:
 +                    return ">=";
 +                case GT:
 +                    return ">";
 +                case LE:
 +                    return "<=";
 +                }
 +            }
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java new file mode 100644 index 0000000..0b17e50 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -0,0 +1,205 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Comparator;
 +import java.util.NoSuchElementException;
 +
 +import org.cojen.util.BeanComparator;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +
 +/**
 + * Abstract cursor for aggregation and finding distinct data. The source cursor
 + * must be ordered in some fashion by the grouping properties. The sequence of
 + * properties must match, but it does not matter if they are ascending or
 + * descending.
 + *
 + * @author Brian S O'Neill
 + * @see SortedCursor
 + */
 +public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {
 +    private final Cursor<S> mCursor;
 +    private final Comparator<S> mGroupComparator;
 +
 +    private S mGroupLeader;
 +    private G mNextAggregate;
 +
 +    /**
 +     * Create a GroupedCursor with an existing group comparator. The comparator
 +     * defines the ordering of the source cursor, and it should be a partial
 +     * odering. If group comparator defines a total ordering, then all groups
 +     * have one member.
 +     *
 +     * @param cursor source of elements which must be ordered properly
 +     * @param groupComparator comparator which defines ordering of source cursor
 +     */
 +    protected GroupedCursor(Cursor<S> cursor, Comparator<S> groupComparator) {
 +        if (cursor == null || groupComparator == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mCursor = cursor;
 +        mGroupComparator = groupComparator;
 +    }
 +
 +    /**
 +     * Create a GroupedCursor using properties to define the group
 +     * comparator. The set of properties defines the ordering of the source
 +     * cursor, and it should be a partial ordering. If properties define a
 +     * total ordering, then all groups have one member.
 +     *
 +     * @param cursor source of elements which must be ordered properly
 +     * @param type type of storable to create cursor for
 +     * @param groupProperties list of properties to group by
 +     * @throws IllegalArgumentException if any property is null or not a member
 +     * of storable type
 +     */
 +    protected GroupedCursor(Cursor<S> cursor, Class<S> type, String... groupProperties) {
 +        if (cursor == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mCursor = cursor;
 +        mGroupComparator = SortedCursor.createComparator(type, groupProperties);
 +    }
 +
 +    /**
 +     * Returns the comparator used to identify group boundaries.
 +     */
 +    public Comparator<S> comparator() {
 +        return mGroupComparator;
 +    }
 +
 +    /**
 +     * This method is called for the first entry in a group. This method is not
 +     * called again until after finishGroup is called.
 +     *
 +     * @param groupLeader first entry in group
 +     */
 +    protected abstract void beginGroup(S groupLeader) throws FetchException;
 +
 +    /**
 +     * This method is called when more entries are found for the current
 +     * group. This method is not called until after beginGroup has been
 +     * called. It may called multiple times until finishGroup is called.
 +     *
 +     * @param groupMember additional entry in group
 +     */
 +    protected abstract void addToGroup(S groupMember) throws FetchException;
 +
 +    /**
 +     * This method is called when a group is finished, and it can return an
 +     * aggregate. Simply return null if aggregate should be filtered out.
 +     *
 +     * @return aggregate, or null to filter it out
 +     */
 +    protected abstract G finishGroup() throws FetchException;
 +
 +    public void close() throws FetchException {
 +        synchronized (mCursor) {
 +            mCursor.close();
 +            mGroupLeader = null;
 +            mNextAggregate = null;
 +        }
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        synchronized (mCursor) {
 +            if (mNextAggregate != null) {
 +                return true;
 +            }
 +
 +            try {
 +                int count = 0;
 +                if (mCursor.hasNext()) {
 +                    if (mGroupLeader == null) {
 +                        beginGroup(mGroupLeader = mCursor.next());
 +                    }
 +
 +                    while (mCursor.hasNext()) {
 +                        S groupMember = mCursor.next();
 +
 +                        if (mGroupComparator.compare(mGroupLeader, groupMember) == 0) {
 +                            addToGroup(groupMember);
 +                        } else {
 +                            G aggregate = finishGroup();
 +
 +                            beginGroup(mGroupLeader = groupMember);
 +
 +                            if (aggregate != null) {
 +                                mNextAggregate = aggregate;
 +                                return true;
 +                            }
 +                        }
 +
 +                        interruptCheck(++count);
 +                    }
 +
 +                    G aggregate = finishGroup();
 +
 +                    if (aggregate != null) {
 +                        mNextAggregate = aggregate;
 +                        return true;
 +                    }
 +                }
 +            } catch (NoSuchElementException e) {
 +            }
 +
 +            return false;
 +        }
 +    }
 +
 +    public G next() throws FetchException {
 +        synchronized (mCursor) {
 +            if (hasNext()) {
 +                G next = mNextAggregate;
 +                mNextAggregate = null;
 +                return next;
 +            }
 +            throw new NoSuchElementException();
 +        }
 +    }
 +
 +    public int skipNext(int amount) throws FetchException {
 +        synchronized (mCursor) {
 +            if (amount <= 0) {
 +                if (amount < 0) {
 +                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +                }
 +                return 0;
 +            }
 +
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                interruptCheck(++count);
 +                mNextAggregate = null;
 +            }
 +
 +            return count;
 +        }
 +    }
 +
 +    private void interruptCheck(int count) throws FetchException {
 +        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 +            close();
 +            throw new FetchInterruptedException();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java new file mode 100644 index 0000000..9d11b2f --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java @@ -0,0 +1,118 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Comparator;
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Cursor;
 +
 +/**
 + * Wraps two Cursors and performs a <i>set intersection</i> operation. In
 + * boolean logic, this is an <i>and</i> operation.
 + *
 + * <p>Both cursors must return results in the same order. Ordering is preserved
 + * by the intersection.
 + *
 + * @author Brian S O'Neill
 + * @see UnionCursor
 + * @see DifferenceCursor
 + * @see SymmetricDifferenceCursor
 + */
 +public class IntersectionCursor<S> extends AbstractCursor<S> {
 +    private final Cursor<S> mLeftCursor;
 +    private final Cursor<S> mRightCursor;
 +    private final Comparator<S> mOrder;
 +
 +    private S mNext;
 +
 +    /**
 +     * @param left cursor to wrap
 +     * @param right cursor to wrap
 +     * @param order describes sort ordering of wrapped cursors, which must be
 +     * a total ordering
 +     */
 +    public IntersectionCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
 +        if (left == null || right == null || order == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mLeftCursor = left;
 +        mRightCursor = right;
 +        mOrder = order;
 +    }
 +
 +    public synchronized void close() throws FetchException {
 +        mLeftCursor.close();
 +        mRightCursor.close();
 +        mNext = null;
 +    }
 +
 +    public synchronized boolean hasNext() throws FetchException {
 +        if (mNext != null) {
 +            return true;
 +        }
 +
 +        S nextLeft, nextRight;
 +
 +        if (mLeftCursor.hasNext()) {
 +            nextLeft = mLeftCursor.next();
 +        } else {
 +            close();
 +            return false;
 +        }
 +        if (mRightCursor.hasNext()) {
 +            nextRight = mRightCursor.next();
 +        } else {
 +            close();
 +            return false;
 +        }
 +
 +        while (true) {
 +            int result = mOrder.compare(nextLeft, nextRight);
 +            if (result < 0) {
 +                if (mLeftCursor.hasNext()) {
 +                    nextLeft = mLeftCursor.next();
 +                } else {
 +                    close();
 +                    return false;
 +                }
 +            } else if (result > 0) {
 +                if (mRightCursor.hasNext()) {
 +                    nextRight = mRightCursor.next();
 +                } else {
 +                    close();
 +                    return false;
 +                }
 +            } else {
 +                mNext = nextLeft;
 +                return true;
 +            }
 +        }
 +    }
 +
 +    public synchronized S next() throws FetchException {
 +        if (hasNext()) {
 +            S next = mNext;
 +            mNext = null;
 +            return next;
 +        }
 +        throw new NoSuchElementException();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java new file mode 100644 index 0000000..0330629 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/IteratorCursor.java @@ -0,0 +1,62 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Iterator;
 +import java.util.NoSuchElementException;
 +
 +/**
 + * Adapts an Iterator into a Cursor.
 + *
 + * @author Brian S O'Neill
 + */
 +public class IteratorCursor<S> extends AbstractCursor<S> {
 +    private Iterator<S> mIterator;
 +
 +    /**
 +     * @param iterable collection to iterate over, or null for empty cursor
 +     */
 +    public IteratorCursor(Iterable<S> iterable) {
 +        this(iterable == null ? (Iterator<S>) null : iterable.iterator());
 +    }
 +
 +    /**
 +     * @param iterator iterator to wrap, or null for empty cursor
 +     */
 +    public IteratorCursor(Iterator<S> iterator) {
 +        mIterator = iterator;
 +    }
 +
 +    public void close() {
 +        mIterator = null;
 +    }
 +
 +    public boolean hasNext() {
 +        Iterator it = mIterator;
 +        return it != null && it.hasNext();
 +    }
 +
 +    public S next() {
 +        Iterator<S> it = mIterator;
 +        if (it == null) {
 +            throw new NoSuchElementException();
 +        }
 +        return it.next();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java new file mode 100644 index 0000000..ae98a95 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/JoinedCursorFactory.java @@ -0,0 +1,459 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.lang.reflect.Field;
 +
 +import java.util.Map;
 +
 +import org.cojen.classfile.ClassFile;
 +import org.cojen.classfile.CodeBuilder;
 +import org.cojen.classfile.FieldInfo;
 +import org.cojen.classfile.Label;
 +import org.cojen.classfile.LocalVariable;
 +import org.cojen.classfile.MethodInfo;
 +import org.cojen.classfile.Modifiers;
 +import org.cojen.classfile.TypeDesc;
 +
 +import org.cojen.util.ClassInjector;
 +import org.cojen.util.KeyFactory;
 +import org.cojen.util.SoftValuedHashMap;
 +import org.cojen.util.WeakIdentityMap;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Query;
 +import com.amazon.carbonado.Repository;
 +import com.amazon.carbonado.RepositoryException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.SupportException;
 +
 +import com.amazon.carbonado.info.ChainedProperty;
 +import com.amazon.carbonado.info.StorableInfo;
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.util.QuickConstructorGenerator;
 +
 +import com.amazon.carbonado.spi.CodeBuilderUtil;
 +import static com.amazon.carbonado.spi.CommonMethodNames.*;
 +
 +/**
 + * Given two joined types <i>A</i> and <i>B</i>, this factory converts a cursor
 + * over type <i>A</i> into a cursor over type <i>B</i>. For example, consider
 + * two storable types, Employer and Person. A query filter for persons with a
 + * given employer might look like: {@code "employer.name = ?" }  The join can be
 + * manually implemented by querying for employers and then using this factory
 + * to produce persons:
 + *
 + * <pre>
 + * JoinedCursorFactory<Employer, Person> factory = new JoinedCursorFactory<Employer, Person>
 + *     (repo, Person.class, "employer", Employer.class);
 + *
 + * Cursor<Employer> employerCursor = repo.storageFor(Employer.class)
 + *     .query("name = ?").with(...).fetch();
 + *
 + * Cursor<Person> personCursor = factory.join(employerCursor);
 + * </pre>
 + *
 + * Chained properties are supported as well. A query filter for persons with an
 + * employer in a given state might look like: {@code "employer.address.state = ?" }
 + * The join can be manually implemented as:
 + *
 + * <pre>
 + * JoinedCursorFactory<Address, Person> factory = new JoinedCursorFactory<Address, Person>
 + *     (repo, Person.class, "employer.address", Address.class);
 + *
 + * Cursor<Address> addressCursor = repo.storageFor(Address.class)
 + *     .query("state = ?").with(...).fetch();
 + *
 + * Cursor<Person> personCursor = factory.join(addressCursor);
 + * </pre>
 + *
 + * @author Brian S O'Neill
 + */
 +public class JoinedCursorFactory<A, B extends Storable> {
 +    private static final String STORAGE_FIELD_NAME = "storage";
 +    private static final String QUERY_FIELD_NAME = "query";
 +    private static final String QUERY_FILTER_FIELD_NAME = "queryFilter";
 +    private static final String ACTIVE_A_FIELD_NAME = "active";
 +
 +    private static final Map<Object, Class> cJoinerCursorClassCache;
 +
 +    static {
 +        cJoinerCursorClassCache = new SoftValuedHashMap();
 +    }
 +
 +    private static synchronized <B extends Storable> Joiner<?, B>
 +        newBasicJoiner(StorableProperty<B> bToAProperty, Storage<B> bStorage)
 +        throws FetchException
 +    {
 +        Class<?> aType = bToAProperty.getType();
 +
 +        final Object key = KeyFactory.createKey
 +            (new Object[] {aType,
 +                           bToAProperty.getEnclosingType(),
 +                           bToAProperty.getName()});
 +
 +        Class clazz = cJoinerCursorClassCache.get(key);
 +
 +        if (clazz == null) {
 +            clazz = generateBasicJoinerCursor(aType, bToAProperty);
 +            cJoinerCursorClassCache.put(key, clazz);
 +        }
 +
 +        // Transforming cursor class may need a Query to operate on.
 +        Query<B> bQuery = null;
 +        try {
 +            String filter = (String) clazz.getField(QUERY_FILTER_FIELD_NAME).get(null);
 +            bQuery = bStorage.query(filter);
 +        } catch (NoSuchFieldException e) {
 +        } catch (IllegalAccessException e) {
 +        }
 +
 +        BasicJoiner.Factory<?, B> factory = (BasicJoiner.Factory<?, B>) QuickConstructorGenerator
 +            .getInstance(clazz, BasicJoiner.Factory.class);
 +
 +        return new BasicJoiner(factory, bStorage, bQuery);
 +    }
 +
 +    private static <B extends Storable> Class<Cursor<B>>
 +        generateBasicJoinerCursor(Class<?> aType, StorableProperty<B> bToAProperty)
 +    {
 +        final int propCount = bToAProperty.getJoinElementCount();
 +
 +        // Determine if join is one-to-one, in which case slightly more optimal
 +        // code can be generated.
 +        boolean isOneToOne = true;
 +        for (int i=0; i<propCount; i++) {
 +            if (!bToAProperty.getInternalJoinElement(i).isPrimaryKeyMember()) {
 +                isOneToOne = false;
 +                break;
 +            }
 +            if (!bToAProperty.getExternalJoinElement(i).isPrimaryKeyMember()) {
 +                isOneToOne = false;
 +                break;
 +            }
 +        }
 +
 +        Class<B> bType = bToAProperty.getEnclosingType();
 +
 +        String packageName;
 +        {
 +            String name = bType.getName();
 +            int index = name.lastIndexOf('.');
 +            if (index >= 0) {
 +                packageName = name.substring(0, index);
 +            } else {
 +                packageName = "";
 +            }
 +        }
 +
 +        ClassLoader loader = bType.getClassLoader();
 +
 +        ClassInjector ci = ClassInjector.create(packageName + ".JoinedCursor", loader);
 +        Class superclass = isOneToOne ? TransformedCursor.class : MultiTransformedCursor.class;
 +        ClassFile cf = new ClassFile(ci.getClassName(), superclass);
 +        cf.markSynthetic();
 +        cf.setSourceFile(JoinedCursorFactory.class.getName());
 +        cf.setTarget("1.5");
 +
 +        final TypeDesc queryType = TypeDesc.forClass(Query.class);
 +        final TypeDesc cursorType = TypeDesc.forClass(Cursor.class);
 +        final TypeDesc storageType = TypeDesc.forClass(Storage.class);
 +        final TypeDesc storableType = TypeDesc.forClass(Storable.class);
 +
 +        if (isOneToOne) {
 +            cf.addField(Modifiers.PRIVATE.toFinal(true), STORAGE_FIELD_NAME, storageType);
 +        } else {
 +            // Field to hold query which fetches type B.
 +            cf.addField(Modifiers.PRIVATE.toFinal(true), QUERY_FIELD_NAME, queryType);
 +        }
 +
 +        boolean canSetAReference = bToAProperty.getWriteMethod() != null;
 +
 +        if (canSetAReference && !isOneToOne) {
 +            // Field to hold active A storable.
 +            cf.addField(Modifiers.PRIVATE, ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
 +        }
 +
 +        // Constructor accepts a Storage and Query, but Storage is only used
 +        // for one-to-one, and Query is only used for one-to-many.
 +        {
 +            MethodInfo mi = cf.addConstructor(Modifiers.PUBLIC,
 +                                              new TypeDesc[] {cursorType, storageType, queryType});
 +            CodeBuilder b = new CodeBuilder(mi);
 +
 +            b.loadThis();
 +            b.loadLocal(b.getParameter(0)); // pass A cursor to superclass
 +            b.invokeSuperConstructor(new TypeDesc[] {cursorType});
 +
 +            if (isOneToOne) {
 +                b.loadThis();
 +                b.loadLocal(b.getParameter(1)); // push B storage to stack
 +                b.storeField(STORAGE_FIELD_NAME, storageType);
 +            } else {
 +                b.loadThis();
 +                b.loadLocal(b.getParameter(2)); // push B query to stack
 +                b.storeField(QUERY_FIELD_NAME, queryType);
 +            }
 +
 +            b.returnVoid();
 +        }
 +
 +        // For one-to-many, a query is needed. Save the query filter in a
 +        // public static field to be grabbed later.
 +        if (!isOneToOne) {
 +            StringBuilder queryBuilder = new StringBuilder();
 +
 +            for (int i=0; i<propCount; i++) {
 +                if (i > 0) {
 +                    queryBuilder.append(" & ");
 +                }
 +                queryBuilder.append(bToAProperty.getInternalJoinElement(i).getName());
 +                queryBuilder.append(" = ?");
 +            }
 +
 +            FieldInfo fi = cf.addField(Modifiers.PUBLIC.toStatic(true).toFinal(true),
 +                                       QUERY_FILTER_FIELD_NAME, TypeDesc.STRING);
 +            fi.setConstantValue(queryBuilder.toString());
 +        }
 +
 +        // Implement the transform method.
 +        if (isOneToOne) {
 +            MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", TypeDesc.OBJECT,
 +                                         new TypeDesc[] {TypeDesc.OBJECT});
 +            mi.addException(TypeDesc.forClass(FetchException.class));
 +            CodeBuilder b = new CodeBuilder(mi);
 +
 +            LocalVariable aVar = b.createLocalVariable(null, storableType);
 +            b.loadLocal(b.getParameter(0));
 +            b.checkCast(TypeDesc.forClass(aType));
 +            b.storeLocal(aVar);
 +
 +            // Prepare B storable.
 +            b.loadThis();
 +            b.loadField(STORAGE_FIELD_NAME, storageType);
 +            b.invokeInterface(storageType, PREPARE_METHOD_NAME, storableType, null);
 +            LocalVariable bVar = b.createLocalVariable(null, storableType);
 +            b.checkCast(TypeDesc.forClass(bType));
 +            b.storeLocal(bVar);
 +
 +            // Copy pk property values from A to B.
 +            for (int i=0; i<propCount; i++) {
 +                StorableProperty<B> internal = bToAProperty.getInternalJoinElement(i);
 +                StorableProperty<?> external = bToAProperty.getExternalJoinElement(i);
 +
 +                b.loadLocal(bVar);
 +                b.loadLocal(aVar);
 +                b.invoke(external.getReadMethod());
 +                b.invoke(internal.getWriteMethod());
 +            }
 +
 +            // tryLoad b.
 +            b.loadLocal(bVar);
 +            b.invokeInterface(storableType, TRY_LOAD_METHOD_NAME, TypeDesc.BOOLEAN, null);
 +            Label wasLoaded = b.createLabel();
 +            b.ifZeroComparisonBranch(wasLoaded, "!=");
 +
 +            b.loadNull();
 +            b.returnValue(storableType);
 +
 +            wasLoaded.setLocation();
 +
 +            if (canSetAReference) {
 +                b.loadLocal(bVar);
 +                b.loadLocal(aVar);
 +                b.invoke(bToAProperty.getWriteMethod());
 +            }
 +
 +            b.loadLocal(bVar);
 +            b.returnValue(storableType);
 +        } else {
 +            MethodInfo mi = cf.addMethod(Modifiers.PROTECTED, "transform", cursorType,
 +                                         new TypeDesc[] {TypeDesc.OBJECT});
 +            mi.addException(TypeDesc.forClass(FetchException.class));
 +            CodeBuilder b = new CodeBuilder(mi);
 +
 +            LocalVariable aVar = b.createLocalVariable(null, storableType);
 +            b.loadLocal(b.getParameter(0));
 +            b.checkCast(TypeDesc.forClass(aType));
 +            b.storeLocal(aVar);
 +
 +            if (canSetAReference) {
 +                b.loadThis();
 +                b.loadLocal(aVar);
 +                b.storeField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
 +            }
 +
 +            // Populate query parameters.
 +            b.loadThis();
 +            b.loadField(QUERY_FIELD_NAME, queryType);
 +
 +            for (int i=0; i<propCount; i++) {
 +                StorableProperty<?> external = bToAProperty.getExternalJoinElement(i);
 +                b.loadLocal(aVar);
 +                b.invoke(external.getReadMethod());
 +
 +                TypeDesc bindType = CodeBuilderUtil.bindQueryParam(external.getType());
 +                CodeBuilderUtil.convertValue(b, external.getType(), bindType.toClass());
 +                b.invokeInterface(queryType, WITH_METHOD_NAME, queryType,
 +                                  new TypeDesc[] {bindType});
 +            }
 +
 +            // Now fetch and return.
 +            b.invokeInterface(queryType, FETCH_METHOD_NAME, cursorType, null);
 +            b.returnValue(cursorType);
 +        }
 +
 +        if (canSetAReference && !isOneToOne) {
 +            // Override the "next" method to set A object on B.
 +            MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "next", TypeDesc.OBJECT, null);
 +            mi.addException(TypeDesc.forClass(FetchException.class));
 +            CodeBuilder b = new CodeBuilder(mi);
 +
 +            b.loadThis();
 +            b.invokeSuper(TypeDesc.forClass(MultiTransformedCursor.class),
 +                          "next", TypeDesc.OBJECT, null);
 +            b.checkCast(TypeDesc.forClass(bType));
 +            b.dup();
 +
 +            b.loadThis();
 +            b.loadField(ACTIVE_A_FIELD_NAME, TypeDesc.forClass(aType));
 +            b.invoke(bToAProperty.getWriteMethod());
 +
 +            b.returnValue(storableType);
 +        }
 +
 +        return (Class<Cursor<B>>) ci.defineClass(cf);
 +    }
 +
 +    private final Joiner<A, B> mJoiner;
 +
 +    /**
 +     * @param repo access to storage instances for properties
 +     * @param bType type of <i>B</i> instances
 +     * @param bToAProperty property of <i>B</i> type which maps to instances of
 +     * <i>A</i> type.
 +     * @param aType type of <i>A</i> instances
 +     * @throws IllegalArgumentException if property type is not <i>A</i>
 +     */
 +    public JoinedCursorFactory(Repository repo,
 +                               Class<B> bType,
 +                               String bToAProperty,
 +                               Class<A> aType)
 +        throws SupportException, FetchException, RepositoryException
 +    {
 +        this(repo,
 +             ChainedProperty.parse(StorableIntrospector.examine(bType), bToAProperty),
 +             aType);
 +    }
 +
 +    /**
 +     * @param repo access to storage instances for properties
 +     * @param bToAProperty property of <i>B</i> type which maps to instances of
 +     * <i>A</i> type.
 +     * @param aType type of <i>A</i> instances
 +     * @throws IllegalArgumentException if property type is not <i>A</i>
 +     */
 +    public JoinedCursorFactory(Repository repo,
 +                               ChainedProperty<B> bToAProperty,
 +                               Class<A> aType)
 +        throws SupportException, FetchException, RepositoryException
 +    {
 +        if (bToAProperty.getType() != aType) {
 +            throw new IllegalArgumentException
 +                ("Property is not of type \"" + aType.getName() + "\": " +
 +                 bToAProperty);
 +        }
 +
 +        StorableProperty<B> primeB = bToAProperty.getPrimeProperty();
 +        Storage<B> primeBStorage = repo.storageFor(primeB.getEnclosingType());
 +
 +        Joiner joiner = newBasicJoiner(primeB, primeBStorage);
 +
 +        int chainCount = bToAProperty.getChainCount();
 +        for (int i=0; i<chainCount; i++) {
 +            StorableProperty prop = bToAProperty.getChainedProperty(i);
 +            Storage storage = repo.storageFor(prop.getEnclosingType());
 +
 +            joiner = new MultiJoiner(newBasicJoiner(prop, storage), joiner);
 +        }
 +
 +        mJoiner = (Joiner<A, B>) joiner;
 +    }
 +
 +    /**
 +     * Given a cursor over <i>A</i>, returns a new cursor over joined property
 +     * of type <i>B</i>.
 +     */
 +    public Cursor<B> join(Cursor<A> cursor) {
 +        return mJoiner.join(cursor);
 +    }
 +
 +    private static interface Joiner<A, B extends Storable> {
 +        Cursor<B> join(Cursor<A> cursor);
 +    }
 +
 +    /**
 +     * Support for joins without an intermediate hop.
 +     */
 +    private static class BasicJoiner<A, B extends Storable> implements Joiner<A, B> {
 +        private final Factory<A, B> mJoinerFactory;
 +        private final Storage<B> mBStorage;
 +        private final Query<B> mBQuery;
 +
 +        BasicJoiner(Factory<A, B> factory, Storage<B> bStorage, Query<B> bQuery) {
 +            mJoinerFactory = factory;
 +            mBStorage = bStorage;
 +            mBQuery = bQuery;
 +        }
 +
 +        public Cursor<B> join(Cursor<A> cursor) {
 +            return mJoinerFactory.newJoinedCursor(cursor, mBStorage, mBQuery);
 +        }
 +
 +        /**
 +         * Needs to be public for {@link QuickConstructorGenerator}.
 +         */
 +        public static interface Factory<A, B extends Storable> {
 +            Cursor<B> newJoinedCursor(Cursor<A> cursor, Storage<B> bStorage, Query<B> bQuery);
 +        }
 +    }
 +
 +    /**
 +     * Support for joins with an intermediate hop -- multi-way joins.
 +     */
 +    private static class MultiJoiner<A, X extends Storable, B extends Storable>
 +        implements Joiner<A, B>
 +    {
 +        private final Joiner<A, X> mAToMid;
 +        private final Joiner<X, B> mMidToB;
 +
 +        MultiJoiner(Joiner<A, X> aToMidJoiner, Joiner<X, B> midToBJoiner) {
 +            mAToMid = aToMidJoiner;
 +            mMidToB = midToBJoiner;
 +        }
 +
 +        public Cursor<B> join(Cursor<A> cursor) {
 +            return mMidToB.join(mAToMid.join(cursor));
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java new file mode 100644 index 0000000..0094183 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/MergeSortBuffer.java @@ -0,0 +1,498 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.io.BufferedInputStream;
 +import java.io.BufferedOutputStream;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +
 +import java.lang.reflect.UndeclaredThrowableException;
 +
 +import java.util.AbstractCollection;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Comparator;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NoSuchElementException;
 +import java.util.PriorityQueue;
 +
 +import com.amazon.carbonado.FetchInterruptedException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.SupportException;
 +
 +import com.amazon.carbonado.spi.RAFInputStream;
 +import com.amazon.carbonado.spi.RAFOutputStream;
 +import com.amazon.carbonado.spi.StorableSerializer;
 +
 +/**
 + * Sort buffer implemented via a merge sort algorithm. If there are too many
 + * storables to fit in the reserved memory buffer, they are sorted and
 + * serialized to temporary files.
 + *
 + * @author Brian S O'Neill
 + * @see SortedCursor
 + */
 +public class MergeSortBuffer<S extends Storable> extends AbstractCollection<S>
 +    implements SortBuffer<S>
 +{
 +    private static final int MIN_ARRAY_CAPACITY = 64;
 +
 +    // Bigger means better performance, but more memory is used.
 +    private static final int MAX_ARRAY_CAPACITY = 8192;
 +
 +    // Bigger means better performance, but more file handles may be used.
 +    private static final int MAX_FILE_COUNT = 100;
 +
 +    // Bigger may improve write performance, but not by much.
 +    private static final int OUTPUT_BUFFER_SIZE = 10000;
 +
 +    private final Storage<S> mStorage;
 +    private final String mTempDir;
 +    private final int mMaxArrayCapacity;
 +
 +    private S[] mElements;
 +    private int mSize;
 +    private int mTotalSize;
 +
 +    private StorableSerializer<S> mSerializer;
 +    private WorkFilePool mWorkFilePool;
 +    private List<RandomAccessFile> mFilesInUse;
 +
 +    private Comparator<S> mComparator;
 +
 +    private volatile boolean mStop;
 +
 +    /**
 +     * @param storage storage type of elements
 +     */
 +    public MergeSortBuffer(Storage<S> storage) {
 +        this(storage, null, MAX_ARRAY_CAPACITY);
 +    }
 +
 +    /**
 +     * @param storage storage type of elements
 +     * @param tempDir directory to store temp files for merging, or null for default
 +     */
 +    public MergeSortBuffer(Storage<S> storage, String tempDir) {
 +        this(storage, tempDir, MAX_ARRAY_CAPACITY);
 +    }
 +
 +    /**
 +     * @param storage storage type of elements
 +     * @param tempDir directory to store temp files for merging, or null for default
 +     * @param maxArrayCapacity maximum amount of storables to keep in an array
 +     * before serializing to a file
 +     */
 +    @SuppressWarnings("unchecked")
 +    public MergeSortBuffer(Storage<S> storage, String tempDir, int maxArrayCapacity) {
 +        mStorage = storage;
 +        mTempDir = tempDir;
 +        mMaxArrayCapacity = maxArrayCapacity;
 +        int cap = Math.min(MIN_ARRAY_CAPACITY, maxArrayCapacity);
 +        mElements = (S[]) new Storable[cap];
 +    }
 +
 +    public void prepare(Comparator<S> comparator) {
 +        if (comparator == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        clear();
 +        mComparator = comparator;
 +    }
 +
 +    public boolean add(S storable) {
 +        arrayPrep:
 +        if (mSize >= mElements.length) {
 +            if (mElements.length < mMaxArrayCapacity) {
 +                // Increase array capacity.
 +                int newCap = mElements.length * 2;
 +                if (newCap > mMaxArrayCapacity) {
 +                    newCap = mMaxArrayCapacity;
 +                }
 +                S[] newElements = (S[]) new Storable[newCap];
 +                System.arraycopy(mElements, 0, newElements, 0, mElements.length);
 +                mElements = newElements;
 +                break arrayPrep;
 +            }
 +
 +            // Sort current in-memory results and serialize to a temp file.
 +
 +            // Make sure everything is set up to use temp files.
 +            {
 +                if (mSerializer == null) {
 +                    try {
 +                        mSerializer = StorableSerializer.forType(mStorage.getStorableType());
 +                    } catch (SupportException e) {
 +                        throw new UndeclaredThrowableException(e);
 +                    }
 +                    mWorkFilePool = WorkFilePool.getInstance(mTempDir);
 +                    mFilesInUse = new ArrayList<RandomAccessFile>();
 +                }
 +            }
 +
 +            Arrays.sort(mElements, mComparator);
 +
 +            RandomAccessFile raf;
 +            try {
 +                raf = mWorkFilePool.acquireWorkFile(this);
 +                OutputStream out =
 +                    new BufferedOutputStream(new RAFOutputStream(raf), OUTPUT_BUFFER_SIZE);
 +
 +                StorableSerializer<S> serializer = mSerializer;
 +
 +                if (mFilesInUse.size() < (MAX_FILE_COUNT - 1)) {
 +                    mFilesInUse.add(raf);
 +                    int count = 0;
 +                    for (S element : mElements) {
 +                        // Check every so often if interrupted.
 +                        interruptCheck(++count);
 +                        serializer.write(element, out);
 +                    }
 +                } else {
 +                    // Merge files together.
 +
 +                    // Determine the average length per file in use.
 +                    long totalLength = 0;
 +                    int fileCount = mFilesInUse.size();
 +                    for (int i=0; i<fileCount; i++) {
 +                        totalLength += mFilesInUse.get(i).length();
 +                    }
 +
 +                    // Compute average with ceiling rounding mode.
 +                    long averageLength = (totalLength + fileCount) / fileCount;
 +
 +                    // For any file whose length is above average, don't merge
 +                    // it. The goal is to evenly distribute file growth.
 +
 +                    List<RandomAccessFile> filesToExclude = new ArrayList<RandomAccessFile>();
 +                    List<RandomAccessFile> filesToMerge = new ArrayList<RandomAccessFile>();
 +
 +                    long mergedLength = 0;
 +                    for (int i=0; i<fileCount; i++) {
 +                        RandomAccessFile fileInUse = mFilesInUse.get(i);
 +                        long fileLength = fileInUse.length();
 +                        if (fileLength > averageLength) {
 +                            filesToExclude.add(fileInUse);
 +                        } else {
 +                            filesToMerge.add(fileInUse);
 +                            mergedLength += fileLength;
 +                        }
 +                    }
 +
 +                    mFilesInUse.add(raf);
 +
 +                    // Pre-allocate space, in an attempt to improve performance
 +                    // as well as error out earlier, should the disk be full.
 +                    raf.setLength(mergedLength);
 +
 +                    int count = 0;
 +                    Iterator<S> it = iterator(filesToMerge);
 +                    while (it.hasNext()) {
 +                        // Check every so often if interrupted.
 +                        interruptCheck(++count);
 +                        S element = it.next();
 +                        serializer.write(element, out);
 +                    }
 +
 +                    mWorkFilePool.releaseWorkFiles(filesToMerge);
 +                    mFilesInUse = filesToExclude;
 +                    mFilesInUse.add(raf);
 +                }
 +
 +                out.flush();
 +
 +                // Truncate any data from last time file was used.
 +                raf.setLength(raf.getFilePointer());
 +                // Reset to start of file in preparation for reading later.
 +                raf.seek(0);
 +            } catch (IOException e) {
 +                throw new UndeclaredThrowableException(e);
 +            }
 +
 +            mSize = 0;
 +        }
 +
 +        mElements[mSize++] = storable;
 +        mTotalSize++;
 +        return true;
 +    }
 +
 +    public int size() {
 +        return mTotalSize;
 +    }
 +
 +    public Iterator<S> iterator() {
 +        return iterator(mFilesInUse);
 +    }
 +
 +    private Iterator<S> iterator(List<RandomAccessFile> filesToMerge) {
 +        if (mSerializer == null) {
 +            return new ObjectArrayIterator<S>(mElements, 0, mSize);
 +        }
 +
 +        // Merge with the files. Use a priority queue to decide which is the
 +        // next buffer to pull an element from.
 +
 +        PriorityQueue<Iter<S>> pq = new PriorityQueue<Iter<S>>(1 + mFilesInUse.size());
 +        pq.add(new ArrayIter<S>(mComparator, mElements, mSize));
 +        for (RandomAccessFile raf : filesToMerge) {
 +            try {
 +                raf.seek(0);
 +            } catch (IOException e) {
 +                throw new UndeclaredThrowableException(e);
 +            }
 +
 +            InputStream in = new BufferedInputStream(new RAFInputStream(raf));
 +
 +            pq.add(new InputIter<S>(mComparator, mSerializer, mStorage, in));
 +        }
 +
 +        return new Merger<S>(pq);
 +    }
 +
 +    public void clear() {
 +        if (mTotalSize > 0) {
 +            mSize = 0;
 +            mTotalSize = 0;
 +            if (mWorkFilePool != null && mFilesInUse != null) {
 +                mWorkFilePool.releaseWorkFiles(mFilesInUse);
 +                mFilesInUse.clear();
 +            }
 +        }
 +    }
 +
 +    public void sort() {
 +        // Sort current in-memory results. Anything residing in files has
 +        // already been sorted.
 +        if (mComparator == null) {
 +            throw new IllegalStateException("Buffer was not prepared");
 +        }
 +        Arrays.sort(mElements, 0, mSize, mComparator);
 +    }
 +
 +    public void close() {
 +        clear();
 +        if (mWorkFilePool != null) {
 +            mWorkFilePool.unregisterWorkFileUser(this);
 +        }
 +    }
 +
 +    void stop() {
 +        mStop = true;
 +    }
 +
 +    private void interruptCheck(int count) {
 +        if ((count & ~0xff) == 0 && (mStop || Thread.interrupted())) {
 +            close();
 +            throw new UndeclaredThrowableException(new FetchInterruptedException());
 +        }
 +    }
 +
 +    /**
 +     * Simple interator interface that supports peeking at next element.
 +     */
 +    private abstract static class Iter<S extends Storable> implements Comparable<Iter<S>> {
 +        private final Comparator<S> mComparator;
 +
 +        protected Iter(Comparator<S> comparator) {
 +            mComparator = comparator;
 +        }
 +
 +        /**
 +         * Returns null if iterator is exhausted.
 +         */
 +        abstract S peek();
 +
 +        /**
 +         * Returns null if iterator is exhausted.
 +         */
 +        abstract S next();
 +
 +        public int compareTo(Iter<S> iter) {
 +            S thisPeek = peek();
 +            S thatPeek = iter.peek();
 +            if (thisPeek == null) {
 +                if (thatPeek == null) {
 +                    return 0;
 +                }
 +                // Null is low in order to rise to top of priority queue. This
 +                // Iter will then be tossed out of the priority queue.
 +                return -1;
 +            } else if (thatPeek == null) {
 +                return 1;
 +            }
 +            return mComparator.compare(thisPeek, thatPeek);
 +        }
 +    }
 +
 +    /**
 +     * Iterator that reads from an array.
 +     */
 +    private static class ArrayIter<S extends Storable> extends Iter<S> {
 +        private final S[] mArray;
 +        private final int mSize;
 +        private int mPos;
 +
 +        ArrayIter(Comparator<S> comparator, S[] array, int size) {
 +            super(comparator);
 +            mArray = array;
 +            mSize = size;
 +        }
 +
 +        S peek() {
 +            int pos = mPos;
 +            if (pos >= mSize) {
 +                return null;
 +            }
 +            return mArray[pos];
 +        }
 +
 +        S next() {
 +            int pos = mPos;
 +            if (pos >= mSize) {
 +                return null;
 +            }
 +            S next = mArray[pos];
 +            mPos = pos + 1;
 +            return next;
 +        }
 +    }
 +
 +    /**
 +     * Iterator that reads from an input stream of serialized Storables.
 +     */
 +    private static class InputIter<S extends Storable> extends Iter<S> {
 +        private StorableSerializer<S> mSerializer;
 +        private Storage<S> mStorage;
 +        private InputStream mIn;
 +
 +        private S mNext;
 +
 +        InputIter(Comparator<S> comparator,
 +                  StorableSerializer<S> serializer, Storage<S> storage, InputStream in)
 +        {
 +            super(comparator);
 +            mSerializer = serializer;
 +            mStorage = storage;
 +            mIn = in;
 +        }
 +
 +        S peek() {
 +            if (mNext != null) {
 +                return mNext;
 +            }
 +            if (mIn != null) {
 +                try {
 +                    mNext = mSerializer.read(mStorage, mIn);
 +                    // TODO: Serializer is unable to determine state of
 +                    // properties, and so they are lost. Since these storables
 +                    // came directly from a cursor, we know they are clean.
 +                    mNext.markAllPropertiesClean();
 +                } catch (EOFException e) {
 +                    mIn = null;
 +                } catch (IOException e) {
 +                    throw new UndeclaredThrowableException(e);
 +                }
 +            }
 +            return mNext;
 +        }
 +
 +        S next() {
 +            S next = peek();
 +            mNext = null;
 +            return next;
 +        }
 +    }
 +
 +    private static class Merger<S extends Storable> implements Iterator<S> {
 +        private final PriorityQueue<Iter<S>> mPQ;
 +
 +        private S mNext;
 +
 +        Merger(PriorityQueue<Iter<S>> pq) {
 +            mPQ = pq;
 +        }
 +
 +        public boolean hasNext() {
 +            if (mNext == null) {
 +                while (true) {
 +                    Iter<S> iter = mPQ.poll();
 +                    if (iter == null) {
 +                        return false;
 +                    }
 +                    if ((mNext = iter.next()) != null) {
 +                        // Iter is not exhausted, so put it back in to be used
 +                        // again. Adding it back causes it to be inserted in
 +                        // the proper order, based on the next element it has
 +                        // to offer.
 +                        mPQ.add(iter);
 +                        return true;
 +                    }
 +                }
 +            }
 +            return true;
 +        }
 +
 +        public S next() {
 +            if (hasNext()) {
 +                S next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +            throw new NoSuchElementException();
 +        }
 +
 +        public void remove() {
 +            throw new UnsupportedOperationException();
 +        }
 +    }
 +
 +    private static class ObjectArrayIterator<E> implements Iterator<E> {
 +        private final E[] mElements;
 +        private final int mEnd;
 +        private int mIndex;
 +
 +        public ObjectArrayIterator(E[] elements, int start, int end) {
 +            mElements = elements;
 +            mEnd = end;
 +            mIndex = start;
 +        }
 +
 +        public boolean hasNext() {
 +            return mIndex < mEnd;
 +        }
 +
 +        public E next() {
 +            if (mIndex >= mEnd) {
 +                throw new NoSuchElementException();
 +            }
 +            return mElements[mIndex++];
 +        }
 +
 +        public void remove() {
 +            throw new UnsupportedOperationException();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java new file mode 100644 index 0000000..0cfd197 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -0,0 +1,132 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +
 +/**
 + * Abstract cursor which wraps another cursor and transforms each storable
 + * result into a set of target storables. This class can be used for
 + * implementing one-to-many joins. Use {@link TransformedCursor} for one-to-one
 + * joins.
 + *
 + * @author Brian S O'Neill
 + */
 +public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {
 +    private final Cursor<S> mCursor;
 +
 +    private Cursor<T> mNextCursor;
 +
 +    protected MultiTransformedCursor(Cursor<S> cursor) {
 +        if (cursor == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mCursor = cursor;
 +    }
 +
 +    /**
 +     * This method must be implemented to transform storables. If the storable
 +     * cannot be transformed, either throw a FetchException or return null. If
 +     * null is returned, the storable is simply filtered out.
 +     *
 +     * @return transformed storables, or null to filter it out
 +     */
 +    protected abstract Cursor<T> transform(S storable) throws FetchException;
 +
 +    public void close() throws FetchException {
 +        synchronized (mCursor) {
 +            mCursor.close();
 +            if (mNextCursor != null) {
 +                mNextCursor.close();
 +                mNextCursor = null;
 +            }
 +        }
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        synchronized (mCursor) {
 +            if (mNextCursor != null) {
 +                if (mNextCursor.hasNext()) {
 +                    return true;
 +                }
 +                mNextCursor.close();
 +                mNextCursor = null;
 +            }
 +            try {
 +                int count = 0;
 +                while (mCursor.hasNext()) {
 +                    Cursor<T> nextCursor = transform(mCursor.next());
 +                    if (nextCursor != null) {
 +                        if (nextCursor.hasNext()) {
 +                            mNextCursor = nextCursor;
 +                            return true;
 +                        }
 +                        nextCursor.close();
 +                    }
 +                    interruptCheck(++count);
 +                }
 +            } catch (NoSuchElementException e) {
 +            }
 +            return false;
 +        }
 +    }
 +
 +    public T next() throws FetchException {
 +        synchronized (mCursor) {
 +            if (hasNext()) {
 +                return mNextCursor.next();
 +            }
 +            throw new NoSuchElementException();
 +        }
 +    }
 +
 +    public int skipNext(int amount) throws FetchException {
 +        synchronized (mCursor) {
 +            if (amount <= 0) {
 +                if (amount < 0) {
 +                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +                }
 +                return 0;
 +            }
 +
 +            int count = 0;
 +            while (hasNext()) {
 +                int chunk = mNextCursor.skipNext(amount);
 +                count += chunk;
 +                if ((amount -= chunk) <= 0) {
 +                    break;
 +                }
 +                interruptCheck(count);
 +            }
 +
 +            return count;
 +        }
 +    }
 +
 +    private void interruptCheck(int count) throws FetchException {
 +        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 +            close();
 +            throw new FetchInterruptedException();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java new file mode 100644 index 0000000..089488c --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/SortBuffer.java @@ -0,0 +1,53 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Comparator;
 +import java.util.Collection;
 +
 +import com.amazon.carbonado.FetchException;
 +
 +/**
 + * Buffers up Storable instances allowing them to be sorted. Should any method
 + * need to throw an undeclared exception, wrap it with an
 + * UndeclaredThrowableException.
 + *
 + * @author Brian S O'Neill
 + * @see SortedCursor
 + */
 +public interface SortBuffer<S> extends Collection<S> {
 +    /**
 +     * Clears buffer and assigns a comparator for sorting.
 +     *
 +     * @throws IllegalArgumentException if comparator is null
 +     */
 +    void prepare(Comparator<S> comparator);
 +
 +    /**
 +     * Finish sorting buffer.
 +     *
 +     * @throws IllegalStateException if prepare was never called
 +     */
 +    void sort() throws FetchException;
 +
 +    /**
 +     * Clear and close buffer.
 +     */
 +    void close() throws FetchException;
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java new file mode 100644 index 0000000..7a728bb --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -0,0 +1,332 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.lang.reflect.UndeclaredThrowableException;
 +import java.util.Comparator;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NoSuchElementException;
 +
 +import org.cojen.util.BeanComparator;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.Storable;
 +
 +import com.amazon.carbonado.info.Direction;
 +import com.amazon.carbonado.info.OrderedProperty;
 +
 +/**
 + * Wraps another Cursor and ensures the results are sorted. If the elements in
 + * the source cursor are already partially sorted, a handled comparator can be
 + * passed in which specifies the partial ordering. Elements are then processed
 + * in smaller chunks rather than sorting the entire set. The handled comparator
 + * can represent ascending or descending order of source elements.
 + *
 + * @author Brian S O'Neill
 + */
 +public class SortedCursor<S> extends AbstractCursor<S> {
 +    /**
 +     * Convenience method to create a comparator which orders storables by the
 +     * given order-by properties. The property names may be prefixed with '+'
 +     * or '-' to indicate ascending or descending order. If the prefix is
 +     * omitted, ascending order is assumed.
 +     *
 +     * @param type type of storable to create comparator for
 +     * @param orderProperties list of properties to order by
 +     * @throws IllegalArgumentException if any property is null or not a member
 +     * of storable type
 +     */
 +    public static <S> Comparator<S> createComparator(Class<S> type, String... orderProperties) {
 +        BeanComparator bc = BeanComparator.forClass(type);
 +        for (String property : orderProperties) {
 +            bc = bc.orderBy(property);
 +            bc = bc.caseSensitive();
 +        }
 +        return bc;
 +    }
 +
 +    /**
 +     * Convenience method to create a comparator which orders storables by the
 +     * given properties.
 +     *
 +     * @param properties list of properties to order by
 +     * @throws IllegalArgumentException if no properties or if any property is null
 +     */
 +    public static <S extends Storable> Comparator<S>
 +        createComparator(OrderedProperty<S>... properties)
 +    {
 +        if (properties == null || properties.length == 0 || properties[0] == null) {
 +            throw new IllegalArgumentException();
 +        }
 +
 +        Class<S> type = properties[0].getChainedProperty().getPrimeProperty().getEnclosingType();
 +
 +        BeanComparator bc = BeanComparator.forClass(type);
 +
 +        for (OrderedProperty<S> property : properties) {
 +            if (property == null) {
 +                throw new IllegalArgumentException();
 +            }
 +            bc = bc.orderBy(property.getChainedProperty().toString());
 +            bc = bc.caseSensitive();
 +            if (property.getDirection() == Direction.DESCENDING) {
 +                bc = bc.reverse();
 +            }
 +        }
 +
 +        return bc;
 +    }
 +
 +    /**
 +     * Convenience method to create a comparator which orders storables by the
 +     * given properties.
 +     *
 +     * @param properties list of properties to order by
 +     * @throws IllegalArgumentException if no properties or if any property is null
 +     */
 +    public static <S extends Storable> Comparator<S>
 +        createComparator(List<OrderedProperty<S>> properties)
 +    {
 +        if (properties == null || properties.size() == 0 || properties.get(0) == null) {
 +            throw new IllegalArgumentException();
 +        }
 +
 +        Class<S> type =
 +            properties.get(0).getChainedProperty().getPrimeProperty().getEnclosingType();
 +
 +        BeanComparator bc = BeanComparator.forClass(type);
 +
 +        for (OrderedProperty<S> property : properties) {
 +            if (property == null) {
 +                throw new IllegalArgumentException();
 +            }
 +            bc = bc.orderBy(property.getChainedProperty().toString());
 +            bc = bc.caseSensitive();
 +            if (property.getDirection() == Direction.DESCENDING) {
 +                bc = bc.reverse();
 +            }
 +        }
 +
 +        return bc;
 +    }
 +
 +    /** Wrapped cursor */
 +    private final Cursor<S> mCursor;
 +    /** Buffer to store and sort results */
 +    private final SortBuffer<S> mChunkBuffer;
 +    /** Optional comparator which matches ordering already handled by wrapped cursor */
 +    private final Comparator<S> mChunkMatcher;
 +    /** Comparator to use for sorting chunks */
 +    private final Comparator<S> mChunkSorter;
 +
 +    /** Iteration over current contents in mChunkBuffer */
 +    private Iterator<S> mChunkIterator;
 +
 +    /**
 +     * First record in a chunk, according to chunk matcher. In order to tell if
 +     * the next chunk has been reached, a record has to be read from the
 +     * wrapped cursor. The record is "pushed back" here for use when the next
 +     * chunk is ready to be processed.
 +     */
 +    private S mNextChunkStart;
 +
 +    /**
 +     * @param cursor cursor to wrap
 +     * @param buffer required buffer to hold results
 +     * @param handled optional comparator which represents how the results are
 +     * already sorted
 +     * @param finisher required comparator which finishes the sort
 +     */
 +    public SortedCursor(Cursor<S> cursor, SortBuffer<S> buffer,
 +                        Comparator<S> handled, Comparator<S> finisher) {
 +        if (cursor == null || finisher == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mCursor = cursor;
 +        mChunkBuffer =  buffer;
 +        mChunkMatcher = handled;
 +        mChunkSorter = finisher;
 +    }
 +
 +    /**
 +     * @param cursor cursor to wrap
 +     * @param buffer required buffer to hold results
 +     * @param type type of storable to create cursor for
 +     * @param orderProperties list of properties to order by
 +     * @throws IllegalArgumentException if any property is null or not a member
 +     * of storable type
 +     */
 +    public SortedCursor(Cursor<S> cursor, SortBuffer<S> buffer,
 +                        Class<S> type, String... orderProperties) {
 +        this(cursor, buffer, null, createComparator(type, orderProperties));
 +    }
 +
 +    /**
 +     * Returns a comparator representing the effective sort order of this cursor.
 +     */
 +    public Comparator<S> comparator() {
 +        if (mChunkMatcher == null) {
 +            return mChunkSorter;
 +        }
 +        return new Comparator<S>() {
 +            public int compare(S a, S b) {
 +                int result = mChunkMatcher.compare(a, b);
 +                if (result == 0) {
 +                    result = mChunkSorter.compare(a, b);
 +                }
 +                return result;
 +            }
 +        };
 +    }
 +
 +    public synchronized void close() throws FetchException {
 +        mCursor.close();
 +        mChunkIterator = null;
 +        mChunkBuffer.close();
 +    }
 +
 +    public synchronized boolean hasNext() throws FetchException {
 +        prepareNextChunk();
 +        try {
 +            if (mChunkIterator.hasNext()) {
 +                return true;
 +            }
 +        } catch (UndeclaredThrowableException e) {
 +            throw toFetchException(e);
 +        }
 +        close();
 +        return false;
 +    }
 +
 +    public synchronized S next() throws FetchException {
 +        prepareNextChunk();
 +        try {
 +            return mChunkIterator.next();
 +        } catch (UndeclaredThrowableException e) {
 +            throw toFetchException(e);
 +        } catch (NoSuchElementException e) {
 +            try {
 +                close();
 +            } catch (FetchException e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
 +    }
 +
 +    public synchronized int skipNext(int amount) throws FetchException {
 +        if (amount <= 0) {
 +            if (amount < 0) {
 +                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +            }
 +            return 0;
 +        }
 +
 +        int count = 0;
 +        while (--amount >= 0 && hasNext()) {
 +            next();
 +            count++;
 +        }
 +
 +        return count;
 +    }
 +
 +    private void prepareNextChunk() throws FetchException {
 +        if (mChunkIterator != null && (mChunkMatcher == null || mChunkIterator.hasNext())) {
 +            // Ready to go.
 +            return;
 +        }
 +
 +        Cursor<S> cursor = mCursor;
 +        SortBuffer<S> buffer = mChunkBuffer;
 +        Comparator<S> matcher = mChunkMatcher;
 +
 +        try {
 +            mChunkIterator = null;
 +            buffer.prepare(mChunkSorter);
 +
 +            fill: {
 +                if (matcher == null) {
 +                    // Buffer up entire results and sort.
 +                    int count = 0;
 +                    while (cursor.hasNext()) {
 +                        // Check every so often if interrupted.
 +                        if ((++count & ~0xff) == 0 && Thread.interrupted()) {
 +                            throw new FetchInterruptedException();
 +                        }
 +                        buffer.add(cursor.next());
 +                    }
 +                    break fill;
 +                }
 +
 +                // Read a chunk into the buffer. First record is compared against
 +                // subsequent records, to determine when the chunk end is reached.
 +
 +                S chunkStart;
 +                if (mNextChunkStart != null) {
 +                    chunkStart = mNextChunkStart;
 +                    mNextChunkStart = null;
 +                } else if (cursor.hasNext()) {
 +                    chunkStart = cursor.next();
 +                } else {
 +                    break fill;
 +                }
 +
 +                buffer.add(chunkStart);
 +                int count = 1;
 +
 +                while (cursor.hasNext()) {
 +                    // Check every so often if interrupted.
 +                    if ((++count & ~0xff) == 0 && Thread.interrupted()) {
 +                        throw new FetchInterruptedException();
 +                    }
 +                    S next = cursor.next();
 +                    if (matcher.compare(chunkStart, next) != 0) {
 +                        // Save for reading next chunk later.
 +                        mNextChunkStart = next;
 +                        break;
 +                    }
 +                    buffer.add(next);
 +                }
 +            }
 +
 +            if (buffer.size() > 1) {
 +                buffer.sort();
 +            }
 +
 +            mChunkIterator = buffer.iterator();
 +        } catch (UndeclaredThrowableException e) {
 +            throw toFetchException(e);
 +        }
 +    }
 +
 +    private FetchException toFetchException(UndeclaredThrowableException e) {
 +        Throwable cause = e.getCause();
 +        if (cause == null) {
 +            cause = e;
 +        }
 +        if (cause instanceof FetchException) {
 +            return (FetchException) cause;
 +        }
 +        return new FetchException(null, cause);
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java new file mode 100644 index 0000000..6851464 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java @@ -0,0 +1,123 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Comparator;
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Cursor;
 +
 +/**
 + * Wraps two Cursors and performs a <i>symmetric set difference</i>
 + * operation. In boolean logic, this is an <i>exclusive or</i> operation.
 + *
 + * <p>Both cursors must return results in the same order. Ordering is preserved
 + * by the difference.
 + *
 + * @author Brian S O'Neill
 + * @see UnionCursor
 + * @see IntersectionCursor
 + * @see DifferenceCursor
 + */
 +public class SymmetricDifferenceCursor<S> extends AbstractCursor<S> {
 +    private final Cursor<S> mLeftCursor;
 +    private final Cursor<S> mRightCursor;
 +    private final Comparator<S> mOrder;
 +
 +    private S mNextLeft;
 +    private S mNextRight;
 +    private int mCompareResult;
 +
 +    /**
 +     * @param left cursor to wrap
 +     * @param right cursor to wrap
 +     * @param order describes sort ordering of wrapped cursors, which must be
 +     * a total ordering
 +     */
 +    public SymmetricDifferenceCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
 +        if (left == null || right == null || order == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mLeftCursor = left;
 +        mRightCursor = right;
 +        mOrder = order;
 +    }
 +
 +    public synchronized void close() throws FetchException {
 +        mLeftCursor.close();
 +        mRightCursor.close();
 +        mNextLeft = null;
 +        mNextRight = null;
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        return compareNext() != 0;
 +    }
 +
 +    /**
 +     * Returns 0 if no next element available, <0 if next element is from
 +     * left source cursor, and >0 if next element is from right source
 +     * cursor.
 +     */
 +    public synchronized int compareNext() throws FetchException {
 +        if (mCompareResult != 0) {
 +            return mCompareResult;
 +        }
 +
 +        while (true) {
 +            if (mNextLeft == null && mLeftCursor.hasNext()) {
 +                mNextLeft = mLeftCursor.next();
 +            }
 +            if (mNextRight == null && mRightCursor.hasNext()) {
 +                mNextRight = mRightCursor.next();
 +            }
 +
 +            if (mNextLeft == null) {
 +                return mNextRight != null ? 1 : 0;
 +            }
 +            if (mNextRight == null) {
 +                return -1;
 +            }
 +
 +            if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) {
 +                mNextLeft = null;
 +                mNextRight = null;
 +            } else {
 +                return mCompareResult;
 +            }
 +        }
 +    }
 +
 +    public synchronized S next() throws FetchException {
 +        S next;
 +        int result = compareNext();
 +        if (result < 0) {
 +            next = mNextLeft;
 +            mNextLeft = null;
 +        } else if (result > 0) {
 +            next = mNextRight;
 +            mNextRight = null;
 +        } else {
 +            throw new NoSuchElementException();
 +        }
 +        mCompareResult = 0;
 +        return next;
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java new file mode 100644 index 0000000..2f4c5a0 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java @@ -0,0 +1,101 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +
 +import com.amazon.carbonado.util.Throttle;
 +
 +/**
 + * Wraps another cursor and fetches results at a reduced speed.
 + *
 + * @author Brian S O'Neill
 + */
 +public class ThrottledCursor<S> extends AbstractCursor<S> {
 +    private static final int WINDOW_SIZE = 10;
 +    private static final int SLEEP_PRECISION_MILLIS = 50;
 +
 +    private final Cursor<S> mCursor;
 +    private final Throttle mThrottle;
 +    private final double mDesiredSpeed;
 +
 +    /**
 +     * @param cursor cursor to wrap
 +     * @param throttle 1.0 = fetch at full speed, 0.5 = fetch at half speed,
 +     * 0.1 = fetch at one tenth speed, etc.
 +     */
 +    public ThrottledCursor(Cursor<S> cursor, double throttle) {
 +        mCursor = cursor;
 +        if (throttle < 1.0) {
 +            if (throttle < 0.0) {
 +                throttle = 0.0;
 +            }
 +            mThrottle = new Throttle(WINDOW_SIZE);
 +            mDesiredSpeed = throttle;
 +        } else {
 +            mThrottle = null;
 +            mDesiredSpeed = 1.0;
 +        }
 +    }
 +
 +    public void close() throws FetchException {
 +        mCursor.close();
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        return mCursor.hasNext();
 +    }
 +
 +    public S next() throws FetchException {
 +        throttle();
 +        return mCursor.next();
 +    }
 +
 +    public int skipNext(int amount) throws FetchException {
 +        if (amount <= 0) {
 +            if (amount < 0) {
 +                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +            }
 +            return 0;
 +        }
 +
 +        int count = 0;
 +        while (--amount >= 0) {
 +            throttle();
 +            if (skipNext(1) <= 0) {
 +                break;
 +            }
 +            count++;
 +        }
 +
 +        return count;
 +    }
 +
 +    private void throttle() throws FetchInterruptedException {
 +        if (mThrottle != null) {
 +            try {
 +                mThrottle.throttle(mDesiredSpeed, SLEEP_PRECISION_MILLIS);
 +            } catch (InterruptedException e) {
 +                throw new FetchInterruptedException(e);
 +            }
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java new file mode 100644 index 0000000..0860cf0 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -0,0 +1,119 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +
 +/**
 + * Abstract cursor which wraps another cursor and transforms each storable
 + * result into a target storable. This class can be used for implementing
 + * one-to-one joins. Use {@link MultiTransformedCursor} for one-to-many joins.
 + *
 + * @author Brian S O'Neill
 + */
 +public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {
 +    private final Cursor<S> mCursor;
 +
 +    private T mNext;
 +
 +    protected TransformedCursor(Cursor<S> cursor) {
 +        if (cursor == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mCursor = cursor;
 +    }
 +
 +    /**
 +     * This method must be implemented to transform storables. If the storable
 +     * cannot be transformed, either throw a FetchException or return null. If
 +     * null is returned, the storable is simply filtered out.
 +     *
 +     * @return transformed storable, or null to filter it out
 +     */
 +    protected abstract T transform(S storable) throws FetchException;
 +
 +    public void close() throws FetchException {
 +        synchronized (mCursor) {
 +            mCursor.close();
 +            mNext = null;
 +        }
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        synchronized (mCursor) {
 +            if (mNext != null) {
 +                return true;
 +            }
 +            try {
 +                int count = 0;
 +                while (mCursor.hasNext()) {
 +                    T next = transform(mCursor.next());
 +                    if (next != null) {
 +                        mNext = next;
 +                        return true;
 +                    }
 +                    interruptCheck(++count);
 +                }
 +            } catch (NoSuchElementException e) {
 +            }
 +            return false;
 +        }
 +    }
 +
 +    public T next() throws FetchException {
 +        synchronized (mCursor) {
 +            if (hasNext()) {
 +                T next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +            throw new NoSuchElementException();
 +        }
 +    }
 +
 +    public int skipNext(int amount) throws FetchException {
 +        synchronized (mCursor) {
 +            if (amount <= 0) {
 +                if (amount < 0) {
 +                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +                }
 +                return 0;
 +            }
 +
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                interruptCheck(++count);
 +                mNext = null;
 +            }
 +
 +            return count;
 +        }
 +    }
 +
 +    private void interruptCheck(int count) throws FetchException {
 +        if ((count & ~0xff) == 0 && Thread.interrupted()) {
 +            close();
 +            throw new FetchInterruptedException();
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java new file mode 100644 index 0000000..7a62678 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java @@ -0,0 +1,106 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.util.Comparator;
 +import java.util.NoSuchElementException;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.Cursor;
 +
 +/**
 + * Wraps two Cursors and performs a <i>set union</i> operation. In boolean
 + * logic, this is an <i>or</i> operation.
 + *
 + * <p>Both cursors must return results in the same order. Ordering is preserved
 + * by the union.
 + *
 + * @author Brian S O'Neill
 + * @see IntersectionCursor
 + * @see DifferenceCursor
 + * @see SymmetricDifferenceCursor
 + */
 +public class UnionCursor<S> extends AbstractCursor<S> {
 +    private final Cursor<S> mLeftCursor;
 +    private final Cursor<S> mRightCursor;
 +    private final Comparator<S> mOrder;
 +
 +    private S mNextLeft;
 +    private S mNextRight;
 +
 +    /**
 +     * @param left cursor to wrap
 +     * @param right cursor to wrap
 +     * @param order describes sort ordering of wrapped cursors, which must be
 +     * a total ordering
 +     */
 +    public UnionCursor(Cursor<S> left, Cursor<S> right, Comparator<S> order) {
 +        if (left == null || right == null || order == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mLeftCursor = left;
 +        mRightCursor = right;
 +        mOrder = order;
 +    }
 +
 +    public synchronized void close() throws FetchException {
 +        mLeftCursor.close();
 +        mRightCursor.close();
 +        mNextLeft = null;
 +        mNextRight = null;
 +    }
 +
 +    public synchronized boolean hasNext() throws FetchException {
 +        if (mNextLeft == null && mLeftCursor.hasNext()) {
 +            mNextLeft = mLeftCursor.next();
 +        }
 +        if (mNextRight == null && mRightCursor.hasNext()) {
 +            mNextRight = mRightCursor.next();
 +        }
 +        return mNextLeft != null || mNextRight != null;
 +    }
 +
 +    public synchronized S next() throws FetchException {
 +        if (hasNext()) {
 +            S next;
 +            if (mNextLeft == null) {
 +                next = mNextRight;
 +                mNextRight = null;
 +            } else if (mNextRight == null) {
 +                next = mNextLeft;
 +                mNextLeft = null;
 +            } else {
 +                int result = mOrder.compare(mNextLeft, mNextRight);
 +                if (result < 0) {
 +                    next = mNextLeft;
 +                    mNextLeft = null;
 +                } else if (result > 0) {
 +                    next = mNextRight;
 +                    mNextRight = null;
 +                } else {
 +                    next = mNextLeft;
 +                    mNextLeft = null;
 +                    mNextRight = null;
 +                }
 +            }
 +            return next;
 +        }
 +        throw new NoSuchElementException();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java new file mode 100644 index 0000000..94c7b84 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/WorkFilePool.java @@ -0,0 +1,192 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.cursor;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ConcurrentHashMap;
 +
 +/**
 + * Used internally by {@link MergeSortBuffer}.
 + *
 + * @author Brian S O'Neill
 + */
 +class WorkFilePool {
 +    private static final String cTempDir = System.getProperty("java.io.tmpdir");
 +
 +    private static final ConcurrentMap<String, WorkFilePool> cPools =
 +        new ConcurrentHashMap<String, WorkFilePool>();
 +
 +    /**
 +     * @param tempDir directory to store temp files for merging, or null for default
 +     */
 +    static WorkFilePool getInstance(String tempDir) {
 +        // This method uses ConcurrentMap features to eliminate the need to
 +        // ever synchronize access, since this method may be called frequently.
 +
 +        if (tempDir == null) {
 +            tempDir = cTempDir;
 +        }
 +
 +        WorkFilePool pool = cPools.get(tempDir);
 +
 +        if (pool != null) {
 +            return pool;
 +        }
 +
 +        String canonical;
 +        try {
 +            canonical = new File(tempDir).getCanonicalPath();
 +        } catch (IOException e) {
 +            canonical = new File(tempDir).getAbsolutePath();
 +        }
 +
 +        if (!canonical.equals(tempDir)) {
 +            pool = getInstance(canonical);
 +            cPools.putIfAbsent(tempDir, pool);
 +            return pool;
 +        }
 +
 +        pool = new WorkFilePool(new File(tempDir));
 +
 +        WorkFilePool existing = cPools.putIfAbsent(tempDir, pool);
 +
 +        if (existing == null) {
 +            // New pool is the winner, so finish off initialization. Pool can
 +            // be used concurrently by another thread without shutdown hook.
 +            pool.addShutdownHook(tempDir);
 +        } else {
 +            pool = existing;
 +        }
 +
 +        return pool;
 +    }
 +
 +    private final File mTempDir;
 +
 +    // Work files not currently being used by any MergeSortBuffer.
 +    private final List<RandomAccessFile> mWorkFilePool;
 +    // Instances of MergeSortBuffer, to be notified on shutdown that they
 +    // should close their work files.
 +    private final Set<MergeSortBuffer<?>> mWorkFileUsers;
 +
 +    private Thread mShutdownHook;
 +
 +    /**
 +     * @param tempDir directory to store temp files for merging, or null for default
 +     */
 +    private WorkFilePool(File tempDir) {
 +        mTempDir = tempDir;
 +        mWorkFilePool = new ArrayList<RandomAccessFile>();
 +        mWorkFileUsers = new HashSet<MergeSortBuffer<?>>();
 +    }
 +
 +    RandomAccessFile acquireWorkFile(MergeSortBuffer<?> buffer) throws IOException {
 +        synchronized (mWorkFileUsers) {
 +            mWorkFileUsers.add(buffer);
 +        }
 +        synchronized (mWorkFilePool) {
 +            if (mWorkFilePool.size() > 0) {
 +                return mWorkFilePool.remove(mWorkFilePool.size() - 1);
 +            }
 +        }
 +        File file = File.createTempFile("carbonado-mergesort-", null, mTempDir);
 +        file.deleteOnExit();
 +        return new RandomAccessFile(file, "rw");
 +    }
 +
 +    void releaseWorkFiles(List<RandomAccessFile> files) {
 +        synchronized (mWorkFilePool) {
 +            for (RandomAccessFile raf : files) {
 +                try {
 +                    raf.seek(0);
 +                    // Return space to file system.
 +                    raf.setLength(0);
 +                    mWorkFilePool.add(raf);
 +                } catch (IOException e) {
 +                    // Work file is broken, discard it.
 +                    try {
 +                        raf.close();
 +                    } catch (IOException e2) {
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    void unregisterWorkFileUser(MergeSortBuffer<?> buffer) {
 +        synchronized (mWorkFileUsers) {
 +            mWorkFileUsers.remove(buffer);
 +            mWorkFileUsers.notify();
 +        }
 +    }
 +
 +    private synchronized void addShutdownHook(String tempDir) {
 +        if (mShutdownHook != null) {
 +            return;
 +        }
 +
 +        // Add shutdown hook to close work files so that they can be deleted.
 +        String threadName = "MergeSortBuffer shutdown (" + tempDir + ')';
 +
 +        mShutdownHook = new Thread(threadName) {
 +            public void run() {
 +                // Notify users of work files and wait for them to close.
 +                synchronized (mWorkFileUsers) {
 +                    for (MergeSortBuffer<?> buffer : mWorkFileUsers) {
 +                        buffer.stop();
 +                    }
 +                    final long timeout = 10000;
 +                    final long giveup = System.currentTimeMillis() + timeout;
 +                    try {
 +                        while (mWorkFileUsers.size() > 0) {
 +                            long now = System.currentTimeMillis();
 +                            if (now < giveup) {
 +                                mWorkFileUsers.wait(giveup - now);
 +                            } else {
 +                                break;
 +                            }
 +                        }
 +                    } catch (InterruptedException e) {
 +                    }
 +                }
 +
 +                synchronized (mWorkFilePool) {
 +                    for (RandomAccessFile raf : mWorkFilePool) {
 +                        try {
 +                            raf.close();
 +                        } catch (IOException e) {
 +                        }
 +                    }
 +                    mWorkFilePool.clear();
 +                }
 +            }
 +        };
 +
 +        Runtime.getRuntime().addShutdownHook(mShutdownHook);
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/cursor/package-info.java b/src/main/java/com/amazon/carbonado/cursor/package-info.java new file mode 100644 index 0000000..3553d1a --- /dev/null +++ b/src/main/java/com/amazon/carbonado/cursor/package-info.java @@ -0,0 +1,23 @@ +/*
 + * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +/**
 + * Support for advanced processing of cursor results, including basic set
 + * theory operations.
 + */
 +package com.amazon.carbonado.cursor;
 | 
