|
@@ -6,12 +6,13 @@ use bytes::Bytes;
|
|
|
use entry::{new_version, Entry};
|
|
|
use expiration::ExpirationDb;
|
|
|
use log::trace;
|
|
|
+use parking_lot::{Mutex, RwLock};
|
|
|
use seahash::hash;
|
|
|
use std::{
|
|
|
collections::HashMap,
|
|
|
convert::{TryFrom, TryInto},
|
|
|
ops::AddAssign,
|
|
|
- sync::{Arc, Mutex, RwLock},
|
|
|
+ sync::Arc,
|
|
|
thread,
|
|
|
};
|
|
|
use tokio::time::{Duration, Instant};
|
|
@@ -81,7 +82,7 @@ impl Db {
|
|
|
|
|
|
let waiting = Duration::from_nanos(100);
|
|
|
|
|
|
- while let Some(blocker) = self.tx_key_locks.read().unwrap().get(key) {
|
|
|
+ while let Some(blocker) = self.tx_key_locks.read().get(key) {
|
|
|
// Loop while the key we are trying to access is being blocked by a
|
|
|
// connection in a transaction
|
|
|
if *blocker == self.conn_id {
|
|
@@ -99,7 +100,7 @@ impl Db {
|
|
|
pub fn lock_keys(&self, keys: &[Bytes]) {
|
|
|
let waiting = Duration::from_nanos(100);
|
|
|
loop {
|
|
|
- let mut lock = self.tx_key_locks.write().unwrap();
|
|
|
+ let mut lock = self.tx_key_locks.write();
|
|
|
let mut i = 0;
|
|
|
|
|
|
for key in keys.iter() {
|
|
@@ -129,7 +130,7 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn unlock_keys(&self, keys: &[Bytes]) {
|
|
|
- let mut lock = self.tx_key_locks.write().unwrap();
|
|
|
+ let mut lock = self.tx_key_locks.write();
|
|
|
for key in keys.iter() {
|
|
|
lock.remove(key);
|
|
|
}
|
|
@@ -142,7 +143,7 @@ impl Db {
|
|
|
key: &Bytes,
|
|
|
incr_by: T,
|
|
|
) -> Result<Value, Error> {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
match entries.get_mut(key) {
|
|
|
Some(x) => {
|
|
|
let value = x.get();
|
|
@@ -165,7 +166,7 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn persist(&self, key: &Bytes) -> Value {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
entries
|
|
|
.get_mut(key)
|
|
|
.filter(|x| x.is_valid())
|
|
@@ -180,29 +181,26 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn set_ttl(&self, key: &Bytes, expires_in: Duration) -> Value {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
let expires_at = Instant::now() + expires_in;
|
|
|
|
|
|
entries
|
|
|
.get_mut(key)
|
|
|
.filter(|x| x.is_valid())
|
|
|
.map_or(0.into(), |x| {
|
|
|
- self.expirations.lock().unwrap().add(key, expires_at);
|
|
|
+ self.expirations.lock().add(key, expires_at);
|
|
|
x.set_ttl(expires_at);
|
|
|
1.into()
|
|
|
})
|
|
|
}
|
|
|
|
|
|
pub fn del(&self, keys: &[Bytes]) -> Value {
|
|
|
- let mut expirations = self.expirations.lock().unwrap();
|
|
|
+ let mut expirations = self.expirations.lock();
|
|
|
|
|
|
keys.iter()
|
|
|
.filter_map(|key| {
|
|
|
expirations.remove(key);
|
|
|
- self.entries[self.get_slot(key)]
|
|
|
- .write()
|
|
|
- .unwrap()
|
|
|
- .remove(key)
|
|
|
+ self.entries[self.get_slot(key)].write().remove(key)
|
|
|
})
|
|
|
.filter(|key| key.is_valid())
|
|
|
.count()
|
|
@@ -213,7 +211,7 @@ impl Db {
|
|
|
let mut matches = 0;
|
|
|
keys.iter()
|
|
|
.map(|key| {
|
|
|
- let entries = self.entries[self.get_slot(key)].read().unwrap();
|
|
|
+ let entries = self.entries[self.get_slot(key)].read();
|
|
|
if entries.get(key).is_some() {
|
|
|
matches += 1;
|
|
|
}
|
|
@@ -228,7 +226,7 @@ impl Db {
|
|
|
F1: FnOnce(&Value) -> Result<Value, Error>,
|
|
|
F2: FnOnce() -> Result<Value, Error>,
|
|
|
{
|
|
|
- let entries = self.entries[self.get_slot(key)].read().unwrap();
|
|
|
+ let entries = self.entries[self.get_slot(key)].read();
|
|
|
let entry = entries.get(key).filter(|x| x.is_valid()).map(|e| e.get());
|
|
|
|
|
|
if let Some(entry) = entry {
|
|
@@ -242,7 +240,7 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn bump_version(&self, key: &Bytes) -> bool {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
entries
|
|
|
.get_mut(key)
|
|
|
.filter(|x| x.is_valid())
|
|
@@ -253,7 +251,7 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn get_version(&self, key: &Bytes) -> u128 {
|
|
|
- let entries = self.entries[self.get_slot(key)].read().unwrap();
|
|
|
+ let entries = self.entries[self.get_slot(key)].read();
|
|
|
entries
|
|
|
.get(key)
|
|
|
.filter(|x| x.is_valid())
|
|
@@ -262,7 +260,7 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn get(&self, key: &Bytes) -> Value {
|
|
|
- let entries = self.entries[self.get_slot(key)].read().unwrap();
|
|
|
+ let entries = self.entries[self.get_slot(key)].read();
|
|
|
entries
|
|
|
.get(key)
|
|
|
.filter(|x| x.is_valid())
|
|
@@ -272,7 +270,7 @@ impl Db {
|
|
|
pub fn get_multi(&self, keys: &[Bytes]) -> Value {
|
|
|
keys.iter()
|
|
|
.map(|key| {
|
|
|
- let entries = self.entries[self.get_slot(key)].read().unwrap();
|
|
|
+ let entries = self.entries[self.get_slot(key)].read();
|
|
|
entries
|
|
|
.get(key)
|
|
|
.filter(|x| x.is_valid() && x.is_clonable())
|
|
@@ -283,8 +281,8 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
- self.expirations.lock().unwrap().remove(key);
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
+ self.expirations.lock().remove(key);
|
|
|
entries
|
|
|
.insert(key.clone(), Entry::new(value.clone(), None))
|
|
|
.filter(|x| x.is_valid())
|
|
@@ -292,26 +290,26 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn getdel(&self, key: &Bytes) -> Value {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
entries.remove(key).map_or(Value::Null, |x| {
|
|
|
- self.expirations.lock().unwrap().remove(key);
|
|
|
+ self.expirations.lock().remove(key);
|
|
|
x.clone_value()
|
|
|
})
|
|
|
}
|
|
|
|
|
|
pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
let expires_at = expires_in.map(|duration| Instant::now() + duration);
|
|
|
|
|
|
if let Some(expires_at) = expires_at {
|
|
|
- self.expirations.lock().unwrap().add(key, expires_at);
|
|
|
+ self.expirations.lock().add(key, expires_at);
|
|
|
}
|
|
|
entries.insert(key.clone(), Entry::new(value, expires_at));
|
|
|
Value::Ok
|
|
|
}
|
|
|
|
|
|
pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
|
|
|
- let entries = self.entries[self.get_slot(key)].read().unwrap();
|
|
|
+ let entries = self.entries[self.get_slot(key)].read();
|
|
|
entries
|
|
|
.get(key)
|
|
|
.filter(|x| x.is_valid())
|
|
@@ -319,7 +317,7 @@ impl Db {
|
|
|
}
|
|
|
|
|
|
pub fn purge(&self) -> u64 {
|
|
|
- let mut expirations = self.expirations.lock().unwrap();
|
|
|
+ let mut expirations = self.expirations.lock();
|
|
|
let mut removed = 0;
|
|
|
|
|
|
trace!("Watching {} keys for expirations", expirations.len());
|
|
@@ -329,7 +327,7 @@ impl Db {
|
|
|
|
|
|
keys.iter()
|
|
|
.map(|key| {
|
|
|
- let mut entries = self.entries[self.get_slot(key)].write().unwrap();
|
|
|
+ let mut entries = self.entries[self.get_slot(key)].write();
|
|
|
if entries.remove(key).is_some() {
|
|
|
trace!("Removed key {:?} due timeout", key);
|
|
|
removed += 1;
|
|
@@ -363,7 +361,7 @@ mod test {
|
|
|
let db = Db::new(100);
|
|
|
db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
|
|
|
|
|
|
- assert_eq!(Value::Float(2.2), db.incr(&bytes!("num"), 1.1).unwrap());
|
|
|
+ assert_eq!(Ok(Value::Float(2.2)), db.incr(&bytes!("num"), 1.1));
|
|
|
assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
|
|
|
}
|
|
|
|
|
@@ -372,7 +370,7 @@ mod test {
|
|
|
let db = Db::new(100);
|
|
|
db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
|
|
|
|
|
|
- assert_eq!(Value::Float(2.1), db.incr(&bytes!("num"), 1.1).unwrap());
|
|
|
+ assert_eq!(Ok(Value::Float(2.1)), db.incr(&bytes!("num"), 1.1));
|
|
|
assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
|
|
|
}
|
|
|
|
|
@@ -381,21 +379,21 @@ mod test {
|
|
|
let db = Db::new(100);
|
|
|
db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
|
|
|
|
|
|
- assert_eq!(Value::Integer(2), db.incr(&bytes!("num"), 1).unwrap());
|
|
|
+ assert_eq!(Ok(Value::Integer(2)), db.incr(&bytes!("num"), 1));
|
|
|
assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
|
|
|
}
|
|
|
|
|
|
#[test]
|
|
|
fn incr_blob_int_set() {
|
|
|
let db = Db::new(100);
|
|
|
- assert_eq!(Value::Integer(1), db.incr(&bytes!("num"), 1).unwrap());
|
|
|
+ assert_eq!(Ok(Value::Integer(1)), db.incr(&bytes!("num"), 1));
|
|
|
assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
|
|
|
}
|
|
|
|
|
|
#[test]
|
|
|
fn incr_blob_float_set() {
|
|
|
let db = Db::new(100);
|
|
|
- assert_eq!(Value::Float(1.1), db.incr(&bytes!("num"), 1.1).unwrap());
|
|
|
+ assert_eq!(Ok(Value::Float(1.1)), db.incr(&bytes!("num"), 1.1));
|
|
|
assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
|
|
|
}
|
|
|
|