libsignal_service/
receiver.rs

1use bytes::{Buf, Bytes};
2
3use crate::{
4    attachment_cipher::decrypt_in_place,
5    configuration::ServiceCredentials,
6    messagepipe::MessagePipe,
7    models::{Contact, ParseContactError},
8    push_service::*,
9};
10
11/// Equivalent of Java's `SignalServiceMessageReceiver`.
12#[derive(Clone)]
13pub struct MessageReceiver {
14    service: PushService,
15}
16
17impl MessageReceiver {
18    // TODO: to avoid providing the wrong service/wrong credentials
19    // change it like LinkingManager or ProvisioningManager
20    pub fn new(service: PushService) -> Self {
21        MessageReceiver { service }
22    }
23
24    pub async fn create_message_pipe(
25        &mut self,
26        credentials: ServiceCredentials,
27        allow_stories: bool,
28    ) -> Result<MessagePipe, ServiceError> {
29        let headers = &[(
30            "X-Signal-Receive-Stories",
31            if allow_stories { "true" } else { "false" },
32        )];
33        let ws = self
34            .service
35            .ws(
36                "/v1/websocket/",
37                "/v1/keepalive",
38                headers,
39                Some(credentials.clone()),
40            )
41            .await?;
42        Ok(MessagePipe::from_socket(ws, credentials))
43    }
44
45    pub async fn retrieve_contacts(
46        &mut self,
47        contacts: &crate::proto::sync_message::Contacts,
48    ) -> Result<
49        impl Iterator<Item = Result<Contact, ParseContactError>>,
50        ServiceError,
51    > {
52        if let Some(ref blob) = contacts.blob {
53            use futures::io::AsyncReadExt;
54
55            const MAX_DOWNLOAD_RETRIES: u8 = 3;
56            let mut retries = 0;
57
58            let mut stream = loop {
59                let r = self.service.get_attachment(blob).await;
60                match r {
61                    Ok(stream) => break stream,
62                    Err(ServiceError::Timeout { .. }) => {
63                        tracing::warn!("get_attachment timed out, retrying");
64                        retries += 1;
65                        if retries >= MAX_DOWNLOAD_RETRIES {
66                            return Err(ServiceError::Timeout {
67                                reason: "too many retries",
68                            });
69                        }
70                    },
71                    Err(e) => return Err(e),
72                }
73            };
74
75            let mut ciphertext = Vec::new();
76            stream
77                .read_to_end(&mut ciphertext)
78                .await
79                .expect("streamed attachment");
80
81            let key_material = blob.key();
82            assert_eq!(
83                key_material.len(),
84                64,
85                "key material for attachments is ought to be 64 bytes"
86            );
87            let mut key = [0u8; 64];
88            key.copy_from_slice(key_material);
89
90            decrypt_in_place(key, &mut ciphertext)
91                .expect("attachment decryption");
92
93            Ok(DeviceContactsIterator::new(Bytes::from(ciphertext)))
94        } else {
95            Ok(DeviceContactsIterator::default())
96        }
97    }
98}
99
100#[derive(Default)]
101struct DeviceContactsIterator {
102    decrypted_buffer: Bytes,
103}
104
105impl DeviceContactsIterator {
106    fn new(decrypted_buffer: Bytes) -> Self {
107        Self { decrypted_buffer }
108    }
109}
110
111impl Iterator for DeviceContactsIterator {
112    type Item = Result<Contact, ParseContactError>;
113
114    fn next(&mut self) -> Option<Self::Item> {
115        use crate::proto::{contact_details::Avatar, ContactDetails};
116
117        if !self.decrypted_buffer.has_remaining() {
118            return None;
119        }
120
121        let contact_details: ContactDetails =
122            prost::Message::decode_length_delimited(&mut self.decrypted_buffer)
123                .map_err(ParseContactError::Protobuf)
124                .ok()?;
125
126        let avatar_data = if let Some(Avatar {
127            length: Some(length),
128            ..
129        }) = contact_details.avatar
130        {
131            Some(self.decrypted_buffer.copy_to_bytes(length as usize))
132        } else {
133            None
134        };
135
136        Some(Contact::from_proto(contact_details, avatar_data))
137    }
138}