1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6 process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7 ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8 SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17 cipher::{get_preferred_protocol_address, ServiceCipher},
18 content::ContentBody,
19 proto::{
20 attachment_pointer::{
21 AttachmentIdentifier, Flags as AttachmentPointerFlags,
22 },
23 sync_message::{
24 self, message_request_response, MessageRequestResponse,
25 },
26 AttachmentPointer, SyncMessage,
27 },
28 push_service::*,
29 service_address::ServiceIdExt,
30 session_store::SessionStoreExt,
31 unidentified_access::UnidentifiedAccess,
32 utils::serde_service_id,
33 websocket::SignalWebSocket,
34};
35
36pub use crate::proto::ContactDetails;
37
38#[derive(serde::Serialize, Debug)]
39#[serde(rename_all = "camelCase")]
40pub struct OutgoingPushMessage {
41 pub r#type: u32,
42 pub destination_device_id: u32,
43 pub destination_registration_id: u32,
44 pub content: String,
45}
46
47#[derive(serde::Serialize, Debug)]
48pub struct OutgoingPushMessages {
49 #[serde(with = "serde_service_id")]
50 pub destination: ServiceId,
51 pub timestamp: u64,
52 pub messages: Vec<OutgoingPushMessage>,
53 pub online: bool,
54}
55
56#[derive(serde::Deserialize, Debug)]
57#[serde(rename_all = "camelCase")]
58pub struct SendMessageResponse {
59 pub needs_sync: bool,
60}
61
62pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
63
64#[derive(Debug, Clone)]
65pub struct SentMessage {
66 pub recipient: ServiceId,
67 pub used_identity_key: IdentityKey,
68 pub unidentified: bool,
69 pub needs_sync: bool,
70}
71
72#[derive(Debug, Default)]
76pub struct AttachmentSpec {
77 pub content_type: String,
78 pub length: usize,
79 pub file_name: Option<String>,
80 pub preview: Option<Vec<u8>>,
81 pub voice_note: Option<bool>,
82 pub borderless: Option<bool>,
83 pub width: Option<u32>,
84 pub height: Option<u32>,
85 pub caption: Option<String>,
86 pub blur_hash: Option<String>,
87}
88
89#[derive(Clone)]
90pub struct MessageSender<S> {
91 identified_ws: SignalWebSocket,
92 unidentified_ws: SignalWebSocket,
93 service: PushService,
94 cipher: ServiceCipher<S>,
95 protocol_store: S,
96 local_aci: Aci,
97 local_pni: Pni,
98 aci_identity: IdentityKeyPair,
99 pni_identity: Option<IdentityKeyPair>,
100 device_id: DeviceId,
101}
102
103#[derive(thiserror::Error, Debug)]
104pub enum AttachmentUploadError {
105 #[error("{0}")]
106 ServiceError(#[from] ServiceError),
107
108 #[error("Could not read attachment contents")]
109 IoError(#[from] std::io::Error),
110}
111
112#[derive(thiserror::Error, Debug)]
113pub enum MessageSenderError {
114 #[error("service error: {0}")]
115 ServiceError(#[from] ServiceError),
116
117 #[error("protocol error: {0}")]
118 ProtocolError(#[from] SignalProtocolError),
119
120 #[error("invalid private key: {0}")]
121 InvalidPrivateKey(#[from] CurveError),
122
123 #[error("invalid device ID: {0}")]
124 InvalidDeviceId(#[from] InvalidDeviceId),
125
126 #[error("Failed to upload attachment {0}")]
127 AttachmentUploadError(#[from] AttachmentUploadError),
128
129 #[error("primary device can't send sync message {0:?}")]
130 SendSyncMessageError(sync_message::request::Type),
131
132 #[error("Untrusted identity key with {address:?}")]
133 UntrustedIdentity { address: ServiceId },
134
135 #[error("Exceeded maximum number of retries")]
136 MaximumRetriesLimitExceeded,
137
138 #[error("Proof of type {options:?} required using token {token}")]
139 ProofRequired { token: String, options: Vec<String> },
140
141 #[error("Recipient not found: {service_id:?}")]
142 NotFound { service_id: ServiceId },
143
144 #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
145 NoMessagesToSend,
146}
147
148pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
149
150#[derive(Debug)]
151pub enum ThreadIdentifier {
152 Aci(Uuid),
153 Group(GroupV2Id),
154}
155
156#[derive(Debug)]
157pub struct EncryptedMessages {
158 messages: Vec<OutgoingPushMessage>,
159 used_identity_key: IdentityKey,
160}
161
162impl<S> MessageSender<S>
163where
164 S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
165{
166 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 identified_ws: SignalWebSocket,
169 unidentified_ws: SignalWebSocket,
170 service: PushService,
171 cipher: ServiceCipher<S>,
172 protocol_store: S,
173 local_aci: impl Into<Aci>,
174 local_pni: impl Into<Pni>,
175 aci_identity: IdentityKeyPair,
176 pni_identity: Option<IdentityKeyPair>,
177 device_id: DeviceId,
178 ) -> Self {
179 MessageSender {
180 service,
181 identified_ws,
182 unidentified_ws,
183 cipher,
184 protocol_store,
185 local_aci: local_aci.into(),
186 local_pni: local_pni.into(),
187 aci_identity,
188 pni_identity,
189 device_id,
190 }
191 }
192
193 #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
197 pub async fn upload_attachment<R: Rng + CryptoRng>(
198 &mut self,
199 spec: AttachmentSpec,
200 mut contents: Vec<u8>,
201 csprng: &mut R,
202 ) -> Result<AttachmentPointer, AttachmentUploadError> {
203 let len = contents.len();
204 let (key, iv) = {
206 let mut key = [0u8; 64];
207 let mut iv = [0u8; 16];
208 csprng.fill_bytes(&mut key);
209 csprng.fill_bytes(&mut iv);
210 (key, iv)
211 };
212
213 let padded_len: usize = {
217 std::cmp::max(
220 541,
221 1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
222 )
223 };
224 if padded_len < len {
225 error!(
226 "Padded len {} < len {}. Continuing with a privacy risk.",
227 padded_len, len
228 );
229 } else {
230 contents.resize(padded_len, 0);
231 }
232
233 tracing::trace_span!("encrypting attachment").in_scope(|| {
234 crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
235 });
236
237 let attachment_upload_form = self
241 .service
242 .get_attachment_v4_upload_attributes()
243 .instrument(tracing::trace_span!("requesting upload attributes"))
244 .await?;
245
246 let resumable_upload_url = self
247 .service
248 .get_attachment_resumable_upload_url(&attachment_upload_form)
249 .await?;
250
251 let attachment_digest = self
252 .service
253 .upload_attachment_v4(
254 attachment_upload_form.cdn,
255 &resumable_upload_url,
256 contents.len() as u64,
257 attachment_upload_form.headers,
258 &mut std::io::Cursor::new(&contents),
259 )
260 .await?;
261
262 Ok(AttachmentPointer {
263 content_type: Some(spec.content_type),
264 key: Some(key.to_vec()),
265 size: Some(len as u32),
266 digest: Some(attachment_digest.digest),
268 file_name: spec.file_name,
269 flags: Some(
270 if spec.voice_note == Some(true) {
271 AttachmentPointerFlags::VoiceMessage as u32
272 } else {
273 0
274 } | if spec.borderless == Some(true) {
275 AttachmentPointerFlags::Borderless as u32
276 } else {
277 0
278 },
279 ),
280 width: spec.width,
281 height: spec.height,
282 caption: spec.caption,
283 blur_hash: spec.blur_hash,
284 upload_timestamp: Some(
285 SystemTime::now()
286 .duration_since(SystemTime::UNIX_EPOCH)
287 .expect("unix epoch in the past")
288 .as_millis() as u64,
289 ),
290 cdn_number: Some(attachment_upload_form.cdn),
291 attachment_identifier: Some(AttachmentIdentifier::CdnKey(
292 attachment_upload_form.key,
293 )),
294 ..Default::default()
295 })
296 }
297
298 #[tracing::instrument(skip(self, contacts))]
302 async fn upload_contact_details<Contacts>(
303 &mut self,
304 contacts: Contacts,
305 ) -> Result<AttachmentPointer, AttachmentUploadError>
306 where
307 Contacts: IntoIterator<Item = ContactDetails>,
308 {
309 use prost::Message;
310 let mut out = Vec::new();
311 for contact in contacts {
312 contact
313 .encode_length_delimited(&mut out)
314 .expect("infallible encoding");
315 }
317
318 let spec = AttachmentSpec {
319 content_type: "application/octet-stream".into(),
320 length: out.len(),
321 file_name: None,
322 preview: None,
323 voice_note: None,
324 borderless: None,
325 width: None,
326 height: None,
327 caption: None,
328 blur_hash: None,
329 };
330 self.upload_attachment(spec, out, &mut rng()).await
331 }
332
333 async fn is_multi_device(&self) -> bool {
338 if self.device_id == *DEFAULT_DEVICE_ID {
339 self.protocol_store
340 .get_sub_device_sessions(&self.local_aci.into())
341 .await
342 .is_ok_and(|s| !s.is_empty())
343 } else {
344 true
345 }
346 }
347
348 #[tracing::instrument(
350 skip(self, unidentified_access, message),
351 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
352 )]
353 pub async fn send_message(
354 &mut self,
355 recipient: &ServiceId,
356 mut unidentified_access: Option<UnidentifiedAccess>,
357 message: impl Into<ContentBody>,
358 timestamp: u64,
359 include_pni_signature: bool,
360 online: bool,
361 ) -> SendMessageResult {
362 let content_body = message.into();
363 let message_to_self = recipient == &self.local_aci;
364 let sync_message =
365 matches!(content_body, ContentBody::SynchronizeMessage(..));
366 let is_multi_device = self.is_multi_device().await;
367
368 use crate::proto::data_message::Flags;
369
370 let end_session = match &content_body {
371 ContentBody::DataMessage(message) => {
372 message.flags == Some(Flags::EndSession as u32)
373 },
374 _ => false,
375 };
376
377 if message_to_self && is_multi_device && !sync_message {
379 debug!("sending note to self");
380 if let Some(sync_message) = self
381 .create_multi_device_sent_transcript_content(
382 Some(recipient),
383 content_body,
384 timestamp,
385 None,
386 )
387 {
388 return self
389 .try_send_message(
390 *recipient,
391 None,
392 &sync_message,
393 timestamp,
394 include_pni_signature,
395 online,
396 )
397 .await;
398 } else {
399 error!("could not create sync message from message to self");
400 return SendMessageResult::Err(
401 MessageSenderError::NoMessagesToSend,
402 );
403 }
404 }
405
406 if end_session || sync_message {
409 unidentified_access.take();
410 }
411
412 let result = self
414 .try_send_message(
415 *recipient,
416 unidentified_access.as_ref(),
417 &content_body,
418 timestamp,
419 include_pni_signature,
420 online,
421 )
422 .await;
423
424 let needs_sync = match &result {
425 Ok(SentMessage { needs_sync, .. }) => *needs_sync,
426 _ => false,
427 };
428
429 if needs_sync || is_multi_device {
430 debug!("sending multi-device sync message");
431 if let Some(sync) = if sync_message {
432 Some(content_body)
433 } else {
434 self.create_multi_device_sent_transcript_content(
435 Some(recipient),
436 content_body,
437 timestamp,
438 Some(&result),
439 )
440 } {
441 self.try_send_message(
442 self.local_aci.into(),
443 None,
444 &sync,
445 timestamp,
446 false,
447 false,
448 )
449 .await?;
450 } else {
451 error!("could not create sync message from a direct message");
452 }
453 }
454
455 if end_session {
456 let n = self.protocol_store.delete_all_sessions(recipient).await?;
457 tracing::debug!(
458 "ended {} sessions with {}",
459 n,
460 recipient.raw_uuid()
461 );
462 }
463
464 result
465 }
466
467 #[tracing::instrument(
474 skip(self, recipients, message),
475 fields(recipients = recipients.as_ref().len()),
476 )]
477 pub async fn send_message_to_group(
478 &mut self,
479 recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
480 message: impl Into<ContentBody>,
481 timestamp: u64,
482 online: bool,
483 ) -> Vec<SendMessageResult> {
484 let content_body: ContentBody = message.into();
485 let mut results = vec![];
486
487 let mut needs_sync_in_results = false;
488
489 for (recipient, unidentified_access, include_pni_signature) in
490 recipients.as_ref()
491 {
492 let result = self
493 .try_send_message(
494 *recipient,
495 unidentified_access.as_ref(),
496 &content_body,
497 timestamp,
498 *include_pni_signature,
499 online,
500 )
501 .await;
502
503 match result {
504 Ok(SentMessage { needs_sync, .. }) if needs_sync => {
505 needs_sync_in_results = true;
506 },
507 _ => (),
508 };
509
510 results.push(result);
511 }
512
513 if needs_sync_in_results || self.is_multi_device().await {
515 if let Some(sync_message) = self
516 .create_multi_device_sent_transcript_content(
517 None,
518 content_body.clone(),
519 timestamp,
520 &results,
521 )
522 {
523 if let Err(error) = self
526 .try_send_message(
527 self.local_aci.into(),
528 None,
529 &sync_message,
530 timestamp,
531 false, false,
533 )
534 .await
535 {
536 error!(%error, "failed to send a synchronization message");
537 }
538 } else {
539 error!("could not create sync message from a group message")
540 }
541 }
542
543 results
544 }
545
546 #[tracing::instrument(
548 level = "trace",
549 skip(self, unidentified_access, content_body, recipient),
550 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
551 )]
552 async fn try_send_message(
553 &mut self,
554 recipient: ServiceId,
555 mut unidentified_access: Option<&UnidentifiedAccess>,
556 content_body: &ContentBody,
557 timestamp: u64,
558 include_pni_signature: bool,
559 online: bool,
560 ) -> SendMessageResult {
561 trace!("trying to send a message");
562
563 use prost::Message;
564
565 let mut content = content_body.clone().into_proto();
566 if include_pni_signature {
567 content.pni_signature_message = Some(self.create_pni_signature()?);
568 }
569
570 let content_bytes = content.encode_to_vec();
571
572 let mut rng = rng();
573
574 for _ in 0..4u8 {
575 let Some(EncryptedMessages {
576 messages,
577 used_identity_key,
578 }) = self
579 .create_encrypted_messages(
580 &recipient,
581 unidentified_access.map(|x| &x.certificate),
582 &content_bytes,
583 )
584 .await?
585 else {
586 return Err(MessageSenderError::NoMessagesToSend);
590 };
591
592 let messages = OutgoingPushMessages {
593 destination: recipient,
594 timestamp,
595 messages,
596 online,
597 };
598
599 let send = if let Some(unidentified) = &unidentified_access {
600 tracing::debug!("sending via unidentified");
601 self.unidentified_ws
602 .send_messages_unidentified(messages, unidentified)
603 .await
604 } else {
605 tracing::debug!("sending identified");
606 self.identified_ws.send_messages(messages).await
607 };
608
609 match send {
610 Ok(SendMessageResponse { needs_sync }) => {
611 tracing::debug!("message sent!");
612 return Ok(SentMessage {
613 recipient,
614 used_identity_key,
615 unidentified: unidentified_access.is_some(),
616 needs_sync,
617 });
618 },
619 Err(ServiceError::Unauthorized)
620 if unidentified_access.is_some() =>
621 {
622 tracing::trace!("unauthorized error using unidentified; retry over identified");
623 unidentified_access = None;
624 },
625 Err(ServiceError::MismatchedDevicesException(ref m)) => {
626 tracing::debug!("{:?}", m);
627 for extra_device_id in &m.extra_devices {
628 tracing::debug!(
629 "dropping session with device {}",
630 extra_device_id
631 );
632 self.protocol_store
633 .delete_service_addr_device_session(
634 &recipient.to_protocol_address(
635 (*extra_device_id).try_into()?,
636 )?,
637 )
638 .await?;
639 }
640
641 for missing_device_id in &m.missing_devices {
642 tracing::debug!(
643 "creating session with missing device {}",
644 missing_device_id
645 );
646 let remote_address = recipient.to_protocol_address(
647 (*missing_device_id).try_into()?,
648 )?;
649 let pre_key = self
650 .service
651 .get_pre_key(&recipient, *missing_device_id)
652 .await?;
653
654 process_prekey_bundle(
655 &remote_address,
656 &mut self.protocol_store.clone(),
657 &mut self.protocol_store,
658 &pre_key,
659 SystemTime::now(),
660 &mut rng,
661 )
662 .await
663 .map_err(|e| {
664 error!("failed to create session: {}", e);
665 MessageSenderError::UntrustedIdentity {
666 address: recipient,
667 }
668 })?;
669 }
670 },
671 Err(ServiceError::StaleDevices(ref m)) => {
672 tracing::debug!("{:?}", m);
673 for extra_device_id in &m.stale_devices {
674 tracing::debug!(
675 "dropping session with device {}",
676 extra_device_id
677 );
678 self.protocol_store
679 .delete_service_addr_device_session(
680 &recipient.to_protocol_address(
681 (*extra_device_id).try_into()?,
682 )?,
683 )
684 .await?;
685 }
686 },
687 Err(ServiceError::ProofRequiredError(ref p)) => {
688 tracing::debug!("{:?}", p);
689 return Err(MessageSenderError::ProofRequired {
690 token: p.token.clone(),
691 options: p.options.clone(),
692 });
693 },
694 Err(ServiceError::NotFoundError) => {
695 tracing::debug!("Not found when sending a message");
696 return Err(MessageSenderError::NotFound {
697 service_id: recipient,
698 });
699 },
700 Err(e) => {
701 tracing::debug!(
702 "Default error handler for ws.send_messages: {}",
703 e
704 );
705 return Err(MessageSenderError::ServiceError(e));
706 },
707 }
708 }
709
710 Err(MessageSenderError::MaximumRetriesLimitExceeded)
711 }
712
713 #[tracing::instrument(
715 skip(self, unidentified_access, contacts, recipient),
716 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
717 )]
718 pub async fn send_contact_details<Contacts>(
719 &mut self,
720 recipient: &ServiceId,
721 unidentified_access: Option<UnidentifiedAccess>,
722 contacts: Contacts,
726 online: bool,
727 complete: bool,
728 ) -> Result<(), MessageSenderError>
729 where
730 Contacts: IntoIterator<Item = ContactDetails>,
731 {
732 let ptr = self.upload_contact_details(contacts).await?;
733
734 let msg = SyncMessage {
735 contacts: Some(sync_message::Contacts {
736 blob: Some(ptr),
737 complete: Some(complete),
738 }),
739 ..SyncMessage::with_padding(&mut rng())
740 };
741
742 self.send_sync_message(msg).await?;
743
744 Ok(())
745 }
746
747 #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
749 pub async fn send_message_request_response(
750 &mut self,
751 recipient: &ServiceId,
752 thread: &ThreadIdentifier,
753 action: message_request_response::Type,
754 ) -> Result<(), MessageSenderError> {
755 let message_request_response = Some(match thread {
756 ThreadIdentifier::Aci(aci) => {
757 tracing::debug!(
758 "sending message request response {:?} for recipient {:?}",
759 action,
760 aci
761 );
762 MessageRequestResponse {
763 thread_aci: Some(aci.to_string()),
764 group_id: None,
765 r#type: Some(action.into()),
766 }
767 },
768 ThreadIdentifier::Group(id) => {
769 tracing::debug!(
770 "sending message request response {:?} for group {:?}",
771 action,
772 id
773 );
774 MessageRequestResponse {
775 thread_aci: None,
776 group_id: Some(id.to_vec()),
777 r#type: Some(action.into()),
778 }
779 },
780 });
781
782 let msg = SyncMessage {
783 message_request_response,
784 ..SyncMessage::with_padding(&mut rng())
785 };
786
787 let ts = Utc::now().timestamp_millis() as u64;
788 self.send_message(recipient, None, msg, ts, false, false)
789 .await?;
790
791 Ok(())
792 }
793
794 pub async fn send_sync_message(
796 &mut self,
797 sync: SyncMessage,
798 ) -> Result<(), MessageSenderError> {
799 if self.is_multi_device().await {
800 let content = sync.into();
801 let timestamp = Utc::now().timestamp_millis() as u64;
802 debug!(
803 "sending multi-device sync message with content {content:?}"
804 );
805 self.try_send_message(
806 self.local_aci.into(),
807 None,
808 &content,
809 timestamp,
810 false,
811 false,
812 )
813 .await?;
814 }
815 Ok(())
816 }
817
818 #[tracing::instrument(skip(self))]
820 pub async fn send_sync_message_request(
821 &mut self,
822 recipient: &ServiceId,
823 request_type: sync_message::request::Type,
824 ) -> Result<(), MessageSenderError> {
825 if self.device_id == *DEFAULT_DEVICE_ID {
826 return Err(MessageSenderError::SendSyncMessageError(request_type));
827 }
828
829 let msg = SyncMessage {
830 request: Some(sync_message::Request {
831 r#type: Some(request_type.into()),
832 }),
833 ..SyncMessage::with_padding(&mut rng())
834 };
835 self.send_sync_message(msg).await?;
836
837 Ok(())
838 }
839
840 #[expect(clippy::result_large_err)]
841 #[tracing::instrument(level = "trace", skip(self))]
842 fn create_pni_signature(
843 &mut self,
844 ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
845 let mut rng = rng();
846 let signature = self
847 .pni_identity
848 .expect("PNI key set when PNI signature requested")
849 .sign_alternate_identity(
850 self.aci_identity.identity_key(),
851 &mut rng,
852 )?;
853 Ok(crate::proto::PniSignatureMessage {
854 pni: Some(self.local_pni.service_id_binary()),
855 signature: Some(signature.into()),
856 })
857 }
858
859 #[tracing::instrument(
861 level = "trace",
862 skip(self, unidentified_access, content),
863 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
864 )]
865 async fn create_encrypted_messages(
866 &mut self,
867 recipient: &ServiceId,
868 unidentified_access: Option<&SenderCertificate>,
869 content: &[u8],
870 ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
871 let mut messages = vec![];
872
873 let mut devices: HashSet<DeviceId> = self
874 .protocol_store
875 .get_sub_device_sessions(recipient)
876 .await?
877 .into_iter()
878 .collect();
879
880 devices.insert(*DEFAULT_DEVICE_ID);
882
883 match recipient {
885 ServiceId::Aci(aci) => {
886 if *aci == self.local_aci {
887 devices.remove(&self.device_id);
888 }
889 },
890 ServiceId::Pni(pni) => {
891 if *pni == self.local_pni {
892 devices.remove(&self.device_id);
893 }
894 },
895 };
896
897 for device_id in devices {
898 trace!("sending message to device {}", device_id);
899 for _attempt in 0..2 {
903 match self
904 .create_encrypted_message(
905 recipient,
906 unidentified_access,
907 device_id,
908 content,
909 )
910 .await
911 {
912 Ok(message) => {
913 messages.push(message);
914 break;
915 },
916 Err(MessageSenderError::ServiceError(
917 ServiceError::SignalProtocolError(
918 SignalProtocolError::SessionNotFound(addr),
919 ),
920 )) => {
921 tracing::warn!("Potential session corruption for {}, deleting session", addr);
926 match self.protocol_store.delete_session(&addr).await {
927 Ok(()) => continue,
928 Err(error) => {
929 tracing::warn!(%error, %addr, "failed to delete session");
930 return Err(
931 SignalProtocolError::SessionNotFound(addr)
932 .into(),
933 );
934 },
935 }
936 },
937 Err(e) => return Err(e),
938 }
939 }
940 }
941
942 if messages.is_empty() {
943 Ok(None)
944 } else {
945 Ok(Some(EncryptedMessages {
946 messages,
947 used_identity_key: self
948 .protocol_store
949 .get_identity(
950 &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
951 )
952 .await?
953 .ok_or(MessageSenderError::UntrustedIdentity {
954 address: *recipient,
955 })?,
956 }))
957 }
958 }
959
960 #[tracing::instrument(
964 level = "trace",
965 skip(self, unidentified_access, content),
966 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
967 )]
968 pub(crate) async fn create_encrypted_message(
969 &mut self,
970 recipient: &ServiceId,
971 unidentified_access: Option<&SenderCertificate>,
972 device_id: DeviceId,
973 content: &[u8],
974 ) -> Result<OutgoingPushMessage, MessageSenderError> {
975 let recipient_protocol_address =
976 recipient.to_protocol_address(device_id);
977
978 tracing::trace!(
979 "encrypting message for {}",
980 recipient_protocol_address
981 );
982
983 if self
986 .protocol_store
987 .load_session(&recipient_protocol_address)
988 .await?
989 .is_none()
990 {
991 info!(
992 "establishing new session with {}",
993 recipient_protocol_address
994 );
995 let pre_keys = match self
996 .service
997 .get_pre_keys(recipient, device_id.into())
998 .await
999 {
1000 Ok(ok) => {
1001 tracing::trace!("Get prekeys OK");
1002 ok
1003 },
1004 Err(ServiceError::NotFoundError) => {
1005 return Err(MessageSenderError::NotFound {
1006 service_id: *recipient,
1007 });
1008 },
1009 Err(e) => Err(e)?,
1010 };
1011
1012 let mut rng = rng();
1013
1014 for pre_key_bundle in pre_keys {
1015 if recipient == &self.local_aci
1016 && self.device_id == pre_key_bundle.device_id()?
1017 {
1018 trace!("not establishing a session with myself!");
1019 continue;
1020 }
1021
1022 let pre_key_address = get_preferred_protocol_address(
1023 &self.protocol_store,
1024 recipient,
1025 pre_key_bundle.device_id()?,
1026 )
1027 .await?;
1028
1029 process_prekey_bundle(
1030 &pre_key_address,
1031 &mut self.protocol_store.clone(),
1032 &mut self.protocol_store,
1033 &pre_key_bundle,
1034 SystemTime::now(),
1035 &mut rng,
1036 )
1037 .await?;
1038 }
1039 }
1040
1041 let message = self
1042 .cipher
1043 .encrypt(
1044 &recipient_protocol_address,
1045 unidentified_access,
1046 content,
1047 &mut rng(),
1048 )
1049 .instrument(tracing::trace_span!("encrypting message"))
1050 .await?;
1051
1052 Ok(message)
1053 }
1054
1055 fn create_multi_device_sent_transcript_content<'a>(
1056 &mut self,
1057 recipient: Option<&ServiceId>,
1058 content_body: ContentBody,
1059 timestamp: u64,
1060 send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1061 ) -> Option<ContentBody> {
1062 use sync_message::sent::UnidentifiedDeliveryStatus;
1063 let (message, edit_message) = match content_body {
1064 ContentBody::DataMessage(m) => (Some(m), None),
1065 ContentBody::EditMessage(m) => (None, Some(m)),
1066 _ => return None,
1067 };
1068 let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1069 send_message_results
1070 .into_iter()
1071 .filter_map(|result| result.as_ref().ok())
1072 .map(|sent| {
1073 let SentMessage {
1074 recipient,
1075 unidentified,
1076 used_identity_key,
1077 ..
1078 } = sent;
1079 UnidentifiedDeliveryStatus {
1080 destination_service_id: Some(
1081 recipient.service_id_string(),
1082 ),
1083 unidentified: Some(*unidentified),
1084 destination_pni_identity_key: Some(
1085 used_identity_key.serialize().into(),
1086 ),
1087 }
1088 })
1089 .collect();
1090 Some(ContentBody::SynchronizeMessage(SyncMessage {
1091 sent: Some(sync_message::Sent {
1092 destination_service_id: recipient
1093 .map(ServiceId::service_id_string),
1094 destination_e164: None,
1095 expiration_start_timestamp: message
1096 .as_ref()
1097 .and_then(|m| m.expire_timer)
1098 .map(|_| timestamp),
1099 message,
1100 edit_message,
1101 timestamp: Some(timestamp),
1102 unidentified_status,
1103 ..Default::default()
1104 }),
1105 ..SyncMessage::with_padding(&mut rng())
1106 }))
1107 }
1108}