1
0

blockonkeys.c 18 KB


  1. #define REDISMODULE_EXPERIMENTAL_API
  2. #include "redismodule.h"
  3. #include <string.h>
  4. #include <strings.h>
  5. #include <assert.h>
  6. #include <unistd.h>
  7. #define LIST_SIZE 1024
  8. typedef struct {
  9. long long list[LIST_SIZE];
  10. long long length;
  11. } fsl_t; /* Fixed-size list */
  12. static RedisModuleType *fsltype = NULL;
  13. fsl_t *fsl_type_create() {
  14. fsl_t *o;
  15. o = RedisModule_Alloc(sizeof(*o));
  16. o->length = 0;
  17. return o;
  18. }
  19. void fsl_type_free(fsl_t *o) {
  20. RedisModule_Free(o);
  21. }
  22. /* ========================== "fsltype" type methods ======================= */
  23. void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
  24. if (encver != 0) {
  25. return NULL;
  26. }
  27. fsl_t *fsl = fsl_type_create();
  28. fsl->length = RedisModule_LoadUnsigned(rdb);
  29. for (long long i = 0; i < fsl->length; i++)
  30. fsl->list[i] = RedisModule_LoadSigned(rdb);
  31. return fsl;
  32. }
  33. void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
  34. fsl_t *fsl = value;
  35. RedisModule_SaveUnsigned(rdb,fsl->length);
  36. for (long long i = 0; i < fsl->length; i++)
  37. RedisModule_SaveSigned(rdb, fsl->list[i]);
  38. }
  39. void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
  40. fsl_t *fsl = value;
  41. for (long long i = 0; i < fsl->length; i++)
  42. RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
  43. }
  44. void fsl_free(void *value) {
  45. fsl_type_free(value);
  46. }
  47. /* ========================== helper methods ======================= */
  48. int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
  49. RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
  50. int type = RedisModule_KeyType(key);
  51. if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
  52. RedisModule_CloseKey(key);
  53. if (reply_on_failure)
  54. RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
  55. RedisModuleCallReply *reply = RedisModule_Call(ctx, "INCR", "c", "fsl_wrong_type");
  56. RedisModule_FreeCallReply(reply);
  57. return 0;
  58. }
  59. /* Create an empty value object if the key is currently empty. */
  60. if (type == REDISMODULE_KEYTYPE_EMPTY) {
  61. if (!create) {
  62. /* Key is empty but we cannot create */
  63. RedisModule_CloseKey(key);
  64. *fsl = NULL;
  65. return 1;
  66. }
  67. *fsl = fsl_type_create();
  68. RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
  69. } else {
  70. *fsl = RedisModule_ModuleTypeGetValue(key);
  71. }
  72. RedisModule_CloseKey(key);
  73. return 1;
  74. }
  75. /* ========================== commands ======================= */
  76. /* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
  77. * It must be greater than the element in the head of the list. */
  78. int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  79. if (argc != 3)
  80. return RedisModule_WrongArity(ctx);
  81. long long ele;
  82. if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
  83. return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
  84. fsl_t *fsl;
  85. if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
  86. return REDISMODULE_OK;
  87. if (fsl->length == LIST_SIZE)
  88. return RedisModule_ReplyWithError(ctx,"ERR list is full");
  89. if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
  90. return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
  91. fsl->list[fsl->length++] = ele;
  92. RedisModule_SignalKeyAsReady(ctx, argv[1]);
  93. return RedisModule_ReplyWithSimpleString(ctx, "OK");
  94. }
  95. int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  96. REDISMODULE_NOT_USED(argv);
  97. REDISMODULE_NOT_USED(argc);
  98. RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  99. fsl_t *fsl;
  100. if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
  101. return REDISMODULE_ERR;
  102. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  103. return REDISMODULE_OK;
  104. }
  105. int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  106. REDISMODULE_NOT_USED(argv);
  107. REDISMODULE_NOT_USED(argc);
  108. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  109. }
  110. /* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
  111. * When that happens, unblock client and pop the last two elements (from the right). */
  112. int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  113. if (argc != 3)
  114. return RedisModule_WrongArity(ctx);
  115. long long timeout;
  116. if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
  117. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  118. fsl_t *fsl;
  119. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  120. return REDISMODULE_OK;
  121. if (!fsl) {
  122. RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
  123. NULL, timeout, &argv[1], 1, NULL);
  124. } else {
  125. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  126. }
  127. return REDISMODULE_OK;
  128. }
  129. int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  130. REDISMODULE_NOT_USED(argv);
  131. REDISMODULE_NOT_USED(argc);
  132. RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  133. long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
  134. fsl_t *fsl;
  135. if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
  136. return REDISMODULE_ERR;
  137. if (fsl->list[fsl->length-1] <= *pgt)
  138. return REDISMODULE_ERR;
  139. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  140. return REDISMODULE_OK;
  141. }
  142. int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  143. REDISMODULE_NOT_USED(argv);
  144. REDISMODULE_NOT_USED(argc);
  145. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  146. }
  147. void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
  148. REDISMODULE_NOT_USED(ctx);
  149. RedisModule_Free(privdata);
  150. }
  151. /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
  152. * When that happens, unblock client and pop the last element (from the right). */
  153. int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  154. if (argc != 4)
  155. return RedisModule_WrongArity(ctx);
  156. long long gt;
  157. if (RedisModule_StringToLongLong(argv[2],&gt) != REDISMODULE_OK)
  158. return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
  159. long long timeout;
  160. if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
  161. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  162. fsl_t *fsl;
  163. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  164. return REDISMODULE_OK;
  165. if (!fsl || fsl->list[fsl->length-1] <= gt) {
  166. /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
  167. long long *pgt = RedisModule_Alloc(sizeof(long long));
  168. *pgt = gt;
  169. RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
  170. bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
  171. } else {
  172. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  173. }
  174. return REDISMODULE_OK;
  175. }
  176. int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  177. REDISMODULE_NOT_USED(argv);
  178. REDISMODULE_NOT_USED(argc);
  179. RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  180. RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
  181. fsl_t *src;
  182. if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
  183. return REDISMODULE_ERR;
  184. fsl_t *dst;
  185. if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
  186. return REDISMODULE_ERR;
  187. long long ele = src->list[--src->length];
  188. dst->list[dst->length++] = ele;
  189. RedisModule_SignalKeyAsReady(ctx, dst_keyname);
  190. return RedisModule_ReplyWithLongLong(ctx, ele);
  191. }
  192. int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  193. REDISMODULE_NOT_USED(argv);
  194. REDISMODULE_NOT_USED(argc);
  195. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  196. }
  197. void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
  198. RedisModule_FreeString(ctx, privdata);
  199. }
  200. /* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
  201. * When that happens, unblock client, pop the last element from <src> and push it to <dst>
  202. * (from the right). */
  203. int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  204. if (argc != 4)
  205. return RedisModule_WrongArity(ctx);
  206. long long timeout;
  207. if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
  208. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  209. fsl_t *src;
  210. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
  211. return REDISMODULE_OK;
  212. if (!src) {
  213. /* Retain string for reply callback */
  214. RedisModule_RetainString(ctx, argv[2]);
  215. /* Key is empty, we must block */
  216. RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
  217. bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
  218. } else {
  219. fsl_t *dst;
  220. if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
  221. return REDISMODULE_OK;
  222. long long ele = src->list[--src->length];
  223. dst->list[dst->length++] = ele;
  224. RedisModule_SignalKeyAsReady(ctx, argv[2]);
  225. RedisModule_ReplyWithLongLong(ctx, ele);
  226. }
  227. return REDISMODULE_OK;
  228. }
  229. /* FSL.GETALL <key> - Reply with an array containing all elements. */
  230. int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  231. if (argc != 2)
  232. return RedisModule_WrongArity(ctx);
  233. fsl_t *fsl;
  234. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  235. return REDISMODULE_OK;
  236. if (!fsl)
  237. return RedisModule_ReplyWithArray(ctx, 0);
  238. RedisModule_ReplyWithArray(ctx, fsl->length);
  239. for (int i = 0; i < fsl->length; i++)
  240. RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
  241. return REDISMODULE_OK;
  242. }
  243. /* Callback for blockonkeys_popall */
  244. int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  245. REDISMODULE_NOT_USED(argc);
  246. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  247. if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST) {
  248. RedisModuleString *elem;
  249. long len = 0;
  250. RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
  251. while ((elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD)) != NULL) {
  252. len++;
  253. RedisModule_ReplyWithString(ctx, elem);
  254. RedisModule_FreeString(ctx, elem);
  255. }
  256. RedisModule_ReplySetArrayLength(ctx, len);
  257. } else {
  258. RedisModule_ReplyWithError(ctx, "ERR Not a list");
  259. }
  260. RedisModule_CloseKey(key);
  261. return REDISMODULE_OK;
  262. }
  263. int blockonkeys_popall_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  264. REDISMODULE_NOT_USED(argv);
  265. REDISMODULE_NOT_USED(argc);
  266. return RedisModule_ReplyWithError(ctx, "ERR Timeout");
  267. }
  268. /* BLOCKONKEYS.POPALL key
  269. *
  270. * Blocks on an empty key for up to 3 seconds. When unblocked by a list
  271. * operation like LPUSH, all the elements are popped and returned. Fails with an
  272. * error on timeout. */
  273. int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  274. if (argc != 2)
  275. return RedisModule_WrongArity(ctx);
  276. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
  277. if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
  278. RedisModule_BlockClientOnKeys(ctx, blockonkeys_popall_reply_callback,
  279. blockonkeys_popall_timeout_callback,
  280. NULL, 3000, &argv[1], 1, NULL);
  281. } else {
  282. RedisModule_ReplyWithError(ctx, "ERR Key not empty");
  283. }
  284. RedisModule_CloseKey(key);
  285. return REDISMODULE_OK;
  286. }
  287. /* BLOCKONKEYS.LPUSH key val [val ..]
  288. * BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..]
  289. *
  290. * A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used,
  291. * RM_SignalKeyAsReady() is also called. */
  292. int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  293. if (argc < 3)
  294. return RedisModule_WrongArity(ctx);
  295. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  296. if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY &&
  297. RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_LIST) {
  298. RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
  299. } else {
  300. for (int i = 2; i < argc; i++) {
  301. if (RedisModule_ListPush(key, REDISMODULE_LIST_HEAD,
  302. argv[i]) != REDISMODULE_OK) {
  303. RedisModule_CloseKey(key);
  304. return RedisModule_ReplyWithError(ctx, "ERR Push failed");
  305. }
  306. }
  307. }
  308. RedisModule_CloseKey(key);
  309. /* signal key as ready if the command is lpush_unblock */
  310. size_t len;
  311. const char *str = RedisModule_StringPtrLen(argv[0], &len);
  312. if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) {
  313. RedisModule_SignalKeyAsReady(ctx, argv[1]);
  314. }
  315. return RedisModule_ReplyWithSimpleString(ctx, "OK");
  316. }
  317. /* Callback for the BLOCKONKEYS.BLPOPN command */
  318. int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  319. REDISMODULE_NOT_USED(argc);
  320. long long n;
  321. RedisModule_StringToLongLong(argv[2], &n);
  322. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  323. int result;
  324. if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST &&
  325. RedisModule_ValueLength(key) >= (size_t)n) {
  326. RedisModule_ReplyWithArray(ctx, n);
  327. for (long i = 0; i < n; i++) {
  328. RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
  329. RedisModule_ReplyWithString(ctx, elem);
  330. RedisModule_FreeString(ctx, elem);
  331. }
  332. result = REDISMODULE_OK;
  333. } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
  334. RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
  335. /* continue blocking */
  336. result = REDISMODULE_ERR;
  337. } else {
  338. result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
  339. }
  340. RedisModule_CloseKey(key);
  341. return result;
  342. }
  343. int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  344. REDISMODULE_NOT_USED(argv);
  345. REDISMODULE_NOT_USED(argc);
  346. return RedisModule_ReplyWithError(ctx, "ERR Timeout");
  347. }
  348. /* BLOCKONKEYS.BLPOPN key N
  349. *
  350. * Blocks until key has N elements and then pops them or fails after 3 seconds.
  351. */
  352. int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  353. if (argc < 3) return RedisModule_WrongArity(ctx);
  354. long long n;
  355. if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) {
  356. return RedisModule_ReplyWithError(ctx, "ERR Invalid N");
  357. }
  358. RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
  359. int keytype = RedisModule_KeyType(key);
  360. if (keytype != REDISMODULE_KEYTYPE_EMPTY &&
  361. keytype != REDISMODULE_KEYTYPE_LIST) {
  362. RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
  363. } else if (keytype == REDISMODULE_KEYTYPE_LIST &&
  364. RedisModule_ValueLength(key) >= (size_t)n) {
  365. RedisModule_ReplyWithArray(ctx, n);
  366. for (long i = 0; i < n; i++) {
  367. RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
  368. RedisModule_ReplyWithString(ctx, elem);
  369. RedisModule_FreeString(ctx, elem);
  370. }
  371. } else {
  372. RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
  373. blockonkeys_blpopn_timeout_callback,
  374. NULL, 3000, &argv[1], 1, NULL);
  375. }
  376. RedisModule_CloseKey(key);
  377. return REDISMODULE_OK;
  378. }
  379. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  380. REDISMODULE_NOT_USED(argv);
  381. REDISMODULE_NOT_USED(argc);
  382. if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
  383. return REDISMODULE_ERR;
  384. RedisModuleTypeMethods tm = {
  385. .version = REDISMODULE_TYPE_METHOD_VERSION,
  386. .rdb_load = fsl_rdb_load,
  387. .rdb_save = fsl_rdb_save,
  388. .aof_rewrite = fsl_aofrw,
  389. .mem_usage = NULL,
  390. .free = fsl_free,
  391. .digest = NULL
  392. };
  393. fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
  394. if (fsltype == NULL)
  395. return REDISMODULE_ERR;
  396. if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
  397. return REDISMODULE_ERR;
  398. if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
  399. return REDISMODULE_ERR;
  400. if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
  401. return REDISMODULE_ERR;
  402. if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
  403. return REDISMODULE_ERR;
  404. if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
  405. return REDISMODULE_ERR;
  406. if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall,
  407. "", 1, 1, 1) == REDISMODULE_ERR)
  408. return REDISMODULE_ERR;
  409. if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush", blockonkeys_lpush,
  410. "", 1, 1, 1) == REDISMODULE_ERR)
  411. return REDISMODULE_ERR;
  412. if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush,
  413. "", 1, 1, 1) == REDISMODULE_ERR)
  414. return REDISMODULE_ERR;
  415. if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn,
  416. "", 1, 1, 1) == REDISMODULE_ERR)
  417. return REDISMODULE_ERR;
  418. return REDISMODULE_OK;
  419. }