1use std::{collections::HashSet, time::SystemTime};
2
3use chrono::prelude::*;
4use libsignal_core::{curve::CurveError, InvalidDeviceId};
5use libsignal_protocol::{
6 process_prekey_bundle, Aci, DeviceId, IdentityKey, IdentityKeyPair, Pni,
7 ProtocolStore, SenderCertificate, SenderKeyStore, ServiceId,
8 SignalProtocolError,
9};
10use rand::{rng, CryptoRng, Rng};
11use tracing::{debug, error, info, trace, warn};
12use tracing_futures::Instrument;
13use uuid::Uuid;
14use zkgroup::GROUP_IDENTIFIER_LEN;
15
16use crate::{
17 cipher::{get_preferred_protocol_address, ServiceCipher},
18 content::ContentBody,
19 proto::{
20 attachment_pointer::{
21 AttachmentIdentifier, Flags as AttachmentPointerFlags,
22 },
23 sync_message::{
24 self, message_request_response, MessageRequestResponse,
25 },
26 AttachmentPointer, SyncMessage,
27 },
28 push_service::*,
29 service_address::ServiceIdExt,
30 session_store::SessionStoreExt,
31 unidentified_access::UnidentifiedAccess,
32 utils::{serde_device_id, serde_service_id},
33 websocket::{self, SignalWebSocket},
34};
35
36pub use crate::proto::ContactDetails;
37
38#[derive(serde::Serialize, Debug)]
39#[serde(rename_all = "camelCase")]
40pub struct OutgoingPushMessage {
41 pub r#type: u32,
42 #[serde(with = "serde_device_id")]
43 pub destination_device_id: DeviceId,
44 pub destination_registration_id: u32,
45 pub content: String,
46}
47
48#[derive(serde::Serialize, Debug)]
49pub struct OutgoingPushMessages {
50 #[serde(with = "serde_service_id")]
51 pub destination: ServiceId,
52 pub timestamp: u64,
53 pub messages: Vec<OutgoingPushMessage>,
54 pub online: bool,
55}
56
57#[derive(serde::Deserialize, Debug)]
58#[serde(rename_all = "camelCase")]
59pub struct SendMessageResponse {
60 pub needs_sync: bool,
61}
62
63pub type SendMessageResult = Result<SentMessage, MessageSenderError>;
64
65#[derive(Debug, Clone)]
66pub struct SentMessage {
67 pub recipient: ServiceId,
68 pub used_identity_key: IdentityKey,
69 pub unidentified: bool,
70 pub needs_sync: bool,
71}
72
73#[derive(Debug, Default)]
77pub struct AttachmentSpec {
78 pub content_type: String,
79 pub length: usize,
80 pub file_name: Option<String>,
81 pub preview: Option<Vec<u8>>,
82 pub voice_note: Option<bool>,
83 pub borderless: Option<bool>,
84 pub width: Option<u32>,
85 pub height: Option<u32>,
86 pub caption: Option<String>,
87 pub blur_hash: Option<String>,
88}
89
90#[derive(Clone)]
91pub struct MessageSender<S> {
92 identified_ws: SignalWebSocket<websocket::Identified>,
93 unidentified_ws: SignalWebSocket<websocket::Unidentified>,
94 service: PushService,
95 cipher: ServiceCipher<S>,
96 protocol_store: S,
97 local_aci: Aci,
98 local_pni: Pni,
99 aci_identity: IdentityKeyPair,
100 pni_identity: Option<IdentityKeyPair>,
101 device_id: DeviceId,
102}
103
104#[derive(thiserror::Error, Debug)]
105pub enum AttachmentUploadError {
106 #[error("{0}")]
107 ServiceError(#[from] ServiceError),
108
109 #[error("Could not read attachment contents")]
110 IoError(#[from] std::io::Error),
111}
112
113#[derive(thiserror::Error, Debug)]
114pub enum MessageSenderError {
115 #[error("service error: {0}")]
116 ServiceError(#[from] ServiceError),
117
118 #[error("protocol error: {0}")]
119 ProtocolError(#[from] SignalProtocolError),
120
121 #[error("invalid private key: {0}")]
122 InvalidPrivateKey(#[from] CurveError),
123
124 #[error("invalid device ID: {0}")]
125 InvalidDeviceId(#[from] InvalidDeviceId),
126
127 #[error("Failed to upload attachment {0}")]
128 AttachmentUploadError(#[from] AttachmentUploadError),
129
130 #[error("primary device can't send sync message {0:?}")]
131 SendSyncMessageError(sync_message::request::Type),
132
133 #[error("Untrusted identity key with {address:?}")]
134 UntrustedIdentity { address: ServiceId },
135
136 #[error("Exceeded maximum number of retries")]
137 MaximumRetriesLimitExceeded,
138
139 #[error("Proof of type {options:?} required using token {token}")]
140 ProofRequired { token: String, options: Vec<String> },
141
142 #[error("Recipient not found: {service_id:?}")]
143 NotFound { service_id: ServiceId },
144
145 #[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
146 NoMessagesToSend,
147}
148
149pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
150
151#[derive(Debug)]
152pub enum ThreadIdentifier {
153 Aci(Uuid),
154 Group(GroupV2Id),
155}
156
157#[derive(Debug)]
158pub struct EncryptedMessages {
159 messages: Vec<OutgoingPushMessage>,
160 used_identity_key: IdentityKey,
161}
162
163impl<S> MessageSender<S>
164where
165 S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
166{
167 #[allow(clippy::too_many_arguments)]
168 pub fn new(
169 identified_ws: SignalWebSocket<websocket::Identified>,
170 unidentified_ws: SignalWebSocket<websocket::Unidentified>,
171 service: PushService,
172 cipher: ServiceCipher<S>,
173 protocol_store: S,
174 local_aci: impl Into<Aci>,
175 local_pni: impl Into<Pni>,
176 aci_identity: IdentityKeyPair,
177 pni_identity: Option<IdentityKeyPair>,
178 device_id: DeviceId,
179 ) -> Self {
180 MessageSender {
181 service,
182 identified_ws,
183 unidentified_ws,
184 cipher,
185 protocol_store,
186 local_aci: local_aci.into(),
187 local_pni: local_pni.into(),
188 aci_identity,
189 pni_identity,
190 device_id,
191 }
192 }
193
194 #[tracing::instrument(skip(self, contents, csprng), fields(size = contents.len()))]
198 pub async fn upload_attachment<R: Rng + CryptoRng>(
199 &mut self,
200 spec: AttachmentSpec,
201 mut contents: Vec<u8>,
202 csprng: &mut R,
203 ) -> Result<AttachmentPointer, AttachmentUploadError> {
204 let len = contents.len();
205 let (key, iv) = {
207 let mut key = [0u8; 64];
208 let mut iv = [0u8; 16];
209 csprng.fill_bytes(&mut key);
210 csprng.fill_bytes(&mut iv);
211 (key, iv)
212 };
213
214 let padded_len: usize = {
218 std::cmp::max(
221 541,
222 1.05f64.powf((len as f64).log(1.05).ceil()).floor() as usize,
223 )
224 };
225 if padded_len < len {
226 error!(
227 "Padded len {} < len {}. Continuing with a privacy risk.",
228 padded_len, len
229 );
230 } else {
231 contents.resize(padded_len, 0);
232 }
233
234 tracing::trace_span!("encrypting attachment").in_scope(|| {
235 crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents)
236 });
237
238 let attachment_upload_form = self
242 .service
243 .get_attachment_v4_upload_attributes()
244 .instrument(tracing::trace_span!("requesting upload attributes"))
245 .await?;
246
247 let resumable_upload_url = self
248 .service
249 .get_attachment_resumable_upload_url(&attachment_upload_form)
250 .await?;
251
252 let attachment_digest = self
253 .service
254 .upload_attachment_v4(
255 attachment_upload_form.cdn,
256 &resumable_upload_url,
257 contents.len() as u64,
258 attachment_upload_form.headers,
259 &mut std::io::Cursor::new(&contents),
260 )
261 .await?;
262
263 Ok(AttachmentPointer {
264 content_type: Some(spec.content_type),
265 key: Some(key.to_vec()),
266 size: Some(len as u32),
267 digest: Some(attachment_digest.digest),
269 file_name: spec.file_name,
270 flags: Some(
271 if spec.voice_note == Some(true) {
272 AttachmentPointerFlags::VoiceMessage as u32
273 } else {
274 0
275 } | if spec.borderless == Some(true) {
276 AttachmentPointerFlags::Borderless as u32
277 } else {
278 0
279 },
280 ),
281 width: spec.width,
282 height: spec.height,
283 caption: spec.caption,
284 blur_hash: spec.blur_hash,
285 upload_timestamp: Some(
286 SystemTime::now()
287 .duration_since(SystemTime::UNIX_EPOCH)
288 .expect("unix epoch in the past")
289 .as_millis() as u64,
290 ),
291 cdn_number: Some(attachment_upload_form.cdn),
292 attachment_identifier: Some(AttachmentIdentifier::CdnKey(
293 attachment_upload_form.key,
294 )),
295 ..Default::default()
296 })
297 }
298
299 #[tracing::instrument(skip(self, contacts))]
303 async fn upload_contact_details<Contacts>(
304 &mut self,
305 contacts: Contacts,
306 ) -> Result<AttachmentPointer, AttachmentUploadError>
307 where
308 Contacts: IntoIterator<Item = ContactDetails>,
309 {
310 use prost::Message;
311 let mut out = Vec::new();
312 for contact in contacts {
313 contact
314 .encode_length_delimited(&mut out)
315 .expect("infallible encoding");
316 }
318
319 let spec = AttachmentSpec {
320 content_type: "application/octet-stream".into(),
321 length: out.len(),
322 file_name: None,
323 preview: None,
324 voice_note: None,
325 borderless: None,
326 width: None,
327 height: None,
328 caption: None,
329 blur_hash: None,
330 };
331 self.upload_attachment(spec, out, &mut rng()).await
332 }
333
334 async fn is_multi_device(&self) -> bool {
339 if self.device_id == *DEFAULT_DEVICE_ID {
340 self.protocol_store
341 .get_sub_device_sessions(&self.local_aci.into())
342 .await
343 .is_ok_and(|s| !s.is_empty())
344 } else {
345 true
346 }
347 }
348
349 #[tracing::instrument(
351 skip(self, unidentified_access, message),
352 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
353 )]
354 pub async fn send_message(
355 &mut self,
356 recipient: &ServiceId,
357 mut unidentified_access: Option<UnidentifiedAccess>,
358 message: impl Into<ContentBody>,
359 timestamp: u64,
360 include_pni_signature: bool,
361 online: bool,
362 ) -> SendMessageResult {
363 let content_body = message.into();
364 let message_to_self = recipient == &self.local_aci;
365 let sync_message =
366 matches!(content_body, ContentBody::SynchronizeMessage(..));
367 let is_multi_device = self.is_multi_device().await;
368
369 use crate::proto::data_message::Flags;
370
371 let end_session = match &content_body {
372 ContentBody::DataMessage(message) => {
373 message.flags == Some(Flags::EndSession as u32)
374 },
375 _ => false,
376 };
377
378 if message_to_self && is_multi_device && !sync_message {
380 debug!("sending note to self");
381 if let Some(sync_message) = self
382 .create_multi_device_sent_transcript_content(
383 Some(recipient),
384 content_body,
385 timestamp,
386 None,
387 )
388 {
389 return self
390 .try_send_message(
391 *recipient,
392 None,
393 &sync_message,
394 timestamp,
395 include_pni_signature,
396 online,
397 )
398 .await;
399 } else {
400 error!("could not create sync message from message to self");
401 return SendMessageResult::Err(
402 MessageSenderError::NoMessagesToSend,
403 );
404 }
405 }
406
407 if end_session || sync_message {
410 unidentified_access.take();
411 }
412
413 let result = self
415 .try_send_message(
416 *recipient,
417 unidentified_access.as_ref(),
418 &content_body,
419 timestamp,
420 include_pni_signature,
421 online,
422 )
423 .await;
424
425 let needs_sync = match &result {
426 Ok(SentMessage { needs_sync, .. }) => *needs_sync,
427 _ => false,
428 };
429
430 if needs_sync || is_multi_device {
431 debug!("sending multi-device sync message");
432 if let Some(sync) = if sync_message {
433 Some(content_body)
434 } else {
435 self.create_multi_device_sent_transcript_content(
436 Some(recipient),
437 content_body,
438 timestamp,
439 Some(&result),
440 )
441 } {
442 self.try_send_message(
443 self.local_aci.into(),
444 None,
445 &sync,
446 timestamp,
447 false,
448 false,
449 )
450 .await?;
451 } else {
452 error!("could not create sync message from a direct message");
453 }
454 }
455
456 if end_session {
457 let n = self.protocol_store.delete_all_sessions(recipient).await?;
458 tracing::debug!(
459 "ended {} sessions with {}",
460 n,
461 recipient.raw_uuid()
462 );
463 }
464
465 result
466 }
467
468 #[tracing::instrument(
475 skip(self, recipients, message),
476 fields(recipients = recipients.as_ref().len()),
477 )]
478 pub async fn send_message_to_group(
479 &mut self,
480 recipients: impl AsRef<[(ServiceId, Option<UnidentifiedAccess>, bool)]>,
481 message: impl Into<ContentBody>,
482 timestamp: u64,
483 online: bool,
484 ) -> Vec<SendMessageResult> {
485 let content_body: ContentBody = message.into();
486 let mut results = vec![];
487
488 let mut needs_sync_in_results = false;
489
490 for (recipient, unidentified_access, include_pni_signature) in
491 recipients.as_ref()
492 {
493 let result = self
494 .try_send_message(
495 *recipient,
496 unidentified_access.as_ref(),
497 &content_body,
498 timestamp,
499 *include_pni_signature,
500 online,
501 )
502 .await;
503
504 match result {
505 Ok(SentMessage { needs_sync, .. }) if needs_sync => {
506 needs_sync_in_results = true;
507 },
508 _ => (),
509 };
510
511 results.push(result);
512 }
513
514 if needs_sync_in_results || self.is_multi_device().await {
516 if let Some(sync_message) = self
517 .create_multi_device_sent_transcript_content(
518 None,
519 content_body.clone(),
520 timestamp,
521 &results,
522 )
523 {
524 if let Err(error) = self
527 .try_send_message(
528 self.local_aci.into(),
529 None,
530 &sync_message,
531 timestamp,
532 false, false,
534 )
535 .await
536 {
537 error!(%error, "failed to send a synchronization message");
538 }
539 } else {
540 error!("could not create sync message from a group message")
541 }
542 }
543
544 results
545 }
546
547 #[tracing::instrument(
549 level = "trace",
550 skip(self, unidentified_access, content_body, recipient),
551 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
552 )]
553 async fn try_send_message(
554 &mut self,
555 recipient: ServiceId,
556 mut unidentified_access: Option<&UnidentifiedAccess>,
557 content_body: &ContentBody,
558 timestamp: u64,
559 include_pni_signature: bool,
560 online: bool,
561 ) -> SendMessageResult {
562 trace!("trying to send a message");
563
564 use prost::Message;
565
566 let mut content = content_body.clone().into_proto();
567 if include_pni_signature {
568 content.pni_signature_message = Some(self.create_pni_signature()?);
569 }
570
571 let content_bytes = content.encode_to_vec();
572
573 let mut rng = rng();
574
575 for _ in 0..4u8 {
576 let Some(EncryptedMessages {
577 messages,
578 used_identity_key,
579 }) = self
580 .create_encrypted_messages(
581 &recipient,
582 unidentified_access.map(|x| &x.certificate),
583 &content_bytes,
584 )
585 .await?
586 else {
587 return Err(MessageSenderError::NoMessagesToSend);
591 };
592
593 let messages = OutgoingPushMessages {
594 destination: recipient,
595 timestamp,
596 messages,
597 online,
598 };
599
600 let send = if let Some(unidentified) = &unidentified_access {
601 tracing::debug!("sending via unidentified");
602 self.unidentified_ws
603 .send_messages_unidentified(messages, unidentified)
604 .await
605 } else {
606 tracing::debug!("sending identified");
607 self.identified_ws.send_messages(messages).await
608 };
609
610 match send {
611 Ok(SendMessageResponse { needs_sync }) => {
612 tracing::debug!("message sent!");
613 return Ok(SentMessage {
614 recipient,
615 used_identity_key,
616 unidentified: unidentified_access.is_some(),
617 needs_sync,
618 });
619 },
620 Err(ServiceError::Unauthorized)
621 if unidentified_access.is_some() =>
622 {
623 tracing::trace!("unauthorized error using unidentified; retry over identified");
624 unidentified_access = None;
625 },
626 Err(ServiceError::MismatchedDevicesException(ref m)) => {
627 tracing::debug!("{:?}", m);
628 for extra_device_id in &m.extra_devices {
629 tracing::debug!(
630 "dropping session with device {}",
631 extra_device_id
632 );
633 self.protocol_store
634 .delete_service_addr_device_session(
635 &recipient
636 .to_protocol_address(*extra_device_id)?,
637 )
638 .await?;
639 }
640
641 for missing_device_id in &m.missing_devices {
642 tracing::debug!(
643 "creating session with missing device {}",
644 missing_device_id
645 );
646 let remote_address = recipient
647 .to_protocol_address(*missing_device_id)?;
648 let pre_key = self
649 .identified_ws
650 .get_pre_key(&recipient, *missing_device_id)
651 .await?;
652
653 process_prekey_bundle(
654 &remote_address,
655 &mut self.protocol_store.clone(),
656 &mut self.protocol_store,
657 &pre_key,
658 SystemTime::now(),
659 &mut rng,
660 )
661 .await
662 .map_err(|e| {
663 error!("failed to create session: {}", e);
664 MessageSenderError::UntrustedIdentity {
665 address: recipient,
666 }
667 })?;
668 }
669 },
670 Err(ServiceError::StaleDevices(ref m)) => {
671 tracing::debug!("{:?}", m);
672 for extra_device_id in &m.stale_devices {
673 tracing::debug!(
674 "dropping session with device {}",
675 extra_device_id
676 );
677 self.protocol_store
678 .delete_service_addr_device_session(
679 &recipient
680 .to_protocol_address(*extra_device_id)?,
681 )
682 .await?;
683 }
684 },
685 Err(ServiceError::ProofRequiredError(ref p)) => {
686 tracing::debug!("{:?}", p);
687 return Err(MessageSenderError::ProofRequired {
688 token: p.token.clone(),
689 options: p.options.clone(),
690 });
691 },
692 Err(ServiceError::NotFoundError) => {
693 tracing::debug!("Not found when sending a message");
694 return Err(MessageSenderError::NotFound {
695 service_id: recipient,
696 });
697 },
698 Err(e) => {
699 tracing::debug!(
700 "Default error handler for ws.send_messages: {}",
701 e
702 );
703 return Err(MessageSenderError::ServiceError(e));
704 },
705 }
706 }
707
708 Err(MessageSenderError::MaximumRetriesLimitExceeded)
709 }
710
711 #[tracing::instrument(
713 skip(self, unidentified_access, contacts, recipient),
714 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
715 )]
716 pub async fn send_contact_details<Contacts>(
717 &mut self,
718 recipient: &ServiceId,
719 unidentified_access: Option<UnidentifiedAccess>,
720 contacts: Contacts,
724 online: bool,
725 complete: bool,
726 ) -> Result<(), MessageSenderError>
727 where
728 Contacts: IntoIterator<Item = ContactDetails>,
729 {
730 let ptr = self.upload_contact_details(contacts).await?;
731
732 let msg = SyncMessage {
733 contacts: Some(sync_message::Contacts {
734 blob: Some(ptr),
735 complete: Some(complete),
736 }),
737 ..SyncMessage::with_padding(&mut rng())
738 };
739
740 self.send_sync_message(msg).await?;
741
742 Ok(())
743 }
744
745 #[tracing::instrument(skip(self), fields(recipient = recipient.service_id_string()))]
747 pub async fn send_message_request_response(
748 &mut self,
749 recipient: &ServiceId,
750 thread: &ThreadIdentifier,
751 action: message_request_response::Type,
752 ) -> Result<(), MessageSenderError> {
753 let message_request_response = Some(match thread {
754 ThreadIdentifier::Aci(aci) => {
755 tracing::debug!(
756 "sending message request response {:?} for recipient {:?}",
757 action,
758 aci
759 );
760 MessageRequestResponse {
761 thread_aci: Some(aci.to_string()),
762 group_id: None,
763 r#type: Some(action.into()),
764 }
765 },
766 ThreadIdentifier::Group(id) => {
767 tracing::debug!(
768 "sending message request response {:?} for group {:?}",
769 action,
770 id
771 );
772 MessageRequestResponse {
773 thread_aci: None,
774 group_id: Some(id.to_vec()),
775 r#type: Some(action.into()),
776 }
777 },
778 });
779
780 let msg = SyncMessage {
781 message_request_response,
782 ..SyncMessage::with_padding(&mut rng())
783 };
784
785 let ts = Utc::now().timestamp_millis() as u64;
786 self.send_message(recipient, None, msg, ts, false, false)
787 .await?;
788
789 Ok(())
790 }
791
792 pub async fn send_sync_message(
794 &mut self,
795 sync: SyncMessage,
796 ) -> Result<(), MessageSenderError> {
797 if self.is_multi_device().await {
798 let content = sync.into();
799 let timestamp = Utc::now().timestamp_millis() as u64;
800 debug!(
801 "sending multi-device sync message with content {content:?}"
802 );
803 self.try_send_message(
804 self.local_aci.into(),
805 None,
806 &content,
807 timestamp,
808 false,
809 false,
810 )
811 .await?;
812 }
813 Ok(())
814 }
815
816 #[tracing::instrument(skip(self))]
818 pub async fn send_sync_message_request(
819 &mut self,
820 recipient: &ServiceId,
821 request_type: sync_message::request::Type,
822 ) -> Result<(), MessageSenderError> {
823 if self.device_id == *DEFAULT_DEVICE_ID {
824 return Err(MessageSenderError::SendSyncMessageError(request_type));
825 }
826
827 let msg = SyncMessage {
828 request: Some(sync_message::Request {
829 r#type: Some(request_type.into()),
830 }),
831 ..SyncMessage::with_padding(&mut rng())
832 };
833 self.send_sync_message(msg).await?;
834
835 Ok(())
836 }
837
838 #[tracing::instrument(level = "trace", skip(self))]
839 fn create_pni_signature(
840 &mut self,
841 ) -> Result<crate::proto::PniSignatureMessage, MessageSenderError> {
842 let mut rng = rng();
843 let signature = self
844 .pni_identity
845 .expect("PNI key set when PNI signature requested")
846 .sign_alternate_identity(
847 self.aci_identity.identity_key(),
848 &mut rng,
849 )?;
850 Ok(crate::proto::PniSignatureMessage {
851 pni: Some(self.local_pni.service_id_binary()),
852 signature: Some(signature.into()),
853 })
854 }
855
856 #[tracing::instrument(
858 level = "trace",
859 skip(self, unidentified_access, content),
860 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
861 )]
862 async fn create_encrypted_messages(
863 &mut self,
864 recipient: &ServiceId,
865 unidentified_access: Option<&SenderCertificate>,
866 content: &[u8],
867 ) -> Result<Option<EncryptedMessages>, MessageSenderError> {
868 let mut messages = vec![];
869
870 let mut devices: HashSet<DeviceId> = self
871 .protocol_store
872 .get_sub_device_sessions(recipient)
873 .await?
874 .into_iter()
875 .collect();
876
877 devices.insert(*DEFAULT_DEVICE_ID);
879
880 match recipient {
882 ServiceId::Aci(aci) => {
883 if *aci == self.local_aci {
884 devices.remove(&self.device_id);
885 }
886 },
887 ServiceId::Pni(pni) => {
888 if *pni == self.local_pni {
889 devices.remove(&self.device_id);
890 }
891 },
892 };
893
894 for device_id in devices {
895 trace!("sending message to device {}", device_id);
896 for _attempt in 0..2 {
900 match self
901 .create_encrypted_message(
902 recipient,
903 unidentified_access,
904 device_id,
905 content,
906 )
907 .await
908 {
909 Ok(message) => {
910 messages.push(message);
911 break;
912 },
913 Err(MessageSenderError::ServiceError(
914 ServiceError::SignalProtocolError(
915 SignalProtocolError::SessionNotFound(addr),
916 ),
917 )) => {
918 tracing::warn!("Potential session corruption for {}, deleting session", addr);
923 match self.protocol_store.delete_session(&addr).await {
924 Ok(()) => continue,
925 Err(error) => {
926 tracing::warn!(%error, %addr, "failed to delete session");
927 return Err(
928 SignalProtocolError::SessionNotFound(addr)
929 .into(),
930 );
931 },
932 }
933 },
934 Err(e) => return Err(e),
935 }
936 }
937 }
938
939 if messages.is_empty() {
940 Ok(None)
941 } else {
942 Ok(Some(EncryptedMessages {
943 messages,
944 used_identity_key: self
945 .protocol_store
946 .get_identity(
947 &recipient.to_protocol_address(*DEFAULT_DEVICE_ID),
948 )
949 .await?
950 .ok_or(MessageSenderError::UntrustedIdentity {
951 address: *recipient,
952 })?,
953 }))
954 }
955 }
956
957 #[tracing::instrument(
961 level = "trace",
962 skip(self, unidentified_access, content),
963 fields(unidentified_access = unidentified_access.is_some(), recipient = recipient.service_id_string()),
964 )]
965 pub(crate) async fn create_encrypted_message(
966 &mut self,
967 recipient: &ServiceId,
968 unidentified_access: Option<&SenderCertificate>,
969 device_id: DeviceId,
970 content: &[u8],
971 ) -> Result<OutgoingPushMessage, MessageSenderError> {
972 let recipient_protocol_address =
973 recipient.to_protocol_address(device_id);
974
975 tracing::trace!(
976 "encrypting message for {}",
977 recipient_protocol_address
978 );
979
980 if self
983 .protocol_store
984 .load_session(&recipient_protocol_address)
985 .await?
986 .is_none()
987 {
988 info!(
989 "establishing new session with {}",
990 recipient_protocol_address
991 );
992 let pre_keys = match self
993 .identified_ws
994 .get_pre_keys(recipient, device_id)
995 .await
996 {
997 Ok(ok) => {
998 tracing::trace!("Get prekeys OK");
999 ok
1000 },
1001 Err(ServiceError::NotFoundError) => {
1002 return Err(MessageSenderError::NotFound {
1003 service_id: *recipient,
1004 });
1005 },
1006 Err(e) => Err(e)?,
1007 };
1008
1009 let mut rng = rng();
1010
1011 for pre_key_bundle in pre_keys {
1012 if recipient == &self.local_aci
1013 && self.device_id == pre_key_bundle.device_id()?
1014 {
1015 trace!("not establishing a session with myself!");
1016 continue;
1017 }
1018
1019 let pre_key_address = get_preferred_protocol_address(
1020 &self.protocol_store,
1021 recipient,
1022 pre_key_bundle.device_id()?,
1023 )
1024 .await?;
1025
1026 process_prekey_bundle(
1027 &pre_key_address,
1028 &mut self.protocol_store.clone(),
1029 &mut self.protocol_store,
1030 &pre_key_bundle,
1031 SystemTime::now(),
1032 &mut rng,
1033 )
1034 .await?;
1035 }
1036 }
1037
1038 let message = self
1039 .cipher
1040 .encrypt(
1041 &recipient_protocol_address,
1042 unidentified_access,
1043 content,
1044 &mut rng(),
1045 )
1046 .instrument(tracing::trace_span!("encrypting message"))
1047 .await?;
1048
1049 Ok(message)
1050 }
1051
1052 fn create_multi_device_sent_transcript_content<'a>(
1053 &mut self,
1054 recipient: Option<&ServiceId>,
1055 content_body: ContentBody,
1056 timestamp: u64,
1057 send_message_results: impl IntoIterator<Item = &'a SendMessageResult>,
1058 ) -> Option<ContentBody> {
1059 use sync_message::sent::UnidentifiedDeliveryStatus;
1060 let (message, edit_message) = match content_body {
1061 ContentBody::DataMessage(m) => (Some(m), None),
1062 ContentBody::EditMessage(m) => (None, Some(m)),
1063 _ => return None,
1064 };
1065 let unidentified_status: Vec<UnidentifiedDeliveryStatus> =
1066 send_message_results
1067 .into_iter()
1068 .filter_map(|result| result.as_ref().ok())
1069 .map(|sent| {
1070 let SentMessage {
1071 recipient,
1072 unidentified,
1073 used_identity_key,
1074 ..
1075 } = sent;
1076 UnidentifiedDeliveryStatus {
1077 destination_service_id: Some(
1078 recipient.service_id_string(),
1079 ),
1080 unidentified: Some(*unidentified),
1081 destination_pni_identity_key: Some(
1082 used_identity_key.serialize().into(),
1083 ),
1084 }
1085 })
1086 .collect();
1087 Some(ContentBody::SynchronizeMessage(SyncMessage {
1088 sent: Some(sync_message::Sent {
1089 destination_service_id: recipient
1090 .map(ServiceId::service_id_string),
1091 destination_e164: None,
1092 expiration_start_timestamp: message
1093 .as_ref()
1094 .and_then(|m| m.expire_timer)
1095 .map(|_| timestamp),
1096 message,
1097 edit_message,
1098 timestamp: Some(timestamp),
1099 unidentified_status,
1100 ..Default::default()
1101 }),
1102 ..SyncMessage::with_padding(&mut rng())
1103 }))
1104 }
1105}