libsignal_service/
messagepipe.rs

1use futures::{
2    channel::{
3        mpsc::{self, Sender},
4        oneshot,
5    },
6    prelude::*,
7};
8
9pub use crate::{
10    configuration::ServiceCredentials,
11    proto::{
12        web_socket_message, Envelope, WebSocketMessage,
13        WebSocketRequestMessage, WebSocketResponseMessage,
14    },
15};
16
17use crate::{push_service::ServiceError, websocket::SignalWebSocket};
18
19#[derive(Debug)]
20pub enum Incoming {
21    Envelope(Envelope),
22    QueueEmpty,
23}
24
25pub struct MessagePipe {
26    ws: SignalWebSocket,
27    credentials: ServiceCredentials,
28}
29
30impl MessagePipe {
31    pub fn from_socket(
32        ws: SignalWebSocket,
33        credentials: ServiceCredentials,
34    ) -> Self {
35        MessagePipe { ws, credentials }
36    }
37
38    /// Return a SignalWebSocket for sending messages and other purposes beyond receiving messages.
39    pub fn ws(&self) -> SignalWebSocket {
40        self.ws.clone()
41    }
42
43    /// Worker task that processes the websocket into Envelopes
44    async fn run(
45        mut self,
46        mut sink: Sender<Result<Incoming, ServiceError>>,
47    ) -> Result<(), mpsc::SendError> {
48        let mut ws = self.ws.clone();
49        let mut stream = ws
50            .take_request_stream()
51            .expect("web socket request handler not in use");
52
53        while let Some((request, responder)) = stream.next().await {
54            // WebsocketConnection::onMessage(ByteString)
55            if let Some(env) =
56                self.process_request(request, responder).await.transpose()
57            {
58                sink.send(env).await?;
59            } else {
60                tracing::trace!("got empty message in websocket");
61            }
62        }
63
64        ws.return_request_stream(stream);
65
66        Ok(())
67    }
68
69    async fn process_request(
70        &mut self,
71        request: WebSocketRequestMessage,
72        responder: oneshot::Sender<WebSocketResponseMessage>,
73    ) -> Result<Option<Incoming>, ServiceError> {
74        // Java: MessagePipe::read
75        let response = WebSocketResponseMessage::from_request(&request);
76
77        // XXX Change the signature of this method to yield an enum of Envelope and EndOfQueue
78        let result = if request.is_signal_service_envelope() {
79            let body = if let Some(body) = request.body.as_ref() {
80                body
81            } else {
82                return Err(ServiceError::InvalidFrame {
83                    reason: "request without body.",
84                });
85            };
86            Some(Incoming::Envelope(Envelope::decrypt(
87                body,
88                self.credentials.signaling_key.as_ref(),
89                request.is_signal_key_encrypted(),
90            )?))
91        } else if request.is_queue_empty() {
92            Some(Incoming::QueueEmpty)
93        } else {
94            None
95        };
96
97        responder
98            .send(response)
99            .map_err(|_| ServiceError::WsClosing {
100                reason: "could not respond to message pipe request",
101            })?;
102
103        Ok(result)
104    }
105
106    /// Returns the stream of `Envelope`s
107    ///
108    /// Envelopes yielded are acknowledged.
109    pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
110        let (sink, stream) = mpsc::channel(1);
111
112        let stream = stream.map(Some);
113        let runner = self.run(sink).map(|e| {
114            tracing::info!("sink was closed: {:?}", e);
115            None
116        });
117
118        let combined = futures::stream::select(stream, runner.into_stream());
119        combined.filter_map(|x| async { x })
120    }
121}