Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | 1x 1x 1x 1x 1x 4x 4x 4x 4x 4x | import { Logger } from "../../../logger"; import { IRequest } from "../../../server/request"; import { TimedEmitter } from "../../../timed-emitter"; import { RedisToken } from "../../protocol/redis-token"; import { IRespCommand } from "../resp-command"; /** * ### Available since 2.0.0. * ### SUBSCRIBE channel [channel ...] * * Subscribes the client to the specified channels. * * Once the client enters the subscribed state it is not supposed to issue any other commands, * except for additional SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT commands. * ### Return value * Array reply: The 'subscribe' keyword followed by [channel name, aggregate subscribe count] * for each channel supplied. */ export class SubscribeCommand extends IRespCommand { public blocking = true public maxParams = -1 public minParams = 1 public name = "subscribe" private logger: Logger = new Logger(module.id); public execSync(request: IRequest): Promise<RedisToken> { return new Promise((resolve) => { this.logger.debug( `${request.getCommand()}.execute(%s)`, ...request.getParams() ); const channels: string[] = []; for (const channel of request.getParams()) { if (!request.getSession().isSubscribed(channel)) { channels.push(channel); } } if (channels.length > 0) { for (const channel of channels) { const timedEvent: TimedEmitter = new TimedEmitter( 0, [channel], request.getServerContext() ), removeListener: any = (name: string, te: TimedEmitter, callback: any) => { this.logger.debug(`Removing listener for ${name}`); te.off( name, callback ); }; timedEvent.on( "timeout", (eventNames: string[]) => { for (const name of eventNames) { this.logger.debug(`Timeout on channel "${name}"`); removeListener( name, timedEvent, request.getSession().getSubscription(name) ); // Reinstate these listeners this.execSync(request); } } ); this.logger.debug(`Adding listener for channel "${channel}"`); timedEvent.callback = (data: any) => { this.logger.debug( "TimedEvent.callback received name: \"%s\"", data ); const publish: RedisToken[] = [ RedisToken.string("message"), RedisToken.string(data.channel), RedisToken.string(data.message) ]; request.getSession().publish(RedisToken.array(publish)); }; timedEvent.on( channel, timedEvent.callback ); request.getSession().subscribe( channel, timedEvent ); const response: RedisToken[] = [RedisToken.string("subscribe")]; response.push(RedisToken.string(channel)); response.push(RedisToken.integer(request.getSession().getSubscriptionNames().length)); request.getSession().publish(RedisToken.array(response)); } /* * This will not return until unsubscribe is called on all of the channels in this request * TODO: actually resolve this when required * Return (RedisToken.responseOk()); */ } else { // I'm only guessing this is the proper response return RedisToken.responseOk(); } }); } } |