command.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
//! # Dispatcher
//!
//! Here is where every command is defined. Each command has a definition and a handler. The
//! handlers are Rust functions.
//!
//! Each command is defined with the dispatcher macro, which generates efficient and
//! developer-friendly code.
  8. use crate::{
  9. connection::{Connection, ConnectionStatus},
  10. dispatcher,
  11. error::Error,
  12. value::Value,
  13. };
  14. use bytes::Bytes;
  15. use metered::{ErrorCount, HitCount, InFlight, ResponseTime, Throughput};
  16. use std::convert::TryInto;
  17. /// Command Flags
  18. #[derive(Debug, Eq, PartialEq, Clone, Copy)]
  19. pub enum Flag {
  20. /// May result in database modification
  21. Write,
  22. /// Will never modify database
  23. ReadOnly,
  24. /// Can fail if the server runs out of memory
  25. DenyOom,
  26. /// Server Admin command
  27. Admin,
  28. /// Pubsub related command
  29. PubSub,
  30. /// Not used, added to be compatible
  31. NoScript,
  32. /// Random result
  33. Random,
  34. /// Not used, added to be compatible
  35. SortForScript,
  36. /// Allow command while database is loading
  37. Loading,
  38. /// Allow command while replica has stale data
  39. Stale,
  40. /// Do not show this command in MONITOR
  41. SkipMonitor,
  42. /// Do not gather stats about slow log
  43. SkipSlowlog,
  44. /// The command is fast (Close to log(N) time)
  45. Fast,
  46. /// Command may be replicated to other nodes
  47. MayReplicate,
  48. }
  49. impl ToString for Flag {
  50. fn to_string(&self) -> String {
  51. match self {
  52. Self::Write => "write",
  53. Self::DenyOom => "denyoom",
  54. Self::ReadOnly => "readonly",
  55. Self::Admin => "admin",
  56. Self::PubSub => "pubsub",
  57. Self::NoScript => "noscript",
  58. Self::Random => "random",
  59. Self::SortForScript => "sort_for_script",
  60. Self::Loading => "loading",
  61. Self::Stale => "stale",
  62. Self::SkipMonitor => "skip_monitor",
  63. Self::SkipSlowlog => "skip_slowlog",
  64. Self::Fast => "fast",
  65. Self::MayReplicate => "may_replicate",
  66. }
  67. .to_owned()
  68. }
  69. }
  70. /// Command definition
  71. #[derive(Debug)]
  72. pub struct Command {
  73. name: &'static str,
  74. group: &'static str,
  75. flags: &'static [Flag],
  76. min_args: i32,
  77. key_start: i32,
  78. key_stop: i32,
  79. key_step: usize,
  80. is_queueable: bool,
  81. metrics: Metrics,
  82. }
  83. /// Metric struct for all command
  84. #[derive(Debug, Default, serde::Serialize)]
  85. pub struct Metrics {
  86. /// Command hits
  87. pub hit_count: HitCount,
  88. /// Error count
  89. pub error_count: ErrorCount,
  90. /// How many concurrent executions are happening right now
  91. pub in_flight: InFlight,
  92. /// Response time
  93. pub response_time: ResponseTime,
  94. /// Throughput
  95. pub throughput: Throughput,
  96. }
  97. impl Command {
  98. /// Creates a new comamnd
  99. pub fn new(
  100. name: &'static str,
  101. group: &'static str,
  102. flags: &'static [Flag],
  103. min_args: i32,
  104. key_start: i32,
  105. key_stop: i32,
  106. key_step: usize,
  107. is_queueable: bool,
  108. ) -> Self {
  109. Self {
  110. name,
  111. group,
  112. flags,
  113. min_args,
  114. key_start,
  115. key_stop,
  116. key_step,
  117. is_queueable,
  118. metrics: Metrics::default(),
  119. }
  120. }
  121. /// Returns a reference to the metrics
  122. pub fn metrics(&self) -> &Metrics {
  123. &self.metrics
  124. }
  125. /// Can this command be executed in a pub-sub only mode?
  126. pub fn is_pubsub_executable(&self) -> bool {
  127. self.group == "pubsub" || self.name == "PING" || self.name == "RESET" || self.name == "QUIT"
  128. }
  129. /// Can this command be queued in a transaction or should it be executed right away?
  130. pub fn is_queueable(&self) -> bool {
  131. self.is_queueable
  132. }
  133. /// Returns all database keys from the command arguments
  134. pub fn get_keys<'a>(&self, args: &'a [Bytes]) -> Vec<&'a Bytes> {
  135. let start = self.key_start;
  136. let stop = if self.key_stop > 0 {
  137. self.key_stop
  138. } else {
  139. (args.len() as i32) + self.key_stop
  140. };
  141. if start == 0 {
  142. return vec![];
  143. }
  144. let mut result = vec![];
  145. for i in (start..stop + 1).step_by(self.key_step) {
  146. result.push(&args[i as usize]);
  147. }
  148. result
  149. }
  150. /// Checks if a given number of args is expected by this command
  151. pub fn check_number_args(&self, n: usize) -> bool {
  152. if (self.min_args >= 0) {
  153. n == (self.min_args as i32).try_into().unwrap_or(0)
  154. } else {
  155. let s: usize = (self.min_args as i32).abs().try_into().unwrap_or(0);
  156. n >= s
  157. }
  158. }
  159. /// Returns information about this command. The response is encoded as a
  160. /// Value, following the output of the COMMAND command in redis
  161. pub fn get_command_info(&self) -> Value {
  162. Value::Array(vec![
  163. self.name().into(),
  164. self.get_min_args().into(),
  165. Value::Array(
  166. self.get_flags()
  167. .iter()
  168. .map(|m| m.to_string().into())
  169. .collect(),
  170. ),
  171. self.get_key_start().into(),
  172. self.get_key_stop().into(),
  173. self.get_key_step().into(),
  174. ])
  175. }
  176. /// Returns the command's flags
  177. pub fn get_flags(&self) -> Vec<Flag> {
  178. self.flags.to_vec()
  179. }
  180. /// Returns the minimum arguments (including the command name itself) that
  181. /// this command takes. This is also known as the arity of a command.
  182. pub fn get_min_args(&self) -> i32 {
  183. self.min_args
  184. }
  185. /// Where does the first database 'key' start in the arguments list
  186. pub fn get_key_start(&self) -> i32 {
  187. self.key_start
  188. }
  189. /// Where does the last database 'key' start in the arguments list
  190. pub fn get_key_stop(&self) -> i32 {
  191. self.key_stop
  192. }
  193. /// Useful to extract 'keys' from the arguments alongside with key_stop and
  194. /// key_start
  195. pub fn get_key_step(&self) -> usize {
  196. self.key_step
  197. }
  198. /// Command group
  199. pub fn group(&self) -> &'static str {
  200. &self.group
  201. }
  202. /// Command name
  203. pub fn name(&self) -> &'static str {
  204. &self.name
  205. }
  206. }