// path: root/src/s3.rs (blob: f72b57964bd6b948477703fdd306f5dd703759b9)
use std::{
    error::Error,
    io::{Seek, Write},
};

use aws_sdk_s3::{
    error::SdkError, operation::get_object::GetObjectError, primitives::ByteStream, Client,
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use bytes::buf::Buf;
use futures::TryFutureExt;
use tempfile::tempfile;

use crate::{Entry, Index};

/// Handle for working with the objects of a single S3 bucket.
///
/// Pairs an SDK [`Client`] with the name of the bucket that requests are
/// addressed to.
#[derive(Debug, Clone)]
pub struct Access {
    /// SDK client used to issue requests.
    client: Client,
    /// Name of the bucket that requests are addressed to.
    bucket: String,
}

impl Access {
    /// Object key under which the serialized index is stored.
    const INDEX_KEY: &'static str = ".mp32rss/index.json";

    /// Creates an accessor that sends its requests to `bucket` through `client`.
    pub fn new(client: Client, bucket: String) -> Self {
        Self { client, bucket }
    }

    /// Downloads and deserializes the index object.
    ///
    /// Returns the index together with the ETag reported for the object.
    /// When the service answers with `NoSuchKey`, a default [`Index`] and
    /// `None` are returned instead of an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails for any reason other than
    /// `NoSuchKey`, if the body cannot be read, or if it is not valid JSON
    /// for an [`Index`].
    pub async fn fetch_index(&self) -> Result<(Index, Option<String>), Box<dyn Error>> {
        let result = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(Self::INDEX_KEY)
            .send()
            .await;

        match result {
            Ok(obj) => {
                let data = obj.body.collect().await?;
                let index: Index = serde_json::from_reader(data.reader())?;
                Ok((index, obj.e_tag))
            }

            // A missing index object is not a failure: start from a default index.
            Err(SdkError::ServiceError(ref e))
                if matches!(e.err(), GetObjectError::NoSuchKey(_)) =>
            {
                Ok((Index::default(), None))
            }

            Err(e) => Err(Box::from(e)),
        }
    }

    /// Serializes `index` as pretty-printed JSON and uploads it as the index object.
    ///
    /// The upload is conditional: with `Some(etag)` the request carries
    /// `If-Match: <etag>`, with `None` it carries `If-None-Match: *`.
    /// NOTE(review): this presumably guards against concurrent writers
    /// overwriting each other's index; confirm against the callers.
    ///
    /// Returns the ETag reported for the stored object, if any.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails or the request is rejected.
    pub async fn put_index(
        &self,
        index: &Index,
        index_etag: Option<&str>,
    ) -> Result<Option<String>, Box<dyn Error>> {
        let data = serde_json::to_string_pretty(index)?;

        let request = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(Self::INDEX_KEY)
            .body(ByteStream::from(data.into_bytes()));

        let request = match index_etag {
            Some(etag) => request.if_match(etag),
            None => request.if_none_match('*'),
        };

        let resp = request.send().await?;
        Ok(resp.e_tag)
    }

    /// Uploads `data` under `key` with the given `content_type`.
    ///
    /// Returns the ETag reported for the stored object, if any.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails.
    pub async fn put_file(
        &self,
        key: &str,
        content_type: &str,
        data: String,
    ) -> Result<Option<String>, Box<dyn Error>> {
        let resp = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .content_type(content_type)
            .body(ByteStream::from(data.into_bytes()))
            .send()
            .await?;
        Ok(resp.e_tag)
    }

    /// Lists the keys of all objects in the bucket whose key ends with `.mp3`.
    ///
    /// All result pages are fetched before any filtering happens.
    ///
    /// # Errors
    ///
    /// Returns an error if any page request fails, or if a listed object
    /// carries no key.
    pub async fn list_mp3s(&self) -> Result<Vec<String>, Box<dyn Error>> {
        let pages = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .into_paginator()
            .send()
            .try_collect()
            .await?;

        let mut keys = Vec::new();
        for obj in pages.iter().flat_map(|page| page.contents()) {
            // Report a malformed listing as an error rather than panicking.
            let key = obj.key().ok_or("Missing key from s3 object")?;
            // Only matching keys are copied into owned strings.
            if key.ends_with(".mp3") {
                keys.push(key.to_string());
            }
        }
        Ok(keys)
    }

    /// Downloads the object stored under `key` and builds an [`Entry`] from it.
    ///
    /// The body is streamed chunk by chunk into a temporary file, which is
    /// rewound and handed to [`Entry::read_from`] together with the key, the
    /// object's ETag, the number of bytes received and the date of the
    /// object's last modification (`None` when it is absent or cannot be
    /// converted).
    ///
    /// # Errors
    ///
    /// Returns an error if the temporary file cannot be created or written,
    /// if the request or the body stream fails, or if [`Entry::read_from`]
    /// fails.
    pub async fn fetch_entry(&self, key: String) -> Result<Entry, Box<dyn Error>> {
        let mut file = tempfile()?;

        let mut obj = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(&key)
            .send()
            .await?;

        // NOTE(review): the writes below use the blocking `std::io::Write` API
        // inside an async fn; consider offloading them if this runs on a
        // shared executor.
        let mut size: i64 = 0;
        while let Some(bytes) = obj.body.try_next().await? {
            // Checked conversion instead of a silent `as` cast.
            size += i64::try_from(bytes.len())?;
            file.write_all(&bytes)?;
        }

        // Reposition at the start so the reader sees the whole download.
        file.rewind()?;

        let last_modified = obj
            .last_modified
            .and_then(|dt| dt.to_chrono_utc().ok())
            .map(|dt| dt.date_naive());

        Entry::read_from(key, obj.e_tag, size, last_modified, file)
    }

    /// Deletes the object stored under `key`.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete request fails.
    pub async fn remove_file(&self, key: &str) -> Result<(), Box<dyn Error>> {
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            // The response carries nothing the caller needs.
            .map_ok(|_| ())
            .map_err(Box::from)
            .await
    }
}