diff options
Diffstat (limited to 'src/s3.rs')
-rw-r--r-- | src/s3.rs | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/src/s3.rs b/src/s3.rs new file mode 100644 index 0000000..f72b579 --- /dev/null +++ b/src/s3.rs @@ -0,0 +1,152 @@ +use std::{ + error::Error, + io::{Seek, Write}, +}; + +use aws_sdk_s3::{ + error::SdkError, operation::get_object::GetObjectError, primitives::ByteStream, Client, +}; +use aws_smithy_types_convert::date_time::DateTimeExt; +use bytes::buf::Buf; +use futures::TryFutureExt; +use tempfile::tempfile; + +use crate::{Entry, Index}; + +pub struct Access { + client: Client, + bucket: String, +} + +impl Access { + pub fn new(client: Client, bucket: String) -> Self { + Self { client, bucket } + } + + pub async fn fetch_index(&self) -> Result<(Index, Option<String>), Box<dyn Error>> { + let result = self + .client + .get_object() + .bucket(&self.bucket) + .key(".mp32rss/index.json") + .send() + .await; + + match result { + Ok(obj) => { + let data = obj.body.collect().await?; + let index: Index = serde_json::from_reader(data.reader())?; + Ok((index, obj.e_tag)) + } + + Err(SdkError::ServiceError(ref e)) => { + if let GetObjectError::NoSuchKey(_) = e.err() { + Ok((Index::default(), None)) + } else { + Err(Box::from(result.err().unwrap())) + } + } + + _ => Err(Box::from(result.err().unwrap())), + } + } + + pub async fn put_index( + &self, + index: &Index, + index_etag: Option<&str>, + ) -> Result<Option<String>, Box<dyn Error>> { + let data = serde_json::to_string_pretty(index)?; + + let mut builder = self + .client + .put_object() + .bucket(&self.bucket) + .key(".mp32rss/index.json") + .body(ByteStream::from(data.into_bytes())); + + builder = if let Some(etag) = index_etag { + builder.if_match(etag) + } else { + builder.if_none_match('*') + }; + + let resp = builder.send().await?; + Ok(resp.e_tag) + } + + pub async fn put_file( + &self, + key: &str, + content_type: &str, + data: String, + ) -> Result<Option<String>, Box<dyn Error>> { + let resp = self + .client + .put_object() + .bucket(&self.bucket) + .key(key) + .content_type(content_type) + .body(ByteStream::from(data.into_bytes())) + .send() + .await?; + Ok(resp.e_tag) + } + + pub async fn list_mp3s(&self) -> Result<Vec<String>, Box<dyn Error>> { + Ok(self + .client + .list_objects_v2() + .bucket(&self.bucket) + .into_paginator() + .send() + .try_collect() + .await? + .iter() + .flat_map(|resp| resp.contents()) + .map(|obj| obj.key().expect("Missing key from s3 object").to_string()) + .filter(|key| key.ends_with(".mp3")) + .collect()) + } + + pub async fn fetch_entry(&self, key: String) -> Result<Entry, Box<dyn Error>> { + let mut file = tempfile()?; + + let mut obj = self + .client + .get_object() + .bucket(&self.bucket) + .key(&key) + .send() + .await?; + + let mut size = 0; + while let Some(bytes) = obj.body.try_next().await? { + size += bytes.len() as i64; + file.write_all(&bytes)?; + } + + file.rewind()?; + + Entry::read_from( + key, + obj.e_tag, + size, + obj.last_modified + .and_then(|dt| dt.to_chrono_utc().ok()) + .map(|dt| dt.date_naive()), + file, + ) + } + + pub async fn remove_file(&self, key: &str) -> Result<(), Box<dyn Error>> { + self.client + .delete_object() + .bucket(&self.bucket) + .key(key) + .send() + .map_ok(|_| ()) + .map_err(Box::from) + .await + } +} |