Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | 1x 1x 1x 1x 1x 1x 4x 4x 4x 4x 4x 4x 4x 2x 2x 2x 2x 2x 2x | import { Logger } from "../../../logger"; import { IRequest } from "../../../server/request"; import { TimedEmitter } from "../../../timed-emitter"; import { DataType } from "../../data/data-type"; import { Database } from "../../data/database"; import { RedisToken } from "../../protocol/redis-token"; import { RPoplPushCommand } from "./rpoplpush-command"; /** * ### Available since 2.2.0. * ### BRPOPLPUSH source destination timeout * BRPOPLPUSH is the blocking variant of [RPOPLPUSH]{@link RPoplPushCommand}. When source contains * elements, this command behaves exactly like RPOPLPUSH. When used inside a * {@link resp/command/multi-command.MultiCommand | MULTI}/{@link resp/command/exec-command.ExecCommand | EXEC} block, * this command behaves exactly like RPOPLPUSH. When source is empty, Redis will block the * connection until another client pushes to it or until timeout is reached. A timeout of zero * can be used to block indefinitely. * * See [RPOPLPUSH]{@link RPoplPushCommand} for more information. * ### Return value * Bulk string reply: the element being popped from source and pushed to destination. If timeout * is reached, a Null reply is returned. * ### Pattern: Reliable queue * Please see the pattern description in the [RPOPLPUSH]{@link RPoplPushCommand} documentation. * ### Pattern: Circular list * Please see the pattern description in the [RPOPLPUSH]{@link RPoplPushCommand} documentation. */ export class BRPoplPushCommand extends RPoplPushCommand { public blocking = true public dbDataType = DataType.LIST public minParams = 3 public maxParams = 3 public name = "brpoplpush" protected logger: Logger; constructor() { super(); this.logger = new Logger(module.id); } public execSync(request: IRequest, db: Database): RedisToken | Promise<RedisToken> { return new Promise((resolve) => { const timeout: string = request.getParam(request.getParams().length - 1), // Run rpoplpush result: RedisToken = this.process( request, db ); Iif (result === RedisToken.nullString()) { const eventNames: string[] = [], eventCallbacks: any = {}, srcKey = request.getParam(0); eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:lpush ${srcKey}`); eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:rpush ${srcKey}`); eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:linsert ${srcKey}`); eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:lset ${srcKey}`); const timedEvent: TimedEmitter = new TimedEmitter( Number(timeout), eventNames, request.getServerContext() ); timedEvent.on( "timeout", () => { this.logger.debug("Timeout"); this.removeListeners( timedEvent, eventCallbacks ); resolve(RedisToken.nullString()); } ); for (const eventName of eventNames) { this.logger.debug(`Adding listener for ${eventName}`); eventCallbacks[eventName] = () => { this.logger.debug(`Received event ${eventName}`); resolve(this.process( request, db )); }; timedEvent.on( eventName, eventCallbacks[eventName] ); } } else { this.logger.debug(`Resolving ${result}`); resolve(result); } }); } private removeListeners(timedEvent: TimedEmitter, events: any) { for (const eName of Object.keys(events)) { this.logger.debug(`Removing listener for ${eName}`); timedEvent.off( eName, events[eName] ); } } } |