123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- #include "redismodule.h"
- #include <string.h>
- #include <strings.h>
- #include <assert.h>
- #include <unistd.h>
- #include <errno.h>
- /* Command which adds a stream entry with automatic ID, like XADD *.
- *
- * Syntax: STREAM.ADD key field1 value1 [ field2 value2 ... ]
- *
- * The response is the ID of the added stream entry or an error message.
- */
- int stream_add(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc < 2 || argc % 2 != 0) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- RedisModuleStreamID id;
- if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, &id,
- &argv[2], (argc-2)/2) == REDISMODULE_OK) {
- RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
- RedisModule_ReplyWithString(ctx, id_str);
- RedisModule_FreeString(ctx, id_str);
- } else {
- RedisModule_ReplyWithError(ctx, "ERR StreamAdd failed");
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- /* Command which adds a stream entry N times.
- *
- * Syntax: STREAM.ADD key N field1 value1 [ field2 value2 ... ]
- *
- * Returns the number of successfully added entries.
- */
- int stream_addn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc < 3 || argc % 2 == 0) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
- long long n, i;
- if (RedisModule_StringToLongLong(argv[2], &n) == REDISMODULE_ERR) {
- RedisModule_ReplyWithError(ctx, "N must be a number");
- return REDISMODULE_OK;
- }
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- for (i = 0; i < n; i++) {
- if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, NULL,
- &argv[3], (argc-3)/2) == REDISMODULE_ERR)
- break;
- }
- RedisModule_ReplyWithLongLong(ctx, i);
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- /* STREAM.DELETE key stream-id */
- int stream_delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 3) return RedisModule_WrongArity(ctx);
- RedisModuleStreamID id;
- if (RedisModule_StringToStreamID(argv[2], &id) != REDISMODULE_OK) {
- return RedisModule_ReplyWithError(ctx, "Invalid stream ID");
- }
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- if (RedisModule_StreamDelete(key, &id) == REDISMODULE_OK) {
- RedisModule_ReplyWithSimpleString(ctx, "OK");
- } else {
- RedisModule_ReplyWithError(ctx, "ERR StreamDelete failed");
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- /* STREAM.RANGE key start-id end-id
- *
- * Returns an array of stream items. Each item is an array on the form
- * [stream-id, [field1, value1, field2, value2, ...]].
- *
- * A funny side-effect used for testing RM_StreamIteratorDelete() is that if any
- * entry has a field named "selfdestruct", the stream entry is deleted. It is
- * however included in the results of this command.
- */
- int stream_range(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 4) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
- RedisModuleStreamID startid, endid;
- if (RedisModule_StringToStreamID(argv[2], &startid) != REDISMODULE_OK ||
- RedisModule_StringToStreamID(argv[3], &endid) != REDISMODULE_OK) {
- RedisModule_ReplyWithError(ctx, "Invalid stream ID");
- return REDISMODULE_OK;
- }
- /* If startid > endid, we swap and set the reverse flag. */
- int flags = 0;
- if (startid.ms > endid.ms ||
- (startid.ms == endid.ms && startid.seq > endid.seq)) {
- RedisModuleStreamID tmp = startid;
- startid = endid;
- endid = tmp;
- flags |= REDISMODULE_STREAM_ITERATOR_REVERSE;
- }
- /* Open key and start iterator. */
- int openflags = REDISMODULE_READ | REDISMODULE_WRITE;
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], openflags);
- if (RedisModule_StreamIteratorStart(key, flags,
- &startid, &endid) != REDISMODULE_OK) {
- /* Key is not a stream, etc. */
- RedisModule_ReplyWithError(ctx, "ERR StreamIteratorStart failed");
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- /* Check error handling: Delete current entry when no current entry. */
- assert(RedisModule_StreamIteratorDelete(key) ==
- REDISMODULE_ERR);
- assert(errno == ENOENT);
- /* Check error handling: Fetch fields when no current entry. */
- assert(RedisModule_StreamIteratorNextField(key, NULL, NULL) ==
- REDISMODULE_ERR);
- assert(errno == ENOENT);
- /* Return array. */
- RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
- RedisModule_AutoMemory(ctx);
- RedisModuleStreamID id;
- long numfields;
- long len = 0;
- while (RedisModule_StreamIteratorNextID(key, &id,
- &numfields) == REDISMODULE_OK) {
- RedisModule_ReplyWithArray(ctx, 2);
- RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
- RedisModule_ReplyWithString(ctx, id_str);
- RedisModule_ReplyWithArray(ctx, numfields * 2);
- int delete = 0;
- RedisModuleString *field, *value;
- for (long i = 0; i < numfields; i++) {
- assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
- REDISMODULE_OK);
- RedisModule_ReplyWithString(ctx, field);
- RedisModule_ReplyWithString(ctx, value);
- /* check if this is a "selfdestruct" field */
- size_t field_len;
- const char *field_str = RedisModule_StringPtrLen(field, &field_len);
- if (!strncmp(field_str, "selfdestruct", field_len)) delete = 1;
- }
- if (delete) {
- assert(RedisModule_StreamIteratorDelete(key) == REDISMODULE_OK);
- }
- /* check error handling: no more fields to fetch */
- assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
- REDISMODULE_ERR);
- assert(errno == ENOENT);
- len++;
- }
- RedisModule_ReplySetArrayLength(ctx, len);
- RedisModule_StreamIteratorStop(key);
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- /*
- * STREAM.TRIM key (MAXLEN (=|~) length | MINID (=|~) id)
- */
- int stream_trim(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 5) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
- /* Parse args */
- int trim_by_id = 0; /* 0 = maxlen, 1 = minid */
- long long maxlen;
- RedisModuleStreamID minid;
- size_t arg_len;
- const char *arg = RedisModule_StringPtrLen(argv[2], &arg_len);
- if (!strcasecmp(arg, "minid")) {
- trim_by_id = 1;
- if (RedisModule_StringToStreamID(argv[4], &minid) != REDISMODULE_OK) {
- RedisModule_ReplyWithError(ctx, "ERR Invalid stream ID");
- return REDISMODULE_OK;
- }
- } else if (!strcasecmp(arg, "maxlen")) {
- if (RedisModule_StringToLongLong(argv[4], &maxlen) == REDISMODULE_ERR) {
- RedisModule_ReplyWithError(ctx, "ERR Maxlen must be a number");
- return REDISMODULE_OK;
- }
- } else {
- RedisModule_ReplyWithError(ctx, "ERR Invalid arguments");
- return REDISMODULE_OK;
- }
- /* Approx or exact */
- int flags;
- arg = RedisModule_StringPtrLen(argv[3], &arg_len);
- if (arg_len == 1 && arg[0] == '~') {
- flags = REDISMODULE_STREAM_TRIM_APPROX;
- } else if (arg_len == 1 && arg[0] == '=') {
- flags = 0;
- } else {
- RedisModule_ReplyWithError(ctx, "ERR Invalid approx-or-exact mark");
- return REDISMODULE_OK;
- }
- /* Trim */
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- long long trimmed;
- if (trim_by_id) {
- trimmed = RedisModule_StreamTrimByID(key, flags, &minid);
- } else {
- trimmed = RedisModule_StreamTrimByLength(key, flags, maxlen);
- }
- /* Return result */
- if (trimmed < 0) {
- RedisModule_ReplyWithError(ctx, "ERR Trimming failed");
- } else {
- RedisModule_ReplyWithLongLong(ctx, trimmed);
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- if (RedisModule_Init(ctx, "stream", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.add", stream_add, "",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.addn", stream_addn, "",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.delete", stream_delete, "",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.range", stream_range, "",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.trim", stream_trim, "",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- return REDISMODULE_OK;
- }
|