Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | 1x 1x 1x 1x 1x 1x 4x 4x 4x 4x 4x 4x 4x 2x 2x 2x 2x 2x 2x | import { Logger } from "../../../logger";
import { IRequest } from "../../../server/request";
import { TimedEmitter } from "../../../timed-emitter";
import { DataType } from "../../data/data-type";
import { Database } from "../../data/database";
import { RedisToken } from "../../protocol/redis-token";
import { RPoplPushCommand } from "./rpoplpush-command";
/**
* ### Available since 2.2.0.
* ### BRPOPLPUSH source destination timeout
* BRPOPLPUSH is the blocking variant of [RPOPLPUSH]{@link RPoplPushCommand}. When source contains
* elements, this command behaves exactly like RPOPLPUSH. When used inside a
* {@link resp/command/multi-command.MultiCommand | MULTI}/{@link resp/command/exec-command.ExecCommand | EXEC} block,
* this command behaves exactly like RPOPLPUSH. When source is empty, Redis will block the
* connection until another client pushes to it or until timeout is reached. A timeout of zero
* can be used to block indefinitely.
*
* See [RPOPLPUSH]{@link RPoplPushCommand} for more information.
* ### Return value
* Bulk string reply: the element being popped from source and pushed to destination. If timeout
* is reached, a Null reply is returned.
* ### Pattern: Reliable queue
* Please see the pattern description in the [RPOPLPUSH]{@link RPoplPushCommand} documentation.
* ### Pattern: Circular list
* Please see the pattern description in the [RPOPLPUSH]{@link RPoplPushCommand} documentation.
*/
export class BRPoplPushCommand extends RPoplPushCommand {
public blocking = true
public dbDataType = DataType.LIST
public minParams = 3
public maxParams = 3
public name = "brpoplpush"
protected logger: Logger;
constructor() {
super();
this.logger = new Logger(module.id);
}
public execSync(request: IRequest, db: Database): RedisToken | Promise<RedisToken> {
return new Promise((resolve) => {
const timeout: string = request.getParam(request.getParams().length - 1),
// Run rpoplpush
result: RedisToken = this.process(
request,
db
);
Iif (result === RedisToken.nullString()) {
const eventNames: string[] = [],
eventCallbacks: any = {},
srcKey = request.getParam(0);
eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:lpush ${srcKey}`);
eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:rpush ${srcKey}`);
eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:linsert ${srcKey}`);
eventNames.push(`__keyevent@${request.getSession().getCurrentDb()}__:lset ${srcKey}`);
const timedEvent: TimedEmitter = new TimedEmitter(
Number(timeout),
eventNames,
request.getServerContext()
);
timedEvent.on(
"timeout",
() => {
this.logger.debug("Timeout");
this.removeListeners(
timedEvent,
eventCallbacks
);
resolve(RedisToken.nullString());
}
);
for (const eventName of eventNames) {
this.logger.debug(`Adding listener for ${eventName}`);
eventCallbacks[eventName] = () => {
this.logger.debug(`Received event ${eventName}`);
resolve(this.process(
request,
db
));
};
timedEvent.on(
eventName,
eventCallbacks[eventName]
);
}
} else {
this.logger.debug(`Resolving ${result}`);
resolve(result);
}
});
}
private removeListeners(timedEvent: TimedEmitter, events: any) {
for (const eName of Object.keys(events)) {
this.logger.debug(`Removing listener for ${eName}`);
timedEvent.off(
eName,
events[eName]
);
}
}
}
|