Ver código fonte

Make BLMOVE atomic

Lock both keys to alter. Also fixed a bug when the key is already
blocked by the same connection.
Cesar Rodas 2 anos atrás
pai
commit
3057aea937
2 arquivos alterados com 21 adições e 6 exclusões
  1. 18 6
      src/cmd/list.rs
  2. 3 0
      src/db/mod.rs

+ 18 - 6
src/cmd/list.rs

@@ -419,7 +419,12 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         return Err(Error::Syntax);
     };
 
-    let result = conn.db().get_map_or(
+    let db = conn.db();
+
+    /// Lock keys to alter exclusively
+    db.lock_keys(&args[1..=2]);
+
+    let result = db.get_map_or(
         &args[1],
         |v| match v {
             Value::List(source) => conn.db().get_map_or(
@@ -447,10 +452,11 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                     _ => Err(Error::WrongType),
                 },
                 || {
+                    let mut source = source.write();
                     let element = if source_is_left {
-                        source.write().pop_front()
+                        source.pop_front()
                     } else {
-                        source.write().pop_back()
+                        source.pop_back()
                     };
 
                     if let Some(element) = element {
@@ -467,11 +473,17 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
         },
         || Ok(Value::Null),
-    )?;
+    );
 
-    conn.db().bump_version(&args[1]);
+    /// release the lock on keys
+    db.unlock_keys(&args[1..=2]);
 
-    Ok(result)
+    if result != Ok(Value::Null) {
+        conn.db().bump_version(&args[1]);
+        conn.db().bump_version(&args[2]);
+    }
+
+    result
 }
 
 /// Removes and returns the first elements of the list stored at key.

+ 3 - 0
src/db/mod.rs

@@ -172,8 +172,10 @@ impl Db {
                 if let Some(blocker) = lock.get(key) {
                     if *blocker == self.conn_id {
                         // It is blocked by us already.
+                        i += 1;
                         continue;
                     }
+                    println!("failed to block {:?}", key);
                     // It is blocked by another tx, we need to break
                     // and retry to gain the lock over this key
                     break;
@@ -710,6 +712,7 @@ impl Db {
     }
 
     /// Returns the version of a given key
+    #[inline]
     pub fn get_version(&self, key: &Bytes) -> u128 {
         let slot = self.slots[self.get_slot(key)].read();
         slot.get(key)