1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6 process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7 ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8 SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17 cipher::{get_preferred_protocol_address, ServiceCipher},
18 content::ContentBody,
19 proto::{
20 attachment_pointer::{
21 AttachmentIdentifier, Flags as AttachmentPointerFlags,
22 },
23 sync_message::{
24 self, message_request_response, MessageRequestResponse,
25 },
26 AttachmentPointer, SyncMessage,
27 },
28 push_service::*,
29 service_address::ServiceIdExt,
30 session_store::SessionStoreExt,
31 unidentified_access::UnidentifiedAccess,
32 utils::{serde_device_id, serde_service_id},
33 websocket::{self, SignalWebSocket},
34};
35
36pub use crate::proto::ContactDetails;
37
/// One encrypted message addressed to a single device of a recipient.
///
/// Serialized with camelCase field names; instances are collected in
/// [`OutgoingPushMessages::messages`].
#[derive(serde::Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct OutgoingPushMessage {
    /// Numeric message type.
    /// NOTE(review): presumably the envelope/ciphertext type set by the
    /// cipher that builds this value — confirm against `ServiceCipher`.
    pub r#type: u32,
    /// Device this message is addressed to (serialized via `serde_device_id`).
    #[serde(with = "serde_device_id")]
    pub destination_device_id: DeviceId,
    /// Registration ID associated with the destination device.
    pub destination_registration_id: u32,
    /// Message payload as a string.
    /// NOTE(review): presumably encoded ciphertext — confirm the encoding.
    pub content: String,
}
47
/// The request body for sending messages to one recipient: the per-device
/// messages together with the destination, timestamp and `online` flag.
#[derive(serde::Serialize, Debug)]
pub struct OutgoingPushMessages {
    /// Recipient of all contained messages (serialized via `serde_service_id`).
    #[serde(with = "serde_service_id")]
    pub destination: ServiceId,
    /// Timestamp supplied by the caller of the send functions.
    pub timestamp: u64,
    /// One entry per destination device.
    pub messages: Vec<OutgoingPushMessage>,
    /// Passed through unchanged from the `online` argument of the send call.
    pub online: bool,
}
56
/// Response to a message send request, deserialized from camelCase fields.
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SendMessageResponse {
    /// When `true`, the send functions in this file follow up with a sync
    /// message to the local account.
    pub needs_sync: bool,
}
62
/// Outcome of sending one message to one recipient.
pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
64
/// Details about a message that was successfully handed to the service.
#[derive(Debug, Clone)]
pub struct SentMessage {
    /// The recipient the message was sent to.
    pub recipient: ServiceId,
    /// Identity key stored for the recipient's default device at the time the
    /// message was encrypted.
    pub used_identity_key: IdentityKey,
    /// Whether the message went out over the unidentified websocket.
    pub unidentified: bool,
    /// Copied from [`SendMessageResponse::needs_sync`].
    pub needs_sync: bool,
}
72
/// Metadata describing an attachment to upload.
///
/// Most fields are copied verbatim into the resulting `AttachmentPointer`.
/// Note that `upload_attachment` in this file does not read `length` or
/// `preview`; the plaintext size is taken from the byte buffer instead.
#[derive(Debug, Default)]
pub struct AttachmentSpec {
    /// MIME type of the attachment.
    pub content_type: String,
    /// Declared length of the attachment in bytes.
    pub length: usize,
    /// Optional file name.
    pub file_name: Option<String>,
    /// Optional preview bytes.
    pub preview: Option<Vec<u8>>,
    /// `Some(true)` sets the voice-message flag on the pointer.
    pub voice_note: Option<bool>,
    /// `Some(true)` sets the borderless flag on the pointer.
    pub borderless: Option<bool>,
    /// Optional width, copied to the pointer.
    pub width: Option<u32>,
    /// Optional height, copied to the pointer.
    pub height: Option<u32>,
    /// Optional caption, copied to the pointer.
    pub caption: Option<String>,
    /// Optional blur hash, copied to the pointer.
    pub blur_hash: Option<String>,
}
89
/// Sends messages, sync messages and attachments on behalf of the local
/// account, using a protocol store of type `S` for session state.
#[derive(Clone)]
pub struct MessageSender<S> {
    /// Authenticated websocket; used for identified sends and pre-key fetches.
    identified_ws: SignalWebSocket<websocket::Identified>,
    /// Websocket used for sends that carry unidentified access.
    unidentified_ws: SignalWebSocket<websocket::Unidentified>,
    /// Service handle used for attachment uploads.
    service: PushService,
    /// Cipher that encrypts outgoing content per destination device.
    cipher: ServiceCipher<S>,
    /// Store for sessions and identities.
    protocol_store: S,
    /// ACI of the local account.
    local_aci: Aci,
    /// PNI of the local account.
    local_pni: Pni,
    /// ACI identity key pair of the local account.
    aci_identity: IdentityKeyPair,
    /// PNI identity key pair; must be set when a PNI signature is requested.
    pni_identity: Option<IdentityKeyPair>,
    /// Device ID of this device.
    device_id: DeviceId,
}
103
/// Errors that can occur while uploading an attachment.
#[derive(thiserror::Error, Debug)]
pub enum AttachmentUploadError {
    /// A request to the service failed.
    #[error("{0}")]
    ServiceError(#[from] ServiceError),

    /// An I/O error occurred while handling the attachment contents.
    #[error("Could not read attachment contents")]
    IoError(#[from] std::io::Error),
}
112
/// Errors returned by the sending functions of [`MessageSender`].
#[derive(thiserror::Error, Debug)]
pub enum MessageSenderError {
    /// A request to the service failed in a way not mapped to a more
    /// specific variant.
    #[error("service error: {0}")]
    ServiceError(#[from] ServiceError),

    /// The Signal protocol layer reported an error.
    #[error("protocol error: {0}")]
    ProtocolError(#[from] SignalProtocolError),

    /// A curve operation failed.
    #[error("invalid private key: {0}")]
    InvalidPrivateKey(#[from] CurveError),

    /// A device ID was not valid.
    #[error("invalid device ID: {0}")]
    InvalidDeviceId(#[from] InvalidDeviceId),

    /// Uploading an attachment failed.
    #[error("Failed to upload attachment {0}")]
    AttachmentUploadError(#[from] AttachmentUploadError),

    /// Returned by `send_sync_message_request` when this device has the
    /// default device ID.
    #[error("primary device can't send sync message {0:?}")]
    SendSyncMessageError(sync_message::request::Type),

    /// Returned when a session with a missing device could not be created,
    /// or when no identity is stored for the recipient's default device.
    #[error("Untrusted identity key with {address:?}")]
    UntrustedIdentity { address: ServiceId },

    /// All send attempts were used up without a final outcome.
    #[error("Exceeded maximum number of retries")]
    MaximumRetriesLimitExceeded,

    /// The service asked for a proof before accepting the message.
    #[error("Proof of type {options:?} required using token {token}")]
    ProofRequired { token: String, options: Vec<String> },

    /// The service reported the recipient as not found.
    #[error("Recipient not found: {service_id:?}")]
    NotFound { service_id: ServiceId },

    /// No message could be encrypted for any device, or no sync transcript
    /// could be built for a note to self.
    #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
    NoMessagesToSend,
}
148
/// Raw identifier of a group: a byte array of `GROUP_IDENTIFIER_LEN` bytes.
pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
150
/// Identifies the conversation a message request response refers to.
#[derive(Debug)]
pub enum ThreadIdentifier {
    /// A one-to-one conversation, identified by the other party's ACI UUID.
    Aci(Uuid),
    /// A group conversation, identified by its group ID.
    Group(GroupV2Id),
}
156
/// The per-device encrypted messages for one recipient, plus the identity
/// key looked up for that recipient's default device.
#[derive(Debug)]
pub struct EncryptedMessages {
    /// One encrypted message per destination device.
    messages: Vec<OutgoingPushMessage>,
    /// Identity key stored for the recipient's default device.
    used_identity_key: IdentityKey,
}
162
163impl<S> MessageSender<S>
164where
165 S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
166{
167 #[allow(clippy::too_many_arguments)]
168 pub fn new(
169 identified_ws: SignalWebSocket<websocket::Identified>,
170 unidentified_ws: SignalWebSocket<websocket::Unidentified>,
171 service: PushService,
172 cipher: ServiceCipher<S>,
173 protocol_store: S,
174 local_aci: impl Into<Aci>,
175 local_pni: impl Into<Pni>,
176 aci_identity: IdentityKeyPair,
177 pni_identity: Option<IdentityKeyPair>,
178 device_id: DeviceId,
179 ) -> Self {
180 MessageSender {
181 service,
182 identified_ws,
183 unidentified_ws,
184 cipher,
185 protocol_store,
186 local_aci: local_aci.into(),
187 local_pni: local_pni.into(),
188 aci_identity,
189 pni_identity,
190 device_id,
191 }
192 }
193
194 #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
198 pub async fn upload_attachment<R: Rng + CryptoRng>(
199 &mut self,
200 spec: AttachmentSpec,
201 mut contents: Vec<u8>,
202 csprng: &mut R,
203 ) -> Result<AttachmentPointer, AttachmentUploadError> {
204 let len = contents.len();
205 let (key, iv) = {
207 let mut key = [0u8; 64];
208 let mut iv = [0u8; 16];
209 csprng.fill_bytes(&mut key);
210 csprng.fill_bytes(&mut iv);
211 (key, iv)
212 };
213
214 let padded_len: usize = {
218 std::cmp::max(
221 541,
222 1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
223 )
224 };
225 if padded_len < len {
226 error!(
227 "Padded len {} < len {}. Continuing with a privacy risk.",
228 padded_len, len
229 );
230 } else {
231 contents.resize(padded_len, 0);
232 }
233
234 tracing::trace_span!("encrypting attachment").in_scope(|| {
235 crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
236 });
237
238 let attachment_upload_form = self
242 .service
243 .get_attachment_v4_upload_attributes()
244 .instrument(tracing::trace_span!("requesting upload attributes"))
245 .await?;
246
247 let resumable_upload_url = self
248 .service
249 .get_attachment_resumable_upload_url(&attachment_upload_form)
250 .await?;
251
252 let attachment_digest = self
253 .service
254 .upload_attachment_v4(
255 attachment_upload_form.cdn,
256 &resumable_upload_url,
257 contents.len() as u64,
258 attachment_upload_form.headers,
259 &mut std::io::Cursor::new(&contents),
260 )
261 .await?;
262
263 Ok(AttachmentPointer {
264 content_type: Some(spec.content_type),
265 key: Some(key.to_vec()),
266 size: Some(len as u32),
267 digest: Some(attachment_digest.digest),
269 file_name: spec.file_name,
270 flags: Some(
271 if spec.voice_note == Some(true) {
272 AttachmentPointerFlags::VoiceMessage as u32
273 } else {
274 0
275 } | if spec.borderless == Some(true) {
276 AttachmentPointerFlags::Borderless as u32
277 } else {
278 0
279 },
280 ),
281 width: spec.width,
282 height: spec.height,
283 caption: spec.caption,
284 blur_hash: spec.blur_hash,
285 upload_timestamp: Some(
286 SystemTime::now()
287 .duration_since(SystemTime::UNIX_EPOCH)
288 .expect("unix epoch in the past")
289 .as_millis() as u64,
290 ),
291 cdn_number: Some(attachment_upload_form.cdn),
292 attachment_identifier: Some(AttachmentIdentifier::CdnKey(
293 attachment_upload_form.key,
294 )),
295 ..Default::default()
296 })
297 }
298
299 #[tracing::instrument(skip(self, contacts))]
303 async fn upload_contact_details<Contacts>(
304 &mut self,
305 contacts: Contacts,
306 ) -> Result<AttachmentPointer, AttachmentUploadError>
307 where
308 Contacts: IntoIterator<Item = ContactDetails>,
309 {
310 use prost::Message;
311 let mut out = Vec::new();
312 for contact in contacts {
313 contact
314 .encode_length_delimited(&mut out)
315 .expect("infallible encoding");
316 }
318
319 let spec = AttachmentSpec {
320 content_type: "application/octet-stream".into(),
321 length: out.len(),
322 file_name: None,
323 preview: None,
324 voice_note: None,
325 borderless: None,
326 width: None,
327 height: None,
328 caption: None,
329 blur_hash: None,
330 };
331 self.upload_attachment(spec, out, &mut rng()).await
332 }
333
334 async fn is_multi_device(&self) -> bool {
339 if self.device_id == *DEFAULT_DEVICE_ID {
340 self.protocol_store
341 .get_sub_device_sessions(&self.local_aci.into())
342 .await
343 .is_ok_and(|s| !s.is_empty())
344 } else {
345 true
346 }
347 }
348
    /// Sends a message to a single recipient and, when applicable, a sync
    /// message to the local account.
    ///
    /// Behaviour, in order:
    /// - A non-sync message addressed to the local ACI while in multi-device
    ///   mode is sent only as a "sent" transcript to `recipient`, and the
    ///   function returns that result.
    /// - Otherwise the message is sent to `recipient`. Unidentified access is
    ///   dropped for end-session messages and for sync messages.
    /// - If the response asks for a sync, or we are in multi-device mode, a
    ///   follow-up is sent to the local ACI: the content itself when it is a
    ///   sync message, otherwise a "sent" transcript built from it.
    /// - For end-session messages, all sessions with `recipient` are deleted.
    ///
    /// # Errors
    ///
    /// Returns the result of the primary send. A failure while sending the
    /// follow-up sync message or while deleting sessions is returned instead,
    /// via early return.
    #[tracing::instrument(
        skip(self, unidentified_access, message),
        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
    )]
    pub async fn send_message(
        &mut self,
        recipient: &ServiceId,
        mut unidentified_access: Option<UnidentifiedAccess>,
        message: impl Into<ContentBody>,
        timestamp: u64,
        include_pni_signature: bool,
        online: bool,
    ) -> SendMessageResult {
        let content_body = message.into();
        let message_to_self = recipient == &self.local_aci;
        let sync_message =
            matches!(content_body, ContentBody::SynchronizeMessage(..));
        let is_multi_device = self.is_multi_device().await;

        use crate::proto::data_message::Flags;

        // Only a data message whose flags equal exactly `EndSession` counts.
        let end_session = match &content_body {
            ContentBody::DataMessage(message) => {
                message.flags == Some(Flags::EndSession as u32)
            },
            _ => false,
        };

        // Note to self: send only the transcript and skip everything below.
        if message_to_self && is_multi_device && !sync_message {
            debug!("sending note to self");
            if let Some(sync_message) = self
                .create_multi_device_sent_transcript_content(
                    Some(recipient),
                    content_body,
                    timestamp,
                    None,
                )
            {
                return self
                    .try_send_message(
                        *recipient,
                        None,
                        &sync_message,
                        timestamp,
                        include_pni_signature,
                        online,
                    )
                    .await;
            } else {
                error!("could not create sync message from message to self");
                return SendMessageResult::Err(
                    MessageSenderError::NoMessagesToSend,
                );
            }
        }

        // Session enders and sync messages are always sent identified.
        if end_session || sync_message {
            unidentified_access.take();
        }

        // Primary send; its result is kept and returned at the very end.
        let result = self
            .try_send_message(
                *recipient,
                unidentified_access.as_ref(),
                &content_body,
                timestamp,
                include_pni_signature,
                online,
            )
            .await;

        // A failed primary send never requests a sync by itself.
        let needs_sync = match &result {
            Ok(SentMessage { needs_sync, .. }) => *needs_sync,
            _ => false,
        };

        if needs_sync || is_multi_device {
            // A sync message is forwarded as-is; anything else is wrapped in
            // a transcript (which may be `None` for unsupported content).
            let sync_body = if sync_message {
                Some(content_body)
            } else {
                self.create_multi_device_sent_transcript_content(
                    Some(recipient),
                    content_body,
                    timestamp,
                    Some(&result),
                )
            };
            if let Some(body) = sync_body {
                debug!("sending multi-device sync message");
                self.try_send_message(
                    self.local_aci.into(),
                    None,
                    &body,
                    timestamp,
                    false,
                    false,
                )
                .await?;
            }
        }

        if end_session {
            let n = self.protocol_store.delete_all_sessions(recipient).await?;
            tracing::debug!(
                "ended {} sessions with {}",
                n,
                recipient.raw_uuid()
            );
        }

        result
    }
468
469 #[tracing::instrument(
476 skip(self, recipients, message),
477 fields(recipients = recipients.as_ref().len()),
478 )]
479 pub async fn send_message_to_group(
480 &mut self,
481 recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
482 message: impl Into<ContentBody>,
483 timestamp: u64,
484 online: bool,
485 ) -> Vec<SendMessageResult> {
486 let content_body: ContentBody = message.into();
487 let mut results = vec![];
488
489 let mut needs_sync_in_results = false;
490
491 for (recipient, unidentified_access, include_pni_signature) in
492 recipients.as_ref()
493 {
494 let result = self
495 .try_send_message(
496 *recipient,
497 unidentified_access.as_ref(),
498 &content_body,
499 timestamp,
500 *include_pni_signature,
501 online,
502 )
503 .await;
504
505 match result {
506 Ok(SentMessage { needs_sync, .. }) if needs_sync => {
507 needs_sync_in_results = true;
508 },
509 _ => (),
510 };
511
512 results.push(result);
513 }
514
515 if needs_sync_in_results || self.is_multi_device().await {
517 if let Some(sync_message) = self
518 .create_multi_device_sent_transcript_content(
519 None,
520 content_body.clone(),
521 timestamp,
522 &results,
523 )
524 {
525 if let Err(error) = self
528 .try_send_message(
529 self.local_aci.into(),
530 None,
531 &sync_message,
532 timestamp,
533 false, false,
535 )
536 .await
537 {
538 error!(%error, "failed to send a synchronization message");
539 }
540 } else {
541 error!("could not create sync message from a group message")
542 }
543 }
544
545 results
546 }
547
    /// Encrypts and sends one message to all known devices of `recipient`,
    /// retrying up to four times while repairing the device/session list.
    ///
    /// Per attempt the content is encrypted for each device and sent over the
    /// unidentified websocket when `unidentified_access` is set, otherwise
    /// over the identified one. Recoverable responses are handled as follows
    /// before the next attempt:
    /// - unauthorized while using unidentified access: fall back to
    ///   identified sending;
    /// - mismatched devices: drop sessions for extra devices and create
    ///   sessions for missing devices from freshly fetched pre-keys;
    /// - stale devices: drop the sessions of those devices.
    ///
    /// # Errors
    ///
    /// - `NoMessagesToSend` when nothing could be encrypted.
    /// - `ProofRequired` / `NotFound` for the corresponding service errors.
    /// - `UntrustedIdentity` when a session with a missing device cannot be
    ///   created.
    /// - `MaximumRetriesLimitExceeded` when all four attempts were used up.
    /// - Any other service, protocol or store error is propagated.
    ///
    /// # Panics
    ///
    /// Panics when `include_pni_signature` is `true` but no PNI identity key
    /// pair is configured (see `create_pni_signature`).
    #[tracing::instrument(
        level = "trace",
        skip(self, unidentified_access, content_body, recipient),
        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
    )]
    async fn try_send_message(
        &mut self,
        recipient: ServiceId,
        mut unidentified_access: Option<&UnidentifiedAccess>,
        content_body: &ContentBody,
        timestamp: u64,
        include_pni_signature: bool,
        online: bool,
    ) -> SendMessageResult {
        trace!("trying to send a message");

        use prost::Message;

        let mut content = content_body.clone().into_proto();
        if include_pni_signature {
            content.pni_signature_message = Some(self.create_pni_signature()?);
        }

        // Serialized once; every retry re-encrypts these same bytes.
        let content_bytes = content.encode_to_vec();

        let mut rng = rng();

        for _ in 0..4u8 {
            let Some(EncryptedMessages {
                messages,
                used_identity_key,
            }) = self
                .create_encrypted_messages(
                    &recipient,
                    unidentified_access.map(|x| &x.certificate),
                    &content_bytes,
                )
                .await?
            else {
                return Err(MessageSenderError::NoMessagesToSend);
            };

            let messages = OutgoingPushMessages {
                destination: recipient,
                timestamp,
                messages,
                online,
            };

            let send = if let Some(unidentified) = &unidentified_access {
                tracing::debug!("sending via unidentified");
                self.unidentified_ws
                    .send_messages_unidentified(messages, unidentified)
                    .await
            } else {
                tracing::debug!("sending identified");
                self.identified_ws.send_messages(messages).await
            };

            match send {
                Ok(SendMessageResponse { needs_sync }) => {
                    tracing::debug!("message sent!");
                    return Ok(SentMessage {
                        recipient,
                        used_identity_key,
                        unidentified: unidentified_access.is_some(),
                        needs_sync,
                    });
                },
                // Only treated as recoverable while unidentified access is
                // in use; otherwise it falls through to the generic arm.
                Err(ServiceError::Unauthorized)
                    if unidentified_access.is_some() =>
                {
                    tracing::trace!("unauthorized error using unidentified; retry over identified");
                    unidentified_access = None;
                },
                Err(ServiceError::MismatchedDevicesException(ref m)) => {
                    tracing::debug!("{:?}", m);
                    for extra_device_id in &m.extra_devices {
                        tracing::debug!(
                            "dropping session with device {}",
                            extra_device_id
                        );
                        self.protocol_store
                            .delete_service_addr_device_session(
                                &recipient
                                    .to_protocol_address(*extra_device_id)?,
                            )
                            .await?;
                    }

                    for missing_device_id in &m.missing_devices {
                        tracing::debug!(
                            "creating session with missing device {}",
                            missing_device_id
                        );
                        let remote_address = recipient
                            .to_protocol_address(*missing_device_id)?;
                        let pre_key = self
                            .identified_ws
                            .get_pre_key(&recipient, *missing_device_id)
                            .await?;

                        // The store is passed twice (once as a clone) because
                        // the call takes two separate mutable store arguments.
                        process_prekey_bundle(
                            &remote_address,
                            &mut self.protocol_store.clone(),
                            &mut self.protocol_store,
                            &pre_key,
                            SystemTime::now(),
                            &mut rng,
                        )
                        .await
                        .map_err(|e| {
                            // Any failure here is reported as an untrusted
                            // identity; the underlying error is only logged.
                            error!("failed to create session: {}", e);
                            MessageSenderError::UntrustedIdentity {
                                address: recipient,
                            }
                        })?;
                    }
                },
                Err(ServiceError::StaleDevices(ref m)) => {
                    tracing::debug!("{:?}", m);
                    for extra_device_id in &m.stale_devices {
                        tracing::debug!(
                            "dropping session with device {}",
                            extra_device_id
                        );
                        self.protocol_store
                            .delete_service_addr_device_session(
                                &recipient
                                    .to_protocol_address(*extra_device_id)?,
                            )
                            .await?;
                    }
                },
                Err(ServiceError::ProofRequiredError(ref p)) => {
                    tracing::debug!("{:?}", p);
                    return Err(MessageSenderError::ProofRequired {
                        token: p.token.clone(),
                        options: p.options.clone(),
                    });
                },
                Err(ServiceError::NotFoundError) => {
                    tracing::debug!("Not found when sending a message");
                    return Err(MessageSenderError::NotFound {
                        service_id: recipient,
                    });
                },
                Err(e) => {
                    tracing::debug!(
                        "Default error handler for ws.send_messages: {}",
                        e
                    );
                    return Err(MessageSenderError::ServiceError(e));
                },
            }
        }

        Err(MessageSenderError::MaximumRetriesLimitExceeded)
    }
711
712 #[tracing::instrument(
714 skip(self, unidentified_access, contacts, recipient),
715 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
716 )]
717 pub async fn send_contact_details<Contacts>(
718 &mut self,
719 recipient: &ServiceId,
720 unidentified_access: Option<UnidentifiedAccess>,
721 contacts: Contacts,
725 online: bool,
726 complete: bool,
727 ) -> Result<(), MessageSenderError>
728 where
729 Contacts: IntoIterator<Item = ContactDetails>,
730 {
731 let ptr = self.upload_contact_details(contacts).await?;
732
733 let msg = SyncMessage {
734 contacts: Some(sync_message::Contacts {
735 blob: Some(ptr),
736 complete: Some(complete),
737 }),
738 ..SyncMessage::with_padding(&mut rng())
739 };
740
741 self.send_sync_message(msg).await?;
742
743 Ok(())
744 }
745
746 #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
748 pub async fn send_message_request_response(
749 &mut self,
750 recipient: &ServiceId,
751 thread: &ThreadIdentifier,
752 action: message_request_response::Type,
753 ) -> Result<(), MessageSenderError> {
754 let message_request_response = Some(match thread {
755 ThreadIdentifier::Aci(aci) => {
756 tracing::debug!(
757 "sending message request response {:?} for recipient {:?}",
758 action,
759 aci
760 );
761 MessageRequestResponse {
762 thread_aci: Some(aci.to_string()),
763 thread_aci_binary: Some(aci.into_bytes().to_vec()),
764 group_id: None,
765 r#type: Some(action.into()),
766 }
767 },
768 ThreadIdentifier::Group(id) => {
769 tracing::debug!(
770 "sending message request response {:?} for group {:?}",
771 action,
772 id
773 );
774 MessageRequestResponse {
775 thread_aci: None,
776 thread_aci_binary: None,
777 group_id: Some(id.to_vec()),
778 r#type: Some(action.into()),
779 }
780 },
781 });
782
783 let msg = SyncMessage {
784 message_request_response,
785 ..SyncMessage::with_padding(&mut rng())
786 };
787
788 let ts = Utc::now().timestamp_millis() as u64;
789 self.send_message(recipient, None, msg, ts, false, false)
790 .await?;
791
792 Ok(())
793 }
794
795 pub async fn send_sync_message(
797 &mut self,
798 sync: SyncMessage,
799 ) -> Result<(), MessageSenderError> {
800 if self.is_multi_device().await {
801 let content = sync.into();
802 let timestamp = Utc::now().timestamp_millis() as u64;
803 debug!(
804 "sending multi-device sync message with content {content:?}"
805 );
806 self.try_send_message(
807 self.local_aci.into(),
808 None,
809 &content,
810 timestamp,
811 false,
812 false,
813 )
814 .await?;
815 }
816 Ok(())
817 }
818
819 #[tracing::instrument(skip(self))]
821 pub async fn send_sync_message_request(
822 &mut self,
823 recipient: &ServiceId,
824 request_type: sync_message::request::Type,
825 ) -> Result<(), MessageSenderError> {
826 if self.device_id == *DEFAULT_DEVICE_ID {
827 return Err(MessageSenderError::SendSyncMessageError(request_type));
828 }
829
830 let msg = SyncMessage {
831 request: Some(sync_message::Request {
832 r#type: Some(request_type.into()),
833 }),
834 ..SyncMessage::with_padding(&mut rng())
835 };
836 self.send_sync_message(msg).await?;
837
838 Ok(())
839 }
840
841 #[tracing::instrument(level = "trace", skip(self))]
842 fn create_pni_signature(
843 &mut self,
844 ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
845 let mut rng = rng();
846 let signature = self
847 .pni_identity
848 .expect("PNI key set when PNI signature requested")
849 .sign_alternate_identity(
850 self.aci_identity.identity_key(),
851 &mut rng,
852 )?;
853 Ok(crate::proto::PniSignatureMessage {
854 pni: Some(self.local_pni.service_id_binary()),
855 signature: Some(signature.into()),
856 })
857 }
858
    /// Encrypts `content` for every device of `recipient` that should
    /// receive it.
    ///
    /// The device set is the recipient's sub-device sessions from the store
    /// plus the default device; when the recipient is the local ACI or PNI,
    /// this device is removed from the set.
    ///
    /// Returns `Ok(None)` when no message was produced. Otherwise returns the
    /// messages together with the identity key stored for the recipient's
    /// default device.
    ///
    /// # Errors
    ///
    /// - `UntrustedIdentity` when the store has no identity for the
    ///   recipient's default device.
    /// - A `SessionNotFound` protocol error when a broken session could not
    ///   be deleted.
    /// - Any other store or encryption error is propagated.
    #[tracing::instrument(
        level = "trace",
        skip(self, unidentified_access, content),
        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
    )]
    async fn create_encrypted_messages(
        &mut self,
        recipient: &ServiceId,
        unidentified_access: Option<&SenderCertificate>,
        content: &[u8],
    ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
        let mut messages = vec![];

        let mut devices: HashSet<DeviceId> = self
            .protocol_store
            .get_sub_device_sessions(recipient)
            .await?
            .into_iter()
            .collect();

        // The default device is always addressed.
        devices.insert(*DEFAULT_DEVICE_ID);

        // Never address the device that is sending.
        match recipient {
            ServiceId::Aci(aci) => {
                if *aci == self.local_aci {
                    devices.remove(&self.device_id);
                }
            },
            ServiceId::Pni(pni) => {
                if *pni == self.local_pni {
                    devices.remove(&self.device_id);
                }
            },
        };

        for device_id in devices {
            trace!("sending message to device {}", device_id);
            // At most two attempts per device: the second one runs only
            // after a `SessionNotFound` session was deleted successfully.
            // If the second attempt fails the same way, the device is
            // skipped without an error.
            for _attempt in 0..2 {
                match self
                    .create_encrypted_message(
                        recipient,
                        unidentified_access,
                        device_id,
                        content,
                    )
                    .await
                {
                    Ok(message) => {
                        messages.push(message);
                        break;
                    },
                    Err(MessageSenderError::ServiceError(
                        ServiceError::SignalProtocolError(
                            SignalProtocolError::SessionNotFound(addr),
                        ),
                    )) => {
                        tracing::warn!("Potential session corruption for {}, deleting session", addr);
                        match self.protocol_store.delete_session(&addr).await {
                            Ok(()) => continue,
                            Err(error) => {
                                tracing::warn!(%error, %addr, "failed to delete session");
                                return Err(
                                    SignalProtocolError::SessionNotFound(addr)
                                        .into(),
                                );
                            },
                        }
                    },
                    Err(e) => return Err(e),
                }
            }
        }

        if messages.is_empty() {
            Ok(None)
        } else {
            Ok(Some(EncryptedMessages {
                messages,
                used_identity_key: self
                    .protocol_store
                    .get_identity(
                        &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
                    )
                    .await?
                    .ok_or(MessageSenderError::UntrustedIdentity {
                        address: *recipient,
                    })?,
            }))
        }
    }
