blockonbackground.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. #define REDISMODULE_EXPERIMENTAL_API
  2. #define _XOPEN_SOURCE 700
  3. #include "redismodule.h"
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <pthread.h>
  7. #include <time.h>
  8. #define UNUSED(x) (void)(x)
  9. /* Reply callback for blocking command BLOCK.DEBUG */
  10. int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  11. UNUSED(argv);
  12. UNUSED(argc);
  13. int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
  14. return RedisModule_ReplyWithLongLong(ctx,*myint);
  15. }
  16. /* Timeout callback for blocking command BLOCK.DEBUG */
  17. int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  18. UNUSED(argv);
  19. UNUSED(argc);
  20. RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
  21. RedisModule_BlockedClientMeasureTimeEnd(bc);
  22. return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
  23. }
  24. /* Private data freeing callback for BLOCK.DEBUG command. */
  25. void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
  26. UNUSED(ctx);
  27. RedisModule_Free(privdata);
  28. }
  29. /* The thread entry point that actually executes the blocking part
  30. * of the command BLOCK.DEBUG. */
  31. void *BlockDebug_ThreadMain(void *arg) {
  32. void **targ = arg;
  33. RedisModuleBlockedClient *bc = targ[0];
  34. long long delay = (unsigned long)targ[1];
  35. long long enable_time_track = (unsigned long)targ[2];
  36. if (enable_time_track)
  37. RedisModule_BlockedClientMeasureTimeStart(bc);
  38. RedisModule_Free(targ);
  39. struct timespec ts;
  40. ts.tv_sec = delay / 1000;
  41. ts.tv_nsec = (delay % 1000) * 1000000;
  42. nanosleep(&ts, NULL);
  43. int *r = RedisModule_Alloc(sizeof(int));
  44. *r = rand();
  45. if (enable_time_track)
  46. RedisModule_BlockedClientMeasureTimeEnd(bc);
  47. RedisModule_UnblockClient(bc,r);
  48. return NULL;
  49. }
  50. /* The thread entry point that actually executes the blocking part
  51. * of the command BLOCK.DOUBLE_DEBUG. */
  52. void *DoubleBlock_ThreadMain(void *arg) {
  53. void **targ = arg;
  54. RedisModuleBlockedClient *bc = targ[0];
  55. long long delay = (unsigned long)targ[1];
  56. RedisModule_BlockedClientMeasureTimeStart(bc);
  57. RedisModule_Free(targ);
  58. struct timespec ts;
  59. ts.tv_sec = delay / 1000;
  60. ts.tv_nsec = (delay % 1000) * 1000000;
  61. nanosleep(&ts, NULL);
  62. int *r = RedisModule_Alloc(sizeof(int));
  63. *r = rand();
  64. RedisModule_BlockedClientMeasureTimeEnd(bc);
  65. /* call again RedisModule_BlockedClientMeasureTimeStart() and
  66. * RedisModule_BlockedClientMeasureTimeEnd and ensure that the
  67. * total execution time is 2x the delay. */
  68. RedisModule_BlockedClientMeasureTimeStart(bc);
  69. nanosleep(&ts, NULL);
  70. RedisModule_BlockedClientMeasureTimeEnd(bc);
  71. RedisModule_UnblockClient(bc,r);
  72. return NULL;
  73. }
  74. void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
  75. RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
  76. (void*)bc);
  77. }
  78. /* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
  79. * a random number. Timeout is the command timeout, so that you can test
  80. * what happens when the delay is greater than the timeout. */
  81. int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  82. if (argc != 3) return RedisModule_WrongArity(ctx);
  83. long long delay;
  84. long long timeout;
  85. if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
  86. return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  87. }
  88. if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
  89. return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  90. }
  91. pthread_t tid;
  92. RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
  93. /* Here we set a disconnection handler, however since this module will
  94. * block in sleep() in a thread, there is not much we can do in the
  95. * callback, so this is just to show you the API. */
  96. RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
  97. /* Now that we setup a blocking client, we need to pass the control
  98. * to the thread. However we need to pass arguments to the thread:
  99. * the delay and a reference to the blocked client handle. */
  100. void **targ = RedisModule_Alloc(sizeof(void*)*3);
  101. targ[0] = bc;
  102. targ[1] = (void*)(unsigned long) delay;
  103. // pass 1 as flag to enable time tracking
  104. targ[2] = (void*)(unsigned long) 1;
  105. if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
  106. RedisModule_AbortBlock(bc);
  107. return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
  108. }
  109. return REDISMODULE_OK;
  110. }
  111. /* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
  112. * a random number. Timeout is the command timeout, so that you can test
  113. * what happens when the delay is greater than the timeout.
  114. * this command does not track background time so the background time should no appear in stats*/
  115. int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  116. if (argc != 3) return RedisModule_WrongArity(ctx);
  117. long long delay;
  118. long long timeout;
  119. if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
  120. return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  121. }
  122. if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
  123. return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  124. }
  125. pthread_t tid;
  126. RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
  127. /* Here we set a disconnection handler, however since this module will
  128. * block in sleep() in a thread, there is not much we can do in the
  129. * callback, so this is just to show you the API. */
  130. RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
  131. /* Now that we setup a blocking client, we need to pass the control
  132. * to the thread. However we need to pass arguments to the thread:
  133. * the delay and a reference to the blocked client handle. */
  134. void **targ = RedisModule_Alloc(sizeof(void*)*3);
  135. targ[0] = bc;
  136. targ[1] = (void*)(unsigned long) delay;
  137. // pass 0 as flag to enable time tracking
  138. targ[2] = (void*)(unsigned long) 0;
  139. if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
  140. RedisModule_AbortBlock(bc);
  141. return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
  142. }
  143. return REDISMODULE_OK;
  144. }
  145. /* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
  146. * then reply with a random number.
  147. * This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()
  148. * and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
  149. int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  150. if (argc != 2) return RedisModule_WrongArity(ctx);
  151. long long delay;
  152. if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
  153. return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  154. }
  155. pthread_t tid;
  156. RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
  157. /* Now that we setup a blocking client, we need to pass the control
  158. * to the thread. However we need to pass arguments to the thread:
  159. * the delay and a reference to the blocked client handle. */
  160. void **targ = RedisModule_Alloc(sizeof(void*)*2);
  161. targ[0] = bc;
  162. targ[1] = (void*)(unsigned long) delay;
  163. if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
  164. RedisModule_AbortBlock(bc);
  165. return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
  166. }
  167. return REDISMODULE_OK;
  168. }
  169. RedisModuleBlockedClient *blocked_client = NULL;
  170. /* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
  171. * or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
  172. * registered.
  173. */
  174. int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  175. if (RedisModule_IsBlockedReplyRequest(ctx)) {
  176. RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);
  177. return RedisModule_ReplyWithString(ctx, r);
  178. } else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
  179. RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */
  180. blocked_client = NULL;
  181. return RedisModule_ReplyWithSimpleString(ctx, "Timed out");
  182. }
  183. if (argc != 2) return RedisModule_WrongArity(ctx);
  184. long long timeout;
  185. if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {
  186. return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");
  187. }
  188. if (blocked_client) {
  189. return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");
  190. }
  191. /* Block client. We use this function as both a reply and optional timeout
  192. * callback and differentiate the different code flows above.
  193. */
  194. blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,
  195. timeout > 0 ? Block_RedisCommand : NULL, NULL, timeout);
  196. return REDISMODULE_OK;
  197. }
  198. /* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
  199. */
  200. int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  201. UNUSED(argv);
  202. UNUSED(argc);
  203. RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
  204. return REDISMODULE_OK;
  205. }
  206. /* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
  207. */
  208. int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  209. if (argc != 2) return RedisModule_WrongArity(ctx);
  210. if (!blocked_client) {
  211. return RedisModule_ReplyWithError(ctx, "ERR No blocked client");
  212. }
  213. RedisModuleString *replystr = argv[1];
  214. RedisModule_RetainString(ctx, replystr);
  215. RedisModule_UnblockClient(blocked_client, replystr);
  216. blocked_client = NULL;
  217. RedisModule_ReplyWithSimpleString(ctx, "OK");
  218. return REDISMODULE_OK;
  219. }
  220. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  221. UNUSED(argv);
  222. UNUSED(argc);
  223. if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1)
  224. == REDISMODULE_ERR) return REDISMODULE_ERR;
  225. if (RedisModule_CreateCommand(ctx,"block.debug",
  226. HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  227. return REDISMODULE_ERR;
  228. if (RedisModule_CreateCommand(ctx,"block.double_debug",
  229. HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  230. return REDISMODULE_ERR;
  231. if (RedisModule_CreateCommand(ctx,"block.debug_no_track",
  232. HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  233. return REDISMODULE_ERR;
  234. if (RedisModule_CreateCommand(ctx, "block.block",
  235. Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR)
  236. return REDISMODULE_ERR;
  237. if (RedisModule_CreateCommand(ctx,"block.is_blocked",
  238. IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  239. return REDISMODULE_ERR;
  240. if (RedisModule_CreateCommand(ctx,"block.release",
  241. Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  242. return REDISMODULE_ERR;
  243. return REDISMODULE_OK;
  244. }