summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs139
-rw-r--r--src/maxmin.rs145
-rw-r--r--src/models.rs1
-rw-r--r--src/schema.rs1
4 files changed, 275 insertions, 11 deletions
diff --git a/src/main.rs b/src/main.rs
index ad27759..d7dda15 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,13 @@
#[macro_use]
extern crate diesel;
+use actix_files::Files;
+use actix_web::http::header;
use actix_web::{middleware, web, HttpResponse, HttpServer, Responder};
-use chrono::{Duration, Utc};
-use diesel::Connection;
+use chrono::{DateTime, Duration, Utc};
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
+use diesel::Connection;
use hmac::{Hmac, Mac};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
@@ -13,9 +15,11 @@ use std::collections::BTreeMap;
use std::io;
use std::sync::Mutex;
+mod maxmin;
mod models;
mod schema;
+use self::maxmin::*;
use self::models::*;
/// Application context to be available on every request.
@@ -24,10 +28,23 @@ struct Context {
secret_key: Vec<u8>,
}
+/// Status of a device.
+#[derive(Serialize)]
+struct DeviceResponse {
+ device_id: String,
+ name: String,
+ battery_level: Option<f64>,
+ current_value: Option<f64>,
+ water_level: Option<f64>,
+ last_watered: Option<DateTime<Utc>>,
+ last_updated: Option<DateTime<Utc>>,
+}
+
/// Time series of datapoints for a device.
#[derive(Serialize)]
struct DeviceData {
- device: Device,
+ device_id: String,
+ device: DeviceResponse,
data: Vec<Datapoint>,
}
@@ -46,10 +63,92 @@ struct PutDataRequest {
signature: String,
}
+impl DeviceResponse {
+ pub fn calculate_status(&mut self, data: &[Datapoint]) {
+ self.last_updated = data
+ .last()
+ .map(|v| DateTime::<Utc>::from_utc(v.timestamp, Utc));
+ self.battery_level = data.last().map(|v| v.battery_status);
+
+ // Find the local extrema
+ let mut values = data.iter().map(|v| v.value).collect();
+ normalize(&mut values);
+ smooth(&mut values, 3.0);
+ derive(&mut values);
+ let extrema = find_extrema(&values);
+
+ // Look for a sharp drop in the value to indicate watering.
+ let last_watered_index = extrema
+ .iter()
+ .filter_map(|x| match x {
+ Extremum::Minimum(i) => {
+ if values[*i] < -0.1 {
+ Some(i)
+ } else {
+ None
+ }
+ }
+ _ => None,
+ })
+ .last();
+
+ self.last_watered =
+ last_watered_index.map(|i| DateTime::<Utc>::from_utc(data[*i].timestamp, Utc));
+
+ // How much water is left?
+ let low_watermark = last_watered_index.map(|i| data[*i].value);
+ let high_watermark = last_watered_index
+ .and_then(|i| {
+ extrema
+ .iter()
+ .rev()
+ .filter_map(|x| match x {
+ Extremum::Maximum(j) => {
+ if j < i {
+ Some(j)
+ } else {
+ None
+ }
+ }
+ _ => None,
+ })
+ .next()
+ })
+ .map(|i| data[*i].value);
+
+ self.current_value = data.last().map(|v| v.value);
+ self.water_level = if let (Some(high), Some(low), Some(current)) =
+ (high_watermark, low_watermark, self.current_value)
+ {
+ Some(
+ 1.0 - (current - f64::min(low, current))
+ / (f64::max(high, current) - f64::max(low, current)),
+ )
+ } else {
+ None
+ };
+ }
+}
+
+impl From<Device> for DeviceResponse {
+ fn from(device: Device) -> DeviceResponse {
+ DeviceResponse {
+ device_id: device.device_id,
+ name: device.name,
+ battery_level: None,
+ water_level: None,
+ current_value: None,
+ last_watered: None,
+ last_updated: None,
+ }
+ }
+}
+
impl DeviceData {
pub fn new(device: Device) -> Self {
DeviceData {
- device,
+ device_id: device.device_id.to_string(),
+ device: device.into(),
data: Vec::new(),
}
}
@@ -57,6 +156,10 @@ impl DeviceData {
pub fn add_datapoint(&mut self, datapoint: Datapoint) {
self.data.push(datapoint);
}
+
+ pub fn calculate_status(&mut self) {
+ self.device.calculate_status(&self.data);
+ }
}
impl DataResponse {
@@ -74,7 +177,7 @@ impl DataResponse {
impl PutDataRequest {
pub fn validate_signature(&self, secret_key: &[u8]) -> bool {
let data = format!(
- "battery_value={}&device_id={}&value={}",
+ "battery_value={:0.2}&device_id={}&value={:0.2}",
self.battery_value,
urlencoding::encode(&self.device_id),
self.value
@@ -99,6 +202,11 @@ impl PutDataRequest {
}
};
+ log::debug!(
+ "Verifying data '{}' matches signature '{}'",
+ &data,
+ &self.signature
+ );
mac.update(data.as_bytes());
mac.verify_slice(&signature).is_ok()
}
@@ -132,11 +240,15 @@ async fn get_data(ctx: web::Data<Context>) -> impl Responder {
}
let mut response = DataResponse::new();
- devicemap
- .into_iter()
- .for_each(|(_k, v)| response.add_device(v));
+ devicemap.into_iter().for_each(|(_k, mut v)| {
+ v.calculate_status();
+ response.add_device(v);
+ });
- HttpResponse::Ok().json(response)
+ HttpResponse::Ok()
+ .header(header::CACHE_CONTROL, "no-store")
+ .header(header::PRAGMA, "no-cache")
+ .json(response)
}
async fn put_data(req: web::Form<PutDataRequest>, ctx: web::Data<Context>) -> impl Responder {
@@ -165,13 +277,17 @@ async fn put_data(req: web::Form<PutDataRequest>, ctx: web::Data<Context>) -> im
device_id: req.device_id.to_string(),
timestamp: Utc::now().naive_utc(),
value: req.value,
+ battery_status: req.battery_value,
};
diesel::insert_into(data::table)
.values(&datapoint)
.execute(&mut *db)
.expect("Insert datapoint record");
- HttpResponse::Created().finish()
+ HttpResponse::Created()
+ .header(header::CACHE_CONTROL, "no-store")
+ .header(header::PRAGMA, "no-cache")
+ .finish()
}
fn open_database(db_filename: &str) -> io::Result<SqliteConnection> {
@@ -183,7 +299,7 @@ async fn main() -> io::Result<()> {
dotenv::dotenv().ok();
env_logger::init();
- let bind = std::env::var("BIND").unwrap_or_else(|_| "127.0.0.1:8180".to_string());
+ let bind = std::env::var("BIND").unwrap_or_else(|_| ":::8180".to_string());
let db_path = std::env::var("DATABASE_URL").expect("Missing DATABASE env variable");
let secret_key = std::env::var("SECRET_KEY").expect("Missing SECRET_KEY env variable");
@@ -198,6 +314,7 @@ async fn main() -> io::Result<()> {
.app_data(ctx.clone())
.route("/data", web::get().to(get_data))
.route("/data", web::put().to(put_data))
+ .service(Files::new("/", "www/").index_file("index.html"))
})
.bind(bind)
.unwrap()
diff --git a/src/maxmin.rs b/src/maxmin.rs
new file mode 100644
index 0000000..6bea2d7
--- /dev/null
+++ b/src/maxmin.rs
@@ -0,0 +1,145 @@
+use std::cmp::Ordering;
+
+pub fn normalize(data: &mut Vec<f64>) {
+ if data.is_empty() {
+ return;
+ }
+
+ let max = *data
+ .iter()
+ .max_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal))
+ .unwrap();
+ let min = *data
+ .iter()
+ .min_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal))
+ .unwrap();
+ for x in data.iter_mut() {
+ *x = (*x - min) / (max - min);
+ }
+}
+
+pub fn smooth(data: &mut Vec<f64>, sigma: f64) {
+ const WINDOW_SIZE: usize = 7;
+
+ if data.is_empty() {
+ return;
+ }
+
+ // Build Gaussian filter
+ let sigsig = 2.0 * sigma.powi(2);
+ let mut g: Vec<f64> = (0..WINDOW_SIZE)
+ .map(|i| {
+ let x2 = (i as f64).powi(2);
+ std::f64::consts::E.powf(-x2 / sigsig)
+ })
+ .collect();
+
+ // Normalize Gaussian filter
+ let filter_sum: f64 = g.iter().sum();
+ g.iter_mut().for_each(|x| *x /= filter_sum);
+
+ // Smooth data
+ let first = data.first().unwrap();
+ let mut buffer = [*first; WINDOW_SIZE];
+ let mut bi = 0;
+ for (_loc, x) in data.iter_mut().enumerate() {
+ buffer[bi] = *x;
+ *x = 0.0;
+ for (i, b) in g.iter().enumerate() {
+ *x += b * buffer[(bi + i) % WINDOW_SIZE];
+ }
+ bi = (bi + 1) % WINDOW_SIZE;
+ }
+}
+
+pub fn derive(data: &mut Vec<f64>) {
+ const WINDOW_SIZE: usize = 3;
+
+ if data.is_empty() {
+ return;
+ }
+
+ // Calculate the derivative.
+ let first = data.first().unwrap();
+ let mut buffer = [*first; WINDOW_SIZE];
+ let mut bi = 0;
+ for (_loc, x) in data.iter_mut().enumerate() {
+ buffer[bi] = *x;
+ *x = (buffer[bi] - buffer[(bi + WINDOW_SIZE - 1) % (WINDOW_SIZE)])
+ / (WINDOW_SIZE as f64 - 1.0);
+ bi = (bi + 1) % WINDOW_SIZE;
+ }
+}
+
+enum State {
+ Start,
+ NoClue(usize, f64),
+ SeekingMinimum(usize, f64),
+ SeekingMaximum(usize, f64),
+}
+
+#[derive(Debug)]
+pub enum Extremum {
+ Maximum(usize),
+ Minimum(usize),
+}
+
+pub fn find_extrema(data: &[f64]) -> Vec<Extremum> {
+ let mut result = Vec::new();
+ let mut state = State::Start;
+ for (i, v) in data.iter().enumerate() {
+ let datum = *v;
+ state = match state {
+ State::Start => State::NoClue(i, datum),
+ State::NoClue(pi, prior) => {
+ if datum > prior {
+ result.push(Extremum::Minimum(pi));
+ State::SeekingMaximum(i, datum)
+ } else {
+ result.push(Extremum::Maximum(pi));
+ State::SeekingMinimum(i, datum)
+ }
+ }
+ State::SeekingMinimum(pi, prior) => {
+ if datum <= prior {
+ State::SeekingMinimum(i, datum)
+ } else {
+ result.push(Extremum::Minimum(pi));
+ State::SeekingMaximum(i, datum)
+ }
+ }
+ State::SeekingMaximum(pi, prior) => {
+ if datum >= prior {
+ State::SeekingMaximum(i, datum)
+ } else {
+ result.push(Extremum::Maximum(pi));
+ State::SeekingMinimum(i, datum)
+ }
+ }
+ };
+ }
+
+ match state {
+ State::Start => (),
+ State::NoClue(..) => (),
+ State::SeekingMinimum(i, _) => result.push(Extremum::Minimum(i)),
+ State::SeekingMaximum(i, _) => result.push(Extremum::Maximum(i)),
+ }
+
+ result
+}
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn normalize_normalizes() {
+ let mut x = vec![1.0, 2.0, 3.0, 4.0];
+
+ super::normalize(&mut x);
+
+ assert!((0.0 - x[0]).abs() < 0.001);
+ assert!((0.333 - x[1]).abs() < 0.001);
+ assert!((0.667 - x[2]).abs() < 0.001);
+ assert!((1.0 - x[3]).abs() < 0.001);
+ }
+}
diff --git a/src/models.rs b/src/models.rs
index d8796d0..ba57a9a 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -15,4 +15,5 @@ pub struct Datapoint {
pub device_id: String,
pub timestamp: NaiveDateTime,
pub value: f64,
+ pub battery_status: f64,
}
diff --git a/src/schema.rs b/src/schema.rs
index 04e88dc..40a3a97 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -3,6 +3,7 @@ table! {
device_id -> Text,
timestamp -> Timestamp,
value -> Double,
+ battery_status -> Double,
}
}