libsignal_service/
receiver.rs1use bytes::{Buf, Bytes};
2
3use crate::{
4 attachment_cipher::decrypt_in_place,
5 configuration::ServiceCredentials,
6 messagepipe::MessagePipe,
7 models::{Contact, ParseContactError},
8 push_service::*,
9};
10
11#[derive(Clone)]
13pub struct MessageReceiver {
14 service: PushService,
15}
16
17impl MessageReceiver {
18 pub fn new(service: PushService) -> Self {
21 MessageReceiver { service }
22 }
23
24 pub async fn create_message_pipe(
25 &mut self,
26 credentials: ServiceCredentials,
27 allow_stories: bool,
28 ) -> Result<MessagePipe, ServiceError> {
29 let headers = &[(
30 "X-Signal-Receive-Stories",
31 if allow_stories { "true" } else { "false" },
32 )];
33 let ws = self
34 .service
35 .ws(
36 "/v1/websocket/",
37 "/v1/keepalive",
38 headers,
39 Some(credentials.clone()),
40 )
41 .await?;
42 Ok(MessagePipe::from_socket(ws, credentials))
43 }
44
45 pub async fn retrieve_contacts(
46 &mut self,
47 contacts: &crate::proto::sync_message::Contacts,
48 ) -> Result<
49 impl Iterator<Item = Result<Contact, ParseContactError>>,
50 ServiceError,
51 > {
52 if let Some(ref blob) = contacts.blob {
53 use futures::io::AsyncReadExt;
54
55 const MAX_DOWNLOAD_RETRIES: u8 = 3;
56 let mut retries = 0;
57
58 let mut stream = loop {
59 let r = self.service.get_attachment(blob).await;
60 match r {
61 Ok(stream) => break stream,
62 Err(ServiceError::Timeout { .. }) => {
63 tracing::warn!("get_attachment timed out, retrying");
64 retries += 1;
65 if retries >= MAX_DOWNLOAD_RETRIES {
66 return Err(ServiceError::Timeout {
67 reason: "too many retries",
68 });
69 }
70 },
71 Err(e) => return Err(e),
72 }
73 };
74
75 let mut ciphertext = Vec::new();
76 stream
77 .read_to_end(&mut ciphertext)
78 .await
79 .expect("streamed attachment");
80
81 let key_material = blob.key();
82 assert_eq!(
83 key_material.len(),
84 64,
85 "key material for attachments is ought to be 64 bytes"
86 );
87 let mut key = [0u8; 64];
88 key.copy_from_slice(key_material);
89
90 decrypt_in_place(key, &mut ciphertext)
91 .expect("attachment decryption");
92
93 Ok(DeviceContactsIterator::new(Bytes::from(ciphertext)))
94 } else {
95 Ok(DeviceContactsIterator::default())
96 }
97 }
98}
99
100#[derive(Default)]
101struct DeviceContactsIterator {
102 decrypted_buffer: Bytes,
103}
104
105impl DeviceContactsIterator {
106 fn new(decrypted_buffer: Bytes) -> Self {
107 Self { decrypted_buffer }
108 }
109}
110
111impl Iterator for DeviceContactsIterator {
112 type Item = Result<Contact, ParseContactError>;
113
114 fn next(&mut self) -> Option<Self::Item> {
115 use crate::proto::{contact_details::Avatar, ContactDetails};
116
117 if !self.decrypted_buffer.has_remaining() {
118 return None;
119 }
120
121 let contact_details: ContactDetails =
122 prost::Message::decode_length_delimited(&mut self.decrypted_buffer)
123 .map_err(ParseContactError::Protobuf)
124 .ok()?;
125
126 let avatar_data = if let Some(Avatar {
127 length: Some(length),
128 ..
129 }) = contact_details.avatar
130 {
131 Some(self.decrypted_buffer.copy_to_bytes(length as usize))
132 } else {
133 None
134 };
135
136 Some(Contact::from_proto(contact_details, avatar_data))
137 }
138}