1
0

mod.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. //! # Connection module
  2. use self::pubsub_server::Pubsub;
  3. use crate::{db::Db, error::Error, value::Value};
  4. use bytes::Bytes;
  5. use parking_lot::RwLock;
  6. use std::{
  7. collections::{HashSet, VecDeque},
  8. sync::Arc,
  9. };
  10. use tokio::sync::broadcast::{self, Receiver, Sender};
  11. pub mod connections;
  12. pub mod pubsub_connection;
  13. pub mod pubsub_server;
  14. /// Possible status of connections
  15. #[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
  16. pub enum ConnectionStatus {
  17. /// The connection is in a MULTI stage and commands are being queued
  18. Multi,
  19. /// Failed Tx
  20. FailedTx,
  21. /// The connection is executing a transaction
  22. ExecutingTx,
  23. /// The connection is in pub-sub only mode
  24. Pubsub,
  25. /// The connection is a normal conection
  26. #[default]
  27. Normal,
  28. }
  29. #[derive(Debug, Copy, Clone, Eq, PartialEq)]
  30. /// Reason while a client was unblocked
  31. pub enum UnblockReason {
  32. /// Timeout
  33. Timeout,
  34. /// Throw an error
  35. Error,
  36. /// Operation finished successfully
  37. Finished,
  38. }
  39. /// Connection information
  40. #[derive(Debug)]
  41. pub struct ConnectionInfo {
  42. current_db: usize,
  43. db: Arc<Db>,
  44. name: Option<String>,
  45. watch_keys: Vec<(Bytes, usize)>,
  46. tx_keys: HashSet<Bytes>,
  47. status: ConnectionStatus,
  48. commands: Option<Vec<VecDeque<Bytes>>>,
  49. is_blocked: bool,
  50. blocked_notification: Option<Sender<()>>,
  51. block_id: usize,
  52. unblock_reason: Option<UnblockReason>,
  53. }
  54. /// Connection
  55. #[derive(Debug)]
  56. pub struct Connection {
  57. id: u128,
  58. all_connections: Arc<connections::Connections>,
  59. addr: String,
  60. info: RwLock<ConnectionInfo>,
  61. pubsub_client: pubsub_connection::PubsubClient,
  62. }
  63. impl ConnectionInfo {
  64. /// Creates a new connection
  65. fn new(db: Arc<Db>) -> Self {
  66. Self {
  67. name: None,
  68. watch_keys: vec![],
  69. db,
  70. current_db: 0,
  71. tx_keys: HashSet::new(),
  72. commands: None,
  73. status: ConnectionStatus::default(),
  74. blocked_notification: None,
  75. is_blocked: false,
  76. block_id: 0,
  77. unblock_reason: None,
  78. }
  79. }
  80. }
  81. impl Connection {
  82. /// Returns a connection database.
  83. ///
  84. /// The database object is unique to this connection but most of its internal structure is
  85. /// shared (like the entries).
  86. pub fn db(&self) -> Arc<Db> {
  87. self.info.read().db.clone()
  88. }
  89. /// Creates a clone connection
  90. pub fn get_connection(&self) -> Arc<Connection> {
  91. self.all_connections
  92. .get_by_conn_id(self.id)
  93. .expect("Connection must be registered")
  94. }
  95. /// Returns the global pubsub server
  96. pub fn pubsub(&self) -> Arc<Pubsub> {
  97. self.all_connections.pubsub()
  98. }
  99. /// Queue response, this is the only way that a handler has to send multiple
  100. /// responses leveraging internally the pubsub to itself.
  101. pub fn append_response(&self, message: Value) {
  102. self.pubsub_client.send(message)
  103. }
  104. /// Returns a reference to the pubsub client
  105. pub fn pubsub_client(&self) -> &pubsub_connection::PubsubClient {
  106. &self.pubsub_client
  107. }
  108. /// Switch the connection to a pub-sub only mode
  109. pub fn start_pubsub(&self) -> Result<Value, Error> {
  110. let mut info = self.info.write();
  111. match info.status {
  112. ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
  113. info.status = ConnectionStatus::Pubsub;
  114. Ok(Value::Ignore)
  115. }
  116. _ => Err(Error::NestedTx),
  117. }
  118. }
  119. /// Block the connection
  120. pub fn block(&self) {
  121. let notification = broadcast::channel(1);
  122. let mut info = self.info.write();
  123. info.is_blocked = true;
  124. info.blocked_notification = Some(notification.0);
  125. info.block_id += 1;
  126. info.unblock_reason = None;
  127. }
  128. /// Returns the current block task ID number. This is an internal ID to
  129. /// identify each blocking command as unique
  130. #[inline]
  131. pub fn get_block_id(&self) -> Option<usize> {
  132. let info = self.info.read();
  133. if info.is_blocked {
  134. Some(info.block_id)
  135. } else {
  136. None
  137. }
  138. }
  139. /// Returns a receiver that will be called if the client is externally unblocked
  140. #[inline]
  141. pub fn get_unblocked_subscription(&self) -> Option<Receiver<()>> {
  142. self.info
  143. .read()
  144. .blocked_notification
  145. .as_ref()
  146. .map(|notification| notification.subscribe())
  147. }
  148. /// Unblock connection
  149. pub fn unblock(&self, reason: UnblockReason) -> bool {
  150. let mut info = self.info.write();
  151. if info.is_blocked {
  152. let notification = info.blocked_notification.as_ref().cloned();
  153. info.is_blocked = false;
  154. info.unblock_reason = Some(reason);
  155. info.blocked_notification = None;
  156. drop(info); // drop write lock
  157. if let Some(s) = notification {
  158. // Notify connection about this change
  159. let _ = s.send(());
  160. }
  161. true
  162. } else {
  163. false
  164. }
  165. }
  166. /// Is the current connection blocked?
  167. #[inline]
  168. pub fn is_blocked(&self) -> bool {
  169. self.info.read().is_blocked
  170. }
  171. /// Connection ID
  172. #[inline]
  173. pub fn id(&self) -> u128 {
  174. self.id
  175. }
  176. /// Drops a multi/transaction and reset the connection
  177. ///
  178. /// If the connection was not in a MULTI stage an error is thrown.
  179. pub fn stop_transaction(&self) -> Result<Value, Error> {
  180. let mut info = self.info.write();
  181. match info.status {
  182. ConnectionStatus::Multi
  183. | ConnectionStatus::FailedTx
  184. | ConnectionStatus::ExecutingTx => {
  185. info.commands = None;
  186. info.watch_keys.clear();
  187. info.tx_keys.clear();
  188. info.status = ConnectionStatus::default();
  189. Ok(Value::Ok)
  190. }
  191. _ => Err(Error::NotInTx),
  192. }
  193. }
  194. /// Flag the transaction as failed
  195. pub fn fail_transaction(&self) {
  196. let mut info = self.info.write();
  197. info.status = ConnectionStatus::FailedTx;
  198. }
  199. /// Starts a transaction/multi
  200. ///
  201. /// Nested transactions are not possible.
  202. pub fn start_transaction(&self) -> Result<Value, Error> {
  203. let mut info = self.info.write();
  204. if info.status == ConnectionStatus::Normal {
  205. info.status = ConnectionStatus::Multi;
  206. Ok(Value::Ok)
  207. } else {
  208. Err(Error::NestedTx)
  209. }
  210. }
  211. /// Resets the current connection.
  212. pub fn reset(&self) {
  213. let mut info = self.info.write();
  214. info.status = ConnectionStatus::default();
  215. info.name = None;
  216. info.watch_keys = vec![];
  217. info.commands = None;
  218. info.tx_keys = HashSet::new();
  219. drop(info);
  220. let pubsub = self.pubsub();
  221. let pubsub_client = self.pubsub_client();
  222. if !pubsub_client.subscriptions().is_empty() {
  223. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self, false);
  224. }
  225. if !pubsub_client.psubscriptions().is_empty() {
  226. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self, false);
  227. }
  228. }
  229. /// Returns the status of the connection
  230. #[inline]
  231. pub fn status(&self) -> ConnectionStatus {
  232. self.info.read().status
  233. }
  234. /// Is connection executing transaction?
  235. #[inline]
  236. pub fn is_executing_tx(&self) -> bool {
  237. self.info.read().status == ConnectionStatus::ExecutingTx
  238. }
  239. /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
  240. /// some value changed since the moment the command was queued until the execution time.
  241. pub fn watch_key(&self, keys: Vec<(Bytes, usize)>) {
  242. let watch_keys = &mut self.info.write().watch_keys;
  243. keys.into_iter()
  244. .map(|value| {
  245. watch_keys.push(value);
  246. })
  247. .for_each(drop);
  248. }
  249. /// Returns true if any of the watched keys changed their value
  250. pub fn did_keys_change(&self) -> bool {
  251. let watch_keys = &self.info.read().watch_keys;
  252. for key in watch_keys.iter() {
  253. if self.info.read().db.get(&key.0).version() != key.1 {
  254. return true;
  255. }
  256. }
  257. false
  258. }
  259. /// Resets the watched keys list
  260. pub fn discard_watched_keys(&self) {
  261. self.info.write().watch_keys.clear()
  262. }
  263. /// Returns a list of key that are involved in a transaction. These keys will be locked as
  264. /// exclusive, even if they don't exists, during the execution of a transction.
  265. ///
  266. /// The original implementation of Redis does not need this promise because only one
  267. /// transaction is executed at a time, in microredis transactions reserve their keys and do not
  268. /// prevent other connections to continue modifying the database.
  269. pub fn get_tx_keys(&self) -> Vec<Bytes> {
  270. self.info
  271. .read()
  272. .tx_keys
  273. .iter()
  274. .cloned()
  275. .collect::<Vec<Bytes>>()
  276. }
  277. /// Queues a command for later execution
  278. pub fn queue_command(&self, args: VecDeque<Bytes>) {
  279. let mut info = self.info.write();
  280. let commands = info.commands.get_or_insert(vec![]);
  281. commands.push(args);
  282. }
  283. /// Returns a list of queued commands.
  284. pub fn get_queue_commands(&self) -> Option<Vec<VecDeque<Bytes>>> {
  285. let mut info = self.info.write();
  286. info.watch_keys = vec![];
  287. info.status = ConnectionStatus::ExecutingTx;
  288. info.commands.take()
  289. }
  290. /// Returns a list of transaction keys
  291. pub fn tx_keys<T>(&self, keys: T)
  292. where
  293. T: IntoIterator<Item = Bytes>,
  294. {
  295. #[allow(clippy::mutable_key_type)]
  296. let tx_keys = &mut self.info.write().tx_keys;
  297. keys.into_iter()
  298. .map(|k| {
  299. tx_keys.insert(k);
  300. })
  301. .for_each(drop);
  302. }
  303. /// Disconnects from the server, disconnect from all pubsub channels and remove itself from the
  304. /// all_connection lists.
  305. pub fn destroy(self: Arc<Connection>) {
  306. let pubsub = self.pubsub();
  307. self.clone().unblock(UnblockReason::Timeout);
  308. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self, false);
  309. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self, false);
  310. self.all_connections.clone().remove(self);
  311. }
  312. /// Returns the all_connections (Connections) instance
  313. pub fn all_connections(&self) -> Arc<connections::Connections> {
  314. self.all_connections.clone()
  315. }
  316. /// Returns the connection name
  317. pub fn name(&self) -> Option<String> {
  318. self.info.read().name.clone()
  319. }
  320. /// Sets a connection name
  321. pub fn set_name(&self, name: String) {
  322. let mut r = self.info.write();
  323. r.name = Some(name);
  324. }
  325. /// Changes the current db for the current connection
  326. pub fn selectdb(&self, db: usize) -> Result<Value, Error> {
  327. let mut info = self.info.write();
  328. info.db = self
  329. .all_connections
  330. .get_databases()
  331. .get(db)?
  332. .set_conn_id(self.id);
  333. info.current_db = db;
  334. Ok(Value::Ok)
  335. }
  336. }
  337. impl ToString for Connection {
  338. /// Returns a string representation of this connection
  339. fn to_string(&self) -> String {
  340. let info = self.info.read();
  341. format!(
  342. "id={} addr={} name={:?} db={}\r\n",
  343. self.id, self.addr, info.name, info.current_db
  344. )
  345. }
  346. }