libsignal_service/
messagepipe.rs1use futures::{
2 channel::{
3 mpsc::{self, Sender},
4 oneshot,
5 },
6 prelude::*,
7};
8
9pub use crate::{
10 configuration::ServiceCredentials,
11 proto::{
12 web_socket_message, Envelope, WebSocketMessage,
13 WebSocketRequestMessage, WebSocketResponseMessage,
14 },
15};
16
17use crate::{push_service::ServiceError, websocket::SignalWebSocket};
18
19#[derive(Debug)]
20pub enum Incoming {
21 Envelope(Envelope),
22 QueueEmpty,
23}
24
25pub struct MessagePipe {
26 ws: SignalWebSocket,
27 credentials: ServiceCredentials,
28}
29
30impl MessagePipe {
31 pub fn from_socket(
32 ws: SignalWebSocket,
33 credentials: ServiceCredentials,
34 ) -> Self {
35 MessagePipe { ws, credentials }
36 }
37
38 pub fn ws(&self) -> SignalWebSocket {
40 self.ws.clone()
41 }
42
43 async fn run(
45 mut self,
46 mut sink: Sender<Result<Incoming, ServiceError>>,
47 ) -> Result<(), mpsc::SendError> {
48 let mut ws = self.ws.clone();
49 let mut stream = ws
50 .take_request_stream()
51 .expect("web socket request handler not in use");
52
53 while let Some((request, responder)) = stream.next().await {
54 if let Some(env) =
56 self.process_request(request, responder).await.transpose()
57 {
58 sink.send(env).await?;
59 } else {
60 tracing::trace!("got empty message in websocket");
61 }
62 }
63
64 ws.return_request_stream(stream);
65
66 Ok(())
67 }
68
69 async fn process_request(
70 &mut self,
71 request: WebSocketRequestMessage,
72 responder: oneshot::Sender<WebSocketResponseMessage>,
73 ) -> Result<Option<Incoming>, ServiceError> {
74 let response = WebSocketResponseMessage::from_request(&request);
76
77 let result = if request.is_signal_service_envelope() {
79 let body = if let Some(body) = request.body.as_ref() {
80 body
81 } else {
82 return Err(ServiceError::InvalidFrame {
83 reason: "request without body.",
84 });
85 };
86 Some(Incoming::Envelope(Envelope::decrypt(
87 body,
88 self.credentials.signaling_key.as_ref(),
89 request.is_signal_key_encrypted(),
90 )?))
91 } else if request.is_queue_empty() {
92 Some(Incoming::QueueEmpty)
93 } else {
94 None
95 };
96
97 responder
98 .send(response)
99 .map_err(|_| ServiceError::WsClosing {
100 reason: "could not respond to message pipe request",
101 })?;
102
103 Ok(result)
104 }
105
106 pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
110 let (sink, stream) = mpsc::channel(1);
111
112 let stream = stream.map(Some);
113 let runner = self.run(sink).map(|e| {
114 tracing::info!("sink was closed: {:?}", e);
115 None
116 });
117
118 let combined = futures::stream::select(stream, runner.into_stream());
119 combined.filter_map(|x| async { x })
120 }
121}