Commit d1c01fe8 authored by Nick Thomas's avatar Nick Thomas

Remove direct redis integration

parent 73e2cf53
REDIS_RB_VERSION=v3.3.3
REDIS_RB_VENDOR_DIR=lib/vendor/redis
PWD=`pwd`
all:
update-redis:
rm -rf $(REDIS_RB_VENDOR_DIR)
git clone -b $(REDIS_RB_VERSION) https://github.com/redis/redis-rb.git $(REDIS_RB_VENDOR_DIR)
rm -rf $(REDIS_RB_VENDOR_DIR)/.git
.PHONY=update-redis
......@@ -3,17 +3,6 @@
require_relative '../lib/gitlab_init'
require_relative '../lib/gitlab_net'
def ping_redis
print "Send ping to redis server: "
if GitlabNet.new.redis_client.ping
print 'OK'
else
abort 'FAILED'
end
puts "\n"
end
#
# GitLab shell check task
#
......@@ -30,16 +19,12 @@ begin
check_values = JSON.parse(resp.body)
if check_values.key?('redis')
print 'Redis available via internal API: '
if check_values['redis']
puts 'OK'
else
abort 'FAILED'
end
else
ping_redis
end
rescue GitlabNet::ApiUnreachableError
abort "FAILED: Failed to connect to internal API"
end
......
......@@ -35,23 +35,6 @@ auth_file: "/home/git/.ssh/authorized_keys"
# Default is hooks in the gitlab-shell directory.
# custom_hooks_dir: "/home/git/gitlab-shell/hooks"
# Redis settings used for pushing commit notices to gitlab
redis:
# host: 127.0.0.1
# port: 6379
# pass: redispass # Allows you to specify the password for Redis
database: 0
socket: /var/run/redis/redis.sock # Comment out this line if you want to use TCP or Sentinel
namespace: resque:gitlab
# sentinels:
# -
# host: 127.0.0.1
# port: 26380
# -
# host: 127.0.0.1
# port: 26381
# Log file.
# Default is gitlab-shell.log in the root directory.
# log_file: "/home/git/gitlab-shell/gitlab-shell.log"
......
......@@ -13,8 +13,6 @@ def increase_reference_counter(gl_repository, repo_path)
result = GitlabNet.new.pre_receive(gl_repository)
result['reference_counter_increased']
rescue GitlabNet::NotFound
GitlabReferenceCounter.new(repo_path).increase
end
require_relative '../lib/gitlab_custom_hook'
......
......@@ -34,14 +34,6 @@ class GitlabConfig
@config['http_settings'] ||= {}
end
def redis
@config['redis'] ||= {}
end
def redis_namespace
redis['namespace'] || 'resque:gitlab'
end
def log_file
@config['log_file'] ||= File.join(ROOT_PATH, 'gitlab-shell.log')
end
......
......@@ -5,7 +5,6 @@ require 'json'
require_relative 'gitlab_config'
require_relative 'gitlab_logger'
require_relative 'gitlab_access'
require_relative 'gitlab_redis'
require_relative 'gitlab_lfs_authentication'
require_relative 'httpunix'
......@@ -140,30 +139,6 @@ class GitlabNet
JSON.parse(resp.body) if resp.code == '200'
end
def redis_client
redis_config = config.redis
database = redis_config['database'] || 0
params = {
host: redis_config['host'] || '127.0.0.1',
port: redis_config['port'] || 6379,
db: database
}
if redis_config.has_key?('sentinels')
params[:sentinels] = redis_config['sentinels']
.select { |s| s['host'] && s['port'] }
.map { |s| { host: s['host'], port: s['port'] } }
end
if redis_config.has_key?("socket")
params = { path: redis_config['socket'], db: database }
elsif redis_config.has_key?("pass")
params[:password] = redis_config['pass']
end
Redis.new(params)
end
protected
def sanitize_path(repo)
......
require_relative 'gitlab_init'
require_relative 'gitlab_net'
require_relative 'gitlab_reference_counter'
require_relative 'gitlab_metrics'
require 'json'
require 'base64'
......@@ -32,8 +31,6 @@ class GitlabPostReceive
response['reference_counter_decreased']
rescue GitlabNet::ApiUnreachableError
false
rescue GitlabNet::NotFound
fallback_post_receive
end
protected
......@@ -95,55 +92,4 @@ class GitlabPostReceive
puts
puts "=" * total_width
end
def update_redis
# Encode changes as base64 so we don't run into trouble with non-UTF-8 input.
changes = Base64.encode64(@changes)
# TODO: Change to `@gl_repository` in next release.
# See https://gitlab.com/gitlab-org/gitlab-shell/merge_requests/130#note_28747613
project_identifier = @gl_repository || @repo_path
queue = "#{config.redis_namespace}:queue:post_receive"
msg = JSON.dump({
'class' => 'PostReceive',
'args' => [project_identifier, @actor, changes],
'jid' => @jid,
'enqueued_at' => Time.now.to_f
})
begin
GitlabNet.new.redis_client.rpush(queue, msg)
true
rescue => e
$stderr.puts "GitLab: An unexpected error occurred in writing to Redis: #{e}"
false
end
end
private
def fallback_post_receive
result = update_redis
begin
broadcast_message = GitlabMetrics.measure("broadcast-message") do
api.broadcast_message
end
if broadcast_message.has_key?("message")
print_broadcast_message(broadcast_message["message"])
end
merge_request_urls = GitlabMetrics.measure("merge-request-urls") do
api.merge_request_urls(@gl_repository, @repo_path, @changes)
end
print_merge_request_links(merge_request_urls)
api.notify_post_receive(gl_repository, repo_path)
rescue GitlabNet::ApiUnreachableError
nil
end
result && GitlabReferenceCounter.new(repo_path).decrease
end
end
$:.unshift(File.expand_path(File.join(File.dirname(__FILE__), 'vendor/redis/lib')))
require 'redis'
require_relative 'gitlab_init'
require_relative 'gitlab_net'
class GitlabReferenceCounter
REFERENCE_EXPIRE_TIME = 600
attr_reader :path, :key
def initialize(path)
@path = path
@key = "git-receive-pack-reference-counter:#{path}"
end
def value
(redis_client.get(key) || 0).to_i
end
def increase
redis_cmd do
redis_client.incr(key)
redis_client.expire(key, REFERENCE_EXPIRE_TIME)
end
end
def decrease
redis_cmd do
current_value = redis_client.decr(key)
if current_value < 0
$logger.warn "Reference counter for #{path} decreased when its value was less than 1. Reseting the counter."
redis_client.del(key)
end
end
end
private
def redis_client
@redis_client ||= GitlabNet.new.redis_client
end
def redis_cmd
begin
yield
true
rescue => e
message = "GitLab: An unexpected error occurred in writing to Redis: #{e}"
$stderr.puts message
$logger.error message
false
end
end
end
require "monitor"
require "redis/errors"
class Redis
def self.deprecate(message, trace = caller[0])
$stderr.puts "\n#{message} (in #{trace})"
end
attr :client
# @deprecated The preferred way to create a new client object is using `#new`.
# This method does not actually establish a connection to Redis,
# in contrary to what you might expect.
def self.connect(options = {})
new(options)
end
def self.current
@current ||= Redis.new
end
def self.current=(redis)
@current = redis
end
include MonitorMixin
# Create a new client instance
#
# @param [Hash] options
# @option options [String] :url (value of the environment variable REDIS_URL) a Redis URL, for a TCP connection: `redis://:[password]@[hostname]:[port]/[db]` (password, port and database are optional), for a unix socket connection: `unix://[path to Redis socket]`. This overrides all other options.
# @option options [String] :host ("127.0.0.1") server hostname
# @option options [Fixnum] :port (6379) server port
# @option options [String] :path path to server socket (overrides host and port)
# @option options [Float] :timeout (5.0) timeout in seconds
# @option options [Float] :connect_timeout (same as timeout) timeout for initial connect in seconds
# @option options [String] :password Password to authenticate against server
# @option options [Fixnum] :db (0) Database to select after initial connect
# @option options [Symbol] :driver Driver to use, currently supported: `:ruby`, `:hiredis`, `:synchrony`
# @option options [String] :id ID for the client connection, assigns name to current connection by sending `CLIENT SETNAME`
# @option options [Hash, Fixnum] :tcp_keepalive Keepalive values, if Fixnum `intvl` and `probe` are calculated based on the value, if Hash `time`, `intvl` and `probes` can be specified as a Fixnum
# @option options [Fixnum] :reconnect_attempts Number of attempts trying to connect
# @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not
# @option options [Array] :sentinels List of sentinels to contact
# @option options [Symbol] :role (:master) Role to fetch via Sentinel, either `:master` or `:slave`
#
# @return [Redis] a new client instance
def initialize(options = {})
@options = options.dup
@original_client = @client = Client.new(options)
@queue = Hash.new { |h, k| h[k] = [] }
super() # Monitor#initialize
end
def synchronize
mon_synchronize { yield(@client) }
end
# Run code with the client reconnecting
def with_reconnect(val=true, &blk)
synchronize do |client|
client.with_reconnect(val, &blk)
end
end
# Run code without the client reconnecting
def without_reconnect(&blk)
with_reconnect(false, &blk)
end
# Test whether or not the client is connected
def connected?
@original_client.connected?
end
# Disconnect the client as quickly and silently as possible.
def close
@original_client.disconnect
end
alias disconnect! close
# Sends a command to Redis and returns its reply.
#
# Replies are converted to Ruby objects according to the RESP protocol, so
# you can expect a Ruby array, integer or nil when Redis sends one. Higher
# level transformations, such as converting an array of pairs into a Ruby
# hash, are up to consumers.
#
# Redis error replies are raised as Ruby exceptions.
def call(*command)
synchronize do |client|
client.call(command)
end
end
# Queues a command for pipelining.
#
# Commands in the queue are executed with the Redis#commit method.
#
# See http://redis.io/topics/pipelining for more details.
#
def queue(*command)
@queue[Thread.current.object_id] << command
end
# Sends all commands in the queue.
#
# See http://redis.io/topics/pipelining for more details.
#
def commit
synchronize do |client|
begin
client.call_pipelined(@queue[Thread.current.object_id])
ensure
@queue.delete(Thread.current.object_id)
end
end
end
# Authenticate to the server.
#
# @param [String] password must match the password specified in the
# `requirepass` directive in the configuration file
# @return [String] `OK`
def auth(password)
synchronize do |client|
client.call([:auth, password])
end
end
# Change the selected database for the current connection.
#
# @param [Fixnum] db zero-based index of the DB to use (0 to 15)
# @return [String] `OK`
def select(db)
synchronize do |client|
client.db = db
client.call([:select, db])
end
end
# Ping the server.
#
# @return [String] `PONG`
def ping
synchronize do |client|
client.call([:ping])
end
end
# Echo the given string.
#
# @param [String] value
# @return [String]
def echo(value)
synchronize do |client|
client.call([:echo, value])
end
end
# Close the connection.
#
# @return [String] `OK`
def quit
synchronize do |client|
begin
client.call([:quit])
rescue ConnectionError
ensure
client.disconnect
end
end
end
# Asynchronously rewrite the append-only file.
#
# @return [String] `OK`
def bgrewriteaof
synchronize do |client|
client.call([:bgrewriteaof])
end
end
# Asynchronously save the dataset to disk.
#
# @return [String] `OK`
def bgsave
synchronize do |client|
client.call([:bgsave])
end
end
# Get or set server configuration parameters.
#
# @param [Symbol] action e.g. `:get`, `:set`, `:resetstat`
# @return [String, Hash] string reply, or hash when retrieving more than one
# property with `CONFIG GET`
def config(action, *args)
synchronize do |client|
client.call([:config, action] + args) do |reply|
if reply.kind_of?(Array) && action == :get
Hashify.call(reply)
else
reply
end
end
end
end
# Return the number of keys in the selected database.
#
# @return [Fixnum]
def dbsize
synchronize do |client|
client.call([:dbsize])
end
end
def debug(*args)
synchronize do |client|
client.call([:debug] + args)
end
end
# Remove all keys from all databases.
#
# @return [String] `OK`
def flushall
synchronize do |client|
client.call([:flushall])
end
end
# Remove all keys from the current database.
#
# @return [String] `OK`
def flushdb
synchronize do |client|
client.call([:flushdb])
end
end
# Get information and statistics about the server.
#
# @param [String, Symbol] cmd e.g. "commandstats"
# @return [Hash<String, String>]
def info(cmd = nil)
synchronize do |client|
client.call([:info, cmd].compact) do |reply|
if reply.kind_of?(String)
reply = Hash[reply.split("\r\n").map do |line|
line.split(":", 2) unless line =~ /^(#|$)/
end.compact]
if cmd && cmd.to_s == "commandstats"
# Extract nested hashes for INFO COMMANDSTATS
reply = Hash[reply.map do |k, v|
v = v.split(",").map { |e| e.split("=") }
[k[/^cmdstat_(.*)$/, 1], Hash[v]]
end]
end
end
reply
end
end
end
# Get the UNIX time stamp of the last successful save to disk.
#
# @return [Fixnum]
def lastsave
synchronize do |client|
client.call([:lastsave])
end
end
# Listen for all requests received by the server in real time.
#
# There is no way to interrupt this command.
#
# @yield a block to be called for every line of output
# @yieldparam [String] line timestamp and command that was executed
def monitor(&block)
synchronize do |client|
client.call_loop([:monitor], &block)
end
end
# Synchronously save the dataset to disk.
#
# @return [String]
def save
synchronize do |client|
client.call([:save])
end
end
# Synchronously save the dataset to disk and then shut down the server.
def shutdown
synchronize do |client|
client.with_reconnect(false) do
begin
client.call([:shutdown])
rescue ConnectionError
# This means Redis has probably exited.
nil
end
end
end
end
# Make the server a slave of another instance, or promote it as master.
def slaveof(host, port)
synchronize do |client|
client.call([:slaveof, host, port])
end
end
# Interact with the slowlog (get, len, reset)
#
# @param [String] subcommand e.g. `get`, `len`, `reset`
# @param [Fixnum] length maximum number of entries to return
# @return [Array<String>, Fixnum, String] depends on subcommand
def slowlog(subcommand, length=nil)
synchronize do |client|
args = [:slowlog, subcommand]
args << length if length
client.call args
end
end
# Internal command used for replication.
def sync
synchronize do |client|
client.call([:sync])
end
end
# Return the server time.
#
# @example
# r.time # => [ 1333093196, 606806 ]
#
# @return [Array<Fixnum>] tuple of seconds since UNIX epoch and
# microseconds in the current second
def time
synchronize do |client|
client.call([:time]) do |reply|
reply.map(&:to_i) if reply
end
end
end
# Remove the expiration from a key.
#
# @param [String] key
# @return [Boolean] whether the timeout was removed or not
def persist(key)
synchronize do |client|
client.call([:persist, key], &Boolify)
end
end
# Set a key's time to live in seconds.
#
# @param [String] key
# @param [Fixnum] seconds time to live
# @return [Boolean] whether the timeout was set or not
def expire(key, seconds)
synchronize do |client|
client.call([:expire, key, seconds], &Boolify)
end
end
# Set the expiration for a key as a UNIX timestamp.
#
# @param [String] key
# @param [Fixnum] unix_time expiry time specified as a UNIX timestamp
# @return [Boolean] whether the timeout was set or not
def expireat(key, unix_time)
synchronize do |client|
client.call([:expireat, key, unix_time], &Boolify)
end
end
# Get the time to live (in seconds) for a key.
#
# @param [String] key
# @return [Fixnum] remaining time to live in seconds.
#
# In Redis 2.6 or older the command returns -1 if the key does not exist or if
# the key exist but has no associated expire.
#
# Starting with Redis 2.8 the return value in case of error changed:
#
# - The command returns -2 if the key does not exist.
# - The command returns -1 if the key exists but has no associated expire.
def ttl(key)
synchronize do |client|
client.call([:ttl, key])
end
end
# Set a key's time to live in milliseconds.
#
# @param [String] key
# @param [Fixnum] milliseconds time to live
# @return [Boolean] whether the timeout was set or not
def pexpire(key, milliseconds)
synchronize do |client|
client.call([:pexpire, key, milliseconds], &Boolify)
end
end
# Set the expiration for a key as number of milliseconds from UNIX Epoch.
#
# @param [String] key
# @param [Fixnum] ms_unix_time expiry time specified as number of milliseconds from UNIX Epoch.
# @return [Boolean] whether the timeout was set or not
def pexpireat(key, ms_unix_time)
synchronize do |client|
client.call([:pexpireat, key, ms_unix_time], &Boolify)
end
end
# Get the time to live (in milliseconds) for a key.
#
# @param [String] key
# @return [Fixnum] remaining time to live in milliseconds
# In Redis 2.6 or older the command returns -1 if the key does not exist or if
# the key exist but has no associated expire.
#
# Starting with Redis 2.8 the return value in case of error changed:
#
# - The command returns -2 if the key does not exist.
# - The command returns -1 if the key exists but has no associated expire.
def pttl(key)
synchronize do |client|
client.call([:pttl, key])
end
end
# Return a serialized version of the value stored at a key.
#
# @param [String] key
# @return [String] serialized_value
def dump(key)
synchronize do |client|
client.call([:dump, key])
end
end
# Create a key using the serialized value, previously obtained using DUMP.
#
# @param [String] key
# @param [String] ttl
# @param [String] serialized_value
# @return [String] `"OK"`
def restore(key, ttl, serialized_value)
synchronize do |client|
client.call([:restore, key, ttl, serialized_value])
end
end
# Transfer a key from the connected instance to another instance.
#
# @param [String] key
# @param [Hash] options
# - `:host => String`: host of instance to migrate to
# - `:port => Integer`: port of instance to migrate to
# - `:db => Integer`: database to migrate to (default: same as source)
# - `:timeout => Integer`: timeout (default: same as connection timeout)
# @return [String] `"OK"`
def migrate(key, options)
host = options[:host] || raise(RuntimeError, ":host not specified")
port = options[:port] || raise(RuntimeError, ":port not specified")
db = (options[:db] || client.db).to_i
timeout = (options[:timeout] || client.timeout).to_i
synchronize do |client|
client.call([:migrate, host, port, key, db, timeout])
end
end
# Delete one or more keys.
#
# @param [String, Array<String>] keys
# @return [Fixnum] number of keys that were deleted
def del(*keys)
synchronize do |client|
client.call([:del] + keys)
end
end
# Determine if a key exists.
#
# @param [String] key
# @return [Boolean]
def exists(key)
synchronize do |client|
client.call([:exists, key], &Boolify)
end
end
# Find all keys matching the given pattern.
#
# @param [String] pattern
# @return [Array<String>]
def keys(pattern = "*")
synchronize do |client|
client.call([:keys, pattern]) do |reply|
if reply.kind_of?(String)
reply.split(" ")
else
reply
end
end
end
end
# Move a key to another database.
#
# @example Move a key to another database
# redis.set "foo", "bar"
# # => "OK"
# redis.move "foo", 2
# # => true
# redis.exists "foo"
# # => false
# redis.select 2
# # => "OK"
# redis.exists "foo"
# # => true
# redis.get "foo"
# # => "bar"
#
# @param [String] key
# @param [Fixnum] db
# @return [Boolean] whether the key was moved or not
def move(key, db)
synchronize do |client|
client.call([:move, key, db], &Boolify)
end
end
def object(*args)
synchronize do |client|
client.call([:object] + args)
end
end
# Return a random key from the keyspace.
#
# @return [String]
def randomkey
synchronize do |client|
client.call([:randomkey])
end
end
# Rename a key. If the new key already exists it is overwritten.
#
# @param [String] old_name
# @param [String] new_name
# @return [String] `OK`
def rename(old_name, new_name)
synchronize do |client|
client.call([:rename, old_name, new_name])
end
end
# Rename a key, only if the new key does not exist.
#
# @param [String] old_name
# @param [String] new_name
# @return [Boolean] whether the key was renamed or not
def renamenx(old_name, new_name)
synchronize do |client|
client.call([:renamenx, old_name, new_name], &Boolify)
end
end
# Sort the elements in a list, set or sorted set.
#
# @example Retrieve the first 2 elements from an alphabetically sorted "list"
# redis.sort("list", :order => "alpha", :limit => [0, 2])
# # => ["a", "b"]
# @example Store an alphabetically descending list in "target"
# redis.sort("list", :order => "desc alpha", :store => "target")
# # => 26
#
# @param [String] key
# @param [Hash] options
# - `:by => String`: use external key to sort elements by
# - `:limit => [offset, count]`: skip `offset` elements, return a maximum
# of `count` elements
# - `:get => [String, Array<String>]`: single key or array of keys to
# retrieve per element in the result
# - `:order => String`: combination of `ASC`, `DESC` and optionally `ALPHA`
# - `:store => String`: key to store the result at
#
# @return [Array<String>, Array<Array<String>>, Fixnum]
# - when `:get` is not specified, or holds a single element, an array of elements
# - when `:get` is specified, and holds more than one element, an array of
# elements where every element is an array with the result for every
# element specified in `:get`
# - when `:store` is specified, the number of elements in the stored result
def sort(key, options = {})
args = []
by = options[:by]
args.concat(["BY", by]) if by
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
get = Array(options[:get])
args.concat(["GET"].product(get).flatten) unless get.empty?
order = options[:order]
args.concat(order.split(" ")) if order
store = options[:store]
args.concat(["STORE", store]) if store
synchronize do |client|
client.call([:sort, key] + args) do |reply|
if get.size > 1 && !store
if reply
reply.each_slice(get.size).to_a
end
else
reply
end
end
end
end
# Determine the type stored at key.
#
# @param [String] key
# @return [String] `string`, `list`, `set`, `zset`, `hash` or `none`
def type(key)
synchronize do |client|
client.call([:type, key])
end
end
# Decrement the integer value of a key by one.
#
# @example
# redis.decr("value")
# # => 4
#
# @param [String] key
# @return [Fixnum] value after decrementing it
def decr(key)
synchronize do |client|
client.call([:decr, key])
end
end
# Decrement the integer value of a key by the given number.
#
# @example
# redis.decrby("value", 5)
# # => 0
#
# @param [String] key
# @param [Fixnum] decrement
# @return [Fixnum] value after decrementing it
def decrby(key, decrement)
synchronize do |client|
client.call([:decrby, key, decrement])
end
end
# Increment the integer value of a key by one.
#
# @example
# redis.incr("value")
# # => 6
#
# @param [String] key
# @return [Fixnum] value after incrementing it
def incr(key)
synchronize do |client|
client.call([:incr, key])
end
end
# Increment the integer value of a key by the given integer number.
#
# @example
# redis.incrby("value", 5)
# # => 10
#
# @param [String] key
# @param [Fixnum] increment
# @return [Fixnum] value after incrementing it
def incrby(key, increment)
synchronize do |client|
client.call([:incrby, key, increment])
end
end
# Increment the numeric value of a key by the given float number.
#
# @example
# redis.incrbyfloat("value", 1.23)
# # => 1.23
#
# @param [String] key
# @param [Float] increment
# @return [Float] value after incrementing it
def incrbyfloat(key, increment)
synchronize do |client|
client.call([:incrbyfloat, key, increment], &Floatify)
end
end
# Set the string value of a key.
#
# @param [String] key
# @param [String] value
# @param [Hash] options
# - `:ex => Fixnum`: Set the specified expire time, in seconds.
# - `:px => Fixnum`: Set the specified expire time, in milliseconds.
# - `:nx => true`: Only set the key if it does not already exist.
# - `:xx => true`: Only set the key if it already exist.
# @return [String, Boolean] `"OK"` or true, false if `:nx => true` or `:xx => true`
def set(key, value, options = {})
args = []
ex = options[:ex]
args.concat(["EX", ex]) if ex
px = options[:px]
args.concat(["PX", px]) if px
nx = options[:nx]
args.concat(["NX"]) if nx
xx = options[:xx]
args.concat(["XX"]) if xx
synchronize do |client|
if nx || xx
client.call([:set, key, value.to_s] + args, &BoolifySet)
else
client.call([:set, key, value.to_s] + args)
end
end
end
alias :[]= :set
# Set the time to live in seconds of a key.
#
# @param [String] key
# @param [Fixnum] ttl
# @param [String] value
# @return [String] `"OK"`
def setex(key, ttl, value)
synchronize do |client|
client.call([:setex, key, ttl, value.to_s])
end
end
# Set the time to live in milliseconds of a key.
#
# @param [String] key
# @param [Fixnum] ttl
# @param [String] value
# @return [String] `"OK"`
def psetex(key, ttl, value)
synchronize do |client|
client.call([:psetex, key, ttl, value.to_s])
end
end
# Set the value of a key, only if the key does not exist.
#
# @param [String] key
# @param [String] value
# @return [Boolean] whether the key was set or not
def setnx(key, value)
synchronize do |client|
client.call([:setnx, key, value.to_s], &Boolify)
end
end
# Set one or more values.
#
# @example
# redis.mset("key1", "v1", "key2", "v2")
# # => "OK"
#
# @param [Array<String>] args array of keys and values
# @return [String] `"OK"`
#
# @see #mapped_mset
def mset(*args)
synchronize do |client|
client.call([:mset] + args)
end
end
# Set one or more values.
#
# @example
# redis.mapped_mset({ "f1" => "v1", "f2" => "v2" })
# # => "OK"
#
# @param [Hash] hash keys mapping to values
# @return [String] `"OK"`
#
# @see #mset
def mapped_mset(hash)
mset(hash.to_a.flatten)
end
# Set one or more values, only if none of the keys exist.
#
# @example
# redis.msetnx("key1", "v1", "key2", "v2")
# # => true
#
# @param [Array<String>] args array of keys and values
# @return [Boolean] whether or not all values were set
#
# @see #mapped_msetnx
def msetnx(*args)
synchronize do |client|
client.call([:msetnx] + args, &Boolify)
end
end
# Set one or more values, only if none of the keys exist.
#
# @example
# redis.mapped_msetnx({ "key1" => "v1", "key2" => "v2" })
# # => true
#
# @param [Hash] hash keys mapping to values
# @return [Boolean] whether or not all values were set
#
# @see #msetnx
def mapped_msetnx(hash)
msetnx(hash.to_a.flatten)
end
# Get the value of a key.
#
# @param [String] key
# @return [String]
def get(key)
synchronize do |client|
client.call([:get, key])
end
end
alias :[] :get
# Get the values of all the given keys.
#
# @example
# redis.mget("key1", "key1")
# # => ["v1", "v2"]
#
# @param [Array<String>] keys
# @return [Array<String>] an array of values for the specified keys
#
# @see #mapped_mget
def mget(*keys, &blk)
synchronize do |client|
client.call([:mget] + keys, &blk)
end
end
# Get the values of all the given keys.
#
# @example
# redis.mapped_mget("key1", "key2")
# # => { "key1" => "v1", "key2" => "v2" }
#
# @param [Array<String>] keys array of keys
# @return [Hash] a hash mapping the specified keys to their values
#
# @see #mget
def mapped_mget(*keys)
mget(*keys) do |reply|
if reply.kind_of?(Array)
Hash[keys.zip(reply)]
else
reply
end
end
end
# Overwrite part of a string at key starting at the specified offset.
#
# @param [String] key
# @param [Fixnum] offset byte offset
# @param [String] value
# @return [Fixnum] length of the string after it was modified
def setrange(key, offset, value)
synchronize do |client|
client.call([:setrange, key, offset, value.to_s])
end
end
# Get a substring of the string stored at a key.
#
# @param [String] key
# @param [Fixnum] start zero-based start offset
# @param [Fixnum] stop zero-based end offset. Use -1 for representing
# the end of the string
# @return [Fixnum] `0` or `1`
def getrange(key, start, stop)
synchronize do |client|
client.call([:getrange, key, start, stop])
end
end
# Sets or clears the bit at offset in the string value stored at key.
#
# @param [String] key
# @param [Fixnum] offset bit offset
# @param [Fixnum] value bit value `0` or `1`
# @return [Fixnum] the original bit value stored at `offset`
def setbit(key, offset, value)
synchronize do |client|
client.call([:setbit, key, offset, value])
end
end
# Returns the bit value at offset in the string value stored at key.
#
# @param [String] key
# @param [Fixnum] offset bit offset
# @return [Fixnum] `0` or `1`
def getbit(key, offset)
synchronize do |client|
client.call([:getbit, key, offset])
end
end
# Append a value to a key.
#
# @param [String] key
# @param [String] value value to append
# @return [Fixnum] length of the string after appending
def append(key, value)
synchronize do |client|
client.call([:append, key, value])
end
end
# Count the number of set bits in a range of the string value stored at key.
#
# @param [String] key
# @param [Fixnum] start start index
# @param [Fixnum] stop stop index
# @return [Fixnum] the number of bits set to 1
def bitcount(key, start = 0, stop = -1)
synchronize do |client|
client.call([:bitcount, key, start, stop])
end
end
# Perform a bitwise operation between strings and store the resulting string in a key.
#
# @param [String] operation e.g. `and`, `or`, `xor`, `not`
# @param [String] destkey destination key
# @param [String, Array<String>] keys one or more source keys to perform `operation`
# @return [Fixnum] the length of the string stored in `destkey`
def bitop(operation, destkey, *keys)
synchronize do |client|
client.call([:bitop, operation, destkey] + keys)
end
end
# Return the position of the first bit set to 1 or 0 in a string.
#
# @param [String] key
# @param [Fixnum] bit whether to look for the first 1 or 0 bit
# @param [Fixnum] start start index
# @param [Fixnum] stop stop index
# @return [Fixnum] the position of the first 1/0 bit.
# -1 if looking for 1 and it is not found or start and stop are given.
def bitpos(key, bit, start=nil, stop=nil)
if stop and not start
raise(ArgumentError, 'stop parameter specified without start parameter')
end
synchronize do |client|
command = [:bitpos, key, bit]
command << start if start
command << stop if stop
client.call(command)
end
end
# Set the string value of a key and return its old value.
#
# @param [String] key
# @param [String] value value to replace the current value with
# @return [String] the old value stored in the key, or `nil` if the key
# did not exist
def getset(key, value)
synchronize do |client|
client.call([:getset, key, value.to_s])
end
end
# Get the length of the value stored in a key.
#
# @param [String] key
# @return [Fixnum] the length of the value stored in the key, or 0
# if the key does not exist
def strlen(key)
synchronize do |client|
client.call([:strlen, key])
end
end
# Get the length of a list.
#
# @param [String] key
# @return [Fixnum]
def llen(key)
synchronize do |client|
client.call([:llen, key])
end
end
# Prepend one or more values to a list, creating the list if it doesn't exist
#
# @param [String] key
# @param [String, Array] value string value, or array of string values to push
# @return [Fixnum] the length of the list after the push operation
def lpush(key, value)
synchronize do |client|
client.call([:lpush, key, value])
end
end
# Prepend a value to a list, only if the list exists.
#
# @param [String] key
# @param [String] value
# @return [Fixnum] the length of the list after the push operation
def lpushx(key, value)
synchronize do |client|
client.call([:lpushx, key, value])
end
end
# Append one or more values to a list, creating the list if it doesn't exist
#
# @param [String] key
# @param [String] value
# @return [Fixnum] the length of the list after the push operation
def rpush(key, value)
synchronize do |client|
client.call([:rpush, key, value])
end
end
# Append a value to a list, only if the list exists.
#
# @param [String] key
# @param [String] value
# @return [Fixnum] the length of the list after the push operation
def rpushx(key, value)
synchronize do |client|
client.call([:rpushx, key, value])
end
end
# Remove and get the first element in a list.
#
# @param [String] key
# @return [String]
def lpop(key)
synchronize do |client|
client.call([:lpop, key])
end
end
# Remove and get the last element in a list.
#
# @param [String] key
# @return [String]
def rpop(key)
synchronize do |client|
client.call([:rpop, key])
end
end
# Remove the last element in a list, append it to another list and return it.
#
# @param [String] source source key
# @param [String] destination destination key
# @return [nil, String] the element, or nil when the source key does not exist
def rpoplpush(source, destination)
synchronize do |client|
client.call([:rpoplpush, source, destination])
end
end
def _bpop(cmd, args)
options = {}
case args.last
when Hash
options = args.pop
when Integer
# Issue deprecation notice in obnoxious mode...
options[:timeout] = args.pop
end
if args.size > 1
# Issue deprecation notice in obnoxious mode...
end
keys = args.flatten
timeout = options[:timeout] || 0
synchronize do |client|
command = [cmd, keys, timeout]
timeout += client.timeout if timeout > 0
client.call_with_timeout(command, timeout)
end
end
# Remove and get the first element in a list, or block until one is available.
#
# @example With timeout
# list, element = redis.blpop("list", :timeout => 5)
# # => nil on timeout
# # => ["list", "element"] on success
# @example Without timeout
# list, element = redis.blpop("list")
# # => ["list", "element"]
# @example Blocking pop on multiple lists
# list, element = redis.blpop(["list", "another_list"])
# # => ["list", "element"]
#
# @param [String, Array<String>] keys one or more keys to perform the
# blocking pop on
# @param [Hash] options
# - `:timeout => Fixnum`: timeout in seconds, defaults to no timeout
#
# @return [nil, [String, String]]
# - `nil` when the operation timed out
# - tuple of the list that was popped from and element was popped otherwise
def blpop(*args)
_bpop(:blpop, args)
end
# Remove and get the last element in a list, or block until one is available.
#
# @param [String, Array<String>] keys one or more keys to perform the
# blocking pop on
# @param [Hash] options
# - `:timeout => Fixnum`: timeout in seconds, defaults to no timeout
#
# @return [nil, [String, String]]
# - `nil` when the operation timed out
# - tuple of the list that was popped from and element was popped otherwise
#
# @see #blpop
def brpop(*args)
_bpop(:brpop, args)
end
# Pop a value from a list, push it to another list and return it; or block
# until one is available.
#
# @param [String] source source key
# @param [String] destination destination key
# @param [Hash] options
# - `:timeout => Fixnum`: timeout in seconds, defaults to no timeout
#
# @return [nil, String]
# - `nil` when the operation timed out
# - the element was popped and pushed otherwise
def brpoplpush(source, destination, options = {})
case options
when Integer
# Issue deprecation notice in obnoxious mode...
options = { :timeout => options }
end
timeout = options[:timeout] || 0
synchronize do |client|
command = [:brpoplpush, source, destination, timeout]
timeout += client.timeout if timeout > 0
client.call_with_timeout(command, timeout)
end
end
# Get an element from a list by its index.
#
# @param [String] key
# @param [Fixnum] index
# @return [String]
def lindex(key, index)
synchronize do |client|
client.call([:lindex, key, index])
end
end
# Insert an element before or after another element in a list.
#
# @param [String] key
# @param [String, Symbol] where `BEFORE` or `AFTER`
# @param [String] pivot reference element
# @param [String] value
# @return [Fixnum] length of the list after the insert operation, or `-1`
# when the element `pivot` was not found
def linsert(key, where, pivot, value)
synchronize do |client|
client.call([:linsert, key, where, pivot, value])
end
end
# Get a range of elements from a list.
#
# @param [String] key
# @param [Fixnum] start start index
# @param [Fixnum] stop stop index
# @return [Array<String>]
def lrange(key, start, stop)
synchronize do |client|
client.call([:lrange, key, start, stop])
end
end
# Remove elements from a list.
#
# @param [String] key
# @param [Fixnum] count number of elements to remove. Use a positive
# value to remove the first `count` occurrences of `value`. A negative
# value to remove the last `count` occurrences of `value`. Or zero, to
# remove all occurrences of `value` from the list.
# @param [String] value
# @return [Fixnum] the number of removed elements
def lrem(key, count, value)
synchronize do |client|
client.call([:lrem, key, count, value])
end
end
# Set the value of an element in a list by its index.
#
# @param [String] key
# @param [Fixnum] index
# @param [String] value
# @return [String] `OK`
def lset(key, index, value)
synchronize do |client|
client.call([:lset, key, index, value])
end
end
# Trim a list to the specified range.
#
# @param [String] key
# @param [Fixnum] start start index
# @param [Fixnum] stop stop index
# @return [String] `OK`
def ltrim(key, start, stop)
synchronize do |client|
client.call([:ltrim, key, start, stop])
end
end
# Get the number of members in a set.
#
# @param [String] key
# @return [Fixnum]
def scard(key)
synchronize do |client|
client.call([:scard, key])
end
end
# Add one or more members to a set.
#
# @param [String] key
# @param [String, Array<String>] member one member, or array of members
# @return [Boolean, Fixnum] `Boolean` when a single member is specified,
# holding whether or not adding the member succeeded, or `Fixnum` when an
# array of members is specified, holding the number of members that were
# successfully added
def sadd(key, member)
synchronize do |client|
client.call([:sadd, key, member]) do |reply|
if member.is_a? Array
# Variadic: return integer
reply
else
# Single argument: return boolean
Boolify.call(reply)
end
end
end
end
# Remove one or more members from a set.
#
# @param [String] key
# @param [String, Array<String>] member one member, or array of members
# @return [Boolean, Fixnum] `Boolean` when a single member is specified,
# holding whether or not removing the member succeeded, or `Fixnum` when an
# array of members is specified, holding the number of members that were
# successfully removed
def srem(key, member)
synchronize do |client|
client.call([:srem, key, member]) do |reply|
if member.is_a? Array
# Variadic: return integer
reply
else
# Single argument: return boolean
Boolify.call(reply)
end
end
end
end
# Remove and return one or more random member from a set.
#
# @param [String] key
# @return [String]
# @param [Fixnum] count
def spop(key, count = nil)
synchronize do |client|
if count.nil?
client.call([:spop, key])
else
client.call([:spop, key, count])
end
end
end
# Get one or more random members from a set.
#
# @param [String] key
# @param [Fixnum] count
# @return [String]
def srandmember(key, count = nil)
synchronize do |client|
if count.nil?
client.call([:srandmember, key])
else
client.call([:srandmember, key, count])
end
end
end
# Move a member from one set to another.
#
# @param [String] source source key
# @param [String] destination destination key
# @param [String] member member to move from `source` to `destination`
# @return [Boolean]
def smove(source, destination, member)
synchronize do |client|
client.call([:smove, source, destination, member], &Boolify)
end
end
# Determine if a given value is a member of a set.
#
# @param [String] key
# @param [String] member
# @return [Boolean]
def sismember(key, member)
synchronize do |client|
client.call([:sismember, key, member], &Boolify)
end
end
# Get all the members in a set.
#
# @param [String] key
# @return [Array<String>]
def smembers(key)
synchronize do |client|
client.call([:smembers, key])
end
end
# Subtract multiple sets.
#
# @param [String, Array<String>] keys keys pointing to sets to subtract
# @return [Array<String>] members in the difference
def sdiff(*keys)
synchronize do |client|
client.call([:sdiff] + keys)
end
end
# Subtract multiple sets and store the resulting set in a key.
#
# @param [String] destination destination key
# @param [String, Array<String>] keys keys pointing to sets to subtract
# @return [Fixnum] number of elements in the resulting set
def sdiffstore(destination, *keys)
synchronize do |client|
client.call([:sdiffstore, destination] + keys)
end
end
# Intersect multiple sets.
#
# @param [String, Array<String>] keys keys pointing to sets to intersect
# @return [Array<String>] members in the intersection
def sinter(*keys)
synchronize do |client|
client.call([:sinter] + keys)
end
end
# Intersect multiple sets and store the resulting set in a key.
#
# @param [String] destination destination key
# @param [String, Array<String>] keys keys pointing to sets to intersect
# @return [Fixnum] number of elements in the resulting set
def sinterstore(destination, *keys)
synchronize do |client|
client.call([:sinterstore, destination] + keys)
end
end
# Add multiple sets.
#
# @param [String, Array<String>] keys keys pointing to sets to unify
# @return [Array<String>] members in the union
def sunion(*keys)
synchronize do |client|
client.call([:sunion] + keys)
end
end
# Add multiple sets and store the resulting set in a key.
#
# @param [String] destination destination key
# @param [String, Array<String>] keys keys pointing to sets to unify
# @return [Fixnum] number of elements in the resulting set
def sunionstore(destination, *keys)
synchronize do |client|
client.call([:sunionstore, destination] + keys)
end
end
# Get the number of members in a sorted set.
#
# @example
# redis.zcard("zset")
# # => 4
#
# @param [String] key
# @return [Fixnum]
def zcard(key)
synchronize do |client|
client.call([:zcard, key])
end
end
# Add one or more members to a sorted set, or update the score for members
# that already exist.
#
# @example Add a single `[score, member]` pair to a sorted set
# redis.zadd("zset", 32.0, "member")
# @example Add an array of `[score, member]` pairs to a sorted set
# redis.zadd("zset", [[32.0, "a"], [64.0, "b"]])
#
# @param [String] key
# @param [[Float, String], Array<[Float, String]>] args
# - a single `[score, member]` pair
# - an array of `[score, member]` pairs
# @param [Hash] options
# - `:xx => true`: Only update elements that already exist (never
# add elements)
# - `:nx => true`: Don't update already existing elements (always
# add new elements)
# - `:ch => true`: Modify the return value from the number of new
# elements added, to the total number of elements changed (CH is an
# abbreviation of changed); changed elements are new elements added
# and elements already existing for which the score was updated
# - `:incr => true`: When this option is specified ZADD acts like
# ZINCRBY; only one score-element pair can be specified in this mode
#
# @return [Boolean, Fixnum, Float]
# - `Boolean` when a single pair is specified, holding whether or not it was
# **added** to the sorted set.
# - `Fixnum` when an array of pairs is specified, holding the number of
# pairs that were **added** to the sorted set.
# - `Float` when option :incr is specified, holding the score of the member
# after incrementing it.
def zadd(key, *args) #, options
zadd_options = []
if args.last.is_a?(Hash)
options = args.pop
nx = options[:nx]
zadd_options << "NX" if nx
xx = options[:xx]
zadd_options << "XX" if xx
ch = options[:ch]
zadd_options << "CH" if ch
incr = options[:incr]
zadd_options << "INCR" if incr
end
synchronize do |client|
if args.size == 1 && args[0].is_a?(Array)
# Variadic: return float if INCR, integer if !INCR
client.call([:zadd, key] + zadd_options + args[0], &(incr ? Floatify : nil))
elsif args.size == 2
# Single pair: return float if INCR, boolean if !INCR
client.call([:zadd, key] + zadd_options + args, &(incr ? Floatify : Boolify))
else
raise ArgumentError, "wrong number of arguments"
end
end
end
# Increment the score of a member in a sorted set.
#
# @example
# redis.zincrby("zset", 32.0, "a")
# # => 64.0
#
# @param [String] key
# @param [Float] increment
# @param [String] member
# @return [Float] score of the member after incrementing it
def zincrby(key, increment, member)
synchronize do |client|
client.call([:zincrby, key, increment, member], &Floatify)
end
end
# Remove one or more members from a sorted set.
#
# @example Remove a single member from a sorted set
# redis.zrem("zset", "a")
# @example Remove an array of members from a sorted set
# redis.zrem("zset", ["a", "b"])
#
# @param [String] key
# @param [String, Array<String>] member
# - a single member
# - an array of members
#
# @return [Boolean, Fixnum]
# - `Boolean` when a single member is specified, holding whether or not it
# was removed from the sorted set
# - `Fixnum` when an array of pairs is specified, holding the number of
# members that were removed to the sorted set
def zrem(key, member)
synchronize do |client|
client.call([:zrem, key, member]) do |reply|
if member.is_a? Array
# Variadic: return integer
reply
else
# Single argument: return boolean
Boolify.call(reply)
end
end
end
end
# Get the score associated with the given member in a sorted set.
#
# @example Get the score for member "a"
# redis.zscore("zset", "a")
# # => 32.0
#
# @param [String] key
# @param [String] member
# @return [Float] score of the member
def zscore(key, member)
synchronize do |client|
client.call([:zscore, key, member], &Floatify)
end
end
# Return a range of members in a sorted set, by index.
#
# @example Retrieve all members from a sorted set
# redis.zrange("zset", 0, -1)
# # => ["a", "b"]
# @example Retrieve all members and their scores from a sorted set
# redis.zrange("zset", 0, -1, :with_scores => true)
# # => [["a", 32.0], ["b", 64.0]]
#
# @param [String] key
# @param [Fixnum] start start index
# @param [Fixnum] stop stop index
# @param [Hash] options
# - `:with_scores => true`: include scores in output
#
# @return [Array<String>, Array<[String, Float]>]
# - when `:with_scores` is not specified, an array of members
# - when `:with_scores` is specified, an array with `[member, score]` pairs
def zrange(key, start, stop, options = {})
args = []
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << "WITHSCORES"
block = FloatifyPairs
end
synchronize do |client|
client.call([:zrange, key, start, stop] + args, &block)
end
end
# Return a range of members in a sorted set, by index, with scores ordered
# from high to low.
#
# @example Retrieve all members from a sorted set
# redis.zrevrange("zset", 0, -1)
# # => ["b", "a"]
# @example Retrieve all members and their scores from a sorted set
# redis.zrevrange("zset", 0, -1, :with_scores => true)
# # => [["b", 64.0], ["a", 32.0]]
#
# @see #zrange
def zrevrange(key, start, stop, options = {})
args = []
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << "WITHSCORES"
block = FloatifyPairs
end
synchronize do |client|
client.call([:zrevrange, key, start, stop] + args, &block)
end
end
# Determine the index of a member in a sorted set.
#
# @param [String] key
# @param [String] member
# @return [Fixnum]
def zrank(key, member)
synchronize do |client|
client.call([:zrank, key, member])
end
end
# Determine the index of a member in a sorted set, with scores ordered from
# high to low.
#
# @param [String] key
# @param [String] member
# @return [Fixnum]
def zrevrank(key, member)
synchronize do |client|
client.call([:zrevrank, key, member])
end
end
# Remove all members in a sorted set within the given indexes.
#
# @example Remove first 5 members
# redis.zremrangebyrank("zset", 0, 4)
# # => 5
# @example Remove last 5 members
# redis.zremrangebyrank("zset", -5, -1)
# # => 5
#
# @param [String] key
# @param [Fixnum] start start index
# @param [Fixnum] stop stop index
# @return [Fixnum] number of members that were removed
def zremrangebyrank(key, start, stop)
synchronize do |client|
client.call([:zremrangebyrank, key, start, stop])
end
end
# Return a range of members with the same score in a sorted set, by lexicographical ordering
#
# @example Retrieve members matching a
# redis.zrangebylex("zset", "[a", "[a\xff")
# # => ["aaren", "aarika", "abagael", "abby"]
# @example Retrieve the first 2 members matching a
# redis.zrangebylex("zset", "[a", "[a\xff", :limit => [0, 2])
# # => ["aaren", "aarika"]
#
# @param [String] key
# @param [String] min
# - inclusive minimum is specified by prefixing `(`
# - exclusive minimum is specified by prefixing `[`
# @param [String] max
# - inclusive maximum is specified by prefixing `(`
# - exclusive maximum is specified by prefixing `[`
# @param [Hash] options
# - `:limit => [offset, count]`: skip `offset` members, return a maximum of
# `count` members
#
# @return [Array<String>, Array<[String, Float]>]
def zrangebylex(key, min, max, options = {})
args = []
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
synchronize do |client|
client.call([:zrangebylex, key, min, max] + args)
end
end
# Return a range of members with the same score in a sorted set, by reversed lexicographical ordering.
# Apart from the reversed ordering, #zrevrangebylex is similar to #zrangebylex.
#
# @example Retrieve members matching a
# redis.zrevrangebylex("zset", "[a", "[a\xff")
# # => ["abbygail", "abby", "abagael", "aaren"]
# @example Retrieve the last 2 members matching a
# redis.zrevrangebylex("zset", "[a", "[a\xff", :limit => [0, 2])
# # => ["abbygail", "abby"]
#
# @see #zrangebylex
def zrevrangebylex(key, max, min, options = {})
args = []
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
synchronize do |client|
client.call([:zrevrangebylex, key, max, min] + args)
end
end
# Return a range of members in a sorted set, by score.
#
# @example Retrieve members with score `>= 5` and `< 100`
# redis.zrangebyscore("zset", "5", "(100")
# # => ["a", "b"]
# @example Retrieve the first 2 members with score `>= 0`
# redis.zrangebyscore("zset", "0", "+inf", :limit => [0, 2])
# # => ["a", "b"]
# @example Retrieve members and their scores with scores `> 5`
# redis.zrangebyscore("zset", "(5", "+inf", :with_scores => true)
# # => [["a", 32.0], ["b", 64.0]]
#
# @param [String] key
# @param [String] min
# - inclusive minimum score is specified verbatim
# - exclusive minimum score is specified by prefixing `(`
# @param [String] max
# - inclusive maximum score is specified verbatim
# - exclusive maximum score is specified by prefixing `(`
# @param [Hash] options
# - `:with_scores => true`: include scores in output
# - `:limit => [offset, count]`: skip `offset` members, return a maximum of
# `count` members
#
# @return [Array<String>, Array<[String, Float]>]
# - when `:with_scores` is not specified, an array of members
# - when `:with_scores` is specified, an array with `[member, score]` pairs
def zrangebyscore(key, min, max, options = {})
args = []
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << "WITHSCORES"
block = FloatifyPairs
end
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
synchronize do |client|
client.call([:zrangebyscore, key, min, max] + args, &block)
end
end
# Return a range of members in a sorted set, by score, with scores ordered
# from high to low.
#
# @example Retrieve members with score `< 100` and `>= 5`
# redis.zrevrangebyscore("zset", "(100", "5")
# # => ["b", "a"]
# @example Retrieve the first 2 members with score `<= 0`
# redis.zrevrangebyscore("zset", "0", "-inf", :limit => [0, 2])
# # => ["b", "a"]
# @example Retrieve members and their scores with scores `> 5`
# redis.zrevrangebyscore("zset", "+inf", "(5", :with_scores => true)
# # => [["b", 64.0], ["a", 32.0]]
#
# @see #zrangebyscore
def zrevrangebyscore(key, max, min, options = {})
args = []
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << ["WITHSCORES"]
block = FloatifyPairs
end
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
synchronize do |client|
client.call([:zrevrangebyscore, key, max, min] + args, &block)
end
end
# Remove all members in a sorted set within the given scores.
#
# @example Remove members with score `>= 5` and `< 100`
# redis.zremrangebyscore("zset", "5", "(100")
# # => 2
# @example Remove members with scores `> 5`
# redis.zremrangebyscore("zset", "(5", "+inf")
# # => 2
#
# @param [String] key
# @param [String] min
# - inclusive minimum score is specified verbatim
# - exclusive minimum score is specified by prefixing `(`
# @param [String] max
# - inclusive maximum score is specified verbatim
# - exclusive maximum score is specified by prefixing `(`
# @return [Fixnum] number of members that were removed
def zremrangebyscore(key, min, max)
synchronize do |client|
client.call([:zremrangebyscore, key, min, max])
end
end
# Count the members in a sorted set with scores within the given values.
#
# @example Count members with score `>= 5` and `< 100`
# redis.zcount("zset", "5", "(100")
# # => 2
# @example Count members with scores `> 5`
# redis.zcount("zset", "(5", "+inf")
# # => 2
#
# @param [String] key
# @param [String] min
# - inclusive minimum score is specified verbatim
# - exclusive minimum score is specified by prefixing `(`
# @param [String] max
# - inclusive maximum score is specified verbatim
# - exclusive maximum score is specified by prefixing `(`
# @return [Fixnum] number of members in within the specified range
def zcount(key, min, max)
synchronize do |client|
client.call([:zcount, key, min, max])
end
end
# Intersect multiple sorted sets and store the resulting sorted set in a new
# key.
#
# @example Compute the intersection of `2*zsetA` with `1*zsetB`, summing their scores
# redis.zinterstore("zsetC", ["zsetA", "zsetB"], :weights => [2.0, 1.0], :aggregate => "sum")
# # => 4
#
# @param [String] destination destination key
# @param [Array<String>] keys source keys
# @param [Hash] options
# - `:weights => [Float, Float, ...]`: weights to associate with source
# sorted sets
# - `:aggregate => String`: aggregate function to use (sum, min, max, ...)
# @return [Fixnum] number of elements in the resulting sorted set
def zinterstore(destination, keys, options = {})
args = []
weights = options[:weights]
args.concat(["WEIGHTS"] + weights) if weights
aggregate = options[:aggregate]
args.concat(["AGGREGATE", aggregate]) if aggregate
synchronize do |client|
client.call([:zinterstore, destination, keys.size] + keys + args)
end
end
# Add multiple sorted sets and store the resulting sorted set in a new key.
#
# @example Compute the union of `2*zsetA` with `1*zsetB`, summing their scores
# redis.zunionstore("zsetC", ["zsetA", "zsetB"], :weights => [2.0, 1.0], :aggregate => "sum")
# # => 8
#
# @param [String] destination destination key
# @param [Array<String>] keys source keys
# @param [Hash] options
# - `:weights => [Float, Float, ...]`: weights to associate with source
# sorted sets
# - `:aggregate => String`: aggregate function to use (sum, min, max, ...)
# @return [Fixnum] number of elements in the resulting sorted set
def zunionstore(destination, keys, options = {})
args = []
weights = options[:weights]
args.concat(["WEIGHTS"] + weights) if weights
aggregate = options[:aggregate]
args.concat(["AGGREGATE", aggregate]) if aggregate
synchronize do |client|
client.call([:zunionstore, destination, keys.size] + keys + args)
end
end
# Get the number of fields in a hash.
#
# @param [String] key
# @return [Fixnum] number of fields in the hash
def hlen(key)
synchronize do |client|
client.call([:hlen, key])
end
end
# Set the string value of a hash field.
#
# @param [String] key
# @param [String] field
# @param [String] value
# @return [Boolean] whether or not the field was **added** to the hash
def hset(key, field, value)
synchronize do |client|
client.call([:hset, key, field, value], &Boolify)
end
end
# Set the value of a hash field, only if the field does not exist.
#
# @param [String] key
# @param [String] field
# @param [String] value
# @return [Boolean] whether or not the field was **added** to the hash
def hsetnx(key, field, value)
synchronize do |client|
client.call([:hsetnx, key, field, value], &Boolify)
end
end
# Set one or more hash values.
#
# @example
# redis.hmset("hash", "f1", "v1", "f2", "v2")
# # => "OK"
#
# @param [String] key
# @param [Array<String>] attrs array of fields and values
# @return [String] `"OK"`
#
# @see #mapped_hmset
def hmset(key, *attrs)
synchronize do |client|
client.call([:hmset, key] + attrs)
end
end
# Set one or more hash values.
#
# @example
# redis.mapped_hmset("hash", { "f1" => "v1", "f2" => "v2" })
# # => "OK"
#
# @param [String] key
# @param [Hash] hash a non-empty hash with fields mapping to values
# @return [String] `"OK"`
#
# @see #hmset
def mapped_hmset(key, hash)
hmset(key, hash.to_a.flatten)
end
# Get the value of a hash field.
#
# @param [String] key
# @param [String] field
# @return [String]
def hget(key, field)
synchronize do |client|
client.call([:hget, key, field])
end
end
# Get the values of all the given hash fields.
#
# @example
# redis.hmget("hash", "f1", "f2")
# # => ["v1", "v2"]
#
# @param [String] key
# @param [Array<String>] fields array of fields
# @return [Array<String>] an array of values for the specified fields
#
# @see #mapped_hmget
def hmget(key, *fields, &blk)
synchronize do |client|
client.call([:hmget, key] + fields, &blk)
end
end
# Get the values of all the given hash fields.
#
# @example
# redis.mapped_hmget("hash", "f1", "f2")
# # => { "f1" => "v1", "f2" => "v2" }
#
# @param [String] key
# @param [Array<String>] fields array of fields
# @return [Hash] a hash mapping the specified fields to their values
#
# @see #hmget
def mapped_hmget(key, *fields)
hmget(key, *fields) do |reply|
if reply.kind_of?(Array)
Hash[fields.zip(reply)]
else
reply
end
end
end
# Delete one or more hash fields.
#
# @param [String] key
# @param [String, Array<String>] field
# @return [Fixnum] the number of fields that were removed from the hash
def hdel(key, field)
synchronize do |client|
client.call([:hdel, key, field])
end
end
# Determine if a hash field exists.
#
# @param [String] key
# @param [String] field
# @return [Boolean] whether or not the field exists in the hash
def hexists(key, field)
synchronize do |client|
client.call([:hexists, key, field], &Boolify)
end
end
# Increment the integer value of a hash field by the given integer number.
#
# @param [String] key
# @param [String] field
# @param [Fixnum] increment
# @return [Fixnum] value of the field after incrementing it
def hincrby(key, field, increment)
synchronize do |client|
client.call([:hincrby, key, field, increment])
end
end
# Increment the numeric value of a hash field by the given float number.
#
# @param [String] key
# @param [String] field
# @param [Float] increment
# @return [Float] value of the field after incrementing it
def hincrbyfloat(key, field, increment)
synchronize do |client|
client.call([:hincrbyfloat, key, field, increment], &Floatify)
end
end
# Get all the fields in a hash.
#
# @param [String] key
# @return [Array<String>]
def hkeys(key)
synchronize do |client|
client.call([:hkeys, key])
end
end
# Get all the values in a hash.
#
# @param [String] key
# @return [Array<String>]
def hvals(key)
synchronize do |client|
client.call([:hvals, key])
end
end
# Get all the fields and values in a hash.
#
# @param [String] key
# @return [Hash<String, String>]
def hgetall(key)
synchronize do |client|
client.call([:hgetall, key], &Hashify)
end
end
# Post a message to a channel.
def publish(channel, message)
synchronize do |client|
client.call([:publish, channel, message])
end
end
def subscribed?
synchronize do |client|
client.kind_of? SubscribedClient
end
end
# Listen for messages published to the given channels.
def subscribe(*channels, &block)
synchronize do |client|
_subscription(:subscribe, 0, channels, block)
end
end
# Listen for messages published to the given channels. Throw a timeout error if there is no messages for a timeout period.
def subscribe_with_timeout(timeout, *channels, &block)
synchronize do |client|
_subscription(:subscribe_with_timeout, timeout, channels, block)
end
end
# Stop listening for messages posted to the given channels.
def unsubscribe(*channels)
synchronize do |client|
raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed?
client.unsubscribe(*channels)
end
end
# Listen for messages published to channels matching the given patterns.
def psubscribe(*channels, &block)
synchronize do |client|
_subscription(:psubscribe, 0, channels, block)
end
end
# Listen for messages published to channels matching the given patterns. Throw a timeout error if there is no messages for a timeout period.
def psubscribe_with_timeout(timeout, *channels, &block)
synchronize do |client|
_subscription(:psubscribe_with_timeout, timeout, channels, block)
end
end
# Stop listening for messages posted to channels matching the given patterns.
def punsubscribe(*channels)
synchronize do |client|
raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed?
client.punsubscribe(*channels)
end
end
# Inspect the state of the Pub/Sub subsystem.
# Possible subcommands: channels, numsub, numpat.
def pubsub(subcommand, *args)
synchronize do |client|
client.call([:pubsub, subcommand] + args)
end
end
# Watch the given keys to determine execution of the MULTI/EXEC block.
#
# Using a block is optional, but is necessary for thread-safety.
#
# An `#unwatch` is automatically issued if an exception is raised within the
# block that is a subclass of StandardError and is not a ConnectionError.
#
# @example With a block
# redis.watch("key") do
# if redis.get("key") == "some value"
# redis.multi do |multi|
# multi.set("key", "other value")
# multi.incr("counter")
# end
# else
# redis.unwatch
# end
# end
# # => ["OK", 6]
#
# @example Without a block
# redis.watch("key")
# # => "OK"
#
# @param [String, Array<String>] keys one or more keys to watch
# @return [Object] if using a block, returns the return value of the block
# @return [String] if not using a block, returns `OK`
#
# @see #unwatch
# @see #multi
def watch(*keys)
synchronize do |client|
res = client.call([:watch] + keys)
if block_given?
begin
yield(self)
rescue ConnectionError
raise
rescue StandardError
unwatch
raise
end
else
res
end
end
end
# Forget about all watched keys.
#
# @return [String] `OK`
#
# @see #watch
# @see #multi
def unwatch
synchronize do |client|
client.call([:unwatch])
end
end
def pipelined
synchronize do |client|
begin
original, @client = @client, Pipeline.new
yield(self)
original.call_pipeline(@client)
ensure
@client = original
end
end
end
# Mark the start of a transaction block.
#
# Passing a block is optional.
#
# @example With a block
# redis.multi do |multi|
# multi.set("key", "value")
# multi.incr("counter")
# end # => ["OK", 6]
#
# @example Without a block
# redis.multi
# # => "OK"
# redis.set("key", "value")
# # => "QUEUED"
# redis.incr("counter")
# # => "QUEUED"
# redis.exec
# # => ["OK", 6]
#
# @yield [multi] the commands that are called inside this block are cached
# and written to the server upon returning from it
# @yieldparam [Redis] multi `self`
#
# @return [String, Array<...>]
# - when a block is not given, `OK`
# - when a block is given, an array with replies
#
# @see #watch
# @see #unwatch
def multi
synchronize do |client|
if !block_given?
client.call([:multi])
else
begin
pipeline = Pipeline::Multi.new
original, @client = @client, pipeline
yield(self)
original.call_pipeline(pipeline)
ensure
@client = original
end
end
end
end
# Execute all commands issued after MULTI.
#
# Only call this method when `#multi` was called **without** a block.
#
# @return [nil, Array<...>]
# - when commands were not executed, `nil`
# - when commands were executed, an array with their replies
#
# @see #multi
# @see #discard
def exec
synchronize do |client|
client.call([:exec])
end
end
# Discard all commands issued after MULTI.
#
# Only call this method when `#multi` was called **without** a block.
#
# @return [String] `"OK"`
#
# @see #multi
# @see #exec
def discard
synchronize do |client|
client.call([:discard])
end
end
# Control remote script registry.
#
# @example Load a script
# sha = redis.script(:load, "return 1")
# # => <sha of this script>
# @example Check if a script exists
# redis.script(:exists, sha)
# # => true
# @example Check if multiple scripts exist
# redis.script(:exists, [sha, other_sha])
# # => [true, false]
# @example Flush the script registry
# redis.script(:flush)
# # => "OK"
# @example Kill a running script
# redis.script(:kill)
# # => "OK"
#
# @param [String] subcommand e.g. `exists`, `flush`, `load`, `kill`
# @param [Array<String>] args depends on subcommand
# @return [String, Boolean, Array<Boolean>, ...] depends on subcommand
#
# @see #eval
# @see #evalsha
def script(subcommand, *args)
subcommand = subcommand.to_s.downcase
if subcommand == "exists"
synchronize do |client|
arg = args.first
client.call([:script, :exists, arg]) do |reply|
reply = reply.map { |r| Boolify.call(r) }
if arg.is_a?(Array)
reply
else
reply.first
end
end
end
else
synchronize do |client|
client.call([:script, subcommand] + args)
end
end
end
def _eval(cmd, args)
script = args.shift
options = args.pop if args.last.is_a?(Hash)
options ||= {}
keys = args.shift || options[:keys] || []
argv = args.shift || options[:argv] || []
synchronize do |client|
client.call([cmd, script, keys.length] + keys + argv)
end
end
# Evaluate Lua script.
#
# @example EVAL without KEYS nor ARGV
# redis.eval("return 1")
# # => 1
# @example EVAL with KEYS and ARGV as array arguments
# redis.eval("return { KEYS, ARGV }", ["k1", "k2"], ["a1", "a2"])
# # => [["k1", "k2"], ["a1", "a2"]]
# @example EVAL with KEYS and ARGV in a hash argument
# redis.eval("return { KEYS, ARGV }", :keys => ["k1", "k2"], :argv => ["a1", "a2"])
# # => [["k1", "k2"], ["a1", "a2"]]
#
# @param [Array<String>] keys optional array with keys to pass to the script
# @param [Array<String>] argv optional array with arguments to pass to the script
# @param [Hash] options
# - `:keys => Array<String>`: optional array with keys to pass to the script
# - `:argv => Array<String>`: optional array with arguments to pass to the script
# @return depends on the script
#
# @see #script
# @see #evalsha
def eval(*args)
_eval(:eval, args)
end
# Evaluate Lua script by its SHA.
#
# @example EVALSHA without KEYS nor ARGV
# redis.evalsha(sha)
# # => <depends on script>
# @example EVALSHA with KEYS and ARGV as array arguments
# redis.evalsha(sha, ["k1", "k2"], ["a1", "a2"])
# # => <depends on script>
# @example EVALSHA with KEYS and ARGV in a hash argument
# redis.evalsha(sha, :keys => ["k1", "k2"], :argv => ["a1", "a2"])
# # => <depends on script>
#
# @param [Array<String>] keys optional array with keys to pass to the script
# @param [Array<String>] argv optional array with arguments to pass to the script
# @param [Hash] options
# - `:keys => Array<String>`: optional array with keys to pass to the script
# - `:argv => Array<String>`: optional array with arguments to pass to the script
# @return depends on the script
#
# @see #script
# @see #eval
def evalsha(*args)
_eval(:evalsha, args)
end
def _scan(command, cursor, args, options = {}, &block)
# SSCAN/ZSCAN/HSCAN already prepend the key to +args+.
args << cursor
if match = options[:match]
args.concat(["MATCH", match])
end
if count = options[:count]
args.concat(["COUNT", count])
end
synchronize do |client|
client.call([command] + args, &block)
end
end
# Scan the keyspace
#
# @example Retrieve the first batch of keys
# redis.scan(0)
# # => ["4", ["key:21", "key:47", "key:42"]]
# @example Retrieve a batch of keys matching a pattern
# redis.scan(4, :match => "key:1?")
# # => ["92", ["key:13", "key:18"]]
#
# @param [String, Integer] cursor the cursor of the iteration
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [String, Array<String>] the next cursor and all found keys
def scan(cursor, options={})
_scan(:scan, cursor, [], options)
end
# Scan the keyspace
#
# @example Retrieve all of the keys (with possible duplicates)
# redis.scan_each.to_a
# # => ["key:21", "key:47", "key:42"]
# @example Execute block for each key matching a pattern
# redis.scan_each(:match => "key:1?") {|key| puts key}
# # => key:13
# # => key:18
#
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [Enumerator] an enumerator for all found keys
def scan_each(options={}, &block)
return to_enum(:scan_each, options) unless block_given?
cursor = 0
loop do
cursor, keys = scan(cursor, options)
keys.each(&block)
break if cursor == "0"
end
end
# Scan a hash
#
# @example Retrieve the first batch of key/value pairs in a hash
# redis.hscan("hash", 0)
#
# @param [String, Integer] cursor the cursor of the iteration
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [String, Array<[String, String]>] the next cursor and all found keys
def hscan(key, cursor, options={})
_scan(:hscan, cursor, [key], options) do |reply|
[reply[0], reply[1].each_slice(2).to_a]
end
end
# Scan a hash
#
# @example Retrieve all of the key/value pairs in a hash
# redis.hscan_each("hash").to_a
# # => [["key70", "70"], ["key80", "80"]]
#
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [Enumerator] an enumerator for all found keys
def hscan_each(key, options={}, &block)
return to_enum(:hscan_each, key, options) unless block_given?
cursor = 0
loop do
cursor, values = hscan(key, cursor, options)
values.each(&block)
break if cursor == "0"
end
end
# Scan a sorted set
#
# @example Retrieve the first batch of key/value pairs in a hash
# redis.zscan("zset", 0)
#
# @param [String, Integer] cursor the cursor of the iteration
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [String, Array<[String, Float]>] the next cursor and all found
# members and scores
def zscan(key, cursor, options={})
_scan(:zscan, cursor, [key], options) do |reply|
[reply[0], FloatifyPairs.call(reply[1])]
end
end
# Scan a sorted set
#
# @example Retrieve all of the members/scores in a sorted set
# redis.zscan_each("zset").to_a
# # => [["key70", "70"], ["key80", "80"]]
#
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [Enumerator] an enumerator for all found scores and members
def zscan_each(key, options={}, &block)
return to_enum(:zscan_each, key, options) unless block_given?
cursor = 0
loop do
cursor, values = zscan(key, cursor, options)
values.each(&block)
break if cursor == "0"
end
end
# Scan a set
#
# @example Retrieve the first batch of keys in a set
# redis.sscan("set", 0)
#
# @param [String, Integer] cursor the cursor of the iteration
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [String, Array<String>] the next cursor and all found members
def sscan(key, cursor, options={})
_scan(:sscan, cursor, [key], options)
end
# Scan a set
#
# @example Retrieve all of the keys in a set
# redis.sscan_each("set").to_a
# # => ["key1", "key2", "key3"]
#
# @param [Hash] options
# - `:match => String`: only return keys matching the pattern
# - `:count => Integer`: return count keys at most per iteration
#
# @return [Enumerator] an enumerator for all keys in the set
def sscan_each(key, options={}, &block)
return to_enum(:sscan_each, key, options) unless block_given?
cursor = 0
loop do
cursor, keys = sscan(key, cursor, options)
keys.each(&block)
break if cursor == "0"
end
end
# Add one or more members to a HyperLogLog structure.
#
# @param [String] key
# @param [String, Array<String>] member one member, or array of members
# @return [Boolean] true if at least 1 HyperLogLog internal register was altered. false otherwise.
def pfadd(key, member)
synchronize do |client|
client.call([:pfadd, key, member], &Boolify)
end
end
# Get the approximate cardinality of members added to HyperLogLog structure.
#
# If called with multiple keys, returns the approximate cardinality of the
# union of the HyperLogLogs contained in the keys.
#
# @param [String, Array<String>] keys
# @return [Fixnum]
def pfcount(*keys)
synchronize do |client|
client.call([:pfcount] + keys)
end
end
# Merge multiple HyperLogLog values into an unique value that will approximate the cardinality of the union of
# the observed Sets of the source HyperLogLog structures.
#
# @param [String] dest_key destination key
# @param [String, Array<String>] source_key source key, or array of keys
# @return [Boolean]
def pfmerge(dest_key, *source_key)
synchronize do |client|
client.call([:pfmerge, dest_key, *source_key], &BoolifySet)
end
end
# Interact with the sentinel command (masters, master, slaves, failover)
#
# @param [String] subcommand e.g. `masters`, `master`, `slaves`
# @param [Array<String>] args depends on subcommand
# @return [Array<String>, Hash<String, String>, String] depends on subcommand
def sentinel(subcommand, *args)
subcommand = subcommand.to_s.downcase
synchronize do |client|
client.call([:sentinel, subcommand] + args) do |reply|
case subcommand
when "get-master-addr-by-name"
reply
else
if reply.kind_of?(Array)
if reply[0].kind_of?(Array)
reply.map(&Hashify)
else
Hashify.call(reply)
end
else
reply
end
end
end
end
end
def id
@original_client.id
end
def inspect
"#<Redis client v#{Redis::VERSION} for #{id}>"
end
def dup
self.class.new(@options)
end
def method_missing(command, *args)
synchronize do |client|
client.call([command] + args)
end
end
private
# Commands returning 1 for true and 0 for false may be executed in a pipeline
# where the method call will return nil. Propagate the nil instead of falsely
# returning false.
Boolify =
lambda { |value|
value == 1 if value
}
BoolifySet =
lambda { |value|
if value && "OK" == value
true
else
false
end
}
Hashify =
lambda { |array|
hash = Hash.new
array.each_slice(2) do |field, value|
hash[field] = value
end
hash
}
Floatify =
lambda { |str|
if str
if (inf = str.match(/^(-)?inf/i))
(inf[1] ? -1.0 : 1.0) / 0.0
else
Float(str)
end
end
}
FloatifyPairs =
lambda { |array|
if array
array.each_slice(2).map do |member, score|
[member, Floatify.call(score)]
end
end
}
def _subscription(method, timeout, channels, block)
return @client.call([method] + channels) if subscribed?
begin
original, @client = @client, SubscribedClient.new(@client)
if timeout > 0
@client.send(method, timeout, *channels, &block)
else
@client.send(method, *channels, &block)
end
ensure
@client = original
end
end
end
require "redis/version"
require "redis/connection"
require "redis/client"
require "redis/pipeline"
require "redis/subscribe"
require "redis/errors"
require "socket"
require "cgi"
class Redis
class Client
DEFAULTS = {
:url => lambda { ENV["REDIS_URL"] },
:scheme => "redis",
:host => "127.0.0.1",
:port => 6379,
:path => nil,
:timeout => 5.0,
:password => nil,
:db => 0,
:driver => nil,
:id => nil,
:tcp_keepalive => 0,
:reconnect_attempts => 1,
:inherit_socket => false
}
def options
Marshal.load(Marshal.dump(@options))
end
def scheme
@options[:scheme]
end
def host
@options[:host]
end
def port
@options[:port]
end
def path
@options[:path]
end
def read_timeout
@options[:read_timeout]
end
def connect_timeout
@options[:connect_timeout]
end
def timeout
@options[:read_timeout]
end
def password
@options[:password]
end
def db
@options[:db]
end
def db=(db)
@options[:db] = db.to_i
end
def driver
@options[:driver]
end
def inherit_socket?
@options[:inherit_socket]
end
attr_accessor :logger
attr_reader :connection
attr_reader :command_map
def initialize(options = {})
@options = _parse_options(options)
@reconnect = true
@logger = @options[:logger]
@connection = nil
@command_map = {}
@pending_reads = 0
if options.include?(:sentinels)
@connector = Connector::Sentinel.new(@options)
else
@connector = Connector.new(@options)
end
end
def connect
@pid = Process.pid
# Don't try to reconnect when the connection is fresh
with_reconnect(false) do
establish_connection
call [:auth, password] if password
call [:select, db] if db != 0
call [:client, :setname, @options[:id]] if @options[:id]
@connector.check(self)
end
self
end
def id
@options[:id] || "redis://#{location}/#{db}"
end
def location
path || "#{host}:#{port}"
end
def call(command)
reply = process([command]) { read }
raise reply if reply.is_a?(CommandError)
if block_given?
yield reply
else
reply
end
end
def call_loop(command, timeout = 0)
error = nil
result = with_socket_timeout(timeout) do
process([command]) do
loop do
reply = read
if reply.is_a?(CommandError)
error = reply
break
else
yield reply
end
end
end
end
# Raise error when previous block broke out of the loop.
raise error if error
# Result is set to the value that the provided block used to break.
result
end
def call_pipeline(pipeline)
with_reconnect pipeline.with_reconnect? do
begin
pipeline.finish(call_pipelined(pipeline.commands)).tap do
self.db = pipeline.db if pipeline.db
end
rescue ConnectionError => e
return nil if pipeline.shutdown?
# Assume the pipeline was sent in one piece, but execution of
# SHUTDOWN caused none of the replies for commands that were executed
# prior to it from coming back around.
raise e
end
end
end
def call_pipelined(commands)
return [] if commands.empty?
# The method #ensure_connected (called from #process) reconnects once on
# I/O errors. To make an effort in making sure that commands are not
# executed more than once, only allow reconnection before the first reply
# has been read. When an error occurs after the first reply has been
# read, retrying would re-execute the entire pipeline, thus re-issuing
# already successfully executed commands. To circumvent this, don't retry
# after the first reply has been read successfully.
result = Array.new(commands.size)
reconnect = @reconnect
begin
exception = nil
process(commands) do
result[0] = read
@reconnect = false
(commands.size - 1).times do |i|
reply = read
result[i + 1] = reply
exception = reply if exception.nil? && reply.is_a?(CommandError)
end
end
raise exception if exception
ensure
@reconnect = reconnect
end
result
end
def call_with_timeout(command, timeout, &blk)
with_socket_timeout(timeout) do
call(command, &blk)
end
rescue ConnectionError
retry
end
def call_without_timeout(command, &blk)
call_with_timeout(command, 0, &blk)
end
def process(commands)
logging(commands) do
ensure_connected do
commands.each do |command|
if command_map[command.first]
command = command.dup
command[0] = command_map[command.first]
end
write(command)
end
yield if block_given?
end
end
end
def connected?
!! (connection && connection.connected?)
end
def disconnect
connection.disconnect if connected?
end
def reconnect
disconnect
connect
end
def io
yield
rescue TimeoutError => e1
# Add a message to the exception without destroying the original stack
e2 = TimeoutError.new("Connection timed out")
e2.set_backtrace(e1.backtrace)
raise e2
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL => e
raise ConnectionError, "Connection lost (%s)" % [e.class.name.split("::").last]
end
def read
io do
value = connection.read
@pending_reads -= 1
value
end
end
def write(command)
io do
@pending_reads += 1
connection.write(command)
end
end
def with_socket_timeout(timeout)
connect unless connected?
begin
connection.timeout = timeout
yield
ensure
connection.timeout = self.timeout if connected?
end
end
def without_socket_timeout(&blk)
with_socket_timeout(0, &blk)
end
def with_reconnect(val=true)
begin
original, @reconnect = @reconnect, val
yield
ensure
@reconnect = original
end
end
def without_reconnect(&blk)
with_reconnect(false, &blk)
end
protected
def logging(commands)
return yield unless @logger && @logger.debug?
begin
commands.each do |name, *args|
logged_args = args.map do |a|
case
when a.respond_to?(:inspect) then a.inspect
when a.respond_to?(:to_s) then a.to_s
else
# handle poorly-behaved descendants of BasicObject
klass = a.instance_exec { (class << self; self end).superclass }
"\#<#{klass}:#{a.__id__}>"
end
end
@logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}")
end
t1 = Time.now
yield
ensure
@logger.debug("[Redis] call_time=%0.2f ms" % ((Time.now - t1) * 1000)) if t1
end
end
def establish_connection
server = @connector.resolve.dup
@options[:host] = server[:host]
@options[:port] = Integer(server[:port]) if server.include?(:port)
@connection = @options[:driver].connect(@options)
@pending_reads = 0
rescue TimeoutError,
Errno::ECONNREFUSED,
Errno::EHOSTDOWN,
Errno::EHOSTUNREACH,
Errno::ENETUNREACH,
Errno::ETIMEDOUT
raise CannotConnectError, "Error connecting to Redis on #{location} (#{$!.class})"
end
def ensure_connected
disconnect if @pending_reads > 0
attempts = 0
begin
attempts += 1
if connected?
unless inherit_socket? || Process.pid == @pid
raise InheritedError,
"Tried to use a connection from a child process without reconnecting. " +
"You need to reconnect to Redis after forking " +
"or set :inherit_socket to true."
end
else
connect
end
yield
rescue BaseConnectionError
disconnect
if attempts <= @options[:reconnect_attempts] && @reconnect
retry
else
raise
end
rescue Exception
disconnect
raise
end
end
def _parse_options(options)
return options if options[:_parsed]
defaults = DEFAULTS.dup
options = options.dup
defaults.keys.each do |key|
# Fill in defaults if needed
if defaults[key].respond_to?(:call)
defaults[key] = defaults[key].call
end
# Symbolize only keys that are needed
options[key] = options[key.to_s] if options.has_key?(key.to_s)
end
url = options[:url] || defaults[:url]
# Override defaults from URL if given
if url
require "uri"
uri = URI(url)
if uri.scheme == "unix"
defaults[:path] = uri.path
elsif uri.scheme == "redis" || uri.scheme == "rediss"
defaults[:scheme] = uri.scheme
defaults[:host] = uri.host if uri.host
defaults[:port] = uri.port if uri.port
defaults[:password] = CGI.unescape(uri.password) if uri.password
defaults[:db] = uri.path[1..-1].to_i if uri.path
defaults[:role] = :master
else
raise ArgumentError, "invalid uri scheme '#{uri.scheme}'"
end
defaults[:ssl] = true if uri.scheme == "rediss"
end
# Use default when option is not specified or nil
defaults.keys.each do |key|
options[key] = defaults[key] if options[key].nil?
end
if options[:path]
# Unix socket
options[:scheme] = "unix"
options.delete(:host)
options.delete(:port)
else
# TCP socket
options[:host] = options[:host].to_s
options[:port] = options[:port].to_i
end
if options.has_key?(:timeout)
options[:connect_timeout] ||= options[:timeout]
options[:read_timeout] ||= options[:timeout]
options[:write_timeout] ||= options[:timeout]
end
options[:connect_timeout] = Float(options[:connect_timeout])
options[:read_timeout] = Float(options[:read_timeout])
options[:write_timeout] = Float(options[:write_timeout])
options[:db] = options[:db].to_i
options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last
case options[:tcp_keepalive]
when Hash
[:time, :intvl, :probes].each do |key|
unless options[:tcp_keepalive][key].is_a?(Integer)
raise "Expected the #{key.inspect} key in :tcp_keepalive to be an Integer"
end
end
when Integer
if options[:tcp_keepalive] >= 60
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 20, :intvl => 10, :probes => 2}
elsif options[:tcp_keepalive] >= 30
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 10, :intvl => 5, :probes => 2}
elsif options[:tcp_keepalive] >= 5
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 2, :intvl => 2, :probes => 1}
end
end
options[:_parsed] = true
options
end
def _parse_driver(driver)
driver = driver.to_s if driver.is_a?(Symbol)
if driver.kind_of?(String)
begin
require "redis/connection/#{driver}"
driver = Connection.const_get(driver.capitalize)
rescue LoadError, NameError
raise RuntimeError, "Cannot load driver #{driver.inspect}"
end
end
driver
end
class Connector
def initialize(options)
@options = options.dup
end
def resolve
@options
end
def check(client)
end
class Sentinel < Connector
def initialize(options)
super(options)
@options[:password] = DEFAULTS.fetch(:password)
@options[:db] = DEFAULTS.fetch(:db)
@sentinels = @options.delete(:sentinels).dup
@role = @options.fetch(:role, "master").to_s
@master = @options[:host]
end
def check(client)
# Check the instance is really of the role we are looking for.
# We can't assume the command is supported since it was introduced
# recently and this client should work with old stuff.
begin
role = client.call([:role])[0]
rescue Redis::CommandError
# Assume the test is passed if we can't get a reply from ROLE...
role = @role
end
if role != @role
client.disconnect
raise ConnectionError, "Instance role mismatch. Expected #{@role}, got #{role}."
end
end
def resolve
result = case @role
when "master"
resolve_master
when "slave"
resolve_slave
else
raise ArgumentError, "Unknown instance role #{@role}"
end
result || (raise ConnectionError, "Unable to fetch #{@role} via Sentinel.")
end
def sentinel_detect
@sentinels.each do |sentinel|
client = Client.new(@options.merge({
:host => sentinel[:host],
:port => sentinel[:port],
:reconnect_attempts => 0,
}))
begin
if result = yield(client)
# This sentinel responded. Make sure we ask it first next time.
@sentinels.delete(sentinel)
@sentinels.unshift(sentinel)
return result
end
rescue BaseConnectionError
ensure
client.disconnect
end
end
raise CannotConnectError, "No sentinels available."
end
def resolve_master
sentinel_detect do |client|
if reply = client.call(["sentinel", "get-master-addr-by-name", @master])
{:host => reply[0], :port => reply[1]}
end
end
end
def resolve_slave
sentinel_detect do |client|
if reply = client.call(["sentinel", "slaves", @master])
slave = Hash[*reply.sample]
{:host => slave.fetch("ip"), :port => slave.fetch("port")}
end
end
end
end
end
end
end
require "redis/connection/registry"
# If a connection driver was required before this file, the array
# Redis::Connection.drivers will contain one or more classes. The last driver
# in this array will be used as default driver. If this array is empty, we load
# the plain Ruby driver as our default. Another driver can be required at a
# later point in time, causing it to be the last element of the #drivers array
# and therefore be chosen by default.
require "redis/connection/ruby" if Redis::Connection.drivers.empty?
\ No newline at end of file
class Redis
module Connection
module CommandHelper
COMMAND_DELIMITER = "\r\n"
def build_command(args)
command = [nil]
args.each do |i|
if i.is_a? Array
i.each do |j|
j = j.to_s
command << "$#{j.bytesize}"
command << j
end
else
i = i.to_s
command << "$#{i.bytesize}"
command << i
end
end
command[0] = "*#{(command.length - 1) / 2}"
# Trailing delimiter
command << ""
command.join(COMMAND_DELIMITER)
end
protected
if defined?(Encoding::default_external)
def encode(string)
string.force_encoding(Encoding::default_external)
end
else
def encode(string)
string
end
end
end
end
end
require "redis/connection/registry"
require "redis/errors"
require "hiredis/connection"
require "timeout"
class Redis
module Connection
class Hiredis
def self.connect(config)
connection = ::Hiredis::Connection.new
connect_timeout = (config.fetch(:connect_timeout, 0) * 1_000_000).to_i
if config[:scheme] == "unix"
connection.connect_unix(config[:path], connect_timeout)
elsif config[:scheme] == "rediss" || config[:ssl]
raise NotImplementedError, "SSL not supported by hiredis driver"
else
connection.connect(config[:host], config[:port], connect_timeout)
end
instance = new(connection)
instance.timeout = config[:read_timeout]
instance
rescue Errno::ETIMEDOUT
raise TimeoutError
end
def initialize(connection)
@connection = connection
end
def connected?
@connection && @connection.connected?
end
def timeout=(timeout)
# Hiredis works with microsecond timeouts
@connection.timeout = Integer(timeout * 1_000_000)
end
def disconnect
@connection.disconnect
@connection = nil
end
def write(command)
@connection.write(command.flatten(1))
rescue Errno::EAGAIN
raise TimeoutError
end
def read
reply = @connection.read
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
reply
rescue Errno::EAGAIN
raise TimeoutError
rescue RuntimeError => err
raise ProtocolError.new(err.message)
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Hiredis
class Redis
module Connection
# Store a list of loaded connection drivers in the Connection module.
# Redis::Client uses the last required driver by default, and will be aware
# of the loaded connection drivers if the user chooses to override the
# default connection driver.
def self.drivers
@drivers ||= []
end
end
end
require "redis/connection/registry"
require "redis/connection/command_helper"
require "redis/errors"
require "socket"
require "timeout"
begin
require "openssl"
rescue LoadError
# Not all systems have OpenSSL support
end
if RUBY_VERSION < "1.9.3"
class String
# Ruby 1.8.7 does not have byteslice, but it handles encodings differently anyway.
# We can simply slice the string, which is a byte array there.
def byteslice(*args)
slice(*args)
end
end
end
class Redis
module Connection
module SocketMixin
CRLF = "\r\n".freeze
# Exceptions raised during non-blocking I/O ops that require retrying the op
if RUBY_VERSION >= "1.9.3"
NBIO_READ_EXCEPTIONS = [IO::WaitReadable]
NBIO_WRITE_EXCEPTIONS = [IO::WaitWritable]
else
NBIO_READ_EXCEPTIONS = [Errno::EWOULDBLOCK, Errno::EAGAIN]
NBIO_WRITE_EXCEPTIONS = [Errno::EWOULDBLOCK, Errno::EAGAIN]
end
def initialize(*args)
super(*args)
@timeout = @write_timeout = nil
@buffer = ""
end
def timeout=(timeout)
if timeout && timeout > 0
@timeout = timeout
else
@timeout = nil
end
end
def write_timeout=(timeout)
if timeout && timeout > 0
@write_timeout = timeout
else
@write_timeout = nil
end
end
def read(nbytes)
result = @buffer.slice!(0, nbytes)
while result.bytesize < nbytes
result << _read_from_socket(nbytes - result.bytesize)
end
result
end
def gets
crlf = nil
while (crlf = @buffer.index(CRLF)) == nil
@buffer << _read_from_socket(1024)
end
@buffer.slice!(0, crlf + CRLF.bytesize)
end
def _read_from_socket(nbytes)
begin
read_nonblock(nbytes)
rescue *NBIO_READ_EXCEPTIONS
if IO.select([self], nil, nil, @timeout)
retry
else
raise Redis::TimeoutError
end
rescue *NBIO_WRITE_EXCEPTIONS
if IO.select(nil, [self], nil, @timeout)
retry
else
raise Redis::TimeoutError
end
end
rescue EOFError
raise Errno::ECONNRESET
end
def _write_to_socket(data)
begin
write_nonblock(data)
rescue *NBIO_WRITE_EXCEPTIONS
if IO.select(nil, [self], nil, @write_timeout)
retry
else
raise Redis::TimeoutError
end
rescue *NBIO_READ_EXCEPTIONS
if IO.select([self], nil, nil, @write_timeout)
retry
else
raise Redis::TimeoutError
end
end
rescue EOFError
raise Errno::ECONNRESET
end
def write(data)
return super(data) unless @write_timeout
length = data.bytesize
total_count = 0
loop do
count = _write_to_socket(data)
total_count += count
return total_count if total_count >= length
data = data.byteslice(count..-1)
end
end
end
if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
require "timeout"
class TCPSocket < ::TCPSocket
include SocketMixin
def self.connect(host, port, timeout)
Timeout.timeout(timeout) do
sock = new(host, port)
sock
end
rescue Timeout::Error
raise TimeoutError
end
end
if defined?(::UNIXSocket)
class UNIXSocket < ::UNIXSocket
include SocketMixin
def self.connect(path, timeout)
Timeout.timeout(timeout) do
sock = new(path)
sock
end
rescue Timeout::Error
raise TimeoutError
end
# JRuby raises Errno::EAGAIN on #read_nonblock even when IO.select
# says it is readable (1.6.6, in both 1.8 and 1.9 mode).
# Use the blocking #readpartial method instead.
def _read_from_socket(nbytes)
readpartial(nbytes)
rescue EOFError
raise Errno::ECONNRESET
end
end
end
else
class TCPSocket < ::Socket
include SocketMixin
def self.connect_addrinfo(ai, port, timeout)
sock = new(::Socket.const_get(ai[0]), Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_in(port, ai[3])
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout) == nil
raise TimeoutError
end
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
end
end
sock
end
def self.connect(host, port, timeout)
# Don't pass AI_ADDRCONFIG as flag to getaddrinfo(3)
#
# From the man page for getaddrinfo(3):
#
# If hints.ai_flags includes the AI_ADDRCONFIG flag, then IPv4
# addresses are returned in the list pointed to by res only if the
# local system has at least one IPv4 address configured, and IPv6
# addresses are returned only if the local system has at least one
# IPv6 address configured. The loopback address is not considered
# for this case as valid as a configured address.
#
# We do want the IPv6 loopback address to be returned if applicable,
# even if it is the only configured IPv6 address on the machine.
# Also see: https://github.com/redis/redis-rb/pull/394.
addrinfo = ::Socket.getaddrinfo(host, nil, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
# From the man page for getaddrinfo(3):
#
# Normally, the application should try using the addresses in the
# order in which they are returned. The sorting function used
# within getaddrinfo() is defined in RFC 3484 [...].
#
addrinfo.each_with_index do |ai, i|
begin
return connect_addrinfo(ai, port, timeout)
rescue SystemCallError
# Raise if this was our last attempt.
raise if addrinfo.length == i+1
end
end
end
end
class UNIXSocket < ::Socket
include SocketMixin
def self.connect(path, timeout)
sock = new(::Socket::AF_UNIX, Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_un(path)
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout) == nil
raise TimeoutError
end
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
end
end
sock
end
end
end
if defined?(OpenSSL)
class SSLSocket < ::OpenSSL::SSL::SSLSocket
include SocketMixin
def self.connect(host, port, timeout, ssl_params)
# Note: this is using Redis::Connection::TCPSocket
tcp_sock = TCPSocket.connect(host, port, timeout)
ctx = OpenSSL::SSL::SSLContext.new
ctx.set_params(ssl_params) if ssl_params && !ssl_params.empty?
ssl_sock = new(tcp_sock, ctx)
ssl_sock.hostname = host
ssl_sock.connect
ssl_sock.post_connection_check(host)
ssl_sock
end
end
end
class Ruby
include Redis::Connection::CommandHelper
MINUS = "-".freeze
PLUS = "+".freeze
COLON = ":".freeze
DOLLAR = "$".freeze
ASTERISK = "*".freeze
def self.connect(config)
if config[:scheme] == "unix"
raise ArgumentError, "SSL incompatible with unix sockets" if config[:ssl]
sock = UNIXSocket.connect(config[:path], config[:connect_timeout])
elsif config[:scheme] == "rediss" || config[:ssl]
raise ArgumentError, "This library does not support SSL on Ruby < 1.9" if RUBY_VERSION < "1.9.3"
sock = SSLSocket.connect(config[:host], config[:port], config[:connect_timeout], config[:ssl_params])
else
sock = TCPSocket.connect(config[:host], config[:port], config[:connect_timeout])
end
instance = new(sock)
instance.timeout = config[:timeout]
instance.write_timeout = config[:write_timeout]
instance.set_tcp_keepalive config[:tcp_keepalive]
instance
end
if [:SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT].all?{|c| Socket.const_defined? c}
def set_tcp_keepalive(keepalive)
return unless keepalive.is_a?(Hash)
@sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time])
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl])
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes])
end
def get_tcp_keepalive
{
:time => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE).int,
:intvl => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL).int,
:probes => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT).int,
}
end
else
def set_tcp_keepalive(keepalive)
end
def get_tcp_keepalive
{
}
end
end
def initialize(sock)
@sock = sock
end
def connected?
!! @sock
end
def disconnect
@sock.close
rescue
ensure
@sock = nil
end
def timeout=(timeout)
if @sock.respond_to?(:timeout=)
@sock.timeout = timeout
end
end
def write_timeout=(timeout)
@sock.write_timeout = timeout
end
def write(command)
@sock.write(build_command(command))
end
def read
line = @sock.gets
reply_type = line.slice!(0, 1)
format_reply(reply_type, line)
rescue Errno::EAGAIN
raise TimeoutError
end
def format_reply(reply_type, line)
case reply_type
when MINUS then format_error_reply(line)
when PLUS then format_status_reply(line)
when COLON then format_integer_reply(line)
when DOLLAR then format_bulk_reply(line)
when ASTERISK then format_multi_bulk_reply(line)
else raise ProtocolError.new(reply_type)
end
end
def format_error_reply(line)
CommandError.new(line.strip)
end
def format_status_reply(line)
line.strip
end
def format_integer_reply(line)
line.to_i
end
def format_bulk_reply(line)
bulklen = line.to_i
return if bulklen == -1
reply = encode(@sock.read(bulklen))
@sock.read(2) # Discard CRLF.
reply
end
def format_multi_bulk_reply(line)
n = line.to_i
return if n == -1
Array.new(n) { read }
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Ruby
require "redis/connection/command_helper"
require "redis/connection/registry"
require "redis/errors"
require "em-synchrony"
require "hiredis/reader"
class Redis
module Connection
class RedisClient < EventMachine::Connection
include EventMachine::Deferrable
attr_accessor :timeout
def post_init
@req = nil
@connected = false
@reader = ::Hiredis::Reader.new
end
def connection_completed
@connected = true
succeed
end
def connected?
@connected
end
def receive_data(data)
@reader.feed(data)
loop do
begin
reply = @reader.gets
rescue RuntimeError => err
@req.fail [:error, ProtocolError.new(err.message)]
break
end
break if reply == false
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
@req.succeed [:reply, reply]
end
end
def read
@req = EventMachine::DefaultDeferrable.new
if @timeout > 0
@req.timeout(@timeout, :timeout)
end
EventMachine::Synchrony.sync @req
end
def send(data)
callback { send_data data }
end
def unbind
@connected = false
if @req
@req.fail [:error, Errno::ECONNRESET]
@req = nil
else
fail
end
end
end
class Synchrony
include Redis::Connection::CommandHelper
def self.connect(config)
if config[:scheme] == "unix"
conn = EventMachine.connect_unix_domain(config[:path], RedisClient)
elsif config[:scheme] == "rediss" || config[:ssl]
raise NotImplementedError, "SSL not supported by synchrony driver"
else
conn = EventMachine.connect(config[:host], config[:port], RedisClient) do |c|
c.pending_connect_timeout = [config[:connect_timeout], 0.1].max
end
end
fiber = Fiber.current
conn.callback { fiber.resume }
conn.errback { fiber.resume :refused }
raise Errno::ECONNREFUSED if Fiber.yield == :refused
instance = new(conn)
instance.timeout = config[:read_timeout]
instance
end
def initialize(connection)
@connection = connection
end
def connected?
@connection && @connection.connected?
end
def timeout=(timeout)
@connection.timeout = timeout
end
def disconnect
@connection.close_connection
@connection = nil
end
def write(command)
@connection.send(build_command(command))
end
def read
type, payload = @connection.read
if type == :reply
payload
elsif type == :error
raise payload
elsif type == :timeout
raise TimeoutError
else
raise "Unknown type #{type.inspect}"
end
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Synchrony
require "redis/hash_ring"
class Redis
class Distributed
class CannotDistribute < RuntimeError
def initialize(command)
@command = command
end
def message
"#{@command.to_s.upcase} cannot be used in Redis::Distributed because the keys involved need to be on the same server or because we cannot guarantee that the operation will be atomic."
end
end
attr_reader :ring
def initialize(node_configs, options = {})
@tag = options[:tag] || /^\{(.+?)\}/
@ring = options[:ring] || HashRing.new
@node_configs = node_configs.dup
@default_options = options.dup
node_configs.each { |node_config| add_node(node_config) }
@subscribed_node = nil
end
def node_for(key)
@ring.get_node(key_tag(key.to_s) || key.to_s)
end
def nodes
@ring.nodes
end
def add_node(options)
options = { :url => options } if options.is_a?(String)
options = @default_options.merge(options)
@ring.add_node Redis.new( options )
end
# Change the selected database for the current connection.
def select(db)
on_each_node :select, db
end
# Ping the server.
def ping
on_each_node :ping
end
# Echo the given string.
def echo(value)
on_each_node :echo, value
end
# Close the connection.
def quit
on_each_node :quit
end
# Asynchronously save the dataset to disk.
def bgsave
on_each_node :bgsave
end
# Return the number of keys in the selected database.
def dbsize
on_each_node :dbsize
end
# Remove all keys from all databases.
def flushall
on_each_node :flushall
end
# Remove all keys from the current database.
def flushdb
on_each_node :flushdb
end
# Get information and statistics about the server.
def info(cmd = nil)
on_each_node :info, cmd
end
# Get the UNIX time stamp of the last successful save to disk.
def lastsave
on_each_node :lastsave
end
# Listen for all requests received by the server in real time.
def monitor
raise NotImplementedError
end
# Synchronously save the dataset to disk.
def save
on_each_node :save
end
# Get server time: an UNIX timestamp and the elapsed microseconds in the current second.
def time
on_each_node :time
end
# Remove the expiration from a key.
def persist(key)
node_for(key).persist(key)
end
# Set a key's time to live in seconds.
def expire(key, seconds)
node_for(key).expire(key, seconds)
end
# Set the expiration for a key as a UNIX timestamp.
def expireat(key, unix_time)
node_for(key).expireat(key, unix_time)
end
# Get the time to live (in seconds) for a key.
def ttl(key)
node_for(key).ttl(key)
end
# Set a key's time to live in milliseconds.
def pexpire(key, milliseconds)
node_for(key).pexpire(key, milliseconds)
end
# Set the expiration for a key as number of milliseconds from UNIX Epoch.
def pexpireat(key, ms_unix_time)
node_for(key).pexpireat(key, ms_unix_time)
end
# Get the time to live (in milliseconds) for a key.
def pttl(key)
node_for(key).pttl(key)
end
# Return a serialized version of the value stored at a key.
def dump(key)
node_for(key).dump(key)
end
# Create a key using the serialized value, previously obtained using DUMP.
def restore(key, ttl, serialized_value)
node_for(key).restore(key, ttl, serialized_value)
end
# Transfer a key from the connected instance to another instance.
def migrate(key, options)
raise CannotDistribute, :migrate
end
# Delete a key.
def del(*args)
keys_per_node = args.group_by { |key| node_for(key) }
keys_per_node.inject(0) do |sum, (node, keys)|
sum + node.del(*keys)
end
end
# Determine if a key exists.
def exists(key)
node_for(key).exists(key)
end
# Find all keys matching the given pattern.
def keys(glob = "*")
on_each_node(:keys, glob).flatten
end
# Move a key to another database.
def move(key, db)
node_for(key).move(key, db)
end
# Return a random key from the keyspace.
def randomkey
raise CannotDistribute, :randomkey
end
# Rename a key.
def rename(old_name, new_name)
ensure_same_node(:rename, [old_name, new_name]) do |node|
node.rename(old_name, new_name)
end
end
# Rename a key, only if the new key does not exist.
def renamenx(old_name, new_name)
ensure_same_node(:renamenx, [old_name, new_name]) do |node|
node.renamenx(old_name, new_name)
end
end
# Sort the elements in a list, set or sorted set.
def sort(key, options = {})
keys = [key, options[:by], options[:store], *Array(options[:get])].compact
ensure_same_node(:sort, keys) do |node|
node.sort(key, options)
end
end
# Determine the type stored at key.
def type(key)
node_for(key).type(key)
end
# Decrement the integer value of a key by one.
def decr(key)
node_for(key).decr(key)
end
# Decrement the integer value of a key by the given number.
def decrby(key, decrement)
node_for(key).decrby(key, decrement)
end
# Increment the integer value of a key by one.
def incr(key)
node_for(key).incr(key)
end
# Increment the integer value of a key by the given integer number.
def incrby(key, increment)
node_for(key).incrby(key, increment)
end
# Increment the numeric value of a key by the given float number.
def incrbyfloat(key, increment)
node_for(key).incrbyfloat(key, increment)
end
# Set the string value of a key.
def set(key, value, options = {})
node_for(key).set(key, value, options)
end
# Set the time to live in seconds of a key.
def setex(key, ttl, value)
node_for(key).setex(key, ttl, value)
end
# Set the time to live in milliseconds of a key.
def psetex(key, ttl, value)
node_for(key).psetex(key, ttl, value)
end
# Set the value of a key, only if the key does not exist.
def setnx(key, value)
node_for(key).setnx(key, value)
end
# Set multiple keys to multiple values.
def mset(*args)
raise CannotDistribute, :mset
end
def mapped_mset(hash)
raise CannotDistribute, :mapped_mset
end
# Set multiple keys to multiple values, only if none of the keys exist.
def msetnx(*args)
raise CannotDistribute, :msetnx
end
def mapped_msetnx(hash)
raise CannotDistribute, :mapped_msetnx
end
# Get the value of a key.
def get(key)
node_for(key).get(key)
end
# Get the values of all the given keys.
def mget(*keys)
raise CannotDistribute, :mget
end
def mapped_mget(*keys)
raise CannotDistribute, :mapped_mget
end
# Overwrite part of a string at key starting at the specified offset.
def setrange(key, offset, value)
node_for(key).setrange(key, offset, value)
end
# Get a substring of the string stored at a key.
def getrange(key, start, stop)
node_for(key).getrange(key, start, stop)
end
# Sets or clears the bit at offset in the string value stored at key.
def setbit(key, offset, value)
node_for(key).setbit(key, offset, value)
end
# Returns the bit value at offset in the string value stored at key.
def getbit(key, offset)
node_for(key).getbit(key, offset)
end
# Append a value to a key.
def append(key, value)
node_for(key).append(key, value)
end
# Count the number of set bits in a range of the string value stored at key.
def bitcount(key, start = 0, stop = -1)
node_for(key).bitcount(key, start, stop)
end
# Perform a bitwise operation between strings and store the resulting string in a key.
def bitop(operation, destkey, *keys)
ensure_same_node(:bitop, [destkey] + keys) do |node|
node.bitop(operation, destkey, *keys)
end
end
# Return the position of the first bit set to 1 or 0 in a string.
def bitpos(key, bit, start=nil, stop=nil)
node_for(key).bitpos(key, bit, start, stop)
end
# Set the string value of a key and return its old value.
def getset(key, value)
node_for(key).getset(key, value)
end
# Get the length of the value stored in a key.
def strlen(key)
node_for(key).strlen(key)
end
def [](key)
get(key)
end
def []=(key,value)
set(key, value)
end
# Get the length of a list.
def llen(key)
node_for(key).llen(key)
end
# Prepend one or more values to a list.
def lpush(key, value)
node_for(key).lpush(key, value)
end
# Prepend a value to a list, only if the list exists.
def lpushx(key, value)
node_for(key).lpushx(key, value)
end
# Append one or more values to a list.
def rpush(key, value)
node_for(key).rpush(key, value)
end
# Append a value to a list, only if the list exists.
def rpushx(key, value)
node_for(key).rpushx(key, value)
end
# Remove and get the first element in a list.
def lpop(key)
node_for(key).lpop(key)
end
# Remove and get the last element in a list.
def rpop(key)
node_for(key).rpop(key)
end
# Remove the last element in a list, append it to another list and return
# it.
def rpoplpush(source, destination)
ensure_same_node(:rpoplpush, [source, destination]) do |node|
node.rpoplpush(source, destination)
end
end
def _bpop(cmd, args)
options = {}
case args.last
when Hash
options = args.pop
when Integer
# Issue deprecation notice in obnoxious mode...
options[:timeout] = args.pop
end
if args.size > 1
# Issue deprecation notice in obnoxious mode...
end
keys = args.flatten
ensure_same_node(cmd, keys) do |node|
node.__send__(cmd, keys, options)
end
end
# Remove and get the first element in a list, or block until one is
# available.
def blpop(*args)
_bpop(:blpop, args)
end
# Remove and get the last element in a list, or block until one is
# available.
def brpop(*args)
_bpop(:brpop, args)
end
# Pop a value from a list, push it to another list and return it; or block
# until one is available.
def brpoplpush(source, destination, options = {})
case options
when Integer
# Issue deprecation notice in obnoxious mode...
options = { :timeout => options }
end
ensure_same_node(:brpoplpush, [source, destination]) do |node|
node.brpoplpush(source, destination, options)
end
end
# Get an element from a list by its index.
def lindex(key, index)
node_for(key).lindex(key, index)
end
# Insert an element before or after another element in a list.
def linsert(key, where, pivot, value)
node_for(key).linsert(key, where, pivot, value)
end
# Get a range of elements from a list.
def lrange(key, start, stop)
node_for(key).lrange(key, start, stop)
end
# Remove elements from a list.
def lrem(key, count, value)
node_for(key).lrem(key, count, value)
end
# Set the value of an element in a list by its index.
def lset(key, index, value)
node_for(key).lset(key, index, value)
end
# Trim a list to the specified range.
def ltrim(key, start, stop)
node_for(key).ltrim(key, start, stop)
end
# Get the number of members in a set.
def scard(key)
node_for(key).scard(key)
end
# Add one or more members to a set.
def sadd(key, member)
node_for(key).sadd(key, member)
end
# Remove one or more members from a set.
def srem(key, member)
node_for(key).srem(key, member)
end
# Remove and return a random member from a set.
def spop(key, count = nil)
node_for(key).spop(key, count)
end
# Get a random member from a set.
def srandmember(key, count = nil)
node_for(key).srandmember(key, count)
end
# Move a member from one set to another.
def smove(source, destination, member)
ensure_same_node(:smove, [source, destination]) do |node|
node.smove(source, destination, member)
end
end
# Determine if a given value is a member of a set.
def sismember(key, member)
node_for(key).sismember(key, member)
end
# Get all the members in a set.
def smembers(key)
node_for(key).smembers(key)
end
# Subtract multiple sets.
def sdiff(*keys)
ensure_same_node(:sdiff, keys) do |node|
node.sdiff(*keys)
end
end
# Subtract multiple sets and store the resulting set in a key.
def sdiffstore(destination, *keys)
ensure_same_node(:sdiffstore, [destination] + keys) do |node|
node.sdiffstore(destination, *keys)
end
end
# Intersect multiple sets.
def sinter(*keys)
ensure_same_node(:sinter, keys) do |node|
node.sinter(*keys)
end
end
# Intersect multiple sets and store the resulting set in a key.
def sinterstore(destination, *keys)
ensure_same_node(:sinterstore, [destination] + keys) do |node|
node.sinterstore(destination, *keys)
end
end
# Add multiple sets.
def sunion(*keys)
ensure_same_node(:sunion, keys) do |node|
node.sunion(*keys)
end
end
# Add multiple sets and store the resulting set in a key.
def sunionstore(destination, *keys)
ensure_same_node(:sunionstore, [destination] + keys) do |node|
node.sunionstore(destination, *keys)
end
end
# Get the number of members in a sorted set.
def zcard(key)
node_for(key).zcard(key)
end
# Add one or more members to a sorted set, or update the score for members
# that already exist.
def zadd(key, *args)
node_for(key).zadd(key, *args)
end
# Increment the score of a member in a sorted set.
def zincrby(key, increment, member)
node_for(key).zincrby(key, increment, member)
end
# Remove one or more members from a sorted set.
def zrem(key, member)
node_for(key).zrem(key, member)
end
# Get the score associated with the given member in a sorted set.
def zscore(key, member)
node_for(key).zscore(key, member)
end
# Return a range of members in a sorted set, by index.
def zrange(key, start, stop, options = {})
node_for(key).zrange(key, start, stop, options)
end
# Return a range of members in a sorted set, by index, with scores ordered
# from high to low.
def zrevrange(key, start, stop, options = {})
node_for(key).zrevrange(key, start, stop, options)
end
# Determine the index of a member in a sorted set.
def zrank(key, member)
node_for(key).zrank(key, member)
end
# Determine the index of a member in a sorted set, with scores ordered from
# high to low.
def zrevrank(key, member)
node_for(key).zrevrank(key, member)
end
# Remove all members in a sorted set within the given indexes.
def zremrangebyrank(key, start, stop)
node_for(key).zremrangebyrank(key, start, stop)
end
# Return a range of members in a sorted set, by score.
def zrangebyscore(key, min, max, options = {})
node_for(key).zrangebyscore(key, min, max, options)
end
# Return a range of members in a sorted set, by score, with scores ordered
# from high to low.
def zrevrangebyscore(key, max, min, options = {})
node_for(key).zrevrangebyscore(key, max, min, options)
end
# Remove all members in a sorted set within the given scores.
def zremrangebyscore(key, min, max)
node_for(key).zremrangebyscore(key, min, max)
end
# Get the number of members in a particular score range.
def zcount(key, min, max)
node_for(key).zcount(key, min, max)
end
# Intersect multiple sorted sets and store the resulting sorted set in a new
# key.
def zinterstore(destination, keys, options = {})
ensure_same_node(:zinterstore, [destination] + keys) do |node|
node.zinterstore(destination, keys, options)
end
end
# Add multiple sorted sets and store the resulting sorted set in a new key.
def zunionstore(destination, keys, options = {})
ensure_same_node(:zunionstore, [destination] + keys) do |node|
node.zunionstore(destination, keys, options)
end
end
# Get the number of fields in a hash.
def hlen(key)
node_for(key).hlen(key)
end
# Set the string value of a hash field.
def hset(key, field, value)
node_for(key).hset(key, field, value)
end
# Set the value of a hash field, only if the field does not exist.
def hsetnx(key, field, value)
node_for(key).hsetnx(key, field, value)
end
# Set multiple hash fields to multiple values.
def hmset(key, *attrs)
node_for(key).hmset(key, *attrs)
end
def mapped_hmset(key, hash)
node_for(key).hmset(key, *hash.to_a.flatten)
end
# Get the value of a hash field.
def hget(key, field)
node_for(key).hget(key, field)
end
# Get the values of all the given hash fields.
def hmget(key, *fields)
node_for(key).hmget(key, *fields)
end
def mapped_hmget(key, *fields)
Hash[*fields.zip(hmget(key, *fields)).flatten]
end
# Delete one or more hash fields.
def hdel(key, field)
node_for(key).hdel(key, field)
end
# Determine if a hash field exists.
def hexists(key, field)
node_for(key).hexists(key, field)
end
# Increment the integer value of a hash field by the given integer number.
def hincrby(key, field, increment)
node_for(key).hincrby(key, field, increment)
end
# Increment the numeric value of a hash field by the given float number.
def hincrbyfloat(key, field, increment)
node_for(key).hincrbyfloat(key, field, increment)
end
# Get all the fields in a hash.
def hkeys(key)
node_for(key).hkeys(key)
end
# Get all the values in a hash.
def hvals(key)
node_for(key).hvals(key)
end
# Get all the fields and values in a hash.
def hgetall(key)
node_for(key).hgetall(key)
end
# Post a message to a channel.
def publish(channel, message)
node_for(channel).publish(channel, message)
end
def subscribed?
!! @subscribed_node
end
# Listen for messages published to the given channels.
def subscribe(channel, *channels, &block)
if channels.empty?
@subscribed_node = node_for(channel)
@subscribed_node.subscribe(channel, &block)
else
ensure_same_node(:subscribe, [channel] + channels) do |node|
@subscribed_node = node
node.subscribe(channel, *channels, &block)
end
end
end
# Stop listening for messages posted to the given channels.
def unsubscribe(*channels)
raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed?
@subscribed_node.unsubscribe(*channels)
end
# Listen for messages published to channels matching the given patterns.
def psubscribe(*channels, &block)
raise NotImplementedError
end
# Stop listening for messages posted to channels matching the given
# patterns.
def punsubscribe(*channels)
raise NotImplementedError
end
# Watch the given keys to determine execution of the MULTI/EXEC block.
def watch(*keys)
raise CannotDistribute, :watch
end
# Forget about all watched keys.
def unwatch
raise CannotDistribute, :unwatch
end
def pipelined
raise CannotDistribute, :pipelined
end
# Mark the start of a transaction block.
def multi
raise CannotDistribute, :multi
end
# Execute all commands issued after MULTI.
def exec
raise CannotDistribute, :exec
end
# Discard all commands issued after MULTI.
def discard
raise CannotDistribute, :discard
end
# Control remote script registry.
def script(subcommand, *args)
on_each_node(:script, subcommand, *args)
end
# Add one or more members to a HyperLogLog structure.
def pfadd(key, member)
node_for(key).pfadd(key, member)
end
# Get the approximate cardinality of members added to HyperLogLog structure.
def pfcount(*keys)
ensure_same_node(:pfcount, keys.flatten(1)) do |node|
node.pfcount(keys)
end
end
# Merge multiple HyperLogLog values into an unique value that will approximate the cardinality of the union of
# the observed Sets of the source HyperLogLog structures.
def pfmerge(dest_key, *source_key)
ensure_same_node(:pfmerge, [dest_key, *source_key]) do |node|
node.pfmerge(dest_key, *source_key)
end
end
def _eval(cmd, args)
script = args.shift
options = args.pop if args.last.is_a?(Hash)
options ||= {}
keys = args.shift || options[:keys] || []
argv = args.shift || options[:argv] || []
ensure_same_node(cmd, keys) do |node|
node.send(cmd, script, keys, argv)
end
end
# Evaluate Lua script.
def eval(*args)
_eval(:eval, args)
end
# Evaluate Lua script by its SHA.
def evalsha(*args)
_eval(:evalsha, args)
end
def inspect
"#<Redis client v#{Redis::VERSION} for #{nodes.map(&:id).join(', ')}>"
end
def dup
self.class.new(@node_configs, @default_options)
end
protected
def on_each_node(command, *args)
nodes.map do |node|
node.send(command, *args)
end
end
def node_index_for(key)
nodes.index(node_for(key))
end
def key_tag(key)
key.to_s[@tag, 1] if @tag
end
def ensure_same_node(command, keys)
all = true
tags = keys.map do |key|
tag = key_tag(key)
all = false unless tag
tag
end
if (all && tags.uniq.size != 1) || (!all && keys.uniq.size != 1)
# Not 1 unique tag or not 1 unique key
raise CannotDistribute, command
end
yield(node_for(keys.first))
end
end
end
class Redis
# Base error for all redis-rb errors.
class BaseError < RuntimeError
end
# Raised by the connection when a protocol error occurs.
class ProtocolError < BaseError
def initialize(reply_type)
super(<<-EOS.gsub(/(?:^|\n)\s*/, " "))
Got '#{reply_type}' as initial reply byte.
If you're in a forking environment, such as Unicorn, you need to
connect to Redis after forking.
EOS
end
end
# Raised by the client when command execution returns an error reply.
class CommandError < BaseError
end
# Base error for connection related errors.
class BaseConnectionError < BaseError
end
# Raised when connection to a Redis server cannot be made.
class CannotConnectError < BaseConnectionError
end
# Raised when connection to a Redis server is lost.
class ConnectionError < BaseConnectionError
end
# Raised when performing I/O times out.
class TimeoutError < BaseConnectionError
end
# Raised when the connection was inherited by a child process.
class InheritedError < BaseConnectionError
end
end
require 'zlib'
class Redis
class HashRing
POINTS_PER_SERVER = 160 # this is the default in libmemcached
attr_reader :ring, :sorted_keys, :replicas, :nodes
# nodes is a list of objects that have a proper to_s representation.
# replicas indicates how many virtual points should be used pr. node,
# replicas are required to improve the distribution.
def initialize(nodes=[], replicas=POINTS_PER_SERVER)
@replicas = replicas
@ring = {}
@nodes = []
@sorted_keys = []
nodes.each do |node|
add_node(node)
end
end
# Adds a `node` to the hash ring (including a number of replicas).
def add_node(node)
@nodes << node
@replicas.times do |i|
key = Zlib.crc32("#{node.id}:#{i}")
raise "Node ID collision" if @ring.has_key?(key)
@ring[key] = node
@sorted_keys << key
end
@sorted_keys.sort!
end
def remove_node(node)
@nodes.reject!{|n| n.id == node.id}
@replicas.times do |i|
key = Zlib.crc32("#{node.id}:#{i}")
@ring.delete(key)
@sorted_keys.reject! {|k| k == key}
end
end
# get the node in the hash ring for this key
def get_node(key)
get_node_pos(key)[0]
end
def get_node_pos(key)
return [nil,nil] if @ring.size == 0
crc = Zlib.crc32(key)
idx = HashRing.binary_search(@sorted_keys, crc)
return [@ring[@sorted_keys[idx]], idx]
end
def iter_nodes(key)
return [nil,nil] if @ring.size == 0
_, pos = get_node_pos(key)
@ring.size.times do |n|
yield @ring[@sorted_keys[(pos+n) % @ring.size]]
end
end
class << self
# gem install RubyInline to use this code
# Native extension to perform the binary search within the hashring.
# There's a pure ruby version below so this is purely optional
# for performance. In testing 20k gets and sets, the native
# binary search shaved about 12% off the runtime (9sec -> 8sec).
begin
require 'inline'
inline do |builder|
builder.c <<-EOM
int binary_search(VALUE ary, unsigned int r) {
int upper = RARRAY_LEN(ary) - 1;
int lower = 0;
int idx = 0;
while (lower <= upper) {
idx = (lower + upper) / 2;
VALUE continuumValue = RARRAY_PTR(ary)[idx];
unsigned int l = NUM2UINT(continuumValue);
if (l == r) {
return idx;
}
else if (l > r) {
upper = idx - 1;
}
else {
lower = idx + 1;
}
}
if (upper < 0) {
upper = RARRAY_LEN(ary) - 1;
}
return upper;
}
EOM
end
rescue Exception
# Find the closest index in HashRing with value <= the given value
def binary_search(ary, value, &block)
upper = ary.size - 1
lower = 0
idx = 0
while(lower <= upper) do
idx = (lower + upper) / 2
comp = ary[idx] <=> value
if comp == 0
return idx
elsif comp > 0
upper = idx - 1
else
lower = idx + 1
end
end
if upper < 0
upper = ary.size - 1
end
return upper
end
end
end
end
end
class Redis
unless defined?(::BasicObject)
class BasicObject
instance_methods.each { |meth| undef_method(meth) unless meth =~ /\A(__|instance_eval)/ }
end
end
class Pipeline
attr_accessor :db
attr :futures
def initialize
@with_reconnect = true
@shutdown = false
@futures = []
end
def with_reconnect?
@with_reconnect
end
def without_reconnect?
!@with_reconnect
end
def shutdown?
@shutdown
end
def call(command, &block)
# A pipeline that contains a shutdown should not raise ECONNRESET when
# the connection is gone.
@shutdown = true if command.first == :shutdown
future = Future.new(command, block)
@futures << future
future
end
def call_pipeline(pipeline)
@shutdown = true if pipeline.shutdown?
@futures.concat(pipeline.futures)
@db = pipeline.db
nil
end
def commands
@futures.map { |f| f._command }
end
def with_reconnect(val=true)
@with_reconnect = false unless val
yield
end
def without_reconnect(&blk)
with_reconnect(false, &blk)
end
def finish(replies, &blk)
if blk
futures.each_with_index.map do |future, i|
future._set(blk.call(replies[i]))
end
else
futures.each_with_index.map do |future, i|
future._set(replies[i])
end
end
end
class Multi < self
def finish(replies)
exec = replies.last
return if exec.nil? # The transaction failed because of WATCH.
# EXEC command failed.
raise exec if exec.is_a?(CommandError)
if exec.size < futures.size
# Some command wasn't recognized by Redis.
raise replies.detect { |r| r.is_a?(CommandError) }
end
super(exec) do |reply|
# Because an EXEC returns nested replies, hiredis won't be able to
# convert an error reply to a CommandError instance itself. This is
# specific to MULTI/EXEC, so we solve this here.
reply.is_a?(::RuntimeError) ? CommandError.new(reply.message) : reply
end
end
def commands
[[:multi]] + super + [[:exec]]
end
end
end
class FutureNotReady < RuntimeError
def initialize
super("Value will be available once the pipeline executes.")
end
end
class Future < BasicObject
FutureNotReady = ::Redis::FutureNotReady.new
def initialize(command, transformation)
@command = command
@transformation = transformation
@object = FutureNotReady
end
def inspect
"<Redis::Future #{@command.inspect}>"
end
def _set(object)
@object = @transformation ? @transformation.call(object) : object
value
end
def _command
@command
end
def value
::Kernel.raise(@object) if @object.kind_of?(::RuntimeError)
@object
end
def is_a?(other)
self.class.ancestors.include?(other)
end
def class
Future
end
end
end
class Redis
class SubscribedClient
def initialize(client)
@client = client
end
def call(command)
@client.process([command])
end
def subscribe(*channels, &block)
subscription("subscribe", "unsubscribe", channels, block)
end
def subscribe_with_timeout(timeout, *channels, &block)
subscription("subscribe", "unsubscribe", channels, block, timeout)
end
def psubscribe(*channels, &block)
subscription("psubscribe", "punsubscribe", channels, block)
end
def psubscribe_with_timeout(timeout, *channels, &block)
subscription("psubscribe", "punsubscribe", channels, block, timeout)
end
def unsubscribe(*channels)
call([:unsubscribe, *channels])
end
def punsubscribe(*channels)
call([:punsubscribe, *channels])
end
protected
def subscription(start, stop, channels, block, timeout = 0)
sub = Subscription.new(&block)
unsubscribed = false
begin
@client.call_loop([start, *channels], timeout) do |line|
type, *rest = line
sub.callbacks[type].call(*rest)
unsubscribed = type == stop && rest.last == 0
break if unsubscribed
end
ensure
# No need to unsubscribe here. The real client closes the connection
# whenever an exception is raised (see #ensure_connected).
end
end
end
class Subscription
attr :callbacks
def initialize
@callbacks = Hash.new do |hash, key|
hash[key] = lambda { |*_| }
end
yield(self)
end
def subscribe(&block)
@callbacks["subscribe"] = block
end
def unsubscribe(&block)
@callbacks["unsubscribe"] = block
end
def message(&block)
@callbacks["message"] = block
end
def psubscribe(&block)
@callbacks["psubscribe"] = block
end
def punsubscribe(&block)
@callbacks["punsubscribe"] = block
end
def pmessage(&block)
@callbacks["pmessage"] = block
end
end
end
class Redis
VERSION = "3.3.3"
end
redis:
host: 127.0.1.1
port: 6378
pass: secure
database: 1
socket: /var/run/redis/redis.sock
namespace: my:gitlab
sentinels:
-
host: 127.0.0.1
port: 26380
......@@ -4,21 +4,6 @@ require_relative '../lib/gitlab_config'
describe GitlabConfig do
let(:config) { GitlabConfig.new }
describe :redis do
before do
config_file = File.read('spec/fixtures/gitlab_config_redis.yml')
config.instance_variable_set(:@config, YAML.load(config_file))
end
it { config.redis['host'].should eq('127.0.1.1') }
it { config.redis['port'].should eq(6378) }
it { config.redis['database'].should eq(1) }
it { config.redis['namespace'].should eq('my:gitlab') }
it { config.redis['socket'].should eq('/var/run/redis/redis.sock') }
it { config.redis['pass'].should eq('secure') }
it { config.redis['sentinels'].should eq([{ 'host' => '127.0.0.1', 'port' => 26380 }]) }
end
describe :gitlab_url do
let(:url) { 'http://test.com' }
subject { config.gitlab_url }
......
......@@ -434,60 +434,4 @@ describe GitlabNet, vcr: true do
store.should_receive(:add_path).with('test_path')
end
end
describe '#redis_client' do
let(:config) { double('config') }
context "with empty redis config" do
it 'returns default parameters' do
allow(gitlab_net).to receive(:config).and_return(config)
allow(config).to receive(:redis).and_return( {} )
expect_any_instance_of(Redis).to receive(:initialize).with({ host: '127.0.0.1',
port: 6379,
db: 0 })
gitlab_net.redis_client
end
end
context "with password" do
it 'uses the specified host, port, and password' do
allow(gitlab_net).to receive(:config).and_return(config)
allow(config).to receive(:redis).and_return( { 'host' => 'localhost', 'port' => 1123, 'pass' => 'secret' } )
expect_any_instance_of(Redis).to receive(:initialize).with({ host: 'localhost',
port: 1123,
db: 0,
password: 'secret'})
gitlab_net.redis_client
end
end
context "with sentinels" do
it 'uses the specified sentinels' do
allow(gitlab_net).to receive(:config).and_return(config)
allow(config).to receive(:redis).and_return({ 'host' => 'localhost', 'port' => 1123,
'sentinels' => [{'host' => '127.0.0.1', 'port' => 26380}] })
expect_any_instance_of(Redis).to receive(:initialize).with({ host: 'localhost',
port: 1123,
db: 0,
sentinels: [{host: '127.0.0.1', port: 26380}] })
gitlab_net.redis_client
end
end
context "with redis socket" do
let(:socket) { '/tmp/redis.socket' }
it 'uses the socket' do
allow(gitlab_net).to receive(:config).and_return(config)
allow(config).to receive(:redis).and_return( { 'socket' => socket })
expect_any_instance_of(Redis).to receive(:initialize).with({ path: socket, db: 0 })
gitlab_net.redis_client
end
end
end
end
......@@ -13,7 +13,6 @@ describe GitlabPostReceive do
let(:gl_repository) { "project-1" }
let(:gitlab_post_receive) { GitlabPostReceive.new(gl_repository, repo_path, actor, wrongly_encoded_changes) }
let(:broadcast_message) { "test " * 10 + "message " * 10 }
let(:redis_client) { double('redis_client') }
let(:enqueued_at) { Time.new(2016, 6, 23, 6, 59) }
let(:new_merge_request_urls) do
[{
......@@ -36,151 +35,6 @@ describe GitlabPostReceive do
end
describe "#exec" do
context 'when the new post_receive API endpoint is not available' do
before do
GitlabNet.any_instance.stub(broadcast_message: { })
GitlabNet.any_instance.stub(:merge_request_urls).with(gl_repository, repo_path, wrongly_encoded_changes) { [] }
GitlabNet.any_instance.stub(notify_post_receive: true)
allow_any_instance_of(GitlabNet).to receive(:post_receive).and_raise(GitlabNet::NotFound)
allow_any_instance_of(GitlabNet).to receive(:redis_client).and_return(redis_client)
allow_any_instance_of(GitlabReferenceCounter).to receive(:redis_client).and_return(redis_client)
allow(redis_client).to receive(:get).and_return(1)
allow(redis_client).to receive(:incr).and_return(true)
allow(redis_client).to receive(:decr).and_return(0)
allow(redis_client).to receive(:rpush).and_return(true)
expect(Time).to receive(:now).and_return(enqueued_at)
end
context 'Without broad cast message' do
context 'pushing new branch' do
before do
GitlabNet.any_instance.stub(:merge_request_urls).with(gl_repository, repo_path, wrongly_encoded_changes) do
new_merge_request_urls
end
end
it "prints the new merge request url" do
assert_new_mr_printed(gitlab_post_receive)
gitlab_post_receive.exec
end
end
context 'pushing existing branch with merge request created' do
before do
GitlabNet.any_instance.stub(:merge_request_urls).with(gl_repository, repo_path, wrongly_encoded_changes) do
existing_merge_request_urls
end
end
it "prints the view merge request url" do
assert_existing_mr_printed(gitlab_post_receive)
gitlab_post_receive.exec
end
end
end
context 'show broadcast message and merge request link' do
before do
GitlabNet.any_instance.stub(:merge_request_urls).with(gl_repository, repo_path, wrongly_encoded_changes) do
new_merge_request_urls
end
GitlabNet.any_instance.stub(broadcast_message: { "message" => broadcast_message })
end
it 'prints the broadcast message and create new merge request link' do
assert_broadcast_message_printed(gitlab_post_receive)
assert_new_mr_printed(gitlab_post_receive)
gitlab_post_receive.exec
end
end
context 'Sidekiq jobs' do
it "pushes a Sidekiq job onto the queue" do
expect(redis_client).to receive(:rpush).with(
'resque:gitlab:queue:post_receive',
%Q/{"class":"PostReceive","args":["#{gl_repository}","#{actor}",#{base64_changes.inspect}],"jid":"#{gitlab_post_receive.jid}","enqueued_at":#{enqueued_at.to_f}}/
).and_return(true)
gitlab_post_receive.exec
end
context 'when gl_repository is nil' do
let(:gl_repository) { nil }
it "pushes a Sidekiq job with the repository path" do
expect(redis_client).to receive(:rpush).with(
'resque:gitlab:queue:post_receive',
%Q/{"class":"PostReceive","args":["#{repo_path}","#{actor}",#{base64_changes.inspect}],"jid":"#{gitlab_post_receive.jid}","enqueued_at":#{enqueued_at.to_f}}/
).and_return(true)
gitlab_post_receive.exec
end
end
end
context 'reference counter' do
it 'decreases the reference counter for the project' do
expect_any_instance_of(GitlabReferenceCounter).to receive(:decrease).and_return(true)
gitlab_post_receive.exec
end
context "when the redis command succeeds" do
before do
allow(redis_client).to receive(:decr).and_return(0)
end
it "returns true" do
expect(gitlab_post_receive.exec).to eq(true)
end
end
context "when the redis command fails" do
before do
allow(redis_client).to receive(:decr).and_raise('Fail')
end
it "returns false" do
expect(gitlab_post_receive.exec).to eq(false)
end
end
end
context 'post_receive notification' do
it 'calls the api to notify the execution of the hook' do
expect_any_instance_of(GitlabNet).to receive(:notify_post_receive).
with(gl_repository, repo_path)
gitlab_post_receive.exec
end
end
context "when the redis command succeeds" do
before do
allow(redis_client).to receive(:rpush).and_return(true)
end
it "returns true" do
expect(gitlab_post_receive.exec).to eq(true)
end
end
context "when the redis command fails" do
before do
allow(redis_client).to receive(:rpush).and_raise('Fail')
end
it "returns false" do
expect(gitlab_post_receive.exec).to eq(false)
end
end
end
context 'when the new post_receive API endpoint is available' do
let(:response) { { 'reference_counter_decreased' => true } }
it 'calls the api to notify the execution of the hook' do
......@@ -231,7 +85,6 @@ describe GitlabPostReceive do
end
end
end
end
private
......
# coding: utf-8
require 'spec_helper'
require 'gitlab_reference_counter'
describe GitlabReferenceCounter do
let(:redis_client) { double('redis_client') }
let(:reference_counter_key) { "git-receive-pack-reference-counter:/test/path" }
let(:gitlab_reference_counter) { GitlabReferenceCounter.new('/test/path') }
before do
allow(gitlab_reference_counter).to receive(:redis_client).and_return(redis_client)
$logger = double('logger').as_null_object
end
it 'increases and set the expire time of a reference count for a path' do
expect(redis_client).to receive(:incr).with(reference_counter_key)
expect(redis_client).to receive(:expire).with(reference_counter_key, GitlabReferenceCounter::REFERENCE_EXPIRE_TIME)
expect(gitlab_reference_counter.increase).to be(true)
end
it 'decreases the reference count for a path' do
allow(redis_client).to receive(:decr).and_return(0)
expect(redis_client).to receive(:decr).with(reference_counter_key)
expect(gitlab_reference_counter.decrease).to be(true)
end
it 'warns if attempting to decrease a counter with a value of one or less, and resets the counter' do
expect(redis_client).to receive(:decr).and_return(-1)
expect(redis_client).to receive(:del)
expect($logger).to receive(:warn).with("Reference counter for /test/path decreased when its value was less than 1. Reseting the counter.")
expect(gitlab_reference_counter.decrease).to be(true)
end
it 'get the reference count for a path' do
allow(redis_client).to receive(:get).and_return(1)
expect(gitlab_reference_counter.value).to be(1)
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment