cursor.rs 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. //! Cursor implementation
  2. use byteorder::{LittleEndian, WriteBytesExt};
  3. use bytes::Bytes;
  4. use crc32fast::Hasher as Crc32Hasher;
  5. use std::{convert::TryFrom, num::ParseIntError, str::FromStr};
  6. use thiserror::Error;
  7. /// Error
  8. #[derive(Error, Debug, PartialEq)]
  9. pub enum Error {
  10. #[error("Parsing Error")]
  11. /// Parsing Int error
  12. Int(#[from] ParseIntError),
  13. #[error("I/O Error")]
  14. /// I/O error
  15. Io,
  16. }
  17. /// Cursor.
  18. ///
  19. /// Redis cursors are stateless. They serialize into a u128 integer information
  20. /// about the latest processed bucket and the last position with a checksum
  21. /// value to make sure the number is valid.
  22. #[derive(Debug, Eq, PartialEq)]
  23. pub struct Cursor {
  24. checksum: u32,
  25. /// Current Bucket ID
  26. pub bucket: u16,
  27. /// Last position of the key that was processed
  28. pub last_position: u64,
  29. }
  30. impl Cursor {
  31. /// Creates a new cursor
  32. pub fn new(bucket: u16, last_position: u64) -> Result<Self, Error> {
  33. let mut hasher = Crc32Hasher::new();
  34. let mut buf = vec![];
  35. buf.write_u16::<LittleEndian>(bucket)
  36. .map_err(|_| Error::Io)?;
  37. buf.write_u64::<LittleEndian>(last_position)
  38. .map_err(|_| Error::Io)?;
  39. hasher.update(&buf);
  40. Ok(Self {
  41. checksum: hasher.finalize(),
  42. bucket,
  43. last_position,
  44. })
  45. }
  46. /// Serializes the cursor a single u128 integer
  47. pub fn serialize(&self) -> u128 {
  48. let bucket: u128 = self.bucket.into();
  49. let last_position: u128 = self.last_position as u128;
  50. if bucket == last_position && bucket == 0 {
  51. return 0;
  52. }
  53. let checksum: u128 = self.checksum.into();
  54. (checksum << 80) | (bucket << 64) | (last_position)
  55. }
  56. }
  57. impl FromStr for Cursor {
  58. type Err = Error;
  59. /// Deserializes a cursor from a string. The string must be a valid number.
  60. /// If the number is invalid or the checksum is not valid a new cursor with
  61. /// position 0,0 is returned.
  62. fn from_str(s: &str) -> Result<Self, Self::Err> {
  63. let raw_number: u128 = u128::from_str(s)?;
  64. let checksum: u32 = (raw_number >> 80) as u32;
  65. let cursor = Self::new((raw_number >> 64) as u16, raw_number as u64)?;
  66. if cursor.checksum == checksum {
  67. Ok(cursor)
  68. } else {
  69. Ok(Self::new(0, 0)?)
  70. }
  71. }
  72. }
  73. impl TryFrom<&Bytes> for Cursor {
  74. type Error = Error;
  75. fn try_from(v: &Bytes) -> Result<Self, Self::Error> {
  76. Cursor::from_str(&String::from_utf8_lossy(v))
  77. }
  78. }
  79. impl ToString for Cursor {
  80. fn to_string(&self) -> String {
  81. self.serialize().to_string()
  82. }
  83. }
  84. #[cfg(test)]
  85. mod test {
  86. use super::*;
  87. #[test]
  88. fn serialize_end() {
  89. let x = Cursor::new(0, 0).unwrap();
  90. assert_eq!("0", x.to_string());
  91. }
  92. #[test]
  93. fn serialize() {
  94. for e in 0..255 {
  95. for i in 1..10000 {
  96. let x = Cursor::new(e, i).unwrap();
  97. let y = Cursor::from_str(&x.to_string()).unwrap();
  98. assert_eq!(x, y);
  99. }
  100. }
  101. }
  102. }