diff options
author | Jesse Morgan <jesse@jesterpm.net> | 2016-04-09 14:22:20 -0700 |
---|---|---|
committer | Jesse Morgan <jesse@jesterpm.net> | 2016-04-09 15:48:01 -0700 |
commit | 3102d8bce3426d9cf41aeaf201c360d342677770 (patch) | |
tree | 38c4f1e8828f9af9c4b77a173bee0d312b321698 /src/main/java/com/p4square/grow/backend/db | |
parent | bbf907e51dfcf157bdee24dead1d531122aa25db (diff) |
Switching from Ivy+Ant to Maven.
Diffstat (limited to 'src/main/java/com/p4square/grow/backend/db')
5 files changed, 463 insertions, 0 deletions
diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraCollectionProvider.java b/src/main/java/com/p4square/grow/backend/db/CassandraCollectionProvider.java new file mode 100644 index 0000000..bfcb48d --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraCollectionProvider.java @@ -0,0 +1,109 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import java.io.IOException; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ColumnList; + +import com.p4square.grow.provider.CollectionProvider; +import com.p4square.grow.provider.JsonEncodedProvider; + +/** + * CollectionProvider implementation backed by a Cassandra ColumnFamily. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraCollectionProvider<V> implements CollectionProvider<String, String, V> { + private final CassandraDatabase mDb; + private final String mCF; + private final Class<V> mClazz; + + public CassandraCollectionProvider(CassandraDatabase db, String columnFamily, Class<V> clazz) { + mDb = db; + mCF = columnFamily; + mClazz = clazz; + } + + @Override + public V get(String collection, String key) throws IOException { + String blob = mDb.getKey(mCF, collection, key); + return decode(blob); + } + + @Override + public Map<String, V> query(String collection) throws IOException { + return query(collection, -1); + } + + @Override + public Map<String, V> query(String collection, int limit) throws IOException { + Map<String, V> result = new LinkedHashMap<>(); + + ColumnList<String> row = mDb.getRow(mCF, collection); + if (!row.isEmpty()) { + int count = 0; + for (Column<String> c : row) { + if (limit >= 0 && ++count > limit) { + break; // Limit reached. + } + + String key = c.getName(); + String blob = c.getStringValue(); + V obj = decode(blob); + + result.put(key, obj); + } + } + + return Collections.unmodifiableMap(result); + } + + @Override + public void put(String collection, String key, V obj) throws IOException { + String blob = encode(obj); + mDb.putKey(mCF, collection, key, blob); + } + + /** + * Encode the object as JSON. + * + * @param obj The object to encode. + * @return The JSON encoding of obj. + * @throws IOException if the object cannot be encoded. + */ + protected String encode(V obj) throws IOException { + if (mClazz == String.class) { + return (String) obj; + } else { + return JsonEncodedProvider.MAPPER.writeValueAsString(obj); + } + } + + /** + * Decode the JSON string as an object. + * + * @param blob The JSON data to decode. + * @return The decoded object or null if blob is null. + * @throws IOException If an object cannot be decoded. + */ + protected V decode(String blob) throws IOException { + if (blob == null) { + return null; + } + + if (mClazz == String.class) { + return (V) blob; + } + + V obj = JsonEncodedProvider.MAPPER.readValue(blob, mClazz); + return obj; + } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraDatabase.java b/src/main/java/com/p4square/grow/backend/db/CassandraDatabase.java new file mode 100644 index 0000000..b8cb6df --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraDatabase.java @@ -0,0 +1,212 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.ColumnMutation; +import com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.model.ColumnList; +import com.netflix.astyanax.ColumnListMutation; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.serializers.StringSerializer; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; + +import org.apache.log4j.Logger; + +/** + * Cassandra Database Abstraction for the Backend. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraDatabase { + private static Logger cLog = Logger.getLogger(CassandraDatabase.class); + + // Configuration fields. + private String mClusterName; + private String mKeyspaceName; + private String mSeedEndpoint = "127.0.0.1:9160"; + private int mPort = 9160; + + private AstyanaxContext<Keyspace> mContext; + private Keyspace mKeyspace; + + /** + * Connect to Cassandra. + * + * Cluster and Keyspace must be set before calling init(). + */ + public void init() { + mContext = new AstyanaxContext.Builder() + .forCluster(mClusterName) + .forKeyspace(mKeyspaceName) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() + .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE) + ) + .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool") + .setPort(mPort) + .setMaxConnsPerHost(1) + .setSeeds(mSeedEndpoint) + ) + .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildKeyspace(ThriftFamilyFactory.getInstance()); + + mContext.start(); + mKeyspace = mContext.getClient(); + } + + /** + * Close the database connection. + */ + public void close() { + mContext.shutdown(); + } + + /** + * Set the cluster name to connect to. + */ + public void setClusterName(final String cluster) { + mClusterName = cluster; + } + + /** + * Set the name of the keyspace to open. + */ + public void setKeyspaceName(final String keyspace) { + mKeyspaceName = keyspace; + } + + /** + * Change the seed endpoint. + * The default is 127.0.0.1:9160. + */ + public void setSeedEndpoint(final String endpoint) { + mSeedEndpoint = endpoint; + } + + /** + * Change the port to connect to. + * The default is 9160. + */ + public void setPort(final int port) { + mPort = port; + } + + /** + * @return The entire row associated with this key. + */ + public ColumnList<String> getRow(final String cfName, final String key) { + try { + ColumnFamily<String, String> cf = new ColumnFamily(cfName, + StringSerializer.get(), + StringSerializer.get()); + + OperationResult<ColumnList<String>> result = + mKeyspace.prepareQuery(cf) + .getKey(key) + .execute(); + + return result.getResult(); + + } catch (ConnectionException e) { + cLog.error("getRow failed due to Connection Exception", e); + throw new RuntimeException(e); + } + } + + /** + * @return The value associated with the given key. + */ + public String getKey(final String cfName, final String key) { + return getKey(cfName, key, "value"); + } + + /** + * @return The value associated with the given key, column pair. + */ + public String getKey(final String cfName, final String key, final String column) { + final ColumnList<String> row = getRow(cfName, key); + + if (row != null) { + final Column rowColumn = row.getColumnByName(column); + if (rowColumn != null) { + return rowColumn.getStringValue(); + } + } + + return null; + } + + /** + * Assign value to key. + */ + public void putKey(final String cfName, final String key, final String value) { + putKey(cfName, key, "value", value); + } + + /** + * Assign value to the key, column pair. + */ + public void putKey(final String cfName, final String key, + final String column, final String value) { + + ColumnFamily<String, String> cf = new ColumnFamily(cfName, + StringSerializer.get(), + StringSerializer.get()); + + MutationBatch m = mKeyspace.prepareMutationBatch(); + m.withRow(cf, key).putColumn(column, value); + + try { + m.execute(); + } catch (ConnectionException e) { + cLog.error("putKey failed due to Connection Exception", e); + throw new RuntimeException(e); + } + } + + /** + * Remove a key, column pair. + */ + public void deleteKey(final String cfName, final String key, final String column) { + ColumnFamily<String, String> cf = new ColumnFamily(cfName, + StringSerializer.get(), + StringSerializer.get()); + + try { + ColumnMutation m = mKeyspace.prepareColumnMutation(cf, key, column); + m.deleteColumn().execute(); + } catch (ConnectionException e) { + cLog.error("deleteKey failed due to Connection Exception", e); + throw new RuntimeException(e); + } + } + + /** + * Remove a row + */ + public void deleteRow(final String cfName, final String key) { + ColumnFamily<String, String> cf = new ColumnFamily(cfName, + StringSerializer.get(), + StringSerializer.get()); + + try { + MutationBatch batch = mKeyspace.prepareMutationBatch(); + ColumnListMutation<String> cfm = batch.withRow(cf, key).delete(); + batch.execute(); + + } catch (ConnectionException e) { + cLog.error("deleteRow failed due to Connection Exception", e); + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraKey.java b/src/main/java/com/p4square/grow/backend/db/CassandraKey.java new file mode 100644 index 0000000..853fe96 --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraKey.java @@ -0,0 +1,34 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +/** + * CassandraKey represents a Cassandra key / column pair. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraKey { + private final String mColumnFamily; + private final String mId; + private final String mColumn; + + public CassandraKey(String columnFamily, String id, String column) { + mColumnFamily = columnFamily; + mId = id; + mColumn = column; + } + + public String getColumnFamily() { + return mColumnFamily; + } + + public String getId() { + return mId; + } + + public String getColumn() { + return mColumn; + } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraProviderImpl.java b/src/main/java/com/p4square/grow/backend/db/CassandraProviderImpl.java new file mode 100644 index 0000000..da5a9f2 --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraProviderImpl.java @@ -0,0 +1,37 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import java.io.IOException; + +import com.p4square.grow.provider.Provider; +import com.p4square.grow.provider.JsonEncodedProvider; + +/** + * Provider implementation backed by a Cassandra ColumnFamily. + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraProviderImpl<V> extends JsonEncodedProvider<V> implements Provider<CassandraKey, V> { + private final CassandraDatabase mDb; + + public CassandraProviderImpl(CassandraDatabase db, Class<V> clazz) { + super(clazz); + + mDb = db; + } + + @Override + public V get(CassandraKey key) throws IOException { + String blob = mDb.getKey(key.getColumnFamily(), key.getId(), key.getColumn()); + return decode(blob); + } + + @Override + public void put(CassandraKey key, V obj) throws IOException { + String blob = encode(obj); + mDb.putKey(key.getColumnFamily(), key.getId(), key.getColumn(), blob); + } +} diff --git a/src/main/java/com/p4square/grow/backend/db/CassandraTrainingRecordProvider.java b/src/main/java/com/p4square/grow/backend/db/CassandraTrainingRecordProvider.java new file mode 100644 index 0000000..4face52 --- /dev/null +++ b/src/main/java/com/p4square/grow/backend/db/CassandraTrainingRecordProvider.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013 Jesse Morgan + */ + +package com.p4square.grow.backend.db; + +import java.io.IOException; + +import com.p4square.grow.model.Playlist; +import com.p4square.grow.model.TrainingRecord; + +import com.p4square.grow.provider.JsonEncodedProvider; +import com.p4square.grow.provider.Provider; + +/** + * + * @author Jesse Morgan <jesse@jesterpm.net> + */ +public class CassandraTrainingRecordProvider implements Provider<String, TrainingRecord> { + private static final CassandraKey DEFAULT_PLAYLIST_KEY = new CassandraKey("strings", "defaultPlaylist", "value"); + + private static final String COLUMN_FAMILY = "training"; + private static final String PLAYLIST_KEY = "playlist"; + private static final String LAST_VIDEO_KEY = "lastVideo"; + + private final CassandraDatabase mDb; + private final Provider<CassandraKey, Playlist> mPlaylistProvider; + + public CassandraTrainingRecordProvider(CassandraDatabase db) { + mDb = db; + mPlaylistProvider = new CassandraProviderImpl<>(db, Playlist.class); + } + + @Override + public TrainingRecord get(String userid) throws IOException { + Playlist playlist = mPlaylistProvider.get(new CassandraKey(COLUMN_FAMILY, userid, PLAYLIST_KEY)); + + if (playlist == null) { + // We consider no playlist to mean no record whatsoever. + return null; + } + + TrainingRecord r = new TrainingRecord(); + r.setPlaylist(playlist); + r.setLastVideo(mDb.getKey(COLUMN_FAMILY, userid, LAST_VIDEO_KEY)); + + return r; + } + + @Override + public void put(String userid, TrainingRecord record) throws IOException { + String lastVideo = record.getLastVideo(); + Playlist playlist = record.getPlaylist(); + + mDb.putKey(COLUMN_FAMILY, userid, LAST_VIDEO_KEY, lastVideo); + mPlaylistProvider.put(new CassandraKey(COLUMN_FAMILY, userid, PLAYLIST_KEY), playlist); + } + + /** + * @return the default playlist stored in the database. + */ + public Playlist getDefaultPlaylist() throws IOException { + Playlist playlist = mPlaylistProvider.get(DEFAULT_PLAYLIST_KEY); + + if (playlist == null) { + playlist = new Playlist(); + } + + return playlist; + } +} |