libsignal_service/
messagepipe.rs

1use futures::{
2    channel::{
3        mpsc::{self, Sender},
4        oneshot,
5    },
6    prelude::*,
7};
8
9pub use crate::{
10    configuration::ServiceCredentials,
11    proto::{
12        web_socket_message, Envelope, WebSocketMessage,
13        WebSocketRequestMessage, WebSocketResponseMessage,
14    },
15};
16
17use crate::{
18    push_service::ServiceError,
19    websocket::{self, SignalWebSocket},
20};
21
22#[derive(Debug)]
23pub enum Incoming {
24    Envelope(Envelope),
25    QueueEmpty,
26}
27
28pub struct MessagePipe {
29    ws: SignalWebSocket<websocket::Identified>,
30    credentials: ServiceCredentials,
31}
32
33impl MessagePipe {
34    pub fn from_socket(
35        ws: SignalWebSocket<websocket::Identified>,
36        credentials: ServiceCredentials,
37    ) -> Self {
38        MessagePipe { ws, credentials }
39    }
40
41    /// Return a SignalWebSocket for sending messages and other purposes beyond receiving messages.
42    pub fn ws(&self) -> SignalWebSocket<websocket::Identified> {
43        self.ws.clone()
44    }
45
46    /// Worker task that processes the websocket into Envelopes
47    async fn run(
48        mut self,
49        mut sink: Sender<Result<Incoming, ServiceError>>,
50    ) -> Result<(), mpsc::SendError> {
51        let mut ws = self.ws.clone();
52        let mut stream = ws
53            .take_request_stream()
54            .expect("web socket request handler not in use");
55
56        while let Some((request, responder)) = stream.next().await {
57            // WebsocketConnection::onMessage(ByteString)
58            if let Some(env) =
59                self.process_request(request, responder).await.transpose()
60            {
61                sink.send(env).await?;
62            } else {
63                tracing::trace!("got empty message in websocket");
64            }
65        }
66
67        ws.return_request_stream(stream);
68
69        Ok(())
70    }
71
72    async fn process_request(
73        &mut self,
74        request: WebSocketRequestMessage,
75        responder: oneshot::Sender<WebSocketResponseMessage>,
76    ) -> Result<Option<Incoming>, ServiceError> {
77        // Java: MessagePipe::read
78        let response = WebSocketResponseMessage::from_request(&request);
79
80        // XXX Change the signature of this method to yield an enum of Envelope and EndOfQueue
81        let result = if request.is_signal_service_envelope() {
82            let body = if let Some(body) = request.body.as_ref() {
83                body
84            } else {
85                return Err(ServiceError::InvalidFrame {
86                    reason: "request without body.",
87                });
88            };
89            Some(Incoming::Envelope(Envelope::decrypt(
90                body,
91                self.credentials.signaling_key.as_ref(),
92                request.is_signal_key_encrypted(),
93            )?))
94        } else if request.is_queue_empty() {
95            Some(Incoming::QueueEmpty)
96        } else {
97            None
98        };
99
100        responder
101            .send(response)
102            .map_err(|_| ServiceError::WsClosing {
103                reason: "could not respond to message pipe request",
104            })?;
105
106        Ok(result)
107    }
108
109    /// Returns the stream of `Envelope`s
110    ///
111    /// Envelopes yielded are acknowledged.
112    pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
113        let (sink, stream) = mpsc::channel(1);
114
115        let stream = stream.map(Some);
116        let runner = self.run(sink).map(|e| {
117            tracing::info!("sink was closed: {:?}", e);
118            None
119        });
120
121        let combined = futures::stream::select(stream, runner.into_stream());
122        combined.filter_map(|x| async { x })
123    }
124}