libsignal_service/
messagepipe.rs1use futures::{
2 channel::{
3 mpsc::{self, Sender},
4 oneshot,
5 },
6 prelude::*,
7};
8
9pub use crate::{
10 configuration::ServiceCredentials,
11 proto::{
12 web_socket_message, Envelope, WebSocketMessage,
13 WebSocketRequestMessage, WebSocketResponseMessage,
14 },
15};
16
17use crate::{
18 push_service::ServiceError,
19 websocket::{self, SignalWebSocket},
20};
21
22#[derive(Debug)]
23pub enum Incoming {
24 Envelope(Envelope),
25 QueueEmpty,
26}
27
28pub struct MessagePipe {
29 ws: SignalWebSocket<websocket::Identified>,
30 credentials: ServiceCredentials,
31}
32
33impl MessagePipe {
34 pub fn from_socket(
35 ws: SignalWebSocket<websocket::Identified>,
36 credentials: ServiceCredentials,
37 ) -> Self {
38 MessagePipe { ws, credentials }
39 }
40
41 pub fn ws(&self) -> SignalWebSocket<websocket::Identified> {
43 self.ws.clone()
44 }
45
46 async fn run(
48 mut self,
49 mut sink: Sender<Result<Incoming, ServiceError>>,
50 ) -> Result<(), mpsc::SendError> {
51 let mut ws = self.ws.clone();
52 let mut stream = ws
53 .take_request_stream()
54 .expect("web socket request handler not in use");
55
56 while let Some((request, responder)) = stream.next().await {
57 if let Some(env) =
59 self.process_request(request, responder).await.transpose()
60 {
61 sink.send(env).await?;
62 } else {
63 tracing::trace!("got empty message in websocket");
64 }
65 }
66
67 ws.return_request_stream(stream);
68
69 Ok(())
70 }
71
72 async fn process_request(
73 &mut self,
74 request: WebSocketRequestMessage,
75 responder: oneshot::Sender<WebSocketResponseMessage>,
76 ) -> Result<Option<Incoming>, ServiceError> {
77 let response = WebSocketResponseMessage::from_request(&request);
79
80 let result = if request.is_signal_service_envelope() {
82 let body = if let Some(body) = request.body.as_ref() {
83 body
84 } else {
85 return Err(ServiceError::InvalidFrame {
86 reason: "request without body.",
87 });
88 };
89 Some(Incoming::Envelope(Envelope::decrypt(
90 body,
91 self.credentials.signaling_key.as_ref(),
92 request.is_signal_key_encrypted(),
93 )?))
94 } else if request.is_queue_empty() {
95 Some(Incoming::QueueEmpty)
96 } else {
97 None
98 };
99
100 responder
101 .send(response)
102 .map_err(|_| ServiceError::WsClosing {
103 reason: "could not respond to message pipe request",
104 })?;
105
106 Ok(result)
107 }
108
109 pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
113 let (sink, stream) = mpsc::channel(1);
114
115 let stream = stream.map(Some);
116 let runner = self.run(sink).map(|e| {
117 tracing::info!("sink was closed: {:?}", e);
118 None
119 });
120
121 let combined = futures::stream::select(stream, runner.into_stream());
122 combined.filter_map(|x| async { x })
123 }
124}