summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java115
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java19
-rw-r--r--src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java65
3 files changed, 164 insertions, 35 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index dbd6e4f..cefb031 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -29,9 +29,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchDeadlockException;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
@@ -118,12 +122,28 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
.with(getStorableType().getName() + '~' + '\uffff');
List<StoredIndexInfo> storedInfos;
- Transaction txn = repository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
- storedInfos = query.fetch().toList();
- } finally {
- txn.exit();
+ Transaction txn = repository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ storedInfos = query.fetch().toList();
+ } finally {
+ txn.exit();
+ }
+ } catch (FetchException e) {
+ if (e instanceof FetchDeadlockException || e instanceof FetchTimeoutException) {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ Transaction txn = repository.getWrappedRepository()
+ .enterTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ storedInfos = query.fetch().toList();
+ } finally {
+ txn.exit();
+ }
+ } else {
+ throw e;
+ }
}
for (StoredIndexInfo indexInfo : storedInfos) {
@@ -460,34 +480,81 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
.storageFor(StoredIndexInfo.class).prepare();
info.setIndexName(index.getNameDescriptor());
- Transaction txn = mRepository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
- if (info.tryLoad()) {
- // Index already exists and is registered.
- return;
+ Transaction txn = mRepository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ if (info.tryLoad()) {
+ // Index already exists and is registered.
+ return;
+ }
+ } finally {
+ txn.exit();
+ }
+ } catch (FetchException e) {
+ if (e instanceof FetchDeadlockException || e instanceof FetchTimeoutException) {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ Transaction txn = mRepository.getWrappedRepository()
+ .enterTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ if (info.tryLoad()) {
+ // Index already exists and is registered.
+ return;
+ }
+ } finally {
+ txn.exit();
+ }
+ } else {
+ throw e;
}
- } finally {
- txn.exit();
}
// New index, so populate it.
managedIndex.populateIndex(mRepository, mMasterStorage,
mRepository.getIndexRepairThrottle());
- txn = mRepository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- txn.setForUpdate(true);
- try {
- if (!info.tryLoad()) {
- info.setIndexTypeDescriptor(index.getTypeDescriptor());
- info.setCreationTimestamp(System.currentTimeMillis());
- info.setVersionNumber(0);
- info.insert();
- txn.commit();
+ boolean top = true;
+ while (true) {
+ try {
+ Transaction txn;
+ if (top) {
+ txn = mRepository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ } else {
+ txn = mRepository.getWrappedRepository()
+ .enterTransaction(IsolationLevel.READ_COMMITTED);
+ }
+
+ txn.setForUpdate(true);
+ try {
+ if (!info.tryLoad()) {
+ info.setIndexTypeDescriptor(index.getTypeDescriptor());
+ info.setCreationTimestamp(System.currentTimeMillis());
+ info.setVersionNumber(0);
+ info.insert();
+ txn.commit();
+ }
+ } finally {
+ txn.exit();
+ }
+
+ break;
+ } catch (RepositoryException e) {
+ if (e instanceof FetchDeadlockException ||
+ e instanceof FetchTimeoutException ||
+ e instanceof PersistDeadlockException ||
+ e instanceof PersistTimeoutException)
+ {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ if (top) {
+ top = false;
+ continue;
+ }
+ }
+ throw e;
}
- } finally {
- txn.exit();
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 507a09f..ea3bbb3 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -242,17 +242,26 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
masterQuery = masterStorage.query().orderBy(naturalOrdering);
}
- // Enter top transaction with isolation level of none to make sure
- // preload operation does not run in a long nested transaction.
- Transaction txn = repo.enterTopTransaction(IsolationLevel.NONE);
- try {
- Cursor<S> cursor = masterQuery.fetch();
+ // Quick check to see if any records exist in master.
+ {
+ Transaction txn = repo.enterTransaction(IsolationLevel.READ_COMMITTED);
try {
+ Cursor<S> cursor = masterQuery.fetch();
if (!cursor.hasNext()) {
// Nothing exists in master, so nothing to populate.
return;
}
+ } finally {
+ txn.exit();
+ }
+ }
+ // Enter top transaction with isolation level of none to make sure
+ // preload operation does not run in a long nested transaction.
+ Transaction txn = repo.enterTopTransaction(IsolationLevel.NONE);
+ try {
+ Cursor<S> cursor = masterQuery.fetch();
+ try {
if (log.isInfoEnabled()) {
StringBuilder b = new StringBuilder();
b.append("Populating index on ");
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
index 6c4d4e0..7332a61 100644
--- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
@@ -27,9 +27,13 @@ import org.apache.commons.logging.LogFactory;
import org.cojen.classfile.TypeDesc;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchDeadlockException;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
@@ -491,12 +495,32 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
if (!readOnly) {
Repository repo = mRepository.getRootRepository();
- Transaction txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
- primaryInfo.update();
- txn.commit();
- } finally {
- txn.exit();
+ Transaction txn =
+ repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ primaryInfo.update();
+ txn.commit();
+ } finally {
+ txn.exit();
+ }
+ } catch (PersistException e) {
+ if (e instanceof PersistDeadlockException ||
+ e instanceof PersistTimeoutException)
+ {
+ // Might be caused by coarse locks. Switch to
+ // nested transaction to share the locks.
+ Transaction txn =
+ repo.enterTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ primaryInfo.update();
+ txn.commit();
+ } finally {
+ txn.exit();
+ }
+ } else {
+ throw e;
+ }
}
}
}
@@ -788,9 +812,16 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
info.setDatabaseName(getStorableType().getName());
// Try to insert metadata up to three times.
+ boolean top = true;
for (int retryCount = 3;;) {
try {
- Transaction txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ Transaction txn;
+ if (top) {
+ txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ } else {
+ txn = mRepository.enterTransaction(IsolationLevel.READ_COMMITTED);
+ }
+
txn.setForUpdate(true);
try {
if (!info.tryLoad()) {
@@ -815,6 +846,28 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
// a few times before throwing exception. Wait up to a second
// before each retry.
retryCount = e.backoff(e, retryCount, 1000);
+ } catch (FetchException e) {
+ if (e instanceof FetchDeadlockException || e instanceof FetchTimeoutException) {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ if (top) {
+ top = false;
+ retryCount = e.backoff(e, retryCount, 100);
+ continue;
+ }
+ }
+ throw e;
+ } catch (PersistException e) {
+ if (e instanceof PersistDeadlockException || e instanceof PersistTimeoutException){
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ if (top) {
+ top = false;
+ retryCount = e.backoff(e, retryCount, 100);
+ continue;
+ }
+ }
+ throw e;
}
}