959
    /// Encrypts `content` for one device of `recipient`, establishing a
    /// session first when the store has none for that device.
    ///
    /// When no session exists, pre-key bundles are fetched over the
    /// identified websocket and each bundle is processed into the store.
    /// Bundles for this very device are skipped when the recipient is the
    /// local ACI.
    ///
    /// # Errors
    ///
    /// - `NotFound` when fetching pre-keys reports the recipient as not
    ///   found.
    /// - Any other service, protocol or store error is propagated.
    #[tracing::instrument(
        level = "trace",
        skip(self, unidentified_access, content),
        fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
    )]
    pub(crate) async fn create_encrypted_message(
        &mut self,
        recipient: &ServiceId,
        unidentified_access: Option<&SenderCertificate>,
        device_id: DeviceId,
        content: &[u8],
    ) -> Result<OutgoingPushMessage, MessageSenderError> {
        let recipient_protocol_address =
            recipient.to_protocol_address(device_id);

        tracing::trace!(
            "encrypting message for {}",
            recipient_protocol_address
        );

        // No stored session for this address: build one from pre-keys.
        if self
            .protocol_store
            .load_session(&recipient_protocol_address)
            .await?
            .is_none()
        {
            info!(
                "establishing new session with {}",
                recipient_protocol_address
            );
            let pre_keys = match self
                .identified_ws
                .get_pre_keys(recipient, device_id)
                .await
            {
                Ok(ok) => {
                    tracing::trace!("Get prekeys OK");
                    ok
                },
                Err(ServiceError::NotFoundError) => {
                    return Err(MessageSenderError::NotFound {
                        service_id: *recipient,
                    });
                },
                Err(e) => Err(e)?,
            };

            let mut rng = rng();

            for pre_key_bundle in pre_keys {
                if recipient == &self.local_aci
                    && self.device_id == pre_key_bundle.device_id()?
                {
                    trace!("not establishing a session with myself!");
                    continue;
                }

                // The address is derived from the bundle's own device ID,
                // which need not equal the `device_id` argument.
                let pre_key_address = get_preferred_protocol_address(
                    &self.protocol_store,
                    recipient,
                    pre_key_bundle.device_id()?,
                )
                .await?;

                // The store is passed twice (once as a clone) because the
                // call takes two separate mutable store arguments.
                process_prekey_bundle(
                    &pre_key_address,
                    &mut self.protocol_store.clone(),
                    &mut self.protocol_store,
                    &pre_key_bundle,
                    SystemTime::now(),
                    &mut rng,
                )
                .await?;
            }
        }

        let message = self
            .cipher
            .encrypt(
                &recipient_protocol_address,
                unidentified_access,
                content,
                &mut rng(),
            )
            .instrument(tracing::trace_span!("encrypting message"))
            .await?;

        Ok(message)
    }
