mod.rs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. //! # Connection module
  2. use crate::{db::Db, error::Error, value::Value};
  3. use bytes::Bytes;
  4. use parking_lot::RwLock;
  5. use std::{collections::HashSet, sync::Arc};
  6. use self::pubsub_server::Pubsub;
  7. pub mod connections;
  8. pub mod pubsub_connection;
  9. pub mod pubsub_server;
  /// The states a client connection can be in.
  ///
  /// The status decides which state transitions the connection accepts (see
  /// `start_transaction`, `stop_transaction` and `start_pubsub` on `Connection`).
  #[derive(Debug, Clone, Copy, Eq, PartialEq)]
  pub enum ConnectionStatus {
      /// A MULTI block is open and incoming commands are being queued.
      Multi,
      /// The queued commands of a transaction are currently being executed.
      ExecutingTx,
      /// The connection has been switched to pub-sub only mode.
      Pubsub,
      /// A regular connection: no transaction open and not in pub-sub mode.
      Normal,
  }
  22. impl Default for ConnectionStatus {
  23. fn default() -> Self {
  24. ConnectionStatus::Normal
  25. }
  26. }
  27. /// Connection information
  28. #[derive(Debug)]
  29. pub struct ConnectionInfo {
  30. current_db: usize,
  31. db: Arc<Db>,
  32. name: Option<String>,
  33. watch_keys: Vec<(Bytes, u128)>,
  34. tx_keys: HashSet<Bytes>,
  35. status: ConnectionStatus,
  36. commands: Option<Vec<Vec<Bytes>>>,
  37. }
  38. /// Connection
  39. #[derive(Debug)]
  40. pub struct Connection {
  41. id: u128,
  42. all_connections: Arc<connections::Connections>,
  43. addr: String,
  44. info: RwLock<ConnectionInfo>,
  45. pubsub_client: pubsub_connection::PubsubClient,
  46. }
  47. impl ConnectionInfo {
  48. /// Creates a new connection
  49. fn new(db: Arc<Db>) -> Self {
  50. Self {
  51. name: None,
  52. watch_keys: vec![],
  53. db,
  54. current_db: 0,
  55. tx_keys: HashSet::new(),
  56. commands: None,
  57. status: ConnectionStatus::Normal,
  58. }
  59. }
  60. }
  61. impl Connection {
  62. /// Returns a connection database.
  63. ///
  64. /// The database object is unique to this connection but most of its internal structure is
  65. /// shared (like the entries).
  66. pub fn db(&self) -> Arc<Db> {
  67. self.info.read().db.clone()
  68. }
  69. /// Returns the global pubsub server
  70. pub fn pubsub(&self) -> Arc<Pubsub> {
  71. self.all_connections.pubsub()
  72. }
  73. /// Returns a reference to the pubsub client
  74. pub fn pubsub_client(&self) -> &pubsub_connection::PubsubClient {
  75. &self.pubsub_client
  76. }
  77. /// Switch the connection to a pub-sub only mode
  78. pub fn start_pubsub(&self) -> Result<Value, Error> {
  79. let mut info = self.info.write();
  80. match info.status {
  81. ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
  82. info.status = ConnectionStatus::Pubsub;
  83. Ok(Value::Ok)
  84. }
  85. _ => Err(Error::NestedTx),
  86. }
  87. }
  88. /// Connection ID
  89. pub fn id(&self) -> u128 {
  90. self.id
  91. }
  92. /// Drops a multi/transaction and reset the connection
  93. ///
  94. /// If the connection was not in a MULTI stage an error is thrown.
  95. pub fn stop_transaction(&self) -> Result<Value, Error> {
  96. let mut info = self.info.write();
  97. if info.status == ConnectionStatus::Multi || info.status == ConnectionStatus::ExecutingTx {
  98. info.commands = None;
  99. info.watch_keys.clear();
  100. info.tx_keys.clear();
  101. info.status = ConnectionStatus::Normal;
  102. Ok(Value::Ok)
  103. } else {
  104. Err(Error::NotInTx)
  105. }
  106. }
  107. /// Starts a transaction/multi
  108. ///
  109. /// Nested transactions are not possible.
  110. pub fn start_transaction(&self) -> Result<Value, Error> {
  111. let mut info = self.info.write();
  112. if info.status == ConnectionStatus::Normal {
  113. info.status = ConnectionStatus::Multi;
  114. Ok(Value::Ok)
  115. } else {
  116. Err(Error::NestedTx)
  117. }
  118. }
  119. /// Resets the current connection.
  120. pub fn reset(&self) {
  121. let mut info = self.info.write();
  122. info.status = ConnectionStatus::Normal;
  123. info.name = None;
  124. info.watch_keys = vec![];
  125. info.commands = None;
  126. info.tx_keys = HashSet::new();
  127. let pubsub = self.pubsub();
  128. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self);
  129. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self);
  130. }
  131. /// Returns the status of the connection
  132. pub fn status(&self) -> ConnectionStatus {
  133. self.info.read().status
  134. }
  135. /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
  136. /// some value changed since the moment the command was queued until the execution time.
  137. pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
  138. let watch_keys = &mut self.info.write().watch_keys;
  139. keys.iter()
  140. .map(|(bytes, version)| {
  141. watch_keys.push(((*bytes).clone(), *version));
  142. })
  143. .for_each(drop);
  144. }
  145. /// Returns true if any of the watched keys changed their value
  146. pub fn did_keys_change(&self) -> bool {
  147. let watch_keys = &self.info.read().watch_keys;
  148. for key in watch_keys.iter() {
  149. if self.info.read().db.get_version(&key.0) != key.1 {
  150. return true;
  151. }
  152. }
  153. false
  154. }
  155. /// Resets the watched keys list
  156. pub fn discard_watched_keys(&self) {
  157. self.info.write().watch_keys.clear()
  158. }
  159. /// Returns a list of key that are involved in a transaction. These keys will be locked as
  160. /// exclusive, even if they don't exists, during the execution of a transction.
  161. ///
  162. /// The original implementation of Redis does not need this promise because only one
  163. /// transaction is executed at a time, in microredis transactions reserve their keys and do not
  164. /// prevent other connections to continue modifying the database.
  165. pub fn get_tx_keys(&self) -> Vec<Bytes> {
  166. self.info
  167. .read()
  168. .tx_keys
  169. .iter()
  170. .cloned()
  171. .collect::<Vec<Bytes>>()
  172. }
  173. /// Queues a command for later execution
  174. pub fn queue_command(&self, args: &[Bytes]) {
  175. let mut info = self.info.write();
  176. let commands = info.commands.get_or_insert(vec![]);
  177. commands.push(args.iter().map(|m| (*m).clone()).collect());
  178. }
  179. /// Returns a list of queued commands.
  180. pub fn get_queue_commands(&self) -> Option<Vec<Vec<Bytes>>> {
  181. let mut info = self.info.write();
  182. info.watch_keys = vec![];
  183. info.status = ConnectionStatus::ExecutingTx;
  184. info.commands.take()
  185. }
  186. /// Returns a lsit of transaction keys
  187. pub fn tx_keys(&self, keys: Vec<&Bytes>) {
  188. #[allow(clippy::mutable_key_type)]
  189. let tx_keys = &mut self.info.write().tx_keys;
  190. keys.iter()
  191. .map(|k| {
  192. tx_keys.insert((*k).clone());
  193. })
  194. .for_each(drop);
  195. }
  196. /// Disconnects from the server, disconnect from all pubsub channels and remove itself from the
  197. /// all_connection lists.
  198. pub fn destroy(self: Arc<Connection>) {
  199. let pubsub = self.pubsub();
  200. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self);
  201. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self);
  202. self.all_connections.clone().remove(self);
  203. }
  204. /// Returns the all_connections (Connections) instance
  205. pub fn all_connections(&self) -> Arc<connections::Connections> {
  206. self.all_connections.clone()
  207. }
  208. /// Returns the connection name
  209. pub fn name(&self) -> Option<String> {
  210. self.info.read().name.clone()
  211. }
  212. /// Sets a connection name
  213. pub fn set_name(&self, name: String) {
  214. let mut r = self.info.write();
  215. r.name = Some(name);
  216. }
  217. /// Changes the current db for the current connection
  218. pub fn selectdb(&self, db: usize) -> Result<Value, Error> {
  219. let mut info = self.info.write();
  220. info.db = self
  221. .all_connections
  222. .get_databases()
  223. .get(db)?
  224. .new_db_instance(self.id);
  225. info.current_db = db;
  226. Ok(Value::Ok)
  227. }
  228. }
  229. impl ToString for Connection {
  230. /// Returns a string representation of this connection
  231. fn to_string(&self) -> String {
  232. let info = self.info.read();
  233. format!(
  234. "id={} addr={} name={:?} db={}\r\n",
  235. self.id, self.addr, info.name, info.current_db
  236. )
  237. }
  238. }