All files / src/resp/command/list brpop-command.ts

25% Statements 13/52
0% Branches 0/4
16.66% Functions 1/6
26% Lines 13/50

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 1361x   1x 1x 1x   1x                                                               1x 4x   4x         4x 4x 4x 4x 4x                                                                                                                                                                          
import { Logger } from "../../../logger";
import { IRequest } from "../../../server/request";
import { TimedEmitter } from "../../../timed-emitter";
import { RPopCommand } from "../../command/list/rpop-command";
import { DataType } from "../../data/data-type";
import { Database } from "../../data/database";
import { RedisToken } from "../../protocol/redis-token";
 
/**
 * ### Available since 2.0.0.
 * ### BRPOP key [key ...] timeout
 *
 * BRPOP is a blocking list pop primitive. It is the blocking version of {@link resp/command/list/rpop-command.RPopCommand | RPOP} because
 * it blocks the connection when there are no elements to pop from any of the given lists. An element
 * is popped from the tail of the first list that is non-empty, with the given keys being checked
 * in the order that they are given.
 *
 * ### See {@link TimedEmitter} for limitations
 *
 * See the {@link resp/command/list/blpop-command.BLPopCommand | BLPOP} documentation for the exact semantics, since BRPOP is
 * identical to BLPOP with the only difference being that it pops elements from the tail of a
 * list instead of popping from the head.
 * ### Return value
 * Array reply: specifically:
 * - A nil multi-bulk when no element could be popped and the timeout expired.
 * - A two-element multi-bulk with the first element being the name of the key where an element was
 * popped and the second element being the value of the popped element.
 * ### Examples
 * ```
 * redis> DEL list1 list2
 * (integer) 0
 * redis> RPUSH list1 a b c
 * (integer) 3
 * redis> BRPOP list1 list2 0
 * 1) "list1"
 * 2) "c"
 * ```
 */
export class BRPopCommand extends RPopCommand {
    /** Flags this command as blocking: BRPOP may hold the reply until data arrives or the timeout expires. */
    public Blocking = true

    /** The database value type this command operates on. */
    public DbDataType = DataType.LIST

    protected logger: Logger;

    /**
     * @param maxParams Value stored as `maxParams` on the prototype.
     * @param minParams Value stored as `minParams` on the prototype.
     * @param name Value stored as `name` on the prototype.
     */
    constructor(maxParams: number, minParams: number, name: string) {
        super();
        // NOTE(review): these are written to the prototype, not the instance, so they are
        // shared by every instance of the class. Presumably the command dispatcher reads
        // them from there - confirm before changing.
        this.constructor.prototype.maxParams = maxParams;
        this.constructor.prototype.minParams = minParams;
        this.constructor.prototype.name = name;
        this.logger = new Logger(module.id);
    }

    /**
     * Pops from the first non-empty list among the given keys, or waits for one of them to
     * receive an element.
     *
     * All parameters except the last are list keys; the last parameter is the timeout handed
     * to {@link TimedEmitter}. Every key is tried once, in the order given. If none yields an
     * element, listeners are registered for the `lpush`, `rpush`, `linsert` and `lset` key
     * events of each key and the promise stays pending until one of them produces an element
     * or the emitter reports `"timeout"`.
     *
     * @param request The request carrying the keys, the timeout, the session and the server context.
     * @param db The database passed through to the inherited `process` call.
     * @returns A promise resolving to a two-element array token `[key, value]`, or to
     * `RedisToken.nullString()` when the timeout fires first.
     */
    public execSync(request: IRequest, db: Database): RedisToken | Promise<RedisToken> {
        return new Promise((resolve) => {
            this.logger.debug(
                `${request.getCommand()}.execute(%s)`,
                ...request.getParams()
            );
            const timeoutIndex = request.getParams().length - 1,
                timeout: string = request.getParam(timeoutIndex),
                keys: string[] = [];
            // Collect each key once, preserving order. A repeated key would otherwise register
            // two listeners under the same event name, one of which could never be removed.
            for (let index = 0; index < timeoutIndex; index++) {
                const key = request.getParam(index);
                if (keys.indexOf(key) === -1) {
                    keys.push(key);
                }
            }

            // Non-blocking attempt: serve from the first key that yields an element.
            for (const key of keys) {
                const result = this.process(
                    request,
                    db,
                    key
                );
                if (result !== RedisToken.nullString()) {
                    const reply: RedisToken[] = [RedisToken.string(key), result];
                    resolve(RedisToken.array(reply));
                    return;
                }
            }

            // Nothing available: wait for an event on any of the keys.
            const currentDb = request.getSession().getCurrentDb(),
                watchedEvents = ["lpush", "rpush", "linsert", "lset"],
                // Each event name is paired with its key so the listener never has to parse the
                // key back out of the event name (which breaks for keys containing spaces).
                watchers: Array<{ eventName: string; key: string }> = [],
                eventNames: string[] = [],
                eventCallbacks: Record<string, () => void> = {};
            for (const key of keys) {
                for (const kind of watchedEvents) {
                    const eventName = `__keyevent@${currentDb}__:${kind} ${key}`;
                    watchers.push({ eventName, key });
                    eventNames.push(eventName);
                }
            }
            const timedEvent: TimedEmitter = new TimedEmitter(
                Number(timeout),
                eventNames,
                request.getServerContext()
            );
            // Once a reply has been produced, no handler may pop or resolve again.
            let settled = false;
            timedEvent.on(
                "timeout",
                () => {
                    if (settled) {
                        return;
                    }
                    settled = true;
                    this.logger.debug("Timeout");
                    this.removeListeners(
                        timedEvent,
                        eventCallbacks
                    );
                    // NOTE(review): the class documentation promises a nil multi-bulk here, but
                    // a null string is sent. Confirm which token clients expect.
                    resolve(RedisToken.nullString());
                }
            );
            for (const { eventName, key } of watchers) {
                this.logger.debug(`Adding listener for ${eventName}`);
                const listener = (): void => {
                    if (settled) {
                        return;
                    }
                    const popped = this.process(
                        request,
                        db,
                        key
                    );
                    if (popped === RedisToken.nullString()) {
                        // The list was already drained (e.g. by another waiter); keep blocking.
                        return;
                    }
                    settled = true;
                    // Detach everything so a later push cannot pop an element for a reply
                    // that has already been delivered.
                    this.removeListeners(
                        timedEvent,
                        eventCallbacks
                    );
                    const reply: RedisToken[] = [RedisToken.string(key), popped];
                    resolve(RedisToken.array(reply));
                };
                eventCallbacks[eventName] = listener;
                timedEvent.on(
                    eventName,
                    listener
                );
            }
        });
    }

    /**
     * Detaches every listener recorded in `events` from `timedEvent`.
     *
     * @param timedEvent The emitter the listeners were registered on.
     * @param events Map from event name to the listener registered under that name.
     */
    private removeListeners(timedEvent: TimedEmitter, events: Record<string, () => void>) {
        for (const eName of Object.keys(events)) {
            const listener = events[eName];
            if (listener === undefined) {
                continue;
            }
            this.logger.debug(`Removing listener for ${eName}`);
            timedEvent.off(
                eName,
                listener
            );
        }
    }
}