#[macro_use] extern crate diesel; use actix_files::Files; use actix_web::http::header; use actix_web::{middleware, web, HttpResponse, HttpServer, Responder}; use chrono::{DateTime, Duration, Utc}; use diesel::prelude::*; use diesel::sqlite::SqliteConnection; use diesel::Connection; use hmac::{Hmac, Mac}; use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::collections::BTreeMap; use std::io; use std::sync::Mutex; mod maxmin; mod models; mod schema; use self::maxmin::*; use self::models::*; /// Application context to be available on every request. struct Context { db: Mutex, secret_key: Vec, } /// Status of a device. #[derive(Serialize)] struct DeviceResponse { device_id: String, name: String, battery_level: Option, current_value: Option, water_level: Option, last_watered: Option>, last_updated: Option>, } /// Time series of datapoints for a device. #[derive(Serialize)] struct DeviceData { device_id: String, device: DeviceResponse, data: Vec, } /// Data for a series of devices. #[derive(Serialize)] struct DataResponse { devices: Vec, } /// Data for a series of devices. #[derive(Serialize, Deserialize)] struct PutDataRequest { device_id: String, value: f64, battery_value: f64, signature: String, } impl DeviceResponse { pub fn calculate_status(&mut self, data: &[Datapoint]) { self.last_updated = data .last() .map(|v| DateTime::::from_utc(v.timestamp, Utc)); self.battery_level = data.last().map(|v| v.battery_status); // Find the local extrema let mut values = data.iter().map(|v| v.value).collect(); normalize(&mut values); smooth(&mut values, 3.0); derive(&mut values); let extrema = find_extrema(&values); // Look for a sharp drop in the value to indicate watering. let last_watered_index = extrema .iter() .filter_map(|x| match x { Extremum::Minimum(i) => { if values[*i] < -0.1 { Some(i) } else { None } } _ => None, }) .last(); self.last_watered = last_watered_index.map(|i| DateTime::::from_utc(data[*i].timestamp, Utc)); // How much water is left? let low_watermark = last_watered_index.map(|i| data[*i].value); let high_watermark = last_watered_index .and_then(|i| { extrema .iter() .rev() .filter_map(|x| match x { Extremum::Maximum(j) => { if j < i { Some(j) } else { None } } _ => None, }) .next() }) .map(|i| data[*i].value); self.current_value = data.last().map(|v| v.value); self.water_level = if let (Some(high), Some(low), Some(current)) = (high_watermark, low_watermark, self.current_value) { Some( 1.0 - (current - f64::min(low, current)) / (f64::max(high, current) - f64::max(low, current)), ) } else { None }; } } impl From for DeviceResponse { fn from(device: Device) -> DeviceResponse { DeviceResponse { device_id: device.device_id, name: device.name, battery_level: None, water_level: None, current_value: None, last_watered: None, last_updated: None, } } } impl DeviceData { pub fn new(device: Device) -> Self { DeviceData { device_id: device.device_id.to_string(), device: device.into(), data: Vec::new(), } } pub fn add_datapoint(&mut self, datapoint: Datapoint) { self.data.push(datapoint); } pub fn calculate_status(&mut self) { self.device.calculate_status(&self.data); } } impl DataResponse { pub fn new() -> Self { DataResponse { devices: Vec::new(), } } pub fn add_device(&mut self, device_data: DeviceData) { self.devices.push(device_data); } } impl PutDataRequest { pub fn validate_signature(&self, secret_key: &[u8]) -> bool { let data = format!( "battery_value={:0.2}&device_id={}&value={:0.2}", self.battery_value, urlencoding::encode(&self.device_id), self.value ); let signature = match hex::decode(&self.signature) { Ok(signature) => signature, Err(_) => { log::info!( "Request has a malformed hex signature '{}'", &self.signature ); return false; } }; let mut mac = match Hmac::::new_from_slice(secret_key) { Ok(mac) => mac, Err(e) => { log::error!("Bad secret key configured: {}", e); return false; } }; log::debug!( "Verifying data '{}' matches signature '{}'", &data, &self.signature ); mac.update(data.as_bytes()); mac.verify_slice(&signature).is_ok() } } async fn get_data(ctx: web::Data) -> impl Responder { use self::schema::data::dsl::*; use self::schema::devices::dsl::*; let mut db = ctx.db.lock().unwrap(); let device_records = devices .load::(&mut *db) .expect("Error loading data"); let mut devicemap = BTreeMap::new(); device_records.into_iter().for_each(|record| { devicemap.insert(record.device_id.to_string(), DeviceData::new(record)); }); let start_time = Utc::now() - Duration::days(30); let data_records = data .filter(timestamp.ge(start_time.naive_utc())) .load::(&mut *db) .expect("Error loading data"); for datapoint in data_records { devicemap .get_mut(&datapoint.device_id) .expect("Missing device in database!") .add_datapoint(datapoint); } let mut response = DataResponse::new(); devicemap.into_iter().for_each(|(_k, mut v)| { v.calculate_status(); response.add_device(v); }); HttpResponse::Ok() .header(header::CACHE_CONTROL, "no-store") .header(header::PRAGMA, "no-cache") .json(response) } async fn put_data(req: web::Form, ctx: web::Data) -> impl Responder { use self::schema::data; use self::schema::devices; if !req.validate_signature(&ctx.secret_key) { return HttpResponse::BadRequest().body("{error:\"Invalid signature\"}"); } let mut db = ctx.db.lock().unwrap(); // Add the device record, if needed... let device_record = Device { device_id: req.device_id.to_string(), name: "A beautiful flower".to_string(), }; diesel::insert_into(devices::table) .values(&device_record) .on_conflict_do_nothing() .execute(&mut *db) .expect("Insert device record"); // Record the datapoint let datapoint = Datapoint { device_id: req.device_id.to_string(), timestamp: Utc::now().naive_utc(), value: req.value, battery_status: req.battery_value, }; diesel::insert_into(data::table) .values(&datapoint) .execute(&mut *db) .expect("Insert datapoint record"); HttpResponse::Created() .header(header::CACHE_CONTROL, "no-store") .header(header::PRAGMA, "no-cache") .finish() } fn open_database(db_filename: &str) -> io::Result { SqliteConnection::establish(db_filename).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } #[actix_rt::main] async fn main() -> io::Result<()> { dotenv::dotenv().ok(); env_logger::init(); let bind = std::env::var("BIND").unwrap_or_else(|_| ":::8180".to_string()); let db_path = std::env::var("DATABASE_URL").expect("Missing DATABASE env variable"); let secret_key = std::env::var("SECRET_KEY").expect("Missing SECRET_KEY env variable"); let ctx = web::Data::new(Context { db: Mutex::new(open_database(&db_path)?), secret_key: secret_key.into_bytes(), }); HttpServer::new(move || { actix_web::App::new() .wrap(middleware::Logger::default()) .app_data(ctx.clone()) .route("/data", web::get().to(get_data)) .route("/data", web::put().to(put_data)) .service(Files::new("/", "www/").index_file("index.html")) }) .bind(bind) .unwrap() .run() .await }