1
0

server.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. use crate::{connection::Connections, db::Db, dispatcher::Dispatcher, value::Value};
  2. use bytes::{Buf, Bytes, BytesMut};
  3. use futures::SinkExt;
  4. use log::{info, trace, warn};
  5. use redis_zero_protocol_parser::{parse_server, Error as RedisError};
  6. use std::{error::Error, io, ops::Deref, sync::Arc};
  7. use tokio::{
  8. net::TcpListener,
  9. time::{sleep, Duration},
  10. };
  11. use tokio_stream::StreamExt;
  12. use tokio_util::codec::{Decoder, Encoder, Framed};
  13. struct RedisParser;
  14. impl Encoder<Value> for RedisParser {
  15. type Error = io::Error;
  16. fn encode(&mut self, response: Value, dst: &mut BytesMut) -> io::Result<()> {
  17. let v: Vec<u8> = response.into();
  18. dst.extend_from_slice(&v);
  19. Ok(())
  20. }
  21. }
  22. impl Decoder for RedisParser {
  23. type Item = Vec<Bytes>;
  24. type Error = io::Error;
  25. fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
  26. let (frame, proccesed) = {
  27. let (unused, val) = match parse_server(src) {
  28. Ok((buf, val)) => (buf, val),
  29. Err(RedisError::Partial) => return Ok(None),
  30. Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "something")),
  31. };
  32. (
  33. val.iter().map(|e| Bytes::copy_from_slice(e)).collect(),
  34. src.len() - unused.len(),
  35. )
  36. };
  37. src.advance(proccesed);
  38. Ok(Some(frame))
  39. }
  40. }
  41. pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
  42. let listener = TcpListener::bind(&addr).await?;
  43. info!("Listening on: {}", addr);
  44. let db = Arc::new(Db::new(1000));
  45. let all_connections = Arc::new(Connections::new());
  46. let db_for_purging = db.clone();
  47. tokio::spawn(async move {
  48. loop {
  49. db_for_purging.purge();
  50. sleep(Duration::from_millis(5000)).await;
  51. }
  52. });
  53. loop {
  54. match listener.accept().await {
  55. Ok((socket, addr)) => {
  56. let conn = all_connections.new_connection(db.clone(), addr);
  57. tokio::spawn(async move {
  58. let mut transport = Framed::new(socket, RedisParser);
  59. trace!("New connection {}", conn.id());
  60. while let Some(result) = transport.next().await {
  61. match result {
  62. Ok(args) => match Dispatcher::new(&args) {
  63. Ok(handler) => {
  64. let r = handler
  65. .deref()
  66. .execute(&conn, &args)
  67. .await
  68. .unwrap_or_else(|x| x.into());
  69. if transport.send(r).await.is_err() {
  70. break;
  71. }
  72. }
  73. Err(err) => {
  74. if transport.send(err.into()).await.is_err() {
  75. break;
  76. }
  77. }
  78. },
  79. Err(e) => {
  80. warn!("error on decoding from socket; error = {:?}", e);
  81. break;
  82. }
  83. }
  84. }
  85. conn.destroy();
  86. });
  87. }
  88. Err(e) => println!("error accepting socket; error = {:?}", e),
  89. }
  90. }
  91. }