mod.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. //! # Connection module
  2. use crate::{db::Db, error::Error, value::Value};
  3. use bytes::Bytes;
  4. use parking_lot::RwLock;
  5. use std::{collections::HashSet, sync::Arc};
  6. use self::pubsub_server::Pubsub;
  7. pub mod connections;
  8. pub mod pubsub_connection;
  9. pub mod pubsub_server;
  10. /// Possible status of connections
  11. #[derive(Debug, Clone, Copy, Eq, PartialEq)]
  12. pub enum ConnectionStatus {
  13. /// The connection is in a MULTI stage and commands are being queued
  14. Multi,
  15. /// Failed Tx
  16. FailedTx,
  17. /// The connection is executing a transaction
  18. ExecutingTx,
  19. /// The connection is in pub-sub only mode
  20. Pubsub,
  21. /// The connection is a normal conection
  22. Normal,
  23. }
  24. impl Default for ConnectionStatus {
  25. fn default() -> Self {
  26. ConnectionStatus::Normal
  27. }
  28. }
  29. #[derive(Debug, Copy, Clone)]
  30. /// Reason while a client was unblocked
  31. pub enum UnblockReason {
  32. /// Timeout
  33. Timeout,
  34. /// Throw an error
  35. Error,
  36. /// Operation finished successfully
  37. Finished,
  38. }
  39. /// Connection information
  40. #[derive(Debug)]
  41. pub struct ConnectionInfo {
  42. current_db: usize,
  43. db: Arc<Db>,
  44. name: Option<String>,
  45. watch_keys: Vec<(Bytes, u128)>,
  46. tx_keys: HashSet<Bytes>,
  47. status: ConnectionStatus,
  48. commands: Option<Vec<Vec<Bytes>>>,
  49. is_blocked: bool,
  50. unblock_reason: Option<UnblockReason>,
  51. }
  52. /// Connection
  53. #[derive(Debug)]
  54. pub struct Connection {
  55. id: u128,
  56. all_connections: Arc<connections::Connections>,
  57. addr: String,
  58. info: RwLock<ConnectionInfo>,
  59. pubsub_client: pubsub_connection::PubsubClient,
  60. }
  61. impl ConnectionInfo {
  62. /// Creates a new connection
  63. fn new(db: Arc<Db>) -> Self {
  64. Self {
  65. name: None,
  66. watch_keys: vec![],
  67. db,
  68. current_db: 0,
  69. tx_keys: HashSet::new(),
  70. commands: None,
  71. status: ConnectionStatus::Normal,
  72. is_blocked: false,
  73. unblock_reason: None,
  74. }
  75. }
  76. }
  77. impl Connection {
  78. /// Returns a connection database.
  79. ///
  80. /// The database object is unique to this connection but most of its internal structure is
  81. /// shared (like the entries).
  82. pub fn db(&self) -> Arc<Db> {
  83. self.info.read().db.clone()
  84. }
  85. /// Creates a clone connection
  86. pub fn clone(&self) -> Arc<Connection> {
  87. self.all_connections
  88. .get_by_conn_id(self.id)
  89. .expect("Connection must be registered")
  90. }
  91. /// Returns the global pubsub server
  92. pub fn pubsub(&self) -> Arc<Pubsub> {
  93. self.all_connections.pubsub()
  94. }
  95. /// Queue response, this is the only way that a handler has to send multiple
  96. /// responses leveraging internally the pubsub to itself.
  97. pub fn append_response(&self, message: Value) {
  98. self.pubsub_client.send(message)
  99. }
  100. /// Returns a reference to the pubsub client
  101. pub fn pubsub_client(&self) -> &pubsub_connection::PubsubClient {
  102. &self.pubsub_client
  103. }
  104. /// Switch the connection to a pub-sub only mode
  105. pub fn start_pubsub(&self) -> Result<Value, Error> {
  106. let mut info = self.info.write();
  107. match info.status {
  108. ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
  109. info.status = ConnectionStatus::Pubsub;
  110. Ok(Value::Ignore)
  111. }
  112. _ => Err(Error::NestedTx),
  113. }
  114. }
  115. /// Block the connection
  116. pub fn block(&self) {
  117. let mut info = self.info.write();
  118. info.is_blocked = true;
  119. info.unblock_reason = None;
  120. }
  121. /// Unblock connection
  122. pub fn unblock(&self, reason: UnblockReason) -> bool {
  123. let mut info = self.info.write();
  124. if info.is_blocked {
  125. info.is_blocked = false;
  126. info.unblock_reason = Some(reason);
  127. true
  128. } else {
  129. false
  130. }
  131. }
  132. /// If the current connection has been externally unblocked
  133. #[inline]
  134. pub fn has_been_unblocked_externally(&self) -> Option<UnblockReason> {
  135. self.info.read().unblock_reason
  136. }
  137. /// Is the current connection blocked?
  138. #[inline]
  139. pub fn is_blocked(&self) -> bool {
  140. self.info.read().is_blocked
  141. }
  142. /// Connection ID
  143. #[inline]
  144. pub fn id(&self) -> u128 {
  145. self.id
  146. }
  147. /// Drops a multi/transaction and reset the connection
  148. ///
  149. /// If the connection was not in a MULTI stage an error is thrown.
  150. pub fn stop_transaction(&self) -> Result<Value, Error> {
  151. let mut info = self.info.write();
  152. match info.status {
  153. ConnectionStatus::Multi
  154. | ConnectionStatus::FailedTx
  155. | ConnectionStatus::ExecutingTx => {
  156. info.commands = None;
  157. info.watch_keys.clear();
  158. info.tx_keys.clear();
  159. info.status = ConnectionStatus::Normal;
  160. Ok(Value::Ok)
  161. }
  162. _ => Err(Error::NotInTx),
  163. }
  164. }
  165. /// Flag the transaction as failed
  166. pub fn fail_transaction(&self) {
  167. let mut info = self.info.write();
  168. info.status = ConnectionStatus::FailedTx;
  169. }
  170. /// Starts a transaction/multi
  171. ///
  172. /// Nested transactions are not possible.
  173. pub fn start_transaction(&self) -> Result<Value, Error> {
  174. let mut info = self.info.write();
  175. if info.status == ConnectionStatus::Normal {
  176. info.status = ConnectionStatus::Multi;
  177. Ok(Value::Ok)
  178. } else {
  179. Err(Error::NestedTx)
  180. }
  181. }
  182. /// Resets the current connection.
  183. pub fn reset(&self) {
  184. let mut info = self.info.write();
  185. info.status = ConnectionStatus::Normal;
  186. info.name = None;
  187. info.watch_keys = vec![];
  188. info.commands = None;
  189. info.tx_keys = HashSet::new();
  190. drop(info);
  191. let pubsub = self.pubsub();
  192. let pubsub_client = self.pubsub_client();
  193. if !pubsub_client.subscriptions().is_empty() {
  194. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self, false);
  195. }
  196. if !pubsub_client.psubscriptions().is_empty() {
  197. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self, false);
  198. }
  199. }
  200. /// Returns the status of the connection
  201. #[inline]
  202. pub fn status(&self) -> ConnectionStatus {
  203. self.info.read().status
  204. }
  205. /// Is connection executing transaction?
  206. #[inline]
  207. pub fn is_executing_tx(&self) -> bool {
  208. self.info.read().status == ConnectionStatus::ExecutingTx
  209. }
  210. /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
  211. /// some value changed since the moment the command was queued until the execution time.
  212. pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
  213. let watch_keys = &mut self.info.write().watch_keys;
  214. keys.iter()
  215. .map(|(bytes, version)| {
  216. watch_keys.push(((*bytes).clone(), *version));
  217. })
  218. .for_each(drop);
  219. }
  220. /// Returns true if any of the watched keys changed their value
  221. pub fn did_keys_change(&self) -> bool {
  222. let watch_keys = &self.info.read().watch_keys;
  223. for key in watch_keys.iter() {
  224. if self.info.read().db.get_version(&key.0) != key.1 {
  225. return true;
  226. }
  227. }
  228. false
  229. }
  230. /// Resets the watched keys list
  231. pub fn discard_watched_keys(&self) {
  232. self.info.write().watch_keys.clear()
  233. }
  234. /// Returns a list of key that are involved in a transaction. These keys will be locked as
  235. /// exclusive, even if they don't exists, during the execution of a transction.
  236. ///
  237. /// The original implementation of Redis does not need this promise because only one
  238. /// transaction is executed at a time, in microredis transactions reserve their keys and do not
  239. /// prevent other connections to continue modifying the database.
  240. pub fn get_tx_keys(&self) -> Vec<Bytes> {
  241. self.info
  242. .read()
  243. .tx_keys
  244. .iter()
  245. .cloned()
  246. .collect::<Vec<Bytes>>()
  247. }
  248. /// Queues a command for later execution
  249. pub fn queue_command(&self, args: &[Bytes]) {
  250. let mut info = self.info.write();
  251. let commands = info.commands.get_or_insert(vec![]);
  252. commands.push(args.iter().map(|m| (*m).clone()).collect());
  253. }
  254. /// Returns a list of queued commands.
  255. pub fn get_queue_commands(&self) -> Option<Vec<Vec<Bytes>>> {
  256. let mut info = self.info.write();
  257. info.watch_keys = vec![];
  258. info.status = ConnectionStatus::ExecutingTx;
  259. info.commands.take()
  260. }
  261. /// Returns a lsit of transaction keys
  262. pub fn tx_keys(&self, keys: Vec<&Bytes>) {
  263. #[allow(clippy::mutable_key_type)]
  264. let tx_keys = &mut self.info.write().tx_keys;
  265. keys.iter()
  266. .map(|k| {
  267. tx_keys.insert((*k).clone());
  268. })
  269. .for_each(drop);
  270. }
  271. /// Disconnects from the server, disconnect from all pubsub channels and remove itself from the
  272. /// all_connection lists.
  273. pub fn destroy(self: Arc<Connection>) {
  274. let pubsub = self.pubsub();
  275. self.clone().unblock(UnblockReason::Timeout);
  276. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self, false);
  277. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self, false);
  278. self.all_connections.clone().remove(self);
  279. }
  280. /// Returns the all_connections (Connections) instance
  281. pub fn all_connections(&self) -> Arc<connections::Connections> {
  282. self.all_connections.clone()
  283. }
  284. /// Returns the connection name
  285. pub fn name(&self) -> Option<String> {
  286. self.info.read().name.clone()
  287. }
  288. /// Sets a connection name
  289. pub fn set_name(&self, name: String) {
  290. let mut r = self.info.write();
  291. r.name = Some(name);
  292. }
  293. /// Changes the current db for the current connection
  294. pub fn selectdb(&self, db: usize) -> Result<Value, Error> {
  295. let mut info = self.info.write();
  296. info.db = self
  297. .all_connections
  298. .get_databases()
  299. .get(db)?
  300. .new_db_instance(self.id);
  301. info.current_db = db;
  302. Ok(Value::Ok)
  303. }
  304. }
  305. impl ToString for Connection {
  306. /// Returns a string representation of this connection
  307. fn to_string(&self) -> String {
  308. let info = self.info.read();
  309. format!(
  310. "id={} addr={} name={:?} db={}\r\n",
  311. self.id, self.addr, info.name, info.current_db
  312. )
  313. }
  314. }