From 17f39437744585c656cf37d4515a0b115cc73cc8 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Mon, 29 Jan 2007 23:24:57 +0000 Subject: Fixed multiple cursor implementations to be extra paranoid and close the cursor when an exception is thrown. --- .../carbonado/repo/indexed/IndexedCursor.java | 197 ++++++++++++--------- .../com/amazon/carbonado/repo/jdbc/JDBCCursor.java | 29 ++- 2 files changed, 137 insertions(+), 89 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/repo') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java index 892af49..35fc492 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedCursor.java @@ -63,117 +63,144 @@ class IndexedCursor extends AbstractCursor { if (mNext != null) { return true; } - while (mCursor.hasNext()) { - final Storable indexEntry = mCursor.next(); - - S master = mStorage.mMasterStorage.prepare(); - mGenerator.copyToMasterPrimaryKey(indexEntry, master); - - if (!master.tryLoad()) { - LogFactory.getLog(getClass()).warn - ("Master is missing for index entry: " + indexEntry); - } else { - if (mGenerator.isConsistent(indexEntry, master)) { - mNext = master; - return true; - } - - // This index entry is stale. Repair is needed. - - // Insert a correct index entry, just to be sure. - try { - final IndexedRepository repo = mStorage.mRepository; - final Storage indexEntryStorage = - repo.getIndexEntryStorageFor(mGenerator.getIndexEntryClass()); - Storable newIndexEntry = indexEntryStorage.prepare(); - mGenerator.copyFromMaster(newIndexEntry, master); - - if (newIndexEntry.tryLoad()) { - // Good, the correct index entry exists. We'll see - // the master record eventually, so skip. - } else { - // We have no choice but to return the master, at - // the risk of seeing it multiple times. This is - // better than seeing it never. - LogFactory.getLog(getClass()).warn - ("Inconsistent index entry: " + indexEntry); + try { + while (mCursor.hasNext()) { + final Storable indexEntry = mCursor.next(); + + S master = mStorage.mMasterStorage.prepare(); + mGenerator.copyToMasterPrimaryKey(indexEntry, master); + + if (!master.tryLoad()) { + LogFactory.getLog(getClass()).warn + ("Master is missing for index entry: " + indexEntry); + } else { + if (mGenerator.isConsistent(indexEntry, master)) { mNext = master; + return true; } - // Repair the stale index entry. - RepairExecutor.execute(new Runnable() { - public void run() { - Transaction txn = repo.enterTransaction(); - try { - // Reload master and verify inconsistency. - S master = mStorage.mMasterStorage.prepare(); - mGenerator.copyToMasterPrimaryKey(indexEntry, master); - - if (master.tryLoad()) { - Storable newIndexEntry = indexEntryStorage.prepare(); - mGenerator.copyFromMaster(newIndexEntry, master); - - newIndexEntry.tryInsert(); + // This index entry is stale. Repair is needed. + + // Insert a correct index entry, just to be sure. + try { + final IndexedRepository repo = mStorage.mRepository; + final Storage indexEntryStorage = + repo.getIndexEntryStorageFor(mGenerator.getIndexEntryClass()); + Storable newIndexEntry = indexEntryStorage.prepare(); + mGenerator.copyFromMaster(newIndexEntry, master); + + if (newIndexEntry.tryLoad()) { + // Good, the correct index entry exists. We'll see + // the master record eventually, so skip. + } else { + // We have no choice but to return the master, at + // the risk of seeing it multiple times. This is + // better than seeing it never. + LogFactory.getLog(getClass()).warn + ("Inconsistent index entry: " + indexEntry); + mNext = master; + } - indexEntry.tryDelete(); - txn.commit(); - } - } catch (FetchException fe) { - LogFactory.getLog(IndexedCursor.class).warn - ("Unable to check if repair required for " + - "inconsistent index entry " + - indexEntry, fe); - } catch (PersistException pe) { - LogFactory.getLog(IndexedCursor.class).error - ("Unable to repair inconsistent index entry " + - indexEntry, pe); - } finally { + // Repair the stale index entry. + RepairExecutor.execute(new Runnable() { + public void run() { + Transaction txn = repo.enterTransaction(); try { - txn.exit(); + // Reload master and verify inconsistency. + S master = mStorage.mMasterStorage.prepare(); + mGenerator.copyToMasterPrimaryKey(indexEntry, master); + + if (master.tryLoad()) { + Storable newIndexEntry = indexEntryStorage.prepare(); + mGenerator.copyFromMaster(newIndexEntry, master); + + newIndexEntry.tryInsert(); + + indexEntry.tryDelete(); + txn.commit(); + } + } catch (FetchException fe) { + LogFactory.getLog(IndexedCursor.class).warn + ("Unable to check if repair required for " + + "inconsistent index entry " + + indexEntry, fe); } catch (PersistException pe) { LogFactory.getLog(IndexedCursor.class).error ("Unable to repair inconsistent index entry " + indexEntry, pe); + } finally { + try { + txn.exit(); + } catch (PersistException pe) { + LogFactory.getLog(IndexedCursor.class).error + ("Unable to repair inconsistent index entry " + + indexEntry, pe); + } } } - } - }); - } catch (RepositoryException re) { - LogFactory.getLog(getClass()).error - ("Unable to inspect inconsistent index entry " + - indexEntry, re); - } + }); + } catch (RepositoryException re) { + LogFactory.getLog(getClass()).error + ("Unable to inspect inconsistent index entry " + + indexEntry, re); + } - if (mNext != null) { - return true; + if (mNext != null) { + return true; + } } } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } return false; } public S next() throws FetchException { - if (hasNext()) { - S next = mNext; - mNext = null; - return next; + try { + if (hasNext()) { + S next = mNext; + mNext = null; + return next; + } + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; } throw new NoSuchElementException(); } public int skipNext(int amount) throws FetchException { - if (mNext == null) { - return mCursor.skipNext(amount); - } + try { + if (mNext == null) { + return mCursor.skipNext(amount); + } - if (amount <= 0) { - if (amount < 0) { - throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; } - return 0; - } - mNext = null; - return 1 + mCursor.skipNext(amount - 1); + mNext = null; + return 1 + mCursor.skipNext(amount - 1); + } catch (FetchException e) { + try { + close(); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } } diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java index f419c0d..fa80fde 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCCursor.java @@ -35,10 +35,10 @@ import com.amazon.carbonado.cursor.AbstractCursor; */ class JDBCCursor extends AbstractCursor { private final JDBCStorage mStorage; - private Connection mConnection; - private PreparedStatement mStatement; - private ResultSet mResultSet; + private final Connection mConnection; + private final PreparedStatement mStatement; + private ResultSet mResultSet; private boolean mHasNext; JDBCCursor(JDBCStorage storage, @@ -49,7 +49,17 @@ class JDBCCursor extends AbstractCursor { mStorage = storage; mConnection = con; mStatement = statement; - mResultSet = statement.executeQuery(); + try { + mResultSet = statement.executeQuery(); + } catch (SQLException e) { + try { + statement.close(); + storage.mRepository.yieldConnection(con); + } catch (Exception e2) { + // Don't care. + } + throw e; + } } public void close() throws FetchException { @@ -62,6 +72,7 @@ class JDBCCursor extends AbstractCursor { throw mStorage.getJDBCRepository().toFetchException(e); } finally { mResultSet = null; + mHasNext = false; } } } @@ -75,6 +86,11 @@ class JDBCCursor extends AbstractCursor { try { mHasNext = rs.next(); } catch (SQLException e) { + try { + close(); + } catch (FetchException e2) { + // Don't care. + } throw mStorage.getJDBCRepository().toFetchException(e); } if (!mHasNext) { @@ -93,6 +109,11 @@ class JDBCCursor extends AbstractCursor { mHasNext = false; return obj; } catch (SQLException e) { + try { + close(); + } catch (FetchException e2) { + // Don't care. + } throw mStorage.getJDBCRepository().toFetchException(e); } } -- cgit v1.2.3