libsignal_service/push_service/
cdn.rs

1use std::{
2    collections::HashMap,
3    io::{self, Read, SeekFrom},
4};
5
6use futures::TryStreamExt;
7use reqwest::{
8    header::{CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, RANGE},
9    multipart::Part,
10    Method, StatusCode,
11};
12use serde::Deserialize;
13use tracing::{debug, trace};
14use url::Url;
15
16use crate::{
17    configuration::Endpoint, prelude::AttachmentIdentifier,
18    proto::AttachmentPointer, push_service::HttpAuthOverride,
19};
20
21use super::{response::ReqwestExt, PushService, ServiceError};
22
23#[derive(Debug, serde::Deserialize, Default)]
24#[serde(rename_all = "camelCase")]
25pub struct AttachmentV2UploadAttributes {
26    key: String,
27    credential: String,
28    acl: String,
29    algorithm: String,
30    date: String,
31    policy: String,
32    signature: String,
33}
34
35#[derive(Debug, Deserialize)]
36#[serde(rename_all = "camelCase")]
37pub struct AttachmentUploadForm {
38    pub cdn: u32,
39    pub key: String,
40    pub headers: HashMap<String, String>,
41    pub signed_upload_location: Url,
42}
43
44#[derive(Debug, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct AttachmentDigest {
47    pub digest: Vec<u8>,
48    pub incremental_digest: Option<Vec<u8>>,
49    pub incremental_mac_chunk_size: u64,
50}
51
/// Describes where a resumable upload has to continue.
#[derive(Debug)]
pub struct ResumeInfo {
    /// Ready-made `Content-Range` header value for the next upload request,
    /// or `None` when no such header has to be sent.
    pub content_range: Option<String>,
    /// Offset of the first byte the CDN has not received yet.
    pub content_start: u64,
}
57
58impl PushService {
59    pub async fn get_attachment(
60        &mut self,
61        ptr: &AttachmentPointer,
62    ) -> Result<impl futures::io::AsyncRead + Send + Unpin, ServiceError> {
63        let path = match ptr.attachment_identifier.as_ref() {
64            Some(AttachmentIdentifier::CdnId(id)) => {
65                format!("attachments/{}", id)
66            },
67            Some(AttachmentIdentifier::CdnKey(key)) => {
68                format!("attachments/{}", key)
69            },
70            None => {
71                return Err(ServiceError::InvalidFrame {
72                    reason: "no attachment identifier in pointer",
73                });
74            },
75        };
76        self.get_from_cdn(ptr.cdn_number(), &path).await
77    }
78
79    #[tracing::instrument(skip(self))]
80    pub(crate) async fn get_from_cdn(
81        &mut self,
82        cdn_id: u32,
83        path: &str,
84    ) -> Result<impl futures::io::AsyncRead + Send + Unpin, ServiceError> {
85        let response_stream = self
86            .request(
87                Method::GET,
88                Endpoint::cdn(cdn_id, path),
89                HttpAuthOverride::Unidentified, // CDN requests are always without authentication
90            )?
91            .send()
92            .await?
93            .error_for_status()?
94            .bytes_stream()
95            .map_err(io::Error::other)
96            .into_async_read();
97
98        Ok(response_stream)
99    }
100
101    pub(crate) async fn get_attachment_v4_upload_attributes(
102        &mut self,
103    ) -> Result<AttachmentUploadForm, ServiceError> {
104        self.request(
105            Method::GET,
106            Endpoint::service("/v4/attachments/form/upload"),
107            HttpAuthOverride::NoOverride,
108        )?
109        .send()
110        .await?
111        .service_error_for_status()
112        .await?
113        .json()
114        .await
115        .map_err(Into::into)
116    }
117
118    #[tracing::instrument(skip(self), level=tracing::Level::TRACE)]
119    pub(crate) async fn get_attachment_resumable_upload_url(
120        &mut self,
121        attachment_upload_form: &AttachmentUploadForm,
122    ) -> Result<Url, ServiceError> {
123        let mut request = self.request(
124            Method::POST,
125            Endpoint::Absolute(
126                attachment_upload_form.signed_upload_location.clone(),
127            ),
128            HttpAuthOverride::Unidentified,
129        )?;
130
131        for (key, value) in &attachment_upload_form.headers {
132            request = request.header(key, value);
133        }
134        request = request.header(CONTENT_LENGTH, "0");
135
136        if attachment_upload_form.cdn == 2 {
137            request = request.header(CONTENT_TYPE, "application/octet-stream");
138        } else if attachment_upload_form.cdn == 3 {
139            request = request
140                .header("Upload-Defer-Length", "1")
141                .header("Tus-Resumable", "1.0.0");
142        } else {
143            return Err(ServiceError::UnknownCdnVersion(
144                attachment_upload_form.cdn,
145            ));
146        };
147
148        Ok(request
149            .send()
150            .await?
151            .error_for_status()?
152            .headers()
153            .get("location")
154            .ok_or(ServiceError::InvalidFrame {
155                reason: "missing location header in HTTP response",
156            })?
157            .to_str()
158            .map_err(|_| ServiceError::InvalidFrame {
159                reason: "invalid location header bytes in HTTP response",
160            })?
161            .parse()?)
162    }
163
164    #[tracing::instrument(skip(self))]
165    async fn get_attachment_resume_info_cdn2(
166        &mut self,
167        resumable_url: &Url,
168        content_length: u64,
169    ) -> Result<ResumeInfo, ServiceError> {
170        let response = self
171            .request(
172                Method::PUT,
173                Endpoint::cdn_url(2, resumable_url),
174                HttpAuthOverride::Unidentified,
175            )?
176            .header(CONTENT_RANGE, format!("bytes */{content_length}"))
177            .send()
178            .await?
179            .error_for_status()?;
180
181        let status = response.status();
182
183        if status.is_success() {
184            Ok(ResumeInfo {
185                content_range: None,
186                content_start: content_length,
187            })
188        } else if status == StatusCode::PERMANENT_REDIRECT {
189            let offset =
190                match response.headers().get(RANGE) {
191                    Some(range) => range
192                        .to_str()
193                        .map_err(|_| ServiceError::InvalidFrame {
194                            reason: "invalid format for Range HTTP header",
195                        })?
196                        .split('-')
197                        .nth(1)
198                        .ok_or(ServiceError::InvalidFrame {
199                            reason:
200                                "invalid value format for Range HTTP header",
201                        })?
202                        .parse::<u64>()
203                        .map_err(|_| ServiceError::InvalidFrame {
204                            reason:
205                                "invalid number format for Range HTTP header",
206                        })?
207                        + 1,
208                    None => 0,
209                };
210
211            Ok(ResumeInfo {
212                content_range: Some(format!(
213                    "bytes {}-{}/{}",
214                    offset,
215                    content_length - 1,
216                    content_length
217                )),
218                content_start: offset,
219            })
220        } else {
221            Err(ServiceError::InvalidFrame {
222                reason: "failed to get resumable upload data from CDN2",
223            })
224        }
225    }
226
227    #[tracing::instrument(skip(self))]
228    async fn get_attachment_resume_info_cdn3(
229        &mut self,
230        resumable_url: &Url,
231        headers: &HashMap<String, String>,
232    ) -> Result<ResumeInfo, ServiceError> {
233        let mut request = self
234            .request(
235                Method::HEAD,
236                Endpoint::cdn_url(3, resumable_url),
237                HttpAuthOverride::Unidentified,
238            )?
239            .header("Tus-Resumable", "1.0.0");
240
241        for (key, value) in headers {
242            request = request.header(key, value);
243        }
244
245        let response = request.send().await?.error_for_status()?;
246
247        let upload_offset = response
248            .headers()
249            .get("upload-offset")
250            .ok_or(ServiceError::InvalidFrame {
251                reason: "no Upload-Offset header in response",
252            })?
253            .to_str()
254            .map_err(|_| ServiceError::InvalidFrame {
255                reason: "invalid upload-offset header bytes in HTTP response",
256            })?
257            .parse()
258            .map_err(|_| ServiceError::InvalidFrame {
259                reason: "invalid integer value for Upload-Offset header",
260            })?;
261
262        Ok(ResumeInfo {
263            content_range: None,
264            content_start: upload_offset,
265        })
266    }
267
268    /// Upload attachment
269    ///
270    /// Returns attachment ID and the attachment digest
271    #[tracing::instrument(skip(self, headers, content))]
272    pub(crate) async fn upload_attachment_v4(
273        &mut self,
274        cdn_id: u32,
275        resumable_url: &Url,
276        content_length: u64,
277        headers: HashMap<String, String>,
278        content: impl std::io::Read + std::io::Seek + Send,
279    ) -> Result<AttachmentDigest, ServiceError> {
280        if cdn_id == 2 {
281            self.upload_to_cdn2(resumable_url, content_length, content)
282                .await
283        } else {
284            self.upload_to_cdn3(
285                resumable_url,
286                &headers,
287                content_length,
288                content,
289            )
290            .await
291        }
292    }
293
294    #[tracing::instrument(skip(self, upload_attributes, reader))]
295    pub async fn upload_to_cdn0(
296        &mut self,
297        path: &str,
298        upload_attributes: AttachmentV2UploadAttributes,
299        filename: String,
300        mut reader: impl Read + Send,
301    ) -> Result<(), ServiceError> {
302        let mut buf = Vec::new();
303        reader
304            .read_to_end(&mut buf)
305            .expect("infallible Read instance");
306
307        // Amazon S3 expects multipart fields in a very specific order
308        // DO NOT CHANGE THIS (or do it, but feel the wrath of the gods)
309        let form = reqwest::multipart::Form::new()
310            .text("acl", upload_attributes.acl)
311            .text("key", upload_attributes.key)
312            .text("policy", upload_attributes.policy)
313            .text("Content-Type", "application/octet-stream")
314            .text("x-amz-algorithm", upload_attributes.algorithm)
315            .text("x-amz-credential", upload_attributes.credential)
316            .text("x-amz-date", upload_attributes.date)
317            .text("x-amz-signature", upload_attributes.signature)
318            .part(
319                "file",
320                Part::stream(buf)
321                    .mime_str("application/octet-stream")?
322                    .file_name(filename),
323            );
324
325        let response = self
326            .request(
327                Method::POST,
328                Endpoint::cdn(0, path),
329                HttpAuthOverride::NoOverride,
330            )?
331            .multipart(form)
332            .send()
333            .await?
334            .error_for_status()?;
335
336        debug!("HyperPushService::PUT response: {:?}", response);
337
338        Ok(())
339    }
340
341    #[tracing::instrument(skip(self, content))]
342    async fn upload_to_cdn2(
343        &mut self,
344        resumable_url: &Url,
345        content_length: u64,
346        mut content: impl std::io::Read + std::io::Seek + Send,
347    ) -> Result<AttachmentDigest, ServiceError> {
348        let resume_info = self
349            .get_attachment_resume_info_cdn2(resumable_url, content_length)
350            .await?;
351
352        let mut digester =
353            crate::digeststream::DigestingReader::new(&mut content);
354
355        let mut buf = Vec::new();
356        digester.read_to_end(&mut buf)?;
357
358        trace!("digested content");
359
360        let mut request = self.request(
361            Method::PUT,
362            Endpoint::cdn_url(2, resumable_url),
363            HttpAuthOverride::Unidentified,
364        )?;
365
366        if let Some(content_range) = resume_info.content_range {
367            request = request.header(CONTENT_RANGE, content_range);
368        }
369
370        request.body(buf).send().await?.error_for_status()?;
371
372        Ok(AttachmentDigest {
373            digest: digester.finalize(),
374            incremental_digest: None,
375            incremental_mac_chunk_size: 0,
376        })
377    }
378
379    #[tracing::instrument(skip(self, content))]
380    async fn upload_to_cdn3(
381        &mut self,
382        resumable_url: &Url,
383        headers: &HashMap<String, String>,
384        content_length: u64,
385        mut content: impl std::io::Read + std::io::Seek + Send,
386    ) -> Result<AttachmentDigest, ServiceError> {
387        let resume_info = self
388            .get_attachment_resume_info_cdn3(resumable_url, headers)
389            .await?;
390
391        trace!(?resume_info, "got resume info");
392
393        if resume_info.content_start == content_length {
394            let mut digester =
395                crate::digeststream::DigestingReader::new(&mut content);
396            let mut buf = Vec::new();
397            digester.read_to_end(&mut buf)?;
398            return Ok(AttachmentDigest {
399                digest: digester.finalize(),
400                incremental_digest: None,
401                incremental_mac_chunk_size: 0,
402            });
403        }
404
405        let mut digester =
406            crate::digeststream::DigestingReader::new(&mut content);
407        digester.seek(SeekFrom::Start(resume_info.content_start))?;
408
409        let mut buf = Vec::new();
410        digester.read_to_end(&mut buf)?;
411
412        trace!("digested content");
413
414        let mut request = self.request(
415            Method::PATCH,
416            Endpoint::cdn(3, resumable_url.path()),
417            HttpAuthOverride::Unidentified,
418        )?;
419
420        for (key, value) in headers {
421            request = request.header(key, value);
422        }
423
424        request
425            .header("Tus-Resumable", "1.0.0")
426            .header("Upload-Offset", resume_info.content_start)
427            .header("Upload-Length", buf.len())
428            .header(CONTENT_TYPE, "application/offset+octet-stream")
429            .body(buf)
430            .send()
431            .await?
432            .error_for_status()?;
433
434        trace!("attachment uploaded");
435
436        Ok(AttachmentDigest {
437            digest: digester.finalize(),
438            incremental_digest: None,
439            incremental_mac_chunk_size: 0,
440        })
441    }
442}