libsignal_service/
sender.rs

1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6    process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7    ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8    SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17    cipher::{get_preferred_protocol_address, ServiceCipher},
18    content::ContentBody,
19    proto::{
20        attachment_pointer::{
21            AttachmentIdentifier, Flags as AttachmentPointerFlags,
22        },
23        sync_message::{
24            self, message_request_response, MessageRequestResponse,
25        },
26        AttachmentPointer, SyncMessage,
27    },
28    push_service::*,
29    service_address::ServiceIdExt,
30    session_store::SessionStoreExt,
31    unidentified_access::UnidentifiedAccess,
32    utils::{serde_device_id, serde_service_id},
33    websocket::{self, SignalWebSocket},
34};
35
36pub use crate::proto::ContactDetails;
37
/// One encrypted message addressed to a single device of a recipient.
///
/// Serialized with camelCase field names; a list of these is carried by
/// [`OutgoingPushMessages`].
#[derive(serde::Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct OutgoingPushMessage {
    /// Numeric message type.
    // NOTE(review): presumably the envelope/ciphertext type expected by the server — confirm.
    pub r#type: u32,
    /// Device this message is meant for; serialized through `serde_device_id`.
    #[serde(with = "serde_device_id")]
    pub destination_device_id: DeviceId,
    /// Registration ID associated with the destination device.
    pub destination_registration_id: u32,
    /// Message payload in string form.
    // NOTE(review): presumably the base64-encoded ciphertext — confirm against the cipher code.
    pub content: String,
}
47
/// Batch of per-device messages for one recipient, as submitted to the
/// message-sending endpoints of the websockets.
#[derive(serde::Serialize, Debug)]
pub struct OutgoingPushMessages {
    /// Recipient of the whole batch; serialized through `serde_service_id`.
    #[serde(with = "serde_service_id")]
    pub destination: ServiceId,
    /// Timestamp shared by every message in the batch (supplied by the caller).
    pub timestamp: u64,
    /// One entry per destination device.
    pub messages: Vec<OutgoingPushMessage>,
    /// The `online` flag passed by the caller of the send functions.
    // NOTE(review): presumably marks the message as deliverable to online devices only — confirm.
    pub online: bool,
}
56
/// Server response to a successful message submission.
///
/// Deserialized from camelCase field names.
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SendMessageResponse {
    /// Whether the server asks for a sync message to be sent to the sender's
    /// own devices; copied into [`SentMessage::needs_sync`].
    pub needs_sync: bool,
}
62
/// Outcome of sending a message to a single recipient: details about the
/// delivered message on success, a [`MessageSenderError`] otherwise.
pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
64
/// Details about a message that was accepted by the server.
#[derive(Debug, Clone)]
pub struct SentMessage {
    /// The recipient the message was sent to.
    pub recipient: ServiceId,
    /// Identity key reported by the encryption step for this send.
    pub used_identity_key: IdentityKey,
    /// `true` when the message went out over the unidentified websocket.
    pub unidentified: bool,
    /// The `needs_sync` flag taken from the server's [`SendMessageResponse`].
    pub needs_sync: bool,
}
72
/// Attachment specification to be used for uploading.
///
/// Loose equivalent of Java's `SignalServiceAttachmentStream`.
///
/// Most fields are copied verbatim into the resulting `AttachmentPointer`
/// by `MessageSender::upload_attachment`.
#[derive(Debug, Default)]
pub struct AttachmentSpec {
    /// MIME type of the attachment.
    pub content_type: String,
    /// Length of the attachment in bytes.
    ///
    /// Note that `upload_attachment` derives the size from the contents it
    /// is handed and does not read this field.
    pub length: usize,
    /// Optional file name to advertise to the recipient.
    pub file_name: Option<String>,
    /// Optional preview bytes; not read by `upload_attachment`.
    pub preview: Option<Vec<u8>>,
    /// `Some(true)` sets the `VoiceMessage` flag on the pointer.
    pub voice_note: Option<bool>,
    /// `Some(true)` sets the `Borderless` flag on the pointer.
    pub borderless: Option<bool>,
    /// Optional width, forwarded as-is.
    pub width: Option<u32>,
    /// Optional height, forwarded as-is.
    pub height: Option<u32>,
    /// Optional caption, forwarded as-is.
    pub caption: Option<String>,
    /// Optional blur hash, forwarded as-is.
    pub blur_hash: Option<String>,
}
89
/// Sends messages, sync messages and attachments on behalf of one local
/// device, using protocol store `S` for session state.
#[derive(Clone)]
pub struct MessageSender<S> {
    /// Authenticated websocket: identified sends and pre-key fetches.
    identified_ws: SignalWebSocket<websocket::Identified>,
    /// Unauthenticated websocket: unidentified (sealed sender) sends.
    unidentified_ws: SignalWebSocket<websocket::Unidentified>,
    /// Push service used for the attachment upload calls.
    service: PushService,
    /// Cipher used to encrypt outgoing content.
    // NOTE(review): its use is outside the visible part of this file — confirm.
    cipher: ServiceCipher<S>,
    /// Session/identity storage; sessions are created and deleted through it.
    protocol_store: S,
    /// ACI of the local account.
    local_aci: Aci,
    /// PNI of the local account.
    local_pni: Pni,
    /// ACI identity key pair; its public key is what a PNI signature signs.
    aci_identity: IdentityKeyPair,
    /// PNI identity key pair; required whenever a PNI signature is requested.
    pni_identity: Option<IdentityKeyPair>,
    /// ID of the local device; compared against `DEFAULT_DEVICE_ID` to tell
    /// a primary device from a linked one.
    device_id: DeviceId,
}
103
/// Errors that can occur while uploading an attachment.
#[derive(thiserror::Error, Debug)]
pub enum AttachmentUploadError {
    /// A call to the push service failed.
    #[error("{0}")]
    ServiceError(#[from] ServiceError),

    /// An I/O error occurred while handling the attachment contents.
    #[error("Could not read attachment contents")]
    IoError(#[from] std::io::Error),
}
112
/// Errors that can occur while sending a message.
#[derive(thiserror::Error, Debug)]
pub enum MessageSenderError {
    /// The service reported an error that is not handled by a retry.
    #[error("service error: {0}")]
    ServiceError(#[from] ServiceError),

    /// The Signal protocol layer (sessions, encryption, signing) failed.
    #[error("protocol error: {0}")]
    ProtocolError(#[from] SignalProtocolError),

    /// A curve operation failed.
    #[error("invalid private key: {0}")]
    InvalidPrivateKey(#[from] CurveError),

    /// A device ID could not be turned into a valid protocol device ID.
    #[error("invalid device ID: {0}")]
    InvalidDeviceId(#[from] InvalidDeviceId),

    /// Uploading an attachment failed.
    #[error("Failed to upload attachment {0}")]
    AttachmentUploadError(#[from] AttachmentUploadError),

    /// A sync request of the given type was attempted from the primary
    /// device, which cannot send it.
    #[error("primary device can't send sync message {0:?}")]
    SendSyncMessageError(sync_message::request::Type),

    /// A session with `address` could not be established from its pre-key
    /// bundle.
    #[error("Untrusted identity key with {address:?}")]
    UntrustedIdentity { address: ServiceId },

    /// Every send attempt ended in a recoverable error.
    #[error("Exceeded maximum number of retries")]
    MaximumRetriesLimitExceeded,

    /// The server demands a proof (one of `options`) identified by `token`
    /// before it accepts further messages.
    #[error("Proof of type {options:?} required using token {token}")]
    ProofRequired { token: String, options: Vec<String> },

    /// The server does not know the recipient.
    #[error("Recipient not found: {service_id:?}")]
    NotFound { service_id: ServiceId },

    /// Encryption produced nothing to send, e.g. a message to self on a
    /// primary device without linked devices.
    #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
    NoMessagesToSend,
}
148
/// Raw identifier of a group: `GROUP_IDENTIFIER_LEN` bytes as defined by `zkgroup`.
pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
150
/// Identifies the conversation a message request response refers to.
#[derive(Debug)]
pub enum ThreadIdentifier {
    /// A one-to-one conversation, identified by the other party's ACI UUID.
    Aci(Uuid),
    /// A group conversation, identified by its group ID.
    Group(GroupV2Id),
}
156
/// Result of encrypting one piece of content for the devices of a recipient.
#[derive(Debug)]
pub struct EncryptedMessages {
    /// The encrypted per-device messages.
    messages: Vec<OutgoingPushMessage>,
    /// Identity key reported for the encryption; ends up in
    /// [`SentMessage::used_identity_key`].
    used_identity_key: IdentityKey,
}
162
163impl<S> MessageSender<S>
164where
165    S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
166{
167    #[allow(clippy::too_many_arguments)]
168    pub fn new(
169        identified_ws: SignalWebSocket<websocket::Identified>,
170        unidentified_ws: SignalWebSocket<websocket::Unidentified>,
171        service: PushService,
172        cipher: ServiceCipher<S>,
173        protocol_store: S,
174        local_aci: impl Into<Aci>,
175        local_pni: impl Into<Pni>,
176        aci_identity: IdentityKeyPair,
177        pni_identity: Option<IdentityKeyPair>,
178        device_id: DeviceId,
179    ) -> Self {
180        MessageSender {
181            service,
182            identified_ws,
183            unidentified_ws,
184            cipher,
185            protocol_store,
186            local_aci: local_aci.into(),
187            local_pni: local_pni.into(),
188            aci_identity,
189            pni_identity,
190            device_id,
191        }
192    }
193
194    /// Encrypts and uploads an attachment
195    ///
196    /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place.
197    #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
198    pub async fn upload_attachment<R: Rng + CryptoRng>(
199        &mut self,
200        spec: AttachmentSpec,
201        mut contents: Vec<u8>,
202        csprng: &mut R,
203    ) -> Result<AttachmentPointer, AttachmentUploadError> {
204        let len = contents.len();
205        // Encrypt
206        let (key, iv) = {
207            let mut key = [0u8; 64];
208            let mut iv = [0u8; 16];
209            csprng.fill_bytes(&mut key);
210            csprng.fill_bytes(&mut iv);
211            (key, iv)
212        };
213
214        // Padded length uses an exponential bracketting thingy.
215        // If you want to see how it looks:
216        // https://www.wolframalpha.com/input/?i=plot+floor%281.05%5Eceil%28log_1.05%28x%29%29%29+for+x+from+0+to+5000000
217        let padded_len: usize = {
218            // Java:
219            // return (int) Math.max(541, Math.floor(Math.pow(1.05, Math.ceil(Math.log(size) / Math.log(1.05)))))
220            std::cmp::max(
221                541,
222                1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
223            )
224        };
225        if padded_len < len {
226            error!(
227                "Padded len {} < len {}. Continuing with a privacy risk.",
228                padded_len, len
229            );
230        } else {
231            contents.resize(padded_len, 0);
232        }
233
234        tracing::trace_span!("encrypting attachment").in_scope(|| {
235            crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
236        });
237
238        // Request upload attributes
239        // TODO: we can actually store the upload spec to be able to resume the upload later
240        // if it fails or stalls (= we should at least split the API calls so clients can decide what to do)
241        let attachment_upload_form = self
242            .service
243            .get_attachment_v4_upload_attributes()
244            .instrument(tracing::trace_span!("requesting upload attributes"))
245            .await?;
246
247        let resumable_upload_url = self
248            .service
249            .get_attachment_resumable_upload_url(&attachment_upload_form)
250            .await?;
251
252        let attachment_digest = self
253            .service
254            .upload_attachment_v4(
255                attachment_upload_form.cdn,
256                &resumable_upload_url,
257                contents.len() as u64,
258                attachment_upload_form.headers,
259                &mut std::io::Cursor::new(&contents),
260            )
261            .await?;
262
263        Ok(AttachmentPointer {
264            content_type: Some(spec.content_type),
265            key: Some(key.to_vec()),
266            size: Some(len as u32),
267            // thumbnail: Option<Vec<u8>>,
268            digest: Some(attachment_digest.digest),
269            file_name: spec.file_name,
270            flags: Some(
271                if spec.voice_note == Some(true) {
272                    AttachmentPointerFlags::VoiceMessage as u32
273                } else {
274                    0
275                } | if spec.borderless == Some(true) {
276                    AttachmentPointerFlags::Borderless as u32
277                } else {
278                    0
279                },
280            ),
281            width: spec.width,
282            height: spec.height,
283            caption: spec.caption,
284            blur_hash: spec.blur_hash,
285            upload_timestamp: Some(
286                SystemTime::now()
287                    .duration_since(SystemTime::UNIX_EPOCH)
288                    .expect("unix epoch in the past")
289                    .as_millis() as u64,
290            ),
291            cdn_number: Some(attachment_upload_form.cdn),
292            attachment_identifier: Some(AttachmentIdentifier::CdnKey(
293                attachment_upload_form.key,
294            )),
295            ..Default::default()
296        })
297    }
298
299    /// Upload contact details to the CDN
300    ///
301    /// Returns attachment ID and the attachment digest
302    #[tracing::instrument(skip(self, contacts))]
303    async fn upload_contact_details<Contacts>(
304        &mut self,
305        contacts: Contacts,
306    ) -> Result<AttachmentPointer, AttachmentUploadError>
307    where
308        Contacts: IntoIterator<Item = ContactDetails>,
309    {
310        use prost::Message;
311        let mut out = Vec::new();
312        for contact in contacts {
313            contact
314                .encode_length_delimited(&mut out)
315                .expect("infallible encoding");
316            // XXX add avatar here
317        }
318
319        let spec = AttachmentSpec {
320            content_type: "application/octet-stream".into(),
321            length: out.len(),
322            file_name: None,
323            preview: None,
324            voice_note: None,
325            borderless: None,
326            width: None,
327            height: None,
328            caption: None,
329            blur_hash: None,
330        };
331        self.upload_attachment(spec, out, &mut rng()).await
332    }
333
334    /// Return whether we have to prepare sync messages for other devices
335    ///
336    /// - If we are the main registered device, and there are established sub-device sessions (linked clients), return true
337    /// - If we are a secondary linked device, return true
338    async fn is_multi_device(&self) -> bool {
339        if self.device_id == *DEFAULT_DEVICE_ID {
340            self.protocol_store
341                .get_sub_device_sessions(&self.local_aci.into())
342                .await
343                .is_ok_and(|s| !s.is_empty())
344        } else {
345            true
346        }
347    }
348
349    /// Send a message `content` to a single `recipient`.
350    #[tracing::instrument(
351        skip(self, unidentified_access, message),
352        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
353    )]
354    pub async fn send_message(
355        &mut self,
356        recipient: &ServiceId,
357        mut unidentified_access: Option<UnidentifiedAccess>,
358        message: impl Into<ContentBody>,
359        timestamp: u64,
360        include_pni_signature: bool,
361        online: bool,
362    ) -> SendMessageResult {
363        let content_body = message.into();
364        let message_to_self = recipient == &self.local_aci;
365        let sync_message =
366            matches!(content_body, ContentBody::SynchronizeMessage(..));
367        let is_multi_device = self.is_multi_device().await;
368
369        use crate::proto::data_message::Flags;
370
371        let end_session = match &content_body {
372            ContentBody::DataMessage(message) => {
373                message.flags == Some(Flags::EndSession as u32)
374            },
375            _ => false,
376        };
377
378        // only send a sync message when sending to self and skip the rest of the process
379        if message_to_self && is_multi_device && !sync_message {
380            debug!("sending note to self");
381            if let Some(sync_message) = self
382                .create_multi_device_sent_transcript_content(
383                    Some(recipient),
384                    content_body,
385                    timestamp,
386                    None,
387                )
388            {
389                return self
390                    .try_send_message(
391                        *recipient,
392                        None,
393                        &sync_message,
394                        timestamp,
395                        include_pni_signature,
396                        online,
397                    )
398                    .await;
399            } else {
400                error!("could not create sync message from message to self");
401                return SendMessageResult::Err(
402                    MessageSenderError::NoMessagesToSend,
403                );
404            }
405        }
406
407        // don't send session enders as sealed sender
408        // sync messages are never sent as unidentified (reasons unclear), see: https://github.com/signalapp/Signal-Android/blob/main/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java#L779
409        if end_session || sync_message {
410            unidentified_access.take();
411        }
412
413        // try to send the original message to all the recipient's devices
414        let result = self
415            .try_send_message(
416                *recipient,
417                unidentified_access.as_ref(),
418                &content_body,
419                timestamp,
420                include_pni_signature,
421                online,
422            )
423            .await;
424
425        let needs_sync = match &result {
426            Ok(SentMessage { needs_sync, .. }) => *needs_sync,
427            _ => false,
428        };
429
430        if needs_sync || is_multi_device {
431            let sync_body = if sync_message {
432                Some(content_body)
433            } else {
434                // Only some ContentBody types are syncable to self,
435                // not getting a content body to sync is not an error.
436                self.create_multi_device_sent_transcript_content(
437                    Some(recipient),
438                    content_body,
439                    timestamp,
440                    Some(&result),
441                )
442            };
443            if let Some(body) = sync_body {
444                debug!("sending multi-device sync message");
445                self.try_send_message(
446                    self.local_aci.into(),
447                    None,
448                    &body,
449                    timestamp,
450                    false,
451                    false,
452                )
453                .await?;
454            }
455        }
456
457        if end_session {
458            let n = self.protocol_store.delete_all_sessions(recipient).await?;
459            tracing::debug!(
460                "ended {} sessions with {}",
461                n,
462                recipient.raw_uuid()
463            );
464        }
465
466        result
467    }
468
469    /// Send a message to the recipients in a group.
470    ///
471    /// Recipients are a list of tuples, each containing:
472    /// - The recipient's address
473    /// - The recipient's unidentified access
474    /// - Whether the recipient requires a PNI signature
475    #[tracing::instrument(
476        skip(self, recipients, message),
477        fields(recipients = recipients.as_ref().len()),
478    )]
479    pub async fn send_message_to_group(
480        &mut self,
481        recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
482        message: impl Into<ContentBody>,
483        timestamp: u64,
484        online: bool,
485    ) -> Vec<SendMessageResult> {
486        let content_body: ContentBody = message.into();
487        let mut results = vec![];
488
489        let mut needs_sync_in_results = false;
490
491        for (recipient, unidentified_access, include_pni_signature) in
492            recipients.as_ref()
493        {
494            let result = self
495                .try_send_message(
496                    *recipient,
497                    unidentified_access.as_ref(),
498                    &content_body,
499                    timestamp,
500                    *include_pni_signature,
501                    online,
502                )
503                .await;
504
505            match result {
506                Ok(SentMessage { needs_sync, .. }) if needs_sync => {
507                    needs_sync_in_results = true;
508                },
509                _ => (),
510            };
511
512            results.push(result);
513        }
514
515        // we only need to send a synchronization message once
516        if needs_sync_in_results || self.is_multi_device().await {
517            if let Some(sync_message) = self
518                .create_multi_device_sent_transcript_content(
519                    None,
520                    content_body.clone(),
521                    timestamp,
522                    &results,
523                )
524            {
525                // Note: the result of sending a sync message is not included in results
526                // See Signal Android `SignalServiceMessageSender.java:2817`
527                if let Err(error) = self
528                    .try_send_message(
529                        self.local_aci.into(),
530                        None,
531                        &sync_message,
532                        timestamp,
533                        false, // XXX: maybe the sync device does want a PNI signature?
534                        false,
535                    )
536                    .await
537                {
538                    error!(%error, "failed to send a synchronization message");
539                }
540            } else {
541                error!("could not create sync message from a group message")
542            }
543        }
544
545        results
546    }
547
548    /// Send a message (`content`) to an address (`recipient`).
549    #[tracing::instrument(
550        level = "trace",
551        skip(self, unidentified_access, content_body, recipient),
552        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
553    )]
554    async fn try_send_message(
555        &mut self,
556        recipient: ServiceId,
557        mut unidentified_access: Option<&UnidentifiedAccess>,
558        content_body: &ContentBody,
559        timestamp: u64,
560        include_pni_signature: bool,
561        online: bool,
562    ) -> SendMessageResult {
563        trace!("trying to send a message");
564
565        use prost::Message;
566
567        let mut content = content_body.clone().into_proto();
568        if include_pni_signature {
569            content.pni_signature_message = Some(self.create_pni_signature()?);
570        }
571
572        let content_bytes = content.encode_to_vec();
573
574        let mut rng = rng();
575
576        for _ in 0..4u8 {
577            let Some(EncryptedMessages {
578                messages,
579                used_identity_key,
580            }) = self
581                .create_encrypted_messages(
582                    &recipient,
583                    unidentified_access.map(|x| &x.certificate),
584                    &content_bytes,
585                )
586                .await?
587            else {
588                // this can happen for example when a device is primary, without any secondaries
589                // and we send a message to ourselves (which is only a SyncMessage { sent: ... })
590                // addressed to self
591                return Err(MessageSenderError::NoMessagesToSend);
592            };
593
594            let messages = OutgoingPushMessages {
595                destination: recipient,
596                timestamp,
597                messages,
598                online,
599            };
600
601            let send = if let Some(unidentified) = &unidentified_access {
602                tracing::debug!("sending via unidentified");
603                self.unidentified_ws
604                    .send_messages_unidentified(messages, unidentified)
605                    .await
606            } else {
607                tracing::debug!("sending identified");
608                self.identified_ws.send_messages(messages).await
609            };
610
611            match send {
612                Ok(SendMessageResponse { needs_sync }) => {
613                    tracing::debug!("message sent!");
614                    return Ok(SentMessage {
615                        recipient,
616                        used_identity_key,
617                        unidentified: unidentified_access.is_some(),
618                        needs_sync,
619                    });
620                },
621                Err(ServiceError::Unauthorized)
622                    if unidentified_access.is_some() =>
623                {
624                    tracing::trace!("unauthorized error using unidentified; retry over identified");
625                    unidentified_access = None;
626                },
627                Err(ServiceError::MismatchedDevicesException(ref m)) => {
628                    tracing::debug!("{:?}", m);
629                    for extra_device_id in &m.extra_devices {
630                        tracing::debug!(
631                            "dropping session with device {}",
632                            extra_device_id
633                        );
634                        self.protocol_store
635                            .delete_service_addr_device_session(
636                                &recipient
637                                    .to_protocol_address(*extra_device_id)?,
638                            )
639                            .await?;
640                    }
641
642                    for missing_device_id in &m.missing_devices {
643                        tracing::debug!(
644                            "creating session with missing device {}",
645                            missing_device_id
646                        );
647                        let remote_address = recipient
648                            .to_protocol_address(*missing_device_id)?;
649                        let pre_key = self
650                            .identified_ws
651                            .get_pre_key(&recipient, *missing_device_id)
652                            .await?;
653
654                        process_prekey_bundle(
655                            &remote_address,
656                            &mut self.protocol_store.clone(),
657                            &mut self.protocol_store,
658                            &pre_key,
659                            SystemTime::now(),
660                            &mut rng,
661                        )
662                        .await
663                        .map_err(|e| {
664                            error!("failed to create session: {}", e);
665                            MessageSenderError::UntrustedIdentity {
666                                address: recipient,
667                            }
668                        })?;
669                    }
670                },
671                Err(ServiceError::StaleDevices(ref m)) => {
672                    tracing::debug!("{:?}", m);
673                    for extra_device_id in &m.stale_devices {
674                        tracing::debug!(
675                            "dropping session with device {}",
676                            extra_device_id
677                        );
678                        self.protocol_store
679                            .delete_service_addr_device_session(
680                                &recipient
681                                    .to_protocol_address(*extra_device_id)?,
682                            )
683                            .await?;
684                    }
685                },
686                Err(ServiceError::ProofRequiredError(ref p)) => {
687                    tracing::debug!("{:?}", p);
688                    return Err(MessageSenderError::ProofRequired {
689                        token: p.token.clone(),
690                        options: p.options.clone(),
691                    });
692                },
693                Err(ServiceError::NotFoundError) => {
694                    tracing::debug!("Not found when sending a message");
695                    return Err(MessageSenderError::NotFound {
696                        service_id: recipient,
697                    });
698                },
699                Err(e) => {
700                    tracing::debug!(
701                        "Default error handler for ws.send_messages: {}",
702                        e
703                    );
704                    return Err(MessageSenderError::ServiceError(e));
705                },
706            }
707        }
708
709        Err(MessageSenderError::MaximumRetriesLimitExceeded)
710    }
711
712    /// Upload contact details to the CDN and send a sync message
713    #[tracing::instrument(
714        skip(self, unidentified_access, contacts, recipient),
715        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
716    )]
717    pub async fn send_contact_details<Contacts>(
718        &mut self,
719        recipient: &ServiceId,
720        unidentified_access: Option<UnidentifiedAccess>,
721        // XXX It may be interesting to use an intermediary type,
722        //     instead of ContactDetails directly,
723        //     because it allows us to add the avatar content.
724        contacts: Contacts,
725        online: bool,
726        complete: bool,
727    ) -> Result<(), MessageSenderError>
728    where
729        Contacts: IntoIterator<Item = ContactDetails>,
730    {
731        let ptr = self.upload_contact_details(contacts).await?;
732
733        let msg = SyncMessage {
734            contacts: Some(sync_message::Contacts {
735                blob: Some(ptr),
736                complete: Some(complete),
737            }),
738            ..SyncMessage::with_padding(&mut rng())
739        };
740
741        self.send_sync_message(msg).await?;
742
743        Ok(())
744    }
745
746    /// Send `MessageRequestResponse` synchronization message with either a recipient ACI or a GroupV2 ID
747    #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
748    pub async fn send_message_request_response(
749        &mut self,
750        recipient: &ServiceId,
751        thread: &ThreadIdentifier,
752        action: message_request_response::Type,
753    ) -> Result<(), MessageSenderError> {
754        let message_request_response = Some(match thread {
755            ThreadIdentifier::Aci(aci) => {
756                tracing::debug!(
757                    "sending message request response {:?} for recipient {:?}",
758                    action,
759                    aci
760                );
761                MessageRequestResponse {
762                    thread_aci: Some(aci.to_string()),
763                    thread_aci_binary: Some(aci.into_bytes().to_vec()),
764                    group_id: None,
765                    r#type: Some(action.into()),
766                }
767            },
768            ThreadIdentifier::Group(id) => {
769                tracing::debug!(
770                    "sending message request response {:?} for group {:?}",
771                    action,
772                    id
773                );
774                MessageRequestResponse {
775                    thread_aci: None,
776                    thread_aci_binary: None,
777                    group_id: Some(id.to_vec()),
778                    r#type: Some(action.into()),
779                }
780            },
781        });
782
783        let msg = SyncMessage {
784            message_request_response,
785            ..SyncMessage::with_padding(&mut rng())
786        };
787
788        let ts = Utc::now().timestamp_millis() as u64;
789        self.send_message(recipient, None, msg, ts, false, false)
790            .await?;
791
792        Ok(())
793    }
794
795    /// Send a `SyncMessage` to own devices, if any.
796    pub async fn send_sync_message(
797        &mut self,
798        sync: SyncMessage,
799    ) -> Result<(), MessageSenderError> {
800        if self.is_multi_device().await {
801            let content = sync.into();
802            let timestamp = Utc::now().timestamp_millis() as u64;
803            debug!(
804                "sending multi-device sync message with content {content:?}"
805            );
806            self.try_send_message(
807                self.local_aci.into(),
808                None,
809                &content,
810                timestamp,
811                false,
812                false,
813            )
814            .await?;
815        }
816        Ok(())
817    }
818
819    /// Send a `SyncMessage` request message
820    #[tracing::instrument(skip(self))]
821    pub async fn send_sync_message_request(
822        &mut self,
823        recipient: &ServiceId,
824        request_type: sync_message::request::Type,
825    ) -> Result<(), MessageSenderError> {
826        if self.device_id == *DEFAULT_DEVICE_ID {
827            return Err(MessageSenderError::SendSyncMessageError(request_type));
828        }
829
830        let msg = SyncMessage {
831            request: Some(sync_message::Request {
832                r#type: Some(request_type.into()),
833            }),
834            ..SyncMessage::with_padding(&mut rng())
835        };
836        self.send_sync_message(msg).await?;
837
838        Ok(())
839    }
840
841    #[tracing::instrument(level = "trace", skip(self))]
842    fn create_pni_signature(
843        &mut self,
844    ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
845        let mut rng = rng();
846        let signature = self
847            .pni_identity
848            .expect("PNI key set when PNI signature requested")
849            .sign_alternate_identity(
850                self.aci_identity.identity_key(),
851                &mut rng,
852            )?;
853        Ok(crate::proto::PniSignatureMessage {
854            pni: Some(self.local_pni.service_id_binary()),
855            signature: Some(signature.into()),
856        })
857    }
858
859    // Equivalent with `getEncryptedMessages`
860    #[tracing::instrument(
861        level = "trace",
862        skip(self, unidentified_access, content),
863        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
864    )]
865    async fn create_encrypted_messages(
866        &mut self,
867        recipient: &ServiceId,
868        unidentified_access: Option<&SenderCertificate>,
869        content: &[u8],
870    ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
871        let mut messages = vec![];
872
873        let mut devices: HashSet<DeviceId> = self
874            .protocol_store
875            .get_sub_device_sessions(recipient)
876            .await?
877            .into_iter()
878            .collect();
879
880        // always send to the primary device no matter what
881        devices.insert(*DEFAULT_DEVICE_ID);
882
883        // never try to send messages to the sender device
884        match recipient {
885            ServiceId::Aci(aci) => {
886                if *aci == self.local_aci {
887                    devices.remove(&self.device_id);
888                }
889            },
890            ServiceId::Pni(pni) => {
891                if *pni == self.local_pni {
892                    devices.remove(&self.device_id);
893                }
894            },
895        };
896
897        for device_id in devices {
898            trace!("sending message to device {}", device_id);
899            // `create_encrypted_message` may fail with `SessionNotFound` if the session is corrupted;
900            // see https://github.com/whisperfish/libsignal-client/commit/601454d20.
901            // If this happens, delete the session and retry.
902            for _attempt in 0..2 {
903                match self
904                    .create_encrypted_message(
905                        recipient,
906                        unidentified_access,
907                        device_id,
908                        content,
909                    )
910                    .await
911                {
912                    Ok(message) => {
913                        messages.push(message);
914                        break;
915                    },
916                    Err(MessageSenderError::ServiceError(
917                        ServiceError::SignalProtocolError(
918                            SignalProtocolError::SessionNotFound(addr),
919                        ),
920                    )) => {
921                        // SessionNotFound is returned on certain session corruption.
922                        // Since delete_session *creates* a session if it doesn't exist,
923                        // the NotFound error is an indicator of session corruption.
924                        // Try to delete this session, if it gets succesfully deleted, retry.  Otherwise, fail.
925                        tracing::warn!("Potential session corruption for {}, deleting session", addr);
926                        match self.protocol_store.delete_session(&addr).await {
927                            Ok(()) => continue,
928                            Err(error) => {
929                                tracing::warn!(%error, %addr, "failed to delete session");
930                                return Err(
931                                    SignalProtocolError::SessionNotFound(addr)
932                                        .into(),
933                                );
934                            },
935                        }
936                    },
937                    Err(e) => return Err(e),
938                }
939            }
940        }
941
942        if messages.is_empty() {
943            Ok(None)
944        } else {
945            Ok(Some(EncryptedMessages {
946                messages,
947                used_identity_key: self
948                    .protocol_store
949                    .get_identity(
950                        &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
951                    )
952                    .await?
953                    .ok_or(MessageSenderError::UntrustedIdentity {
954                        address: *recipient,
955                    })?,
956            }))
957        }
958    }
959
960    /// Equivalent to `getEncryptedMessage`
961    ///
962    /// When no session with the recipient exists, we need to create one.
963    #[tracing::instrument(
964        level = "trace",
965        skip(self, unidentified_access, content),
966        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
967    )]
968    pub(crate) async fn create_encrypted_message(
969        &mut self,
970        recipient: &ServiceId,
971        unidentified_access: Option<&SenderCertificate>,
972        device_id: DeviceId,
973        content: &[u8],
974    ) -> Result<OutgoingPushMessage, MessageSenderError> {
975        let recipient_protocol_address =
976            recipient.to_protocol_address(device_id);
977
978        tracing::trace!(
979            "encrypting message for {}",
980            recipient_protocol_address
981        );
982
983        // establish a session with the recipient/device if necessary
984        // no need to establish a session with ourselves (and our own current device)
985        if self
986            .protocol_store
987            .load_session(&recipient_protocol_address)
988            .await?
989            .is_none()
990        {
991            info!(
992                "establishing new session with {}",
993                recipient_protocol_address
994            );
995            let pre_keys = match self
996                .identified_ws
997                .get_pre_keys(recipient, device_id)
998                .await
999            {
1000                Ok(ok) => {
1001                    tracing::trace!("Get prekeys OK");
1002                    ok
1003                },
1004                Err(ServiceError::NotFoundError) => {
1005                    return Err(MessageSenderError::NotFound {
1006                        service_id: *recipient,
1007                    });
1008                },
1009                Err(e) => Err(e)?,
1010            };
1011
1012            let mut rng = rng();
1013
1014            for pre_key_bundle in pre_keys {
1015                if recipient == &self.local_aci
1016                    && self.device_id == pre_key_bundle.device_id()?
1017                {
1018                    trace!("not establishing a session with myself!");
1019                    continue;
1020                }
1021
1022                let pre_key_address = get_preferred_protocol_address(
1023                    &self.protocol_store,
1024                    recipient,
1025                    pre_key_bundle.device_id()?,
1026                )
1027                .await?;
1028
1029                process_prekey_bundle(
1030                    &pre_key_address,
1031                    &mut self.protocol_store.clone(),
1032                    &mut self.protocol_store,
1033                    &pre_key_bundle,
1034                    SystemTime::now(),
1035                    &mut rng,
1036                )
1037                .await?;
1038            }
1039        }
1040
1041        let message = self
1042            .cipher
1043            .encrypt(
1044                &recipient_protocol_address,
1045                unidentified_access,
1046                content,
1047                &mut rng(),
1048            )
1049            .instrument(tracing::trace_span!("encrypting message"))
1050            .await?;
1051
1052        Ok(message)
1053    }
1054
1055    fn create_multi_device_sent_transcript_content<'a>(
1056        &mut self,
1057        recipient: Option<&ServiceId>,
1058        content_body: ContentBody,
1059        timestamp: u64,
1060        send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1061    ) -> Option<ContentBody> {
1062        use sync_message::sent::UnidentifiedDeliveryStatus;
1063        let (message, edit_message) = match content_body {
1064            ContentBody::DataMessage(m) => (Some(m), None),
1065            ContentBody::EditMessage(m) => (None, Some(m)),
1066            content_body => {
1067                tracing::trace!(?content_body, "not syncing to self");
1068                return None;
1069            },
1070        };
1071        let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1072            send_message_results
1073                .into_iter()
1074                .filter_map(|result| result.as_ref().ok())
1075                .map(|sent| {
1076                    let SentMessage {
1077                        recipient,
1078                        unidentified,
1079                        used_identity_key,
1080                        ..
1081                    } = sent;
1082                    UnidentifiedDeliveryStatus {
1083                        destination_service_id: Some(
1084                            recipient.service_id_string(),
1085                        ),
1086                        destination_service_id_binary: Some(
1087                            recipient.service_id_binary(),
1088                        ),
1089                        unidentified: Some(*unidentified),
1090                        destination_pni_identity_key: Some(
1091                            used_identity_key.serialize().into(),
1092                        ),
1093                    }
1094                })
1095                .collect();
1096        Some(ContentBody::SynchronizeMessage(SyncMessage {
1097            sent: Some(sync_message::Sent {
1098                destination_service_id: recipient
1099                    .map(ServiceId::service_id_string),
1100                destination_service_id_binary: recipient
1101                    .map(ServiceId::service_id_binary),
1102                destination_e164: None,
1103                expiration_start_timestamp: message
1104                    .as_ref()
1105                    .and_then(|m| m.expire_timer)
1106                    .map(|_| timestamp),
1107                message,
1108                edit_message,
1109                timestamp: Some(timestamp),
1110                unidentified_status,
1111                ..Default::default()
1112            }),
1113            ..SyncMessage::with_padding(&mut rng())
1114        }))
1115    }
1116}