libsignal_service/
messagepipe.rs

1use futures::{
2    channel::{
3        mpsc::{self, Sender},
4        oneshot,
5    },
6    prelude::*,
7};
8use prost::Message;
9
10pub use crate::{
11    configuration::ServiceCredentials,
12    proto::{
13        web_socket_message, Envelope, WebSocketMessage,
14        WebSocketRequestMessage, WebSocketResponseMessage,
15    },
16};
17
18use crate::{
19    push_service::ServiceError,
20    websocket::{self, SignalWebSocket},
21};
22
23#[derive(Debug)]
24#[allow(clippy::large_enum_variant)]
25pub enum Incoming {
26    Envelope(Envelope),
27    QueueEmpty,
28}
29
30pub struct MessagePipe {
31    ws: SignalWebSocket<websocket::Identified>,
32}
33
34impl MessagePipe {
35    pub fn from_socket(ws: SignalWebSocket<websocket::Identified>) -> Self {
36        MessagePipe { ws }
37    }
38
39    /// Return a SignalWebSocket for sending messages and other purposes beyond receiving messages.
40    pub fn ws(&self) -> SignalWebSocket<websocket::Identified> {
41        self.ws.clone()
42    }
43
44    /// Worker task that processes the websocket into Envelopes
45    async fn run(
46        mut self,
47        mut sink: Sender<Result<Incoming, ServiceError>>,
48    ) -> Result<(), mpsc::SendError> {
49        let mut ws = self.ws.clone();
50        let mut stream = ws
51            .take_request_stream()
52            .expect("web socket request handler not in use");
53
54        while let Some((request, responder)) = stream.next().await {
55            // WebsocketConnection::onMessage(ByteString)
56            if let Some(env) =
57                self.process_request(request, responder).await.transpose()
58            {
59                sink.send(env).await?;
60            } else {
61                tracing::trace!("got empty message in websocket");
62            }
63        }
64
65        ws.return_request_stream(stream);
66
67        Ok(())
68    }
69
70    async fn process_request(
71        &mut self,
72        request: WebSocketRequestMessage,
73        responder: oneshot::Sender<WebSocketResponseMessage>,
74    ) -> Result<Option<Incoming>, ServiceError> {
75        // Java: MessagePipe::read
76        let response = WebSocketResponseMessage::from_request(&request);
77
78        // XXX Change the signature of this method to yield an enum of Envelope and EndOfQueue
79        let result = if request.is_signal_service_envelope() {
80            let body = if let Some(body) = request.body.as_ref() {
81                body
82            } else {
83                return Err(ServiceError::InvalidFrame {
84                    reason: "request without body.",
85                });
86            };
87            Some(Incoming::Envelope(Envelope::decode(body.as_slice())?))
88        } else if request.is_queue_empty() {
89            Some(Incoming::QueueEmpty)
90        } else {
91            None
92        };
93
94        responder
95            .send(response)
96            .map_err(|_| ServiceError::WsClosing {
97                reason: "could not respond to message pipe request",
98            })?;
99
100        Ok(result)
101    }
102
103    /// Returns the stream of `Envelope`s
104    ///
105    /// Envelopes yielded are acknowledged.
106    pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
107        let (sink, stream) = mpsc::channel(1);
108
109        let stream = stream.map(Some);
110        let runner = self.run(sink).map(|e| {
111            tracing::info!("sink was closed: {:?}", e);
112            None
113        });
114
115        let combined = futures::stream::select(stream, runner.into_stream());
116        combined.filter_map(|x| async { x })
117    }
118}