mod.rs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. //! # Connection module
  2. use crate::{db::Db, error::Error, value::Value};
  3. use bytes::Bytes;
  4. use parking_lot::RwLock;
  5. use std::{collections::HashSet, net::SocketAddr, sync::Arc};
  6. use self::pubsub_server::Pubsub;
  7. pub mod connections;
  8. pub mod pubsub_connection;
  9. pub mod pubsub_server;
  10. /// Possible status of connections
  11. #[derive(Debug, Clone, Copy, Eq, PartialEq)]
  12. pub enum ConnectionStatus {
  13. /// The connection is in a MULTI stage and commands are being queued
  14. Multi,
  15. /// The connection is executing a transaction
  16. ExecutingTx,
  17. /// The connection is in pub-sub only mode
  18. Pubsub,
  19. /// The connection is a normal conection
  20. Normal,
  21. }
  22. impl Default for ConnectionStatus {
  23. fn default() -> Self {
  24. ConnectionStatus::Normal
  25. }
  26. }
  27. /// Connection information
  28. #[derive(Debug)]
  29. pub struct ConnectionInfo {
  30. name: Option<String>,
  31. watch_keys: Vec<(Bytes, u128)>,
  32. tx_keys: HashSet<Bytes>,
  33. status: ConnectionStatus,
  34. commands: Option<Vec<Vec<Bytes>>>,
  35. }
  36. /// Connection
  37. #[derive(Debug)]
  38. pub struct Connection {
  39. id: u128,
  40. db: Db,
  41. current_db: u32,
  42. all_connections: Arc<connections::Connections>,
  43. addr: SocketAddr,
  44. info: RwLock<ConnectionInfo>,
  45. pubsub_client: pubsub_connection::PubsubClient,
  46. }
  47. impl ConnectionInfo {
  48. /// Creates a new connection
  49. fn new() -> Self {
  50. Self {
  51. name: None,
  52. watch_keys: vec![],
  53. tx_keys: HashSet::new(),
  54. commands: None,
  55. status: ConnectionStatus::Normal,
  56. }
  57. }
  58. }
  59. impl Connection {
  60. /// Returns a connection database.
  61. ///
  62. /// The database object is unique to this connection but most of its internal structure is
  63. /// shared (like the entries).
  64. pub fn db(&self) -> &Db {
  65. &self.db
  66. }
  67. /// Returns the global pubsub server
  68. pub fn pubsub(&self) -> Arc<Pubsub> {
  69. self.all_connections.pubsub()
  70. }
  71. /// Returns a reference to the pubsub client
  72. pub fn pubsub_client(&self) -> &pubsub_connection::PubsubClient {
  73. &self.pubsub_client
  74. }
  75. /// Switch the connection to a pub-sub only mode
  76. pub fn start_pubsub(&self) -> Result<Value, Error> {
  77. let mut info = self.info.write();
  78. match info.status {
  79. ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
  80. info.status = ConnectionStatus::Pubsub;
  81. Ok(Value::Ok)
  82. }
  83. _ => Err(Error::NestedTx),
  84. }
  85. }
  86. /// Connection ID
  87. pub fn id(&self) -> u128 {
  88. self.id
  89. }
  90. /// Drops a multi/transaction and reset the connection
  91. ///
  92. /// If the connection was not in a MULTI stage an error is thrown.
  93. pub fn stop_transaction(&self) -> Result<Value, Error> {
  94. let mut info = self.info.write();
  95. if info.status == ConnectionStatus::Multi {
  96. info.commands = None;
  97. info.watch_keys.clear();
  98. info.tx_keys.clear();
  99. info.status = ConnectionStatus::ExecutingTx;
  100. Ok(Value::Ok)
  101. } else {
  102. Err(Error::NotInTx)
  103. }
  104. }
  105. /// Starts a transaction/multi
  106. ///
  107. /// Nested transactions are not possible.
  108. pub fn start_transaction(&self) -> Result<Value, Error> {
  109. let mut info = self.info.write();
  110. if info.status == ConnectionStatus::Normal {
  111. info.status = ConnectionStatus::Multi;
  112. Ok(Value::Ok)
  113. } else {
  114. Err(Error::NestedTx)
  115. }
  116. }
  117. /// Resets the current connection.
  118. pub fn reset(&self) {
  119. let mut info = self.info.write();
  120. info.status = ConnectionStatus::Normal;
  121. info.name = None;
  122. info.watch_keys = vec![];
  123. info.commands = None;
  124. info.tx_keys = HashSet::new();
  125. let pubsub = self.pubsub();
  126. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self);
  127. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self);
  128. }
  129. /// Returns the status of the connection
  130. pub fn status(&self) -> ConnectionStatus {
  131. self.info.read().status
  132. }
  133. /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
  134. /// some value changed since the moment the command was queued until the execution time.
  135. pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
  136. let watch_keys = &mut self.info.write().watch_keys;
  137. keys.iter()
  138. .map(|(bytes, version)| {
  139. watch_keys.push(((*bytes).clone(), *version));
  140. })
  141. .for_each(drop);
  142. }
  143. /// Returns true if any of the watched keys changed their value
  144. pub fn did_keys_change(&self) -> bool {
  145. let watch_keys = &self.info.read().watch_keys;
  146. for key in watch_keys.iter() {
  147. if self.db.get_version(&key.0) != key.1 {
  148. return true;
  149. }
  150. }
  151. false
  152. }
  153. /// Resets the watched keys list
  154. pub fn discard_watched_keys(&self) {
  155. self.info.write().watch_keys.clear()
  156. }
  157. /// Returns a list of key that are involved in a transaction. These keys will be locked as
  158. /// exclusive, even if they don't exists, during the execution of a transction.
  159. ///
  160. /// The original implementation of Redis does not need this promise because only one
  161. /// transaction is executed at a time, in microredis transactions reserve their keys and do not
  162. /// prevent other connections to continue modifying the database.
  163. pub fn get_tx_keys(&self) -> Vec<Bytes> {
  164. self.info
  165. .read()
  166. .tx_keys
  167. .iter()
  168. .cloned()
  169. .collect::<Vec<Bytes>>()
  170. }
  171. /// Queues a command for later execution
  172. pub fn queue_command(&self, args: &[Bytes]) {
  173. let mut info = self.info.write();
  174. let commands = info.commands.get_or_insert(vec![]);
  175. commands.push(args.iter().map(|m| (*m).clone()).collect());
  176. }
  177. /// Returns a list of queued commands.
  178. pub fn get_queue_commands(&self) -> Option<Vec<Vec<Bytes>>> {
  179. let mut info = self.info.write();
  180. info.watch_keys = vec![];
  181. info.status = ConnectionStatus::ExecutingTx;
  182. info.commands.take()
  183. }
  184. /// Returns a lsit of transaction keys
  185. pub fn tx_keys(&self, keys: Vec<&Bytes>) {
  186. #[allow(clippy::mutable_key_type)]
  187. let tx_keys = &mut self.info.write().tx_keys;
  188. keys.iter()
  189. .map(|k| {
  190. tx_keys.insert((*k).clone());
  191. })
  192. .for_each(drop);
  193. }
  194. /// Disconnects from the server, disconnect from all pubsub channels and remove itself from the
  195. /// all_connection lists.
  196. pub fn destroy(self: Arc<Connection>) {
  197. let pubsub = self.pubsub();
  198. pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self);
  199. pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self);
  200. self.all_connections.clone().remove(self);
  201. }
  202. /// Returns the all_connections (Connections) instance
  203. pub fn all_connections(&self) -> Arc<connections::Connections> {
  204. self.all_connections.clone()
  205. }
  206. /// Returns the connection name
  207. pub fn name(&self) -> Option<String> {
  208. self.info.read().name.clone()
  209. }
  210. /// Sets a connection name
  211. pub fn set_name(&self, name: String) {
  212. let mut r = self.info.write();
  213. r.name = Some(name);
  214. }
  215. /// Returns a string representation of this connection
  216. pub fn as_string(&self) -> String {
  217. format!(
  218. "id={} addr={} name={:?} db={}\r\n",
  219. self.id,
  220. self.addr,
  221. self.info.read().name,
  222. self.current_db
  223. )
  224. }
  225. }