Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | 1x 1x 1x 1x 1x 1x 8x 8x 8x 8x 8x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x | import { Logger } from "../../../logger"; import { IRequest } from "../../../server/request"; import { DataType } from "../../data/data-type"; import { Database } from "../../data/database"; import { DatabaseValue } from "../../data/database-value"; import { RedisToken } from "../../protocol/redis-token"; import { IRespCommand } from "../resp-command"; /** * ### Available since 1.2.0. * ### RPOPLPUSH source destination * * Atomically returns and removes the last element (tail) of the list stored at source, and * pushes the element at the first element (head) of the list stored at destination. * * For example: consider source holding the list a,b,c, and destination holding the list x,y,z. * Executing RPOPLPUSH results in source holding a,b and destination holding c,x,y,z. * * If source does not exist, the value nil is returned and no operation is performed. If source * and destination are the same, the operation is equivalent to removing the last element from * the list and pushing it as first element of the list, so it can be considered as a list * rotation command. * * ### Return value * Bulk string reply: the element being popped and pushed. * * ### Examples * ``` * redis> RPUSH mylist "one" * (integer) 1 * redis> RPUSH mylist "two" * (integer) 2 * redis> RPUSH mylist "three" * (integer) 3 * redis> RPOPLPUSH mylist myotherlist * "three" * redis> LRANGE mylist 0 -1 * 1) "one" * 2) "two" * redis> LRANGE myotherlist 0 -1 * 1) "three" * redis> * ``` * ### Pattern: Reliable queue * Redis is often used as a messaging server to implement processing of background jobs or other * kinds of messaging tasks. A simple form of queue is often obtained pushing values into a list * in the producer side, and waiting for this values in the consumer side using RPOP (using * polling), or BRPOP if the client is better served by a blocking operation. * * However in this context the obtained queue is not reliable as messages can be lost, for example * in the case there is a network problem or if the consumer crashes just after the message is * received but it is still to process. * * RPOPLPUSH (or {@link resp/command/list/brpoplpush-command.BRPoplPushCommand | BRPOPLPUSH} for the blocking variant) offers a way to avoid * this problem: the consumer fetches the message and at the same time pushes it into a processing * list. It will use the LREM command in order to remove the message from the processing list once * the message has been processed. * * An additional client may monitor the processing list for items that remain there for too much * time, and will push those timed out items into the queue again if needed. * ### Pattern: Circular list * Using RPOPLPUSH with the same source and destination key, a client can visit all the elements * of an N-elements list, one after the other, in O(N) without transferring the full list from * the server to the client using a single LRANGE operation. * * The above pattern works even if the following two conditions: * - There are multiple clients rotating the list: they'll fetch different elements, until all * the elements of the list are visited, and the process restarts. * - Even if other clients are actively pushing new items at the end of the list. * * The above makes it very simple to implement a system where a set of items must be processed * by N workers continuously as fast as possible. An example is a monitoring system that must * check that a set of web sites are reachable, with the smallest delay possible, using a number * of parallel workers. * * Note that this implementation of workers is trivially scalable and reliable, because even if * a message is lost the item is still in the queue and will be processed at the next iteration. */ export class RPoplPushCommand extends IRespCommand { public DbDataType = DataType.LIST public maxParams = 2 public minParams = 2 public name = "rpoplpush" protected logger: Logger = new Logger(module.id); public execSync(request: IRequest, db: Database): RedisToken | Promise<RedisToken> { this.logger.debug( `${request.getCommand()}.execute(%s)`, ...request.getParams() ); return this.process( request, db ); } protected process(request: IRequest, db: Database): RedisToken { const src: string = request.getParam(0), dst: string = request.getParam(1); this.logger.debug(`process(src: ${src}, dst: ${dst})`); const dbSrcList: DatabaseValue = db.get(src); Iif (!dbSrcList) { this.logger.debug("src list is empty"); return RedisToken.nullString(); } const dbDstList: DatabaseValue = db.getOrDefault( dst, new DatabaseValue( DataType.LIST, [] ) ); this.logger.debug( "src list is %s", ...dbDstList.getList() ); const member: any = dbSrcList.getList().pop(); this.logger.debug(`Member is ${member}`); dbDstList.getList().unshift(member); db.put( dst, dbDstList ); // Fire events on dst? Eif (dst !== src) { Iif (dbSrcList.getList().length > 0) { db.put( src, dbSrcList ); // Fire events on src? } else { db.remove(src); } } return RedisToken.string(member); } } |