Ver código fonte

Move server logic out of main.rs

Cesar Rodas 3 anos atrás
pai
commit
5c9c8d0075
5 arquivos alterados com 105 adições e 101 exclusões
  1. 1 3
      src/cmd/key.rs
  2. 8 6
      src/db/mod.rs
  3. 3 91
      src/main.rs
  4. 92 0
      src/server.rs
  5. 1 1
      src/value.rs

+ 1 - 3
src/cmd/key.rs

@@ -15,9 +15,7 @@ pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
     let expires_at = Duration::new(expires_at as u64, 0);
 
-    Ok(conn
-        .db()
-        .expire(&args[1], expires_at))
+    Ok(conn.db().expire(&args[1], expires_at))
 }
 
 pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {

+ 8 - 6
src/db/mod.rs

@@ -143,12 +143,14 @@ impl Db {
 
     pub fn del(&self, keys: &[Bytes]) -> Value {
         let mut deleted = 0_i64;
-        keys.iter().map(|key| {
-            let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-            if entries.remove(key).is_some() {
-                deleted += 1;
-            }
-        }).for_each(drop);
+        keys.iter()
+            .map(|key| {
+                let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+                if entries.remove(key).is_some() {
+                    deleted += 1;
+                }
+            })
+            .for_each(drop);
 
         deleted.into()
     }

+ 3 - 91
src/main.rs

@@ -4,19 +4,10 @@ mod db;
 mod dispatcher;
 mod error;
 mod macros;
+mod server;
 mod value;
 
-use bytes::{Buf, Bytes, BytesMut};
-use connection::Connections;
-use dispatcher::Dispatcher;
-use futures::SinkExt;
-use log::{info, trace, warn};
-use redis_zero_protocol_parser::{parse_server, Error as RedisError};
-use std::{env, error::Error, io, ops::Deref, sync::Arc};
-use tokio::net::TcpListener;
-use tokio_stream::StreamExt;
-use tokio_util::codec::{Decoder, Encoder, Framed};
-use value::Value;
+use std::{env, error::Error};
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
@@ -26,84 +17,5 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
     env_logger::init();
 
-    let listener = TcpListener::bind(&addr).await?;
-    info!("Listening on: {}", addr);
-
-    let db = Arc::new(db::Db::new(1000));
-    let mut all_connections = Connections::new();
-
-    loop {
-        match listener.accept().await {
-            Ok((socket, addr)) => {
-                let conn = all_connections.new_connection(db.clone(), addr);
-
-                tokio::spawn(async move {
-                    let mut transport = Framed::new(socket, RedisParser);
-
-                    trace!("New connection {}", conn.lock().unwrap().id());
-
-                    while let Some(result) = transport.next().await {
-                        match result {
-                            Ok(args) => match Dispatcher::new(&args) {
-                                Ok(handler) => {
-                                    let r = handler
-                                        .deref()
-                                        .execute(&mut conn.lock().unwrap(), &args)
-                                        .unwrap_or_else(|x| x.into());
-                                    if transport.send(r).await.is_err() {
-                                        break;
-                                    }
-                                }
-                                Err(err) => {
-                                    if transport.send(err.into()).await.is_err() {
-                                        break;
-                                    }
-                                }
-                            },
-                            Err(e) => {
-                                warn!("error on decoding from socket; error = {:?}", e);
-                                break;
-                            }
-                        }
-                    }
-                });
-            }
-            Err(e) => println!("error accepting socket; error = {:?}", e),
-        }
-    }
-}
-
-struct RedisParser;
-
-impl Encoder<Value> for RedisParser {
-    type Error = io::Error;
-
-    fn encode(&mut self, response: Value, dst: &mut BytesMut) -> io::Result<()> {
-        let v: Vec<u8> = response.into();
-        dst.extend_from_slice(&v);
-        Ok(())
-    }
-}
-
-impl Decoder for RedisParser {
-    type Item = Vec<Bytes>;
-    type Error = io::Error;
-
-    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
-        let (frame, proccesed) = {
-            let (unused, mut val) = match parse_server(src) {
-                Ok((buf, val)) => (buf, val),
-                Err(RedisError::Partial) => return Ok(None),
-                Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "something")),
-            };
-            (
-                val.iter_mut().map(|e| Bytes::copy_from_slice(e)).collect(),
-                src.len() - unused.len(),
-            )
-        };
-
-        src.advance(proccesed);
-
-        Ok(Some(frame))
-    }
+    server::serve(addr).await
 }

