stream.tcl 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. set testmodule [file normalize tests/modules/stream.so]
  2. start_server {tags {"modules"}} {
  3. r module load $testmodule
  test {Module stream add and delete} {
      # Begin with no key so the first stream.add has to create the stream.
      r del mystream

      # One insertion into a missing key, one into the now-existing stream.
      set first_id [r stream.add mystream item 1 value a]
      set second_id [r stream.add mystream item 2 value b]

      # The returned ID must contain a dash, and both entries must be
      # readable, in insertion order, through the native XRANGE command.
      assert { [string match "*-*" $first_id] }
      set expected_both \
          "{$first_id {item 1 value a}} {$second_id {item 2 value b}}"
      assert_equal [r XRANGE mystream - +] $expected_both

      # Removing an existing entry succeeds ...
      assert_equal OK [r stream.delete mystream $first_id]
      # ... while an unknown ID and a malformed ID are both rejected,
      # each with its own error prefix.
      assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
      assert_error "Invalid stream ID*" {r stream.delete mystream foo}
      # Only the second entry is left after the successful delete.
      set expected_rest "{$second_id {item 2 value b}}"
      assert_equal $expected_rest [r XRANGE mystream - +]

      # Wrong-type handling: replace the stream with a plain string and
      # make sure both module commands report an error.
      r del mystream
      r set mystream mystring
      assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a}
      assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
  }
  test {Module stream add unblocks blocking xread} {
      # Verifies that an entry added through the module command stream.add
      # wakes up a client blocked in XREAD, both when the key does not
      # exist yet and when the stream already exists.
      r del mystream

      # Blocking XREAD on an empty key
      set rd1 [redis_deferring_client]
      $rd1 XREAD BLOCK 3000 STREAMS mystream $
      # wait until client is actually blocked, otherwise the add below
      # could race with the XREAD and the entry would be missed by "$"
      wait_for_condition 50 100 {
          [s 0 blocked_clients] eq {1}
      } else {
          fail "Client is not blocked"
      }
      set id [r stream.add mystream field 1 value a]
      assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read]
      # Release the deferring client; it was previously left open for the
      # remainder of the server block.
      $rd1 close

      # Blocking XREAD on an existing stream
      set rd2 [redis_deferring_client]
      $rd2 XREAD BLOCK 3000 STREAMS mystream $
      # wait until client is actually blocked
      wait_for_condition 50 100 {
          [s 0 blocked_clients] eq {1}
      } else {
          fail "Client is not blocked"
      }
      set id [r stream.add mystream field 2 value b]
      assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read]
      # Release the second deferring client as well.
      $rd2 close
  }
  test {Module stream add benchmark (1M stream add)} {
      # Ask the module to add one million entries in a single call and
      # check that the reply equals the requested count.
      r del mystream
      set requested 1000000
      assert_equal [r stream.addn mystream $requested field value] $requested
  }
  test {Module stream iterator} {
      # Populate a fresh stream with two entries via the native command.
      r del mystream
      set first_id [r xadd mystream * item 1 value a]
      set second_id [r xadd mystream * item 2 value b]

      # Forward iteration over the whole stream must match XRANGE.
      assert_equal [r stream.range mystream "-" "+"] [r xrange mystream "-" "+"]

      # Reverse iteration (start > end) must match XREVRANGE.
      assert_equal [r stream.range mystream "+" "-"] [r xrevrange mystream "+" "-"]

      # Open start, end at the first ID: only the first entry comes back.
      # The reply is compared against both the string and the list form.
      set up_to_first [r stream.range mystream "-" $first_id]
      foreach expected [list \
              "{$first_id {item 1 value a}}" \
              [list [list $first_id {item 1 value a}]]] {
          assert_equal $up_to_first $expected
      }

      # Start ID equal to end ID: exactly that one entry comes back.
      set only_second [r stream.range mystream $second_id $second_id]
      foreach expected [list \
              "{$second_id {item 2 value b}}" \
              [list [list $second_id {item 2 value b}]]] {
          assert_equal $only_second $expected
      }
  }
  test {Module stream iterator delete} {
      # Three entries; the middle one carries the "selfdestruct" field.
      r del mystream
      set keep_a [r xadd mystream * normal item]
      set doomed [r xadd mystream * selfdestruct yes]
      set keep_b [r xadd mystream * another item]

      # First pass: stream.range still returns the "selfdestruct" entry,
      # deleting it after it has been returned.
      set first_pass \
          "{$keep_a {normal item}} {$doomed {selfdestruct yes}} {$keep_b {another item}}"
      assert_equal $first_pass [r stream.range mystream - +]

      # Second pass: the "selfdestruct" entry is no longer in the stream.
      set second_pass "{$keep_a {normal item}} {$keep_b {another item}}"
      assert_equal $second_pass [r stream.range mystream - +]
  }
  test {Module stream trim by length} {
      r del mystream

      # --- exact maxlen ---
      # Seed three entries.
      foreach {num letter} {1 a 2 b 3 c} {
          r xadd mystream * item $num value $letter
      }
      assert_equal 3 [r xlen mystream]
      # Each row: maxlen argument, expected trim reply, expected length
      # of the stream afterwards.
      foreach {maxlen trimmed left} {
          5 0 3
          1 2 1
      } {
          assert_equal $trimmed [r stream.trim mystream maxlen = $maxlen]
          assert_equal $left [r xlen mystream]
      }
      assert_equal 1 [r stream.trim mystream maxlen = 0]

      # check that there is no limit for exact maxlen
      r stream.addn mystream 20000 item x value y
      assert_equal 20000 [r stream.trim mystream maxlen = 0]

      # --- approx maxlen ---
      # (100 items per node implies default limit 10K items)
      r stream.addn mystream 20000 item x value y
      assert_equal 20000 [r xlen mystream]
      # Three consecutive approximate trims with the same threshold:
      # expected replies are 10000, then 9900, then 0.
      foreach trimmed {10000 9900 0} {
          assert_equal $trimmed [r stream.trim mystream maxlen ~ 2]
      }
      assert_equal 100 [r xlen mystream]
      # An approximate trim to zero removes the remaining entries.
      assert_equal 100 [r stream.trim mystream maxlen ~ 0]
      assert_equal 0 [r xlen mystream]
  }
  test {Module stream trim by ID} {
      r del mystream

      # --- exact minid ---
      # Seed three entries and remember the ID of the last one.
      r xadd mystream * item 1 value a
      r xadd mystream * item 2 value b
      set last_id [r xadd mystream * item 3 value c]
      assert_equal 3 [r xlen mystream]
      # Each row: minid argument, expected trim reply, expected length
      # of the stream afterwards.
      foreach {threshold trimmed left} [list \
              -        0 3 \
              $last_id 2 1] {
          assert_equal $trimmed [r stream.trim mystream minid = $threshold]
          assert_equal $left [r xlen mystream]
      }
      assert_equal 1 [r stream.trim mystream minid = +]

      # check that there is no limit for exact minid
      r stream.addn mystream 20000 item x value y
      assert_equal 20000 [r stream.trim mystream minid = +]

      # --- approx minid ---
      # (100 items per node implies default limit 10K items)
      # Build 20000 entries with the threshold entry at position 19981,
      # leaving 19 entries after it.
      r stream.addn mystream 19980 item x value y
      set pivot_id [r xadd mystream * item x value y]
      r stream.addn mystream 19 item x value y
      assert_equal 20000 [r xlen mystream]
      # Three consecutive approximate trims with the same threshold:
      # expected replies are 10000, then 9900, then 0.
      foreach trimmed {10000 9900 0} {
          assert_equal $trimmed [r stream.trim mystream minid ~ $pivot_id]
      }
      assert_equal 100 [r xlen mystream]
      # An approximate trim with "+" removes the remaining entries.
      assert_equal 100 [r stream.trim mystream minid ~ +]
      assert_equal 0 [r xlen mystream]
  }
  144. }