libsignal_service/
messagepipe.rs1use futures::{
2 channel::{
3 mpsc::{self, Sender},
4 oneshot,
5 },
6 prelude::*,
7};
8use prost::Message;
9
10pub use crate::{
11 configuration::ServiceCredentials,
12 proto::{
13 web_socket_message, Envelope, WebSocketMessage,
14 WebSocketRequestMessage, WebSocketResponseMessage,
15 },
16};
17
18use crate::{
19 push_service::ServiceError,
20 websocket::{self, SignalWebSocket},
21};
22
23#[derive(Debug)]
24#[allow(clippy::large_enum_variant)]
25pub enum Incoming {
26 Envelope(Envelope),
27 QueueEmpty,
28}
29
30pub struct MessagePipe {
31 ws: SignalWebSocket<websocket::Identified>,
32}
33
34impl MessagePipe {
35 pub fn from_socket(ws: SignalWebSocket<websocket::Identified>) -> Self {
36 MessagePipe { ws }
37 }
38
39 pub fn ws(&self) -> SignalWebSocket<websocket::Identified> {
41 self.ws.clone()
42 }
43
44 async fn run(
46 mut self,
47 mut sink: Sender<Result<Incoming, ServiceError>>,
48 ) -> Result<(), mpsc::SendError> {
49 let mut ws = self.ws.clone();
50 let mut stream = ws
51 .take_request_stream()
52 .expect("web socket request handler not in use");
53
54 while let Some((request, responder)) = stream.next().await {
55 if let Some(env) =
57 self.process_request(request, responder).await.transpose()
58 {
59 sink.send(env).await?;
60 } else {
61 tracing::trace!("got empty message in websocket");
62 }
63 }
64
65 ws.return_request_stream(stream);
66
67 Ok(())
68 }
69
70 async fn process_request(
71 &mut self,
72 request: WebSocketRequestMessage,
73 responder: oneshot::Sender<WebSocketResponseMessage>,
74 ) -> Result<Option<Incoming>, ServiceError> {
75 let response = WebSocketResponseMessage::from_request(&request);
77
78 let result = if request.is_signal_service_envelope() {
80 let body = if let Some(body) = request.body.as_ref() {
81 body
82 } else {
83 return Err(ServiceError::InvalidFrame {
84 reason: "request without body.",
85 });
86 };
87 Some(Incoming::Envelope(Envelope::decode(body.as_slice())?))
88 } else if request.is_queue_empty() {
89 Some(Incoming::QueueEmpty)
90 } else {
91 None
92 };
93
94 responder
95 .send(response)
96 .map_err(|_| ServiceError::WsClosing {
97 reason: "could not respond to message pipe request",
98 })?;
99
100 Ok(result)
101 }
102
103 pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
107 let (sink, stream) = mpsc::channel(1);
108
109 let stream = stream.map(Some);
110 let runner = self.run(sink).map(|e| {
111 tracing::info!("sink was closed: {:?}", e);
112 None
113 });
114
115 let combined = futures::stream::select(stream, runner.into_stream());
116 combined.filter_map(|x| async { x })
117 }
118}