main.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. mod commands;
  2. mod db;
  3. mod dispatcher;
  4. mod error;
  5. mod macros;
  6. mod value;
  7. use bytes::{Buf, Bytes, BytesMut};
  8. use dispatcher::Dispatcher;
  9. use futures::SinkExt;
  10. use redis_zero_parser::{parse_server, Error as RedisError};
  11. use std::env;
  12. use std::error::Error;
  13. use std::ops::Deref;
  14. use std::{io, sync::Arc};
  15. use tokio::net::TcpListener;
  16. use tokio_stream::StreamExt;
  17. use tokio_util::codec::{Decoder, Encoder, Framed};
  18. use value::Value;
  19. #[tokio::main]
  20. async fn main() -> Result<(), Box<dyn Error>> {
  21. let addr = env::args()
  22. .nth(1)
  23. .unwrap_or_else(|| "127.0.0.1:8080".to_string());
  24. let listener = TcpListener::bind(&addr).await?;
  25. println!("Listening on: {}", addr);
  26. let db = Arc::new(db::Db::new(12));
  27. loop {
  28. match listener.accept().await {
  29. Ok((socket, _)) => {
  30. let db = db.clone();
  31. tokio::spawn(async move {
  32. let mut transport = Framed::new(socket, RedisParser);
  33. while let Some(result) = transport.next().await {
  34. match result {
  35. Ok(args) => match Dispatcher::new(&args) {
  36. Ok(handler) => {
  37. let r = handler
  38. .deref()
  39. .execute(&db, &args)
  40. .unwrap_or_else(|x| x.into());
  41. transport.send(r).await;
  42. }
  43. Err(err) => {
  44. let err: Value = err.into();
  45. transport.send(err).await;
  46. }
  47. },
  48. Err(e) => {
  49. println!("error on decoding from socket; error = {:?}", e);
  50. break;
  51. }
  52. }
  53. }
  54. });
  55. }
  56. Err(e) => println!("error accepting socket; error = {:?}", e),
  57. }
  58. }
  59. Ok(())
  60. }
  61. struct RedisParser;
  62. impl Encoder<Value> for RedisParser {
  63. type Error = io::Error;
  64. fn encode(&mut self, response: Value, dst: &mut BytesMut) -> io::Result<()> {
  65. let v: Vec<u8> = response.into();
  66. dst.extend_from_slice(&v);
  67. Ok(())
  68. }
  69. }
  70. impl Decoder for RedisParser {
  71. type Item = Vec<Bytes>;
  72. type Error = io::Error;
  73. fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
  74. let (frame, proccesed) = {
  75. let (unused, mut val) = match parse_server(src) {
  76. Ok((buf, val)) => (buf, val),
  77. Err(RedisError::Partial) => return Ok(None),
  78. Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "something")),
  79. };
  80. (
  81. val.iter_mut().map(|e| Bytes::copy_from_slice(e)).collect(),
  82. src.len() - unused.len(),
  83. )
  84. };
  85. src.advance(proccesed);
  86. Ok(Some(frame))
  87. }
  88. }