summaryrefslogtreecommitdiff
path: root/src/s3.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/s3.rs')
-rw-r--r--src/s3.rs152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/s3.rs b/src/s3.rs
new file mode 100644
index 0000000..f72b579
--- /dev/null
+++ b/src/s3.rs
@@ -0,0 +1,152 @@
+use std::{
+ error::Error,
+ io::{Seek, Write},
+};
+
+use aws_sdk_s3::{
+ error::SdkError, operation::get_object::GetObjectError, primitives::ByteStream, Client,
+};
+use aws_smithy_types_convert::date_time::DateTimeExt;
+use bytes::buf::Buf;
+use futures::TryFutureExt;
+use tempfile::tempfile;
+
+use crate::{Entry, Index};
+
+pub struct Access {
+ client: Client,
+ bucket: String,
+}
+
+impl Access {
+ pub fn new(client: Client, bucket: String) -> Self {
+ Self { client, bucket }
+ }
+
+ pub async fn fetch_index(&self) -> Result<(Index, Option<String>), Box<dyn Error>> {
+ let result = self
+ .client
+ .get_object()
+ .bucket(&self.bucket)
+ .key(".mp32rss/index.json")
+ .send()
+ .await;
+
+ match result {
+ Ok(obj) => {
+ let data = obj.body.collect().await?;
+ let index: Index = serde_json::from_reader(data.reader())?;
+ Ok((index, obj.e_tag))
+ }
+
+ Err(SdkError::ServiceError(ref e)) => {
+ if let GetObjectError::NoSuchKey(_) = e.err() {
+ Ok((Index::default(), None))
+ } else {
+ Err(Box::from(result.err().unwrap()))
+ }
+ }
+
+ _ => Err(Box::from(result.err().unwrap())),
+ }
+ }
+
+ pub async fn put_index(
+ &self,
+ index: &Index,
+ index_etag: Option<&str>,
+ ) -> Result<Option<String>, Box<dyn Error>> {
+ let data = serde_json::to_string_pretty(index)?;
+
+ let mut builder = self
+ .client
+ .put_object()
+ .bucket(&self.bucket)
+ .key(".mp32rss/index.json")
+ .body(ByteStream::from(data.into_bytes()));
+
+ builder = if let Some(etag) = index_etag {
+ builder.if_match(etag)
+ } else {
+ builder.if_none_match('*')
+ };
+
+ let resp = builder.send().await?;
+ Ok(resp.e_tag)
+ }
+
+ pub async fn put_file(
+ &self,
+ key: &str,
+ content_type: &str,
+ data: String,
+ ) -> Result<Option<String>, Box<dyn Error>> {
+ let resp = self
+ .client
+ .put_object()
+ .bucket(&self.bucket)
+ .key(key)
+ .content_type(content_type)
+ .body(ByteStream::from(data.into_bytes()))
+ .send()
+ .await?;
+ Ok(resp.e_tag)
+ }
+
+ pub async fn list_mp3s(&self) -> Result<Vec<String>, Box<dyn Error>> {
+ Ok(self
+ .client
+ .list_objects_v2()
+ .bucket(&self.bucket)
+ .into_paginator()
+ .send()
+ .try_collect()
+ .await?
+ .iter()
+ .flat_map(|resp| resp.contents())
+ .map(|obj| obj.key().expect("Missing key from s3 object").to_string())
+ .filter(|key| key.ends_with(".mp3"))
+ .collect())
+ }
+
+ pub async fn fetch_entry(&self, key: String) -> Result<Entry, Box<dyn Error>> {
+ let mut file = tempfile()?;
+
+ let mut obj = self
+ .client
+ .get_object()
+ .bucket(&self.bucket)
+ .key(&key)
+ .send()
+ .await?;
+
+ let mut size = 0;
+ while let Some(bytes) = obj.body.try_next().await? {
+ size += bytes.len() as i64;
+ file.write_all(&bytes)?;
+ }
+
+ file.rewind()?;
+
+ Entry::read_from(
+ key,
+ obj.e_tag,
+ size,
+ obj.last_modified
+ .and_then(|dt| dt.to_chrono_utc().ok())
+ .map(|dt| dt.date_naive()),
+ file,
+ )
+ }
+
+ pub async fn remove_file(&self, key: &str) -> Result<(), Box<dyn Error>> {
+ self.client
+ .delete_object()
+ .bucket(&self.bucket)
+ .key(key)
+ .send()
+ .map_ok(|_| ())
+ .map_err(Box::from)
+ .await
+ }
+}