diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/PodcastView.js | 127 | ||||
| -rw-r--r-- | lib/UpdateHandlers.js | 200 | 
2 files changed, 327 insertions, 0 deletions
| diff --git a/lib/PodcastView.js b/lib/PodcastView.js new file mode 100644 index 0000000..15970cf --- /dev/null +++ b/lib/PodcastView.js @@ -0,0 +1,127 @@ +var _ = require('underscore'); + +var AWS = require('aws-sdk'); +var s3 = new AWS.S3(); + +var doc = require('dynamodb-doc'); +var dynamo = new doc.DynamoDB(); + +var DDB_VIEWS_TABLE = 'podcast-views'; + +/** + * PodcastView represents a pre-rendered view template. + * + * @param {Object} view - properties to set on the view and pass to the + *                        template. + * @constructor + */ +modules.exports = function PodcastView(view) { +  for (var property in view) { +    if (view.hasOwnProperty(property)) { +      this[property] = source[property]; +    } +  } +}; + +/** + * Find a view in DynamoDB by Id. + * + * @param {Object} id - The view id. + * @param {Function} - Callback to call with an error or view. + */ +PodcastView.findById = function(id, callback) { +  var params = { +    TableName: DDB_VIEWS_TABLE, +    Key: id +  }; + +  dynamo.getItem(params, function(error, data) { +    if (error) { +      callback(error); +      return; +    } + +    var view = new PodcastView(data.Item); +    callback(null, view); +  }); +} + +/** + * Find all views for a given feed. + * + * @param {Object} feedId - The feed id. + * @param {Function} - Callback with signature function(error, view, last) + */ +PodcastView.forEachView = function(feedId, callback) { +  forEachView(feedId, callback); +}; + +function forEachView(feedId, callback, startKey) { +  var params = { +    TableName: DDB_VIEWS_TABLE, +    KeyConditionExpression: "feedId = :feedId", +    ExpressionAttributeValues: { +      ':feedId': feedId +    } +  }; + +  if (startKey) { +    params['ExclusiveStartKey'] = startKey; +  } + +  dynamo.query(params, function(error, data) { +    if (error != null) { +      callback(error); +      return; +    } + +    var lastResponse = !data.LastEvaluatedKey; + +    data.Items.forEach(function(viewData, index, array) { +      var view = new PodcastView(viewData); +      var last = lastResponse && index == (array.length - 1); +      callback(null, view, last); +    }); + +    // If this is not the last set of responses, get more. +    if (!lastResponse) { +      forEachView(feedId, callback, data.LastEvaluatedKey); +    } +  }); +} + +PodcastView.prototype.render = function(episodes, callback) { +  var template = this.template; + +  var data = { +    view: this, +    episodes: episodes, +  }; +  var renderedView = _.template(template)(data); + +  saveView(renderedView, callback); +}; + +function saveView(renderedView, callback) { +  var params = { +    Bucket: this.bucket, +    Key: this.key, +    Body: renderedView +  }; + +  s3.putObject(params, callback); +} + +PodcastView.prototype.remove = function(callback) { +  if (!this.bucket || !this.key) { +    callback(new Error("View is missing bucket or key")); +    return; +  } + +  var params = { +    Bucket: this.bucket, +    Key: this.key +  }; + +  s3.deleteObject(params, callback); +} diff --git a/lib/UpdateHandlers.js b/lib/UpdateHandlers.js new file mode 100644 index 0000000..6483020 --- /dev/null +++ b/lib/UpdateHandlers.js @@ -0,0 +1,200 @@ +var AWS = require('aws-sdk'); +AWS.config.update({region: "us-west-2"}); + +var doc = require('dynamodb-doc'); +var dynamo = new doc.DynamoDB(); + +var PodcastView = require('./PodcastView'); + +var DDB_EPISODES_TABLE = 'podcast-episodes'; + +exports.handleViewUpdate = function(event, context) { +  // Handle dynamo event for the views table. + +  // Each modified view is added as a key in this Object to dedupe multiple +  // updates for the same key. +  var viewsToUpdate = {}; + +  event.Records.forEach(function(record) { +    var eventName = record.eventName; +    var viewKey = record.dynamodb.Keys; + +    switch (eventName) { +      case 'INSERT': +      case 'MODIFY': +        viewsToUpdate[viewKey] = 'update'; +        break; +      case 'REMOVE': +        viewsToUpdate[viewKey] = 'remove'; +        break; +      default: +        context.fail(new Error('Unrecognized eventName "' + eventname + '"')); +    } +  }); + +  var viewIds = Object.keys(viewsToUpdate); + +  var remainingCount = 0; +  var errors = []; +  var callback = function(error, data) { +    // When all are done, call context callback. +    remainingCount--; +    if (error != null) { +      errors.push(error); +    } + +    if (remainingCount == 0) { +      if (errors.length == 0) { +        context.succeed(); +      } else { +        context.fail(new Error("Failures updating views"), errors); +      } +    } +  }; + +  // Start each update/remove operation. +  viewIds.forEach(function(viewId) { +    var operation = viewsToUpdate[viewId]; +    PodcastView.findById(viewId, function(error, view) { +      if (operation == 'update') { +        remainingCount++; +        view.render(callback); +      } else if (operation == 'remove') { +        remainingCount++; +        view.remove(callback); +      } +    }); +  }); +}; + +exports.handleEpisodeUpdate = function(event, context) { +  // Handle dynamo event for the episodes table. + +  // Each modified feed is added as a key in this Object to dedupe multiple +  // updates for the same feed. +  var feedsToUpdate = {}; + +  event.Records.forEach(function(record) { +    var eventName = record.eventName; + +    switch (eventName) { +      case 'INSERT': +        feedsToUpdate[record.dynamodb.NewImage.feedId] = true; +        break; +      case 'MODIFY': +        var oldImageFeed = record.dynamodb.OldImage.feedId; +        var newImageFeed = record.dynamodb.NewImage.feedId; +        if (oldImageFeed != newImageFeed) { +          feedsToUpdate[oldImageFeed] = true; +        } +        feedsToUpdate[newImageFeed] = true; +        break; +      case 'REMOVE': +        feedsToUpdate[record.dynamodb.OldImage.feedId] = true; +        break; +      default: +        context.fail(new Error('Unrecognized eventName "' + eventname + '"')); +    } +  }); + +  var feedIds = Object.keys(feedsToUpdate); + +  var remainingCount = 0; +  var errors = []; +  var callback = function(error, data) { +    // When all are done, call context callback. +    remainingCount--; +    if (error != null) { +      errors.push(error); +    } + +    if (remainingCount == 0) { +      if (errors.length == 0) { +        context.succeed(); +      } else { +        context.fail(new Error("Failures updating feeds"), errors); +      } +    } +  }; + +  // Start updates for each feed. +  feedIds.forEach(function(feedId) { +    remainingCount++; +    renderViewsForFeed(feedId, callback); +  }); +}; + +function renderViewsForFeed(feedId, callback) { +  getEpisodesForFeed(feedId, function(error, episodes) { +    if (error) { +      callback(error); +      return; +    } +    console.log("Got episodes " + JSON.stringify(episodes)); + +    var remainingCount = 0; +    var errors = []; +    var renderCallback = function(error, data) { +      // When all are done, call context callback. +      remainingCount--; +      console.log("Remaining is " + remainingCount); +      if (error != null) { +        errors.push(error); +      } + +      if (remainingCount == 0) { +        if (errors.length == 0) { +          callback(); +        } else { +          callback(new Error("Failures rendering feeds"), errors); +        } +      } +    }; + +    // Get all of the views for the feed. +    PodcastView.forEachView(feedId, function(error, view, last) { +      remainingCount++; +      if (error != null) { +        renderCallback(error); +      } else { +        view.render(episodes, renderCallback); +      } +    }); +  }); +} + +function getEpisodesForFeed(feedId, callback, startKey, episodes) { +  var params = { +    TableName: DDB_EPISODES_TABLE, +    ConsistentRead: true, +    KeyConditionExpression: "feedId = :feedId", +    ExpressionAttributeValues: { +      ':feedId': feedId +    } +  }; + +  if (startKey) { +    params['ExclusiveStartKey'] = startKey; +  } + +  dynamo.query(params, function(error, data) { +    if (error != null) { +      callback(error); +      return; +    } + +    if (!episodes) { +      episodes = data.Items; +    } else { +      episodes = episodes.concat(data.Items); +    } + +    // If this is not the last set of responses, get more. +    var lastKey = data.LastEvaluatedKey; +    if (lastKey) { +      getEpisodesForFeed(feedId, callback, lastKey, episodes); +    } else { +      callback(null, episodes); +    } +  }); +} | 
