summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2011-05-04 00:20:02 +0000
committerBrian S. O'Neill <bronee@gmail.com>2011-05-04 00:20:02 +0000
commit121886bc0c92389610408e3b415abb992ad8a212 (patch)
treeccd7bcada5efd29b9106e2150734bee375fe1163 /src/main/java/com/amazon/carbonado/repo
parent5be9a7ea0f9aad9e97c4d70cb82ce8a22f2d412a (diff)
Add support for Query controller and timeouts; remove vestigial support for interrupts.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java44
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java18
-rw-r--r--src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java8
-rw-r--r--src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java20
-rw-r--r--src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java92
-rw-r--r--src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java7
-rw-r--r--src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java94
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapStorage.java49
-rw-r--r--src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java56
9 files changed, 371 insertions, 17 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index 4635f2c..d115a3b 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -191,14 +191,26 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
return new MergeSortBuffer<S>();
}
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ return new MergeSortBuffer<S>(controller);
+ }
+
public long countAll() throws FetchException {
return mMasterStorage.query().count();
}
+ public long countAll(Query.Controller controller) throws FetchException {
+ return mMasterStorage.query().count(controller);
+ }
+
public Cursor<S> fetchAll() throws FetchException {
return mMasterStorage.query().fetch();
}
+ public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
+ return mMasterStorage.query().fetch(controller);
+ }
+
public Cursor<S> fetchOne(StorableIndex<S> index,
Object[] identityValues)
throws FetchException
@@ -207,6 +219,15 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
return indexInfo.fetchOne(this, identityValues);
}
+ public Cursor<S> fetchOne(StorableIndex<S> index,
+ Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
+ ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
+ return indexInfo.fetchOne(this, identityValues, controller);
+ }
+
public Query<?> indexEntryQuery(StorableIndex<S> index)
throws FetchException
{
@@ -221,6 +242,14 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery);
}
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ throws FetchException
+ {
+ ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
+ return indexInfo.fetchFromIndexEntryQuery(this, indexEntryQuery, controller);
+ }
+
public Cursor<S> fetchSubset(StorableIndex<S> index,
Object[] identityValues,
BoundaryType rangeStartBoundary,
@@ -235,6 +264,21 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
throw new UnsupportedOperationException();
}
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException
+ {
+ // This method should never be called since a query was returned by indexEntryQuery.
+ throw new UnsupportedOperationException();
+ }
+
private void registerIndex(ManagedIndex<S> managedIndex)
throws RepositoryException
{
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 2bb2f49..6c28619 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -173,6 +173,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues)
throws FetchException
{
+ return fetchOne(storage, identityValues, null);
+ }
+
+ Cursor<S> fetchOne(IndexedStorage storage, Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
Query<?> query = mSingleMatchQuery;
if (query == null) {
@@ -184,13 +191,20 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
mSingleMatchQuery = query = mIndexEntryStorage.query(filter);
}
- return fetchFromIndexEntryQuery(storage, query.withValues(identityValues));
+ return fetchFromIndexEntryQuery(storage, query.withValues(identityValues), controller);
}
Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery)
throws FetchException
{
- return new IndexedCursor<S>(indexEntryQuery.fetch(), storage, mAccessor);
+ return fetchFromIndexEntryQuery(storage, indexEntryQuery, null);
+ }
+
+ Cursor<S> fetchFromIndexEntryQuery(IndexedStorage storage, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return new IndexedCursor<S>(indexEntryQuery.fetch(controller), storage, mAccessor);
}
@Override
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java
index e760f58..d10856a 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/H2ExceptionTransformer.java
@@ -28,9 +28,15 @@ import java.sql.SQLException;
*/
class H2ExceptionTransformer extends JDBCExceptionTransformer {
public static int DUPLICATE_KEY = 23001;
+ public static int PROCESSING_CANCELED = 90051;
@Override
public boolean isUniqueConstraintError(SQLException e) {
- return DUPLICATE_KEY == e.getErrorCode();
+ return super.isUniqueConstraintError(e) || DUPLICATE_KEY == e.getErrorCode();
+ }
+
+ @Override
+ public boolean isTimeoutError(SQLException e) {
+ return super.isTimeoutError(e) || PROCESSING_CANCELED == e.getErrorCode();
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java
index f72c717..4780140 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCExceptionTransformer.java
@@ -23,9 +23,11 @@ import java.sql.SQLException;
import com.amazon.carbonado.ConstraintException;
import com.amazon.carbonado.FetchDeadlockException;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistDeniedException;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.UniqueConstraintException;
import com.amazon.carbonado.spi.ExceptionTransformer;
@@ -59,6 +61,8 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
*/
public static String SQLSTATE_DEADLOCK_WITH_ROLLBACK = "40001";
+ public static String SQLSTATE_PROCESSING_CANCELED = "57014";
+
/**
* Examines the SQLSTATE code of the given SQL exception and determines if
* it is a generic constaint violation.
@@ -103,6 +107,16 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
return false;
}
+ public boolean isTimeoutError(SQLException e) {
+ if (e != null) {
+ String sqlstate = e.getSQLState();
+ if (sqlstate != null) {
+ return SQLSTATE_PROCESSING_CANCELED.equals(sqlstate);
+ }
+ }
+ return false;
+ }
+
JDBCExceptionTransformer() {
}
@@ -117,6 +131,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
if (isDeadlockError(se)) {
return new FetchDeadlockException(e);
}
+ if (isTimeoutError(se)) {
+ return new FetchTimeoutException(e);
+ }
}
return null;
}
@@ -141,6 +158,9 @@ class JDBCExceptionTransformer extends ExceptionTransformer {
if (isDeadlockError(se)) {
return new PersistDeadlockException(e);
}
+ if (isTimeoutError(se)) {
+ return new PersistTimeoutException(e);
+ }
}
return null;
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java
index a49a150..bbe5589 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java
@@ -27,6 +27,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@ import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
@@ -42,6 +44,7 @@ import com.amazon.carbonado.SupportException;
import com.amazon.carbonado.Transaction;
import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.EmptyCursor;
import com.amazon.carbonado.cursor.LimitCursor;
import com.amazon.carbonado.filter.AndFilter;
@@ -329,6 +332,28 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
return new JDBCQuery(filter, values, ordering, hints);
}
+ static PreparedStatement prepareStatement(Connection con, String sql,
+ Query.Controller controller)
+ throws SQLException
+ {
+ PreparedStatement ps = con.prepareStatement(sql);
+
+ if (controller != null) {
+ long timeout = controller.getTimeout();
+ if (timeout >= 0) {
+ TimeUnit unit = controller.getTimeoutUnit();
+ if (unit != null) {
+ long seconds = unit.toSeconds(timeout);
+ int intSeconds = seconds <= 0 ? 1 :
+ (seconds <= Integer.MAX_VALUE ? ((int) seconds) : 0);
+ ps.setQueryTimeout(intSeconds);
+ }
+ }
+ }
+
+ return ps;
+ }
+
public S instantiate(ResultSet rs) throws SQLException {
return (S) mInstanceFactory.instantiate(this, rs, FIRST_RESULT_INDEX);
}
@@ -668,12 +693,21 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
}
}
+ @Override
public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
+ return fetch(values, null);
+ }
+
+ @Override
+ public Cursor<S> fetch(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
TransactionScope<JDBCTransaction> scope = mRepository.localTransactionScope();
boolean forUpdate = scope.isForUpdate();
Connection con = getConnection();
try {
- PreparedStatement ps = con.prepareStatement(prepareSelect(values, forUpdate));
+ PreparedStatement ps =
+ prepareStatement(con, prepareSelect(values, forUpdate), controller);
Integer fetchSize = mRepository.getFetchSize();
if (fetchSize != null) {
ps.setFetchSize(fetchSize);
@@ -681,7 +715,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
try {
setParameters(ps, values);
- return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+ return ControllerCursor.apply
+ (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
} catch (Exception e) {
// in case of exception, close statement
try {
@@ -706,6 +741,14 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to)
throws FetchException
{
+ return fetchSlice(values, from, to, null);
+ }
+
+ @Override
+ public Cursor<S> fetchSlice(FilterValues<S> values, long from, Long to,
+ Query.Controller controller)
+ throws FetchException
+ {
if (to != null && (to - from) <= 0) {
return EmptyCursor.the();
}
@@ -716,17 +759,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
switch (option) {
case NOT_SUPPORTED: default:
- return super.fetchSlice(values, from, to);
+ return super.fetchSlice(values, from, to, controller);
case LIMIT_ONLY:
if (from > 0 || to == null) {
- return super.fetchSlice(values, from, to);
+ return super.fetchSlice(values, from, to, controller);
}
select = prepareSelect(values, false);
select = mSupportStrategy.buildSelectWithSlice(select, false, true);
break;
case OFFSET_ONLY:
if (from <= 0) {
- return super.fetchSlice(values, from, to);
+ return super.fetchSlice(values, from, to, controller);
}
select = prepareSelect(values, false);
select = mSupportStrategy.buildSelectWithSlice(select, true, false);
@@ -746,7 +789,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
Connection con = getConnection();
try {
- PreparedStatement ps = con.prepareStatement(select);
+ PreparedStatement ps = prepareStatement(con, select, controller);
Integer fetchSize = mRepository.getFetchSize();
if (fetchSize != null) {
ps.setFetchSize(fetchSize);
@@ -760,7 +803,10 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
switch (option) {
case OFFSET_ONLY:
ps.setLong(psOrdinal, from);
- Cursor<S> c = new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+ Cursor<S> c =
+ ControllerCursor.apply
+ (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps),
+ controller);
return new LimitCursor<S>(c, to - from);
case LIMIT_AND_OFFSET:
ps.setLong(psOrdinal, to - from);
@@ -782,7 +828,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
ps.setLong(psOrdinal, to);
}
- return new JDBCCursor<S>(JDBCStorage.this, scope, con, ps);
+ return ControllerCursor.apply
+ (new JDBCCursor<S>(JDBCStorage.this, scope, con, ps), controller);
} catch (Exception e) {
// in case of exception, close statement
try {
@@ -805,9 +852,17 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
@Override
public long count(FilterValues<S> values) throws FetchException {
+ return count(values, null);
+ }
+
+ @Override
+ public long count(FilterValues<S> values, Query.Controller controller)
+ throws FetchException
+ {
Connection con = getConnection();
try {
- PreparedStatement ps = con.prepareStatement(prepareCount(values));
+ PreparedStatement ps = prepareStatement(con, prepareCount(values), controller);
+
try {
setParameters(ps, values);
ResultSet rs = ps.executeQuery();
@@ -827,10 +882,12 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
}
}
+ @Override
public Filter<S> getFilter() {
return mFilter;
}
+ @Override
public OrderingList<S> getOrdering() {
return mOrdering;
}
@@ -846,6 +903,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
return true;
}
+ @Override
public boolean printPlan(Appendable app, int indentLevel, FilterValues<S> values)
throws IOException
{
@@ -862,7 +920,9 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
/**
* Delete operation is included in cursor factory for ease of implementation.
*/
- int executeDelete(FilterValues<S> filterValues) throws PersistException {
+ int executeDelete(FilterValues<S> filterValues, Query.Controller controller)
+ throws PersistException
+ {
Connection con;
try {
con = getConnection();
@@ -870,7 +930,8 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
throw e.toPersistException();
}
try {
- PreparedStatement ps = con.prepareStatement(prepareDelete(filterValues));
+ PreparedStatement ps =
+ prepareStatement(con, prepareDelete(filterValues), controller);
try {
setParameters(ps, filterValues);
return ps.executeUpdate();
@@ -976,13 +1037,18 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>
@Override
public void deleteAll() throws PersistException {
+ deleteAll(null);
+ }
+
+ @Override
+ public void deleteAll(Controller controller) throws PersistException {
if (mTriggerManager.getDeleteTrigger() != null) {
// Super implementation loads one at time and calls
// delete. This allows delete trigger to be invoked on each.
- super.deleteAll();
+ super.deleteAll(controller);
} else {
try {
- ((Executor) executor()).executeDelete(getFilterValues());
+ ((Executor) executor()).executeDelete(getFilterValues(), controller);
} catch (RepositoryException e) {
throw e.toPersistException();
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java
index ad63941..ebaafd7 100644
--- a/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java
+++ b/src/main/java/com/amazon/carbonado/repo/jdbc/OracleExceptionTransformer.java
@@ -32,6 +32,8 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer {
public static int DEADLOCK_DETECTED = 60;
+ public static int PROCESSING_CANCELED = 1013;
+
@Override
public boolean isUniqueConstraintError(SQLException e) {
if (isConstraintError(e)) {
@@ -63,4 +65,9 @@ class OracleExceptionTransformer extends JDBCExceptionTransformer {
}
return false;
}
+
+ @Override
+ public boolean isTimeoutError(SQLException e) {
+ return super.isTimeoutError(e) || e != null && PROCESSING_CANCELED == e.getErrorCode();
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java
index 08be6e3..5dc4d3b 100644
--- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java
+++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingQuery.java
@@ -165,6 +165,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public Cursor<S> fetch(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.fetch(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.fetch(controller);
+ }
+
+ @Override
public Cursor<S> fetchSlice(long from, Long to) throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -175,6 +184,16 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public Cursor<S> fetchSlice(long from, Long to, Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.fetchSlice(start, to, controller) on " + this +
+ ", from: " + from + ", to: " + to + ", controller: " + controller);
+ }
+ return mQuery.fetchSlice(from, to, controller);
+ }
+
+ @Override
public <T extends S> Cursor<S> fetchAfter(T start) throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -184,6 +203,18 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public <T extends S> Cursor<S> fetchAfter(T start, Controller controller)
+ throws FetchException
+ {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.fetchAfter(start, controller) on " + this + ", start: " + start
+ + ", controller: " + controller);
+ }
+ return mQuery.fetchAfter(start, controller);
+ }
+
+ @Override
public S loadOne() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -193,6 +224,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public S loadOne(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.loadOne() on " + this + ", controller: " + controller);
+ }
+ return mQuery.loadOne(controller);
+ }
+
+ @Override
public S tryLoadOne() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -202,6 +242,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public S tryLoadOne(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.tryLoadOne(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.tryLoadOne(controller);
+ }
+
+ @Override
public void deleteOne() throws PersistException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -211,6 +260,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public void deleteOne(Controller controller) throws PersistException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.deleteOne(controller) on " + this + ", controller: " + controller);
+ }
+ mQuery.deleteOne(controller);
+ }
+
+ @Override
public boolean tryDeleteOne() throws PersistException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -220,6 +278,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public boolean tryDeleteOne(Controller controller) throws PersistException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.tryDeleteOne(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.tryDeleteOne(controller);
+ }
+
+ @Override
public void deleteAll() throws PersistException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -229,6 +296,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public void deleteAll(Controller controller) throws PersistException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.deleteAll(controller) on " + this + ", controller: " + controller);
+ }
+ mQuery.deleteAll(controller);
+ }
+
+ @Override
public long count() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -238,6 +314,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public long count(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.count(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.count(controller);
+ }
+
+ @Override
public boolean exists() throws FetchException {
Log log = mStorage.mLog;
if (log.isEnabled()) {
@@ -247,6 +332,15 @@ class LoggingQuery<S extends Storable> implements Query<S> {
}
@Override
+ public boolean exists(Controller controller) throws FetchException {
+ Log log = mStorage.mLog;
+ if (log.isEnabled()) {
+ log.write("Query.exists(controller) on " + this + ", controller: " + controller);
+ }
+ return mQuery.exists(controller);
+ }
+
+ @Override
public boolean printNative() {
return mQuery.printNative();
}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
index c84d8e3..c9c17b6 100644
--- a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
@@ -47,6 +47,7 @@ import com.amazon.carbonado.Trigger;
import com.amazon.carbonado.capability.IndexInfo;
import com.amazon.carbonado.cursor.ArraySortBuffer;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.EmptyCursor;
import com.amazon.carbonado.cursor.FilteredCursor;
import com.amazon.carbonado.cursor.SingletonCursor;
@@ -540,6 +541,10 @@ class MapStorage<S extends Storable>
}
public long countAll() throws FetchException {
+ return countAll(null);
+ }
+
+ public long countAll(Query.Controller controller) throws FetchException {
try {
TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
MapTransaction txn = scope.getTxn();
@@ -578,9 +583,20 @@ class MapStorage<S extends Storable>
}
}
+ public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
+ return ControllerCursor.apply(fetchAll(), controller);
+ }
+
public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
throws FetchException
{
+ return fetchOne(index, identityValues, null);
+ }
+
+ public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
try {
S key = prepare();
for (int i=0; i<identityValues.length; i++) {
@@ -643,6 +659,12 @@ class MapStorage<S extends Storable>
return null;
}
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ {
+ return null;
+ }
+
public Cursor<S> fetchSubset(StorableIndex<S> index,
Object[] identityValues,
BoundaryType rangeStartBoundary,
@@ -770,6 +792,28 @@ class MapStorage<S extends Storable>
return cursor;
}
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return ControllerCursor.apply(fetchSubset(index,
+ identityValues,
+ rangeStartBoundary,
+ rangeStartValue,
+ rangeEndBoundary,
+ rangeEndValue,
+ reverseRange,
+ reverseOrder),
+ controller);
+ }
+
private List<OrderedProperty<S>> createPkPropList() {
return new ArrayList<OrderedProperty<S>>(mInfo.getPrimaryKey().getProperties());
}
@@ -806,6 +850,11 @@ class MapStorage<S extends Storable>
return new ArraySortBuffer<S>();
}
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ // ArraySortBuffer doesn't support controller.
+ return new ArraySortBuffer<S>();
+ }
+
public static interface InstanceFactory {
Storable instantiate(DelegateSupport support);
}
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
index f4d8f24..c0f882e 100644
--- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java
@@ -45,6 +45,7 @@ import com.amazon.carbonado.UniqueConstraintException;
import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.cursor.ControllerCursor;
import com.amazon.carbonado.cursor.EmptyCursor;
import com.amazon.carbonado.cursor.MergeSortBuffer;
import com.amazon.carbonado.cursor.SingletonCursor;
@@ -263,22 +264,45 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
return new MergeSortBuffer<S>();
}
+ public SortBuffer<S> createSortBuffer(Query.Controller controller) {
+ return new MergeSortBuffer<S>(controller);
+ }
+
public long countAll() throws FetchException {
// Return -1 to indicate default algorithm should be used.
return -1;
}
+ public long countAll(Query.Controller controller) throws FetchException {
+ // Return -1 to indicate default algorithm should be used.
+ return -1;
+ }
+
public Cursor<S> fetchAll() throws FetchException {
+ return fetchAll(null);
+ }
+
+ public Cursor<S> fetchAll(Query.Controller controller) throws FetchException {
return fetchSubset(null, null,
BoundaryType.OPEN, null,
BoundaryType.OPEN, null,
- false, false);
+ false, false,
+ controller);
}
public Cursor<S> fetchOne(StorableIndex<S> index,
Object[] identityValues)
throws FetchException
{
+ return fetchOne(index, identityValues, null);
+ }
+
+ public Cursor<S> fetchOne(StorableIndex<S> index,
+ Object[] identityValues,
+ Query.Controller controller)
+ throws FetchException
+ {
+ // Note: Controller is never called.
byte[] key = mStorableCodec.encodePrimaryKey(identityValues);
byte[] value = mRawSupport.tryLoad(null, key);
if (value == null) {
@@ -296,6 +320,13 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
throw new UnsupportedOperationException();
}
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery,
+ Query.Controller controller)
+ {
+ // This method should never be called since null was returned by indexEntryQuery.
+ throw new UnsupportedOperationException();
+ }
+
public Cursor<S> fetchSubset(StorableIndex<S> index,
Object[] identityValues,
BoundaryType rangeStartBoundary,
@@ -378,6 +409,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
getPrimaryDatabase());
cursor.open();
+
return cursor;
} catch (Exception e) {
throw toFetchException(e);
@@ -387,6 +419,28 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag
}
}
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder,
+ Query.Controller controller)
+ throws FetchException
+ {
+ return ControllerCursor.apply(fetchSubset(index,
+ identityValues,
+ rangeStartBoundary,
+ rangeStartValue,
+ rangeEndBoundary,
+ rangeEndValue,
+ reverseRange,
+ reverseOrder),
+ controller);
+ }
+
private byte[] createBound(Object[] exactValues, byte[] exactKey, Object rangeValue,
StorableCodec<S> codec) {
Object[] values = {rangeValue};