summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/indexed
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/indexed')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java1
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryGenerator.java1
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java17
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java21
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java92
5 files changed, 97 insertions, 35 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
index b107a6e..0c1e6b8 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
@@ -20,7 +20,6 @@ package com.amazon.carbonado.repo.indexed;
import java.util.Comparator;
-import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.Storage;
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryGenerator.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryGenerator.java
index 4059ac3..a126224 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryGenerator.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryGenerator.java
@@ -24,7 +24,6 @@ import java.util.WeakHashMap;
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
-import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.SupportException;
import com.amazon.carbonado.info.StorableIndex;
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java
index d558950..097185a 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java
@@ -43,7 +43,7 @@ import com.amazon.carbonado.info.StorableIntrospector;
import com.amazon.carbonado.qe.RepositoryAccess;
import com.amazon.carbonado.qe.StorageAccess;
-import com.amazon.carbonado.spi.StorageCollection;
+import com.amazon.carbonado.spi.StoragePool;
/**
* Wraps another repository in order to make it support indexes. The wrapped
@@ -62,20 +62,23 @@ class IndexedRepository implements Repository,
private final String mName;
private final boolean mIndexRepairEnabled;
private final double mIndexThrottle;
- private final StorageCollection mStorages;
+ private final boolean mAllClustered;
+ private final StoragePool mStoragePool;
IndexedRepository(AtomicReference<Repository> rootRef, String name,
Repository repository,
boolean indexRepairEnabled,
- double indexThrottle)
+ double indexThrottle,
+ boolean allClustered)
{
mRootRef = rootRef;
mRepository = repository;
mName = name;
mIndexRepairEnabled = indexRepairEnabled;
mIndexThrottle = indexThrottle;
+ mAllClustered = allClustered;
- mStorages = new StorageCollection() {
+ mStoragePool = new StoragePool() {
protected <S extends Storable> Storage<S> createStorage(Class<S> type)
throws RepositoryException
{
@@ -112,7 +115,7 @@ class IndexedRepository implements Repository,
public <S extends Storable> Storage<S> storageFor(Class<S> type)
throws MalformedTypeException, SupportException, RepositoryException
{
- return mStorages.storageFor(type);
+ return mStoragePool.get(type);
}
public Transaction enterTransaction() {
@@ -221,4 +224,8 @@ class IndexedRepository implements Repository,
double getIndexRepairThrottle() {
return mIndexThrottle;
}
+
+ boolean isAllClustered() {
+ return mAllClustered;
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java
index 4cd4afb..e7ad8ed 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java
@@ -49,6 +49,7 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder {
private RepositoryBuilder mRepoBuilder;
private boolean mIndexRepairEnabled = true;
private double mIndexThrottle = 1.0;
+ private boolean mAllClustered;
public IndexedRepositoryBuilder() {
}
@@ -75,7 +76,8 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder {
Repository repo = new IndexedRepository(rootRef, getName(), wrapped,
isIndexRepairEnabled(),
- getIndexRepairThrottle());
+ getIndexRepairThrottle(),
+ isAllClustered());
rootRef.set(repo);
return repo;
}
@@ -167,6 +169,23 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder {
mIndexThrottle = desiredSpeed;
}
+ /**
+ * Returns true if all indexes should be identified as clustered. This
+ * affects how indexes are selected by the query analyzer.
+ */
+ public boolean isAllClustered() {
+ return mAllClustered;
+ }
+
+ /**
+ * When all indexes are identified as clustered, the query analyzer treats
+ * all indexes as performing equally well. This is suitable for indexing
+ * repositories that never read from a slow storage medium.
+ */
+ public void setAllClustered(boolean clustered) {
+ mAllClustered = clustered;
+ }
+
public void errorCheck(Collection<String> messages) throws ConfigurationException {
super.errorCheck(messages);
if (null == getWrappedRepository()) {
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index 7b66f50..fe0cfe8 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
@@ -188,6 +189,9 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
// Assume index is malformed, so ignore it.
continue;
}
+ if (mRepository.isAllClustered()) {
+ freeIndex = freeIndex.clustered(true);
+ }
mAllIndexInfoMap.put(freeIndex, ii);
freeIndexSet.add(freeIndex);
}
@@ -210,6 +214,10 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
// Add the indexes we get for free.
queryableIndexSet.addAll(freeIndexSet);
+
+ if (mRepository.isAllClustered()) {
+ queryableIndexSet.markClustered(true);
+ }
}
// The set of indexes that should be kept up-to-date. If index repair
@@ -313,6 +321,36 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
return mQueryEngine.query(filter);
}
+ public void truncate() throws PersistException {
+ hasManagedIndexes: {
+ for (IndexInfo info : mAllIndexInfoMap.values()) {
+ if (info instanceof ManagedIndex) {
+ break hasManagedIndexes;
+ }
+ }
+
+ // No managed indexes, so nothing special to do.
+ mMasterStorage.truncate();
+ return;
+ }
+
+ Transaction txn = mRepository.enterTransaction();
+ try {
+ mMasterStorage.truncate();
+
+ // Now truncate the indexes.
+ for (IndexInfo info : mAllIndexInfoMap.values()) {
+ if (info instanceof ManagedIndex) {
+ ((ManagedIndex) info).getIndexEntryStorage().truncate();
+ }
+ }
+
+ txn.commit();
+ } finally {
+ txn.exit();
+ }
+ }
+
public boolean addTrigger(Trigger<? super S> trigger) {
return mMasterStorage.addTrigger(trigger);
}
@@ -372,9 +410,6 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
}
}
- // FIXME: sort buffer should be on repository access. Also, create abstract
- // repository access that creates the correct merge sort buffer. And more:
- // create capability for managing merge sort buffers.
return new MergeSortBuffer<S>(mRootStorage);
}
@@ -521,20 +556,24 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
{
// Doesn't completely remove the index, but it should free up space.
+
double desiredSpeed = mRepository.getIndexRepairThrottle();
Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
- long totalDropped = 0;
- while (true) {
- Transaction txn = mRepository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- txn.setForUpdate(true);
- try {
- Cursor<? extends Storable> cursor = indexEntryStorage.query().fetch();
- if (!cursor.hasNext()) {
- break;
- }
- int count = 0;
+ if (throttle == null) {
+ indexEntryStorage.truncate();
+ } else {
+ long totalDropped = 0;
+ while (true) {
+ Transaction txn = mRepository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ txn.setForUpdate(true);
+ try {
+ Cursor<? extends Storable> cursor = indexEntryStorage.query().fetch();
+ if (!cursor.hasNext()) {
+ break;
+ }
+ int count = 0;
final long savedTotal = totalDropped;
boolean anyFailure = false;
try {
@@ -544,26 +583,25 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
} else {
anyFailure = true;
}
+ }
+ } finally {
+ cursor.close();
+ }
+ txn.commit();
+ if (log.isInfoEnabled()) {
+ log.info("Removed " + totalDropped + " index entries");
}
- } finally {
- cursor.close();
- }
- txn.commit();
- if (log.isInfoEnabled()) {
- log.info("Removed " + totalDropped + " index entries");
- }
if (anyFailure && totalDropped <= savedTotal) {
log.warn("No indexes removed in last batch. " +
"Aborting index removal cleanup");
break;
}
- } catch (FetchException e) {
- throw e.toPersistException();
- } finally {
- txn.exit();
- }
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ } finally {
+ txn.exit();
+ }
- if (throttle != null) {
try {
throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
} catch (InterruptedException e) {