diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 139 | ||||
-rw-r--r-- | src/maxmin.rs | 145 | ||||
-rw-r--r-- | src/models.rs | 1 | ||||
-rw-r--r-- | src/schema.rs | 1 |
4 files changed, 275 insertions, 11 deletions
diff --git a/src/main.rs b/src/main.rs index ad27759..d7dda15 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,13 @@ #[macro_use] extern crate diesel; +use actix_files::Files; +use actix_web::http::header; use actix_web::{middleware, web, HttpResponse, HttpServer, Responder}; -use chrono::{Duration, Utc}; -use diesel::Connection; +use chrono::{DateTime, Duration, Utc}; use diesel::prelude::*; use diesel::sqlite::SqliteConnection; +use diesel::Connection; use hmac::{Hmac, Mac}; use serde::{Deserialize, Serialize}; use sha2::Sha256; @@ -13,9 +15,11 @@ use std::collections::BTreeMap; use std::io; use std::sync::Mutex; +mod maxmin; mod models; mod schema; +use self::maxmin::*; use self::models::*; /// Application context to be available on every request. @@ -24,10 +28,23 @@ struct Context { secret_key: Vec<u8>, } +/// Status of a device. +#[derive(Serialize)] +struct DeviceResponse { + device_id: String, + name: String, + battery_level: Option<f64>, + current_value: Option<f64>, + water_level: Option<f64>, + last_watered: Option<DateTime<Utc>>, + last_updated: Option<DateTime<Utc>>, +} + /// Time series of datapoints for a device. #[derive(Serialize)] struct DeviceData { - device: Device, + device_id: String, + device: DeviceResponse, data: Vec<Datapoint>, } @@ -46,10 +63,92 @@ struct PutDataRequest { signature: String, } +impl DeviceResponse { + pub fn calculate_status(&mut self, data: &[Datapoint]) { + self.last_updated = data + .last() + .map(|v| DateTime::<Utc>::from_utc(v.timestamp, Utc)); + self.battery_level = data.last().map(|v| v.battery_status); + + // Find the local extrema + let mut values = data.iter().map(|v| v.value).collect(); + normalize(&mut values); + smooth(&mut values, 3.0); + derive(&mut values); + let extrema = find_extrema(&values); + + // Look for a sharp drop in the value to indicate watering. + let last_watered_index = extrema + .iter() + .filter_map(|x| match x { + Extremum::Minimum(i) => { + if values[*i] < -0.1 { + Some(i) + } else { + None + } + } + _ => None, + }) + .last(); + + self.last_watered = + last_watered_index.map(|i| DateTime::<Utc>::from_utc(data[*i].timestamp, Utc)); + + // How much water is left? + let low_watermark = last_watered_index.map(|i| data[*i].value); + let high_watermark = last_watered_index + .and_then(|i| { + extrema + .iter() + .rev() + .filter_map(|x| match x { + Extremum::Maximum(j) => { + if j < i { + Some(j) + } else { + None + } + } + _ => None, + }) + .next() + }) + .map(|i| data[*i].value); + + self.current_value = data.last().map(|v| v.value); + self.water_level = if let (Some(high), Some(low), Some(current)) = + (high_watermark, low_watermark, self.current_value) + { + Some( + 1.0 - (current - f64::min(low, current)) + / (f64::max(high, current) - f64::max(low, current)), + ) + } else { + None + }; + } +} + +impl From<Device> for DeviceResponse { + fn from(device: Device) -> DeviceResponse { + DeviceResponse { + device_id: device.device_id, + name: device.name, + battery_level: None, + water_level: None, + current_value: None, + last_watered: None, + last_updated: None, + } + } +} + impl DeviceData { pub fn new(device: Device) -> Self { DeviceData { - device, + device_id: device.device_id.to_string(), + device: device.into(), data: Vec::new(), } } @@ -57,6 +156,10 @@ impl DeviceData { pub fn add_datapoint(&mut self, datapoint: Datapoint) { self.data.push(datapoint); } + + pub fn calculate_status(&mut self) { + self.device.calculate_status(&self.data); + } } impl DataResponse { @@ -74,7 +177,7 @@ impl DataResponse { impl PutDataRequest { pub fn validate_signature(&self, secret_key: &[u8]) -> bool { let data = format!( - "battery_value={}&device_id={}&value={}", + "battery_value={:0.2}&device_id={}&value={:0.2}", self.battery_value, urlencoding::encode(&self.device_id), self.value @@ -99,6 +202,11 @@ impl PutDataRequest { } }; + log::debug!( + "Verifying data '{}' matches signature '{}'", + &data, + &self.signature + ); mac.update(data.as_bytes()); mac.verify_slice(&signature).is_ok() } @@ -132,11 +240,15 @@ async fn get_data(ctx: web::Data<Context>) -> impl Responder { } let mut response = DataResponse::new(); - devicemap - .into_iter() - .for_each(|(_k, v)| response.add_device(v)); + devicemap.into_iter().for_each(|(_k, mut v)| { + v.calculate_status(); + response.add_device(v); + }); - HttpResponse::Ok().json(response) + HttpResponse::Ok() + .header(header::CACHE_CONTROL, "no-store") + .header(header::PRAGMA, "no-cache") + .json(response) } async fn put_data(req: web::Form<PutDataRequest>, ctx: web::Data<Context>) -> impl Responder { @@ -165,13 +277,17 @@ async fn put_data(req: web::Form<PutDataRequest>, ctx: web::Data<Context>) -> im device_id: req.device_id.to_string(), timestamp: Utc::now().naive_utc(), value: req.value, + battery_status: req.battery_value, }; diesel::insert_into(data::table) .values(&datapoint) .execute(&mut *db) .expect("Insert datapoint record"); - HttpResponse::Created().finish() + HttpResponse::Created() + .header(header::CACHE_CONTROL, "no-store") + .header(header::PRAGMA, "no-cache") + .finish() } fn open_database(db_filename: &str) -> io::Result<SqliteConnection> { @@ -183,7 +299,7 @@ async fn main() -> io::Result<()> { dotenv::dotenv().ok(); env_logger::init(); - let bind = std::env::var("BIND").unwrap_or_else(|_| "127.0.0.1:8180".to_string()); + let bind = std::env::var("BIND").unwrap_or_else(|_| ":::8180".to_string()); let db_path = std::env::var("DATABASE_URL").expect("Missing DATABASE env variable"); let secret_key = std::env::var("SECRET_KEY").expect("Missing SECRET_KEY env variable"); @@ -198,6 +314,7 @@ async fn main() -> io::Result<()> { .app_data(ctx.clone()) .route("/data", web::get().to(get_data)) .route("/data", web::put().to(put_data)) + .service(Files::new("/", "www/").index_file("index.html")) }) .bind(bind) .unwrap() diff --git a/src/maxmin.rs b/src/maxmin.rs new file mode 100644 index 0000000..6bea2d7 --- /dev/null +++ b/src/maxmin.rs @@ -0,0 +1,145 @@ +use std::cmp::Ordering; + +pub fn normalize(data: &mut Vec<f64>) { + if data.is_empty() { + return; + } + + let max = *data + .iter() + .max_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal)) + .unwrap(); + let min = *data + .iter() + .min_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal)) + .unwrap(); + for x in data.iter_mut() { + *x = (*x - min) / (max - min); + } +} + +pub fn smooth(data: &mut Vec<f64>, sigma: f64) { + const WINDOW_SIZE: usize = 7; + + if data.is_empty() { + return; + } + + // Build Gaussian filter + let sigsig = 2.0 * sigma.powi(2); + let mut g: Vec<f64> = (0..WINDOW_SIZE) + .map(|i| { + let x2 = (i as f64).powi(2); + std::f64::consts::E.powf(-x2 / sigsig) + }) + .collect(); + + // Normalize Gaussian filter + let filter_sum: f64 = g.iter().sum(); + g.iter_mut().for_each(|x| *x /= filter_sum); + + // Smooth data + let first = data.first().unwrap(); + let mut buffer = [*first; WINDOW_SIZE]; + let mut bi = 0; + for (_loc, x) in data.iter_mut().enumerate() { + buffer[bi] = *x; + *x = 0.0; + for (i, b) in g.iter().enumerate() { + *x += b * buffer[(bi + i) % WINDOW_SIZE]; + } + bi = (bi + 1) % WINDOW_SIZE; + } +} + +pub fn derive(data: &mut Vec<f64>) { + const WINDOW_SIZE: usize = 3; + + if data.is_empty() { + return; + } + + // Calculate the derivative. + let first = data.first().unwrap(); + let mut buffer = [*first; WINDOW_SIZE]; + let mut bi = 0; + for (_loc, x) in data.iter_mut().enumerate() { + buffer[bi] = *x; + *x = (buffer[bi] - buffer[(bi + WINDOW_SIZE - 1) % (WINDOW_SIZE)]) + / (WINDOW_SIZE as f64 - 1.0); + bi = (bi + 1) % WINDOW_SIZE; + } +} + +enum State { + Start, + NoClue(usize, f64), + SeekingMinimum(usize, f64), + SeekingMaximum(usize, f64), +} + +#[derive(Debug)] +pub enum Extremum { + Maximum(usize), + Minimum(usize), +} + +pub fn find_extrema(data: &[f64]) -> Vec<Extremum> { + let mut result = Vec::new(); + let mut state = State::Start; + for (i, v) in data.iter().enumerate() { + let datum = *v; + state = match state { + State::Start => State::NoClue(i, datum), + State::NoClue(pi, prior) => { + if datum > prior { + result.push(Extremum::Minimum(pi)); + State::SeekingMaximum(i, datum) + } else { + result.push(Extremum::Maximum(pi)); + State::SeekingMinimum(i, datum) + } + } + State::SeekingMinimum(pi, prior) => { + if datum <= prior { + State::SeekingMinimum(i, datum) + } else { + result.push(Extremum::Minimum(pi)); + State::SeekingMaximum(i, datum) + } + } + State::SeekingMaximum(pi, prior) => { + if datum >= prior { + State::SeekingMaximum(i, datum) + } else { + result.push(Extremum::Maximum(pi)); + State::SeekingMinimum(i, datum) + } + } + }; + } + + match state { + State::Start => (), + State::NoClue(..) => (), + State::SeekingMinimum(i, _) => result.push(Extremum::Minimum(i)), + State::SeekingMaximum(i, _) => result.push(Extremum::Maximum(i)), + } + + result +} + +#[cfg(test)] +mod tests { + #[test] + fn normalize_normalizes() { + let mut x = vec![1.0, 2.0, 3.0, 4.0]; + + super::normalize(&mut x); + + assert!((0.0 - x[0]).abs() < 0.001); + assert!((0.333 - x[1]).abs() < 0.001); + assert!((0.667 - x[2]).abs() < 0.001); + assert!((1.0 - x[3]).abs() < 0.001); + } +} diff --git a/src/models.rs b/src/models.rs index d8796d0..ba57a9a 100644 --- a/src/models.rs +++ b/src/models.rs @@ -15,4 +15,5 @@ pub struct Datapoint { pub device_id: String, pub timestamp: NaiveDateTime, pub value: f64, + pub battery_status: f64, } diff --git a/src/schema.rs b/src/schema.rs index 04e88dc..40a3a97 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -3,6 +3,7 @@ table! { device_id -> Text, timestamp -> Timestamp, value -> Double, + battery_status -> Double, } } |