libsignal_service/
sender.rs

1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6    process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7    ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8    SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17    cipher::{get_preferred_protocol_address, ServiceCipher},
18    content::ContentBody,
19    proto::{
20        attachment_pointer::{
21            AttachmentIdentifier, Flags as AttachmentPointerFlags,
22        },
23        sync_message::{
24            self, message_request_response, MessageRequestResponse,
25        },
26        AttachmentPointer, SyncMessage,
27    },
28    push_service::*,
29    service_address::ServiceIdExt,
30    session_store::SessionStoreExt,
31    unidentified_access::UnidentifiedAccess,
32    utils::serde_service_id,
33    websocket::SignalWebSocket,
34};
35
36pub use crate::proto::ContactDetails;
37
38#[derive(serde::Serialize, Debug)]
39#[serde(rename_all = "camelCase")]
40pub struct OutgoingPushMessage {
41    pub r#type: u32,
42    pub destination_device_id: u32,
43    pub destination_registration_id: u32,
44    pub content: String,
45}
46
47#[derive(serde::Serialize, Debug)]
48pub struct OutgoingPushMessages {
49    #[serde(with = "serde_service_id")]
50    pub destination: ServiceId,
51    pub timestamp: u64,
52    pub messages: Vec<OutgoingPushMessage>,
53    pub online: bool,
54}
55
56#[derive(serde::Deserialize, Debug)]
57#[serde(rename_all = "camelCase")]
58pub struct SendMessageResponse {
59    pub needs_sync: bool,
60}
61
62pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
63
64#[derive(Debug, Clone)]
65pub struct SentMessage {
66    pub recipient: ServiceId,
67    pub used_identity_key: IdentityKey,
68    pub unidentified: bool,
69    pub needs_sync: bool,
70}
71
72/// Attachment specification to be used for uploading.
73///
74/// Loose equivalent of Java's `SignalServiceAttachmentStream`.
75#[derive(Debug, Default)]
76pub struct AttachmentSpec {
77    pub content_type: String,
78    pub length: usize,
79    pub file_name: Option<String>,
80    pub preview: Option<Vec<u8>>,
81    pub voice_note: Option<bool>,
82    pub borderless: Option<bool>,
83    pub width: Option<u32>,
84    pub height: Option<u32>,
85    pub caption: Option<String>,
86    pub blur_hash: Option<String>,
87}
88
89#[derive(Clone)]
90pub struct MessageSender<S> {
91    identified_ws: SignalWebSocket,
92    unidentified_ws: SignalWebSocket,
93    service: PushService,
94    cipher: ServiceCipher<S>,
95    protocol_store: S,
96    local_aci: Aci,
97    local_pni: Pni,
98    aci_identity: IdentityKeyPair,
99    pni_identity: Option<IdentityKeyPair>,
100    device_id: DeviceId,
101}
102
103#[derive(thiserror::Error, Debug)]
104pub enum AttachmentUploadError {
105    #[error("{0}")]
106    ServiceError(#[from] ServiceError),
107
108    #[error("Could not read attachment contents")]
109    IoError(#[from] std::io::Error),
110}
111
112#[derive(thiserror::Error, Debug)]
113pub enum MessageSenderError {
114    #[error("service error: {0}")]
115    ServiceError(#[from] ServiceError),
116
117    #[error("protocol error: {0}")]
118    ProtocolError(#[from] SignalProtocolError),
119
120    #[error("invalid private key: {0}")]
121    InvalidPrivateKey(#[from] CurveError),
122
123    #[error("invalid device ID: {0}")]
124    InvalidDeviceId(#[from] InvalidDeviceId),
125
126    #[error("Failed to upload attachment {0}")]
127    AttachmentUploadError(#[from] AttachmentUploadError),
128
129    #[error("primary device can't send sync message {0:?}")]
130    SendSyncMessageError(sync_message::request::Type),
131
132    #[error("Untrusted identity key with {address:?}")]
133    UntrustedIdentity { address: ServiceId },
134
135    #[error("Exceeded maximum number of retries")]
136    MaximumRetriesLimitExceeded,
137
138    #[error("Proof of type {options:?} required using token {token}")]
139    ProofRequired { token: String, options: Vec<String> },
140
141    #[error("Recipient not found: {service_id:?}")]
142    NotFound { service_id: ServiceId },
143
144    #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
145    NoMessagesToSend,
146}
147
148pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
149
150#[derive(Debug)]
151pub enum ThreadIdentifier {
152    Aci(Uuid),
153    Group(GroupV2Id),
154}
155
156#[derive(Debug)]
157pub struct EncryptedMessages {
158    messages: Vec<OutgoingPushMessage>,
159    used_identity_key: IdentityKey,
160}
161
162impl<S> MessageSender<S>
163where
164    S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
165{
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        identified_ws: SignalWebSocket,
169        unidentified_ws: SignalWebSocket,
170        service: PushService,
171        cipher: ServiceCipher<S>,
172        protocol_store: S,
173        local_aci: impl Into<Aci>,
174        local_pni: impl Into<Pni>,
175        aci_identity: IdentityKeyPair,
176        pni_identity: Option<IdentityKeyPair>,
177        device_id: DeviceId,
178    ) -> Self {
179        MessageSender {
180            service,
181            identified_ws,
182            unidentified_ws,
183            cipher,
184            protocol_store,
185            local_aci: local_aci.into(),
186            local_pni: local_pni.into(),
187            aci_identity,
188            pni_identity,
189            device_id,
190        }
191    }
192
193    /// Encrypts and uploads an attachment
194    ///
195    /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place.
196    #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
197    pub async fn upload_attachment<R: Rng + CryptoRng>(
198        &mut self,
199        spec: AttachmentSpec,
200        mut contents: Vec<u8>,
201        csprng: &mut R,
202    ) -> Result<AttachmentPointer, AttachmentUploadError> {
203        let len = contents.len();
204        // Encrypt
205        let (key, iv) = {
206            let mut key = [0u8; 64];
207            let mut iv = [0u8; 16];
208            csprng.fill_bytes(&mut key);
209            csprng.fill_bytes(&mut iv);
210            (key, iv)
211        };
212
213        // Padded length uses an exponential bracketting thingy.
214        // If you want to see how it looks:
215        // https://www.wolframalpha.com/input/?i=plot+floor%281.05%5Eceil%28log_1.05%28x%29%29%29+for+x+from+0+to+5000000
216        let padded_len: usize = {
217            // Java:
218            // return (int) Math.max(541, Math.floor(Math.pow(1.05, Math.ceil(Math.log(size) / Math.log(1.05)))))
219            std::cmp::max(
220                541,
221                1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
222            )
223        };
224        if padded_len < len {
225            error!(
226                "Padded len {} < len {}. Continuing with a privacy risk.",
227                padded_len, len
228            );
229        } else {
230            contents.resize(padded_len, 0);
231        }
232
233        tracing::trace_span!("encrypting attachment").in_scope(|| {
234            crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
235        });
236
237        // Request upload attributes
238        // TODO: we can actually store the upload spec to be able to resume the upload later
239        // if it fails or stalls (= we should at least split the API calls so clients can decide what to do)
240        let attachment_upload_form = self
241            .service
242            .get_attachment_v4_upload_attributes()
243            .instrument(tracing::trace_span!("requesting upload attributes"))
244            .await?;
245
246        let resumable_upload_url = self
247            .service
248            .get_attachment_resumable_upload_url(&attachment_upload_form)
249            .await?;
250
251        let attachment_digest = self
252            .service
253            .upload_attachment_v4(
254                attachment_upload_form.cdn,
255                &resumable_upload_url,
256                contents.len() as u64,
257                attachment_upload_form.headers,
258                &mut std::io::Cursor::new(&contents),
259            )
260            .await?;
261
262        Ok(AttachmentPointer {
263            content_type: Some(spec.content_type),
264            key: Some(key.to_vec()),
265            size: Some(len as u32),
266            // thumbnail: Option<Vec<u8>>,
267            digest: Some(attachment_digest.digest),
268            file_name: spec.file_name,
269            flags: Some(
270                if spec.voice_note == Some(true) {
271                    AttachmentPointerFlags::VoiceMessage as u32
272                } else {
273                    0
274                } | if spec.borderless == Some(true) {
275                    AttachmentPointerFlags::Borderless as u32
276                } else {
277                    0
278                },
279            ),
280            width: spec.width,
281            height: spec.height,
282            caption: spec.caption,
283            blur_hash: spec.blur_hash,
284            upload_timestamp: Some(
285                SystemTime::now()
286                    .duration_since(SystemTime::UNIX_EPOCH)
287                    .expect("unix epoch in the past")
288                    .as_millis() as u64,
289            ),
290            cdn_number: Some(attachment_upload_form.cdn),
291            attachment_identifier: Some(AttachmentIdentifier::CdnKey(
292                attachment_upload_form.key,
293            )),
294            ..Default::default()
295        })
296    }
297
298    /// Upload contact details to the CDN
299    ///
300    /// Returns attachment ID and the attachment digest
301    #[tracing::instrument(skip(self, contacts))]
302    async fn upload_contact_details<Contacts>(
303        &mut self,
304        contacts: Contacts,
305    ) -> Result<AttachmentPointer, AttachmentUploadError>
306    where
307        Contacts: IntoIterator<Item = ContactDetails>,
308    {
309        use prost::Message;
310        let mut out = Vec::new();
311        for contact in contacts {
312            contact
313                .encode_length_delimited(&mut out)
314                .expect("infallible encoding");
315            // XXX add avatar here
316        }
317
318        let spec = AttachmentSpec {
319            content_type: "application/octet-stream".into(),
320            length: out.len(),
321            file_name: None,
322            preview: None,
323            voice_note: None,
324            borderless: None,
325            width: None,
326            height: None,
327            caption: None,
328            blur_hash: None,
329        };
330        self.upload_attachment(spec, out, &mut rng()).await
331    }
332
333    /// Return whether we have to prepare sync messages for other devices
334    ///
335    /// - If we are the main registered device, and there are established sub-device sessions (linked clients), return true
336    /// - If we are a secondary linked device, return true
337    async fn is_multi_device(&self) -> bool {
338        if self.device_id == *DEFAULT_DEVICE_ID {
339            self.protocol_store
340                .get_sub_device_sessions(&self.local_aci.into())
341                .await
342                .is_ok_and(|s| !s.is_empty())
343        } else {
344            true
345        }
346    }
347
348    /// Send a message `content` to a single `recipient`.
349    #[tracing::instrument(
350        skip(self, unidentified_access, message),
351        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
352    )]
353    pub async fn send_message(
354        &mut self,
355        recipient: &ServiceId,
356        mut unidentified_access: Option<UnidentifiedAccess>,
357        message: impl Into<ContentBody>,
358        timestamp: u64,
359        include_pni_signature: bool,
360        online: bool,
361    ) -> SendMessageResult {
362        let content_body = message.into();
363        let message_to_self = recipient == &self.local_aci;
364        let sync_message =
365            matches!(content_body, ContentBody::SynchronizeMessage(..));
366        let is_multi_device = self.is_multi_device().await;
367
368        use crate::proto::data_message::Flags;
369
370        let end_session = match &content_body {
371            ContentBody::DataMessage(message) => {
372                message.flags == Some(Flags::EndSession as u32)
373            },
374            _ => false,
375        };
376
377        // only send a sync message when sending to self and skip the rest of the process
378        if message_to_self && is_multi_device && !sync_message {
379            debug!("sending note to self");
380            if let Some(sync_message) = self
381                .create_multi_device_sent_transcript_content(
382                    Some(recipient),
383                    content_body,
384                    timestamp,
385                    None,
386                )
387            {
388                return self
389                    .try_send_message(
390                        *recipient,
391                        None,
392                        &sync_message,
393                        timestamp,
394                        include_pni_signature,
395                        online,
396                    )
397                    .await;
398            } else {
399                error!("could not create sync message from message to self");
400                return SendMessageResult::Err(
401                    MessageSenderError::NoMessagesToSend,
402                );
403            }
404        }
405
406        // don't send session enders as sealed sender
407        // sync messages are never sent as unidentified (reasons unclear), see: https://github.com/signalapp/Signal-Android/blob/main/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java#L779
408        if end_session || sync_message {
409            unidentified_access.take();
410        }
411
412        // try to send the original message to all the recipient's devices
413        let result = self
414            .try_send_message(
415                *recipient,
416                unidentified_access.as_ref(),
417                &content_body,
418                timestamp,
419                include_pni_signature,
420                online,
421            )
422            .await;
423
424        let needs_sync = match &result {
425            Ok(SentMessage { needs_sync, .. }) => *needs_sync,
426            _ => false,
427        };
428
429        if needs_sync || is_multi_device {
430            debug!("sending multi-device sync message");
431            if let Some(sync) = if sync_message {
432                Some(content_body)
433            } else {
434                self.create_multi_device_sent_transcript_content(
435                    Some(recipient),
436                    content_body,
437                    timestamp,
438                    Some(&result),
439                )
440            } {
441                self.try_send_message(
442                    self.local_aci.into(),
443                    None,
444                    &sync,
445                    timestamp,
446                    false,
447                    false,
448                )
449                .await?;
450            } else {
451                error!("could not create sync message from a direct message");
452            }
453        }
454
455        if end_session {
456            let n = self.protocol_store.delete_all_sessions(recipient).await?;
457            tracing::debug!(
458                "ended {} sessions with {}",
459                n,
460                recipient.raw_uuid()
461            );
462        }
463
464        result
465    }
466
467    /// Send a message to the recipients in a group.
468    ///
469    /// Recipients are a list of tuples, each containing:
470    /// - The recipient's address
471    /// - The recipient's unidentified access
472    /// - Whether the recipient requires a PNI signature
473    #[tracing::instrument(
474        skip(self, recipients, message),
475        fields(recipients = recipients.as_ref().len()),
476    )]
477    pub async fn send_message_to_group(
478        &mut self,
479        recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
480        message: impl Into<ContentBody>,
481        timestamp: u64,
482        online: bool,
483    ) -> Vec<SendMessageResult> {
484        let content_body: ContentBody = message.into();
485        let mut results = vec![];
486
487        let mut needs_sync_in_results = false;
488
489        for (recipient, unidentified_access, include_pni_signature) in
490            recipients.as_ref()
491        {
492            let result = self
493                .try_send_message(
494                    *recipient,
495                    unidentified_access.as_ref(),
496                    &content_body,
497                    timestamp,
498                    *include_pni_signature,
499                    online,
500                )
501                .await;
502
503            match result {
504                Ok(SentMessage { needs_sync, .. }) if needs_sync => {
505                    needs_sync_in_results = true;
506                },
507                _ => (),
508            };
509
510            results.push(result);
511        }
512
513        // we only need to send a synchronization message once
514        if needs_sync_in_results || self.is_multi_device().await {
515            if let Some(sync_message) = self
516                .create_multi_device_sent_transcript_content(
517                    None,
518                    content_body.clone(),
519                    timestamp,
520                    &results,
521                )
522            {
523                // Note: the result of sending a sync message is not included in results
524                // See Signal Android `SignalServiceMessageSender.java:2817`
525                if let Err(error) = self
526                    .try_send_message(
527                        self.local_aci.into(),
528                        None,
529                        &sync_message,
530                        timestamp,
531                        false, // XXX: maybe the sync device does want a PNI signature?
532                        false,
533                    )
534                    .await
535                {
536                    error!(%error, "failed to send a synchronization message");
537                }
538            } else {
539                error!("could not create sync message from a group message")
540            }
541        }
542
543        results
544    }
545
546    /// Send a message (`content`) to an address (`recipient`).
547    #[tracing::instrument(
548        level = "trace",
549        skip(self, unidentified_access, content_body, recipient),
550        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
551    )]
552    async fn try_send_message(
553        &mut self,
554        recipient: ServiceId,
555        mut unidentified_access: Option<&UnidentifiedAccess>,
556        content_body: &ContentBody,
557        timestamp: u64,
558        include_pni_signature: bool,
559        online: bool,
560    ) -> SendMessageResult {
561        trace!("trying to send a message");
562
563        use prost::Message;
564
565        let mut content = content_body.clone().into_proto();
566        if include_pni_signature {
567            content.pni_signature_message = Some(self.create_pni_signature()?);
568        }
569
570        let content_bytes = content.encode_to_vec();
571
572        let mut rng = rng();
573
574        for _ in 0..4u8 {
575            let Some(EncryptedMessages {
576                messages,
577                used_identity_key,
578            }) = self
579                .create_encrypted_messages(
580                    &recipient,
581                    unidentified_access.map(|x| &x.certificate),
582                    &content_bytes,
583                )
584                .await?
585            else {
586                // this can happen for example when a device is primary, without any secondaries
587                // and we send a message to ourselves (which is only a SyncMessage { sent: ... })
588                // addressed to self
589                return Err(MessageSenderError::NoMessagesToSend);
590            };
591
592            let messages = OutgoingPushMessages {
593                destination: recipient,
594                timestamp,
595                messages,
596                online,
597            };
598
599            let send = if let Some(unidentified) = &unidentified_access {
600                tracing::debug!("sending via unidentified");
601                self.unidentified_ws
602                    .send_messages_unidentified(messages, unidentified)
603                    .await
604            } else {
605                tracing::debug!("sending identified");
606                self.identified_ws.send_messages(messages).await
607            };
608
609            match send {
610                Ok(SendMessageResponse { needs_sync }) => {
611                    tracing::debug!("message sent!");
612                    return Ok(SentMessage {
613                        recipient,
614                        used_identity_key,
615                        unidentified: unidentified_access.is_some(),
616                        needs_sync,
617                    });
618                },
619                Err(ServiceError::Unauthorized)
620                    if unidentified_access.is_some() =>
621                {
622                    tracing::trace!("unauthorized error using unidentified; retry over identified");
623                    unidentified_access = None;
624                },
625                Err(ServiceError::MismatchedDevicesException(ref m)) => {
626                    tracing::debug!("{:?}", m);
627                    for extra_device_id in &m.extra_devices {
628                        tracing::debug!(
629                            "dropping session with device {}",
630                            extra_device_id
631                        );
632                        self.protocol_store
633                            .delete_service_addr_device_session(
634                                &recipient.to_protocol_address(
635                                    (*extra_device_id).try_into()?,
636                                )?,
637                            )
638                            .await?;
639                    }
640
641                    for missing_device_id in &m.missing_devices {
642                        tracing::debug!(
643                            "creating session with missing device {}",
644                            missing_device_id
645                        );
646                        let remote_address = recipient.to_protocol_address(
647                            (*missing_device_id).try_into()?,
648                        )?;
649                        let pre_key = self
650                            .service
651                            .get_pre_key(&recipient, *missing_device_id)
652                            .await?;
653
654                        process_prekey_bundle(
655                            &remote_address,
656                            &mut self.protocol_store.clone(),
657                            &mut self.protocol_store,
658                            &pre_key,
659                            SystemTime::now(),
660                            &mut rng,
661                        )
662                        .await
663                        .map_err(|e| {
664                            error!("failed to create session: {}", e);
665                            MessageSenderError::UntrustedIdentity {
666                                address: recipient,
667                            }
668                        })?;
669                    }
670                },
671                Err(ServiceError::StaleDevices(ref m)) => {
672                    tracing::debug!("{:?}", m);
673                    for extra_device_id in &m.stale_devices {
674                        tracing::debug!(
675                            "dropping session with device {}",
676                            extra_device_id
677                        );
678                        self.protocol_store
679                            .delete_service_addr_device_session(
680                                &recipient.to_protocol_address(
681                                    (*extra_device_id).try_into()?,
682                                )?,
683                            )
684                            .await?;
685                    }
686                },
687                Err(ServiceError::ProofRequiredError(ref p)) => {
688                    tracing::debug!("{:?}", p);
689                    return Err(MessageSenderError::ProofRequired {
690                        token: p.token.clone(),
691                        options: p.options.clone(),
692                    });
693                },
694                Err(ServiceError::NotFoundError) => {
695                    tracing::debug!("Not found when sending a message");
696                    return Err(MessageSenderError::NotFound {
697                        service_id: recipient,
698                    });
699                },
700                Err(e) => {
701                    tracing::debug!(
702                        "Default error handler for ws.send_messages: {}",
703                        e
704                    );
705                    return Err(MessageSenderError::ServiceError(e));
706                },
707            }
708        }
709
710        Err(MessageSenderError::MaximumRetriesLimitExceeded)
711    }
712
713    /// Upload contact details to the CDN and send a sync message
714    #[tracing::instrument(
715        skip(self, unidentified_access, contacts, recipient),
716        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
717    )]
718    pub async fn send_contact_details<Contacts>(
719        &mut self,
720        recipient: &ServiceId,
721        unidentified_access: Option<UnidentifiedAccess>,
722        // XXX It may be interesting to use an intermediary type,
723        //     instead of ContactDetails directly,
724        //     because it allows us to add the avatar content.
725        contacts: Contacts,
726        online: bool,
727        complete: bool,
728    ) -> Result<(), MessageSenderError>
729    where
730        Contacts: IntoIterator<Item = ContactDetails>,
731    {
732        let ptr = self.upload_contact_details(contacts).await?;
733
734        let msg = SyncMessage {
735            contacts: Some(sync_message::Contacts {
736                blob: Some(ptr),
737                complete: Some(complete),
738            }),
739            ..SyncMessage::with_padding(&mut rng())
740        };
741
742        self.send_sync_message(msg).await?;
743
744        Ok(())
745    }
746
747    /// Send `MessageRequestResponse` synchronization message with either a recipient ACI or a GroupV2 ID
748    #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
749    pub async fn send_message_request_response(
750        &mut self,
751        recipient: &ServiceId,
752        thread: &ThreadIdentifier,
753        action: message_request_response::Type,
754    ) -> Result<(), MessageSenderError> {
755        let message_request_response = Some(match thread {
756            ThreadIdentifier::Aci(aci) => {
757                tracing::debug!(
758                    "sending message request response {:?} for recipient {:?}",
759                    action,
760                    aci
761                );
762                MessageRequestResponse {
763                    thread_aci: Some(aci.to_string()),
764                    group_id: None,
765                    r#type: Some(action.into()),
766                }
767            },
768            ThreadIdentifier::Group(id) => {
769                tracing::debug!(
770                    "sending message request response {:?} for group {:?}",
771                    action,
772                    id
773                );
774                MessageRequestResponse {
775                    thread_aci: None,
776                    group_id: Some(id.to_vec()),
777                    r#type: Some(action.into()),
778                }
779            },
780        });
781
782        let msg = SyncMessage {
783            message_request_response,
784            ..SyncMessage::with_padding(&mut rng())
785        };
786
787        let ts = Utc::now().timestamp_millis() as u64;
788        self.send_message(recipient, None, msg, ts, false, false)
789            .await?;
790
791        Ok(())
792    }
793
794    /// Send a `SyncMessage` to own devices, if any.
795    pub async fn send_sync_message(
796        &mut self,
797        sync: SyncMessage,
798    ) -> Result<(), MessageSenderError> {
799        if self.is_multi_device().await {
800            let content = sync.into();
801            let timestamp = Utc::now().timestamp_millis() as u64;
802            debug!(
803                "sending multi-device sync message with content {content:?}"
804            );
805            self.try_send_message(
806                self.local_aci.into(),
807                None,
808                &content,
809                timestamp,
810                false,
811                false,
812            )
813            .await?;
814        }
815        Ok(())
816    }
817
818    /// Send a `SyncMessage` request message
819    #[tracing::instrument(skip(self))]
820    pub async fn send_sync_message_request(
821        &mut self,
822        recipient: &ServiceId,
823        request_type: sync_message::request::Type,
824    ) -> Result<(), MessageSenderError> {
825        if self.device_id == *DEFAULT_DEVICE_ID {
826            return Err(MessageSenderError::SendSyncMessageError(request_type));
827        }
828
829        let msg = SyncMessage {
830            request: Some(sync_message::Request {
831                r#type: Some(request_type.into()),
832            }),
833            ..SyncMessage::with_padding(&mut rng())
834        };
835        self.send_sync_message(msg).await?;
836
837        Ok(())
838    }
839
840    #[expect(clippy::result_large_err)]
841    #[tracing::instrument(level = "trace", skip(self))]
842    fn create_pni_signature(
843        &mut self,
844    ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
845        let mut rng = rng();
846        let signature = self
847            .pni_identity
848            .expect("PNI key set when PNI signature requested")
849            .sign_alternate_identity(
850                self.aci_identity.identity_key(),
851                &mut rng,
852            )?;
853        Ok(crate::proto::PniSignatureMessage {
854            pni: Some(self.local_pni.service_id_binary()),
855            signature: Some(signature.into()),
856        })
857    }
858
859    // Equivalent with `getEncryptedMessages`
860    #[tracing::instrument(
861        level = "trace",
862        skip(self, unidentified_access, content),
863        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
864    )]
865    async fn create_encrypted_messages(
866        &mut self,
867        recipient: &ServiceId,
868        unidentified_access: Option<&SenderCertificate>,
869        content: &[u8],
870    ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
871        let mut messages = vec![];
872
873        let mut devices: HashSet<DeviceId> = self
874            .protocol_store
875            .get_sub_device_sessions(recipient)
876            .await?
877            .into_iter()
878            .collect();
879
880        // always send to the primary device no matter what
881        devices.insert(*DEFAULT_DEVICE_ID);
882
883        // never try to send messages to the sender device
884        match recipient {
885            ServiceId::Aci(aci) => {
886                if *aci == self.local_aci {
887                    devices.remove(&self.device_id);
888                }
889            },
890            ServiceId::Pni(pni) => {
891                if *pni == self.local_pni {
892                    devices.remove(&self.device_id);
893                }
894            },
895        };
896
897        for device_id in devices {
898            trace!("sending message to device {}", device_id);
899            // `create_encrypted_message` may fail with `SessionNotFound` if the session is corrupted;
900            // see https://github.com/whisperfish/libsignal-client/commit/601454d20.
901            // If this happens, delete the session and retry.
902            for _attempt in 0..2 {
903                match self
904                    .create_encrypted_message(
905                        recipient,
906                        unidentified_access,
907                        device_id,
908                        content,
909                    )
910                    .await
911                {
912                    Ok(message) => {
913                        messages.push(message);
914                        break;
915                    },
916                    Err(MessageSenderError::ServiceError(
917                        ServiceError::SignalProtocolError(
918                            SignalProtocolError::SessionNotFound(addr),
919                        ),
920                    )) => {
921                        // SessionNotFound is returned on certain session corruption.
922                        // Since delete_session *creates* a session if it doesn't exist,
923                        // the NotFound error is an indicator of session corruption.
924                        // Try to delete this session, if it gets succesfully deleted, retry.  Otherwise, fail.
925                        tracing::warn!("Potential session corruption for {}, deleting session", addr);
926                        match self.protocol_store.delete_session(&addr).await {
927                            Ok(()) => continue,
928                            Err(error) => {
929                                tracing::warn!(%error, %addr, "failed to delete session");
930                                return Err(
931                                    SignalProtocolError::SessionNotFound(addr)
932                                        .into(),
933                                );
934                            },
935                        }
936                    },
937                    Err(e) => return Err(e),
938                }
939            }
940        }
941
942        if messages.is_empty() {
943            Ok(None)
944        } else {
945            Ok(Some(EncryptedMessages {
946                messages,
947                used_identity_key: self
948                    .protocol_store
949                    .get_identity(
950                        &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
951                    )
952                    .await?
953                    .ok_or(MessageSenderError::UntrustedIdentity {
954                        address: *recipient,
955                    })?,
956            }))
957        }
958    }
959
960    /// Equivalent to `getEncryptedMessage`
961    ///
962    /// When no session with the recipient exists, we need to create one.
963    #[tracing::instrument(
964        level = "trace",
965        skip(self, unidentified_access, content),
966        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
967    )]
968    pub(crate) async fn create_encrypted_message(
969        &mut self,
970        recipient: &ServiceId,
971        unidentified_access: Option<&SenderCertificate>,
972        device_id: DeviceId,
973        content: &[u8],
974    ) -> Result<OutgoingPushMessage, MessageSenderError> {
975        let recipient_protocol_address =
976            recipient.to_protocol_address(device_id);
977
978        tracing::trace!(
979            "encrypting message for {}",
980            recipient_protocol_address
981        );
982
983        // establish a session with the recipient/device if necessary
984        // no need to establish a session with ourselves (and our own current device)
985        if self
986            .protocol_store
987            .load_session(&recipient_protocol_address)
988            .await?
989            .is_none()
990        {
991            info!(
992                "establishing new session with {}",
993                recipient_protocol_address
994            );
995            let pre_keys = match self
996                .service
997                .get_pre_keys(recipient, device_id.into())
998                .await
999            {
1000                Ok(ok) => {
1001                    tracing::trace!("Get prekeys OK");
1002                    ok
1003                },
1004                Err(ServiceError::NotFoundError) => {
1005                    return Err(MessageSenderError::NotFound {
1006                        service_id: *recipient,
1007                    });
1008                },
1009                Err(e) => Err(e)?,
1010            };
1011
1012            let mut rng = rng();
1013
1014            for pre_key_bundle in pre_keys {
1015                if recipient == &self.local_aci
1016                    && self.device_id == pre_key_bundle.device_id()?
1017                {
1018                    trace!("not establishing a session with myself!");
1019                    continue;
1020                }
1021
1022                let pre_key_address = get_preferred_protocol_address(
1023                    &self.protocol_store,
1024                    recipient,
1025                    pre_key_bundle.device_id()?,
1026                )
1027                .await?;
1028
1029                process_prekey_bundle(
1030                    &pre_key_address,
1031                    &mut self.protocol_store.clone(),
1032                    &mut self.protocol_store,
1033                    &pre_key_bundle,
1034                    SystemTime::now(),
1035                    &mut rng,
1036                )
1037                .await?;
1038            }
1039        }
1040
1041        let message = self
1042            .cipher
1043            .encrypt(
1044                &recipient_protocol_address,
1045                unidentified_access,
1046                content,
1047                &mut rng(),
1048            )
1049            .instrument(tracing::trace_span!("encrypting message"))
1050            .await?;
1051
1052        Ok(message)
1053    }
1054
1055    fn create_multi_device_sent_transcript_content<'a>(
1056        &mut self,
1057        recipient: Option<&ServiceId>,
1058        content_body: ContentBody,
1059        timestamp: u64,
1060        send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1061    ) -> Option<ContentBody> {
1062        use sync_message::sent::UnidentifiedDeliveryStatus;
1063        let (message, edit_message) = match content_body {
1064            ContentBody::DataMessage(m) => (Some(m), None),
1065            ContentBody::EditMessage(m) => (None, Some(m)),
1066            _ => return None,
1067        };
1068        let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1069            send_message_results
1070                .into_iter()
1071                .filter_map(|result| result.as_ref().ok())
1072                .map(|sent| {
1073                    let SentMessage {
1074                        recipient,
1075                        unidentified,
1076                        used_identity_key,
1077                        ..
1078                    } = sent;
1079                    UnidentifiedDeliveryStatus {
1080                        destination_service_id: Some(
1081                            recipient.service_id_string(),
1082                        ),
1083                        unidentified: Some(*unidentified),
1084                        destination_pni_identity_key: Some(
1085                            used_identity_key.serialize().into(),
1086                        ),
1087                    }
1088                })
1089                .collect();
1090        Some(ContentBody::SynchronizeMessage(SyncMessage {
1091            sent: Some(sync_message::Sent {
1092                destination_service_id: recipient
1093                    .map(ServiceId::service_id_string),
1094                destination_e164: None,
1095                expiration_start_timestamp: message
1096                    .as_ref()
1097                    .and_then(|m| m.expire_timer)
1098                    .map(|_| timestamp),
1099                message,
1100                edit_message,
1101                timestamp: Some(timestamp),
1102                unidentified_status,
1103                ..Default::default()
1104            }),
1105            ..SyncMessage::with_padding(&mut rng())
1106        }))
1107    }
1108}