1use std::{
2 collections::HashMap,
3 io::{self, Read, SeekFrom},
4};
5
6use futures::TryStreamExt;
7use reqwest::{
8 header::{CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, RANGE},
9 multipart::Part,
10 Method, StatusCode,
11};
12use serde::Deserialize;
13use tracing::{debug, trace};
14use url::Url;
15
16use crate::{
17 configuration::Endpoint, prelude::AttachmentIdentifier,
18 proto::AttachmentPointer, push_service::HttpAuthOverride,
19};
20
21use super::{response::ReqwestExt, PushService, ServiceError};
22
23#[derive(Debug, serde::Deserialize, Default)]
24#[serde(rename_all = "camelCase")]
25pub struct AttachmentV2UploadAttributes {
26 key: String,
27 credential: String,
28 acl: String,
29 algorithm: String,
30 date: String,
31 policy: String,
32 signature: String,
33}
34
35#[derive(Debug, Deserialize)]
36#[serde(rename_all = "camelCase")]
37pub struct AttachmentUploadForm {
38 pub cdn: u32,
39 pub key: String,
40 pub headers: HashMap<String, String>,
41 pub signed_upload_location: Url,
42}
43
44#[derive(Debug, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct AttachmentDigest {
47 pub digest: Vec<u8>,
48 pub incremental_digest: Option<Vec<u8>>,
49 pub incremental_mac_chunk_size: u64,
50}
51
52#[derive(Debug)]
53pub struct ResumeInfo {
54 pub content_range: Option<String>,
55 pub content_start: u64,
56}
57
58impl PushService {
59 pub async fn get_attachment(
60 &mut self,
61 ptr: &AttachmentPointer,
62 ) -> Result<impl futures::io::AsyncRead + Send + Unpin, ServiceError> {
63 let path = match ptr.attachment_identifier.as_ref() {
64 Some(AttachmentIdentifier::CdnId(id)) => {
65 format!("attachments/{}", id)
66 },
67 Some(AttachmentIdentifier::CdnKey(key)) => {
68 format!("attachments/{}", key)
69 },
70 None => {
71 return Err(ServiceError::InvalidFrame {
72 reason: "no attachment identifier in pointer",
73 });
74 },
75 };
76 self.get_from_cdn(ptr.cdn_number(), &path).await
77 }
78
79 #[tracing::instrument(skip(self))]
80 pub(crate) async fn get_from_cdn(
81 &mut self,
82 cdn_id: u32,
83 path: &str,
84 ) -> Result<impl futures::io::AsyncRead + Send + Unpin, ServiceError> {
85 let response_stream = self
86 .request(
87 Method::GET,
88 Endpoint::cdn(cdn_id, path),
89 HttpAuthOverride::Unidentified, )?
91 .send()
92 .await?
93 .error_for_status()?
94 .bytes_stream()
95 .map_err(io::Error::other)
96 .into_async_read();
97
98 Ok(response_stream)
99 }
100
101 pub(crate) async fn get_attachment_v4_upload_attributes(
102 &mut self,
103 ) -> Result<AttachmentUploadForm, ServiceError> {
104 self.request(
105 Method::GET,
106 Endpoint::service("/v4/attachments/form/upload"),
107 HttpAuthOverride::NoOverride,
108 )?
109 .send()
110 .await?
111 .service_error_for_status()
112 .await?
113 .json()
114 .await
115 .map_err(Into::into)
116 }
117
118 #[tracing::instrument(skip(self), level=tracing::Level::TRACE)]
119 pub(crate) async fn get_attachment_resumable_upload_url(
120 &mut self,
121 attachment_upload_form: &AttachmentUploadForm,
122 ) -> Result<Url, ServiceError> {
123 let mut request = self.request(
124 Method::POST,
125 Endpoint::Absolute(
126 attachment_upload_form.signed_upload_location.clone(),
127 ),
128 HttpAuthOverride::Unidentified,
129 )?;
130
131 for (key, value) in &attachment_upload_form.headers {
132 request = request.header(key, value);
133 }
134 request = request.header(CONTENT_LENGTH, "0");
135
136 if attachment_upload_form.cdn == 2 {
137 request = request.header(CONTENT_TYPE, "application/octet-stream");
138 } else if attachment_upload_form.cdn == 3 {
139 request = request
140 .header("Upload-Defer-Length", "1")
141 .header("Tus-Resumable", "1.0.0");
142 } else {
143 return Err(ServiceError::UnknownCdnVersion(
144 attachment_upload_form.cdn,
145 ));
146 };
147
148 Ok(request
149 .send()
150 .await?
151 .error_for_status()?
152 .headers()
153 .get("location")
154 .ok_or(ServiceError::InvalidFrame {
155 reason: "missing location header in HTTP response",
156 })?
157 .to_str()
158 .map_err(|_| ServiceError::InvalidFrame {
159 reason: "invalid location header bytes in HTTP response",
160 })?
161 .parse()?)
162 }
163
164 #[tracing::instrument(skip(self))]
165 async fn get_attachment_resume_info_cdn2(
166 &mut self,
167 resumable_url: &Url,
168 content_length: u64,
169 ) -> Result<ResumeInfo, ServiceError> {
170 let response = self
171 .request(
172 Method::PUT,
173 Endpoint::cdn_url(2, resumable_url),
174 HttpAuthOverride::Unidentified,
175 )?
176 .header(CONTENT_RANGE, format!("bytes */{content_length}"))
177 .send()
178 .await?
179 .error_for_status()?;
180
181 let status = response.status();
182
183 if status.is_success() {
184 Ok(ResumeInfo {
185 content_range: None,
186 content_start: content_length,
187 })
188 } else if status == StatusCode::PERMANENT_REDIRECT {
189 let offset =
190 match response.headers().get(RANGE) {
191 Some(range) => range
192 .to_str()
193 .map_err(|_| ServiceError::InvalidFrame {
194 reason: "invalid format for Range HTTP header",
195 })?
196 .split('-')
197 .nth(1)
198 .ok_or(ServiceError::InvalidFrame {
199 reason:
200 "invalid value format for Range HTTP header",
201 })?
202 .parse::<u64>()
203 .map_err(|_| ServiceError::InvalidFrame {
204 reason:
205 "invalid number format for Range HTTP header",
206 })?
207 + 1,
208 None => 0,
209 };
210
211 Ok(ResumeInfo {
212 content_range: Some(format!(
213 "bytes {}-{}/{}",
214 offset,
215 content_length - 1,
216 content_length
217 )),
218 content_start: offset,
219 })
220 } else {
221 Err(ServiceError::InvalidFrame {
222 reason: "failed to get resumable upload data from CDN2",
223 })
224 }
225 }
226
227 #[tracing::instrument(skip(self))]
228 async fn get_attachment_resume_info_cdn3(
229 &mut self,
230 resumable_url: &Url,
231 headers: &HashMap<String, String>,
232 ) -> Result<ResumeInfo, ServiceError> {
233 let mut request = self
234 .request(
235 Method::HEAD,
236 Endpoint::cdn_url(3, resumable_url),
237 HttpAuthOverride::Unidentified,
238 )?
239 .header("Tus-Resumable", "1.0.0");
240
241 for (key, value) in headers {
242 request = request.header(key, value);
243 }
244
245 let response = request.send().await?.error_for_status()?;
246
247 let upload_offset = response
248 .headers()
249 .get("upload-offset")
250 .ok_or(ServiceError::InvalidFrame {
251 reason: "no Upload-Offset header in response",
252 })?
253 .to_str()
254 .map_err(|_| ServiceError::InvalidFrame {
255 reason: "invalid upload-offset header bytes in HTTP response",
256 })?
257 .parse()
258 .map_err(|_| ServiceError::InvalidFrame {
259 reason: "invalid integer value for Upload-Offset header",
260 })?;
261
262 Ok(ResumeInfo {
263 content_range: None,
264 content_start: upload_offset,
265 })
266 }
267
268 #[tracing::instrument(skip(self, headers, content))]
272 pub(crate) async fn upload_attachment_v4(
273 &mut self,
274 cdn_id: u32,
275 resumable_url: &Url,
276 content_length: u64,
277 headers: HashMap<String, String>,
278 content: impl std::io::Read + std::io::Seek + Send,
279 ) -> Result<AttachmentDigest, ServiceError> {
280 if cdn_id == 2 {
281 self.upload_to_cdn2(resumable_url, content_length, content)
282 .await
283 } else {
284 self.upload_to_cdn3(
285 resumable_url,
286 &headers,
287 content_length,
288 content,
289 )
290 .await
291 }
292 }
293
294 #[tracing::instrument(skip(self, upload_attributes, reader))]
295 pub async fn upload_to_cdn0(
296 &mut self,
297 path: &str,
298 upload_attributes: AttachmentV2UploadAttributes,
299 filename: String,
300 mut reader: impl Read + Send,
301 ) -> Result<(), ServiceError> {
302 let mut buf = Vec::new();
303 reader
304 .read_to_end(&mut buf)
305 .expect("infallible Read instance");
306
307 let form = reqwest::multipart::Form::new()
310 .text("acl", upload_attributes.acl)
311 .text("key", upload_attributes.key)
312 .text("policy", upload_attributes.policy)
313 .text("Content-Type", "application/octet-stream")
314 .text("x-amz-algorithm", upload_attributes.algorithm)
315 .text("x-amz-credential", upload_attributes.credential)
316 .text("x-amz-date", upload_attributes.date)
317 .text("x-amz-signature", upload_attributes.signature)
318 .part(
319 "file",
320 Part::stream(buf)
321 .mime_str("application/octet-stream")?
322 .file_name(filename),
323 );
324
325 let response = self
326 .request(
327 Method::POST,
328 Endpoint::cdn(0, path),
329 HttpAuthOverride::NoOverride,
330 )?
331 .multipart(form)
332 .send()
333 .await?
334 .error_for_status()?;
335
336 debug!("HyperPushService::PUT response: {:?}", response);
337
338 Ok(())
339 }
340
341 #[tracing::instrument(skip(self, content))]
342 async fn upload_to_cdn2(
343 &mut self,
344 resumable_url: &Url,
345 content_length: u64,
346 mut content: impl std::io::Read + std::io::Seek + Send,
347 ) -> Result<AttachmentDigest, ServiceError> {
348 let resume_info = self
349 .get_attachment_resume_info_cdn2(resumable_url, content_length)
350 .await?;
351
352 let mut digester =
353 crate::digeststream::DigestingReader::new(&mut content);
354
355 let mut buf = Vec::new();
356 digester.read_to_end(&mut buf)?;
357
358 trace!("digested content");
359
360 let mut request = self.request(
361 Method::PUT,
362 Endpoint::cdn_url(2, resumable_url),
363 HttpAuthOverride::Unidentified,
364 )?;
365
366 if let Some(content_range) = resume_info.content_range {
367 request = request.header(CONTENT_RANGE, content_range);
368 }
369
370 request.body(buf).send().await?.error_for_status()?;
371
372 Ok(AttachmentDigest {
373 digest: digester.finalize(),
374 incremental_digest: None,
375 incremental_mac_chunk_size: 0,
376 })
377 }
378
379 #[tracing::instrument(skip(self, content))]
380 async fn upload_to_cdn3(
381 &mut self,
382 resumable_url: &Url,
383 headers: &HashMap<String, String>,
384 content_length: u64,
385 mut content: impl std::io::Read + std::io::Seek + Send,
386 ) -> Result<AttachmentDigest, ServiceError> {
387 let resume_info = self
388 .get_attachment_resume_info_cdn3(resumable_url, headers)
389 .await?;
390
391 trace!(?resume_info, "got resume info");
392
393 if resume_info.content_start == content_length {
394 let mut digester =
395 crate::digeststream::DigestingReader::new(&mut content);
396 let mut buf = Vec::new();
397 digester.read_to_end(&mut buf)?;
398 return Ok(AttachmentDigest {
399 digest: digester.finalize(),
400 incremental_digest: None,
401 incremental_mac_chunk_size: 0,
402 });
403 }
404
405 let mut digester =
406 crate::digeststream::DigestingReader::new(&mut content);
407 digester.seek(SeekFrom::Start(resume_info.content_start))?;
408
409 let mut buf = Vec::new();
410 digester.read_to_end(&mut buf)?;
411
412 trace!("digested content");
413
414 let mut request = self.request(
415 Method::PATCH,
416 Endpoint::cdn(3, resumable_url.path()),
417 HttpAuthOverride::Unidentified,
418 )?;
419
420 for (key, value) in headers {
421 request = request.header(key, value);
422 }
423
424 request
425 .header("Tus-Resumable", "1.0.0")
426 .header("Upload-Offset", resume_info.content_start)
427 .header("Upload-Length", buf.len())
428 .header(CONTENT_TYPE, "application/offset+octet-stream")
429 .body(buf)
430 .send()
431 .await?
432 .error_for_status()?;
433
434 trace!("attachment uploaded");
435
436 Ok(AttachmentDigest {
437 digest: digester.finalize(),
438 incremental_digest: None,
439 incremental_mac_chunk_size: 0,
440 })
441 }
442}