/* * Copyright 2006-2012 Amazon Technologies, Inc. or its affiliates. * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks * of Amazon Technologies, Inc. or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.amazon.carbonado.spi; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.RepositoryException; import com.amazon.carbonado.Storable; import com.amazon.carbonado.Transaction; import com.amazon.carbonado.Trigger; import com.amazon.carbonado.TriggerFactory; /** * Used by Storage implementations to manage triggers and consolidate them into * single logical triggers. This class is thread-safe and ensures that changes * to the trigger set do not affect transactions in progress. * * @author Brian S O'Neill */ public class TriggerManager extends Trigger { // Bit masks returned by selectTypes. private static final int FOR_INSERT = 1; private static final int FOR_UPDATE = 2; private static final int FOR_DELETE = 4; private static final int FOR_LOAD = 8; private static final Method[] INSERT_METHODS; private static final Method[] UPDATE_METHODS; private static final Method[] DELETE_METHODS; private static final Method AFTER_LOAD_METHOD; static { Class triggerClass = Trigger.class; Class[] ONE_PARAM = {Object.class}; Class[] TXN_PARAMS = {Transaction.class, Object.class}; Class[] TWO_PARAMS = {Object.class, Object.class}; try { INSERT_METHODS = new Method[] { triggerClass.getMethod("beforeInsert", ONE_PARAM), triggerClass.getMethod("beforeInsert", TXN_PARAMS), triggerClass.getMethod("beforeTryInsert", ONE_PARAM), triggerClass.getMethod("beforeTryInsert", TXN_PARAMS), triggerClass.getMethod("afterInsert", TWO_PARAMS), triggerClass.getMethod("afterTryInsert", TWO_PARAMS), triggerClass.getMethod("failedInsert", TWO_PARAMS) }; UPDATE_METHODS = new Method[] { triggerClass.getMethod("beforeUpdate", ONE_PARAM), triggerClass.getMethod("beforeUpdate", TXN_PARAMS), triggerClass.getMethod("beforeTryUpdate", ONE_PARAM), triggerClass.getMethod("beforeTryUpdate", TXN_PARAMS), triggerClass.getMethod("afterUpdate", TWO_PARAMS), triggerClass.getMethod("afterTryUpdate", TWO_PARAMS), triggerClass.getMethod("failedUpdate", TWO_PARAMS) }; DELETE_METHODS = new Method[] { triggerClass.getMethod("beforeDelete", ONE_PARAM), triggerClass.getMethod("beforeDelete", TXN_PARAMS), triggerClass.getMethod("beforeTryDelete", ONE_PARAM), triggerClass.getMethod("beforeTryDelete", TXN_PARAMS), triggerClass.getMethod("afterDelete", TWO_PARAMS), triggerClass.getMethod("afterTryDelete", TWO_PARAMS), triggerClass.getMethod("failedDelete", TWO_PARAMS) }; AFTER_LOAD_METHOD = triggerClass.getMethod("afterLoad", ONE_PARAM); } catch (NoSuchMethodException e) { Error error = new NoSuchMethodError(); error.initCause(e); throw error; } } private final ForInsert mForInsert = new ForInsert(); private final ForUpdate mForUpdate = new ForUpdate(); private final ForDelete mForDelete = new ForDelete(); private final ForLoad mForLoad = new ForLoad(); public TriggerManager() { } /** * @param triggerFactories TriggerFactories which will be called upon to * optionally return a trigger to initially register */ public TriggerManager(Class type, Iterable triggerFactories) throws RepositoryException { if (triggerFactories != null) { addTriggers(type, triggerFactories); } } /** * Returns a consolidated trigger to call for insert operations, or null if * none. If not null, the consolidated trigger is not a snapshot -- it will * change as the set of triggers in this manager changes. */ public Trigger getInsertTrigger() { ForInsert forInsert = mForInsert; return forInsert.isEmpty() ? null : forInsert; } /** * Returns a consolidated trigger to call for update operations, or null if * none. If not null, the consolidated trigger is not a snapshot -- it will * change as the set of triggers in this manager changes. */ public Trigger getUpdateTrigger() { ForUpdate forUpdate = mForUpdate; return forUpdate.isEmpty() ? null : forUpdate; } /** * Returns a consolidated trigger to call for delete operations, or null if * none. If not null, the consolidated trigger is not a snapshot -- it will * change as the set of triggers in this manager changes. */ public Trigger getDeleteTrigger() { ForDelete forDelete = mForDelete; return forDelete.isEmpty() ? null : forDelete; } /** * Returns a consolidated trigger to call for load operations, or null if * none. If not null, the consolidated trigger is not a snapshot -- it will * change as the set of triggers in this manager changes. * * @since 1.2 */ public Trigger getLoadTrigger() { ForLoad forLoad = mForLoad; return forLoad.isEmpty() ? null : forLoad; } public boolean addTrigger(Trigger trigger) { if (trigger == null) { throw new IllegalArgumentException(); } int types = selectTypes(trigger); boolean retValue = false; if ((types & FOR_INSERT) != 0) { retValue |= mForInsert.add(trigger); } if ((types & FOR_UPDATE) != 0) { retValue |= mForUpdate.add(trigger); } if ((types & FOR_DELETE) != 0) { retValue |= mForDelete.add(trigger); } if ((types & FOR_LOAD) != 0) { retValue |= mForLoad.add(trigger); } return retValue; } public boolean removeTrigger(Trigger trigger) { if (trigger == null) { throw new IllegalArgumentException(); } int types = selectTypes(trigger); boolean retValue = false; if ((types & FOR_INSERT) != 0) { retValue |= mForInsert.remove(trigger); } if ((types & FOR_UPDATE) != 0) { retValue |= mForUpdate.remove(trigger); } if ((types & FOR_DELETE) != 0) { retValue |= mForDelete.remove(trigger); } if ((types & FOR_LOAD) != 0) { retValue |= mForLoad.remove(trigger); } return retValue; } public void addTriggers(Class type, Iterable triggerFactories) throws RepositoryException { for (TriggerFactory factory : triggerFactories) { Trigger trigger = factory.triggerFor(type); if (trigger != null) { addTrigger(trigger); } } } /** * Disables execution of all managed insert triggers for the current * thread. Call locallyEnableInsert to enable again. This call can be made * multiple times, but be sure to call locallyEnableInsert the same number of * times to fully enable. * * @since 1.2 */ public void locallyDisableInsert() { mForInsert.locallyDisable(); } /** * Enables execution of all managed insert triggers for the current thread, * if they had been disabled before. * * @since 1.2 */ public void locallyEnableInsert() { mForInsert.locallyEnable(); } /** * Disables execution of all managed update triggers for the current * thread. Call locallyEnableUpdate to enable again. This call can be made * multiple times, but be sure to call locallyEnableUpdate the same number of * times to fully enable. * * @since 1.2 */ public void locallyDisableUpdate() { mForUpdate.locallyDisable(); } /** * Enables execution of all managed update triggers for the current thread, * if they had been disabled before. * * @since 1.2 */ public void locallyEnableUpdate() { mForUpdate.locallyEnable(); } /** * Disables execution of all managed delete triggers for the current * thread. Call locallyEnableDelete to enable again. This call can be made * multiple times, but be sure to call locallyEnableDelete the same number of * times to fully enable. * * @since 1.2 */ public void locallyDisableDelete() { mForDelete.locallyDisable(); } /** * Enables execution of all managed delete triggers for the current thread, * if they had been disabled before. * * @since 1.2 */ public void locallyEnableDelete() { mForDelete.locallyEnable(); } /** * Disables execution of all managed load triggers for the current * thread. Call locallyEnableLoad to enable again. This call can be made * multiple times, but be sure to call locallyEnableLoad the same number of * times to fully enable. * * @since 1.2 */ public void locallyDisableLoad() { mForLoad.locallyDisable(); } /** * Enables execution of all managed load triggers for the current thread, * if they had been disabled before. * * @since 1.2 */ public void locallyEnableLoad() { mForLoad.locallyEnable(); } @Override public Object beforeInsert(S storable) throws PersistException { return mForInsert.beforeInsert(storable); } @Override public Object beforeInsert(Transaction txn, S storable) throws PersistException { return mForInsert.beforeInsert(txn, storable); } @Override public Object beforeTryInsert(S storable) throws PersistException { return mForInsert.beforeTryInsert(storable); } @Override public Object beforeTryInsert(Transaction txn, S storable) throws PersistException { return mForInsert.beforeTryInsert(txn, storable); } @Override public void afterInsert(S storable, Object state) throws PersistException { mForInsert.afterInsert(storable, state); } @Override public void afterTryInsert(S storable, Object state) throws PersistException { mForInsert.afterTryInsert(storable, state); } @Override public void failedInsert(S storable, Object state) { mForInsert.failedInsert(storable, state); } @Override public Object beforeUpdate(S storable) throws PersistException { return mForUpdate.beforeUpdate(storable); } @Override public Object beforeUpdate(Transaction txn, S storable) throws PersistException { return mForUpdate.beforeUpdate(txn, storable); } @Override public Object beforeTryUpdate(S storable) throws PersistException { return mForUpdate.beforeTryUpdate(storable); } @Override public Object beforeTryUpdate(Transaction txn, S storable) throws PersistException { return mForUpdate.beforeTryUpdate(txn, storable); } @Override public void afterUpdate(S storable, Object state) throws PersistException { mForUpdate.afterUpdate(storable, state); } @Override public void afterTryUpdate(S storable, Object state) throws PersistException { mForUpdate.afterTryUpdate(storable, state); } @Override public void failedUpdate(S storable, Object state) { mForUpdate.failedUpdate(storable, state); } @Override public Object beforeDelete(S storable) throws PersistException { return mForDelete.beforeDelete(storable); } @Override public Object beforeDelete(Transaction txn, S storable) throws PersistException { return mForDelete.beforeDelete(txn, storable); } @Override public Object beforeTryDelete(S storable) throws PersistException { return mForDelete.beforeTryDelete(storable); } @Override public Object beforeTryDelete(Transaction txn, S storable) throws PersistException { return mForDelete.beforeTryDelete(txn, storable); } @Override public void afterDelete(S storable, Object state) throws PersistException { mForDelete.afterDelete(storable, state); } @Override public void afterTryDelete(S storable, Object state) throws PersistException { mForDelete.afterTryDelete(storable, state); } @Override public void failedDelete(S storable, Object state) { mForDelete.failedDelete(storable, state); } @Override public void afterLoad(S storable) throws FetchException { mForLoad.afterLoad(storable); } /** * Determines which operations the given trigger overrides. */ private int selectTypes(Trigger trigger) { Class triggerClass = trigger.getClass(); int types = 0; if (overridesOneMethod(triggerClass, INSERT_METHODS)) { types |= FOR_INSERT; } if (overridesOneMethod(triggerClass, UPDATE_METHODS)) { types |= FOR_UPDATE; } if (overridesOneMethod(triggerClass, DELETE_METHODS)) { types |= FOR_DELETE; } if (overridesMethod(triggerClass, AFTER_LOAD_METHOD)) { types |= FOR_LOAD; } return types; } private static boolean overridesOneMethod(Class triggerClass, Method[] methods) { for (Method method : methods) { if (overridesMethod(triggerClass, method)) { return true; } } return false; } private static boolean overridesMethod(Class triggerClass, Method method) { try { return !method.equals(triggerClass.getMethod(method.getName(), method.getParameterTypes())); } catch (NoSuchMethodException e) { return false; } } private static class TriggerStates { final Trigger[] mTriggers; final Object[] mStates; TriggerStates(Trigger[] triggers) { mTriggers = triggers; mStates = new Object[triggers.length]; } } private static abstract class ManagedTrigger extends Trigger { private static final AtomicReferenceFieldUpdater cDisabledFlagRef = AtomicReferenceFieldUpdater .newUpdater(ManagedTrigger.class, ThreadLocal.class, "mDisabledFlag"); private static Trigger[] NO_TRIGGERS = new Trigger[0]; protected volatile Trigger[] mTriggers; private volatile ThreadLocal mDisabledFlag; ManagedTrigger() { mTriggers = NO_TRIGGERS; } boolean add(Trigger trigger) { ArrayList> list = new ArrayList>(Arrays.asList(mTriggers)); if (list.contains(trigger)) { return false; } list.add(trigger); mTriggers = list.toArray(new Trigger[list.size()]); return true; } boolean remove(Trigger trigger) { ArrayList> list = new ArrayList>(Arrays.asList(mTriggers)); if (!list.remove(trigger)) { return false; } mTriggers = list.toArray(new Trigger[list.size()]); return true; } boolean isEmpty() { return mTriggers.length == 0; } boolean isLocallyDisabled() { ThreadLocal disabledFlag = mDisabledFlag; if (disabledFlag == null) { return false; } // Count indicates how many times disabled (nested) AtomicInteger i = disabledFlag.get(); return i != null && i.get() > 0; } void locallyDisable() { // Using a count allows this method call to be nested. ThreadLocal disabledFlag = disabledFlag(); AtomicInteger i = disabledFlag.get(); if (i == null) { disabledFlag.set(new AtomicInteger(1)); } else { i.incrementAndGet(); } } void locallyEnable() { // Using a count allows this method call to be nested. AtomicInteger i = disabledFlag().get(); if (i != null) { i.decrementAndGet(); } } private ThreadLocal disabledFlag() { ThreadLocal disabledFlag = mDisabledFlag; while (disabledFlag == null) { disabledFlag = new ThreadLocal(); if (cDisabledFlagRef.compareAndSet(this, null, disabledFlag)) { break; } disabledFlag = mDisabledFlag; } return disabledFlag; } } private static class ForInsert extends ManagedTrigger { @Override public Object beforeInsert(S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeInsert(storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeInsert(Transaction txn, S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeInsert(txn, storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeTryInsert(S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeTryInsert(storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeTryInsert(Transaction txn, S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeTryInsert(txn, storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public void afterInsert(S storable, Object state) throws PersistException { if (isLocallyDisabled()) { return; } TriggerStates triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i extends ManagedTrigger { @Override public Object beforeUpdate(S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeUpdate(storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeUpdate(Transaction txn, S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeUpdate(txn, storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeTryUpdate(S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeTryUpdate(storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeTryUpdate(Transaction txn, S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeTryUpdate(txn, storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public void afterUpdate(S storable, Object state) throws PersistException { if (isLocallyDisabled()) { return; } TriggerStates triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i extends ManagedTrigger { @Override public Object beforeDelete(S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeDelete(storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeDelete(Transaction txn, S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeDelete(txn, storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeTryDelete(S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeTryDelete(storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public Object beforeTryDelete(Transaction txn, S storable) throws PersistException { if (isLocallyDisabled()) { return null; } TriggerStates triggerStates = null; Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { Object state = triggers[i].beforeTryDelete(txn, storable); if (state != null) { if (triggerStates == null) { triggerStates = new TriggerStates(triggers); } triggerStates.mStates[i] = state; } } return triggerStates == null ? triggers : triggerStates; } @Override public void afterDelete(S storable, Object state) throws PersistException { if (isLocallyDisabled()) { return; } TriggerStates triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i triggerStates; Trigger[] triggers; if (state == null) { triggerStates = null; triggers = mTriggers; } else if (state instanceof TriggerStates) { triggerStates = (TriggerStates) state; triggers = triggerStates.mTriggers; } else { triggerStates = null; triggers = (Trigger[]) state; } int length = triggers.length; if (triggerStates == null) { for (int i=0; i extends ManagedTrigger { @Override public void afterLoad(S storable) throws FetchException { if (!isLocallyDisabled()) { Trigger[] triggers = mTriggers; for (int i=triggers.length; --i>=0; ) { triggers[i].afterLoad(storable); } } } } }