All files / src/resp/command/list blpop-command.ts

25% Statements 13/52
0% Branches 0/4
16.66% Functions 1/6
26% Lines 13/50

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 1401x   1x 1x   1x 1x                                                                       1x 4x   4x         4x 4x 4x 4x 4x                                                                                                                                                                          
import { Logger } from "../../../logger";
import { IRequest } from "../../../server/request";
import { TimedEmitter } from "../../../timed-emitter";
import { DataType } from "../../data/data-type";
import { Database } from "../../data/database";
import { RedisToken } from "../../protocol/redis-token";
import { LPopCommand } from "./lpop-command";
 
/**
 * ### Available since 2.0.0.
 * ### BLPOP key [key ...] timeout
 * BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
 * the connection when there are no elements to pop from any of the given lists. An element is
 * popped from the head of the first list that is non-empty, with the given keys being checked
 * in the order that they are given.
 *
 * ### Non-blocking behavior
 * When BLPOP is called, if at least one of the specified keys contains a non-empty list, an
 * element is popped from the head of the list and returned to the caller together with the key
 * it was popped from.
 *
 * Keys are checked in the order that they are given. Let's say that the key list1 doesn't exist
 * and list2 and list3 hold non-empty lists. Consider the following command:
 * ```
 * BLPOP list1 list2 list3 0
 * ```
 * BLPOP guarantees to return an element from the list stored at list2 (since it is the first
 * non-empty list when checking list1, list2 and list3 in that order).
 * ### Blocking behavior
 * If none of the specified keys exist, BLPOP blocks the connection until another client performs
 * an LPUSH or RPUSH operation against one of the keys.
 *
 * Once new data is present on one of the lists, the client returns with the name of the key
 * unblocking it and the popped value.
 *
 * When BLPOP causes a client to block and a non-zero timeout is specified, the client will
 * unblock returning a nil multi-bulk value when the specified timeout has expired without a
 * push operation against at least one of the specified keys.
 *
 * The timeout argument is interpreted as an integer value specifying the maximum number of
 * seconds to block. A timeout of zero can be used to block indefinitely.
 */
export class BLPopCommand extends LPopCommand {
    /** Flags this command as a blocking command (presumably read by the dispatcher — confirm against caller). */
    public Blocking = true

    /** Data type this command is declared to operate on. */
    public DbDataType = DataType.LIST

    protected logger: Logger;

    /**
     * Creates the command and records its arity and name.
     *
     * Note that the values are written onto the prototype of the concrete class, so they are
     * shared by every instance of that class rather than stored per instance.
     *
     * @param maxParams value stored as `maxParams` on the class prototype
     * @param minParams value stored as `minParams` on the class prototype
     * @param name value stored as `name` on the class prototype
     */
    constructor(maxParams: number, minParams: number, name: string) {
        super();
        this.constructor.prototype.maxParams = maxParams;
        this.constructor.prototype.minParams = minParams;
        this.constructor.prototype.name = name;
        this.logger = new Logger(module.id);
    }

    /**
     * Executes `BLPOP key [key ...] timeout`.
     *
     * Every parameter except the last is treated as a list key; the last one is the timeout.
     * The keys are first probed in order via `this.process(...)`; the first one that yields a
     * value resolves the promise immediately with `[key, value]`.
     *
     * If no key yields a value, the command subscribes (through a `TimedEmitter`) to the
     * `lpush`, `rpush`, `linsert` and `lset` key events of every key and resolves with
     * `[key, value]` on the first notification, or with `RedisToken.nullString()` once the
     * emitter reports `"timeout"`. Whichever happens first wins: all key listeners are removed
     * at that point so that later notifications can no longer pop (and lose) elements.
     *
     * @param request the incoming request; supplies the parameters, session and server context
     * @param db the database handed through to `this.process(...)`
     * @returns a promise for the reply token; it is never rejected by this method
     */
    public execSync(request: IRequest, db: Database): Promise<RedisToken> {
        return new Promise((resolve) => {
            this.logger.debug(
                `${request.getCommand()}.execute(%s)`,
                ...request.getParams()
            );
            // The last parameter is the timeout, everything before it is a key.
            const keyCount = request.getParams().length - 1;
            const timeout: string = request.getParam(keyCount);
            const keys: string[] = [];
            for (let index = 0; index < keyCount; index++) {
                keys.push(request.getParam(index));
            }

            // Non-blocking path: pop from the first key that currently has an element.
            // NOTE(review): the identity comparison assumes RedisToken.nullString() returns a
            // shared instance — confirm against RedisToken.
            for (const key of keys) {
                const result = this.process(
                    request,
                    db,
                    key
                );
                if (result !== RedisToken.nullString()) {
                    const reply: RedisToken[] = [RedisToken.string(key), result];
                    resolve(RedisToken.array(reply));
                    return;
                }
            }

            // Blocking path: wait for a write notification on any of the keys.
            const currentDb = request.getSession().getCurrentDb();
            const eventNames: string[] = [];
            // One entry per registered listener. Pairs (rather than a map keyed by event name)
            // so that a key given twice still has each of its listeners tracked and removed.
            const subscriptions: Array<{ eventName: string; callback: () => void }> = [];
            // Set once the promise has been resolved; guards against late events/timeouts.
            let settled = false;

            for (const key of keys) {
                eventNames.push(`__keyevent@${currentDb}__:lpush ${key}`);
                eventNames.push(`__keyevent@${currentDb}__:rpush ${key}`);
                eventNames.push(`__keyevent@${currentDb}__:linsert ${key}`);
                eventNames.push(`__keyevent@${currentDb}__:lset ${key}`);
            }

            const timedEvent: TimedEmitter = new TimedEmitter(
                Number(timeout),
                eventNames,
                request.getServerContext()
            );

            timedEvent.on(
                "timeout",
                () => {
                    if (settled) {
                        return;
                    }
                    settled = true;
                    this.logger.debug("Timeout");
                    this.removeListeners(
                        timedEvent,
                        subscriptions
                    );
                    resolve(RedisToken.nullString());
                }
            );

            let eventIndex = 0;
            for (const key of keys) {
                // Four event names were generated per key, in the same order as `keys`.
                for (let offset = 0; offset < 4; offset++) {
                    const eventName = eventNames[eventIndex++];
                    this.logger.debug(`Adding listener for ${eventName}`);
                    // The key is captured directly instead of being parsed back out of the
                    // event name, so keys containing spaces are handled correctly.
                    const callback = (): void => {
                        if (settled) {
                            return;
                        }
                        settled = true;
                        // Detach every listener before popping so no later notification
                        // triggers another pop whose result nobody would receive.
                        this.removeListeners(
                            timedEvent,
                            subscriptions
                        );
                        const reply: RedisToken[] = [RedisToken.string(key)];
                        reply.push(this.process(
                            request,
                            db,
                            key
                        ));
                        resolve(RedisToken.array(reply));
                    };
                    subscriptions.push({ eventName, callback });
                    timedEvent.on(
                        eventName,
                        callback
                    );
                }
            }
        });
    }

    /**
     * Detaches every tracked key-event listener from the emitter.
     *
     * @param timedEvent emitter the listeners were registered on
     * @param events the `(eventName, callback)` pairs that were passed to `timedEvent.on`
     */
    private removeListeners(
        timedEvent: TimedEmitter,
        events: Array<{ eventName: string; callback: () => void }>
    ) {
        for (const { eventName: eName, callback } of events) {
            this.logger.debug(`Removing listener for ${eName}`);
            timedEvent.off(
                eName,
                callback
            );
        }
    }
}