diff options
Diffstat (limited to 'src/main/java/com/amazon')
13 files changed, 637 insertions, 307 deletions
| diff --git a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java index fb34617..5fb0fa8 100644 --- a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java @@ -37,31 +37,67 @@ public abstract class AbstractCursor<S> implements Cursor<S> {      }
      public int copyInto(Collection<? super S> c) throws FetchException {
 -        int originalSize = c.size();
 -        while (hasNext()) {
 -            c.add(next());
 +        try {
 +            int originalSize = c.size();
 +            while (hasNext()) {
 +                c.add(next());
 +            }
 +            return c.size() - originalSize;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
 -        return c.size() - originalSize;
      }
      public int copyInto(Collection<? super S> c, int limit) throws FetchException {
 -        int originalSize = c.size();
 -        while (--limit >= 0 && hasNext()) {
 -            c.add(next());
 +        try {
 +            int originalSize = c.size();
 +            while (--limit >= 0 && hasNext()) {
 +                c.add(next());
 +            }
 +            return c.size() - originalSize;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
 -        return c.size() - originalSize;
      }
      public List<S> toList() throws FetchException {
 -        List<S> list = new ArrayList<S>();
 -        copyInto(list);
 -        return list;
 +        try {
 +            List<S> list = new ArrayList<S>();
 +            copyInto(list);
 +            return list;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      public List<S> toList(int limit) throws FetchException {
 -        List<S> list = new ArrayList<S>();
 -        copyInto(list, limit);
 -        return list;
 +        try {
 +            List<S> list = new ArrayList<S>();
 +            copyInto(list, limit);
 +            return list;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      public int skipNext(int amount) throws FetchException {
 @@ -72,12 +108,21 @@ public abstract class AbstractCursor<S> implements Cursor<S> {              return 0;
          }
 -        int count = 0;
 -        while (--amount >= 0 && hasNext()) {
 -            next();
 -            count++;
 -        }
 +        try {
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                next();
 +                count++;
 +            }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java index 8b330ea..a04aec0 100644 --- a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java @@ -73,47 +73,65 @@ public class DifferenceCursor<S> extends AbstractCursor<S> {          S nextLeft;
 -        while (true) {
 -            if (mLeftCursor.hasNext()) {
 -                nextLeft = mLeftCursor.next();
 -            } else {
 -                close();
 -                return false;
 -            }
 -            if (mNextRight == null) {
 -                if (mRightCursor.hasNext()) {
 -                    mNextRight = mRightCursor.next();
 +        try {
 +            while (true) {
 +                if (mLeftCursor.hasNext()) {
 +                    nextLeft = mLeftCursor.next();
                  } else {
 -                    mNext = nextLeft;
 -                    return true;
 +                    close();
 +                    return false;
                  }
 -            }
 -
 -            while (true) {
 -                int result = mOrder.compare(nextLeft, mNextRight);
 -                if (result < 0) {
 -                    mNext = nextLeft;
 -                    return true;
 -                } else if (result > 0) {
 +                if (mNextRight == null) {
                      if (mRightCursor.hasNext()) {
                          mNextRight = mRightCursor.next();
                      } else {
 -                        mNextRight = null;
                          mNext = nextLeft;
                          return true;
                      }
 -                } else {
 -                    break;
 +                }
 +
 +                while (true) {
 +                    int result = mOrder.compare(nextLeft, mNextRight);
 +                    if (result < 0) {
 +                        mNext = nextLeft;
 +                        return true;
 +                    } else if (result > 0) {
 +                        if (mRightCursor.hasNext()) {
 +                            mNextRight = mRightCursor.next();
 +                        } else {
 +                            mNextRight = null;
 +                            mNext = nextLeft;
 +                            return true;
 +                        }
 +                    } else {
 +                        break;
 +                    }
                  }
              }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
      }
      public S next() throws FetchException {
 -        if (hasNext()) {
 -            S next = mNext;
 -            mNext = null;
 -            return next;
 +        try {
 +            if (hasNext()) {
 +                S next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index f426f36..c25f8db 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -101,15 +101,31 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {                  interruptCheck(++count);
              }
          } catch (NoSuchElementException e) {
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          return false;
      }
      public S next() throws FetchException {
 -        if (hasNext()) {
 -            S next = mNext;
 -            mNext = null;
 -            return next;
 +        try {
 +            if (hasNext()) {
 +                S next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 @@ -122,13 +138,22 @@ public abstract class FilteredCursor<S> extends AbstractCursor<S> {              return 0;
          }
 -        int count = 0;
 -        while (--amount >= 0 && hasNext()) {
 -            interruptCheck(++count);
 -            mNext = null;
 -        }
 +        try {
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                interruptCheck(++count);
 +                mNext = null;
 +            }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      private void interruptCheck(int count) throws FetchException {
 diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 202f73a..daf9419 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -157,16 +157,32 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {                  }
              }
          } catch (NoSuchElementException e) {
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          return false;
      }
      public G next() throws FetchException {
 -        if (hasNext()) {
 -            G next = mNextAggregate;
 -            mNextAggregate = null;
 -            return next;
 +        try {
 +            if (hasNext()) {
 +                G next = mNextAggregate;
 +                mNextAggregate = null;
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 @@ -179,13 +195,22 @@ public abstract class GroupedCursor<S, G> extends AbstractCursor<G> {              return 0;
          }
 -        int count = 0;
 -        while (--amount >= 0 && hasNext()) {
 -            interruptCheck(++count);
 -            mNextAggregate = null;
 -        }
 +        try {
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                interruptCheck(++count);
 +                mNextAggregate = null;
 +            }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      private void interruptCheck(int count) throws FetchException {
 diff --git a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java index 58aba0d..6eb3179 100644 --- a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java @@ -71,47 +71,65 @@ public class IntersectionCursor<S> extends AbstractCursor<S> {          S nextLeft, nextRight;
 -        if (mLeftCursor.hasNext()) {
 -            nextLeft = mLeftCursor.next();
 -        } else {
 -            close();
 -            return false;
 -        }
 -        if (mRightCursor.hasNext()) {
 -            nextRight = mRightCursor.next();
 -        } else {
 -            close();
 -            return false;
 -        }
 +        try {
 +            if (mLeftCursor.hasNext()) {
 +                nextLeft = mLeftCursor.next();
 +            } else {
 +                close();
 +                return false;
 +            }
 +            if (mRightCursor.hasNext()) {
 +                nextRight = mRightCursor.next();
 +            } else {
 +                close();
 +                return false;
 +            }
 -        while (true) {
 -            int result = mOrder.compare(nextLeft, nextRight);
 -            if (result < 0) {
 -                if (mLeftCursor.hasNext()) {
 -                    nextLeft = mLeftCursor.next();
 +            while (true) {
 +                int result = mOrder.compare(nextLeft, nextRight);
 +                if (result < 0) {
 +                    if (mLeftCursor.hasNext()) {
 +                        nextLeft = mLeftCursor.next();
 +                    } else {
 +                        close();
 +                        return false;
 +                    }
 +                } else if (result > 0) {
 +                    if (mRightCursor.hasNext()) {
 +                        nextRight = mRightCursor.next();
 +                    } else {
 +                        close();
 +                        return false;
 +                    }
                  } else {
 -                    close();
 -                    return false;
 +                    mNext = nextLeft;
 +                    return true;
                  }
 -            } else if (result > 0) {
 -                if (mRightCursor.hasNext()) {
 -                    nextRight = mRightCursor.next();
 -                } else {
 -                    close();
 -                    return false;
 -                }
 -            } else {
 -                mNext = nextLeft;
 -                return true;
              }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
      }
      public S next() throws FetchException {
 -        if (hasNext()) {
 -            S next = mNext;
 -            mNext = null;
 -            return next;
 +        try {
 +            if (hasNext()) {
 +                S next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 7606eab..4b4a25a 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -64,34 +64,52 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {      }
      public boolean hasNext() throws FetchException {
 -        if (mNextCursor != null) {
 -            if (mNextCursor.hasNext()) {
 -                return true;
 -            }
 -            mNextCursor.close();
 -            mNextCursor = null;
 -        }
          try {
 -            int count = 0;
 -            while (mCursor.hasNext()) {
 -                Cursor<T> nextCursor = transform(mCursor.next());
 -                if (nextCursor != null) {
 -                    if (nextCursor.hasNext()) {
 -                        mNextCursor = nextCursor;
 -                        return true;
 +            if (mNextCursor != null) {
 +                if (mNextCursor.hasNext()) {
 +                    return true;
 +                }
 +                mNextCursor.close();
 +                mNextCursor = null;
 +            }
 +            try {
 +                int count = 0;
 +                while (mCursor.hasNext()) {
 +                    Cursor<T> nextCursor = transform(mCursor.next());
 +                    if (nextCursor != null) {
 +                        if (nextCursor.hasNext()) {
 +                            mNextCursor = nextCursor;
 +                            return true;
 +                        }
 +                        nextCursor.close();
                      }
 -                    nextCursor.close();
 +                    interruptCheck(++count);
                  }
 -                interruptCheck(++count);
 +            } catch (NoSuchElementException e) {
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
              }
 -        } catch (NoSuchElementException e) {
 +            throw e;
          }
          return false;
      }
      public T next() throws FetchException {
 -        if (hasNext()) {
 -            return mNextCursor.next();
 +        try {
 +            if (hasNext()) {
 +                return mNextCursor.next();
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 @@ -104,17 +122,26 @@ public abstract class MultiTransformedCursor<S, T> extends AbstractCursor<T> {              return 0;
          }
 -        int count = 0;
 -        while (hasNext()) {
 -            int chunk = mNextCursor.skipNext(amount);
 -            count += chunk;
 -            if ((amount -= chunk) <= 0) {
 -                break;
 +        try {
 +            int count = 0;
 +            while (hasNext()) {
 +                int chunk = mNextCursor.skipNext(amount);
 +                count += chunk;
 +                if ((amount -= chunk) <= 0) {
 +                    break;
 +                }
 +                interruptCheck(count);
              }
 -            interruptCheck(count);
 -        }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      private void interruptCheck(int count) throws FetchException {
 diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 9cee89b..15a72d9 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -205,28 +205,46 @@ public class SortedCursor<S> extends AbstractCursor<S> {      }
      public boolean hasNext() throws FetchException {
 -        prepareNextChunk();
          try {
 -            if (mChunkIterator.hasNext()) {
 -                return true;
 +            prepareNextChunk();
 +            try {
 +                if (mChunkIterator.hasNext()) {
 +                    return true;
 +                }
 +            } catch (UndeclaredThrowableException e) {
 +                throw toFetchException(e);
              }
 -        } catch (UndeclaredThrowableException e) {
 -            throw toFetchException(e);
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          close();
          return false;
      }
      public S next() throws FetchException {
 -        prepareNextChunk();
          try {
 -            return mChunkIterator.next();
 -        } catch (UndeclaredThrowableException e) {
 -            throw toFetchException(e);
 -        } catch (NoSuchElementException e) {
 +            prepareNextChunk();
 +            try {
 +                return mChunkIterator.next();
 +            } catch (UndeclaredThrowableException e) {
 +                throw toFetchException(e);
 +            } catch (NoSuchElementException e) {
 +                try {
 +                    close();
 +                } catch (FetchException e2) {
 +                    // Don't care.
 +                }
 +                throw e;
 +            }
 +        } catch (FetchException e) {
              try {
                  close();
 -            } catch (FetchException e2) {
 +            } catch (Exception e2) {
                  // Don't care.
              }
              throw e;
 @@ -241,13 +259,22 @@ public class SortedCursor<S> extends AbstractCursor<S> {              return 0;
          }
 -        int count = 0;
 -        while (--amount >= 0 && hasNext()) {
 -            next();
 -            count++;
 -        }
 +        try {
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                next();
 +                count++;
 +            }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      private void prepareNextChunk() throws FetchException {
 diff --git a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java index 1f94e30..1cd381a 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java @@ -68,7 +68,16 @@ public class SymmetricDifferenceCursor<S> extends AbstractCursor<S> {      }
      public boolean hasNext() throws FetchException {
 -        return compareNext() != 0;
 +        try {
 +            return compareNext() != 0;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      /**
 @@ -81,43 +90,61 @@ public class SymmetricDifferenceCursor<S> extends AbstractCursor<S> {              return mCompareResult;
          }
 -        while (true) {
 -            if (mNextLeft == null && mLeftCursor.hasNext()) {
 -                mNextLeft = mLeftCursor.next();
 -            }
 -            if (mNextRight == null && mRightCursor.hasNext()) {
 -                mNextRight = mRightCursor.next();
 -            }
 +        try {
 +            while (true) {
 +                if (mNextLeft == null && mLeftCursor.hasNext()) {
 +                    mNextLeft = mLeftCursor.next();
 +                }
 +                if (mNextRight == null && mRightCursor.hasNext()) {
 +                    mNextRight = mRightCursor.next();
 +                }
 -            if (mNextLeft == null) {
 -                return mNextRight != null ? 1 : 0;
 +                if (mNextLeft == null) {
 +                    return mNextRight != null ? 1 : 0;
 +                }
 +                if (mNextRight == null) {
 +                    return -1;
 +                }
 +
 +                if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) {
 +                    mNextLeft = null;
 +                    mNextRight = null;
 +                } else {
 +                    return mCompareResult;
 +                }
              }
 -            if (mNextRight == null) {
 -                return -1;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
              }
 +            throw e;
 +        }
 +    }
 -            if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) {
 +    public S next() throws FetchException {
 +        try {
 +            S next;
 +            int result = compareNext();
 +            if (result < 0) {
 +                next = mNextLeft;
                  mNextLeft = null;
 +            } else if (result > 0) {
 +                next = mNextRight;
                  mNextRight = null;
              } else {
 -                return mCompareResult;
 +                throw new NoSuchElementException();
              }
 +            mCompareResult = 0;
 +            return next;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
      }
 -
 -    public S next() throws FetchException {
 -        S next;
 -        int result = compareNext();
 -        if (result < 0) {
 -            next = mNextLeft;
 -            mNextLeft = null;
 -        } else if (result > 0) {
 -            next = mNextRight;
 -            mNextRight = null;
 -        } else {
 -            throw new NoSuchElementException();
 -        }
 -        mCompareResult = 0;
 -        return next;
 -    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java index 2f4c5a0..d108d95 100644 --- a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java @@ -61,12 +61,30 @@ public class ThrottledCursor<S> extends AbstractCursor<S> {      }
      public boolean hasNext() throws FetchException {
 -        return mCursor.hasNext();
 +        try {
 +            return mCursor.hasNext();
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      public S next() throws FetchException {
 -        throttle();
 -        return mCursor.next();
 +        try {
 +            throttle();
 +            return mCursor.next();
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      public int skipNext(int amount) throws FetchException {
 @@ -77,16 +95,25 @@ public class ThrottledCursor<S> extends AbstractCursor<S> {              return 0;
          }
 -        int count = 0;
 -        while (--amount >= 0) {
 -            throttle();
 -            if (skipNext(1) <= 0) {
 -                break;
 +        try {
 +            int count = 0;
 +            while (--amount >= 0) {
 +                throttle();
 +                if (skipNext(1) <= 0) {
 +                    break;
 +                }
 +                count++;
              }
 -            count++;
 -        }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      private void throttle() throws FetchInterruptedException {
 diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index 70161cc..46dedd1 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -74,15 +74,31 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {                  interruptCheck(++count);
              }
          } catch (NoSuchElementException e) {
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          return false;
      }
      public T next() throws FetchException {
 -        if (hasNext()) {
 -            T next = mNext;
 -            mNext = null;
 -            return next;
 +        try {
 +            if (hasNext()) {
 +                T next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 @@ -95,13 +111,22 @@ public abstract class TransformedCursor<S, T> extends AbstractCursor<T> {              return 0;
          }
 -        int count = 0;
 -        while (--amount >= 0 && hasNext()) {
 -            interruptCheck(++count);
 -            mNext = null;
 -        }
 +        try {
 +            int count = 0;
 +            while (--amount >= 0 && hasNext()) {
 +                interruptCheck(++count);
 +                mNext = null;
 +            }
 -        return count;
 +            return count;
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      private void interruptCheck(int count) throws FetchException {
 diff --git a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java index 7e4fde3..c2bdd19 100644 --- a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java @@ -67,39 +67,57 @@ public class UnionCursor<S> extends AbstractCursor<S> {      }
      public boolean hasNext() throws FetchException {
 -        if (mNextLeft == null && mLeftCursor.hasNext()) {
 -            mNextLeft = mLeftCursor.next();
 -        }
 -        if (mNextRight == null && mRightCursor.hasNext()) {
 -            mNextRight = mRightCursor.next();
 +        try {
 +            if (mNextLeft == null && mLeftCursor.hasNext()) {
 +                mNextLeft = mLeftCursor.next();
 +            }
 +            if (mNextRight == null && mRightCursor.hasNext()) {
 +                mNextRight = mRightCursor.next();
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          return mNextLeft != null || mNextRight != null;
      }
      public S next() throws FetchException {
 -        if (hasNext()) {
 -            S next;
 -            if (mNextLeft == null) {
 -                next = mNextRight;
 -                mNextRight = null;
 -            } else if (mNextRight == null) {
 -                next = mNextLeft;
 -                mNextLeft = null;
 -            } else {
 -                int result = mOrder.compare(mNextLeft, mNextRight);
 -                if (result < 0) {
 -                    next = mNextLeft;
 -                    mNextLeft = null;
 -                } else if (result > 0) {
 +        try {
 +            if (hasNext()) {
 +                S next;
 +                if (mNextLeft == null) {
                      next = mNextRight;
                      mNextRight = null;
 -                } else {
 +                } else if (mNextRight == null) {
                      next = mNextLeft;
                      mNextLeft = null;
 -                    mNextRight = null;
 +                } else {
 +                    int result = mOrder.compare(mNextLeft, mNextRight);
 +                    if (result < 0) {
 +                        next = mNextLeft;
 +                        mNextLeft = null;
 +                    } else if (result > 0) {
 +                        next = mNextRight;
 +                        mNextRight = null;
 +                    } else {
 +                        next = mNextLeft;
 +                        mNextLeft = null;
 +                        mNextRight = null;
 +                    }
                  }
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
              }
 -            return next;
 +            throw e;
          }
          throw new NoSuchElementException();
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java index 892af49..35fc492 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java @@ -63,117 +63,144 @@ class IndexedCursor<S extends Storable> extends AbstractCursor<S> {          if (mNext != null) {
              return true;
          }
 -        while (mCursor.hasNext()) {
 -            final Storable indexEntry = mCursor.next();
 -
 -            S master = mStorage.mMasterStorage.prepare();
 -            mGenerator.copyToMasterPrimaryKey(indexEntry, master);
 -
 -            if (!master.tryLoad()) {
 -                LogFactory.getLog(getClass()).warn
 -                    ("Master is missing for index entry: " + indexEntry);
 -            } else {
 -                if (mGenerator.isConsistent(indexEntry, master)) {
 -                    mNext = master;
 -                    return true;
 -                }
 -
 -                // This index entry is stale. Repair is needed.
 -
 -                // Insert a correct index entry, just to be sure.
 -                try {
 -                    final IndexedRepository repo = mStorage.mRepository;
 -                    final Storage<?> indexEntryStorage =
 -                        repo.getIndexEntryStorageFor(mGenerator.getIndexEntryClass());
 -                    Storable newIndexEntry = indexEntryStorage.prepare();
 -                    mGenerator.copyFromMaster(newIndexEntry, master);
 -
 -                    if (newIndexEntry.tryLoad()) {
 -                        // Good, the correct index entry exists. We'll see
 -                        // the master record eventually, so skip.
 -                    } else {
 -                        // We have no choice but to return the master, at
 -                        // the risk of seeing it multiple times. This is
 -                        // better than seeing it never.
 -                        LogFactory.getLog(getClass()).warn
 -                            ("Inconsistent index entry: " + indexEntry);
 +        try {
 +            while (mCursor.hasNext()) {
 +                final Storable indexEntry = mCursor.next();
 +
 +                S master = mStorage.mMasterStorage.prepare();
 +                mGenerator.copyToMasterPrimaryKey(indexEntry, master);
 +
 +                if (!master.tryLoad()) {
 +                    LogFactory.getLog(getClass()).warn
 +                        ("Master is missing for index entry: " + indexEntry);
 +                } else {
 +                    if (mGenerator.isConsistent(indexEntry, master)) {
                          mNext = master;
 +                        return true;
                      }
 -                    // Repair the stale index entry.
 -                    RepairExecutor.execute(new Runnable() {
 -                        public void run() {
 -                            Transaction txn = repo.enterTransaction();
 -                            try {
 -                                // Reload master and verify inconsistency.
 -                                S master = mStorage.mMasterStorage.prepare();
 -                                mGenerator.copyToMasterPrimaryKey(indexEntry, master);
 -
 -                                if (master.tryLoad()) {
 -                                    Storable newIndexEntry = indexEntryStorage.prepare();
 -                                    mGenerator.copyFromMaster(newIndexEntry, master);
 -
 -                                    newIndexEntry.tryInsert();
 +                    // This index entry is stale. Repair is needed.
 +
 +                    // Insert a correct index entry, just to be sure.
 +                    try {
 +                        final IndexedRepository repo = mStorage.mRepository;
 +                        final Storage<?> indexEntryStorage =
 +                            repo.getIndexEntryStorageFor(mGenerator.getIndexEntryClass());
 +                        Storable newIndexEntry = indexEntryStorage.prepare();
 +                        mGenerator.copyFromMaster(newIndexEntry, master);
 +
 +                        if (newIndexEntry.tryLoad()) {
 +                            // Good, the correct index entry exists. We'll see
 +                            // the master record eventually, so skip.
 +                        } else {
 +                            // We have no choice but to return the master, at
 +                            // the risk of seeing it multiple times. This is
 +                            // better than seeing it never.
 +                            LogFactory.getLog(getClass()).warn
 +                                ("Inconsistent index entry: " + indexEntry);
 +                            mNext = master;
 +                        }
 -                                    indexEntry.tryDelete();
 -                                    txn.commit();
 -                                }
 -                            } catch (FetchException fe) {
 -                                LogFactory.getLog(IndexedCursor.class).warn
 -                                    ("Unable to check if repair required for " +
 -                                     "inconsistent index entry " +
 -                                     indexEntry, fe);
 -                            } catch (PersistException pe) {
 -                                LogFactory.getLog(IndexedCursor.class).error
 -                                    ("Unable to repair inconsistent index entry " +
 -                                     indexEntry, pe);
 -                            } finally {
 +                        // Repair the stale index entry.
 +                        RepairExecutor.execute(new Runnable() {
 +                            public void run() {
 +                                Transaction txn = repo.enterTransaction();
                                  try {
 -                                    txn.exit();
 +                                    // Reload master and verify inconsistency.
 +                                    S master = mStorage.mMasterStorage.prepare();
 +                                    mGenerator.copyToMasterPrimaryKey(indexEntry, master);
 +
 +                                    if (master.tryLoad()) {
 +                                        Storable newIndexEntry = indexEntryStorage.prepare();
 +                                        mGenerator.copyFromMaster(newIndexEntry, master);
 +
 +                                        newIndexEntry.tryInsert();
 +
 +                                        indexEntry.tryDelete();
 +                                        txn.commit();
 +                                    }
 +                                } catch (FetchException fe) {
 +                                    LogFactory.getLog(IndexedCursor.class).warn
 +                                        ("Unable to check if repair required for " +
 +                                         "inconsistent index entry " +
 +                                         indexEntry, fe);
                                  } catch (PersistException pe) {
                                      LogFactory.getLog(IndexedCursor.class).error
                                          ("Unable to repair inconsistent index entry " +
                                           indexEntry, pe);
 +                                } finally {
 +                                    try {
 +                                        txn.exit();
 +                                    } catch (PersistException pe) {
 +                                        LogFactory.getLog(IndexedCursor.class).error
 +                                            ("Unable to repair inconsistent index entry " +
 +                                             indexEntry, pe);
 +                                    }
                                  }
                              }
 -                        }
 -                    });
 -                } catch (RepositoryException re) {
 -                    LogFactory.getLog(getClass()).error
 -                        ("Unable to inspect inconsistent index entry " +
 -                         indexEntry, re);
 -                }
 +                        });
 +                    } catch (RepositoryException re) {
 +                        LogFactory.getLog(getClass()).error
 +                            ("Unable to inspect inconsistent index entry " +
 +                             indexEntry, re);
 +                    }
 -                if (mNext != null) {
 -                    return true;
 +                    if (mNext != null) {
 +                        return true;
 +                    }
                  }
              }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          return false;
      }
      public S next() throws FetchException {
 -        if (hasNext()) {
 -            S next = mNext;
 -            mNext = null;
 -            return next;
 +        try {
 +            if (hasNext()) {
 +                S next = mNext;
 +                mNext = null;
 +                return next;
 +            }
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
          }
          throw new NoSuchElementException();
      }
      public int skipNext(int amount) throws FetchException {
 -        if (mNext == null) {
 -            return mCursor.skipNext(amount);
 -        }
 +        try {
 +            if (mNext == null) {
 +                return mCursor.skipNext(amount);
 +            }
 -        if (amount <= 0) {
 -            if (amount < 0) {
 -                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +            if (amount <= 0) {
 +                if (amount < 0) {
 +                    throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +                }
 +                return 0;
              }
 -            return 0;
 -        }
 -        mNext = null;
 -        return 1 + mCursor.skipNext(amount - 1);
 +            mNext = null;
 +            return 1 + mCursor.skipNext(amount - 1);
 +        } catch (FetchException e) {
 +            try {
 +                close();
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java index f419c0d..fa80fde 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java @@ -35,10 +35,10 @@ import com.amazon.carbonado.cursor.AbstractCursor;   */
  class JDBCCursor<S extends Storable> extends AbstractCursor<S> {
      private final JDBCStorage<S> mStorage;
 -    private Connection mConnection;
 -    private PreparedStatement mStatement;
 -    private ResultSet mResultSet;
 +    private final Connection mConnection;
 +    private final PreparedStatement mStatement;
 +    private ResultSet mResultSet;
      private boolean mHasNext;
      JDBCCursor(JDBCStorage<S> storage,
 @@ -49,7 +49,17 @@ class JDBCCursor<S extends Storable> extends AbstractCursor<S> {          mStorage = storage;
          mConnection = con;
          mStatement = statement;
 -        mResultSet = statement.executeQuery();
 +        try {
 +            mResultSet = statement.executeQuery();
 +        } catch (SQLException e) {
 +            try {
 +                statement.close();
 +                storage.mRepository.yieldConnection(con);
 +            } catch (Exception e2) {
 +                // Don't care.
 +            }
 +            throw e;
 +        }
      }
      public void close() throws FetchException {
 @@ -62,6 +72,7 @@ class JDBCCursor<S extends Storable> extends AbstractCursor<S> {                  throw mStorage.getJDBCRepository().toFetchException(e);
              } finally {
                  mResultSet = null;
 +                mHasNext = false;
              }
          }
      }
 @@ -75,6 +86,11 @@ class JDBCCursor<S extends Storable> extends AbstractCursor<S> {              try {
                  mHasNext = rs.next();
              } catch (SQLException e) {
 +                try {
 +                    close();
 +                } catch (FetchException e2) {
 +                    // Don't care.
 +                }
                  throw mStorage.getJDBCRepository().toFetchException(e);
              }
              if (!mHasNext) {
 @@ -93,6 +109,11 @@ class JDBCCursor<S extends Storable> extends AbstractCursor<S> {              mHasNext = false;
              return obj;
          } catch (SQLException e) {
 +            try {
 +                close();
 +            } catch (FetchException e2) {
 +                // Don't care.
 +            }
              throw mStorage.getJDBCRepository().toFetchException(e);
          }
      }
 | 
