mod.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. //! # Connection module
  2. use self::pubsub_server::Pubsub;
  3. use crate::{db::Db, error::Error, value::Value};
  4. use bytes::Bytes;
  5. use parking_lot::RwLock;
  6. use std::{collections::HashSet, sync::Arc};
  7. use tokio::sync::broadcast::{self, Receiver, Sender};
  8. pub mod connections;
  9. pub mod pubsub_connection;
  10. pub mod pubsub_server;
  11. /// Possible status of connections
  12. #[derive(Debug, Clone, Copy, Eq, PartialEq)]
  13. pub enum ConnectionStatus {
  14. /// The connection is in a MULTI stage and commands are being queued
  15. Multi,
  16. /// Failed Tx
  17. FailedTx,
  18. /// The connection is executing a transaction
  19. ExecutingTx,
  20. /// The connection is in pub-sub only mode
  21. Pubsub,
  22. /// The connection is a normal conection
  23. Normal,
  24. }
  25. impl Default for ConnectionStatus {
  26. fn default() -> Self {
  27. ConnectionStatus::Normal
  28. }
  29. }
  30. #[derive(Debug, Copy, Clone, Eq, PartialEq)]
  31. /// Reason while a client was unblocked
  32. pub enum UnblockReason {
  33. /// Timeout
  34. Timeout,
  35. /// Throw an error
  36. Error,
  37. /// Operation finished successfully
  38. Finished,
  39. }
