summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/indexed
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/indexed')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java115
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java19
2 files changed, 105 insertions, 29 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index dbd6e4f..cefb031 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -29,9 +29,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchDeadlockException;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
@@ -118,12 +122,28 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
.with(getStorableType().getName() + '~' + '\uffff');
List<StoredIndexInfo> storedInfos;
- Transaction txn = repository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
- storedInfos = query.fetch().toList();
- } finally {
- txn.exit();
+ Transaction txn = repository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ storedInfos = query.fetch().toList();
+ } finally {
+ txn.exit();
+ }
+ } catch (FetchException e) {
+ if (e instanceof FetchDeadlockException || e instanceof FetchTimeoutException) {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ Transaction txn = repository.getWrappedRepository()
+ .enterTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ storedInfos = query.fetch().toList();
+ } finally {
+ txn.exit();
+ }
+ } else {
+ throw e;
+ }
}
for (StoredIndexInfo indexInfo : storedInfos) {
@@ -460,34 +480,81 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
.storageFor(StoredIndexInfo.class).prepare();
info.setIndexName(index.getNameDescriptor());
- Transaction txn = mRepository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
- if (info.tryLoad()) {
- // Index already exists and is registered.
- return;
+ Transaction txn = mRepository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ if (info.tryLoad()) {
+ // Index already exists and is registered.
+ return;
+ }
+ } finally {
+ txn.exit();
+ }
+ } catch (FetchException e) {
+ if (e instanceof FetchDeadlockException || e instanceof FetchTimeoutException) {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ Transaction txn = mRepository.getWrappedRepository()
+ .enterTransaction(IsolationLevel.READ_COMMITTED);
+ try {
+ if (info.tryLoad()) {
+ // Index already exists and is registered.
+ return;
+ }
+ } finally {
+ txn.exit();
+ }
+ } else {
+ throw e;
}
- } finally {
- txn.exit();
}
// New index, so populate it.
managedIndex.populateIndex(mRepository, mMasterStorage,
mRepository.getIndexRepairThrottle());
- txn = mRepository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- txn.setForUpdate(true);
- try {
- if (!info.tryLoad()) {
- info.setIndexTypeDescriptor(index.getTypeDescriptor());
- info.setCreationTimestamp(System.currentTimeMillis());
- info.setVersionNumber(0);
- info.insert();
- txn.commit();
+ boolean top = true;
+ while (true) {
+ try {
+ Transaction txn;
+ if (top) {
+ txn = mRepository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ } else {
+ txn = mRepository.getWrappedRepository()
+ .enterTransaction(IsolationLevel.READ_COMMITTED);
+ }
+
+ txn.setForUpdate(true);
+ try {
+ if (!info.tryLoad()) {
+ info.setIndexTypeDescriptor(index.getTypeDescriptor());
+ info.setCreationTimestamp(System.currentTimeMillis());
+ info.setVersionNumber(0);
+ info.insert();
+ txn.commit();
+ }
+ } finally {
+ txn.exit();
+ }
+
+ break;
+ } catch (RepositoryException e) {
+ if (e instanceof FetchDeadlockException ||
+ e instanceof FetchTimeoutException ||
+ e instanceof PersistDeadlockException ||
+ e instanceof PersistTimeoutException)
+ {
+ // Might be caused by coarse locks. Switch to nested
+ // transaction to share the locks.
+ if (top) {
+ top = false;
+ continue;
+ }
+ }
+ throw e;
}
- } finally {
- txn.exit();
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 507a09f..ea3bbb3 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -242,17 +242,26 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
masterQuery = masterStorage.query().orderBy(naturalOrdering);
}
- // Enter top transaction with isolation level of none to make sure
- // preload operation does not run in a long nested transaction.
- Transaction txn = repo.enterTopTransaction(IsolationLevel.NONE);
- try {
- Cursor<S> cursor = masterQuery.fetch();
+ // Quick check to see if any records exist in master.
+ {
+ Transaction txn = repo.enterTransaction(IsolationLevel.READ_COMMITTED);
try {
+ Cursor<S> cursor = masterQuery.fetch();
if (!cursor.hasNext()) {
// Nothing exists in master, so nothing to populate.
return;
}
+ } finally {
+ txn.exit();
+ }
+ }
+ // Enter top transaction with isolation level of none to make sure
+ // preload operation does not run in a long nested transaction.
+ Transaction txn = repo.enterTopTransaction(IsolationLevel.NONE);
+ try {
+ Cursor<S> cursor = masterQuery.fetch();
+ try {
if (log.isInfoEnabled()) {
StringBuilder b = new StringBuilder();
b.append("Populating index on ");