The Redis keyspace to use for the transport.
# File lib/sensu/transport/redis.rb, line 11
def initialize
  @options = {}
  @connections = {}
  super
end
Close ALL Redis connections.
# File lib/sensu/transport/redis.rb, line 59
def close
  @connections.each_value do |connection|
    connection.close
  end
end
Redis transport connection setup. This method sets `@options`, creates a named Redis connection “redis”, and sets the deferred status to `:succeeded` via `succeed()`.
@param options [Hash, String]
# File lib/sensu/transport/redis.rb, line 22
def connect(options={})
  @options = options || {}
  redis_connection("redis") do |connection|
    connection.callback do
      succeed
    end
  end
end
Indicates if ALL Redis connections are connected.
@return [TrueClass, FalseClass]
# File lib/sensu/transport/redis.rb, line 52
def connected?
  !@connections.empty? && @connections.values.all? do |connection|
    connection.connected?
  end
end
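The predicate above treats an empty pool as disconnected, which is easy to miss. The following is a minimal sketch of that logic, not the transport itself: `SketchConnection` and `pool_connected?` are hypothetical stand-ins, and the pool is passed in as an argument instead of being read from `@connections`.

```ruby
# Hypothetical stand-in for a Redis connection; only connected? is modeled.
SketchConnection = Struct.new(:up) do
  def connected?
    up
  end
end

# Same predicate as connected?, with the pool passed in explicitly.
def pool_connected?(connections)
  !connections.empty? && connections.values.all? { |connection| connection.connected? }
end

empty   = {}
healthy = { "redis" => SketchConnection.new(true), "pubsub" => SketchConnection.new(true) }
partial = { "redis" => SketchConnection.new(true), "pubsub" => SketchConnection.new(false) }

puts pool_connected?(empty)    # an empty pool is never considered connected
puts pool_connected?(healthy)  # every connection reports connected
puts pool_connected?(partial)  # one unhealthy connection fails the whole pool
```

The empty-pool guard matters because `all?` on an empty collection returns `true` in Ruby.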
Publish a message to the Redis transport. The transport pipe type determines the method of sending messages to consumers using Redis, either using PubSub or a list. The appropriate publish method is called for the given pipe type. The Redis transport ignores publish options.
@param type [Symbol] the transport pipe type, possible values
  are: :direct and :fanout.
@param pipe [String] the transport pipe name.
@param message [String] the message to be published to the transport.
@param options [Hash] IGNORED by this transport.
@yield [info] passes publish info to an optional callback/block.
@yieldparam info [Hash] contains publish information, which
  may contain an error object.
# File lib/sensu/transport/redis.rb, line 79
def publish(type, pipe, message, options={}, &callback)
  case type.to_sym
  when :fanout
    pubsub_publish(pipe, message, &callback)
  when :direct
    list_publish(pipe, message, &callback)
  end
end
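The dispatch above can be exercised without Redis. This is a rough sketch under stated assumptions: `DispatchSketch` is a hypothetical class, its two private methods only record calls, and the `:subscribers` and `:queued` values are placeholders, not real Redis replies.

```ruby
# Hypothetical class that mirrors only the type dispatch of publish.
class DispatchSketch
  attr_reader :calls

  def initialize
    @calls = []
  end

  def publish(type, pipe, message, options={}, &callback)
    case type.to_sym
    when :fanout
      pubsub_publish(pipe, message, &callback)
    when :direct
      list_publish(pipe, message, &callback)
    end
  end

  private

  # Stand-in: records the call and yields a placeholder subscriber count.
  def pubsub_publish(pipe, message)
    @calls << [:pubsub, pipe, message]
    yield({:subscribers => 0}) if block_given?
  end

  # Stand-in: records the call and yields a placeholder list size.
  def list_publish(pipe, message)
    @calls << [:list, pipe, message]
    yield({:queued => 1}) if block_given?
  end
end

sketch = DispatchSketch.new
sketch.publish("fanout", "keepalives", "{}") { |info| puts info.inspect }
sketch.publish(:direct, "results", "{}")   # the callback is optional
sketch.publish(:unknown, "results", "{}")  # matches no branch and returns nil
```

Because of `type.to_sym`, a String type works as well as a Symbol. A type other than `:fanout` or `:direct` is silently ignored and the callback is never invoked.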
Reconnect to the Redis transport. The Redis connections used by the transport have auto-reconnect disabled; if a single connection is unhealthy, all connections are closed, the transport is reset, and new connections are made. If the transport is not already reconnecting to Redis, the `@before_reconnect` transport callback is called.
@param force [Boolean] force the reconnect, even if one is already in progress.
# File lib/sensu/transport/redis.rb, line 39
def reconnect(force=false)
  @before_reconnect.call unless @reconnecting
  unless @reconnecting && !force
    @reconnecting = true
    close
    reset
    connect(@options)
  end
end
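The two guards in `reconnect` interact in a way that is easier to see with counters. The sketch below is illustrative only: `ReconnectSketch` is a hypothetical class that replaces the `@before_reconnect` callback and the `close`/`reset`/`connect` sequence with simple counters.

```ruby
# Hypothetical class that models only the guard logic of reconnect.
class ReconnectSketch
  attr_reader :before_calls, :connect_calls

  def initialize
    @reconnecting = false
    @before_calls = 0
    @connect_calls = 0
  end

  def reconnect(force=false)
    # Stands in for @before_reconnect.call
    @before_calls += 1 unless @reconnecting
    unless @reconnecting && !force
      @reconnecting = true
      # Stands in for close, reset, and connect(@options)
      @connect_calls += 1
    end
  end
end

sketch = ReconnectSketch.new
sketch.reconnect        # first attempt: callback fires, connections are rebuilt
sketch.reconnect        # already reconnecting and not forced: nothing happens
sketch.reconnect(true)  # forced: connections are rebuilt, callback does not fire again
puts sketch.before_calls
puts sketch.connect_calls
```

In this sketch nothing clears the reconnecting flag; in the transport that is done by the connection monitor once all connections are healthy again.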
Redis transport pipe/funnel stats, such as message and consumer counts. This method is currently unable to determine the consumer count for a Redis list.
@param funnel [String] the transport funnel to get stats for.
@param options [Hash] IGNORED by this transport.
@yield [info] passes list stats to the callback/block.
@yieldparam info [Hash] contains list stats.
# File lib/sensu/transport/redis.rb, line 140
def stats(funnel, options={})
  redis_connection("redis") do |connection|
    connection.llen(funnel) do |messages|
      info = {
        :messages => messages,
        :consumers => 0
      }
      yield(info)
    end
  end
end
Subscribe to a Redis transport pipe. The transport pipe type determines the method of consuming messages from Redis, either using PubSub or a list. The appropriate subscribe method is called for the given pipe type. The Redis transport ignores subscribe options and the funnel name.
@param type [Symbol] the transport pipe type, possible values
  are: :direct and :fanout.
@param pipe [String] the transport pipe name.
@param funnel [String] IGNORED by this transport.
@param options [Hash] IGNORED by this transport.
@yield [info, message] passes message info and content to
  the consumer callback/block.
@yieldparam info [Hash] contains message information.
@yieldparam message [String] message.
# File lib/sensu/transport/redis.rb, line 103
def subscribe(type, pipe, funnel=nil, options={}, &callback)
  case type.to_sym
  when :fanout
    pubsub_subscribe(pipe, &callback)
  when :direct
    list_subscribe(pipe, &callback)
  end
end
Unsubscribe from all transport pipes. This method iterates through the current named Redis connections, unsubscribing the “pubsub” connection from Redis channels, and closing/deleting BLPOP connections.
@yield [info] passes info to an optional callback/block.
@yieldparam info [Hash] empty hash.
# File lib/sensu/transport/redis.rb, line 119
def unsubscribe
  @connections.each do |name, connection|
    case name
    when "pubsub"
      connection.unsubscribe
    when /^#{REDIS_KEYSPACE}/
      connection.close
      @connections.delete(name)
    end
  end
  super
end
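The name-based handling above can be sketched with plain objects. Everything here is an assumption made for illustration: `EXAMPLE_KEYSPACE` is a made-up value (the transport uses its own `REDIS_KEYSPACE` constant), `SketchConn` is a hypothetical recorder, and the sketch iterates over a snapshot of the keys instead of the hash itself.

```ruby
# Made-up keyspace value, for illustration only.
EXAMPLE_KEYSPACE = "example_keyspace"

# Hypothetical connection that records which actions were requested.
class SketchConn
  attr_reader :actions

  def initialize
    @actions = []
  end

  def unsubscribe
    @actions << :unsubscribe
  end

  def close
    @actions << :close
  end
end

def unsubscribe_all(connections)
  # Iterate over a snapshot of the names so deleting entries is clearly safe.
  connections.keys.each do |name|
    connection = connections[name]
    case name
    when "pubsub"
      connection.unsubscribe  # kept in the pool, only unsubscribed
    when /^#{EXAMPLE_KEYSPACE}/
      connection.close        # per-list BLPOP connection: closed and removed
      connections.delete(name)
    end
  end
end

pool = {
  "redis"  => SketchConn.new,
  "pubsub" => SketchConn.new,
  "#{EXAMPLE_KEYSPACE}:0:list:results" => SketchConn.new
}
unsubscribe_all(pool)
puts pool.keys.inspect
```

After the call, the general-purpose "redis" connection is untouched, "pubsub" has been unsubscribed but kept, and the list connection is gone from the pool.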
Shift a message off of a Redis list and schedule another shift on the next tick of the event loop (reactor). Because BLPOP blocks the connection it is issued on, this method creates a named Redis connection for each list. Using multiple Redis connections for BLPOP commands is far more efficient than timer or next-tick polling with LPOP.
@param list [String]
@yield [info, message] passes message info and content to
  the consumer/method callback/block.
@yieldparam info [Hash] an empty hash.
@yieldparam message [String] message content.
# File lib/sensu/transport/redis.rb, line 307
def list_blpop(list, &callback)
  redis_connection(list) do |connection|
    connection.blpop(list, 0) do |_, message|
      EM::next_tick { list_blpop(list, &callback) }
      callback.call({}, message)
    end
  end
end
Push (publish) a message onto a Redis list. The `redis_key()` method is used to create a Redis list key, using the transport pipe name. The publish callback info includes the current list size (queued).
@param pipe [String] the transport pipe name.
@param message [String] the message to be published to the transport.
@yield [info] passes publish info to an optional callback/block.
@yieldparam info [Hash] contains publish information.
@yieldparam queued [String] current list size.
# File lib/sensu/transport/redis.rb, line 285
def list_publish(pipe, message)
  list = redis_key("list", pipe)
  redis_connection("redis") do |connection|
    connection.rpush(list, message) do |queued|
      info = {:queued => queued}
      yield(info) if block_given?
    end
  end
end
Subscribe to a Redis list, shifting messages off as they become available. The `redis_key()` method is used to create a Redis list key, using the transport pipe name. The `list_blpop()` method is used to do the actual work.
@param pipe [String] the transport pipe name.
@yield [info, message] passes message info and content to
  the consumer/method callback/block.
@yieldparam info [Hash] an empty hash.
@yieldparam message [String] message content.
# File lib/sensu/transport/redis.rb, line 326
def list_subscribe(pipe, &callback)
  list = redis_key("list", pipe)
  list_blpop(list, &callback)
end
Monitor current Redis connections, the connection “pool”. A timer is used to check on the connections every 3 seconds. If one or more connections are not connected, a forced `reconnect()` is triggered. If all connections are connected after reconnecting, the transport `@after_reconnect` callback is called. If a connection monitor (timer) already exists, it is canceled.
# File lib/sensu/transport/redis.rb, line 166
def monitor_connections
  @connection_monitor.cancel if @connection_monitor
  @connection_monitor = EM::PeriodicTimer.new(3) do
    if !connected?
      reconnect(true)
    elsif @reconnecting
      @after_reconnect.call
      @reconnecting = false
    end
  end
end
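Each timer tick makes one of three decisions. The sketch below isolates that decision as a pure function so it can be read without EventMachine; `monitor_tick` and the symbols it returns are hypothetical names, not part of the transport.

```ruby
# Hypothetical pure function describing what one monitor tick decides.
# connected:    result of connected? for the whole pool
# reconnecting: current value of the @reconnecting flag
def monitor_tick(connected, reconnecting)
  if !connected
    :force_reconnect        # reconnect(true)
  elsif reconnecting
    :run_after_reconnect    # @after_reconnect.call, then clear the flag
  else
    :nothing                # healthy and not recovering
  end
end

puts monitor_tick(false, false)
puts monitor_tick(false, true)
puts monitor_tick(true, true)
puts monitor_tick(true, false)
```

An unhealthy pool forces a reconnect on every tick, whether or not a reconnect is already in progress, which is why `reconnect` accepts a `force` argument.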
Publish a message to a Redis channel (PubSub). The `redis_key()` method is used to create a Redis channel key, using the transport pipe name. The publish callback info includes the current subscriber count for the Redis channel.
@param pipe [String] the transport pipe name.
@param message [String] the message to be published to the transport.
@yield [info] passes publish info to an optional callback/block.
@yieldparam info [Hash] contains publish information.
@yieldparam subscribers [String] current subscriber count.
# File lib/sensu/transport/redis.rb, line 231
def pubsub_publish(pipe, message)
  channel = redis_key("channel", pipe)
  redis_connection("redis") do |connection|
    connection.publish(channel, message) do |subscribers|
      info = {:subscribers => subscribers}
      yield(info) if block_given?
    end
  end
end
Subscribe to a Redis channel (PubSub). The `redis_key()` method is used to create a Redis channel key, using the transport pipe name. The named Redis connection “pubsub” is used for the Redis SUBSCRIBE command set, because a connection that has issued SUBSCRIBE is restricted to that command set. The subscribe callback is called whenever a message is published to the Redis channel. Channel messages with the type “subscribe” or “unsubscribe” are only logged; only messages with the type “message” are passed to the provided consumer/method callback/block.
@param pipe [String] the transport pipe name.
@yield [info, message] passes message info and content to
  the consumer/method callback/block.
@yieldparam info [Hash] contains the channel name.
@yieldparam message [String] message content.
# File lib/sensu/transport/redis.rb, line 258
def pubsub_subscribe(pipe)
  channel = redis_key("channel", pipe)
  redis_connection("pubsub") do |connection|
    connection.subscribe(channel) do |type, channel, message|
      case type
      when "subscribe"
        @logger.debug("subscribed to redis channel: #{channel}") if @logger
      when "unsubscribe"
        @logger.debug("unsubscribed from redis channel: #{channel}") if @logger
      when "message"
        info = {:channel => channel}
        yield(info, message)
      end
    end
  end
end
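The filtering of PubSub events by type can be tried in isolation. This is a minimal sketch with assumptions: `handle_pubsub_event` is a hypothetical helper, an Array stands in for `@logger`, and the sample events are invented for the demonstration.

```ruby
# Hypothetical helper mirroring the case statement in pubsub_subscribe.
# log is any object responding to <<, used here in place of @logger.
def handle_pubsub_event(type, channel, message, log)
  case type
  when "subscribe"
    log << "subscribed to redis channel: #{channel}"
  when "unsubscribe"
    log << "unsubscribed from redis channel: #{channel}"
  when "message"
    info = {:channel => channel}
    yield(info, message)
  end
end

log = []
received = []

# Invented sample events; for subscribe/unsubscribe the third element
# is a subscription count rather than message content.
events = [
  ["subscribe", "example:0:channel:keepalives", 1],
  ["message", "example:0:channel:keepalives", "{\"name\":\"client\"}"],
  ["unsubscribe", "example:0:channel:keepalives", 0]
]

events.each do |type, channel, message|
  handle_pubsub_event(type, channel, message, log) do |info, content|
    received << [info[:channel], content]
  end
end

puts log.length
puts received.length
```

Only the "message" event reaches the consumer block, so the consumer never has to distinguish subscription confirmations from payloads.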
Return or set up a named Redis connection. If a connection with the given name already exists, it is passed to the block; otherwise this method creates a Redis connection object using the provided Redis transport options. Redis auto-reconnect is disabled as the connection “pool” is monitored as a whole. The transport `@on_error` callback is called when Redis errors are encountered. This method creates/replaces the connection monitor after setting up the connection and before adding it to the pool.
@param name [String] the Redis connection name.
@yield [Object] passes the named connection object to the
  callback/block.
# File lib/sensu/transport/redis.rb, line 189
def redis_connection(name)
  if @connections[name]
    yield(@connections[name])
  else
    Sensu::Redis.connect(@options) do |connection|
      connection.auto_reconnect = false
      connection.reconnect_on_error = false
      connection.on_error do |error|
        @on_error.call(error)
      end
      monitor_connections
      @connections[name] = connection
      yield(connection)
    end
  end
end
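The return-or-create pattern used for named connections can be sketched synchronously. `NamedConnectionSketch` is a hypothetical class and its factory block stands in for `Sensu::Redis.connect`; error handling and the connection monitor are left out.

```ruby
# Hypothetical class showing the return-or-create pattern for named connections.
class NamedConnectionSketch
  attr_reader :created

  # The factory block stands in for Sensu::Redis.connect(@options).
  def initialize(&factory)
    @factory = factory
    @connections = {}
    @created = 0
  end

  def connection(name)
    if @connections[name]
      yield(@connections[name])       # reuse the existing named connection
    else
      connection = @factory.call(name)
      @created += 1
      @connections[name] = connection # remember it under its name
      yield(connection)
    end
  end
end

pool = NamedConnectionSketch.new { |name| Object.new }

first = nil
second = nil
pool.connection("redis") { |connection| first = connection }
pool.connection("redis") { |connection| second = connection }
pool.connection("pubsub") { |connection| connection }

puts first.equal?(second)
puts pool.created
```

Unlike this sketch, the real connect call is asynchronous, so the block runs only once the connection has been established.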
Create a Redis key within the defined Redis keyspace. This method is used to create keys that are unlikely to collide. The Redis connection database number is included in the Redis key as pubsub is not scoped to the selected database.
@param type [String]
@param name [String]
@return [String]
# File lib/sensu/transport/redis.rb, line 214
def redis_key(type, name)
  db = @options.is_a?(Hash) ? (@options[:db] || 0) : 0
  [REDIS_KEYSPACE, db, type, name].join(":")
end
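To make the key layout concrete, here is a small sketch of the same construction. `EXAMPLE_KEYSPACE` is a made-up value used only for illustration (the transport's own `REDIS_KEYSPACE` constant is not reproduced here), `example_redis_key` is a hypothetical helper, and the options are passed as an argument instead of being read from `@options`.

```ruby
# Made-up keyspace value, for illustration only.
EXAMPLE_KEYSPACE = "example_keyspace"

# Same construction as redis_key, with the options passed in explicitly.
def example_redis_key(options, type, name)
  db = options.is_a?(Hash) ? (options[:db] || 0) : 0
  [EXAMPLE_KEYSPACE, db, type, name].join(":")
end

puts example_redis_key({:db => 2}, "channel", "keepalives")
# => example_keyspace:2:channel:keepalives

puts example_redis_key({}, "list", "results")
# => example_keyspace:0:list:results

puts example_redis_key("redis://127.0.0.1:6379/1", "list", "results")
# => example_keyspace:0:list:results
```

As the last call shows, when the options are given as a String the database number in the key falls back to 0, even if the String itself names another database.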
Reset instance variables, called when reconnecting.
# File lib/sensu/transport/redis.rb, line 155
def reset
  @connections = {}
end