libsignal_service/
sender.rs

1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6    process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7    ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8    SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17    cipher::{get_preferred_protocol_address, ServiceCipher},
18    content::ContentBody,
19    proto::{
20        attachment_pointer::{
21            AttachmentIdentifier, Flags as AttachmentPointerFlags,
22        },
23        sync_message::{
24            self, message_request_response, MessageRequestResponse,
25        },
26        AttachmentPointer, SyncMessage,
27    },
28    push_service::*,
29    service_address::ServiceIdExt,
30    session_store::SessionStoreExt,
31    unidentified_access::UnidentifiedAccess,
32    utils::{serde_device_id, serde_service_id},
33    websocket::{self, SignalWebSocket},
34};
35
36pub use crate::proto::ContactDetails;
37
38#[derive(serde::Serialize, Debug)]
39#[serde(rename_all = "camelCase")]
40pub struct OutgoingPushMessage {
41    pub r#type: u32,
42    #[serde(with = "serde_device_id")]
43    pub destination_device_id: DeviceId,
44    pub destination_registration_id: u32,
45    pub content: String,
46}
47
48#[derive(serde::Serialize, Debug)]
49pub struct OutgoingPushMessages {
50    #[serde(with = "serde_service_id")]
51    pub destination: ServiceId,
52    pub timestamp: u64,
53    pub messages: Vec<OutgoingPushMessage>,
54    pub online: bool,
55}
56
57#[derive(serde::Deserialize, Debug)]
58#[serde(rename_all = "camelCase")]
59pub struct SendMessageResponse {
60    pub needs_sync: bool,
61}
62
63pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
64
65#[derive(Debug, Clone)]
66pub struct SentMessage {
67    pub recipient: ServiceId,
68    pub used_identity_key: IdentityKey,
69    pub unidentified: bool,
70    pub needs_sync: bool,
71}
72
/// Attachment specification to be used for uploading.
///
/// Loose equivalent of Java's `SignalServiceAttachmentStream`.
///
/// Most fields are copied verbatim into the resulting `AttachmentPointer` by
/// `MessageSender::upload_attachment`.
#[derive(Default, Debug)]
pub struct AttachmentSpec {
    /// MIME type of the attachment.
    pub content_type: String,
    /// Declared length of the attachment.
    // NOTE(review): `upload_attachment` derives the size from the contents it
    // is given and does not read this field.
    pub length: usize,
    /// Optional file name.
    pub file_name: Option<String>,
    /// Optional preview bytes.
    // NOTE(review): not read by `upload_attachment` as visible in this file.
    pub preview: Option<Vec<u8>>,
    /// `Some(true)` sets the voice-message flag on the pointer.
    pub voice_note: Option<bool>,
    /// `Some(true)` sets the borderless flag on the pointer.
    pub borderless: Option<bool>,
    /// Optional width.
    pub width: Option<u32>,
    /// Optional height.
    pub height: Option<u32>,
    /// Optional caption.
    pub caption: Option<String>,
    /// Optional blur hash.
    pub blur_hash: Option<String>,
}
89
90#[derive(Clone)]
91pub struct MessageSender<S> {
92    identified_ws: SignalWebSocket<websocket::Identified>,
93    unidentified_ws: SignalWebSocket<websocket::Unidentified>,
94    service: PushService,
95    cipher: ServiceCipher<S>,
96    protocol_store: S,
97    local_aci: Aci,
98    local_pni: Pni,
99    aci_identity: IdentityKeyPair,
100    pni_identity: Option<IdentityKeyPair>,
101    device_id: DeviceId,
102}
103
104#[derive(thiserror::Error, Debug)]
105pub enum AttachmentUploadError {
106    #[error("{0}")]
107    ServiceError(#[from] ServiceError),
108
109    #[error("Could not read attachment contents")]
110    IoError(#[from] std::io::Error),
111}
112
113#[derive(thiserror::Error, Debug)]
114pub enum MessageSenderError {
115    #[error("service error: {0}")]
116    ServiceError(#[from] ServiceError),
117
118    #[error("protocol error: {0}")]
119    ProtocolError(#[from] SignalProtocolError),
120
121    #[error("invalid private key: {0}")]
122    InvalidPrivateKey(#[from] CurveError),
123
124    #[error("invalid device ID: {0}")]
125    InvalidDeviceId(#[from] InvalidDeviceId),
126
127    #[error("Failed to upload attachment {0}")]
128    AttachmentUploadError(#[from] AttachmentUploadError),
129
130    #[error("primary device can't send sync message {0:?}")]
131    SendSyncMessageError(sync_message::request::Type),
132
133    #[error("Untrusted identity key with {address:?}")]
134    UntrustedIdentity { address: ServiceId },
135
136    #[error("Exceeded maximum number of retries")]
137    MaximumRetriesLimitExceeded,
138
139    #[error("Proof of type {options:?} required using token {token}")]
140    ProofRequired { token: String, options: Vec<String> },
141
142    #[error("Recipient not found: {service_id:?}")]
143    NotFound { service_id: ServiceId },
144
145    #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
146    NoMessagesToSend,
147}
148
149pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
150
151#[derive(Debug)]
152pub enum ThreadIdentifier {
153    Aci(Uuid),
154    Group(GroupV2Id),
155}
156
157#[derive(Debug)]
158pub struct EncryptedMessages {
159    messages: Vec<OutgoingPushMessage>,
160    used_identity_key: IdentityKey,
161}
162
163impl<S> MessageSender<S>
164where
165    S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
166{
167    #[allow(clippy::too_many_arguments)]
168    pub fn new(
169        identified_ws: SignalWebSocket<websocket::Identified>,
170        unidentified_ws: SignalWebSocket<websocket::Unidentified>,
171        service: PushService,
172        cipher: ServiceCipher<S>,
173        protocol_store: S,
174        local_aci: impl Into<Aci>,
175        local_pni: impl Into<Pni>,
176        aci_identity: IdentityKeyPair,
177        pni_identity: Option<IdentityKeyPair>,
178        device_id: DeviceId,
179    ) -> Self {
180        MessageSender {
181            service,
182            identified_ws,
183            unidentified_ws,
184            cipher,
185            protocol_store,
186            local_aci: local_aci.into(),
187            local_pni: local_pni.into(),
188            aci_identity,
189            pni_identity,
190            device_id,
191        }
192    }
193
194    /// Encrypts and uploads an attachment
195    ///
196    /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place.
197    #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
198    pub async fn upload_attachment<R: Rng + CryptoRng>(
199        &mut self,
200        spec: AttachmentSpec,
201        mut contents: Vec<u8>,
202        csprng: &mut R,
203    ) -> Result<AttachmentPointer, AttachmentUploadError> {
204        let len = contents.len();
205        // Encrypt
206        let (key, iv) = {
207            let mut key = [0u8; 64];
208            let mut iv = [0u8; 16];
209            csprng.fill_bytes(&mut key);
210            csprng.fill_bytes(&mut iv);
211            (key, iv)
212        };
213
214        // Padded length uses an exponential bracketting thingy.
215        // If you want to see how it looks:
216        // https://www.wolframalpha.com/input/?i=plot+floor%281.05%5Eceil%28log_1.05%28x%29%29%29+for+x+from+0+to+5000000
217        let padded_len: usize = {
218            // Java:
219            // return (int) Math.max(541, Math.floor(Math.pow(1.05, Math.ceil(Math.log(size) / Math.log(1.05)))))
220            std::cmp::max(
221                541,
222                1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
223            )
224        };
225        if padded_len < len {
226            error!(
227                "Padded len {} < len {}. Continuing with a privacy risk.",
228                padded_len, len
229            );
230        } else {
231            contents.resize(padded_len, 0);
232        }
233
234        tracing::trace_span!("encrypting attachment").in_scope(|| {
235            crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
236        });
237
238        // Request upload attributes
239        // TODO: we can actually store the upload spec to be able to resume the upload later
240        // if it fails or stalls (= we should at least split the API calls so clients can decide what to do)
241        let attachment_upload_form = self
242            .service
243            .get_attachment_v4_upload_attributes()
244            .instrument(tracing::trace_span!("requesting upload attributes"))
245            .await?;
246
247        let resumable_upload_url = self
248            .service
249            .get_attachment_resumable_upload_url(&attachment_upload_form)
250            .await?;
251
252        let attachment_digest = self
253            .service
254            .upload_attachment_v4(
255                attachment_upload_form.cdn,
256                &resumable_upload_url,
257                contents.len() as u64,
258                attachment_upload_form.headers,
259                &mut std::io::Cursor::new(&contents),
260            )
261            .await?;
262
263        Ok(AttachmentPointer {
264            content_type: Some(spec.content_type),
265            key: Some(key.to_vec()),
266            size: Some(len as u32),
267            // thumbnail: Option<Vec<u8>>,
268            digest: Some(attachment_digest.digest),
269            file_name: spec.file_name,
270            flags: Some(
271                if spec.voice_note == Some(true) {
272                    AttachmentPointerFlags::VoiceMessage as u32
273                } else {
274                    0
275                } | if spec.borderless == Some(true) {
276                    AttachmentPointerFlags::Borderless as u32
277                } else {
278                    0
279                },
280            ),
281            width: spec.width,
282            height: spec.height,
283            caption: spec.caption,
284            blur_hash: spec.blur_hash,
285            upload_timestamp: Some(
286                SystemTime::now()
287                    .duration_since(SystemTime::UNIX_EPOCH)
288                    .expect("unix epoch in the past")
289                    .as_millis() as u64,
290            ),
291            cdn_number: Some(attachment_upload_form.cdn),
292            attachment_identifier: Some(AttachmentIdentifier::CdnKey(
293                attachment_upload_form.key,
294            )),
295            ..Default::default()
296        })
297    }
298
299    /// Upload contact details to the CDN
300    ///
301    /// Returns attachment ID and the attachment digest
302    #[tracing::instrument(skip(self, contacts))]
303    async fn upload_contact_details<Contacts>(
304        &mut self,
305        contacts: Contacts,
306    ) -> Result<AttachmentPointer, AttachmentUploadError>
307    where
308        Contacts: IntoIterator<Item = ContactDetails>,
309    {
310        use prost::Message;
311        let mut out = Vec::new();
312        for contact in contacts {
313            contact
314                .encode_length_delimited(&mut out)
315                .expect("infallible encoding");
316            // XXX add avatar here
317        }
318
319        let spec = AttachmentSpec {
320            content_type: "application/octet-stream".into(),
321            length: out.len(),
322            file_name: None,
323            preview: None,
324            voice_note: None,
325            borderless: None,
326            width: None,
327            height: None,
328            caption: None,
329            blur_hash: None,
330        };
331        self.upload_attachment(spec, out, &mut rng()).await
332    }
333
334    /// Return whether we have to prepare sync messages for other devices
335    ///
336    /// - If we are the main registered device, and there are established sub-device sessions (linked clients), return true
337    /// - If we are a secondary linked device, return true
338    async fn is_multi_device(&self) -> bool {
339        if self.device_id == *DEFAULT_DEVICE_ID {
340            self.protocol_store
341                .get_sub_device_sessions(&self.local_aci.into())
342                .await
343                .is_ok_and(|s| !s.is_empty())
344        } else {
345            true
346        }
347    }
348
349    /// Send a message `content` to a single `recipient`.
350    #[tracing::instrument(
351        skip(self, unidentified_access, message),
352        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
353    )]
354    pub async fn send_message(
355        &mut self,
356        recipient: &ServiceId,
357        mut unidentified_access: Option<UnidentifiedAccess>,
358        message: impl Into<ContentBody>,
359        timestamp: u64,
360        include_pni_signature: bool,
361        online: bool,
362    ) -> SendMessageResult {
363        let content_body = message.into();
364        let message_to_self = recipient == &self.local_aci;
365        let sync_message =
366            matches!(content_body, ContentBody::SynchronizeMessage(..));
367        let is_multi_device = self.is_multi_device().await;
368
369        use crate::proto::data_message::Flags;
370
371        let end_session = match &content_body {
372            ContentBody::DataMessage(message) => {
373                message.flags == Some(Flags::EndSession as u32)
374            },
375            _ => false,
376        };
377
378        // only send a sync message when sending to self and skip the rest of the process
379        if message_to_self && is_multi_device && !sync_message {
380            debug!("sending note to self");
381            if let Some(sync_message) = self
382                .create_multi_device_sent_transcript_content(
383                    Some(recipient),
384                    content_body,
385                    timestamp,
386                    None,
387                )
388            {
389                return self
390                    .try_send_message(
391                        *recipient,
392                        None,
393                        &sync_message,
394                        timestamp,
395                        include_pni_signature,
396                        online,
397                    )
398                    .await;
399            } else {
400                error!("could not create sync message from message to self");
401                return SendMessageResult::Err(
402                    MessageSenderError::NoMessagesToSend,
403                );
404            }
405        }
406
407        // don't send session enders as sealed sender
408        // sync messages are never sent as unidentified (reasons unclear), see: https://github.com/signalapp/Signal-Android/blob/main/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java#L779
409        if end_session || sync_message {
410            unidentified_access.take();
411        }
412
413        // try to send the original message to all the recipient's devices
414        let result = self
415            .try_send_message(
416                *recipient,
417                unidentified_access.as_ref(),
418                &content_body,
419                timestamp,
420                include_pni_signature,
421                online,
422            )
423            .await;
424
425        let needs_sync = match &result {
426            Ok(SentMessage { needs_sync, .. }) => *needs_sync,
427            _ => false,
428        };
429
430        if needs_sync || is_multi_device {
431            debug!("sending multi-device sync message");
432            if let Some(sync) = if sync_message {
433                Some(content_body)
434            } else {
435                self.create_multi_device_sent_transcript_content(
436                    Some(recipient),
437                    content_body,
438                    timestamp,
439                    Some(&result),
440                )
441            } {
442                self.try_send_message(
443                    self.local_aci.into(),
444                    None,
445                    &sync,
446                    timestamp,
447                    false,
448                    false,
449                )
450                .await?;
451            } else {
452                error!("could not create sync message from a direct message");
453            }
454        }
455
456        if end_session {
457            let n = self.protocol_store.delete_all_sessions(recipient).await?;
458            tracing::debug!(
459                "ended {} sessions with {}",
460                n,
461                recipient.raw_uuid()
462            );
463        }
464
465        result
466    }
467
468    /// Send a message to the recipients in a group.
469    ///
470    /// Recipients are a list of tuples, each containing:
471    /// - The recipient's address
472    /// - The recipient's unidentified access
473    /// - Whether the recipient requires a PNI signature
474    #[tracing::instrument(
475        skip(self, recipients, message),
476        fields(recipients = recipients.as_ref().len()),
477    )]
478    pub async fn send_message_to_group(
479        &mut self,
480        recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
481        message: impl Into<ContentBody>,
482        timestamp: u64,
483        online: bool,
484    ) -> Vec<SendMessageResult> {
485        let content_body: ContentBody = message.into();
486        let mut results = vec![];
487
488        let mut needs_sync_in_results = false;
489
490        for (recipient, unidentified_access, include_pni_signature) in
491            recipients.as_ref()
492        {
493            let result = self
494                .try_send_message(
495                    *recipient,
496                    unidentified_access.as_ref(),
497                    &content_body,
498                    timestamp,
499                    *include_pni_signature,
500                    online,
501                )
502                .await;
503
504            match result {
505                Ok(SentMessage { needs_sync, .. }) if needs_sync => {
506                    needs_sync_in_results = true;
507                },
508                _ => (),
509            };
510
511            results.push(result);
512        }
513
514        // we only need to send a synchronization message once
515        if needs_sync_in_results || self.is_multi_device().await {
516            if let Some(sync_message) = self
517                .create_multi_device_sent_transcript_content(
518                    None,
519                    content_body.clone(),
520                    timestamp,
521                    &results,
522                )
523            {
524                // Note: the result of sending a sync message is not included in results
525                // See Signal Android `SignalServiceMessageSender.java:2817`
526                if let Err(error) = self
527                    .try_send_message(
528                        self.local_aci.into(),
529                        None,
530                        &sync_message,
531                        timestamp,
532                        false, // XXX: maybe the sync device does want a PNI signature?
533                        false,
534                    )
535                    .await
536                {
537                    error!(%error, "failed to send a synchronization message");
538                }
539            } else {
540                error!("could not create sync message from a group message")
541            }
542        }
543
544        results
545    }
546
547    /// Send a message (`content`) to an address (`recipient`).
548    #[tracing::instrument(
549        level = "trace",
550        skip(self, unidentified_access, content_body, recipient),
551        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
552    )]
553    async fn try_send_message(
554        &mut self,
555        recipient: ServiceId,
556        mut unidentified_access: Option<&UnidentifiedAccess>,
557        content_body: &ContentBody,
558        timestamp: u64,
559        include_pni_signature: bool,
560        online: bool,
561    ) -> SendMessageResult {
562        trace!("trying to send a message");
563
564        use prost::Message;
565
566        let mut content = content_body.clone().into_proto();
567        if include_pni_signature {
568            content.pni_signature_message = Some(self.create_pni_signature()?);
569        }
570
571        let content_bytes = content.encode_to_vec();
572
573        let mut rng = rng();
574
575        for _ in 0..4u8 {
576            let Some(EncryptedMessages {
577                messages,
578                used_identity_key,
579            }) = self
580                .create_encrypted_messages(
581                    &recipient,
582                    unidentified_access.map(|x| &x.certificate),
583                    &content_bytes,
584                )
585                .await?
586            else {
587                // this can happen for example when a device is primary, without any secondaries
588                // and we send a message to ourselves (which is only a SyncMessage { sent: ... })
589                // addressed to self
590                return Err(MessageSenderError::NoMessagesToSend);
591            };
592
593            let messages = OutgoingPushMessages {
594                destination: recipient,
595                timestamp,
596                messages,
597                online,
598            };
599
600            let send = if let Some(unidentified) = &unidentified_access {
601                tracing::debug!("sending via unidentified");
602                self.unidentified_ws
603                    .send_messages_unidentified(messages, unidentified)
604                    .await
605            } else {
606                tracing::debug!("sending identified");
607                self.identified_ws.send_messages(messages).await
608            };
609
610            match send {
611                Ok(SendMessageResponse { needs_sync }) => {
612                    tracing::debug!("message sent!");
613                    return Ok(SentMessage {
614                        recipient,
615                        used_identity_key,
616                        unidentified: unidentified_access.is_some(),
617                        needs_sync,
618                    });
619                },
620                Err(ServiceError::Unauthorized)
621                    if unidentified_access.is_some() =>
622                {
623                    tracing::trace!("unauthorized error using unidentified; retry over identified");
624                    unidentified_access = None;
625                },
626                Err(ServiceError::MismatchedDevicesException(ref m)) => {
627                    tracing::debug!("{:?}", m);
628                    for extra_device_id in &m.extra_devices {
629                        tracing::debug!(
630                            "dropping session with device {}",
631                            extra_device_id
632                        );
633                        self.protocol_store
634                            .delete_service_addr_device_session(
635                                &recipient
636                                    .to_protocol_address(*extra_device_id)?,
637                            )
638                            .await?;
639                    }
640
641                    for missing_device_id in &m.missing_devices {
642                        tracing::debug!(
643                            "creating session with missing device {}",
644                            missing_device_id
645                        );
646                        let remote_address = recipient
647                            .to_protocol_address(*missing_device_id)?;
648                        let pre_key = self
649                            .identified_ws
650                            .get_pre_key(&recipient, *missing_device_id)
651                            .await?;
652
653                        process_prekey_bundle(
654                            &remote_address,
655                            &mut self.protocol_store.clone(),
656                            &mut self.protocol_store,
657                            &pre_key,
658                            SystemTime::now(),
659                            &mut rng,
660                        )
661                        .await
662                        .map_err(|e| {
663                            error!("failed to create session: {}", e);
664                            MessageSenderError::UntrustedIdentity {
665                                address: recipient,
666                            }
667                        })?;
668                    }
669                },
670                Err(ServiceError::StaleDevices(ref m)) => {
671                    tracing::debug!("{:?}", m);
672                    for extra_device_id in &m.stale_devices {
673                        tracing::debug!(
674                            "dropping session with device {}",
675                            extra_device_id
676                        );
677                        self.protocol_store
678                            .delete_service_addr_device_session(
679                                &recipient
680                                    .to_protocol_address(*extra_device_id)?,
681                            )
682                            .await?;
683                    }
684                },
685                Err(ServiceError::ProofRequiredError(ref p)) => {
686                    tracing::debug!("{:?}", p);
687                    return Err(MessageSenderError::ProofRequired {
688                        token: p.token.clone(),
689                        options: p.options.clone(),
690                    });
691                },
692                Err(ServiceError::NotFoundError) => {
693                    tracing::debug!("Not found when sending a message");
694                    return Err(MessageSenderError::NotFound {
695                        service_id: recipient,
696                    });
697                },
698                Err(e) => {
699                    tracing::debug!(
700                        "Default error handler for ws.send_messages: {}",
701                        e
702                    );
703                    return Err(MessageSenderError::ServiceError(e));
704                },
705            }
706        }
707
708        Err(MessageSenderError::MaximumRetriesLimitExceeded)
709    }
710
711    /// Upload contact details to the CDN and send a sync message
712    #[tracing::instrument(
713        skip(self, unidentified_access, contacts, recipient),
714        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
715    )]
716    pub async fn send_contact_details<Contacts>(
717        &mut self,
718        recipient: &ServiceId,
719        unidentified_access: Option<UnidentifiedAccess>,
720        // XXX It may be interesting to use an intermediary type,
721        //     instead of ContactDetails directly,
722        //     because it allows us to add the avatar content.
723        contacts: Contacts,
724        online: bool,
725        complete: bool,
726    ) -> Result<(), MessageSenderError>
727    where
728        Contacts: IntoIterator<Item = ContactDetails>,
729    {
730        let ptr = self.upload_contact_details(contacts).await?;
731
732        let msg = SyncMessage {
733            contacts: Some(sync_message::Contacts {
734                blob: Some(ptr),
735                complete: Some(complete),
736            }),
737            ..SyncMessage::with_padding(&mut rng())
738        };
739
740        self.send_sync_message(msg).await?;
741
742        Ok(())
743    }
744
745    /// Send `MessageRequestResponse` synchronization message with either a recipient ACI or a GroupV2 ID
746    #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
747    pub async fn send_message_request_response(
748        &mut self,
749        recipient: &ServiceId,
750        thread: &ThreadIdentifier,
751        action: message_request_response::Type,
752    ) -> Result<(), MessageSenderError> {
753        let message_request_response = Some(match thread {
754            ThreadIdentifier::Aci(aci) => {
755                tracing::debug!(
756                    "sending message request response {:?} for recipient {:?}",
757                    action,
758                    aci
759                );
760                MessageRequestResponse {
761                    thread_aci: Some(aci.to_string()),
762                    thread_aci_binary: Some(aci.into_bytes().to_vec()),
763                    group_id: None,
764                    r#type: Some(action.into()),
765                }
766            },
767            ThreadIdentifier::Group(id) => {
768                tracing::debug!(
769                    "sending message request response {:?} for group {:?}",
770                    action,
771                    id
772                );
773                MessageRequestResponse {
774                    thread_aci: None,
775                    thread_aci_binary: None,
776                    group_id: Some(id.to_vec()),
777                    r#type: Some(action.into()),
778                }
779            },
780        });
781
782        let msg = SyncMessage {
783            message_request_response,
784            ..SyncMessage::with_padding(&mut rng())
785        };
786
787        let ts = Utc::now().timestamp_millis() as u64;
788        self.send_message(recipient, None, msg, ts, false, false)
789            .await?;
790
791        Ok(())
792    }
793
794    /// Send a `SyncMessage` to own devices, if any.
795    pub async fn send_sync_message(
796        &mut self,
797        sync: SyncMessage,
798    ) -> Result<(), MessageSenderError> {
799        if self.is_multi_device().await {
800            let content = sync.into();
801            let timestamp = Utc::now().timestamp_millis() as u64;
802            debug!(
803                "sending multi-device sync message with content {content:?}"
804            );
805            self.try_send_message(
806                self.local_aci.into(),
807                None,
808                &content,
809                timestamp,
810                false,
811                false,
812            )
813            .await?;
814        }
815        Ok(())
816    }
817
818    /// Send a `SyncMessage` request message
819    #[tracing::instrument(skip(self))]
820    pub async fn send_sync_message_request(
821        &mut self,
822        recipient: &ServiceId,
823        request_type: sync_message::request::Type,
824    ) -> Result<(), MessageSenderError> {
825        if self.device_id == *DEFAULT_DEVICE_ID {
826            return Err(MessageSenderError::SendSyncMessageError(request_type));
827        }
828
829        let msg = SyncMessage {
830            request: Some(sync_message::Request {
831                r#type: Some(request_type.into()),
832            }),
833            ..SyncMessage::with_padding(&mut rng())
834        };
835        self.send_sync_message(msg).await?;
836
837        Ok(())
838    }
839
840    #[tracing::instrument(level = "trace", skip(self))]
841    fn create_pni_signature(
842        &mut self,
843    ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
844        let mut rng = rng();
845        let signature = self
846            .pni_identity
847            .expect("PNI key set when PNI signature requested")
848            .sign_alternate_identity(
849                self.aci_identity.identity_key(),
850                &mut rng,
851            )?;
852        Ok(crate::proto::PniSignatureMessage {
853            pni: Some(self.local_pni.service_id_binary()),
854            signature: Some(signature.into()),
855        })
856    }
857
858    // Equivalent with `getEncryptedMessages`
859    #[tracing::instrument(
860        level = "trace",
861        skip(self, unidentified_access, content),
862        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
863    )]
864    async fn create_encrypted_messages(
865        &mut self,
866        recipient: &ServiceId,
867        unidentified_access: Option<&SenderCertificate>,
868        content: &[u8],
869    ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
870        let mut messages = vec![];
871
872        let mut devices: HashSet<DeviceId> = self
873            .protocol_store
874            .get_sub_device_sessions(recipient)
875            .await?
876            .into_iter()
877            .collect();
878
879        // always send to the primary device no matter what
880        devices.insert(*DEFAULT_DEVICE_ID);
881
882        // never try to send messages to the sender device
883        match recipient {
884            ServiceId::Aci(aci) => {
885                if *aci == self.local_aci {
886                    devices.remove(&self.device_id);
887                }
888            },
889            ServiceId::Pni(pni) => {
890                if *pni == self.local_pni {
891                    devices.remove(&self.device_id);
892                }
893            },
894        };
895
896        for device_id in devices {
897            trace!("sending message to device {}", device_id);
898            // `create_encrypted_message` may fail with `SessionNotFound` if the session is corrupted;
899            // see https://github.com/whisperfish/libsignal-client/commit/601454d20.
900            // If this happens, delete the session and retry.
901            for _attempt in 0..2 {
902                match self
903                    .create_encrypted_message(
904                        recipient,
905                        unidentified_access,
906                        device_id,
907                        content,
908                    )
909                    .await
910                {
911                    Ok(message) => {
912                        messages.push(message);
913                        break;
914                    },
915                    Err(MessageSenderError::ServiceError(
916                        ServiceError::SignalProtocolError(
917                            SignalProtocolError::SessionNotFound(addr),
918                        ),
919                    )) => {
920                        // SessionNotFound is returned on certain session corruption.
921                        // Since delete_session *creates* a session if it doesn't exist,
922                        // the NotFound error is an indicator of session corruption.
923                        // Try to delete this session, if it gets succesfully deleted, retry.  Otherwise, fail.
924                        tracing::warn!("Potential session corruption for {}, deleting session", addr);
925                        match self.protocol_store.delete_session(&addr).await {
926                            Ok(()) => continue,
927                            Err(error) => {
928                                tracing::warn!(%error, %addr, "failed to delete session");
929                                return Err(
930                                    SignalProtocolError::SessionNotFound(addr)
931                                        .into(),
932                                );
933                            },
934                        }
935                    },
936                    Err(e) => return Err(e),
937                }
938            }
939        }
940
941        if messages.is_empty() {
942            Ok(None)
943        } else {
944            Ok(Some(EncryptedMessages {
945                messages,
946                used_identity_key: self
947                    .protocol_store
948                    .get_identity(
949                        &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
950                    )
951                    .await?
952                    .ok_or(MessageSenderError::UntrustedIdentity {
953                        address: *recipient,
954                    })?,
955            }))
956        }
957    }
958
959    /// Equivalent to `getEncryptedMessage`
960    ///
961    /// When no session with the recipient exists, we need to create one.
962    #[tracing::instrument(
963        level = "trace",
964        skip(self, unidentified_access, content),
965        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
966    )]
967    pub(crate) async fn create_encrypted_message(
968        &mut self,
969        recipient: &ServiceId,
970        unidentified_access: Option<&SenderCertificate>,
971        device_id: DeviceId,
972        content: &[u8],
973    ) -> Result<OutgoingPushMessage, MessageSenderError> {
974        let recipient_protocol_address =
975            recipient.to_protocol_address(device_id);
976
977        tracing::trace!(
978            "encrypting message for {}",
979            recipient_protocol_address
980        );
981
982        // establish a session with the recipient/device if necessary
983        // no need to establish a session with ourselves (and our own current device)
984        if self
985            .protocol_store
986            .load_session(&recipient_protocol_address)
987            .await?
988            .is_none()
989        {
990            info!(
991                "establishing new session with {}",
992                recipient_protocol_address
993            );
994            let pre_keys = match self
995                .identified_ws
996                .get_pre_keys(recipient, device_id)
997                .await
998            {
999                Ok(ok) => {
1000                    tracing::trace!("Get prekeys OK");
1001                    ok
1002                },
1003                Err(ServiceError::NotFoundError) => {
1004                    return Err(MessageSenderError::NotFound {
1005                        service_id: *recipient,
1006                    });
1007                },
1008                Err(e) => Err(e)?,
1009            };
1010
1011            let mut rng = rng();
1012
1013            for pre_key_bundle in pre_keys {
1014                if recipient == &self.local_aci
1015                    && self.device_id == pre_key_bundle.device_id()?
1016                {
1017                    trace!("not establishing a session with myself!");
1018                    continue;
1019                }
1020
1021                let pre_key_address = get_preferred_protocol_address(
1022                    &self.protocol_store,
1023                    recipient,
1024                    pre_key_bundle.device_id()?,
1025                )
1026                .await?;
1027
1028                process_prekey_bundle(
1029                    &pre_key_address,
1030                    &mut self.protocol_store.clone(),
1031                    &mut self.protocol_store,
1032                    &pre_key_bundle,
1033                    SystemTime::now(),
1034                    &mut rng,
1035                )
1036                .await?;
1037            }
1038        }
1039
1040        let message = self
1041            .cipher
1042            .encrypt(
1043                &recipient_protocol_address,
1044                unidentified_access,
1045                content,
1046                &mut rng(),
1047            )
1048            .instrument(tracing::trace_span!("encrypting message"))
1049            .await?;
1050
1051        Ok(message)
1052    }
1053
1054    fn create_multi_device_sent_transcript_content<'a>(
1055        &mut self,
1056        recipient: Option<&ServiceId>,
1057        content_body: ContentBody,
1058        timestamp: u64,
1059        send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1060    ) -> Option<ContentBody> {
1061        use sync_message::sent::UnidentifiedDeliveryStatus;
1062        let (message, edit_message) = match content_body {
1063            ContentBody::DataMessage(m) => (Some(m), None),
1064            ContentBody::EditMessage(m) => (None, Some(m)),
1065            _ => return None,
1066        };
1067        let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1068            send_message_results
1069                .into_iter()
1070                .filter_map(|result| result.as_ref().ok())
1071                .map(|sent| {
1072                    let SentMessage {
1073                        recipient,
1074                        unidentified,
1075                        used_identity_key,
1076                        ..
1077                    } = sent;
1078                    UnidentifiedDeliveryStatus {
1079                        destination_service_id: Some(
1080                            recipient.service_id_string(),
1081                        ),
1082                        destination_service_id_binary: Some(
1083                            recipient.service_id_binary(),
1084                        ),
1085                        unidentified: Some(*unidentified),
1086                        destination_pni_identity_key: Some(
1087                            used_identity_key.serialize().into(),
1088                        ),
1089                    }
1090                })
1091                .collect();
1092        Some(ContentBody::SynchronizeMessage(SyncMessage {
1093            sent: Some(sync_message::Sent {
1094                destination_service_id: recipient
1095                    .map(ServiceId::service_id_string),
1096                destination_e164: None,
1097                expiration_start_timestamp: message
1098                    .as_ref()
1099                    .and_then(|m| m.expire_timer)
1100                    .map(|_| timestamp),
1101                message,
1102                edit_message,
1103                timestamp: Some(timestamp),
1104                unidentified_status,
1105                ..Default::default()
1106            }),
1107            ..SyncMessage::with_padding(&mut rng())
1108        }))
1109    }
1110}