+ 92 - 0
src/server.rs

@@ -0,0 +1,92 @@
+use crate::{connection::Connections, dispatcher::Dispatcher, value::Value, db::Db};
+use bytes::{Buf, Bytes, BytesMut};
+use futures::SinkExt;
+use log::{info, trace, warn};
+use redis_zero_protocol_parser::{parse_server, Error as RedisError};
+use std::{error::Error, io, ops::Deref, sync::Arc};
+use tokio::net::TcpListener;
+use tokio_stream::StreamExt;
+use tokio_util::codec::{Decoder, Encoder, Framed};
+
+struct RedisParser;
+
+impl Encoder<Value> for RedisParser {
+    type Error = io::Error;
+
+    fn encode(&mut self, response: Value, dst: &mut BytesMut) -> io::Result<()> {
+        let v: Vec<u8> = response.into();
+        dst.extend_from_slice(&v);
+        Ok(())
+    }
+}
+
+impl Decoder for RedisParser {
+    type Item = Vec<Bytes>;
+    type Error = io::Error;
+
+    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
+        let (frame, proccesed) = {
+            let (unused, mut val) = match parse_server(src) {
+                Ok((buf, val)) => (buf, val),
+                Err(RedisError::Partial) => return Ok(None),
+                Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "something")),
+            };
+            (
+                val.iter_mut().map(|e| Bytes::copy_from_slice(e)).collect(),
+                src.len() - unused.len(),
+            )
+        };
+
+        src.advance(proccesed);
+
+        Ok(Some(frame))
+    }
+}
+
+pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
+    let listener = TcpListener::bind(&addr).await?;
+    info!("Listening on: {}", addr);
+
+    let db = Arc::new(Db::new(1000));
+    let mut all_connections = Connections::new();
+
+    loop {
+        match listener.accept().await {
+            Ok((socket, addr)) => {
+                let conn = all_connections.new_connection(db.clone(), addr);
+
+                tokio::spawn(async move {
+                    let mut transport = Framed::new(socket, RedisParser);
+
+                    trace!("New connection {}", conn.lock().unwrap().id());
+
+                    while let Some(result) = transport.next().await {
+                        match result {
+                            Ok(args) => match Dispatcher::new(&args) {
+                                Ok(handler) => {
+                                    let r = handler
+                                        .deref()
+                                        .execute(&mut conn.lock().unwrap(), &args)
+                                        .unwrap_or_else(|x| x.into());
+                                    if transport.send(r).await.is_err() {
+                                        break;
+                                    }
+                                }
+                                Err(err) => {
+                                    if transport.send(err.into()).await.is_err() {
+                                        break;
+                                    }
+                                }
+                            },
+                            Err(e) => {
+                                warn!("error on decoding from socket; error = {:?}", e);
+                                break;
+                            }
+                        }
+                    }
+                });
+            }
+            Err(e) => println!("error accepting socket; error = {:?}", e),
+        }
+    }
+}

+ 1 - 1
src/value.rs

@@ -3,7 +3,7 @@ use bytes::{Bytes, BytesMut};
 use redis_zero_protocol_parser::Value as ParsedValue;
 use std::{
     convert::{TryFrom, TryInto},
-    str::FromStr
+    str::FromStr,
 };
 
 #[derive(Debug, PartialEq, Clone)]