server.rs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. //! # Server
  2. //!
  3. //! Redis TCP server. This module also includes a simple HTTP server to dump the prometheus
  4. //! metrics.
  5. use crate::{
  6. config::Config,
  7. connection::{connections::Connections, Connection},
  8. db::{pool::Databases, Db},
  9. dispatcher::Dispatcher,
  10. error::Error,
  11. value::Value,
  12. };
  13. use bytes::{Buf, Bytes, BytesMut};
  14. use futures::{future, SinkExt};
  15. use log::{info, trace, warn};
  16. use redis_zero_protocol_parser::{parse_server, Error as RedisError};
  17. use std::{collections::VecDeque, io, sync::Arc};
  18. #[cfg(unix)]
  19. use tokio::net::UnixListener;
  20. use tokio::{
  21. io::{AsyncReadExt, AsyncWriteExt},
  22. net::TcpListener,
  23. time::{sleep, Duration},
  24. };
  25. use tokio_stream::StreamExt;
  26. use tokio_util::codec::{Decoder, Encoder, Framed};
  27. /// Redis Parser Encoder/Decoder
  28. struct RedisParser;
  29. impl Encoder<Value> for RedisParser {
  30. type Error = io::Error;
  31. fn encode(&mut self, response: Value, dst: &mut BytesMut) -> io::Result<()> {
  32. let v: Vec<u8> = response.into();
  33. dst.extend_from_slice(&v);
  34. Ok(())
  35. }
  36. }
  37. impl Decoder for RedisParser {
  38. type Item = VecDeque<Bytes>;
  39. type Error = io::Error;
  40. fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
  41. let (frame, proccesed) = {
  42. let (unused, val) = match parse_server(src) {
  43. Ok((buf, val)) => (buf, val),
  44. Err(RedisError::Partial) => return Ok(None),
  45. Err(e) => {
  46. log::debug!("{:?}", e);
  47. return Err(io::Error::new(io::ErrorKind::Other, "something"));
  48. }
  49. };
  50. (
  51. val.iter().map(|e| Bytes::copy_from_slice(e)).collect(),
  52. src.len() - unused.len(),
  53. )
  54. };
  55. src.advance(proccesed);
  56. Ok(Some(frame))
  57. }
  58. }
  59. /// Spawn a very simple HTTP server to serve metrics.
  60. ///
  61. /// The incoming HTTP request is discarded and the response is always the metrics in a prometheus
  62. /// format
  63. async fn server_metrics(all_connections: Arc<Connections>) -> Result<(), Error> {
  64. info!("Listening on 127.0.0.1:7878 for metrics");
  65. let listener = tokio::net::TcpListener::bind("127.0.0.1:7878")
  66. .await
  67. .expect("Failed to start metrics server");
  68. let mut globals = std::collections::HashMap::new();
  69. globals.insert("service", "microredis");
  70. loop {
  71. let (mut stream, _) = listener.accept().await.expect("accept client");
  72. let mut buf = vec![0; 1024];
  73. let _ = match stream.read(&mut buf).await {
  74. Ok(n) => n,
  75. Err(_) => continue,
  76. };
  77. let serialized = serde_prometheus::to_string(
  78. &all_connections
  79. .get_dispatcher()
  80. .get_service_metric_registry(),
  81. Some("redis"),
  82. globals.clone(),
  83. )
  84. .unwrap_or_else(|_| "".to_owned());
  85. let response = format!(
  86. "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
  87. serialized.len(),
  88. serialized
  89. );
  90. let _ = stream.write_all(response.as_bytes()).await;
  91. let _ = stream.flush().await;
  92. }
  93. }
  94. /// Spawn the TCP/IP micro-redis server.
  95. async fn serve_tcp(
  96. addr: &str,
  97. default_db: Arc<Db>,
  98. all_connections: Arc<Connections>,
  99. ) -> Result<(), Error> {
  100. let listener = TcpListener::bind(addr).await?;
  101. info!("Starting server {}", addr);
  102. info!("Ready to accept connections on {}", addr);
  103. loop {
  104. match listener.accept().await {
  105. Ok((socket, addr)) => {
  106. let transport = Framed::new(socket, RedisParser);
  107. let all_connections = all_connections.clone();
  108. let default_db = default_db.clone();
  109. tokio::spawn(async move {
  110. handle_new_connection(transport, all_connections, default_db, addr).await;
  111. });
  112. }
  113. Err(e) => println!("error accepting socket; error = {:?}", e),
  114. }
  115. }
  116. }
  117. #[cfg(unix)]
  118. async fn serve_unixsocket(
  119. file: &str,
  120. default_db: Arc<Db>,
  121. all_connections: Arc<Connections>,
  122. ) -> Result<(), Error> {
  123. use std::fs::remove_file;
  124. info!("Ready to accept connections on unix://{}", file);
  125. let _ = remove_file(file);
  126. let listener = UnixListener::bind(file)?;
  127. loop {
  128. match listener.accept().await {
  129. Ok((socket, addr)) => {
  130. let transport = Framed::new(socket, RedisParser);
  131. let all_connections = all_connections.clone();
  132. let default_db = default_db.clone();
  133. tokio::spawn(async move {
  134. handle_new_connection(
  135. transport,
  136. all_connections,
  137. default_db,
  138. addr.as_pathname()
  139. .and_then(|p| p.to_str())
  140. .unwrap_or_default(),
  141. )
  142. .await;
  143. });
  144. }
  145. Err(e) => println!("error accepting socket; error = {:?}", e),
  146. }
  147. }
  148. }
  149. #[inline]
  150. async fn execute_command(
  151. conn: &Connection,
  152. dispatcher: &Dispatcher,
  153. args: VecDeque<Bytes>,
  154. ) -> Option<Value> {
  155. match dispatcher.execute(conn, args).await {
  156. Ok(result) => Some(result),
  157. Err(Error::EmptyLine) => Some(Value::Ignore),
  158. Err(Error::Quit) => None,
  159. Err(err) => Some(err.into()),
  160. }
  161. }
  162. /// Handles a new connection
  163. ///
  164. /// The new connection can be created from a new TCP or Unix stream.
  165. #[inline]
  166. async fn handle_new_connection<T: AsyncReadExt + AsyncWriteExt + Unpin, A: ToString>(
  167. mut transport: Framed<T, RedisParser>,
  168. all_connections: Arc<Connections>,
  169. default_db: Arc<Db>,
  170. addr: A,
  171. ) {
  172. let (mut pubsub, conn) = all_connections.new_connection(default_db, addr);
  173. let dispatcher = all_connections.get_dispatcher();
  174. // Commands are being buffered when the client is blocked.
  175. let mut buffered_commands: Vec<VecDeque<Bytes>> = vec![];
  176. trace!("New connection {}", conn.id());
  177. loop {
  178. tokio::select! {
  179. Some(msg) = pubsub.recv() => {
  180. // Pub-sub message
  181. if transport.send(msg).await.is_err() {
  182. break;
  183. }
  184. 'outer: for args in buffered_commands.iter() {
  185. // Client sent commands while the connection was blocked,
  186. // now it is time to process them one by one
  187. match execute_command(&conn, &dispatcher, args.clone()).await {
  188. Some(result) => if result != Value::Ignore && transport.send(result).await.is_err() {
  189. break 'outer;
  190. },
  191. None => {
  192. let _ = transport.send(Value::Ok).await;
  193. break 'outer;
  194. }
  195. }
  196. }
  197. buffered_commands.clear();
  198. }
  199. result = transport.next() => match result {
  200. Some(Ok(args)) => {
  201. if conn.is_blocked() {
  202. buffered_commands.push(args);
  203. continue;
  204. }
  205. match execute_command(&conn, &dispatcher, args).await {
  206. Some(result) => if result != Value::Ignore && transport.send(result).await.is_err() {
  207. break;
  208. },
  209. None => {
  210. let _ = transport.send(Value::Ok).await;
  211. break;
  212. }
  213. };
  214. },
  215. Some(Err(e)) => {
  216. warn!("error on decoding from socket; error = {:?}", e);
  217. break;
  218. },
  219. None => break,
  220. }
  221. }
  222. }
  223. conn.destroy();
  224. }
  225. /// Spawn redis server
  226. ///
  227. /// Spawn a redis server. This function will create Connections object, the in-memory database, the
  228. /// purge process and the TCP server.
  229. ///
  230. /// This process is also listening for any incoming message through the internal pub-sub.
  231. ///
  232. /// This function will block the main thread and will never exit.
  233. pub async fn serve(config: Config) -> Result<(), Error> {
  234. let (default_db, all_dbs) = Databases::new(16, 1000);
  235. let all_connections = Arc::new(Connections::new(all_dbs.clone()));
  236. let all_connections_for_metrics = all_connections.clone();
  237. all_dbs
  238. .into_iter()
  239. .map(|db_for_purging| {
  240. tokio::spawn(async move {
  241. loop {
  242. db_for_purging.purge();
  243. sleep(Duration::from_millis(5000)).await;
  244. }
  245. });
  246. })
  247. .for_each(drop);
  248. let mut services = vec![tokio::spawn(async move {
  249. server_metrics(all_connections_for_metrics).await
  250. })];
  251. config
  252. .get_tcp_hostnames()
  253. .iter()
  254. .map(|host| {
  255. let default_db = default_db.clone();
  256. let all_connections = all_connections.clone();
  257. let host = host.clone();
  258. services.push(tokio::spawn(async move {
  259. serve_tcp(&host, default_db, all_connections).await
  260. }));
  261. })
  262. .for_each(drop);
  263. #[cfg(unix)]
  264. if let Some(file) = config.unixsocket {
  265. services.push(tokio::spawn(async move {
  266. serve_unixsocket(&file, default_db, all_connections).await
  267. }))
  268. }
  269. future::join_all(services).await;
  270. Ok(())
  271. }