#[macro_use] extern crate diesel; use actix_files::Files; use actix_web::http::header; use actix_web::{middleware, web, HttpResponse, HttpServer, Responder}; use chrono::{DateTime, Duration, Utc}; use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use diesel::Connection; use hmac::{Hmac, Mac}; use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::collections::BTreeMap; use std::io; use std::sync::Mutex; mod maxmin; mod models; mod schema; mod device; use self::maxmin::*; use self::models::*; use self::device::DeviceResponse; /// Application context to be available on every request. struct Context { db: Mutex, secret_key: Vec, } /// Time series of datapoints for a device. #[derive(Serialize)] struct DeviceData { device_id: String, device: DeviceResponse, data: Vec, } /// Data for a series of devices. #[derive(Serialize)] struct DataResponse { devices: Vec, } /// Data for a series of devices. #[derive(Serialize, Deserialize)] struct PutDataRequest { device_id: String, value: f64, battery_value: f64, signature: String, temperature_value: Option, relative_humidity_value: Option, } impl DeviceData { pub fn new(device: Device) -> Self { DeviceData { device_id: device.device_id.to_string(), device: device.into(), data: Vec::new(), } } pub fn add_datapoint(&mut self, datapoint: Datapoint) { self.data.push(datapoint); } pub fn calculate_status(&mut self) { self.device.calculate_status(&self.data); } } impl DataResponse { pub fn new() -> Self { DataResponse { devices: Vec::new(), } } pub fn add_device(&mut self, device_data: DeviceData) { self.devices.push(device_data); } } impl PutDataRequest { pub fn validate_signature(&self, secret_key: &[u8]) -> bool { let data = format!( "battery_value={:0.2}&device_id={}&value={:0.2}", self.battery_value, urlencoding::encode(&self.device_id), self.value ); let signature = match hex::decode(&self.signature) { Ok(signature) => signature, Err(_) => { log::info!( "Request has a malformed hex signature '{}'", &self.signature ); return false; } }; let mut mac = match Hmac::::new_from_slice(secret_key) { Ok(mac) => mac, Err(e) => { log::error!("Bad secret key configured: {}", e); return false; } }; log::debug!( "Verifying data '{}' matches signature '{}'", &data, &self.signature ); mac.update(data.as_bytes()); mac.verify_slice(&signature).is_ok() } } async fn get_data(ctx: web::Data) -> impl Responder { use self::schema::data::dsl::*; use self::schema::devices::dsl::*; let mut db = ctx.db.lock().unwrap(); let device_records = devices .load::(&mut *db) .expect("Error loading data"); let mut devicemap = BTreeMap::new(); device_records.into_iter().for_each(|record| { devicemap.insert(record.device_id.to_string(), DeviceData::new(record)); }); let start_time = Utc::now() - Duration::days(30); let data_records = data .filter(timestamp.ge(start_time.naive_utc())) .load::(&mut *db) .expect("Error loading data"); for datapoint in data_records { devicemap .get_mut(&datapoint.device_id) .expect("Missing device in database!") .add_datapoint(datapoint); } let mut response = DataResponse::new(); devicemap.into_iter().for_each(|(_k, mut v)| { v.calculate_status(); response.add_device(v); }); HttpResponse::Ok() .header(header::CACHE_CONTROL, "no-store") .header(header::PRAGMA, "no-cache") .json(response) } async fn put_data(req: web::Form, ctx: web::Data) -> impl Responder { use self::schema::data; use self::schema::devices; if !req.validate_signature(&ctx.secret_key) { return HttpResponse::BadRequest().body("{error:\"Invalid signature\"}"); } let mut db = ctx.db.lock().unwrap(); // Add the device record, if needed... let device_record = Device { device_id: req.device_id.to_string(), name: "A beautiful flower".to_string(), }; diesel::insert_into(devices::table) .values(&device_record) .on_conflict_do_nothing() .execute(&mut *db) .expect("Insert device record"); // Record the datapoint let datapoint = Datapoint { device_id: req.device_id.to_string(), timestamp: Utc::now().naive_utc(), value: req.value, battery_status: req.battery_value, }; diesel::insert_into(data::table) .values(&datapoint) .execute(&mut *db) .expect("Insert datapoint record"); HttpResponse::Created() .header(header::CACHE_CONTROL, "no-store") .header(header::PRAGMA, "no-cache") .finish() } fn open_database(db_filename: &str) -> io::Result { SqliteConnection::establish(db_filename).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } #[actix_rt::main] async fn main() -> io::Result<()> { dotenv::dotenv().ok(); env_logger::init(); let bind = std::env::var("BIND").unwrap_or_else(|_| ":::8180".to_string()); let db_path = std::env::var("DATABASE_URL").expect("Missing DATABASE env variable"); let secret_key = std::env::var("SECRET_KEY").expect("Missing SECRET_KEY env variable"); let ctx = web::Data::new(Context { db: Mutex::new(open_database(&db_path)?), secret_key: secret_key.into_bytes(), }); HttpServer::new(move || { actix_web::App::new() .wrap(middleware::Logger::default()) .app_data(ctx.clone()) .route("/data", web::get().to(get_data)) .route("/data", web::put().to(put_data)) .service(Files::new("/", "./www").index_file("index.html")) }) .bind(bind) .unwrap() .run() .await }