/* stream.c */
  1. #include "redismodule.h"
  2. #include <string.h>
  3. #include <strings.h>
  4. #include <assert.h>
  5. #include <unistd.h>
  6. #include <errno.h>
  7. /* Command which adds a stream entry with automatic ID, like XADD *.
  8. *
  9. * Syntax: STREAM.ADD key field1 value1 [ field2 value2 ... ]
  10. *
  11. * The response is the ID of the added stream entry or an error message.
  12. */
  13. int stream_add(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  14. if (argc < 2 || argc % 2 != 0) {
  15. RedisModule_WrongArity(ctx);
  16. return REDISMODULE_OK;
  17. }
  18. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  19. RedisModuleStreamID id;
  20. if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, &id,
  21. &argv[2], (argc-2)/2) == REDISMODULE_OK) {
  22. RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
  23. RedisModule_ReplyWithString(ctx, id_str);
  24. RedisModule_FreeString(ctx, id_str);
  25. } else {
  26. RedisModule_ReplyWithError(ctx, "ERR StreamAdd failed");
  27. }
  28. RedisModule_CloseKey(key);
  29. return REDISMODULE_OK;
  30. }
  31. /* Command which adds a stream entry N times.
  32. *
  33. * Syntax: STREAM.ADD key N field1 value1 [ field2 value2 ... ]
  34. *
  35. * Returns the number of successfully added entries.
  36. */
  37. int stream_addn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  38. if (argc < 3 || argc % 2 == 0) {
  39. RedisModule_WrongArity(ctx);
  40. return REDISMODULE_OK;
  41. }
  42. long long n, i;
  43. if (RedisModule_StringToLongLong(argv[2], &n) == REDISMODULE_ERR) {
  44. RedisModule_ReplyWithError(ctx, "N must be a number");
  45. return REDISMODULE_OK;
  46. }
  47. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  48. for (i = 0; i < n; i++) {
  49. if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, NULL,
  50. &argv[3], (argc-3)/2) == REDISMODULE_ERR)
  51. break;
  52. }
  53. RedisModule_ReplyWithLongLong(ctx, i);
  54. RedisModule_CloseKey(key);
  55. return REDISMODULE_OK;
  56. }
  57. /* STREAM.DELETE key stream-id */
  58. int stream_delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  59. if (argc != 3) return RedisModule_WrongArity(ctx);
  60. RedisModuleStreamID id;
  61. if (RedisModule_StringToStreamID(argv[2], &id) != REDISMODULE_OK) {
  62. return RedisModule_ReplyWithError(ctx, "Invalid stream ID");
  63. }
  64. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  65. if (RedisModule_StreamDelete(key, &id) == REDISMODULE_OK) {
  66. RedisModule_ReplyWithSimpleString(ctx, "OK");
  67. } else {
  68. RedisModule_ReplyWithError(ctx, "ERR StreamDelete failed");
  69. }
  70. RedisModule_CloseKey(key);
  71. return REDISMODULE_OK;
  72. }
  73. /* STREAM.RANGE key start-id end-id
  74. *
  75. * Returns an array of stream items. Each item is an array on the form
  76. * [stream-id, [field1, value1, field2, value2, ...]].
  77. *
  78. * A funny side-effect used for testing RM_StreamIteratorDelete() is that if any
  79. * entry has a field named "selfdestruct", the stream entry is deleted. It is
  80. * however included in the results of this command.
  81. */
  82. int stream_range(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  83. if (argc != 4) {
  84. RedisModule_WrongArity(ctx);
  85. return REDISMODULE_OK;
  86. }
  87. RedisModuleStreamID startid, endid;
  88. if (RedisModule_StringToStreamID(argv[2], &startid) != REDISMODULE_OK ||
  89. RedisModule_StringToStreamID(argv[3], &endid) != REDISMODULE_OK) {
  90. RedisModule_ReplyWithError(ctx, "Invalid stream ID");
  91. return REDISMODULE_OK;
  92. }
  93. /* If startid > endid, we swap and set the reverse flag. */
  94. int flags = 0;
  95. if (startid.ms > endid.ms ||
  96. (startid.ms == endid.ms && startid.seq > endid.seq)) {
  97. RedisModuleStreamID tmp = startid;
  98. startid = endid;
  99. endid = tmp;
  100. flags |= REDISMODULE_STREAM_ITERATOR_REVERSE;
  101. }
  102. /* Open key and start iterator. */
  103. int openflags = REDISMODULE_READ | REDISMODULE_WRITE;
  104. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], openflags);
  105. if (RedisModule_StreamIteratorStart(key, flags,
  106. &startid, &endid) != REDISMODULE_OK) {
  107. /* Key is not a stream, etc. */
  108. RedisModule_ReplyWithError(ctx, "ERR StreamIteratorStart failed");
  109. RedisModule_CloseKey(key);
  110. return REDISMODULE_OK;
  111. }
  112. /* Check error handling: Delete current entry when no current entry. */
  113. assert(RedisModule_StreamIteratorDelete(key) ==
  114. REDISMODULE_ERR);
  115. assert(errno == ENOENT);
  116. /* Check error handling: Fetch fields when no current entry. */
  117. assert(RedisModule_StreamIteratorNextField(key, NULL, NULL) ==
  118. REDISMODULE_ERR);
  119. assert(errno == ENOENT);
  120. /* Return array. */
  121. RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
  122. RedisModule_AutoMemory(ctx);
  123. RedisModuleStreamID id;
  124. long numfields;
  125. long len = 0;
  126. while (RedisModule_StreamIteratorNextID(key, &id,
  127. &numfields) == REDISMODULE_OK) {
  128. RedisModule_ReplyWithArray(ctx, 2);
  129. RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
  130. RedisModule_ReplyWithString(ctx, id_str);
  131. RedisModule_ReplyWithArray(ctx, numfields * 2);
  132. int delete = 0;
  133. RedisModuleString *field, *value;
  134. for (long i = 0; i < numfields; i++) {
  135. assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
  136. REDISMODULE_OK);
  137. RedisModule_ReplyWithString(ctx, field);
  138. RedisModule_ReplyWithString(ctx, value);
  139. /* check if this is a "selfdestruct" field */
  140. size_t field_len;
  141. const char *field_str = RedisModule_StringPtrLen(field, &field_len);
  142. if (!strncmp(field_str, "selfdestruct", field_len)) delete = 1;
  143. }
  144. if (delete) {
  145. assert(RedisModule_StreamIteratorDelete(key) == REDISMODULE_OK);
  146. }
  147. /* check error handling: no more fields to fetch */
  148. assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
  149. REDISMODULE_ERR);
  150. assert(errno == ENOENT);
  151. len++;
  152. }
  153. RedisModule_ReplySetArrayLength(ctx, len);
  154. RedisModule_StreamIteratorStop(key);
  155. RedisModule_CloseKey(key);
  156. return REDISMODULE_OK;
  157. }
  158. /*
  159. * STREAM.TRIM key (MAXLEN (=|~) length | MINID (=|~) id)
  160. */
  161. int stream_trim(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  162. if (argc != 5) {
  163. RedisModule_WrongArity(ctx);
  164. return REDISMODULE_OK;
  165. }
  166. /* Parse args */
  167. int trim_by_id = 0; /* 0 = maxlen, 1 = minid */
  168. long long maxlen;
  169. RedisModuleStreamID minid;
  170. size_t arg_len;
  171. const char *arg = RedisModule_StringPtrLen(argv[2], &arg_len);
  172. if (!strcasecmp(arg, "minid")) {
  173. trim_by_id = 1;
  174. if (RedisModule_StringToStreamID(argv[4], &minid) != REDISMODULE_OK) {
  175. RedisModule_ReplyWithError(ctx, "ERR Invalid stream ID");
  176. return REDISMODULE_OK;
  177. }
  178. } else if (!strcasecmp(arg, "maxlen")) {
  179. if (RedisModule_StringToLongLong(argv[4], &maxlen) == REDISMODULE_ERR) {
  180. RedisModule_ReplyWithError(ctx, "ERR Maxlen must be a number");
  181. return REDISMODULE_OK;
  182. }
  183. } else {
  184. RedisModule_ReplyWithError(ctx, "ERR Invalid arguments");
  185. return REDISMODULE_OK;
  186. }
  187. /* Approx or exact */
  188. int flags;
  189. arg = RedisModule_StringPtrLen(argv[3], &arg_len);
  190. if (arg_len == 1 && arg[0] == '~') {
  191. flags = REDISMODULE_STREAM_TRIM_APPROX;
  192. } else if (arg_len == 1 && arg[0] == '=') {
  193. flags = 0;
  194. } else {
  195. RedisModule_ReplyWithError(ctx, "ERR Invalid approx-or-exact mark");
  196. return REDISMODULE_OK;
  197. }
  198. /* Trim */
  199. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  200. long long trimmed;
  201. if (trim_by_id) {
  202. trimmed = RedisModule_StreamTrimByID(key, flags, &minid);
  203. } else {
  204. trimmed = RedisModule_StreamTrimByLength(key, flags, maxlen);
  205. }
  206. /* Return result */
  207. if (trimmed < 0) {
  208. RedisModule_ReplyWithError(ctx, "ERR Trimming failed");
  209. } else {
  210. RedisModule_ReplyWithLongLong(ctx, trimmed);
  211. }
  212. RedisModule_CloseKey(key);
  213. return REDISMODULE_OK;
  214. }
  215. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  216. REDISMODULE_NOT_USED(argv);
  217. REDISMODULE_NOT_USED(argc);
  218. if (RedisModule_Init(ctx, "stream", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
  219. return REDISMODULE_ERR;
  220. if (RedisModule_CreateCommand(ctx, "stream.add", stream_add, "",
  221. 1, 1, 1) == REDISMODULE_ERR)
  222. return REDISMODULE_ERR;
  223. if (RedisModule_CreateCommand(ctx, "stream.addn", stream_addn, "",
  224. 1, 1, 1) == REDISMODULE_ERR)
  225. return REDISMODULE_ERR;
  226. if (RedisModule_CreateCommand(ctx, "stream.delete", stream_delete, "",
  227. 1, 1, 1) == REDISMODULE_ERR)
  228. return REDISMODULE_ERR;
  229. if (RedisModule_CreateCommand(ctx, "stream.range", stream_range, "",
  230. 1, 1, 1) == REDISMODULE_ERR)
  231. return REDISMODULE_ERR;
  232. if (RedisModule_CreateCommand(ctx, "stream.trim", stream_trim, "",
  233. 1, 1, 1) == REDISMODULE_ERR)
  234. return REDISMODULE_ERR;
  235. return REDISMODULE_OK;
  236. }