From 17f39437744585c656cf37d4515a0b115cc73cc8 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Mon, 29 Jan 2007 23:24:57 +0000 Subject: Fixed multiple cursor implementations to be extra paranoid and close the cursor when an exception is thrown. --- .../amazon/carbonado/cursor/AbstractCursor.java | 85 ++++++--- .../amazon/carbonado/cursor/DifferenceCursor.java | 72 +++++--- .../amazon/carbonado/cursor/FilteredCursor.java | 45 +++-- .../com/amazon/carbonado/cursor/GroupedCursor.java | 45 +++-- .../carbonado/cursor/IntersectionCursor.java | 84 +++++---- .../carbonado/cursor/MultiTransformedCursor.java | 83 ++++++--- .../com/amazon/carbonado/cursor/SortedCursor.java | 61 +++++-- .../cursor/SymmetricDifferenceCursor.java | 87 +++++---- .../amazon/carbonado/cursor/ThrottledCursor.java | 49 +++-- .../amazon/carbonado/cursor/TransformedCursor.java | 45 +++-- .../com/amazon/carbonado/cursor/UnionCursor.java | 62 ++++--- .../carbonado/repo/indexed/IndexedCursor.java | 197 ++++++++++++--------- .../com/amazon/carbonado/repo/jdbc/JDBCCursor.java | 29 ++- 13 files changed, 637 insertions(+), 307 deletions(-) (limited to 'src/main') diff --git a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java index fb34617..5fb0fa8 100644 --- a/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/AbstractCursor.java @@ -37,31 +37,67 @@ public abstract class AbstractCursor implements Cursor { } public int copyInto(Collection c) throws FetchException { - int originalSize = c.size(); - while (hasNext()) { - c.add(next()); + try { + int originalSize = c.size(); + while (hasNext()) { + c.add(next()); + } + return c.size() - originalSize; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } - return c.size() - originalSize; } public int copyInto(Collection c, int limit) throws FetchException { - int originalSize = c.size(); - while (--limit >= 0 && hasNext()) { - c.add(next()); + try { + int originalSize = c.size(); + while (--limit >= 0 && hasNext()) { + c.add(next()); + } + return c.size() - originalSize; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } - return c.size() - originalSize; } public List toList() throws FetchException { - List list = new ArrayList(); - copyInto(list); - return list; + try { + List list = new ArrayList(); + copyInto(list); + return list; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } public List toList(int limit) throws FetchException { - List list = new ArrayList(); - copyInto(list, limit); - return list; + try { + List list = new ArrayList(); + copyInto(list, limit); + return list; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } public int skipNext(int amount) throws FetchException { @@ -72,12 +108,21 @@ public abstract class AbstractCursor implements Cursor { return 0; } - int count = 0; - while (--amount >= 0 && hasNext()) { - next(); - count++; - } + try { + int count = 0; + while (--amount >= 0 && hasNext()) { + next(); + count++; + } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } } diff --git a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java index 8b330ea..a04aec0 100644 --- a/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/DifferenceCursor.java @@ -73,47 +73,65 @@ public class DifferenceCursor extends AbstractCursor { S nextLeft; - while (true) { - if (mLeftCursor.hasNext()) { - nextLeft = mLeftCursor.next(); - } else { - close(); - return false; - } - if (mNextRight == null) { - if (mRightCursor.hasNext()) { - mNextRight = mRightCursor.next(); + try { + while (true) { + if (mLeftCursor.hasNext()) { + nextLeft = mLeftCursor.next(); } else { - mNext = nextLeft; - return true; + close(); + return false; } - } - - while (true) { - int result = mOrder.compare(nextLeft, mNextRight); - if (result < 0) { - mNext = nextLeft; - return true; - } else if (result > 0) { + if (mNextRight == null) { if (mRightCursor.hasNext()) { mNextRight = mRightCursor.next(); } else { - mNextRight = null; mNext = nextLeft; return true; } - } else { - break; + } + + while (true) { + int result = mOrder.compare(nextLeft, mNextRight); + if (result < 0) { + mNext = nextLeft; + return true; + } else if (result > 0) { + if (mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } else { + mNextRight = null; + mNext = nextLeft; + return true; + } + } else { + break; + } } } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } } public S next() throws FetchException { - if (hasNext()) { - S next = mNext; - mNext = null; - return next; + try { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } diff --git a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java index f426f36..c25f8db 100644 --- a/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/FilteredCursor.java @@ -101,15 +101,31 @@ public abstract class FilteredCursor extends AbstractCursor { interruptCheck(++count); } } catch (NoSuchElementException e) { + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } return false; } public S next() throws FetchException { - if (hasNext()) { - S next = mNext; - mNext = null; - return next; + try { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } @@ -122,13 +138,22 @@ public abstract class FilteredCursor extends AbstractCursor { return 0; } - int count = 0; - while (--amount >= 0 && hasNext()) { - interruptCheck(++count); - mNext = null; - } + try { + int count = 0; + while (--amount >= 0 && hasNext()) { + interruptCheck(++count); + mNext = null; + } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } private void interruptCheck(int count) throws FetchException { diff --git a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java index 202f73a..daf9419 100644 --- a/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/GroupedCursor.java @@ -157,16 +157,32 @@ public abstract class GroupedCursor extends AbstractCursor { } } } catch (NoSuchElementException e) { + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } return false; } public G next() throws FetchException { - if (hasNext()) { - G next = mNextAggregate; - mNextAggregate = null; - return next; + try { + if (hasNext()) { + G next = mNextAggregate; + mNextAggregate = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } @@ -179,13 +195,22 @@ public abstract class GroupedCursor extends AbstractCursor { return 0; } - int count = 0; - while (--amount >= 0 && hasNext()) { - interruptCheck(++count); - mNextAggregate = null; - } + try { + int count = 0; + while (--amount >= 0 && hasNext()) { + interruptCheck(++count); + mNextAggregate = null; + } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } private void interruptCheck(int count) throws FetchException { diff --git a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java index 58aba0d..6eb3179 100644 --- a/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/IntersectionCursor.java @@ -71,47 +71,65 @@ public class IntersectionCursor extends AbstractCursor { S nextLeft, nextRight; - if (mLeftCursor.hasNext()) { - nextLeft = mLeftCursor.next(); - } else { - close(); - return false; - } - if (mRightCursor.hasNext()) { - nextRight = mRightCursor.next(); - } else { - close(); - return false; - } + try { + if (mLeftCursor.hasNext()) { + nextLeft = mLeftCursor.next(); + } else { + close(); + return false; + } + if (mRightCursor.hasNext()) { + nextRight = mRightCursor.next(); + } else { + close(); + return false; + } - while (true) { - int result = mOrder.compare(nextLeft, nextRight); - if (result < 0) { - if (mLeftCursor.hasNext()) { - nextLeft = mLeftCursor.next(); + while (true) { + int result = mOrder.compare(nextLeft, nextRight); + if (result < 0) { + if (mLeftCursor.hasNext()) { + nextLeft = mLeftCursor.next(); + } else { + close(); + return false; + } + } else if (result > 0) { + if (mRightCursor.hasNext()) { + nextRight = mRightCursor.next(); + } else { + close(); + return false; + } } else { - close(); - return false; + mNext = nextLeft; + return true; } - } else if (result > 0) { - if (mRightCursor.hasNext()) { - nextRight = mRightCursor.next(); - } else { - close(); - return false; - } - } else { - mNext = nextLeft; - return true; } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } } public S next() throws FetchException { - if (hasNext()) { - S next = mNext; - mNext = null; - return next; + try { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } diff --git a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java index 7606eab..4b4a25a 100644 --- a/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/MultiTransformedCursor.java @@ -64,34 +64,52 @@ public abstract class MultiTransformedCursor extends AbstractCursor { } public boolean hasNext() throws FetchException { - if (mNextCursor != null) { - if (mNextCursor.hasNext()) { - return true; - } - mNextCursor.close(); - mNextCursor = null; - } try { - int count = 0; - while (mCursor.hasNext()) { - Cursor nextCursor = transform(mCursor.next()); - if (nextCursor != null) { - if (nextCursor.hasNext()) { - mNextCursor = nextCursor; - return true; + if (mNextCursor != null) { + if (mNextCursor.hasNext()) { + return true; + } + mNextCursor.close(); + mNextCursor = null; + } + try { + int count = 0; + while (mCursor.hasNext()) { + Cursor nextCursor = transform(mCursor.next()); + if (nextCursor != null) { + if (nextCursor.hasNext()) { + mNextCursor = nextCursor; + return true; + } + nextCursor.close(); } - nextCursor.close(); + interruptCheck(++count); } - interruptCheck(++count); + } catch (NoSuchElementException e) { + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. } - } catch (NoSuchElementException e) { + throw e; } return false; } public T next() throws FetchException { - if (hasNext()) { - return mNextCursor.next(); + try { + if (hasNext()) { + return mNextCursor.next(); + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } @@ -104,17 +122,26 @@ public abstract class MultiTransformedCursor extends AbstractCursor { return 0; } - int count = 0; - while (hasNext()) { - int chunk = mNextCursor.skipNext(amount); - count += chunk; - if ((amount -= chunk) <= 0) { - break; + try { + int count = 0; + while (hasNext()) { + int chunk = mNextCursor.skipNext(amount); + count += chunk; + if ((amount -= chunk) <= 0) { + break; + } + interruptCheck(count); } - interruptCheck(count); - } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } private void interruptCheck(int count) throws FetchException { diff --git a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java index 9cee89b..15a72d9 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SortedCursor.java @@ -205,28 +205,46 @@ public class SortedCursor extends AbstractCursor { } public boolean hasNext() throws FetchException { - prepareNextChunk(); try { - if (mChunkIterator.hasNext()) { - return true; + prepareNextChunk(); + try { + if (mChunkIterator.hasNext()) { + return true; + } + } catch (UndeclaredThrowableException e) { + throw toFetchException(e); } - } catch (UndeclaredThrowableException e) { - throw toFetchException(e); + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } close(); return false; } public S next() throws FetchException { - prepareNextChunk(); try { - return mChunkIterator.next(); - } catch (UndeclaredThrowableException e) { - throw toFetchException(e); - } catch (NoSuchElementException e) { + prepareNextChunk(); + try { + return mChunkIterator.next(); + } catch (UndeclaredThrowableException e) { + throw toFetchException(e); + } catch (NoSuchElementException e) { + try { + close(); + } catch (FetchException e2) { + // Don't care. + } + throw e; + } + } catch (FetchException e) { try { close(); - } catch (FetchException e2) { + } catch (Exception e2) { // Don't care. } throw e; @@ -241,13 +259,22 @@ public class SortedCursor extends AbstractCursor { return 0; } - int count = 0; - while (--amount >= 0 && hasNext()) { - next(); - count++; - } + try { + int count = 0; + while (--amount >= 0 && hasNext()) { + next(); + count++; + } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } private void prepareNextChunk() throws FetchException { diff --git a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java index 1f94e30..1cd381a 100644 --- a/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/SymmetricDifferenceCursor.java @@ -68,7 +68,16 @@ public class SymmetricDifferenceCursor extends AbstractCursor { } public boolean hasNext() throws FetchException { - return compareNext() != 0; + try { + return compareNext() != 0; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } /** @@ -81,43 +90,61 @@ public class SymmetricDifferenceCursor extends AbstractCursor { return mCompareResult; } - while (true) { - if (mNextLeft == null && mLeftCursor.hasNext()) { - mNextLeft = mLeftCursor.next(); - } - if (mNextRight == null && mRightCursor.hasNext()) { - mNextRight = mRightCursor.next(); - } + try { + while (true) { + if (mNextLeft == null && mLeftCursor.hasNext()) { + mNextLeft = mLeftCursor.next(); + } + if (mNextRight == null && mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } - if (mNextLeft == null) { - return mNextRight != null ? 1 : 0; + if (mNextLeft == null) { + return mNextRight != null ? 1 : 0; + } + if (mNextRight == null) { + return -1; + } + + if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) { + mNextLeft = null; + mNextRight = null; + } else { + return mCompareResult; + } } - if (mNextRight == null) { - return -1; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. } + throw e; + } + } - if ((mCompareResult = mOrder.compare(mNextLeft, mNextRight)) == 0) { + public S next() throws FetchException { + try { + S next; + int result = compareNext(); + if (result < 0) { + next = mNextLeft; mNextLeft = null; + } else if (result > 0) { + next = mNextRight; mNextRight = null; } else { - return mCompareResult; + throw new NoSuchElementException(); } + mCompareResult = 0; + return next; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } } - - public S next() throws FetchException { - S next; - int result = compareNext(); - if (result < 0) { - next = mNextLeft; - mNextLeft = null; - } else if (result > 0) { - next = mNextRight; - mNextRight = null; - } else { - throw new NoSuchElementException(); - } - mCompareResult = 0; - return next; - } } diff --git a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java index 2f4c5a0..d108d95 100644 --- a/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/ThrottledCursor.java @@ -61,12 +61,30 @@ public class ThrottledCursor extends AbstractCursor { } public boolean hasNext() throws FetchException { - return mCursor.hasNext(); + try { + return mCursor.hasNext(); + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } public S next() throws FetchException { - throttle(); - return mCursor.next(); + try { + throttle(); + return mCursor.next(); + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } public int skipNext(int amount) throws FetchException { @@ -77,16 +95,25 @@ public class ThrottledCursor extends AbstractCursor { return 0; } - int count = 0; - while (--amount >= 0) { - throttle(); - if (skipNext(1) <= 0) { - break; + try { + int count = 0; + while (--amount >= 0) { + throttle(); + if (skipNext(1) <= 0) { + break; + } + count++; } - count++; - } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } private void throttle() throws FetchInterruptedException { diff --git a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java index 70161cc..46dedd1 100644 --- a/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/TransformedCursor.java @@ -74,15 +74,31 @@ public abstract class TransformedCursor extends AbstractCursor { interruptCheck(++count); } } catch (NoSuchElementException e) { + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } return false; } public T next() throws FetchException { - if (hasNext()) { - T next = mNext; - mNext = null; - return next; + try { + if (hasNext()) { + T next = mNext; + mNext = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } @@ -95,13 +111,22 @@ public abstract class TransformedCursor extends AbstractCursor { return 0; } - int count = 0; - while (--amount >= 0 && hasNext()) { - interruptCheck(++count); - mNext = null; - } + try { + int count = 0; + while (--amount >= 0 && hasNext()) { + interruptCheck(++count); + mNext = null; + } - return count; + return count; + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } private void interruptCheck(int count) throws FetchException { diff --git a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java index 7e4fde3..c2bdd19 100644 --- a/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java +++ b/src/main/java/com/amazon/carbonado/cursor/UnionCursor.java @@ -67,39 +67,57 @@ public class UnionCursor extends AbstractCursor { } public boolean hasNext() throws FetchException { - if (mNextLeft == null && mLeftCursor.hasNext()) { - mNextLeft = mLeftCursor.next(); - } - if (mNextRight == null && mRightCursor.hasNext()) { - mNextRight = mRightCursor.next(); + try { + if (mNextLeft == null && mLeftCursor.hasNext()) { + mNextLeft = mLeftCursor.next(); + } + if (mNextRight == null && mRightCursor.hasNext()) { + mNextRight = mRightCursor.next(); + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } return mNextLeft != null || mNextRight != null; } public S next() throws FetchException { - if (hasNext()) { - S next; - if (mNextLeft == null) { - next = mNextRight; - mNextRight = null; - } else if (mNextRight == null) { - next = mNextLeft; - mNextLeft = null; - } else { - int result = mOrder.compare(mNextLeft, mNextRight); - if (result < 0) { - next = mNextLeft; - mNextLeft = null; - } else if (result > 0) { + try { + if (hasNext()) { + S next; + if (mNextLeft == null) { next = mNextRight; mNextRight = null; - } else { + } else if (mNextRight == null) { next = mNextLeft; mNextLeft = null; - mNextRight = null; + } else { + int result = mOrder.compare(mNextLeft, mNextRight); + if (result < 0) { + next = mNextLeft; + mNextLeft = null; + } else if (result > 0) { + next = mNextRight; + mNextRight = null; + } else { + next = mNextLeft; + mNextLeft = null; + mNextRight = null; + } } + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. } - return next; + throw e; } throw new NoSuchElementException(); } diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java index 892af49..35fc492 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java @@ -63,117 +63,144 @@ class IndexedCursor extends AbstractCursor { if (mNext != null) { return true; } - while (mCursor.hasNext()) { - final Storable indexEntry = mCursor.next(); - - S master = mStorage.mMasterStorage.prepare(); - mGenerator.copyToMasterPrimaryKey(indexEntry, master); - - if (!master.tryLoad()) { - LogFactory.getLog(getClass()).warn - ("Master is missing for index entry: " + indexEntry); - } else { - if (mGenerator.isConsistent(indexEntry, master)) { - mNext = master; - return true; - } - - // This index entry is stale. Repair is needed. - - // Insert a correct index entry, just to be sure. - try { - final IndexedRepository repo = mStorage.mRepository; - final Storage indexEntryStorage = - repo.getIndexEntryStorageFor(mGenerator.getIndexEntryClass()); - Storable newIndexEntry = indexEntryStorage.prepare(); - mGenerator.copyFromMaster(newIndexEntry, master); - - if (newIndexEntry.tryLoad()) { - // Good, the correct index entry exists. We'll see - // the master record eventually, so skip. - } else { - // We have no choice but to return the master, at - // the risk of seeing it multiple times. This is - // better than seeing it never. - LogFactory.getLog(getClass()).warn - ("Inconsistent index entry: " + indexEntry); + try { + while (mCursor.hasNext()) { + final Storable indexEntry = mCursor.next(); + + S master = mStorage.mMasterStorage.prepare(); + mGenerator.copyToMasterPrimaryKey(indexEntry, master); + + if (!master.tryLoad()) { + LogFactory.getLog(getClass()).warn + ("Master is missing for index entry: " + indexEntry); + } else { + if (mGenerator.isConsistent(indexEntry, master)) { mNext = master; + return true; } - // Repair the stale index entry. - RepairExecutor.execute(new Runnable() { - public void run() { - Transaction txn = repo.enterTransaction(); - try { - // Reload master and verify inconsistency. - S master = mStorage.mMasterStorage.prepare(); - mGenerator.copyToMasterPrimaryKey(indexEntry, master); - - if (master.tryLoad()) { - Storable newIndexEntry = indexEntryStorage.prepare(); - mGenerator.copyFromMaster(newIndexEntry, master); - - newIndexEntry.tryInsert(); + // This index entry is stale. Repair is needed. + + // Insert a correct index entry, just to be sure. + try { + final IndexedRepository repo = mStorage.mRepository; + final Storage indexEntryStorage = + repo.getIndexEntryStorageFor(mGenerator.getIndexEntryClass()); + Storable newIndexEntry = indexEntryStorage.prepare(); + mGenerator.copyFromMaster(newIndexEntry, master); + + if (newIndexEntry.tryLoad()) { + // Good, the correct index entry exists. We'll see + // the master record eventually, so skip. + } else { + // We have no choice but to return the master, at + // the risk of seeing it multiple times. This is + // better than seeing it never. + LogFactory.getLog(getClass()).warn + ("Inconsistent index entry: " + indexEntry); + mNext = master; + } - indexEntry.tryDelete(); - txn.commit(); - } - } catch (FetchException fe) { - LogFactory.getLog(IndexedCursor.class).warn - ("Unable to check if repair required for " + - "inconsistent index entry " + - indexEntry, fe); - } catch (PersistException pe) { - LogFactory.getLog(IndexedCursor.class).error - ("Unable to repair inconsistent index entry " + - indexEntry, pe); - } finally { + // Repair the stale index entry. + RepairExecutor.execute(new Runnable() { + public void run() { + Transaction txn = repo.enterTransaction(); try { - txn.exit(); + // Reload master and verify inconsistency. + S master = mStorage.mMasterStorage.prepare(); + mGenerator.copyToMasterPrimaryKey(indexEntry, master); + + if (master.tryLoad()) { + Storable newIndexEntry = indexEntryStorage.prepare(); + mGenerator.copyFromMaster(newIndexEntry, master); + + newIndexEntry.tryInsert(); + + indexEntry.tryDelete(); + txn.commit(); + } + } catch (FetchException fe) { + LogFactory.getLog(IndexedCursor.class).warn + ("Unable to check if repair required for " + + "inconsistent index entry " + + indexEntry, fe); } catch (PersistException pe) { LogFactory.getLog(IndexedCursor.class).error ("Unable to repair inconsistent index entry " + indexEntry, pe); + } finally { + try { + txn.exit(); + } catch (PersistException pe) { + LogFactory.getLog(IndexedCursor.class).error + ("Unable to repair inconsistent index entry " + + indexEntry, pe); + } } } - } - }); - } catch (RepositoryException re) { - LogFactory.getLog(getClass()).error - ("Unable to inspect inconsistent index entry " + - indexEntry, re); - } + }); + } catch (RepositoryException re) { + LogFactory.getLog(getClass()).error + ("Unable to inspect inconsistent index entry " + + indexEntry, re); + } - if (mNext != null) { - return true; + if (mNext != null) { + return true; + } } } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } return false; } public S next() throws FetchException { - if (hasNext()) { - S next = mNext; - mNext = null; - return next; + try { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } public int skipNext(int amount) throws FetchException { - if (mNext == null) { - return mCursor.skipNext(amount); - } + try { + if (mNext == null) { + return mCursor.skipNext(amount); + } - if (amount <= 0) { - if (amount < 0) { - throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; } - return 0; - } - mNext = null; - return 1 + mCursor.skipNext(amount - 1); + mNext = null; + return 1 + mCursor.skipNext(amount - 1); + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } } diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java index f419c0d..fa80fde 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java @@ -35,10 +35,10 @@ import com.amazon.carbonado.cursor.AbstractCursor; */ class JDBCCursor extends AbstractCursor { private final JDBCStorage mStorage; - private Connection mConnection; - private PreparedStatement mStatement; - private ResultSet mResultSet; + private final Connection mConnection; + private final PreparedStatement mStatement; + private ResultSet mResultSet; private boolean mHasNext; JDBCCursor(JDBCStorage storage, @@ -49,7 +49,17 @@ class JDBCCursor extends AbstractCursor { mStorage = storage; mConnection = con; mStatement = statement; - mResultSet = statement.executeQuery(); + try { + mResultSet = statement.executeQuery(); + } catch (SQLException e) { + try { + statement.close(); + storage.mRepository.yieldConnection(con); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } public void close() throws FetchException { @@ -62,6 +72,7 @@ class JDBCCursor extends AbstractCursor { throw mStorage.getJDBCRepository().toFetchException(e); } finally { mResultSet = null; + mHasNext = false; } } } @@ -75,6 +86,11 @@ class JDBCCursor extends AbstractCursor { try { mHasNext = rs.next(); } catch (SQLException e) { + try { + close(); + } catch (FetchException e2) { + // Don't care. + } throw mStorage.getJDBCRepository().toFetchException(e); } if (!mHasNext) { @@ -93,6 +109,11 @@ class JDBCCursor extends AbstractCursor { mHasNext = false; return obj; } catch (SQLException e) { + try { + close(); + } catch (FetchException e2) { + // Don't care. + } throw mStorage.getJDBCRepository().toFetchException(e); } } -- cgit v1.2.3