Преглед на файлове

Close websocket connections sooner (#1050)

* Close websocket connections sooner

* Handle Ping,  Binary and Close
C преди 1 месец
родител
ревизия
62de0c9925
променени са 1 файла, в които са добавени 45 реда и са изтрити 7 реда
  1. 45 7
      crates/cdk-axum/src/ws/mod.rs

+ 45 - 7
crates/cdk-axum/src/ws/mod.rs

@@ -1,6 +1,6 @@
 use std::collections::HashMap;
 
-use axum::extract::ws::{Message, WebSocket};
+use axum::extract::ws::{CloseFrame, Message, WebSocket};
 use cdk::mint::QuoteId;
 use cdk::nuts::nut17::NotificationPayload;
 use cdk::pub_sub::SubId;
@@ -74,12 +74,48 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
                     }
                 };
 
-          if let Err(err)= socket.send(Message::Text(message.into())).await {
-                tracing::error!("Could not send websocket message: {}", err);
-                break;
-          }
+                if let Err(err)= socket.send(Message::Text(message.into())).await {
+                    tracing::error!("Could not send websocket message: {}", err);
+                    break;
+                }
             }
-            Some(Ok(Message::Text(text))) = socket.next() => {
+
+            Some(from_ws) = socket.next() => {
+                let text = match from_ws {
+                    Ok(Message::Text(text)) => text.to_string(),
+                    Ok(Message::Binary(bin)) => String::from_utf8_lossy(&bin).to_string(),
+                    Ok(Message::Ping(payload)) => {
+                        // Reply with Pong with same payload
+                        if let Err(e) = socket.send(Message::Pong(payload)).await {
+                            tracing::error!("failed to send pong: {e}");
+                            break;
+                        }
+                        continue;
+                    },
+                    Ok(Message::Pong(_payload)) => {
+                        tracing::error!("Unexpected pong");
+                        continue;
+                    },
+                    Ok(Message::Close(frame)) => {
+                        if let Some(CloseFrame { code, reason }) = frame {
+                            tracing::info!("ws-close: code={code:?} reason='{reason}'");
+                        } else {
+                            tracing::info!("ws-close: no frame");
+                        }
+
+                        let _ = socket.send(Message::Close(Some(CloseFrame {
+                            code: axum::extract::ws::close_code::NORMAL,
+                            reason: "bye!".into(),
+                        }))).await;
+                        break;
+                    }
+                    Err(err) => {
+                        tracing::error!("ws-error: {err}");
+                        break;
+                    }
+                };
+
+
                 let request = match serde_json::from_str::<WsRequest>(&text) {
                     Ok(request) => request,
                     Err(err) => {
@@ -105,7 +141,9 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
                 }
             }
             else =>  {
-
+                // Unexpected, we should exit the loop
+                tracing::warn!("Unexpected event, closing ws");
+                break;
             }
         }
     }