libsignal_service/
sender.rs

1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6    process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7    ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8    SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17    cipher::{get_preferred_protocol_address, ServiceCipher},
18    content::ContentBody,
19    proto::{
20        attachment_pointer::{
21            AttachmentIdentifier, Flags as AttachmentPointerFlags,
22        },
23        sync_message::{
24            self, message_request_response, MessageRequestResponse,
25        },
26        AttachmentPointer, SyncMessage,
27    },
28    push_service::*,
29    service_address::ServiceIdExt,
30    session_store::SessionStoreExt,
31    unidentified_access::UnidentifiedAccess,
32    utils::{serde_device_id, serde_service_id},
33    websocket::{self, SignalWebSocket},
34};
35
36pub use crate::proto::ContactDetails;
37
/// One per-device entry of an outgoing message request.
///
/// Serialized with camelCase field names. Instances are collected in
/// [`OutgoingPushMessages::messages`] and in [`EncryptedMessages`].
#[derive(serde::Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct OutgoingPushMessage {
    /// Numeric message type.
    // NOTE(review): presumably the ciphertext/envelope type expected by the server; confirm.
    pub r#type: u32,
    /// Device this entry is addressed to (serialized through `serde_device_id`).
    #[serde(with = "serde_device_id")]
    pub destination_device_id: DeviceId,
    /// Registration ID of the destination device.
    pub destination_registration_id: u32,
    /// Message payload as a string.
    // NOTE(review): presumably the base64-encoded ciphertext; confirm against the producer.
    pub content: String,
}
47
/// Request body bundling all per-device messages for a single destination.
#[derive(serde::Serialize, Debug)]
pub struct OutgoingPushMessages {
    /// Account the messages are addressed to (serialized through `serde_service_id`).
    #[serde(with = "serde_service_id")]
    pub destination: ServiceId,
    /// Timestamp supplied by the caller of the send methods.
    pub timestamp: u64,
    /// One entry per destination device.
    pub messages: Vec<OutgoingPushMessage>,
    /// The `online` flag, forwarded unchanged from the send methods.
    pub online: bool,
}
56
/// Response to a message send request, deserialized from camelCase fields.
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SendMessageResponse {
    /// When `true`, the send methods in this file follow up with a sync
    /// message to the local account.
    pub needs_sync: bool,
}
62
/// Outcome of sending one message to one recipient.
pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
64
/// Details of a successfully sent message.
#[derive(Debug, Clone)]
pub struct SentMessage {
    /// Account the message was sent to.
    pub recipient: ServiceId,
    /// Identity key reported by the encryption step for this send.
    pub used_identity_key: IdentityKey,
    /// `true` when the message went out with unidentified access.
    pub unidentified: bool,
    /// `needs_sync` value taken from the [`SendMessageResponse`].
    pub needs_sync: bool,
}
72
/// Attachment specification to be used for uploading.
///
/// Loose equivalent of Java's `SignalServiceAttachmentStream`.
///
/// The optional fields are copied into the resulting `AttachmentPointer` by
/// `MessageSender::upload_attachment`, except where noted below.
#[derive(Debug, Default)]
pub struct AttachmentSpec {
    /// MIME type of the attachment.
    pub content_type: String,
    /// Length of the attachment.
    // NOTE(review): `upload_attachment` takes the size from the contents
    // buffer and does not read this field; confirm whether it is still needed.
    pub length: usize,
    /// Optional file name.
    pub file_name: Option<String>,
    /// Optional preview bytes.
    // NOTE(review): not read by `upload_attachment` in this file.
    pub preview: Option<Vec<u8>>,
    /// `Some(true)` sets the `VoiceMessage` flag on the pointer.
    pub voice_note: Option<bool>,
    /// `Some(true)` sets the `Borderless` flag on the pointer.
    pub borderless: Option<bool>,
    /// Optional width.
    pub width: Option<u32>,
    /// Optional height.
    pub height: Option<u32>,
    /// Optional caption.
    pub caption: Option<String>,
    /// Optional blur hash.
    pub blur_hash: Option<String>,
}
89
/// Sends messages and uploads attachments on behalf of the local account.
///
/// `S` is the protocol store; the `impl` block requires it to implement
/// `ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone`.
#[derive(Clone)]
pub struct MessageSender<S> {
    /// Websocket used for identified sends and for fetching pre-keys.
    identified_ws: SignalWebSocket<websocket::Identified>,
    /// Websocket used when sending with unidentified access.
    unidentified_ws: SignalWebSocket<websocket::Unidentified>,
    /// Service client used for attachment uploads.
    service: PushService,
    /// Cipher bound to the same store type.
    // NOTE(review): presumably performs the per-device encryption; its use is outside this view.
    cipher: ServiceCipher<S>,
    /// Session/identity storage.
    protocol_store: S,
    /// ACI of the local account.
    local_aci: Aci,
    /// PNI of the local account.
    local_pni: Pni,
    /// ACI identity key pair; its public key is what the PNI signature covers.
    aci_identity: IdentityKeyPair,
    /// PNI identity key pair; must be `Some` when a PNI signature is requested.
    pni_identity: Option<IdentityKeyPair>,
    /// ID of the local device.
    device_id: DeviceId,
}
103
/// Errors that can occur while uploading an attachment.
#[derive(thiserror::Error, Debug)]
pub enum AttachmentUploadError {
    /// A service call failed; displays as the wrapped error.
    #[error("{0}")]
    ServiceError(#[from] ServiceError),

    /// Reading the attachment contents failed.
    #[error("Could not read attachment contents")]
    IoError(#[from] std::io::Error),
}
112
/// Errors returned by the sending methods of [`MessageSender`].
#[derive(thiserror::Error, Debug)]
pub enum MessageSenderError {
    /// A service call failed with an error that has no dedicated variant here.
    #[error("service error: {0}")]
    ServiceError(#[from] ServiceError),

    /// The Signal protocol layer reported an error.
    #[error("protocol error: {0}")]
    ProtocolError(#[from] SignalProtocolError),

    /// A curve operation failed.
    #[error("invalid private key: {0}")]
    InvalidPrivateKey(#[from] CurveError),

    /// A device ID was rejected as invalid.
    #[error("invalid device ID: {0}")]
    InvalidDeviceId(#[from] InvalidDeviceId),

    /// Uploading an attachment failed.
    #[error("Failed to upload attachment {0}")]
    AttachmentUploadError(#[from] AttachmentUploadError),

    /// A sync request of the given type was attempted from the primary device.
    #[error("primary device can't send sync message {0:?}")]
    SendSyncMessageError(sync_message::request::Type),

    /// Establishing a session from a fetched pre-key bundle failed.
    #[error("Untrusted identity key with {address:?}")]
    UntrustedIdentity { address: ServiceId },

    /// The send loop ran out of attempts without succeeding.
    #[error("Exceeded maximum number of retries")]
    MaximumRetriesLimitExceeded,

    /// The server requires a proof before accepting the message.
    #[error("Proof of type {options:?} required using token {token}")]
    ProofRequired { token: String, options: Vec<String> },

    /// The server reported the recipient as not found.
    #[error("Recipient not found: {service_id:?}")]
    NotFound { service_id: ServiceId },

    /// Encryption produced no messages, or no sync transcript could be built
    /// for a note to self.
    #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
    NoMessagesToSend,
}
148
/// Raw group identifier: a byte array of `GROUP_IDENTIFIER_LEN` bytes.
pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
150
/// Identifies the conversation a message request response refers to.
#[derive(Debug)]
pub enum ThreadIdentifier {
    /// A one-to-one thread, identified by the peer's ACI UUID.
    Aci(Uuid),
    /// A group thread, identified by its group ID.
    Group(GroupV2Id),
}
156
/// Result of encrypting one payload for a recipient's devices.
#[derive(Debug)]
pub struct EncryptedMessages {
    /// One encrypted entry per destination device.
    messages: Vec<OutgoingPushMessage>,
    /// Identity key reported alongside the encrypted messages; copied into
    /// [`SentMessage::used_identity_key`] on success.
    used_identity_key: IdentityKey,
}
162
163impl<S> MessageSender<S>
164where
165    S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
166{
167    #[allow(clippy::too_many_arguments)]
168    pub fn new(
169        identified_ws: SignalWebSocket<websocket::Identified>,
170        unidentified_ws: SignalWebSocket<websocket::Unidentified>,
171        service: PushService,
172        cipher: ServiceCipher<S>,
173        protocol_store: S,
174        local_aci: impl Into<Aci>,
175        local_pni: impl Into<Pni>,
176        aci_identity: IdentityKeyPair,
177        pni_identity: Option<IdentityKeyPair>,
178        device_id: DeviceId,
179    ) -> Self {
180        MessageSender {
181            service,
182            identified_ws,
183            unidentified_ws,
184            cipher,
185            protocol_store,
186            local_aci: local_aci.into(),
187            local_pni: local_pni.into(),
188            aci_identity,
189            pni_identity,
190            device_id,
191        }
192    }
193
194    /// Encrypts and uploads an attachment
195    ///
196    /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place.
197    #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
198    pub async fn upload_attachment<R: Rng + CryptoRng>(
199        &mut self,
200        spec: AttachmentSpec,
201        mut contents: Vec<u8>,
202        csprng: &mut R,
203    ) -> Result<AttachmentPointer, AttachmentUploadError> {
204        let len = contents.len();
205        // Encrypt
206        let (key, iv) = {
207            let mut key = [0u8; 64];
208            let mut iv = [0u8; 16];
209            csprng.fill_bytes(&mut key);
210            csprng.fill_bytes(&mut iv);
211            (key, iv)
212        };
213
214        // Padded length uses an exponential bracketting thingy.
215        // If you want to see how it looks:
216        // https://www.wolframalpha.com/input/?i=plot+floor%281.05%5Eceil%28log_1.05%28x%29%29%29+for+x+from+0+to+5000000
217        let padded_len: usize = {
218            // Java:
219            // return (int) Math.max(541, Math.floor(Math.pow(1.05, Math.ceil(Math.log(size) / Math.log(1.05)))))
220            std::cmp::max(
221                541,
222                1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
223            )
224        };
225        if padded_len < len {
226            error!(
227                "Padded len {} < len {}. Continuing with a privacy risk.",
228                padded_len, len
229            );
230        } else {
231            contents.resize(padded_len, 0);
232        }
233
234        tracing::trace_span!("encrypting attachment").in_scope(|| {
235            crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
236        });
237
238        // Request upload attributes
239        // TODO: we can actually store the upload spec to be able to resume the upload later
240        // if it fails or stalls (= we should at least split the API calls so clients can decide what to do)
241        let attachment_upload_form = self
242            .service
243            .get_attachment_v4_upload_attributes()
244            .instrument(tracing::trace_span!("requesting upload attributes"))
245            .await?;
246
247        let resumable_upload_url = self
248            .service
249            .get_attachment_resumable_upload_url(&attachment_upload_form)
250            .await?;
251
252        let attachment_digest = self
253            .service
254            .upload_attachment_v4(
255                attachment_upload_form.cdn,
256                &resumable_upload_url,
257                contents.len() as u64,
258                attachment_upload_form.headers,
259                &mut std::io::Cursor::new(&contents),
260            )
261            .await?;
262
263        Ok(AttachmentPointer {
264            content_type: Some(spec.content_type),
265            key: Some(key.to_vec()),
266            size: Some(len as u32),
267            // thumbnail: Option<Vec<u8>>,
268            digest: Some(attachment_digest.digest),
269            file_name: spec.file_name,
270            flags: Some(
271                if spec.voice_note == Some(true) {
272                    AttachmentPointerFlags::VoiceMessage as u32
273                } else {
274                    0
275                } | if spec.borderless == Some(true) {
276                    AttachmentPointerFlags::Borderless as u32
277                } else {
278                    0
279                },
280            ),
281            width: spec.width,
282            height: spec.height,
283            caption: spec.caption,
284            blur_hash: spec.blur_hash,
285            upload_timestamp: Some(
286                SystemTime::now()
287                    .duration_since(SystemTime::UNIX_EPOCH)
288                    .expect("unix epoch in the past")
289                    .as_millis() as u64,
290            ),
291            cdn_number: Some(attachment_upload_form.cdn),
292            attachment_identifier: Some(AttachmentIdentifier::CdnKey(
293                attachment_upload_form.key,
294            )),
295            ..Default::default()
296        })
297    }
298
299    /// Upload contact details to the CDN
300    ///
301    /// Returns attachment ID and the attachment digest
302    #[tracing::instrument(skip(self, contacts))]
303    async fn upload_contact_details<Contacts>(
304        &mut self,
305        contacts: Contacts,
306    ) -> Result<AttachmentPointer, AttachmentUploadError>
307    where
308        Contacts: IntoIterator<Item = ContactDetails>,
309    {
310        use prost::Message;
311        let mut out = Vec::new();
312        for contact in contacts {
313            contact
314                .encode_length_delimited(&mut out)
315                .expect("infallible encoding");
316            // XXX add avatar here
317        }
318
319        let spec = AttachmentSpec {
320            content_type: "application/octet-stream".into(),
321            length: out.len(),
322            file_name: None,
323            preview: None,
324            voice_note: None,
325            borderless: None,
326            width: None,
327            height: None,
328            caption: None,
329            blur_hash: None,
330        };
331        self.upload_attachment(spec, out, &mut rng()).await
332    }
333
334    /// Return whether we have to prepare sync messages for other devices
335    ///
336    /// - If we are the main registered device, and there are established sub-device sessions (linked clients), return true
337    /// - If we are a secondary linked device, return true
338    async fn is_multi_device(&self) -> bool {
339        if self.device_id == *DEFAULT_DEVICE_ID {
340            self.protocol_store
341                .get_sub_device_sessions(&self.local_aci.into())
342                .await
343                .is_ok_and(|s| !s.is_empty())
344        } else {
345            true
346        }
347    }
348
349    /// Send a message `content` to a single `recipient`.
350    #[tracing::instrument(
351        skip(self, unidentified_access, message),
352        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
353    )]
354    pub async fn send_message(
355        &mut self,
356        recipient: &ServiceId,
357        mut unidentified_access: Option<UnidentifiedAccess>,
358        message: impl Into<ContentBody>,
359        timestamp: u64,
360        include_pni_signature: bool,
361        online: bool,
362    ) -> SendMessageResult {
363        let content_body = message.into();
364        let message_to_self = recipient == &self.local_aci;
365        let sync_message =
366            matches!(content_body, ContentBody::SynchronizeMessage(..));
367        let is_multi_device = self.is_multi_device().await;
368
369        use crate::proto::data_message::Flags;
370
371        let end_session = match &content_body {
372            ContentBody::DataMessage(message) => {
373                message.flags == Some(Flags::EndSession as u32)
374            },
375            _ => false,
376        };
377
378        // only send a sync message when sending to self and skip the rest of the process
379        if message_to_self && is_multi_device && !sync_message {
380            debug!("sending note to self");
381            if let Some(sync_message) = self
382                .create_multi_device_sent_transcript_content(
383                    Some(recipient),
384                    content_body,
385                    timestamp,
386                    None,
387                )
388            {
389                return self
390                    .try_send_message(
391                        *recipient,
392                        None,
393                        &sync_message,
394                        timestamp,
395                        include_pni_signature,
396                        online,
397                    )
398                    .await;
399            } else {
400                error!("could not create sync message from message to self");
401                return SendMessageResult::Err(
402                    MessageSenderError::NoMessagesToSend,
403                );
404            }
405        }
406
407        // don't send session enders as sealed sender
408        // sync messages are never sent as unidentified (reasons unclear), see: https://github.com/signalapp/Signal-Android/blob/main/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java#L779
409        if end_session || sync_message {
410            unidentified_access.take();
411        }
412
413        // try to send the original message to all the recipient's devices
414        let result = self
415            .try_send_message(
416                *recipient,
417                unidentified_access.as_ref(),
418                &content_body,
419                timestamp,
420                include_pni_signature,
421                online,
422            )
423            .await;
424
425        let needs_sync = match &result {
426            Ok(SentMessage { needs_sync, .. }) => *needs_sync,
427            _ => false,
428        };
429
430        if needs_sync || is_multi_device {
431            debug!("sending multi-device sync message");
432            if let Some(sync) = if sync_message {
433                Some(content_body)
434            } else {
435                self.create_multi_device_sent_transcript_content(
436                    Some(recipient),
437                    content_body,
438                    timestamp,
439                    Some(&result),
440                )
441            } {
442                self.try_send_message(
443                    self.local_aci.into(),
444                    None,
445                    &sync,
446                    timestamp,
447                    false,
448                    false,
449                )
450                .await?;
451            } else {
452                error!("could not create sync message from a direct message");
453            }
454        }
455
456        if end_session {
457            let n = self.protocol_store.delete_all_sessions(recipient).await?;
458            tracing::debug!(
459                "ended {} sessions with {}",
460                n,
461                recipient.raw_uuid()
462            );
463        }
464
465        result
466    }
467
468    /// Send a message to the recipients in a group.
469    ///
470    /// Recipients are a list of tuples, each containing:
471    /// - The recipient's address
472    /// - The recipient's unidentified access
473    /// - Whether the recipient requires a PNI signature
474    #[tracing::instrument(
475        skip(self, recipients, message),
476        fields(recipients = recipients.as_ref().len()),
477    )]
478    pub async fn send_message_to_group(
479        &mut self,
480        recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
481        message: impl Into<ContentBody>,
482        timestamp: u64,
483        online: bool,
484    ) -> Vec<SendMessageResult> {
485        let content_body: ContentBody = message.into();
486        let mut results = vec![];
487
488        let mut needs_sync_in_results = false;
489
490        for (recipient, unidentified_access, include_pni_signature) in
491            recipients.as_ref()
492        {
493            let result = self
494                .try_send_message(
495                    *recipient,
496                    unidentified_access.as_ref(),
497                    &content_body,
498                    timestamp,
499                    *include_pni_signature,
500                    online,
501                )
502                .await;
503
504            match result {
505                Ok(SentMessage { needs_sync, .. }) if needs_sync => {
506                    needs_sync_in_results = true;
507                },
508                _ => (),
509            };
510
511            results.push(result);
512        }
513
514        // we only need to send a synchronization message once
515        if needs_sync_in_results || self.is_multi_device().await {
516            if let Some(sync_message) = self
517                .create_multi_device_sent_transcript_content(
518                    None,
519                    content_body.clone(),
520                    timestamp,
521                    &results,
522                )
523            {
524                // Note: the result of sending a sync message is not included in results
525                // See Signal Android `SignalServiceMessageSender.java:2817`
526                if let Err(error) = self
527                    .try_send_message(
528                        self.local_aci.into(),
529                        None,
530                        &sync_message,
531                        timestamp,
532                        false, // XXX: maybe the sync device does want a PNI signature?
533                        false,
534                    )
535                    .await
536                {
537                    error!(%error, "failed to send a synchronization message");
538                }
539            } else {
540                error!("could not create sync message from a group message")
541            }
542        }
543
544        results
545    }
546
    /// Send a message (`content`) to an address (`recipient`).
    ///
    /// The content is serialized once (with a PNI signature attached when
    /// `include_pni_signature` is set), then encrypted and submitted in a loop
    /// of at most four attempts. Recoverable server responses adjust local
    /// state and trigger another attempt:
    ///
    /// - `Unauthorized` while using unidentified access: retry identified.
    /// - `MismatchedDevicesException`: drop sessions for extra devices and
    ///   establish sessions with missing devices from freshly fetched pre-keys.
    /// - `StaleDevices`: drop the sessions of the stale devices.
    ///
    /// # Errors
    ///
    /// - [`MessageSenderError::NoMessagesToSend`] when encryption yields nothing.
    /// - [`MessageSenderError::ProofRequired`] / [`MessageSenderError::NotFound`]
    ///   for the corresponding server responses.
    /// - [`MessageSenderError::UntrustedIdentity`] when processing a pre-key
    ///   bundle fails, whatever the underlying cause.
    /// - [`MessageSenderError::ServiceError`] for any other service error.
    /// - [`MessageSenderError::MaximumRetriesLimitExceeded`] when all attempts
    ///   are used up.
    #[tracing::instrument(
        level = "trace",
        skip(self, unidentified_access, content_body, recipient),
        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
    )]
    async fn try_send_message(
        &mut self,
        recipient: ServiceId,
        mut unidentified_access: Option<&UnidentifiedAccess>,
        content_body: &ContentBody,
        timestamp: u64,
        include_pni_signature: bool,
        online: bool,
    ) -> SendMessageResult {
        trace!("trying to send a message");

        use prost::Message;

        // Serialize once up front; the same bytes are re-encrypted on every attempt.
        let mut content = content_body.clone().into_proto();
        if include_pni_signature {
            content.pni_signature_message = Some(self.create_pni_signature()?);
        }

        let content_bytes = content.encode_to_vec();

        let mut rng = rng();

        // Bounded retry loop: each recoverable error falls through to the next attempt.
        for _ in 0..4u8 {
            let Some(EncryptedMessages {
                messages,
                used_identity_key,
            }) = self
                .create_encrypted_messages(
                    &recipient,
                    unidentified_access.map(|x| &x.certificate),
                    &content_bytes,
                )
                .await?
            else {
                // this can happen for example when a device is primary, without any secondaries
                // and we send a message to ourselves (which is only a SyncMessage { sent: ... })
                // addressed to self
                return Err(MessageSenderError::NoMessagesToSend);
            };

            let messages = OutgoingPushMessages {
                destination: recipient,
                timestamp,
                messages,
                online,
            };

            // Pick the websocket according to whether unidentified access is (still) available.
            let send = if let Some(unidentified) = &unidentified_access {
                tracing::debug!("sending via unidentified");
                self.unidentified_ws
                    .send_messages_unidentified(messages, unidentified)
                    .await
            } else {
                tracing::debug!("sending identified");
                self.identified_ws.send_messages(messages).await
            };

            match send {
                Ok(SendMessageResponse { needs_sync }) => {
                    tracing::debug!("message sent!");
                    return Ok(SentMessage {
                        recipient,
                        used_identity_key,
                        unidentified: unidentified_access.is_some(),
                        needs_sync,
                    });
                },
                // Unidentified access was rejected: clear it so the next attempt goes out identified.
                Err(ServiceError::Unauthorized)
                    if unidentified_access.is_some() =>
                {
                    tracing::trace!("unauthorized error using unidentified; retry over identified");
                    unidentified_access = None;
                },
                // Our device list for the recipient is out of date: reconcile it, then retry.
                Err(ServiceError::MismatchedDevicesException(ref m)) => {
                    tracing::debug!("{:?}", m);
                    for extra_device_id in &m.extra_devices {
                        tracing::debug!(
                            "dropping session with device {}",
                            extra_device_id
                        );
                        self.protocol_store
                            .delete_service_addr_device_session(
                                &recipient
                                    .to_protocol_address(*extra_device_id)?,
                            )
                            .await?;
                    }

                    for missing_device_id in &m.missing_devices {
                        tracing::debug!(
                            "creating session with missing device {}",
                            missing_device_id
                        );
                        let remote_address = recipient
                            .to_protocol_address(*missing_device_id)?;
                        // Pre-keys are always fetched over the identified websocket.
                        let pre_key = self
                            .identified_ws
                            .get_pre_key(&recipient, *missing_device_id)
                            .await?;

                        // The store is passed twice; a clone provides the second mutable handle.
                        process_prekey_bundle(
                            &remote_address,
                            &mut self.protocol_store.clone(),
                            &mut self.protocol_store,
                            &pre_key,
                            SystemTime::now(),
                            &mut rng,
                        )
                        .await
                        .map_err(|e| {
                            // Every failure here is reported as an untrusted identity.
                            error!("failed to create session: {}", e);
                            MessageSenderError::UntrustedIdentity {
                                address: recipient,
                            }
                        })?;
                    }
                },
                // Sessions with these devices are stale: drop them, then retry.
                Err(ServiceError::StaleDevices(ref m)) => {
                    tracing::debug!("{:?}", m);
                    for extra_device_id in &m.stale_devices {
                        tracing::debug!(
                            "dropping session with device {}",
                            extra_device_id
                        );
                        self.protocol_store
                            .delete_service_addr_device_session(
                                &recipient
                                    .to_protocol_address(*extra_device_id)?,
                            )
                            .await?;
                    }
                },
                // Non-recoverable responses below: return immediately.
                Err(ServiceError::ProofRequiredError(ref p)) => {
                    tracing::debug!("{:?}", p);
                    return Err(MessageSenderError::ProofRequired {
                        token: p.token.clone(),
                        options: p.options.clone(),
                    });
                },
                Err(ServiceError::NotFoundError) => {
                    tracing::debug!("Not found when sending a message");
                    return Err(MessageSenderError::NotFound {
                        service_id: recipient,
                    });
                },
                Err(e) => {
                    tracing::debug!(
                        "Default error handler for ws.send_messages: {}",
                        e
                    );
                    return Err(MessageSenderError::ServiceError(e));
                },
            }
        }

        // All attempts ended in a recoverable error.
        Err(MessageSenderError::MaximumRetriesLimitExceeded)
    }
710
711    /// Upload contact details to the CDN and send a sync message
712    #[tracing::instrument(
713        skip(self, unidentified_access, contacts, recipient),
714        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
715    )]
716    pub async fn send_contact_details<Contacts>(
717        &mut self,
718        recipient: &ServiceId,
719        unidentified_access: Option<UnidentifiedAccess>,
720        // XXX It may be interesting to use an intermediary type,
721        //     instead of ContactDetails directly,
722        //     because it allows us to add the avatar content.
723        contacts: Contacts,
724        online: bool,
725        complete: bool,
726    ) -> Result<(), MessageSenderError>
727    where
728        Contacts: IntoIterator<Item = ContactDetails>,
729    {
730        let ptr = self.upload_contact_details(contacts).await?;
731
732        let msg = SyncMessage {
733            contacts: Some(sync_message::Contacts {
734                blob: Some(ptr),
735                complete: Some(complete),
736            }),
737            ..SyncMessage::with_padding(&mut rng())
738        };
739
740        self.send_sync_message(msg).await?;
741
742        Ok(())
743    }
744
745    /// Send `MessageRequestResponse` synchronization message with either a recipient ACI or a GroupV2 ID
746    #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
747    pub async fn send_message_request_response(
748        &mut self,
749        recipient: &ServiceId,
750        thread: &ThreadIdentifier,
751        action: message_request_response::Type,
752    ) -> Result<(), MessageSenderError> {
753        let message_request_response = Some(match thread {
754            ThreadIdentifier::Aci(aci) => {
755                tracing::debug!(
756                    "sending message request response {:?} for recipient {:?}",
757                    action,
758                    aci
759                );
760                MessageRequestResponse {
761                    thread_aci: Some(aci.to_string()),
762                    group_id: None,
763                    r#type: Some(action.into()),
764                }
765            },
766            ThreadIdentifier::Group(id) => {
767                tracing::debug!(
768                    "sending message request response {:?} for group {:?}",
769                    action,
770                    id
771                );
772                MessageRequestResponse {
773                    thread_aci: None,
774                    group_id: Some(id.to_vec()),
775                    r#type: Some(action.into()),
776                }
777            },
778        });
779
780        let msg = SyncMessage {
781            message_request_response,
782            ..SyncMessage::with_padding(&mut rng())
783        };
784
785        let ts = Utc::now().timestamp_millis() as u64;
786        self.send_message(recipient, None, msg, ts, false, false)
787            .await?;
788
789        Ok(())
790    }
791
792    /// Send a `SyncMessage` to own devices, if any.
793    pub async fn send_sync_message(
794        &mut self,
795        sync: SyncMessage,
796    ) -> Result<(), MessageSenderError> {
797        if self.is_multi_device().await {
798            let content = sync.into();
799            let timestamp = Utc::now().timestamp_millis() as u64;
800            debug!(
801                "sending multi-device sync message with content {content:?}"
802            );
803            self.try_send_message(
804                self.local_aci.into(),
805                None,
806                &content,
807                timestamp,
808                false,
809                false,
810            )
811            .await?;
812        }
813        Ok(())
814    }
815
816    /// Send a `SyncMessage` request message
817    #[tracing::instrument(skip(self))]
818    pub async fn send_sync_message_request(
819        &mut self,
820        recipient: &ServiceId,
821        request_type: sync_message::request::Type,
822    ) -> Result<(), MessageSenderError> {
823        if self.device_id == *DEFAULT_DEVICE_ID {
824            return Err(MessageSenderError::SendSyncMessageError(request_type));
825        }
826
827        let msg = SyncMessage {
828            request: Some(sync_message::Request {
829                r#type: Some(request_type.into()),
830            }),
831            ..SyncMessage::with_padding(&mut rng())
832        };
833        self.send_sync_message(msg).await?;
834
835        Ok(())
836    }
837
838    #[tracing::instrument(level = "trace", skip(self))]
839    fn create_pni_signature(
840        &mut self,
841    ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
842        let mut rng = rng();
843        let signature = self
844            .pni_identity
845            .expect("PNI key set when PNI signature requested")
846            .sign_alternate_identity(
847                self.aci_identity.identity_key(),
848                &mut rng,
849            )?;
850        Ok(crate::proto::PniSignatureMessage {
851            pni: Some(self.local_pni.service_id_binary()),
852            signature: Some(signature.into()),
853        })
854    }
855
856    // Equivalent with `getEncryptedMessages`
857    #[tracing::instrument(
858        level = "trace",
859        skip(self, unidentified_access, content),
860        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
861    )]
862    async fn create_encrypted_messages(
863        &mut self,
864        recipient: &ServiceId,
865        unidentified_access: Option<&SenderCertificate>,
866        content: &[u8],
867    ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
868        let mut messages = vec![];
869
870        let mut devices: HashSet<DeviceId> = self
871            .protocol_store
872            .get_sub_device_sessions(recipient)
873            .await?
874            .into_iter()
875            .collect();
876
877        // always send to the primary device no matter what
878        devices.insert(*DEFAULT_DEVICE_ID);
879
880        // never try to send messages to the sender device
881        match recipient {
882            ServiceId::Aci(aci) => {
883                if *aci == self.local_aci {
884                    devices.remove(&self.device_id);
885                }
886            },
887            ServiceId::Pni(pni) => {
888                if *pni == self.local_pni {
889                    devices.remove(&self.device_id);
890                }
891            },
892        };
893
894        for device_id in devices {
895            trace!("sending message to device {}", device_id);
896            // `create_encrypted_message` may fail with `SessionNotFound` if the session is corrupted;
897            // see https://github.com/whisperfish/libsignal-client/commit/601454d20.
898            // If this happens, delete the session and retry.
899            for _attempt in 0..2 {
900                match self
901                    .create_encrypted_message(
902                        recipient,
903                        unidentified_access,
904                        device_id,
905                        content,
906                    )
907                    .await
908                {
909                    Ok(message) => {
910                        messages.push(message);
911                        break;
912                    },
913                    Err(MessageSenderError::ServiceError(
914                        ServiceError::SignalProtocolError(
915                            SignalProtocolError::SessionNotFound(addr),
916                        ),
917                    )) => {
918                        // SessionNotFound is returned on certain session corruption.
919                        // Since delete_session *creates* a session if it doesn't exist,
920                        // the NotFound error is an indicator of session corruption.
921                        // Try to delete this session, if it gets succesfully deleted, retry.  Otherwise, fail.
922                        tracing::warn!("Potential session corruption for {}, deleting session", addr);
923                        match self.protocol_store.delete_session(&addr).await {
924                            Ok(()) => continue,
925                            Err(error) => {
926                                tracing::warn!(%error, %addr, "failed to delete session");
927                                return Err(
928                                    SignalProtocolError::SessionNotFound(addr)
929                                        .into(),
930                                );
931                            },
932                        }
933                    },
934                    Err(e) => return Err(e),
935                }
936            }
937        }
938
939        if messages.is_empty() {
940            Ok(None)
941        } else {
942            Ok(Some(EncryptedMessages {
943                messages,
944                used_identity_key: self
945                    .protocol_store
946                    .get_identity(
947                        &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
948                    )
949                    .await?
950                    .ok_or(MessageSenderError::UntrustedIdentity {
951                        address: *recipient,
952                    })?,
953            }))
954        }
955    }
956
957    /// Equivalent to `getEncryptedMessage`
958    ///
959    /// When no session with the recipient exists, we need to create one.
960    #[tracing::instrument(
961        level = "trace",
962        skip(self, unidentified_access, content),
963        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
964    )]
965    pub(crate) async fn create_encrypted_message(
966        &mut self,
967        recipient: &ServiceId,
968        unidentified_access: Option<&SenderCertificate>,
969        device_id: DeviceId,
970        content: &[u8],
971    ) -> Result<OutgoingPushMessage, MessageSenderError> {
972        let recipient_protocol_address =
973            recipient.to_protocol_address(device_id);
974
975        tracing::trace!(
976            "encrypting message for {}",
977            recipient_protocol_address
978        );
979
980        // establish a session with the recipient/device if necessary
981        // no need to establish a session with ourselves (and our own current device)
982        if self
983            .protocol_store
984            .load_session(&recipient_protocol_address)
985            .await?
986            .is_none()
987        {
988            info!(
989                "establishing new session with {}",
990                recipient_protocol_address
991            );
992            let pre_keys = match self
993                .identified_ws
994                .get_pre_keys(recipient, device_id)
995                .await
996            {
997                Ok(ok) => {
998                    tracing::trace!("Get prekeys OK");
999                    ok
1000                },
1001                Err(ServiceError::NotFoundError) => {
1002                    return Err(MessageSenderError::NotFound {
1003                        service_id: *recipient,
1004                    });
1005                },
1006                Err(e) => Err(e)?,
1007            };
1008
1009            let mut rng = rng();
1010
1011            for pre_key_bundle in pre_keys {
1012                if recipient == &self.local_aci
1013                    && self.device_id == pre_key_bundle.device_id()?
1014                {
1015                    trace!("not establishing a session with myself!");
1016                    continue;
1017                }
1018
1019                let pre_key_address = get_preferred_protocol_address(
1020                    &self.protocol_store,
1021                    recipient,
1022                    pre_key_bundle.device_id()?,
1023                )
1024                .await?;
1025
1026                process_prekey_bundle(
1027                    &pre_key_address,
1028                    &mut self.protocol_store.clone(),
1029                    &mut self.protocol_store,
1030                    &pre_key_bundle,
1031                    SystemTime::now(),
1032                    &mut rng,
1033                )
1034                .await?;
1035            }
1036        }
1037
1038        let message = self
1039            .cipher
1040            .encrypt(
1041                &recipient_protocol_address,
1042                unidentified_access,
1043                content,
1044                &mut rng(),
1045            )
1046            .instrument(tracing::trace_span!("encrypting message"))
1047            .await?;
1048
1049        Ok(message)
1050    }
1051
1052    fn create_multi_device_sent_transcript_content<'a>(
1053        &mut self,
1054        recipient: Option<&ServiceId>,
1055        content_body: ContentBody,
1056        timestamp: u64,
1057        send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1058    ) -> Option<ContentBody> {
1059        use sync_message::sent::UnidentifiedDeliveryStatus;
1060        let (message, edit_message) = match content_body {
1061            ContentBody::DataMessage(m) => (Some(m), None),
1062            ContentBody::EditMessage(m) => (None, Some(m)),
1063            _ => return None,
1064        };
1065        let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1066            send_message_results
1067                .into_iter()
1068                .filter_map(|result| result.as_ref().ok())
1069                .map(|sent| {
1070                    let SentMessage {
1071                        recipient,
1072                        unidentified,
1073                        used_identity_key,
1074                        ..
1075                    } = sent;
1076                    UnidentifiedDeliveryStatus {
1077                        destination_service_id: Some(
1078                            recipient.service_id_string(),
1079                        ),
1080                        unidentified: Some(*unidentified),
1081                        destination_pni_identity_key: Some(
1082                            used_identity_key.serialize().into(),
1083                        ),
1084                    }
1085                })
1086                .collect();
1087        Some(ContentBody::SynchronizeMessage(SyncMessage {
1088            sent: Some(sync_message::Sent {
1089                destination_service_id: recipient
1090                    .map(ServiceId::service_id_string),
1091                destination_e164: None,
1092                expiration_start_timestamp: message
1093                    .as_ref()
1094                    .and_then(|m| m.expire_timer)
1095                    .map(|_| timestamp),
1096                message,
1097                edit_message,
1098                timestamp: Some(timestamp),
1099                unidentified_status,
1100                ..Default::default()
1101            }),
1102            ..SyncMessage::with_padding(&mut rng())
1103        }))
1104    }
1105}