diff options
| author | Jesse Morgan <jesse@jesterpm.net> | 2016-04-09 14:22:20 -0700 | 
|---|---|---|
| committer | Jesse Morgan <jesse@jesterpm.net> | 2016-04-09 15:48:01 -0700 | 
| commit | 3102d8bce3426d9cf41aeaf201c360d342677770 (patch) | |
| tree | 38c4f1e8828f9af9c4b77a173bee0d312b321698 /src/main/java/com/p4square/grow/backend/db | |
| parent | bbf907e51dfcf157bdee24dead1d531122aa25db (diff) | |
Switching from Ivy+Ant to Maven.
Diffstat (limited to 'src/main/java/com/p4square/grow/backend/db')
5 files changed, 463 insertions, 0 deletions
diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraCollectionProvider.java b/src/main/java/com/p4square/grow/backend/db/CassandraCollectionProvider.java new file mode 100644 index 0000000..bfcb48d --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraCollectionProvider.java @@ -0,0 +1,109 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import java.io.IOException; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ColumnList; + +import com.p4square.grow.provider.CollectionProvider; +import com.p4square.grow.provider.JsonEncodedProvider; + +/** + * CollectionProvider implementation backed by a Cassandra ColumnFamily. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraCollectionProvider<V> implements CollectionProvider<String, String, V> { +    private final CassandraDatabase mDb; +    private final String mCF; +    private final Class<V> mClazz; + +    public CassandraCollectionProvider(CassandraDatabase db, String columnFamily, Class<V> clazz) { +        mDb = db; +        mCF = columnFamily; +        mClazz = clazz; +    } + +    @Override +    public V get(String collection, String key) throws IOException { +        String blob = mDb.getKey(mCF, collection, key); +        return decode(blob); +    } + +    @Override +    public Map<String, V> query(String collection) throws IOException { +        return query(collection, -1); +    } + +    @Override +    public Map<String, V> query(String collection, int limit) throws IOException { +        Map<String, V> result = new LinkedHashMap<>(); + +        ColumnList<String> row = mDb.getRow(mCF, collection); +        if (!row.isEmpty()) { +            int count = 0; +            for (Column<String> c : row) { +                if (limit >= 0 && ++count > limit) { +                    break; // Limit reached. +                } + +                String key = c.getName(); +                String blob = c.getStringValue(); +                V obj = decode(blob); + +                result.put(key, obj); +            } +        } + +        return Collections.unmodifiableMap(result); +    } + +    @Override +    public void put(String collection, String key, V obj) throws IOException { +        String blob = encode(obj); +        mDb.putKey(mCF, collection, key, blob); +    } + +    /** +     * Encode the object as JSON. +     * +     * @param obj The object to encode. +     * @return The JSON encoding of obj. +     * @throws IOException if the object cannot be encoded. +     */ +    protected String encode(V obj) throws IOException { +        if (mClazz == String.class) { +            return (String) obj; +        } else { +            return JsonEncodedProvider.MAPPER.writeValueAsString(obj); +        } +    } + +    /** +     * Decode the JSON string as an object. +     * +     * @param blob The JSON data to decode. +     * @return The decoded object or null if blob is null. +     * @throws IOException If an object cannot be decoded. +     */ +    protected V decode(String blob) throws IOException { +        if (blob == null) { +            return null; +        } + +        if (mClazz == String.class) { +            return (V) blob; +        } + +        V obj = JsonEncodedProvider.MAPPER.readValue(blob, mClazz); +        return obj; +    } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraDatabase.java b/src/main/java/com/p4square/grow/backend/db/CassandraDatabase.java new file mode 100644 index 0000000..b8cb6df --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraDatabase.java @@ -0,0 +1,212 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.ColumnMutation; +import com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.model.ColumnList; +import com.netflix.astyanax.ColumnListMutation; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.serializers.StringSerializer; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; + +import org.apache.log4j.Logger; + +/** + * Cassandra Database Abstraction for the Backend. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraDatabase { +    private static Logger cLog = Logger.getLogger(CassandraDatabase.class); + +    // Configuration fields. +    private String mClusterName; +    private String mKeyspaceName; +    private String mSeedEndpoint   = "127.0.0.1:9160"; +    private int    mPort           = 9160; + +    private AstyanaxContext<Keyspace>  mContext; +    private Keyspace mKeyspace; + +    /** +     * Connect to Cassandra. +     * +     * Cluster and Keyspace must be set before calling init(). +     */ +    public void init() { +        mContext = new AstyanaxContext.Builder() +            .forCluster(mClusterName) +            .forKeyspace(mKeyspaceName) +            .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() +                .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE) +            ) +            .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool") +                .setPort(mPort) +                .setMaxConnsPerHost(1) +                .setSeeds(mSeedEndpoint) +            ) +            .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) +            .buildKeyspace(ThriftFamilyFactory.getInstance()); + +        mContext.start(); +        mKeyspace = mContext.getClient(); +    } + +    /** +     * Close the database connection. +     */ +    public void close() { +        mContext.shutdown(); +    } + +    /** +     * Set the cluster name to connect to. +     */ +    public void setClusterName(final String cluster) { +        mClusterName = cluster; +    } + +    /** +     * Set the name of the keyspace to open. +     */ +    public void setKeyspaceName(final String keyspace) { +        mKeyspaceName = keyspace; +    } + +    /** +     * Change the seed endpoint. +     * The default is 127.0.0.1:9160. +     */ +    public void setSeedEndpoint(final String endpoint) { +        mSeedEndpoint = endpoint; +    } + +    /** +     * Change the port to connect to. +     * The default is 9160. +     */ +    public void setPort(final int port) { +        mPort = port; +    } + +    /** +     * @return The entire row associated with this key. +     */ +    public ColumnList<String> getRow(final String cfName, final String key) { +        try { +            ColumnFamily<String, String> cf = new ColumnFamily(cfName, +                StringSerializer.get(), +                StringSerializer.get()); + +            OperationResult<ColumnList<String>> result = +                mKeyspace.prepareQuery(cf) +                    .getKey(key) +                    .execute(); + +            return result.getResult(); + +        } catch (ConnectionException e) { +            cLog.error("getRow failed due to Connection Exception", e); +            throw new RuntimeException(e); +        } +    } + +    /** +     * @return The value associated with the given key. +     */ +    public String getKey(final String cfName, final String key) { +        return getKey(cfName, key, "value"); +    } + +    /** +     * @return The value associated with the given key, column pair. +     */ +    public String getKey(final String cfName, final String key, final String column) { +        final ColumnList<String> row = getRow(cfName, key); + +        if (row != null) { +            final Column rowColumn = row.getColumnByName(column); +            if (rowColumn != null) { +                return rowColumn.getStringValue(); +            } +        } + +        return null; +    } + +    /** +     * Assign value to key. +     */ +    public void putKey(final String cfName, final String key, final String value) { +        putKey(cfName, key, "value", value); +    } + +    /** +     * Assign value to the key, column pair. +     */ +    public void putKey(final String cfName, final String key, +            final String column, final String value) { + +        ColumnFamily<String, String> cf = new ColumnFamily(cfName, +            StringSerializer.get(), +            StringSerializer.get()); + +        MutationBatch m = mKeyspace.prepareMutationBatch(); +        m.withRow(cf, key).putColumn(column, value); + +        try { +            m.execute(); +        } catch (ConnectionException e) { +            cLog.error("putKey failed due to Connection Exception", e); +            throw new RuntimeException(e); +        } +    } + +    /** +     * Remove a key, column pair. +     */ +    public void deleteKey(final String cfName, final String key, final String column) { +        ColumnFamily<String, String> cf = new ColumnFamily(cfName, +            StringSerializer.get(), +            StringSerializer.get()); + +        try { +            ColumnMutation m = mKeyspace.prepareColumnMutation(cf, key, column); +            m.deleteColumn().execute(); +        } catch (ConnectionException e) { +            cLog.error("deleteKey failed due to Connection Exception", e); +            throw new RuntimeException(e); +        } +    } + +    /** +     * Remove a row +     */ +    public void deleteRow(final String cfName, final String key) { +        ColumnFamily<String, String> cf = new ColumnFamily(cfName, +            StringSerializer.get(), +            StringSerializer.get()); + +        try { +            MutationBatch batch = mKeyspace.prepareMutationBatch(); +            ColumnListMutation<String> cfm = batch.withRow(cf, key).delete(); +            batch.execute(); + +        } catch (ConnectionException e) { +            cLog.error("deleteRow failed due to Connection Exception", e); +            throw new RuntimeException(e); +        } +    } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraKey.java b/src/main/java/com/p4square/grow/backend/db/CassandraKey.java new file mode 100644 index 0000000..853fe96 --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraKey.java @@ -0,0 +1,34 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +/** + * CassandraKey represents a Cassandra key / column pair. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraKey { +    private final String mColumnFamily; +    private final String mId; +    private final String mColumn; + +    public CassandraKey(String columnFamily, String id, String column) { +        mColumnFamily = columnFamily; +        mId = id; +        mColumn = column; +    } + +    public String getColumnFamily() { +        return mColumnFamily; +    } + +    public String getId() { +        return mId; +    } + +    public String getColumn() { +        return mColumn; +    } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraProviderImpl.java b/src/main/java/com/p4square/grow/backend/db/CassandraProviderImpl.java new file mode 100644 index 0000000..da5a9f2 --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraProviderImpl.java @@ -0,0 +1,37 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import java.io.IOException; + +import com.p4square.grow.provider.Provider; +import com.p4square.grow.provider.JsonEncodedProvider; + +/** + * Provider implementation backed by a Cassandra ColumnFamily. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraProviderImpl<V> extends JsonEncodedProvider<V> implements Provider<CassandraKey, V> { +    private final CassandraDatabase mDb; + +    public CassandraProviderImpl(CassandraDatabase db, Class<V> clazz) { +        super(clazz); + +        mDb = db; +    } + +    @Override +    public V get(CassandraKey key) throws IOException { +        String blob = mDb.getKey(key.getColumnFamily(), key.getId(), key.getColumn()); +        return decode(blob); +    } + +    @Override +    public void put(CassandraKey key, V obj) throws IOException { +        String blob = encode(obj); +        mDb.putKey(key.getColumnFamily(), key.getId(), key.getColumn(), blob); +    } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraTrainingRecordProvider.java b/src/main/java/com/p4square/grow/backend/db/CassandraTrainingRecordProvider.java new file mode 100644 index 0000000..4face52 --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraTrainingRecordProvider.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import java.io.IOException; + +import com.p4square.grow.model.Playlist; +import com.p4square.grow.model.TrainingRecord; + +import com.p4square.grow.provider.JsonEncodedProvider; +import com.p4square.grow.provider.Provider; + +/** + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraTrainingRecordProvider implements Provider<String, TrainingRecord> { +    private static final CassandraKey DEFAULT_PLAYLIST_KEY = new CassandraKey("strings", "defaultPlaylist", "value"); + +    private static final String COLUMN_FAMILY = "training"; +    private static final String PLAYLIST_KEY = "playlist"; +    private static final String LAST_VIDEO_KEY = "lastVideo"; + +    private final CassandraDatabase mDb; +    private final Provider<CassandraKey, Playlist> mPlaylistProvider; + +    public CassandraTrainingRecordProvider(CassandraDatabase db) { +        mDb = db; +        mPlaylistProvider = new CassandraProviderImpl<>(db, Playlist.class); +    } + +    @Override +    public TrainingRecord get(String userid) throws IOException { +        Playlist playlist = mPlaylistProvider.get(new CassandraKey(COLUMN_FAMILY, userid, PLAYLIST_KEY)); + +        if (playlist == null) { +            // We consider no playlist to mean no record whatsoever. +            return null; +        } + +        TrainingRecord r = new TrainingRecord(); +        r.setPlaylist(playlist); +        r.setLastVideo(mDb.getKey(COLUMN_FAMILY, userid, LAST_VIDEO_KEY)); + +        return r; +    } + +    @Override +    public void put(String userid, TrainingRecord record) throws IOException { +        String lastVideo = record.getLastVideo(); +        Playlist playlist = record.getPlaylist(); + +        mDb.putKey(COLUMN_FAMILY, userid, LAST_VIDEO_KEY, lastVideo); +        mPlaylistProvider.put(new CassandraKey(COLUMN_FAMILY, userid, PLAYLIST_KEY), playlist); +    } + +    /** +     * @return the default playlist stored in the database. +     */ +    public Playlist getDefaultPlaylist() throws IOException { +        Playlist playlist = mPlaylistProvider.get(DEFAULT_PLAYLIST_KEY); + +        if (playlist == null) { +            playlist = new Playlist(); +        } + +        return playlist; +    } +}  | 
