libsignal_protocol/
session_cipher.rs

1//
2// Copyright 2020-2022 Signal Messenger, LLC.
3// SPDX-License-Identifier: AGPL-3.0-only
4//
5
6use std::time::SystemTime;
7
8use rand::{CryptoRng, Rng};
9
10use crate::consts::{MAX_FORWARD_JUMPS, MAX_UNACKNOWLEDGED_SESSION_AGE};
11use crate::ratchet::{ChainKey, MessageKeyGenerator};
12use crate::state::{InvalidSessionError, SessionState};
13use crate::{
14    CiphertextMessage, CiphertextMessageType, Direction, IdentityKeyStore, KeyPair, KyberPayload,
15    KyberPreKeyStore, PreKeySignalMessage, PreKeyStore, ProtocolAddress, PublicKey, Result,
16    SessionRecord, SessionStore, SignalMessage, SignalProtocolError, SignedPreKeyStore, session,
17};
18
19pub async fn message_encrypt<R: Rng + CryptoRng>(
20    ptext: &[u8],
21    remote_address: &ProtocolAddress,
22    local_address: &ProtocolAddress,
23    session_store: &mut dyn SessionStore,
24    identity_store: &mut dyn IdentityKeyStore,
25    now: SystemTime,
26    csprng: &mut R,
27) -> Result<CiphertextMessage> {
28    let mut session_record = session_store
29        .load_session(remote_address)
30        .await?
31        .ok_or_else(|| SignalProtocolError::SessionNotFound(remote_address.clone()))?;
32    let session_state = session_record
33        .session_state_mut()
34        .ok_or_else(|| SignalProtocolError::SessionNotFound(remote_address.clone()))?;
35
36    let chain_key = session_state.get_sender_chain_key()?;
37
38    let (pqr_msg, pqr_key) = session_state.pq_ratchet_send(csprng).map_err(|e| {
39        // Since we're sending, this must be an error with the state.
40        SignalProtocolError::InvalidState(
41            "message_encrypt",
42            format!("post-quantum ratchet send error: {e}"),
43        )
44    })?;
45    let message_keys = chain_key.message_keys().generate_keys(pqr_key);
46
47    let sender_ephemeral = session_state.sender_ratchet_key()?;
48    let previous_counter = session_state.previous_counter();
49    let session_version = session_state
50        .session_version()?
51        .try_into()
52        .map_err(|_| SignalProtocolError::InvalidSessionStructure("version does not fit in u8"))?;
53
54    let local_identity_key = session_state.local_identity_key()?;
55    let their_identity_key = session_state.remote_identity_key()?.ok_or_else(|| {
56        SignalProtocolError::InvalidState(
57            "message_encrypt",
58            format!("no remote identity key for {remote_address}"),
59        )
60    })?;
61
62    let ctext =
63        signal_crypto::aes_256_cbc_encrypt(ptext, message_keys.cipher_key(), message_keys.iv())
64            .map_err(|_| {
65                log::error!("session state corrupt for {remote_address}");
66                SignalProtocolError::InvalidSessionStructure("invalid sender chain message keys")
67            })?;
68
69    let message = if let Some(items) = session_state.unacknowledged_pre_key_message_items()? {
70        let timestamp_as_unix_time = items
71            .timestamp()
72            .duration_since(SystemTime::UNIX_EPOCH)
73            .unwrap_or_default()
74            .as_secs();
75        if items.timestamp() + MAX_UNACKNOWLEDGED_SESSION_AGE < now {
76            log::warn!(
77                "stale unacknowledged session for {remote_address} (created at {timestamp_as_unix_time})"
78            );
79            return Err(SignalProtocolError::SessionNotFound(remote_address.clone()));
80        }
81
82        let local_registration_id = session_state.local_registration_id();
83
84        log::info!(
85            "Building PreKeyWhisperMessage for: {} with preKeyId: {} (session created at {})",
86            remote_address,
87            items
88                .pre_key_id()
89                .map_or_else(|| "<none>".to_string(), |id| id.to_string()),
90            timestamp_as_unix_time,
91        );
92
93        let message = SignalMessage::new(
94            session_version,
95            message_keys.mac_key(),
96            Some((local_address, remote_address)),
97            sender_ephemeral,
98            chain_key.index(),
99            previous_counter,
100            &ctext,
101            &local_identity_key,
102            &their_identity_key,
103            &pqr_msg,
104        )?;
105
106        let kyber_payload = items
107            .kyber_pre_key_id()
108            .zip(items.kyber_ciphertext())
109            .map(|(id, ciphertext)| KyberPayload::new(id, ciphertext.into()));
110
111        CiphertextMessage::PreKeySignalMessage(PreKeySignalMessage::new(
112            session_version,
113            local_registration_id,
114            items.pre_key_id(),
115            items.signed_pre_key_id(),
116            kyber_payload,
117            *items.base_key(),
118            local_identity_key,
119            message,
120        )?)
121    } else {
122        CiphertextMessage::SignalMessage(SignalMessage::new(
123            session_version,
124            message_keys.mac_key(),
125            None,
126            sender_ephemeral,
127            chain_key.index(),
128            previous_counter,
129            &ctext,
130            &local_identity_key,
131            &their_identity_key,
132            &pqr_msg,
133        )?)
134    };
135
136    session_state.set_sender_chain_key(&chain_key.next_chain_key());
137
138    // XXX why is this check after everything else?!!
139    if !identity_store
140        .is_trusted_identity(remote_address, &their_identity_key, Direction::Sending)
141        .await?
142    {
143        log::warn!(
144            "Identity key {} is not trusted for remote address {}",
145            hex::encode(their_identity_key.public_key().public_key_bytes()),
146            remote_address,
147        );
148        return Err(SignalProtocolError::UntrustedIdentity(
149            remote_address.clone(),
150        ));
151    }
152
153    // XXX this could be combined with the above call to the identity store (in a new API)
154    identity_store
155        .save_identity(remote_address, &their_identity_key)
156        .await?;
157
158    session_store
159        .store_session(remote_address, &session_record)
160        .await?;
161    Ok(message)
162}
163
164#[allow(clippy::too_many_arguments)]
165pub async fn message_decrypt<R: Rng + CryptoRng>(
166    ciphertext: &CiphertextMessage,
167    remote_address: &ProtocolAddress,
168    local_address: &ProtocolAddress,
169    session_store: &mut dyn SessionStore,
170    identity_store: &mut dyn IdentityKeyStore,
171    pre_key_store: &mut dyn PreKeyStore,
172    signed_pre_key_store: &dyn SignedPreKeyStore,
173    kyber_pre_key_store: &mut dyn KyberPreKeyStore,
174    csprng: &mut R,
175) -> Result<Vec<u8>> {
176    match ciphertext {
177        CiphertextMessage::SignalMessage(m) => {
178            let _ = local_address;
179            message_decrypt_signal(m, remote_address, session_store, identity_store, csprng).await
180        }
181        CiphertextMessage::PreKeySignalMessage(m) => {
182            message_decrypt_prekey(
183                m,
184                remote_address,
185                local_address,
186                session_store,
187                identity_store,
188                pre_key_store,
189                signed_pre_key_store,
190                kyber_pre_key_store,
191                csprng,
192            )
193            .await
194        }
195        _ => Err(SignalProtocolError::InvalidArgument(format!(
196            "message_decrypt cannot be used to decrypt {:?} messages",
197            ciphertext.message_type()
198        ))),
199    }
200}
201
202#[allow(clippy::too_many_arguments)]
203pub async fn message_decrypt_prekey<R: Rng + CryptoRng>(
204    ciphertext: &PreKeySignalMessage,
205    remote_address: &ProtocolAddress,
206    local_address: &ProtocolAddress,
207    session_store: &mut dyn SessionStore,
208    identity_store: &mut dyn IdentityKeyStore,
209    pre_key_store: &mut dyn PreKeyStore,
210    signed_pre_key_store: &dyn SignedPreKeyStore,
211    kyber_pre_key_store: &mut dyn KyberPreKeyStore,
212    csprng: &mut R,
213) -> Result<Vec<u8>> {
214    let mut session_record = session_store
215        .load_session(remote_address)
216        .await?
217        .unwrap_or_else(SessionRecord::new_fresh);
218
219    // Make sure we log the session state if we fail to process the pre-key.
220    let process_prekey_result = session::process_prekey(
221        ciphertext,
222        remote_address,
223        &mut session_record,
224        identity_store,
225        pre_key_store,
226        signed_pre_key_store,
227        kyber_pre_key_store,
228    )
229    .await;
230
231    let (pre_key_used, identity_to_save) = match process_prekey_result {
232        Ok(result) => result,
233        Err(e) => {
234            let errs = [e];
235            log::error!(
236                "{}",
237                create_decryption_failure_log(
238                    remote_address,
239                    &errs,
240                    &session_record,
241                    ciphertext.message()
242                )?
243            );
244            let [e] = errs;
245            return Err(e);
246        }
247    };
248
249    let ptext = decrypt_message_with_record(
250        remote_address,
251        Some(local_address),
252        &mut session_record,
253        ciphertext.message(),
254        CiphertextMessageType::PreKey,
255        csprng,
256    )?;
257
258    identity_store
259        .save_identity(
260            identity_to_save.remote_address,
261            identity_to_save.their_identity_key,
262        )
263        .await?;
264
265    if let Some(pre_key_used) = pre_key_used {
266        if let Some(kyber_pre_key_id) = pre_key_used.kyber_pre_key_id {
267            kyber_pre_key_store
268                .mark_kyber_pre_key_used(
269                    kyber_pre_key_id,
270                    pre_key_used.signed_ec_pre_key_id,
271                    ciphertext.base_key(),
272                )
273                .await?;
274        }
275
276        if let Some(pre_key_id) = pre_key_used.one_time_ec_pre_key_id {
277            pre_key_store.remove_pre_key(pre_key_id).await?;
278        }
279    }
280
281    session_store
282        .store_session(remote_address, &session_record)
283        .await?;
284
285    Ok(ptext)
286}
287
288pub async fn message_decrypt_signal<R: Rng + CryptoRng>(
289    ciphertext: &SignalMessage,
290    remote_address: &ProtocolAddress,
291    session_store: &mut dyn SessionStore,
292    identity_store: &mut dyn IdentityKeyStore,
293    csprng: &mut R,
294) -> Result<Vec<u8>> {
295    let mut session_record = session_store
296        .load_session(remote_address)
297        .await?
298        .ok_or_else(|| SignalProtocolError::SessionNotFound(remote_address.clone()))?;
299
300    let ptext = decrypt_message_with_record(
301        remote_address,
302        None,
303        &mut session_record,
304        ciphertext,
305        CiphertextMessageType::Whisper,
306        csprng,
307    )?;
308
309    // Why are we performing this check after decryption instead of before?
310    let their_identity_key = session_record
311        .session_state()
312        .expect("successfully decrypted; must have a current state")
313        .remote_identity_key()
314        .expect("successfully decrypted; must have a remote identity key")
315        .expect("successfully decrypted; must have a remote identity key");
316
317    if !identity_store
318        .is_trusted_identity(remote_address, &their_identity_key, Direction::Receiving)
319        .await?
320    {
321        log::warn!(
322            "Identity key {} is not trusted for remote address {}",
323            hex::encode(their_identity_key.public_key().public_key_bytes()),
324            remote_address,
325        );
326        return Err(SignalProtocolError::UntrustedIdentity(
327            remote_address.clone(),
328        ));
329    }
330
331    identity_store
332        .save_identity(remote_address, &their_identity_key)
333        .await?;
334
335    session_store
336        .store_session(remote_address, &session_record)
337        .await?;
338
339    Ok(ptext)
340}
341
342fn create_decryption_failure_log(
343    remote_address: &ProtocolAddress,
344    mut errs: &[SignalProtocolError],
345    record: &SessionRecord,
346    ciphertext: &SignalMessage,
347) -> Result<String> {
348    fn append_session_summary(
349        lines: &mut Vec<String>,
350        idx: usize,
351        state: std::result::Result<&SessionState, InvalidSessionError>,
352        err: Option<&SignalProtocolError>,
353    ) {
354        let chains = state.map(|state| state.all_receiver_chain_logging_info());
355        match (err, &chains) {
356            (Some(err), Ok(chains)) => {
357                lines.push(format!(
358                    "Candidate session {} failed with '{}', had {} receiver chains",
359                    idx,
360                    err,
361                    chains.len()
362                ));
363            }
364            (Some(err), Err(state_err)) => {
365                lines.push(format!(
366                    "Candidate session {idx} failed with '{err}'; cannot get receiver chain info ({state_err})",
367                ));
368            }
369            (None, Ok(chains)) => {
370                lines.push(format!(
371                    "Candidate session {} had {} receiver chains",
372                    idx,
373                    chains.len()
374                ));
375            }
376            (None, Err(state_err)) => {
377                lines.push(format!(
378                    "Candidate session {idx}: cannot get receiver chain info ({state_err})",
379                ));
380            }
381        }
382
383        if let Ok(chains) = chains {
384            for chain in chains {
385                let chain_idx = match chain.1 {
386                    Some(i) => i.to_string(),
387                    None => "missing in protobuf".to_string(),
388                };
389
390                lines.push(format!(
391                    "Receiver chain with sender ratchet public key {} chain key index {}",
392                    hex::encode(chain.0),
393                    chain_idx
394                ));
395            }
396        }
397    }
398
399    let mut lines = vec![];
400
401    lines.push(format!(
402        "Message from {} failed to decrypt; sender ratchet public key {} message counter {}",
403        remote_address,
404        hex::encode(ciphertext.sender_ratchet_key().public_key_bytes()),
405        ciphertext.counter()
406    ));
407
408    if let Some(current_session) = record.session_state() {
409        let err = errs.first();
410        if err.is_some() {
411            errs = &errs[1..];
412        }
413        append_session_summary(&mut lines, 0, Ok(current_session), err);
414    } else {
415        lines.push("No current session".to_string());
416    }
417
418    for (idx, (state, err)) in record
419        .previous_session_states()
420        .zip(errs.iter().map(Some).chain(std::iter::repeat(None)))
421        .enumerate()
422    {
423        let state = match state {
424            Ok(ref state) => Ok(state),
425            Err(err) => Err(err),
426        };
427        append_session_summary(&mut lines, idx + 1, state, err);
428    }
429
430    Ok(lines.join("\n"))
431}
432
433fn decrypt_message_with_record<R: Rng + CryptoRng>(
434    remote_address: &ProtocolAddress,
435    local_address: Option<&ProtocolAddress>,
436    record: &mut SessionRecord,
437    ciphertext: &SignalMessage,
438    original_message_type: CiphertextMessageType,
439    csprng: &mut R,
440) -> Result<Vec<u8>> {
441    debug_assert!(matches!(
442        original_message_type,
443        CiphertextMessageType::Whisper | CiphertextMessageType::PreKey
444    ));
445    let log_decryption_failure = |state: &SessionState, error: &SignalProtocolError| {
446        // A warning rather than an error because we try multiple sessions.
447        log::warn!(
448            "Failed to decrypt {:?} message with ratchet key: {} and counter: {}. \
449             Session loaded for {}. Local session has base key: {} and counter: {}. {}",
450            original_message_type,
451            hex::encode(ciphertext.sender_ratchet_key().public_key_bytes()),
452            ciphertext.counter(),
453            remote_address,
454            state
455                .sender_ratchet_key_for_logging()
456                .unwrap_or_else(|e| format!("<error: {e}>")),
457            state.previous_counter(),
458            error
459        );
460    };
461
462    let mut errs = vec![];
463
464    if let Some(current_state) = record.session_state() {
465        let mut current_state = current_state.clone();
466        let result = decrypt_message_with_state(
467            CurrentOrPrevious::Current,
468            &mut current_state,
469            ciphertext,
470            original_message_type,
471            remote_address,
472            local_address,
473            csprng,
474        );
475
476        match result {
477            Ok(ptext) => {
478                log::info!(
479                    "decrypted {:?} message from {} with current session state (base key {})",
480                    original_message_type,
481                    remote_address,
482                    current_state
483                        .sender_ratchet_key_for_logging()
484                        .expect("successful decrypt always has a valid base key"),
485                );
486                record.set_session_state(current_state); // update the state
487                return Ok(ptext);
488            }
489            Err(SignalProtocolError::DuplicatedMessage(_, _)) => {
490                return result;
491            }
492            Err(e) => {
493                log_decryption_failure(&current_state, &e);
494                errs.push(e);
495                match original_message_type {
496                    CiphertextMessageType::PreKey => {
497                        // A PreKey message creates a session and then decrypts a Whisper message
498                        // using that session. No need to check older sessions.
499                        log::error!(
500                            "{}",
501                            create_decryption_failure_log(
502                                remote_address,
503                                &errs,
504                                record,
505                                ciphertext
506                            )?
507                        );
508                        // Note that we don't propagate `e` here; we always return InvalidMessage,
509                        // as we would for a Whisper message that tried several sessions.
510                        return Err(SignalProtocolError::InvalidMessage(
511                            original_message_type,
512                            "decryption failed",
513                        ));
514                    }
515                    CiphertextMessageType::Whisper => {}
516                    CiphertextMessageType::SenderKey | CiphertextMessageType::Plaintext => {
517                        unreachable!("should not be using Double Ratchet for these")
518                    }
519                }
520            }
521        }
522    }
523
524    // Try some old sessions:
525    let mut updated_session = None;
526
527    for (idx, previous) in record.previous_session_states().enumerate() {
528        let mut previous = previous?;
529
530        let result = decrypt_message_with_state(
531            CurrentOrPrevious::Previous,
532            &mut previous,
533            ciphertext,
534            original_message_type,
535            remote_address,
536            local_address,
537            csprng,
538        );
539
540        match result {
541            Ok(ptext) => {
542                log::info!(
543                    "decrypted {:?} message from {} with PREVIOUS session state (base key {})",
544                    original_message_type,
545                    remote_address,
546                    previous
547                        .sender_ratchet_key_for_logging()
548                        .expect("successful decrypt always has a valid base key"),
549                );
550                updated_session = Some((ptext, idx, previous));
551                break;
552            }
553            Err(SignalProtocolError::DuplicatedMessage(_, _)) => {
554                return result;
555            }
556            Err(e) => {
557                log_decryption_failure(&previous, &e);
558                errs.push(e);
559            }
560        }
561    }
562
563    if let Some((ptext, idx, updated_session)) = updated_session {
564        record.promote_old_session(idx, updated_session);
565        Ok(ptext)
566    } else {
567        let previous_state_count = || record.previous_session_states().len();
568
569        if let Some(current_state) = record.session_state() {
570            log::error!(
571                "No valid session for recipient: {}, current session base key {}, number of previous states: {}",
572                remote_address,
573                current_state
574                    .sender_ratchet_key_for_logging()
575                    .unwrap_or_else(|e| format!("<error: {e}>")),
576                previous_state_count(),
577            );
578        } else {
579            log::error!(
580                "No valid session for recipient: {}, (no current session state), number of previous states: {}",
581                remote_address,
582                previous_state_count(),
583            );
584        }
585        log::error!(
586            "{}",
587            create_decryption_failure_log(remote_address, &errs, record, ciphertext)?
588        );
589        Err(SignalProtocolError::InvalidMessage(
590            original_message_type,
591            "decryption failed",
592        ))
593    }
594}
595
596#[derive(Clone, Copy)]
597enum CurrentOrPrevious {
598    Current,
599    Previous,
600}
601
602impl std::fmt::Display for CurrentOrPrevious {
603    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604        match self {
605            Self::Current => write!(f, "current"),
606            Self::Previous => write!(f, "previous"),
607        }
608    }
609}
610
611fn decrypt_message_with_state<R: Rng + CryptoRng>(
612    current_or_previous: CurrentOrPrevious,
613    state: &mut SessionState,
614    ciphertext: &SignalMessage,
615    original_message_type: CiphertextMessageType,
616    remote_address: &ProtocolAddress,
617    local_address: Option<&ProtocolAddress>,
618    csprng: &mut R,
619) -> Result<Vec<u8>> {
620    // Check for a completely empty or invalid session state before we do anything else.
621    let _ = state.root_key().map_err(|_| {
622        SignalProtocolError::InvalidMessage(
623            original_message_type,
624            "No session available to decrypt",
625        )
626    })?;
627
628    let ciphertext_version = ciphertext.message_version() as u32;
629    if ciphertext_version != state.session_version()? {
630        return Err(SignalProtocolError::UnrecognizedMessageVersion(
631            ciphertext_version,
632        ));
633    }
634
635    let their_ephemeral = ciphertext.sender_ratchet_key();
636    let counter = ciphertext.counter();
637    let chain_key = get_or_create_chain_key(state, their_ephemeral, remote_address, csprng)?;
638    let message_key_gen = get_or_create_message_key(
639        state,
640        their_ephemeral,
641        remote_address,
642        original_message_type,
643        &chain_key,
644        counter,
645    )?;
646    let pqr_key = state
647        .pq_ratchet_recv(ciphertext.pq_ratchet())
648        .map_err(|e| match e {
649            spqr::Error::StateDecode => SignalProtocolError::InvalidState(
650                "decrypt_message_with_state",
651                format!("post-quantum ratchet error: {e}"),
652            ),
653            _ => {
654                log::info!("post-quantum ratchet error in decrypt_message_with_state: {e}");
655                SignalProtocolError::InvalidMessage(
656                    original_message_type,
657                    "post-quantum ratchet error",
658                )
659            }
660        })?;
661    let message_keys = message_key_gen.generate_keys(pqr_key);
662
663    let their_identity_key =
664        state
665            .remote_identity_key()?
666            .ok_or(SignalProtocolError::InvalidSessionStructure(
667                "cannot decrypt without remote identity key",
668            ))?;
669
670    let mac_valid = match local_address {
671        Some(local_address) => ciphertext.verify_mac_with_addresses(
672            remote_address,
673            local_address,
674            &their_identity_key,
675            &state.local_identity_key()?,
676            message_keys.mac_key(),
677        )?,
678        None => ciphertext.verify_mac(
679            &their_identity_key,
680            &state.local_identity_key()?,
681            message_keys.mac_key(),
682        )?,
683    };
684
685    if !mac_valid {
686        return Err(SignalProtocolError::InvalidMessage(
687            original_message_type,
688            "MAC verification failed",
689        ));
690    }
691
692    let ptext = match signal_crypto::aes_256_cbc_decrypt(
693        ciphertext.body(),
694        message_keys.cipher_key(),
695        message_keys.iv(),
696    ) {
697        Ok(ptext) => ptext,
698        Err(signal_crypto::DecryptionError::BadKeyOrIv) => {
699            log::warn!("{current_or_previous} session state corrupt for {remote_address}",);
700            return Err(SignalProtocolError::InvalidSessionStructure(
701                "invalid receiver chain message keys",
702            ));
703        }
704        Err(signal_crypto::DecryptionError::BadCiphertext(msg)) => {
705            log::warn!("failed to decrypt 1:1 message: {msg}");
706            return Err(SignalProtocolError::InvalidMessage(
707                original_message_type,
708                "failed to decrypt",
709            ));
710        }
711    };
712
713    state.clear_unacknowledged_pre_key_message();
714
715    Ok(ptext)
716}
717
718fn get_or_create_chain_key<R: Rng + CryptoRng>(
719    state: &mut SessionState,
720    their_ephemeral: &PublicKey,
721    remote_address: &ProtocolAddress,
722    csprng: &mut R,
723) -> Result<ChainKey> {
724    if let Some(chain) = state.get_receiver_chain_key(their_ephemeral)? {
725        log::debug!("{remote_address} has existing receiver chain.");
726        return Ok(chain);
727    }
728
729    log::info!("{remote_address} creating new chains.");
730
731    let root_key = state.root_key()?;
732    let our_ephemeral = state.sender_ratchet_private_key()?;
733    let receiver_chain = root_key.create_chain(their_ephemeral, &our_ephemeral)?;
734    let our_new_ephemeral = KeyPair::generate(csprng);
735    let sender_chain = receiver_chain
736        .0
737        .create_chain(their_ephemeral, &our_new_ephemeral.private_key)?;
738
739    state.set_root_key(&sender_chain.0);
740    state.add_receiver_chain(their_ephemeral, &receiver_chain.1);
741
742    let current_index = state.get_sender_chain_key()?.index();
743    let previous_index = if current_index > 0 {
744        current_index - 1
745    } else {
746        0
747    };
748    state.set_previous_counter(previous_index);
749    state.set_sender_chain(&our_new_ephemeral, &sender_chain.1);
750
751    Ok(receiver_chain.1)
752}
753
754fn get_or_create_message_key(
755    state: &mut SessionState,
756    their_ephemeral: &PublicKey,
757    remote_address: &ProtocolAddress,
758    original_message_type: CiphertextMessageType,
759    chain_key: &ChainKey,
760    counter: u32,
761) -> Result<MessageKeyGenerator> {
762    let chain_index = chain_key.index();
763
764    if chain_index > counter {
765        return match state.get_message_keys(their_ephemeral, counter)? {
766            Some(keys) => Ok(keys),
767            None => {
768                log::info!("{remote_address} Duplicate message for counter: {counter}");
769                Err(SignalProtocolError::DuplicatedMessage(chain_index, counter))
770            }
771        };
772    }
773
774    assert!(chain_index <= counter);
775
776    let jump = (counter - chain_index) as usize;
777
778    if jump > MAX_FORWARD_JUMPS {
779        if state.session_with_self()? {
780            log::info!(
781                "{remote_address} Jumping ahead {jump} messages (index: {chain_index}, counter: {counter})"
782            );
783        } else {
784            log::error!(
785                "{remote_address} Exceeded future message limit: {MAX_FORWARD_JUMPS}, index: {chain_index}, counter: {counter})"
786            );
787            return Err(SignalProtocolError::InvalidMessage(
788                original_message_type,
789                "message from too far into the future",
790            ));
791        }
792    }
793
794    let mut chain_key = chain_key.clone();
795
796    while chain_key.index() < counter {
797        let message_keys = chain_key.message_keys();
798        state.set_message_keys(their_ephemeral, message_keys)?;
799        chain_key = chain_key.next_chain_key();
800    }
801
802    state.set_receiver_chain_key(their_ephemeral, &chain_key.next_chain_key())?;
803    Ok(chain_key.message_keys())
804}