1054
1055 fn create_multi_device_sent_transcript_content<'a>(
1056 &mut self,
1057 recipient: Option<&ServiceId>,
1058 content_body: ContentBody,
1059 timestamp: u64,
1060 send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1061 ) -> Option<ContentBody> {
1062 use sync_message::sent::UnidentifiedDeliveryStatus;
1063 let (message, edit_message) = match content_body {
1064 ContentBody::DataMessage(m) => (Some(m), None),
1065 ContentBody::EditMessage(m) => (None, Some(m)),
1066 content_body => {
1067 tracing::trace!(?content_body, "not syncing to self");
1068 return None;
1069 },
1070 };
1071 let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1072 send_message_results
1073 .into_iter()
1074 .filter_map(|result| result.as_ref().ok())
1075 .map(|sent| {
1076 let SentMessage {
1077 recipient,
1078 unidentified,
1079 used_identity_key,
1080 ..
1081 } = sent;
1082 UnidentifiedDeliveryStatus {
1083 destination_service_id: Some(
1084 recipient.service_id_string(),
1085 ),
1086 destination_service_id_binary: Some(
1087 recipient.service_id_binary(),
1088 ),
1089 unidentified: Some(*unidentified),
1090 destination_pni_identity_key: Some(
1091 used_identity_key.serialize().into(),
1092 ),
1093 }
1094 })
1095 .collect();
1096 Some(ContentBody::SynchronizeMessage(SyncMessage {
1097 sent: Some(sync_message::Sent {
1098 destination_service_id: recipient
1099 .map(ServiceId::service_id_string),
1100 destination_service_id_binary: recipient
1101 .map(ServiceId::service_id_binary),
1102 destination_e164: None,
1103 expiration_start_timestamp: message
1104 .as_ref()
1105 .and_then(|m| m.expire_timer)
1106 .map(|_| timestamp),
1107 message,
1108 edit_message,
1109 timestamp: Some(timestamp),
1110 unidentified_status,
1111 ..Default::default()
1112 }),
1113 ..SyncMessage::with_padding(&mut rng())
1114 }))
1115 }
1116}