All files / src/resp/command/list brpoplpush-command.ts

47.5% Statements 19/40
50% Branches 1/2
50% Functions 3/6
47.5% Lines 19/40

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 1091x   1x 1x   1x 1x                                         1x 4x   4x   4x   4x   4x         4x 4x       2x 2x   2x       2x                                                                             2x 2x                              
import { Logger } from "../../../logger";
import { IRequest } from "../../../server/request";
import { TimedEmitter } from "../../../timed-emitter";
import { DataType } from "../../data/data-type";
import { Database } from "../../data/database";
import { RedisToken } from "../../protocol/redis-token";
import { RPoplPushCommand } from "./rpoplpush-command";
 
/**
 * ### Available since 2.2.0.
 * ### BRPOPLPUSH source destination timeout
 * BRPOPLPUSH is the blocking variant of [RPOPLPUSH]{@link RPoplPushCommand}. When source contains
 * elements, this command behaves exactly like RPOPLPUSH. When used inside a
 * {@link resp/command/multi-command.MultiCommand | MULTI}/{@link resp/command/exec-command.ExecCommand | EXEC} block,
 * this command behaves exactly like RPOPLPUSH. When source is empty, Redis will block the
 * connection until another client pushes to it or until timeout is reached. A timeout of zero
 * can be used to block indefinitely.
 *
 * See [RPOPLPUSH]{@link RPoplPushCommand} for more information.
 * ### Return value
 * Bulk string reply: the element being popped from source and pushed to destination. If timeout
 * is reached, a Null reply is returned.
 * ### Pattern: Reliable queue
 * Please see the pattern description in the [RPOPLPUSH]{@link RPoplPushCommand} documentation.
 * ### Pattern: Circular list
 * Please see the pattern description in the [RPOPLPUSH]{@link RPoplPushCommand} documentation.
 */
export class BRPoplPushCommand extends RPoplPushCommand {
    // Flags the command as blocking; how the dispatcher consumes this flag is not visible here.
    public blocking = true

    // The command operates on list-typed keys.
    public dbDataType = DataType.LIST

    // Exactly three parameters: source, destination, timeout.
    public minParams = 3

    public maxParams = 3

    public name = "brpoplpush"

    protected logger: Logger;

    constructor() {
        super();
        this.logger = new Logger(module.id);
    }

    /**
     * Attempts the pop/push immediately; if that yields the null string, waits for a
     * list-modifying key event on the source key (or the timeout) before answering.
     *
     * The wait settles exactly once: the first of "a watched key event fired" or
     * "timeout" wins, all key-event listeners are detached, and any later event or
     * timeout is ignored. This prevents `process` from being re-run (and its result
     * discarded) for a client that has already been answered.
     *
     * @param request the incoming request; param 0 is the source key and the last
     * param is the timeout (passed to `TimedEmitter` as a number — presumably seconds,
     * per the BRPOPLPUSH contract; confirm against `TimedEmitter`).
     * @param db the database the command runs against.
     * @returns a promise resolving to the token produced by `process`, or to the
     * null string when the timeout fires first. The promise rejects if `process`
     * throws while handling a key event.
     */
    public execSync(request: IRequest, db: Database): RedisToken | Promise<RedisToken> {
        return new Promise((resolve, reject) => {
            const timeout: string = request.getParam(request.getParams().length - 1),
                // Run rpoplpush (`process` is presumably inherited from RPoplPushCommand).
                result: RedisToken = this.process(
                    request,
                    db
                );
            if (result !== RedisToken.nullString()) {
                // Fast path: an element was available, no need to block.
                this.logger.debug(`Resolving ${result}`);
                resolve(result);
                return;
            }

            const currentDb = request.getSession().getCurrentDb(),
                srcKey = request.getParam(0),
                // Key events that can add to, or change, the source list.
                eventNames: string[] = ["lpush", "rpush", "linsert", "lset"].map(
                    (operation) => `__keyevent@${currentDb}__:${operation} ${srcKey}`
                ),
                eventCallbacks: Record<string, () => void> = {},
                timedEvent: TimedEmitter = new TimedEmitter(
                    Number(timeout),
                    eventNames,
                    request.getServerContext()
                );
            // Set once the promise has been answered; later events/timeouts are no-ops.
            let settled = false;

            timedEvent.on(
                "timeout",
                () => {
                    if (settled) {
                        return;
                    }
                    settled = true;
                    this.logger.debug("Timeout");
                    this.removeListeners(
                        timedEvent,
                        eventCallbacks
                    );
                    resolve(RedisToken.nullString());
                }
            );

            for (const eventName of eventNames) {
                this.logger.debug(`Adding listener for ${eventName}`);
                const onKeyEvent = (): void => {
                    if (settled) {
                        return;
                    }
                    settled = true;
                    this.logger.debug(`Received event ${eventName}`);
                    // Detach before processing so no further event can trigger another run.
                    this.removeListeners(
                        timedEvent,
                        eventCallbacks
                    );
                    try {
                        resolve(this.process(
                            request,
                            db
                        ));
                    } catch (err) {
                        // Surface the failure to the waiting client instead of leaving
                        // the promise pending and throwing into the emitter.
                        reject(err);
                    }
                };
                eventCallbacks[eventName] = onKeyEvent;
                timedEvent.on(
                    eventName,
                    onKeyEvent
                );
            }
        });
    }

    /**
     * Detaches every registered key-event callback from the emitter.
     *
     * @param timedEvent the emitter the callbacks were registered on.
     * @param events map of event name to the exact callback passed to `on`.
     */
    private removeListeners(timedEvent: TimedEmitter, events: Record<string, () => void>) {
        for (const [eName, listener] of Object.entries(events)) {
            this.logger.debug(`Removing listener for ${eName}`);
            timedEvent.off(
                eName,
                listener
            );
        }
    }
}