summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorJesse Morgan <jesse@jesterpm.net>2021-12-16 09:04:08 -0800
committerJesse Morgan <jesse@jesterpm.net>2021-12-16 09:09:20 -0800
commitb13c9d23c792ec8c1764444a96986f0800d7084a (patch)
tree4966b426b090e9fda0172558a3e44ccb8b40cae8 /src/main.rs
Initial commit of the flowerpot service
At this point it can store and retrieve data from the sqlite database. Everything could definitely use some cleanup and proper error handling...
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs206
1 files changed, 206 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..ad27759
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,206 @@
+#[macro_use]
+extern crate diesel;
+
+use actix_web::{middleware, web, HttpResponse, HttpServer, Responder};
+use chrono::{Duration, Utc};
+use diesel::Connection;
+use diesel::prelude::*;
+use diesel::sqlite::SqliteConnection;
+use hmac::{Hmac, Mac};
+use serde::{Deserialize, Serialize};
+use sha2::Sha256;
+use std::collections::BTreeMap;
+use std::io;
+use std::sync::Mutex;
+
+mod models;
+mod schema;
+
+use self::models::*;
+
+/// Application context to be available on every request.
+struct Context {
+ db: Mutex<SqliteConnection>,
+ secret_key: Vec<u8>,
+}
+
+/// Time series of datapoints for a device.
+#[derive(Serialize)]
+struct DeviceData {
+ device: Device,
+ data: Vec<Datapoint>,
+}
+
+/// Data for a series of devices.
+#[derive(Serialize)]
+struct DataResponse {
+ devices: Vec<DeviceData>,
+}
+
+/// Data for a series of devices.
+#[derive(Serialize, Deserialize)]
+struct PutDataRequest {
+ device_id: String,
+ value: f64,
+ battery_value: f64,
+ signature: String,
+}
+
+impl DeviceData {
+ pub fn new(device: Device) -> Self {
+ DeviceData {
+ device,
+ data: Vec::new(),
+ }
+ }
+
+ pub fn add_datapoint(&mut self, datapoint: Datapoint) {
+ self.data.push(datapoint);
+ }
+}
+
+impl DataResponse {
+ pub fn new() -> Self {
+ DataResponse {
+ devices: Vec::new(),
+ }
+ }
+
+ pub fn add_device(&mut self, device_data: DeviceData) {
+ self.devices.push(device_data);
+ }
+}
+
+impl PutDataRequest {
+ pub fn validate_signature(&self, secret_key: &[u8]) -> bool {
+ let data = format!(
+ "battery_value={}&device_id={}&value={}",
+ self.battery_value,
+ urlencoding::encode(&self.device_id),
+ self.value
+ );
+
+ let signature = match hex::decode(&self.signature) {
+ Ok(signature) => signature,
+ Err(_) => {
+ log::info!(
+ "Request has a malformed hex signature '{}'",
+ &self.signature
+ );
+ return false;
+ }
+ };
+
+ let mut mac = match Hmac::<Sha256>::new_from_slice(secret_key) {
+ Ok(mac) => mac,
+ Err(e) => {
+ log::error!("Bad secret key configured: {}", e);
+ return false;
+ }
+ };
+
+ mac.update(data.as_bytes());
+ mac.verify_slice(&signature).is_ok()
+ }
+}
+
+async fn get_data(ctx: web::Data<Context>) -> impl Responder {
+ use self::schema::data::dsl::*;
+ use self::schema::devices::dsl::*;
+
+ let mut db = ctx.db.lock().unwrap();
+
+ let device_records = devices
+ .load::<Device>(&mut *db)
+ .expect("Error loading data");
+ let mut devicemap = BTreeMap::new();
+ device_records.into_iter().for_each(|record| {
+ devicemap.insert(record.device_id.to_string(), DeviceData::new(record));
+ });
+
+ let start_time = Utc::now() - Duration::days(30);
+ let data_records = data
+ .filter(timestamp.ge(start_time.naive_utc()))
+ .load::<Datapoint>(&mut *db)
+ .expect("Error loading data");
+
+ for datapoint in data_records {
+ devicemap
+ .get_mut(&datapoint.device_id)
+ .expect("Missing device in database!")
+ .add_datapoint(datapoint);
+ }
+
+ let mut response = DataResponse::new();
+ devicemap
+ .into_iter()
+ .for_each(|(_k, v)| response.add_device(v));
+
+ HttpResponse::Ok().json(response)
+}
+
+async fn put_data(req: web::Form<PutDataRequest>, ctx: web::Data<Context>) -> impl Responder {
+ use self::schema::data;
+ use self::schema::devices;
+
+ if !req.validate_signature(&ctx.secret_key) {
+ return HttpResponse::BadRequest().body("{error:\"Invalid signature\"}");
+ }
+
+ let mut db = ctx.db.lock().unwrap();
+
+ // Add the device record, if needed...
+ let device_record = Device {
+ device_id: req.device_id.to_string(),
+ name: "A beautiful flower".to_string(),
+ };
+ diesel::insert_into(devices::table)
+ .values(&device_record)
+ .on_conflict_do_nothing()
+ .execute(&mut *db)
+ .expect("Insert device record");
+
+ // Record the datapoint
+ let datapoint = Datapoint {
+ device_id: req.device_id.to_string(),
+ timestamp: Utc::now().naive_utc(),
+ value: req.value,
+ };
+ diesel::insert_into(data::table)
+ .values(&datapoint)
+ .execute(&mut *db)
+ .expect("Insert datapoint record");
+
+ HttpResponse::Created().finish()
+}
+
+fn open_database(db_filename: &str) -> io::Result<SqliteConnection> {
+ SqliteConnection::establish(db_filename).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+}
+
+#[actix_rt::main]
+async fn main() -> io::Result<()> {
+ dotenv::dotenv().ok();
+ env_logger::init();
+
+ let bind = std::env::var("BIND").unwrap_or_else(|_| "127.0.0.1:8180".to_string());
+ let db_path = std::env::var("DATABASE_URL").expect("Missing DATABASE env variable");
+ let secret_key = std::env::var("SECRET_KEY").expect("Missing SECRET_KEY env variable");
+
+ let ctx = web::Data::new(Context {
+ db: Mutex::new(open_database(&db_path)?),
+ secret_key: secret_key.into_bytes(),
+ });
+
+ HttpServer::new(move || {
+ actix_web::App::new()
+ .wrap(middleware::Logger::default())
+ .app_data(ctx.clone())
+ .route("/data", web::get().to(get_data))
+ .route("/data", web::put().to(put_data))
+ })
+ .bind(bind)
+ .unwrap()
+ .run()
+ .await
+}