/// Connection information
///
/// Mutable, per-connection state. `Connection` keeps it behind a lock so it
/// can be updated through a shared reference.
#[derive(Debug)]
pub struct ConnectionInfo {
    /// Index of the currently selected database (updated by `selectdb`).
    current_db: usize,
    /// Database handle used by this connection.
    db: Arc<Db>,
    /// Optional connection name, set through `set_name`.
    name: Option<String>,
    /// Watched keys, each paired with the version recorded when it was watched.
    watch_keys: Vec<(Bytes, u128)>,
    /// Keys involved in the current transaction.
    tx_keys: HashSet<Bytes>,
    /// Current status of the connection.
    status: ConnectionStatus,
    /// Commands queued for later execution; `None` when nothing is queued.
    commands: Option<Vec<Vec<Bytes>>>,
    /// Whether the connection is currently blocked.
    is_blocked: bool,
    /// Broadcast sender used to notify subscribers when the connection is unblocked.
    blocked_notification: Option<Sender<()>>,
    /// Counter incremented on every call to `block`.
    block_id: usize,
    /// Reason recorded by the last `unblock`; cleared when blocking again.
    unblock_reason: Option<UnblockReason>,
}
/// Connection
///
/// A client connection: immutable identity (`id`, `addr`) plus mutable state
/// kept in `info` behind a read/write lock.
#[derive(Debug)]
pub struct Connection {
    /// Connection ID, also used to look the connection up in `all_connections`.
    id: u128,
    /// Shared collection of all connections this one belongs to.
    all_connections: Arc<connections::Connections>,
    /// Address string, reported as `addr=` in the string representation.
    addr: String,
    /// Mutable connection state.
    info: RwLock<ConnectionInfo>,
    /// Pub-sub client attached to this connection.
    pubsub_client: pubsub_connection::PubsubClient,
}
  64. impl ConnectionInfo {
  65. /// Creates a new connection
  66. fn new(db: Arc<Db>) -> Self {
  67. Self {
  68. name: None,
  69. watch_keys: vec![],
  70. db,
  71. current_db: 0,
  72. tx_keys: HashSet::new(),
  73. commands: None,
  74. status: ConnectionStatus::Normal,
  75. blocked_notification: None,
  76. is_blocked: false,
  77. block_id: 0,
  78. unblock_reason: None,
  79. }
  80. }
  81. }
  82. impl Connection {
  83. /// Returns a connection database.
  84. ///
  85. /// The database object is unique to this connection but most of its internal structure is
  86. /// shared (like the entries).
  87. pub fn db(&self) -> Arc<Db> {
  88. self.info.read().db.clone()
  89. }
  90. /// Creates a clone connection
  91. pub fn clone(&self) -> Arc<Connection> {
  92. self.all_connections
  93. .get_by_conn_id(self.id)
  94. .expect("Connection must be registered")
  95. }
  96. /// Returns the global pubsub server
  97. pub fn pubsub(&self) -> Arc<Pubsub> {
  98. self.all_connections.pubsub()
  99. }
  100. /// Queue response, this is the only way that a handler has to send multiple
  101. /// responses leveraging internally the pubsub to itself.
  102. pub fn append_response(&self, message: Value) {
  103. self.pubsub_client.send(message)
  104. }
  105. /// Returns a reference to the pubsub client
  106. pub fn pubsub_client(&self) -> &pubsub_connection::PubsubClient {
  107. &self.pubsub_client
  108. }
  109. /// Switch the connection to a pub-sub only mode
  110. pub fn start_pubsub(&self) -> Result<Value, Error> {
  111. let mut info = self.info.write();
  112. match info.status {
  113. ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
  114. info.status = ConnectionStatus::Pubsub;
  115. Ok(Value::Ignore)
  116. }
  117. _ => Err(Error::NestedTx),
  118. }
  119. }
  120. /// Block the connection
  121. pub fn block(&self) {
  122. let notification = broadcast::channel(1);
  123. let mut info = self.info.write();
  124. info.is_blocked = true;
  125. info.blocked_notification = Some(notification.0);
  126. info.block_id += 1;
  127. info.unblock_reason = None;
  128. }
  129. /// Returns the current block task ID number. This is an internal ID to
  130. /// identify each blocking command as unique
  131. #[inline]
  132. pub fn get_block_id(&self) -> Option<usize> {
  133. let info = self.info.read();
  134. if info.is_blocked {
  135. Some(info.block_id)
  136. } else {
  137. None
  138. }
  139. }
  140. /// Returns a receiver that will be called if the client is externally unblocked
  141. #[inline]
  142. pub fn get_unblocked_subscription(&self) -> Option<Receiver<()>> {
  143. self.info
  144. .read()
  145. .blocked_notification
  146. .as_ref()
  147. .map(|notification| notification.subscribe())
  148. }
  149. /// Unblock connection
  150. pub fn unblock(&self, reason: UnblockReason) -> bool {
  151. let mut info = self.info.write();
  152. if info.is_blocked {
  153. let notification = info.blocked_notification.as_ref().map(|s| s.clone());
  154. info.is_blocked = false;
  155. info.unblock_reason = Some(reason);
  156. info.blocked_notification = None;
  157. drop(info); // drop write lock
  158. if let Some(s) = notification {
  159. // Notify connection about this change
  160. s.send(());
  161. }
  162. true
  163. } else {
  164. false
  165. }
  166. }
  167. /// Is the current connection blocked?
  168. #[inline]
  169. pub fn is_blocked(&self) -> bool {
  170. self.info.read().is_blocked
  171. }
  172. /// Connection ID
  173. #[inline]
  174. pub fn id(&self) -> u128 {
  175. self.id
  176. }
  177. /// Drops a multi/transaction and reset the connection
  178. ///
  179. /// If the connection was not in a MULTI stage an error is thrown.
  180. pub fn stop_transaction(&self) -> Result<Value, Error> {
  181. let mut info = self.info.write();
  182. match info.status {
  183. ConnectionStatus::Multi
  184. | ConnectionStatus::FailedTx
  185. | ConnectionStatus::ExecutingTx => {
  186. info.commands = None;
  187. info.watch_keys.clear();
  188. info.tx_keys.clear();
  189. info.status = ConnectionStatus::Normal;
  190. Ok(Value::Ok)
  191. }
  192. _ => Err(Error::NotInTx),
  193. }
  194. }
  195. /// Flag the transaction as failed
  196. pub fn fail_transaction(&self) {
  197. let mut info = self.info.write();
  198. info.status = ConnectionStatus::FailedTx;
  199. }
  200. /// Starts a transaction/multi
  201. ///
  202. /// Nested transactions are not possible.
  203. pub fn start_transaction(&self) -> Result<Value, Error> {
  204. let mut info = self.info.write();
  205. if info.status == ConnectionStatus::Normal {
  206. info.status = ConnectionStatus::Multi;
  207. Ok(Value::Ok)
  208. } else {
  209. Err(Error::NestedTx)
  210. }
  211. }
  212. /// Resets the current connection.
  213. pub fn reset(&self) {
  214. let mut info = self.info.write();
  215. info.status = ConnectionStatus::Normal;
  216. info.name = None;
  217. info.watch_keys = vec![];
  218. info.commands = None;
  219. info.tx_keys = HashSet::new();
  220. drop(info);
  221. let pubsub = self.pubsub();
  222. let pubsub_client = self.pubsub_client();
  223. if !pubsub_client.subscriptions().is_empty() {
  224. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self, false);
  225. }
  226. if !pubsub_client.psubscriptions().is_empty() {
  227. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self, false);
  228. }
  229. }
  230. /// Returns the status of the connection
  231. #[inline]
  232. pub fn status(&self) -> ConnectionStatus {
  233. self.info.read().status
  234. }
  235. /// Is connection executing transaction?
  236. #[inline]
  237. pub fn is_executing_tx(&self) -> bool {
  238. self.info.read().status == ConnectionStatus::ExecutingTx
  239. }
  240. /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
  241. /// some value changed since the moment the command was queued until the execution time.
  242. pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
  243. let watch_keys = &mut self.info.write().watch_keys;
  244. keys.iter()
  245. .map(|(bytes, version)| {
  246. watch_keys.push(((*bytes).clone(), *version));
  247. })
  248. .for_each(drop);
  249. }
  250. /// Returns true if any of the watched keys changed their value
  251. pub fn did_keys_change(&self) -> bool {
  252. let watch_keys = &self.info.read().watch_keys;
  253. for key in watch_keys.iter() {
  254. if self.info.read().db.get_version(&key.0) != key.1 {
  255. return true;
  256. }
  257. }
  258. false
  259. }
  260. /// Resets the watched keys list
  261. pub fn discard_watched_keys(&self) {
  262. self.info.write().watch_keys.clear()
  263. }
  264. /// Returns a list of key that are involved in a transaction. These keys will be locked as
  265. /// exclusive, even if they don't exists, during the execution of a transction.
  266. ///
  267. /// The original implementation of Redis does not need this promise because only one
  268. /// transaction is executed at a time, in microredis transactions reserve their keys and do not
  269. /// prevent other connections to continue modifying the database.
  270. pub fn get_tx_keys(&self) -> Vec<Bytes> {
  271. self.info
  272. .read()
  273. .tx_keys
  274. .iter()
  275. .cloned()
  276. .collect::<Vec<Bytes>>()
  277. }
  278. /// Queues a command for later execution
  279. pub fn queue_command(&self, args: &[Bytes]) {
  280. let mut info = self.info.write();
  281. let commands = info.commands.get_or_insert(vec![]);
  282. commands.push(args.iter().map(|m| (*m).clone()).collect());
  283. }
  284. /// Returns a list of queued commands.
  285. pub fn get_queue_commands(&self) -> Option<Vec<Vec<Bytes>>> {
  286. let mut info = self.info.write();
  287. info.watch_keys = vec![];
  288. info.status = ConnectionStatus::ExecutingTx;
  289. info.commands.take()
  290. }
  291. /// Returns a lsit of transaction keys
  292. pub fn tx_keys(&self, keys: Vec<&Bytes>) {
  293. #[allow(clippy::mutable_key_type)]
  294. let tx_keys = &mut self.info.write().tx_keys;
  295. keys.iter()
  296. .map(|k| {
  297. tx_keys.insert((*k).clone());
  298. })
  299. .for_each(drop);
  300. }
  301. /// Disconnects from the server, disconnect from all pubsub channels and remove itself from the
  302. /// all_connection lists.
  303. pub fn destroy(self: Arc<Connection>) {
  304. let pubsub = self.pubsub();
  305. self.clone().unblock(UnblockReason::Timeout);
  306. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self, false);
  307. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self, false);
  308. self.all_connections.clone().remove(self);
  309. }
  310. /// Returns the all_connections (Connections) instance
  311. pub fn all_connections(&self) -> Arc<connections::Connections> {
  312. self.all_connections.clone()
  313. }
  314. /// Returns the connection name
  315. pub fn name(&self) -> Option<String> {
  316. self.info.read().name.clone()
  317. }
  318. /// Sets a connection name
  319. pub fn set_name(&self, name: String) {
  320. let mut r = self.info.write();
  321. r.name = Some(name);
  322. }
  323. /// Changes the current db for the current connection
  324. pub fn selectdb(&self, db: usize) -> Result<Value, Error> {
  325. let mut info = self.info.write();
  326. info.db = self
  327. .all_connections
  328. .get_databases()
  329. .get(db)?
  330. .new_db_instance(self.id);
  331. info.current_db = db;
  332. Ok(Value::Ok)
  333. }
  334. }
  335. impl ToString for Connection {
  336. /// Returns a string representation of this connection
  337. fn to_string(&self) -> String {
  338. let info = self.info.read();
  339. format!(
  340. "id={} addr={} name={:?} db={}\r\n",
  341. self.id, self.addr, info.name, info.current_db
  342. )
  343. }
  344. }