From 5a2aeb3ab59f286a6d2a5d8b7d62f4b17132b2b7 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Wed, 30 Aug 2006 02:24:36 +0000 Subject: Add core repository implementations --- .../repo/replicated/ReplicatedRepository.java | 583 +++++++++++++++++++++ .../replicated/ReplicatedRepositoryBuilder.java | 161 ++++++ .../repo/replicated/ReplicatedStorage.java | 121 +++++ .../repo/replicated/ReplicationTrigger.java | 398 ++++++++++++++ .../carbonado/repo/replicated/package-info.java | 27 + 5 files changed, 1290 insertions(+) create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java create mode 100644 src/main/java/com/amazon/carbonado/repo/replicated/package-info.java (limited to 'src/main/java/com/amazon/carbonado/repo/replicated') diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java new file mode 100644 index 0000000..5003053 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -0,0 +1,583 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazon.carbonado.repo.replicated; + +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.cojen.util.BeanComparator; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.MalformedTypeException; +import com.amazon.carbonado.Query; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.SupportException; +import com.amazon.carbonado.Transaction; + +import com.amazon.carbonado.capability.Capability; +import com.amazon.carbonado.capability.IndexInfo; +import com.amazon.carbonado.capability.IndexInfoCapability; +import com.amazon.carbonado.capability.ResyncCapability; +import com.amazon.carbonado.capability.ShutdownCapability; +import com.amazon.carbonado.capability.StorableInfoCapability; + +import com.amazon.carbonado.info.Direction; +import com.amazon.carbonado.info.StorableIntrospector; + +import com.amazon.carbonado.spi.TransactionPair; + +import com.amazon.carbonado.util.Throttle; + +/** + * A ReplicatedRepository binds two repositories together. One will be used + * for reading (the replica), and the other will be used for writing; changes + * to the master repository will be copied to the replica. + * + * @author Don Schneider + */ +class ReplicatedRepository + implements Repository, + ResyncCapability, + ShutdownCapability, + StorableInfoCapability +{ + // Constants used by resync method. + private static final int RESYNC_QUEUE_SIZE = 1000; + private static final long RESYNC_QUEUE_TIMEOUT_MS = 30000; + + /** + * Utility method to select the natural ordering of a storage, by looking + * for a clustered index on the primary key. Returns null if no clustered + * index was found. + * + * TODO: Try to incorporate this into standard storage interface somehow. + */ + private static String[] selectNaturalOrder(Repository repo, Class type) + throws RepositoryException + { + IndexInfoCapability capability = repo.getCapability(IndexInfoCapability.class); + if (capability == null) { + return null; + } + IndexInfo info = null; + for (IndexInfo candidate : capability.getIndexInfo(type)) { + if (candidate.isClustered()) { + info = candidate; + break; + } + } + if (info == null) { + return null; + } + + // Verify index is part of primary key. + Set pkSet = StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet(); + + String[] propNames = info.getPropertyNames(); + for (String prop : propNames) { + if (!pkSet.contains(prop)) { + return null; + } + } + + String[] orderBy = new String[pkSet.size()]; + + Direction[] directions = info.getPropertyDirections(); + + // Clone to remove elements. + pkSet = new LinkedHashSet(pkSet); + + int i; + for (i=0; i 0) { + for (String prop : pkSet) { + orderBy[i++] = prop; + } + } + + return orderBy; + } + + private String mName; + private Repository mReplicaRepository; + private Repository mMasterRepository; + + // Map of storages by storable class + private final Map, ReplicatedStorage> mStorages; + + ReplicatedRepository(String aName, + Repository aReplicaRepository, + Repository aMasterRepository) { + mName = aName; + mReplicaRepository = aReplicaRepository; + mMasterRepository = aMasterRepository; + + mStorages = new IdentityHashMap, ReplicatedStorage>(); + } + + public String getName() { + return mName; + } + + Repository getReplicaRepository() { + return mReplicaRepository; + } + + Repository getMasterRepository() { + return mMasterRepository; + } + + @SuppressWarnings("unchecked") + public Storage storageFor(Class type) + throws MalformedTypeException, SupportException, RepositoryException + { + synchronized (mStorages) { + ReplicatedStorage storage = mStorages.get(type); + if (storage == null) { + // Examine and throw exception if there is a problem. + StorableIntrospector.examine(type); + + storage = createStorage(type); + mStorages.put(type, storage); + } + return storage; + } + } + + private ReplicatedStorage createStorage(Class type) + throws SupportException, RepositoryException + { + return new ReplicatedStorage(this, type); + } + + public Transaction enterTransaction() { + return new TransactionPair(mMasterRepository.enterTransaction(), + mReplicaRepository.enterTransaction()); + } + + public Transaction enterTransaction(IsolationLevel level) { + return new TransactionPair(mMasterRepository.enterTransaction(level), + mReplicaRepository.enterTransaction(level)); + } + + public Transaction enterTopTransaction(IsolationLevel level) { + return new TransactionPair(mMasterRepository.enterTopTransaction(level), + mReplicaRepository.enterTopTransaction(level)); + } + + public IsolationLevel getTransactionIsolationLevel() { + IsolationLevel replicaLevel = mReplicaRepository.getTransactionIsolationLevel(); + if (replicaLevel == null) { + return null; + } + IsolationLevel masterLevel = mMasterRepository.getTransactionIsolationLevel(); + if (masterLevel == null) { + return null; + } + return replicaLevel.lowestCommon(masterLevel); + } + + @SuppressWarnings("unchecked") + public C getCapability(Class capabilityType) { + if (capabilityType.isInstance(this)) { + if (ShutdownCapability.class.isAssignableFrom(capabilityType)) { + if (mReplicaRepository.getCapability(capabilityType) == null && + mMasterRepository.getCapability(capabilityType) == null) { + + return null; + } + } + return (C) this; + } + + C cap = mMasterRepository.getCapability(capabilityType); + if (cap == null) { + cap = mReplicaRepository.getCapability(capabilityType); + } + + return cap; + } + + public void close() { + mReplicaRepository.close(); + mMasterRepository.close(); + } + + public String[] getUserStorableTypeNames() throws RepositoryException { + StorableInfoCapability replicaCap = + mReplicaRepository.getCapability(StorableInfoCapability.class); + StorableInfoCapability masterCap = + mMasterRepository.getCapability(StorableInfoCapability.class); + + if (replicaCap == null) { + if (masterCap == null) { + return new String[0]; + } + return masterCap.getUserStorableTypeNames(); + } else if (masterCap == null) { + return replicaCap.getUserStorableTypeNames(); + } + + // Merge the two sets together. + Set names = new LinkedHashSet(); + for (String name : replicaCap.getUserStorableTypeNames()) { + names.add(name); + } + for (String name : masterCap.getUserStorableTypeNames()) { + names.add(name); + } + + return names.toArray(new String[names.size()]); + } + + public boolean isSupported(Class type) { + StorableInfoCapability replicaCap = + mReplicaRepository.getCapability(StorableInfoCapability.class); + StorableInfoCapability masterCap = + mMasterRepository.getCapability(StorableInfoCapability.class); + + return (masterCap == null || masterCap.isSupported(type)) + && (replicaCap == null || replicaCap.isSupported(type)); + } + + public boolean isPropertySupported(Class type, String name) { + StorableInfoCapability replicaCap = + mReplicaRepository.getCapability(StorableInfoCapability.class); + StorableInfoCapability masterCap = + mMasterRepository.getCapability(StorableInfoCapability.class); + + return (masterCap == null || masterCap.isPropertySupported(type, name)) + && (replicaCap == null || replicaCap.isPropertySupported(type, name)); + } + + public boolean isAutoShutdownEnabled() { + ShutdownCapability cap = mReplicaRepository.getCapability(ShutdownCapability.class); + if (cap != null && cap.isAutoShutdownEnabled()) { + return true; + } + cap = mMasterRepository.getCapability(ShutdownCapability.class); + if (cap != null && cap.isAutoShutdownEnabled()) { + return true; + } + return false; + } + + public void setAutoShutdownEnabled(boolean enabled) { + ShutdownCapability cap = mReplicaRepository.getCapability(ShutdownCapability.class); + if (cap != null) { + cap.setAutoShutdownEnabled(enabled); + } + cap = mMasterRepository.getCapability(ShutdownCapability.class); + if (cap != null) { + cap.setAutoShutdownEnabled(enabled); + } + } + + public void shutdown() { + ShutdownCapability cap = mReplicaRepository.getCapability(ShutdownCapability.class); + if (cap != null) { + cap.shutdown(); + } else { + mReplicaRepository.close(); + } + cap = mMasterRepository.getCapability(ShutdownCapability.class); + if (cap != null) { + cap.shutdown(); + } else { + mMasterRepository.close(); + } + } + + /** + * Repairs replicated storables by synchronizing the replica repository + * against the master repository. + * + * @param type type of storable to re-sync + * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half + * speed, 0.1 = one-tenth speed, etc + * @param filter optional query filter to limit which objects get re-sync'ed + * @param filterValues filter values for optional filter + */ + public void resync(Class type, + double desiredSpeed, + String filter, + Object... filterValues) + throws RepositoryException + { + Storage replicaStorage, masterStorage; + replicaStorage = mReplicaRepository.storageFor(type); + masterStorage = mMasterRepository.storageFor(type); + + Query replicaQuery, masterQuery; + if (filter == null) { + replicaQuery = replicaStorage.query(); + masterQuery = masterStorage.query(); + } else { + replicaQuery = replicaStorage.query(filter).withValues(filterValues); + masterQuery = masterStorage.query(filter).withValues(filterValues); + } + + // Order both queries the same so that they can be run in parallel. + String[] orderBy = selectNaturalOrder(mMasterRepository, type); + if (orderBy == null) { + orderBy = selectNaturalOrder(mReplicaRepository, type); + if (orderBy == null) { + Set pkSet = + StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet(); + orderBy = pkSet.toArray(new String[0]); + } + } + + BeanComparator bc = BeanComparator.forClass(type); + for (String order : orderBy) { + bc = bc.orderBy(order); + bc = bc.caseSensitive(); + } + + replicaQuery = replicaQuery.orderBy(orderBy); + masterQuery = masterQuery.orderBy(orderBy); + + Throttle throttle; + if (desiredSpeed >= 1.0) { + throttle = null; + } else { + if (desiredSpeed < 0.0) { + desiredSpeed = 0.0; + } + // 50 samples + throttle = new Throttle(50); + } + + Cursor replicaCursor = replicaQuery.fetch(); + try { + Cursor masterCursor = masterQuery.fetch(); + try { + resync(((ReplicatedStorage) storageFor(type)).getTrigger(), + replicaCursor, + masterCursor, + throttle, desiredSpeed, + bc); + } finally { + masterCursor.close(); + } + } finally { + replicaCursor.close(); + } + } + + @SuppressWarnings("unchecked") + private void resync(ReplicationTrigger trigger, + Cursor replicaCursor, + Cursor masterCursor, + Throttle throttle, double desiredSpeed, + Comparator comparator) + throws RepositoryException + { + // Enqueue resyncs to a separate thread since open cursors hold locks + // on currently referenced entries. + BlockingQueue resyncQueue = + new ArrayBlockingQueue(RESYNC_QUEUE_SIZE, true); + ResyncThread resyncThread = new ResyncThread(resyncQueue); + resyncThread.start(); + + try { + S replicaEntry = null; + S masterEntry = null; + + while (true) { + if (throttle != null) { + try { + // 100 millisecond clock precision + throttle.throttle(desiredSpeed, 100); + } catch (InterruptedException e) { + throw new FetchInterruptedException(e); + } + } + + if (replicaEntry == null && replicaCursor.hasNext()) { + replicaEntry = replicaCursor.next(); + } + + if (masterEntry == null && masterCursor.hasNext()) { + masterEntry = masterCursor.next(); + } + + // Comparator should treat null as high. + int compare = comparator.compare(replicaEntry, masterEntry); + + if (compare < 0) { + // Bogus exists only in replica so delete it. + resyncThread.addResyncTask(trigger, replicaEntry, null); + // Allow replica to advance. + replicaEntry = null; + } else if (compare > 0) { + // Replica cursor is missing an entry so copy it. + resyncThread.addResyncTask(trigger, null, masterEntry); + // Allow master to advance. + masterEntry = null; + } else { + if (replicaEntry == null && masterEntry == null) { + // Both cursors exhausted -- resync is complete. + break; + } + + if (!replicaEntry.equalProperties(masterEntry)) { + // Replica is stale. + resyncThread.addResyncTask(trigger, replicaEntry, masterEntry); + } + + // Entries are synchronized so allow both cursors to advance. + replicaEntry = null; + masterEntry = null; + } + } + } finally { + resyncThread.waitUntilDone(); + } + } + + // TODO: Use TaskQueueThread + + private static class ResyncThread extends Thread { + private static final int + STATE_RUNNING = 0, + STATE_SHOULD_STOP = 1, + STATE_STOPPED = 2; + + private static final Runnable STOP_TASK = new Runnable() {public void run() {}}; + + private final BlockingQueue mQueue; + + private int mState = STATE_RUNNING; + + ResyncThread(BlockingQueue queue) { + super("ReplicatedRepository Resync"); + mQueue = queue; + } + + public void run() { + try { + while (true) { + boolean isStopping; + synchronized (this) { + isStopping = mState != STATE_RUNNING; + } + + Runnable task; + if (isStopping) { + // Poll the queue so this thread doesn't block when it + // should be stopping. + task = mQueue.poll(); + } else { + try { + task = mQueue.take(); + } catch (InterruptedException e) { + break; + } + } + + if (task == null || task == STOP_TASK) { + // Marker to indicate we should stop. + break; + } + + task.run(); + } + } finally { + synchronized (this) { + mState = STATE_STOPPED; + notifyAll(); + } + } + } + + void addResyncTask(final ReplicationTrigger trigger, + final S replicaEntry, + final S masterEntry) + throws RepositoryException + { + if (replicaEntry == null && masterEntry == null) { + // If both are null, then there's nothing to do, is there? + // Note: Caller shouldn't have passed double nulls to + // addResyncTask in the first place. + return; + } + + Runnable task = new Runnable() { + public void run() { + try { + trigger.resyncEntries(replicaEntry, masterEntry); + } catch (Exception e) { + LogFactory.getLog(ReplicatedRepository.class).error(null, e); + } + } + }; + + addResyncTask(task); + } + + + void addResyncTask(Runnable task) + throws RepositoryException + { + try { + if (!mQueue.offer(task, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + throw new RepositoryException("Unable to enqueue resync task"); + } + } catch (InterruptedException e) { + throw new RepositoryException(e); + } + } + + synchronized void waitUntilDone() throws RepositoryException { + if (mState == STATE_STOPPED) { + return; + } + mState = STATE_SHOULD_STOP; + try { + // Inject stop task into the queue so it knows to stop. + mQueue.offer(STOP_TASK, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + while (mState != STATE_STOPPED) { + wait(); + } + } catch (InterruptedException e) { + throw new RepositoryException(e); + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java new file mode 100644 index 0000000..8d5cc75 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java @@ -0,0 +1,161 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazon.carbonado.repo.replicated; + +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.amazon.carbonado.ConfigurationException; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryBuilder; +import com.amazon.carbonado.RepositoryException; + +import com.amazon.carbonado.spi.AbstractRepositoryBuilder; +import com.amazon.carbonado.spi.BelatedRepositoryCreator; + +/** + * Repository builder for the replicated repository. + *

+ * The following extra capabilities are supported: + *

    + *
  • {@link com.amazon.carbonado.capability.ResyncCapability ResyncCapability} + *
+ * + * @author Don Schneider + * @author Brian S O'Neill + */ +public class ReplicatedRepositoryBuilder extends AbstractRepositoryBuilder { + static final int DEFAULT_MASTER_TIMEOUT_MILLIS = 15000; + static final int DEFAULT_RETRY_MILLIS = 30000; + + private String mName; + private boolean mIsMaster = true; + private RepositoryBuilder mReplicaRepositoryBuilder; + private RepositoryBuilder mMasterRepositoryBuilder; + + public ReplicatedRepositoryBuilder() { + } + + public Repository build(RepositoryReference rootRef) throws RepositoryException { + assertReady(); + + Repository replica, master; + + { + boolean originalOption = mReplicaRepositoryBuilder.isMaster(); + try { + mReplicaRepositoryBuilder.setMaster(false); + replica = mReplicaRepositoryBuilder.build(rootRef); + } finally { + mReplicaRepositoryBuilder.setMaster(originalOption); + } + } + + { + // Create master using BelatedRepositoryCreator such that we can + // start up and read from replica even if master is down. + + final boolean originalOption = mMasterRepositoryBuilder.isMaster(); + mMasterRepositoryBuilder.setMaster(mIsMaster); + + Log log = LogFactory.getLog(ReplicatedRepositoryBuilder.class); + BelatedRepositoryCreator creator = new BelatedRepositoryCreator + (log, mMasterRepositoryBuilder, rootRef, DEFAULT_RETRY_MILLIS) { + + protected void createdNotification(Repository repo) { + // Don't need builder any more so restore it. + mMasterRepositoryBuilder.setMaster(originalOption); + } + }; + + master = creator.get(DEFAULT_MASTER_TIMEOUT_MILLIS); + } + + Repository repo = new ReplicatedRepository(getName(), replica, master); + rootRef.set(repo); + return repo; + } + + public String getName() { + String name = mName; + if (name == null) { + if (mReplicaRepositoryBuilder != null && mReplicaRepositoryBuilder.getName() != null) { + name = mReplicaRepositoryBuilder.getName(); + } else if (mMasterRepositoryBuilder != null) { + name = mMasterRepositoryBuilder.getName(); + } + } + return name; + } + + public void setName(String name) { + mName = name; + } + + public boolean isMaster() { + return mIsMaster; + } + + public void setMaster(boolean b) { + mIsMaster = b; + } + + /** + * @return "replica" respository to replicate to. + */ + public RepositoryBuilder getReplicaRepositoryBuilder() { + return mReplicaRepositoryBuilder; + } + + /** + * Set "replica" respository to replicate to, which is required. This builder + * automatically sets the master option of the given repository builder to + * false. + */ + public void setReplicaRepositoryBuilder(RepositoryBuilder replicaRepositoryBuilder) { + mReplicaRepositoryBuilder = replicaRepositoryBuilder; + } + + /** + * @return "master" respository to replicate from. + */ + public RepositoryBuilder getMasterRepositoryBuilder() { + return mMasterRepositoryBuilder; + } + + /** + * Set "master" respository to replicate from, which is required. This + * builder automatically sets the master option of the given repository to + * true. + */ + public void setMasterRepositoryBuilder(RepositoryBuilder masterRepositoryBuilder) { + mMasterRepositoryBuilder = masterRepositoryBuilder; + } + + public void errorCheck(Collection messages) throws ConfigurationException { + super.errorCheck(messages); + if (null == getReplicaRepositoryBuilder()) { + messages.add("replicaRepositoryBuilder missing"); + } + if (null == getMasterRepositoryBuilder()) { + messages.add("masterRepositoryBuilder missing"); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java new file mode 100644 index 0000000..cf30e5d --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java @@ -0,0 +1,121 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazon.carbonado.repo.replicated; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.OptimisticLockException; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistNoneException; +import com.amazon.carbonado.Query; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.SupportException; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Transaction; +import com.amazon.carbonado.Trigger; +import com.amazon.carbonado.UniqueConstraintException; +import com.amazon.carbonado.UnsupportedTypeException; + +import com.amazon.carbonado.filter.Filter; + +import com.amazon.carbonado.spi.BelatedStorageCreator; +import com.amazon.carbonado.spi.RepairExecutor; + +/** + * ReplicatedStorage + * + * @author Don Schneider + * @author Brian S O'Neill + */ +class ReplicatedStorage implements Storage { + final Storage mReplicaStorage; + final ReplicationTrigger mTrigger; + + public ReplicatedStorage(ReplicatedRepository aRepository, Class aType) + throws SupportException, RepositoryException + { + mReplicaStorage = aRepository.getReplicaRepository().storageFor(aType); + + // Create master using BelatedStorageCreator such that we can start up + // and read from replica even if master is down. + + Log log = LogFactory.getLog(getClass()); + BelatedStorageCreator creator = new BelatedStorageCreator + (log, aRepository.getMasterRepository(), aType, + ReplicatedRepositoryBuilder.DEFAULT_RETRY_MILLIS); + + Storage masterStorage; + try { + masterStorage = creator.get(ReplicatedRepositoryBuilder.DEFAULT_MASTER_TIMEOUT_MILLIS); + } catch (UnsupportedTypeException e) { + // Master doesn't support Storable, but it is marked as Independent. + masterStorage = null; + } + + mTrigger = new ReplicationTrigger(aRepository, mReplicaStorage, masterStorage); + addTrigger(mTrigger); + } + + /** + * For testing only. + */ + ReplicatedStorage(Repository aRepository, + Storage replicaStorage, + Storage masterStorage) + { + mReplicaStorage = replicaStorage; + mTrigger = new ReplicationTrigger(aRepository, mReplicaStorage, masterStorage); + addTrigger(mTrigger); + } + + public Class getStorableType() { + return mReplicaStorage.getStorableType(); + } + + public S prepare() { + return mReplicaStorage.prepare(); + } + + public Query query() throws FetchException { + return mReplicaStorage.query(); + } + + public Query query(String filter) throws FetchException { + return mReplicaStorage.query(filter); + } + + public Query query(Filter filter) throws FetchException { + return mReplicaStorage.query(filter); + } + + public boolean addTrigger(Trigger trigger) { + return mReplicaStorage.addTrigger(trigger); + } + + public boolean removeTrigger(Trigger trigger) { + return mReplicaStorage.removeTrigger(trigger); + } + + ReplicationTrigger getTrigger() { + return mTrigger; + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java new file mode 100644 index 0000000..19915b7 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -0,0 +1,398 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.replicated; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.amazon.carbonado.FetchDeadlockException; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchNoneException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.OptimisticLockException; +import com.amazon.carbonado.PersistDeadlockException; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistNoneException; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.Transaction; +import com.amazon.carbonado.Trigger; +import com.amazon.carbonado.UniqueConstraintException; + +import com.amazon.carbonado.spi.RepairExecutor; + +/** + * All inserts/updates/deletes are first committed to the master storage, then + * duplicated and committed to the replica. + * + * @author Don Schneider + * @author Brian S O'Neill + */ +class ReplicationTrigger extends Trigger { + private final Repository mRepository; + private final Storage mReplicaStorage; + private final Storage mMasterStorage; + + private final ThreadLocal mDisabled = new ThreadLocal(); + + ReplicationTrigger(Repository repository, + Storage replicaStorage, + Storage masterStorage) + { + mRepository = repository; + mReplicaStorage = replicaStorage; + mMasterStorage = masterStorage; + } + + @Override + public Object beforeInsert(S replica) throws PersistException { + return beforeInsert(replica, false); + } + + @Override + public Object beforeTryInsert(S replica) throws PersistException { + return beforeInsert(replica, true); + } + + private Object beforeInsert(S replica, boolean forTry) throws PersistException { + if (isReplicationDisabled()) { + return null; + } + + final S master = mMasterStorage.prepare(); + replica.copyAllProperties(master); + + try { + if (forTry) { + if (!master.tryInsert()) { + throw abortTry(); + } + } else { + master.insert(); + } + } catch (UniqueConstraintException e) { + // This may be caused by an inconsistency between replica and + // master. Here's one scenerio: user called tryLoad and saw the + // entry does not exist. So instead of calling update, he/she calls + // insert. If the master entry exists, then there is an + // inconsistency. The code below checks for this specific kind of + // error and repairs it by inserting a record in the replica. + + // Here's another scenerio: Unique constraint was caused by an + // inconsistency with the values of the alternate keys. User + // expected alternate keys to have unique values, as indicated by + // replica. + + repair(replica); + + // Throw exception since we don't know what the user's intentions + // really are. + throw e; + } + + // Master may have applied sequences to unitialized primary keys, so + // copy primary keys to replica. Mark properties as dirty to allow + // primary key to be changed. + replica.markPropertiesDirty(); + + // Copy all properties in order to trigger constraints that + // master should have resolved. + master.copyAllProperties(replica); + + return null; + } + + @Override + public Object beforeUpdate(S replica) throws PersistException { + return beforeUpdate(replica, false); + } + + @Override + public Object beforeTryUpdate(S replica) throws PersistException { + return beforeUpdate(replica, true); + } + + private Object beforeUpdate(S replica, boolean forTry) throws PersistException { + if (isReplicationDisabled()) { + return null; + } + + final S master = mMasterStorage.prepare(); + replica.copyPrimaryKeyProperties(master); + + if (!replica.hasDirtyProperties()) { + // Nothing to update, but must load from master anyhow, since + // update must always perform a fresh load as a side-effect. We + // cannot simply call update on the master, since it may need a + // version property to be set. Setting the version has the + // side-effect of making the storable look dirty, so the master + // will perform an update. This in turn causes the version to + // increase for no reason. + try { + if (forTry) { + if (!master.tryLoad()) { + // Master record does not exist. To ensure consistency, + // delete record from replica. + deleteReplica(replica); + throw abortTry(); + } + } else { + try { + master.load(); + } catch (FetchNoneException e) { + // Master record does not exist. To ensure consistency, + // delete record from replica. + deleteReplica(replica); + throw e; + } + } + } catch (FetchException e) { + throw e.toPersistException + ("Could not load master object for update: " + master.toStringKeyOnly()); + } + } else { + replica.copyVersionProperty(master); + replica.copyDirtyProperties(master); + + try { + if (forTry) { + if (!master.tryUpdate()) { + // Master record does not exist. To ensure consistency, + // delete record from replica. + deleteReplica(replica); + throw abortTry(); + } + } else { + try { + master.update(); + } catch (PersistNoneException e) { + // Master record does not exist. To ensure consistency, + // delete record from replica. + deleteReplica(replica); + throw e; + } + } + } catch (OptimisticLockException e) { + // This may be caused by an inconsistency between replica and + // master. + + repair(replica); + + // Throw original exception since we don't know what the user's + // intentions really are. + throw e; + } + } + + // Copy master properties back, since its repository may have + // altered property values as a side effect. + master.copyUnequalProperties(replica); + + return null; + } + + @Override + public Object beforeDelete(S replica) throws PersistException { + if (isReplicationDisabled()) { + return null; + } + + S master = mMasterStorage.prepare(); + replica.copyPrimaryKeyProperties(master); + + // If this fails to delete anything, don't care. Any delete failure + // will be detected when the replica is deleted. If there was an + // inconsistency, it is resolved after the replica is deleted. + master.tryDelete(); + + return null; + } + + /** + * Re-sync the replica to the master. The primary keys of both entries are + * assumed to match. + * + * @param replicaEntry current replica entry, or null if none + * @param masterEntry current master entry, or null if none + */ + void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException { + if (replicaEntry == null && masterEntry == null) { + return; + } + + Log log = LogFactory.getLog(ReplicatedRepository.class); + + setReplicationDisabled(true); + try { + Transaction txn = mRepository.enterTransaction(); + try { + if (replicaEntry != null) { + if (masterEntry == null) { + log.info("Deleting bogus entry: " + replicaEntry); + } + replicaEntry.tryDelete(); + } + if (masterEntry != null) { + Storable newReplicaEntry = mReplicaStorage.prepare(); + if (replicaEntry == null) { + masterEntry.copyAllProperties(newReplicaEntry); + log.info("Adding missing entry: " + newReplicaEntry); + } else { + if (replicaEntry.equalProperties(masterEntry)) { + return; + } + // First copy from old replica to preserve values of + // any independent properties. Be sure not to copy + // nulls from old replica to new replica, in case new + // non-nullable properties have been added. This is why + // copyUnequalProperties is called instead of + // copyAllProperties. + replicaEntry.copyUnequalProperties(newReplicaEntry); + // Calling copyAllProperties will skip unsupported + // independent properties in master, thus preserving + // old independent property values. + masterEntry.copyAllProperties(newReplicaEntry); + log.info("Replacing stale entry with: " + newReplicaEntry); + } + if (!newReplicaEntry.tryInsert()) { + // Try to correct bizarre corruption. + newReplicaEntry.tryDelete(); + newReplicaEntry.tryInsert(); + } + } + txn.commit(); + } finally { + txn.exit(); + } + } finally { + setReplicationDisabled(false); + } + } + + /** + * Runs a repair in a background thread. This is done for two reasons: It + * allows repair to not be hindered by locks acquired by transactions and + * repairs don't get rolled back when culprit exception is thrown. Culprit + * may be UniqueConstraintException or OptimisticLockException. + */ + private void repair(S replica) throws PersistException { + replica = (S) replica.copy(); + S master = mMasterStorage.prepare(); + replica.copyPrimaryKeyProperties(master); + + try { + if (replica.tryLoad()) { + if (master.tryLoad()) { + if (replica.equalProperties(master)) { + // Both are equal -- no repair needed. + return; + } + } + } else if (!master.tryLoad()) { + // Both are missing -- no repair needed. + return; + } + } catch (FetchException e) { + throw e.toPersistException(); + } + + final S finalReplica = replica; + final S finalMaster = master; + + RepairExecutor.execute(new Runnable() { + public void run() { + try { + Transaction txn = mRepository.enterTransaction(); + try { + txn.setForUpdate(true); + if (finalReplica.tryLoad()) { + if (finalMaster.tryLoad()) { + resyncEntries(finalReplica, finalMaster); + } else { + resyncEntries(finalReplica, null); + } + } else if (finalMaster.tryLoad()) { + resyncEntries(null, finalMaster); + } + txn.commit(); + } finally { + txn.exit(); + } + } catch (FetchException fe) { + Log log = LogFactory.getLog(ReplicatedRepository.class); + log.warn("Unable to check if repair is required for " + + finalReplica.toStringKeyOnly(), fe); + } catch (PersistException pe) { + Log log = LogFactory.getLog(ReplicatedRepository.class); + log.error("Unable to repair entry " + + finalReplica.toStringKeyOnly(), pe); + } + } + }); + } + + /** + * Deletes the replica entry with replication disabled. + */ + private void deleteReplica(S replica) throws PersistException { + // Disable replication to prevent trigger from being invoked by + // deleting replica. + setReplicationDisabled(true); + try { + replica.tryDelete(); + } finally { + setReplicationDisabled(false); + } + } + + /** + * Returns true if replication is disabled for the current thread. + */ + private boolean isReplicationDisabled() { + // Count indicates how many times disabled (nested) + AtomicInteger i = mDisabled.get(); + return i != null && i.get() > 0; + } + + /** + * By default, replication is enabled for the current thread. Pass true to + * disable during re-sync operations. + */ + private void setReplicationDisabled(boolean disabled) { + // Using a count allows this method call to be nested. Based on the + // current implementation, it should never be nested, so this extra + // work is just a safeguard. + AtomicInteger i = mDisabled.get(); + if (disabled) { + if (i == null) { + i = new AtomicInteger(1); + mDisabled.set(i); + } else { + i.incrementAndGet(); + } + } else { + if (i != null) { + i.decrementAndGet(); + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/package-info.java b/src/main/java/com/amazon/carbonado/repo/replicated/package-info.java new file mode 100644 index 0000000..44d8124 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/replicated/package-info.java @@ -0,0 +1,27 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Repository implementation that supports replication between two + * repositories. One repository is the replica, and the other is the + * master. Read operations are served by the replica, and the master is + * consulted when writing. Changes to the master are copied to the replica. + * + * @see com.amazon.carbonado.repo.replicated.ReplicatedRepositoryBuilder + */ +package com.amazon.carbonado.repo.replicated; -- cgit v1.2.3