|
#!/usr/bin/env python
# Copyright (C) 2011 Fog Creek Software. All rights reserved.
from __future__ import with_statement
import errno
import logging
import optparse
import os
import pickle
import random
import re
import select
import signal
import socket
import sys
import time
from collections import deque as _deque
from logging import handlers
from bugzscout import BugzScout
# Name passed to logging.getLogger() and to the BugzScout reporter.
LOGGER_NAME = 'miniredis'
class deque(_deque):
    """``collections.deque`` subclass that provides ``count()``."""

    def count(self, value):
        """Return the number of elements that compare equal to *value*."""
        return sum(1 for item in self if item == value)
class RedisConstant(object):
    """Named sentinel object; instances are told apart by their ``type`` label."""

    _REPR = '<RedisConstant(%s)>'

    def __init__(self, type):
        self.type = type

    def __repr__(self):
        return self._REPR % self.type
class RedisMessage(object):
    """Status reply; ``str()`` yields the message prefixed with ``+``."""

    def __init__(self, message):
        self.message = message

    def __str__(self):
        return '+%s' % self.message

    def __repr__(self):
        text = self.message
        return '<RedisMessage(%s)>' % text
class RedisError(RedisMessage):
    """Error reply; ``str()`` yields the message prefixed with ``-ERR``."""

    # __init__ is inherited: RedisMessage.__init__ stores ``message`` the same way.

    def __str__(self):
        return '-ERR %s' % self.message

    def __repr__(self):
        text = self.message
        return '<RedisError(%s)>' % text
# Commands that modify the data set. MiniRedis.handle() bumps
# changes_since_last_save whenever one of these is executed.
# 'hsetnx' writes a hash field and so belongs here; 'zscore' only reads.
CHANGE_COMMANDS = [
    'del', 'rename', 'renamenx', 'expire', 'persist', 'move',
    'flushdb', 'flushall',
    'set', 'getset', 'setnx', 'setex', 'mset', 'msetnx',
    'incr', 'incrby', 'decr', 'decrby', 'append', 'substr',
    'rpush', 'lpush', 'ltrim', 'lset', 'lrem', 'lpop', 'rpop',
    'blpop', 'brpop', 'rpoplpush',
    'sadd', 'srem', 'spop', 'smove',
    'sinterstore', 'sunionstore', 'sdiffstore',
    'zadd', 'zrem', 'zincrby', 'zremrangebyrank', 'zremrangebyscore',
    'zunionstore',
    'hset', 'hsetnx', 'hmset', 'hincrby', 'hdel',
]
# Sentinel that MiniRedis.dump() writes as the nil bulk reply '$-1'.
EMPTY_SCALAR = RedisConstant('EmptyScalar')
# Sentinel that MiniRedis.dump() writes as the nil multi-bulk reply '*-1'.
EMPTY_LIST = RedisConstant('EmptyList')
# Shared error returned by handlers when a key holds a value of another type.
BAD_VALUE = RedisError('Operation against a key holding the wrong kind of value')
def who(client):
    """Return a printable identity for *client*, used as a log prefix.

    Returns 'SERVER' when *client* is falsy, 'host:port' of the peer when the
    socket is connected, and '<CLOSED>' when the peer address cannot be read.
    """
    if not client:
        return 'SERVER'
    try:
        return '%s:%s' % client.socket.getpeername()
    except Exception:
        # Was a bare except, which also swallowed KeyboardInterrupt/SystemExit.
        return '<CLOSED>'
def key_x(x):
    """Return a function that extracts element *x* from its argument."""
    return lambda item: item[x]
def ishash(data):
    """Return True if *data* is None (missing key) or a dict."""
    if data is None:
        return True
    return isinstance(data, dict)
def islist(data):
    """Return True if *data* is None (missing key) or a deque."""
    if data is None:
        return True
    return isinstance(data, deque)
def isscalar(data):
    """Return True if *data* is None (missing key), a string, an int or a float."""
    if data is None:
        return True
    return isinstance(data, (basestring, int, float))
def isset(data):
    """Return True if *data* is None (missing key) or a set."""
    if data is None:
        return True
    return isinstance(data, set)
def issortedset(data):
    """Return True if *data* is None (missing key) or a dict (member -> score)."""
    if data is None:
        return True
    return isinstance(data, dict)
def slice(data, start, end=None):
    """Return ``data[start..end]`` with an inclusive *end*, Redis-style.

    data  -- a sequence; a deque is copied to a list first because deques
             do not support slicing.
    start -- first index to include.
    end   -- last index to include. ``None`` (the default) or ``-1`` means
             "through the last element".

    The default ``end=None`` used to raise TypeError (``None + 1``).
    """
    if isinstance(data, deque):
        data = list(data)
    if end is None or end == -1:
        return data[start:]
    return data[start:end + 1]
def score_in_range(low, high):
    """Return a predicate that is true for tuples whose element 1 lies in [low, high]."""
    def _within(pair):
        score = pair[1]
        return score >= low and score <= high
    return _within
def xfloat(x, low=True):
    """Convert a score bound to a float.

    x   -- a number, or a string such as '1.5', 'inf', '-inf' or '(1.5'.
           A leading '(' marks the bound as exclusive.
    low -- True when *x* is a lower bound, False for an upper bound; this
           decides which way an exclusive bound is nudged.

    Raises ValueError/TypeError when *x* cannot be parsed as a number.

    Fixes: '-inf' used to map to sys.float_info.min, the smallest *positive*
    float, so negative scores were excluded. Exclusive bounds used an
    absolute epsilon, which is lost to rounding once |value| >= 2.
    """
    if not isinstance(x, basestring) or not x.startswith('('):
        # float() parses 'inf', '+inf' and '-inf' natively.
        return float(x)
    value = float(x[1:])
    if abs(value) == float('inf'):
        return value
    # Relative step of at least one ulp; the floor covers zero and subnormals.
    step = max(abs(value) * sys.float_info.epsilon, sys.float_info.min)
    if low:
        return value + step
    return value - step
class RedisClient(object):
    """Per-connection state: the socket, its file wrappers and the selected db."""

    def __init__(self, socket):
        self.socket = socket
        # Separate buffered file objects for writing replies and reading requests.
        self.wfile = socket.makefile('wb')
        self.rfile = socket.makefile('rb')
        # Filled in by MiniRedis.select().
        self.db = self.table = None
class DisconnectedException(Exception):
    """Raised when a client's connection yields no more data."""

    def __init__(self, client):
        Exception.__init__(self)
        self.client = client

    def __repr__(self):
        return '<DisconnectedException(%s)>' % who(self.client)

    def __str__(self):
        # The string form is the same as the repr.
        return self.__repr__()
class MiniRedis(object):
def __init__(self, host='127.0.0.1', port=6379, log_file=None, db_file=None):
    """Set up logging, the crash reporter and counters, then load the database.

    host, port -- address that run() binds the listening socket to.
    log_file   -- optional path; when given, a RotatingFileHandler is attached.
    db_file    -- optional path of the pickle file used by load() and save().
    """
    super(MiniRedis, self).__init__()
    self.host = host
    self.port = port
    self.logger = logging.getLogger(LOGGER_NAME)
    if log_file:
        handler = handlers.RotatingFileHandler(log_file, maxBytes=0, backupCount=0)
        handler.formatter = logging.Formatter('[%(process)d] %(asctime)s - %(levelname)s %(message)s', '%d %b %H:%M:%S')
        self.logger.addHandler(handler)
    # NOTE(review): indentation was lost in this copy of the file; setLevel is
    # assumed to run whether or not a log file is configured -- confirm.
    self.logger.setLevel(logging.DEBUG)
    self.scout = BugzScout(LOGGER_NAME)
    # True while the server loop is not running; run() clears it.
    self.halt = True
    # Maps client socket -> RedisClient.
    self.clients = {}
    # Maps database number -> dict of keys.
    self.tables = {}
    self.db_file = db_file
    self.lastsave = int(time.time())
    # Counters reported by handle_info().
    self.connections_received = 0
    self.commands_processed = 0
    self.changes_since_last_save = 0
    self.start_time = time.time()
    self.load()
def critical(self, client, s):
    """Log *s* (a string or a list of string fragments) at CRITICAL level."""
    text = ''.join(s) if isinstance(s, list) else s
    prefix = who(client)
    self.logger.critical('%s: %s' % (prefix, text))
def debug(self, client, s):
    """Log *s* (a string or a list of string fragments) at DEBUG level."""
    text = ''.join(s) if isinstance(s, list) else s
    prefix = who(client)
    self.logger.debug('%s: %s' % (prefix, text))
def dump(self, client, o):
    """Write handler result *o* to *client* in the Redis reply format.

    True             -> '+OK'
    False            -> nothing at all (handle_quit returns False)
    EMPTY_SCALAR     -> nil bulk reply '$-1'
    EMPTY_LIST       -> nil multi-bulk reply '*-1'
    integer          -> ':<n>'
    float / str      -> bulk reply
    list             -> multi-bulk reply, each element dumped recursively
    RedisMessage     -> its str() form ('+...' or '-ERR ...')
    anything else    -> an error reply

    Fixes: the fallback used to write a bare line with no type prefix, which
    is not a valid reply; Python 2 ``long`` values (large counters) fell into
    that fallback as well.
    """
    import numbers  # function-scope import, as done elsewhere in this file

    nl = '\r\n'
    out = client.wfile
    if isinstance(o, bool):
        # Must be tested before Integral: bool is an int subclass.
        if o:
            out.write('+OK' + nl)
        # Show nothing for a false return; that means be quiet
    elif o is EMPTY_SCALAR:
        out.write('$-1' + nl)
    elif o is EMPTY_LIST:
        out.write('*-1' + nl)
    elif isinstance(o, numbers.Integral):
        out.write(':' + str(o) + nl)
    elif isinstance(o, float):
        text = str(o)
        out.write('$' + str(len(text)) + nl)
        out.write(text + nl)
    elif isinstance(o, str):
        out.write('$' + str(len(o)) + nl)
        out.write(o + nl)
    elif isinstance(o, list):
        out.write('*' + str(len(o)) + nl)
        for val in o:
            self.dump(client, val)
    elif isinstance(o, RedisMessage):
        out.write('%s\r\n' % o)
    else:
        out.write('-ERR return type not yet implemented' + nl)
    out.flush()
def load(self):
    """Load ``self.tables`` from ``self.db_file`` if that file exists.

    A file that cannot be unpickled is logged and ignored; the tables are
    left as they were.
    """
    if not (self.db_file and os.path.lexists(self.db_file)):
        return
    with open(self.db_file, 'rb') as f:
        try:
            # NOTE: pickle can execute arbitrary code while loading; the
            # database file must come from a trusted source.
            self.tables = pickle.load(f)
            self.log(None, 'loaded database from file "%s"' % self.db_file)
        except Exception:
            # Was a bare except, which also swallowed KeyboardInterrupt/SystemExit.
            self.log(None, 'database was old format; load aborted')
def log(self, client, s):
    """Log *s* (a string or a list of string fragments) at INFO level."""
    text = ''.join(s) if isinstance(s, list) else s
    prefix = who(client)
    self.logger.info('%s: %s', prefix, text)
def handle(self, client):
    """Read one multi-bulk request from *client*, run it and send the reply.

    The request is '*<count>' followed by <count> arguments, each sent as
    '$<length>' and then the payload. The first argument selects the
    ``handle_<command>`` method; the rest are passed to it.

    Raises DisconnectedException when the client has closed the connection.

    Fixes: an unknown command used to raise AttributeError and an empty
    request IndexError, which made run() drop the connection. Unknown
    commands now get an error reply and empty requests are ignored.
    """
    line = client.rfile.readline()
    if not line:
        raise DisconnectedException(client)
    items = int(line[1:].strip())
    args = []
    for _ in range(items):
        length = int(client.rfile.readline().strip()[1:])
        args.append(client.rfile.read(length))
        client.rfile.read(2)  # throw out newline
    if not args:
        return
    command = args[0].lower()
    handler = getattr(self, 'handle_' + command, None)
    if handler is None:
        self.dump(client, RedisError("unknown command '%s'" % command))
        return
    self.dump(client, handler(client, *args[1:]))
    self.commands_processed += 1
    if command in CHANGE_COMMANDS:
        self.changes_since_last_save += 1
def rotate(self):
    """Call ``doRollover()`` on every attached log handler that has one."""
    for log_handler in self.logger.handlers:
        if not hasattr(log_handler, 'doRollover'):
            continue
        log_handler.doRollover()
def run(self):
    """Serve clients until ``self.halt`` becomes true.

    Binds a listening socket on (self.host, self.port) and multiplexes it
    with all client sockets through select(), waking at least once per
    second to re-check the halt flag. A client whose request raises is
    logged and disconnected.

    Fixes: the listening socket and client sockets are now closed in a
    ``finally`` block, so they no longer leak when an exception escapes the
    loop; a socket whose client was dropped earlier in the same pass is
    skipped instead of raising KeyError.
    """
    import traceback

    self.halt = False
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, self.port))
        server.listen(5)
        while not self.halt:
            try:
                readable, _, _ = select.select([server] + list(self.clients), [], [], 1.0)
            except select.error as e:
                # A signal (SIGHUP/SIGTERM handler) interrupted select(); retry.
                if e.args[0] == errno.EINTR:
                    continue
                raise
            for sock in readable:
                if sock == server:
                    (client_socket, address) = server.accept()
                    client = RedisClient(client_socket)
                    self.clients[client_socket] = client
                    self.log(client, 'client connected')
                    self.select(client, 0)
                    continue
                client = self.clients.get(sock)
                if client is None:
                    continue
                try:
                    self.handle(client)
                except DisconnectedException as e:
                    self.log(None, 'client disconnected: %s' % e)
                    self.handle_quit(client)
                except Exception as e:
                    self.critical(client, 'EXCEPTION: %s' % e)
                    self.critical(client, traceback.format_exc())
                    self.handle_quit(client)
    finally:
        for client_socket in list(self.clients):
            client_socket.close()
        self.clients.clear()
        server.close()
def save(self):
    """Pickle ``self.tables`` to ``self.db_file`` and reset the save bookkeeping."""
    # NOTE(review): indentation was lost in this copy of the file; the
    # bookkeeping below is assumed to run only when a db file is configured
    # -- confirm.
    if self.db_file:
        with open(self.db_file, 'wb') as f:
            pickle.dump(self.tables, f, pickle.HIGHEST_PROTOCOL)
        self.lastsave = int(time.time())
        self.changes_since_last_save = 0
def select(self, client, db):
    """Point *client* at database *db*, creating the database if needed."""
    table = self.tables.setdefault(db, {})
    client.db = db
    client.table = table
    self.connections_received += 1
def stop(self):
    """Save the database and ask the server loop to exit, unless already halted."""
    if self.halt:
        return
    self.log(None, 'STOPPING')
    self.save()
    self.halt = True
def who(self, client):
    """Return a printable identity for *client*.

    Returns 'SERVER' when *client* is falsy, 'host:port' of the peer when the
    socket is connected, and '<CLOSED>' when the peer address cannot be read.
    """
    if not client:
        return 'SERVER'
    try:
        return '%s:%s' % client.socket.getpeername()
    except Exception:
        # Was a bare except, which also swallowed KeyboardInterrupt/SystemExit.
        return '<CLOSED>'
# HANDLERS
def handle_append(self, client, key, value):
    """APPEND: append *value* to the string at *key*; return the new length."""
    current = client.table.get(key, '')
    if not isscalar(current):
        return BAD_VALUE
    combined = str(current) + str(value)
    client.table[key] = combined
    self.debug(client, 'APPEND %s %s -> %s' % (key, value, combined))
    return len(combined)
def handle_bgsave(self, client):
    """BGSAVE: save the database in a forked child where fork() is available.

    On platforms without ``os.fork`` the save runs synchronously instead.

    Fixes: the child used to leave through ``sys.exit()``, which unwinds
    through the server loop; if save() raised, run()'s generic exception
    handler caught the error and the child carried on serving as a second
    server. The child now always terminates with ``os._exit()``, the same
    call create_daemon() uses for forked processes.
    """
    if hasattr(os, 'fork'):
        if not os.fork():
            # Child process: save and leave without running parent cleanup.
            status = 1
            try:
                self.save()
                status = 0
            finally:
                os._exit(status)
    else:
        self.save()
    self.debug(client, 'BGSAVE')
    return RedisMessage('Background saving started')
def handle_dbsize(self, client):
    """DBSIZE: return the number of keys in the client's selected database."""
    table = client.table
    return len(table)
def handle_decr(self, client, key):
    """DECR: decrement the integer at *key* by one."""
    step = 1
    return self.handle_decrby(client, key, step)
def handle_decrby(self, client, key, by):
    """DECRBY: decrement the integer at *key* by *by*.

    Delegates to handle_incrby with the negated amount. A non-integer *by*
    now yields the same 'value is not an integer' error that handle_incrby
    reports, instead of the unrelated wrong-type error.
    """
    try:
        delta = -int(by)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    return self.handle_incrby(client, key, delta)
def handle_del(self, client, key, *keys):
    """DEL: remove one or more keys; return how many of them existed.

    Extra keys are optional, so existing single-key callers are unaffected.
    """
    names = (key,) + keys
    self.debug(client, 'DEL %s' % ' '.join(names))
    removed = 0
    for name in names:
        if name in client.table:
            del client.table[name]
            removed += 1
    return removed
def handle_exists(self, client, key):
    """EXISTS: return 1 if *key* is present in the selected database, else 0."""
    self.debug(client, 'EXISTS %s' % key)
    return 1 if key in client.table else 0
def handle_flushdb(self, client):
    """FLUSHDB: remove every key from the client's selected database."""
    self.debug(client, 'FLUSHDB')
    table = client.table
    table.clear()
    return True
def handle_flushall(self, client):
    """FLUSHALL: remove every key from every database."""
    self.debug(client, 'FLUSHALL')
    for db_table in self.tables.values():
        db_table.clear()
    return True
def handle_get(self, client, key):
    """GET: return the string value at *key*, or the nil reply if it is absent."""
    stored = client.table.get(key)
    if not isscalar(stored):
        return BAD_VALUE
    data = EMPTY_SCALAR if stored is None else str(stored)
    self.debug(client, 'GET %s -> %s' % (key, data))
    return data
def handle_getset(self, client, key, new):
    """GETSET: store *new* at *key* and return the previous value (or nil)."""
    stored = client.table.get(key)
    if not isscalar(stored):
        return BAD_VALUE
    previous = EMPTY_SCALAR if stored is None else str(stored)
    self.debug(client, 'GETSET %s %s -> %s' % (key, new, previous))
    client.table[key] = new
    return previous
# Hash Functions
def handle_hdel(self, client, key, field):
    """HDEL: delete *field* from the hash at *key*; return 1 if it existed."""
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    exists = field in hashtable
    if exists:
        del hashtable[field]
    self.debug(client, 'HDEL %s %s -> %s' % (key, field, exists))
    return int(exists)
def handle_hexists(self, client, key, field):
    """HEXISTS: return 1 if the hash at *key* contains *field*, else 0."""
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    exists = field in hashtable
    self.debug(client, 'HEXISTS %s %s -> %s' % (key, field, exists))
    return int(exists)
def handle_hget(self, client, key, field):
    """HGET: return the value of *field* in the hash at *key*.

    Fixes: a missing field used to answer with the nil *multi-bulk* reply
    (EMPTY_LIST); HGET is a bulk command, so it now answers EMPTY_SCALAR.
    The value is returned as a string, as handle_get does, so that counters
    written by HINCRBY are also sent as bulk replies.
    """
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    if field not in hashtable:
        return EMPTY_SCALAR
    value = str(hashtable[field])
    self.debug(client, 'HGET %s %s -> %s' % (key, field, value))
    return value
def handle_hgetall(self, client, key):
    """HGETALL: return a flat list [field1, value1, field2, value2, ...].

    Fixes: the loop variable used to be named ``key`` as well, so the debug
    line logged the last field name instead of the hash key.
    """
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    flat = []
    for field, value in hashtable.items():
        flat.append(field)
        flat.append(value)
    self.debug(client, 'HGETALL %s' % (key,))
    return flat
def handle_hincrby(self, client, key, field, by):
    """HINCRBY: add *by* to the integer in *field* of the hash at *key*."""
    hashtable = client.table.get(key, None)
    if hashtable is None:
        hashtable = client.table[key] = {}
    if not ishash(hashtable):
        return BAD_VALUE
    try:
        current = int(hashtable.get(field, 0))
    except (TypeError, ValueError):
        return BAD_VALUE
    try:
        by = int(by)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    total = current + by
    hashtable[field] = total
    self.debug(client, 'HINCRBY %s %s %s -> %s' % (key, field, by, total))
    return total
def handle_hkeys(self, client, key):
    """HKEYS: return the field names of the hash at *key*."""
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    fields = hashtable.keys()
    self.debug(client, 'HKEYS %s' % (key,))
    return fields
def handle_hlen(self, client, key):
    """HLEN: return the number of fields in the hash at *key*."""
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    size = len(hashtable)
    self.debug(client, 'HLEN %s -> %s' % (key, size))
    return size
def handle_hmget(self, client, key, *args):
    """HMGET: return the values of the given fields; missing fields give nil."""
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    if len(args) == 0:
        return RedisError("wrong number of arguments for 'hmget' command")
    values = [hashtable.get(field, EMPTY_SCALAR) for field in args]
    self.debug(client, 'HMGET %s %s' % (key, ' '.join(args)))
    return values
def handle_hmset(self, client, key, *args):
    """HMSET: set several field/value pairs in the hash at *key*.

    *args* is field1, value1, field2, value2, ...

    Fixes: the hash used to be created before the arguments were validated,
    so a malformed call left an empty key behind.
    """
    hashtable = client.table.get(key, None)
    if not ishash(hashtable):
        return BAD_VALUE
    if not args or len(args) % 2 > 0:
        return RedisError('wrong number of arguments for HMSET')
    if hashtable is None:
        hashtable = client.table[key] = {}
    for field, value in zip(args[::2], args[1::2]):
        hashtable[field] = value
    self.debug(client, 'HMSET %s %s' % (key, ' '.join(args)))
    return True
def handle_hset(self, client, key, field, value):
    """HSET: set *field* in the hash at *key*; return 1 if the field is new."""
    hashtable = client.table.get(key, None)
    if hashtable is None:
        hashtable = client.table[key] = {}
    if not ishash(hashtable):
        return BAD_VALUE
    created = 0 if field in hashtable else 1
    hashtable[field] = value
    self.debug(client, 'HSET %s %s %s' % (key, field, value))
    return created
def handle_hsetnx(self, client, key, field, value):
    """HSETNX: set *field* only if it does not exist; return 1 if it was set."""
    hashtable = client.table.get(key, None)
    if hashtable is None:
        hashtable = client.table[key] = {}
    if not ishash(hashtable):
        return BAD_VALUE
    created = 0 if field in hashtable else 1
    if created:
        hashtable[field] = value
    self.debug(client, 'HSETNX %s %s %s' % (key, field, value))
    return created
def handle_hvals(self, client, key):
    """HVALS: return the values stored in the hash at *key*.

    Fixes: the debug line was labelled 'HKEYS' (copy-paste slip).
    """
    hashtable = client.table.get(key, {})
    if not ishash(hashtable):
        return BAD_VALUE
    values = hashtable.values()
    self.debug(client, 'HVALS %s' % (key,))
    return values
def handle_incr(self, client, key):
    """INCR: increment the integer at *key* by one."""
    step = 1
    return self.handle_incrby(client, key, step)
def handle_incrby(self, client, key, by):
    """INCRBY: add *by* to the integer at *key* (0 if absent); return the result."""
    try:
        current = int(client.table.get(key, 0))
    except (TypeError, ValueError):
        return BAD_VALUE
    try:
        by = int(by)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    total = current + by
    client.table[key] = total
    self.debug(client, 'INCRBY %s %s -> %s' % (key, by, total))
    return total
def handle_info(self, client):
    """INFO: return server statistics as newline-separated 'name:value' lines.

    Fixes: the arguments for ``changes_since_last_save`` and
    ``last_save_time`` were passed in the wrong order, so each line showed
    the other's value.
    """
    self.debug(client, 'INFO')
    uptime = time.time() - self.start_time
    template = '\n'.join([
        'redis_version:1.3.17',
        'connected_clients:%d',
        'connected_slaves:0',
        'used_memory:%d',
        'changes_since_last_save:%d',
        'last_save_time:%d',
        'total_connections_received:%d',
        'total_commands_processed:%d',
        'uptime_in_seconds:%.2f',
        'uptime_in_days:%.2f',
        'db%d:keys=%d,expires=0',
    ])
    return template % (
        len(self.clients),
        client.table.__sizeof__(),
        self.changes_since_last_save,
        self.lastsave,
        self.connections_received,
        self.commands_processed,
        uptime,
        uptime / 86400.0,
        client.db,
        len(client.table),
    )
def handle_keys(self, client, pattern):
    """KEYS: return the keys of the selected database matching glob *pattern*.

    Supports ``*``, ``?`` and ``[...]`` through fnmatch; matching is
    case-sensitive and must cover the whole key.

    Fixes: the pattern used to be turned into an unanchored regular
    expression, so 'foo' matched any key containing 'foo', '?' and '.' had
    regex meaning, and an invalid regex raised and disconnected the client.
    """
    import fnmatch  # function-scope import, as done elsewhere in this file

    self.debug(client, 'KEYS %s' % pattern)
    return [k for k in client.table.keys() if fnmatch.fnmatchcase(k, pattern)]
def handle_lastsave(self, client):
    """LASTSAVE: return the timestamp recorded by the last save."""
    timestamp = self.lastsave
    return timestamp
def handle_lindex(self, client, key, index):
    """LINDEX: return the element at *index* of the list at *key*.

    Negative indexes count from the tail. A missing key or an out-of-range
    index yields the nil bulk reply.

    Fixes: a missing key used to answer with the nil *multi-bulk* reply, and
    a non-integer index raised ValueError, which disconnected the client.
    """
    try:
        index = int(index)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    if key not in client.table:
        return EMPTY_SCALAR
    items = client.table[key]
    if not islist(items):
        return BAD_VALUE
    try:
        data = list(items)[index]
    except IndexError:
        return EMPTY_SCALAR
    self.debug(client, 'LINDEX %s %s -> %s' % (key, index, data))
    return data
def handle_llen(self, client, key):
    """LLEN: return the length of the list at *key* (0 if the key is absent)."""
    table = client.table
    if key not in table:
        return 0
    items = table[key]
    if not islist(items):
        return BAD_VALUE
    return len(items)
def handle_lpop(self, client, key):
    """LPOP: remove and return the first element of the list at *key*."""
    if key not in client.table:
        return EMPTY_SCALAR
    items = client.table[key]
    if not islist(items):
        return BAD_VALUE
    data = items.popleft() if len(items) > 0 else EMPTY_SCALAR
    self.debug(client, 'LPOP %s -> %s' % (key, data))
    return data
def handle_lpush(self, client, key, data):
    """LPUSH: prepend *data* to the list at *key*; return the new length."""
    table = client.table
    if key not in table:
        table[key] = deque()
    elif not islist(table[key]):
        return BAD_VALUE
    items = table[key]
    items.appendleft(data)
    self.debug(client, 'LPUSH %s %s' % (key, data))
    return len(items)
def handle_lrange(self, client, key, low, high):
    """LRANGE: return elements low..high (inclusive) of the list at *key*."""
    low = int(low)
    high = int(high)
    if key not in client.table:
        return []
    items = client.table[key]
    if not islist(items):
        return BAD_VALUE
    selected = slice(items, low, high)
    shown_high = high - 1 if high else -1
    self.debug(client, 'LRANGE %s %s %s -> %s' % (key, low, shown_high, selected))
    return selected
def handle_lrem(self, client, key, count, value):
    """LREM: remove occurrences of *value* from the list at *key*.

    count > 0 removes up to *count* matches starting from the head,
    count < 0 removes up to |count| matches starting from the tail,
    count == 0 removes every match. Returns the number of elements removed.
    """
    if key not in client.table:
        return 0
    if not islist(client.table[key]):
        return BAD_VALUE
    old_list = client.table[key]
    count = int(count)
    reverse = (count < 0)
    if count == 0 or abs(count) >= old_list.count(value):
        # Removing more than exist. Front or back doesn't matter.
        new_list = deque((x for x in old_list if x != value))
        removed = len(old_list) - len(new_list)
        client.table[key] = new_list
    else:
        # More elements matching value than count.
        new_list = old_list
        if reverse:
            # Work on a reversed copy so remove() takes matches from the tail;
            # note that count is made positive here, which is also what the
            # debug line below reports.
            count *= -1
            new_list = deque(reversed(new_list))
        removed = 0
        to_remove = count
        while to_remove > 0:
            # No, it's not the most efficient way if count is large...
            new_list.remove(value)
            to_remove -= 1
            removed += 1
        if reverse:
            # Restore the original ordering.
            new_list = deque(reversed(new_list))
        client.table[key] = new_list
    self.debug(client, 'LREM %s %s %s -> %s' % (key, count, value, removed))
    return removed
def handle_lset(self, client, key, index, value):
    """LSET: replace the element at *index* of the list at *key* with *value*.

    Fixes: a missing key used to be created as an empty list before the
    inevitable 'index out of range' error, leaving a stray key behind. It
    now reports 'no such key' and leaves the database untouched.
    """
    if key not in client.table:
        return RedisError('no such key')
    items = client.table[key]
    if not islist(items):
        return BAD_VALUE
    try:
        index = int(index)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    try:
        items[index] = value
    except IndexError:
        return RedisError('index out of range')
    self.debug(client, 'LSET %s %s %s' % (key, index, value))
    return True
def handle_ltrim(self, client, key, start, end):
    """LTRIM: keep only elements start..end (inclusive) of the list at *key*."""
    items = client.table.get(key, None)
    if not islist(items):
        return BAD_VALUE
    try:
        start, end = int(start), int(end)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    # A missing or empty list is left alone.
    if items:
        kept = slice(items, start, end)
        client.table[key] = deque(kept)
    return True
def handle_mget(self, client, *args):
    """MGET: return the values of the given keys, nil for each unusable one.

    Keys that are missing or hold a non-string value yield nil.

    Fixes: the arity error named 'hmget'; ``data or EMPTY_SCALAR`` turned
    stored empty strings and zeros into nil; the debug line repeated the
    last key. Values are returned as strings, as handle_get does.
    """
    if not args:
        return RedisError("wrong number of arguments for 'mget' command")
    values = []
    for key in args:
        data = client.table.get(key, None)
        if data is None or not isscalar(data):
            values.append(EMPTY_SCALAR)
        else:
            values.append(str(data))
    self.debug(client, 'MGET %s' % ' '.join(args))
    return values
def handle_mset(self, client, *args):
    """MSET: set several keys at once; *args* is key1, value1, key2, value2, ..."""
    if not args or len(args) % 2 > 0:
        return RedisError('wrong number of arguments for MSET')
    for position in range(0, len(args), 2):
        client.table[args[position]] = args[position + 1]
    self.debug(client, 'MSET %s' % (' '.join(args)))
    return True
def handle_msetnx(self, client, *args):
    """MSETNX: set several keys, but only if none of them exists yet.

    *args* is key1, value1, key2, value2, ... Returns 1 when all keys were
    set and 0 when nothing was done because some key already existed.

    Fixes: the error and debug messages named MSET instead of MSETNX.
    """
    if not args or len(args) % 2 > 0:
        return RedisError('wrong number of arguments for MSETNX')
    keys = args[::2]
    for key in keys:
        if key in client.table:
            return 0
    for key, value in zip(keys, args[1::2]):
        client.table[key] = value
    self.debug(client, 'MSETNX %s' % (' '.join(args)))
    return 1
def handle_ping(self, client):
    """PING: reply with the status message PONG."""
    self.debug(client, 'PING -> PONG')
    reply = RedisMessage('PONG')
    return reply
def handle_randomkey(self, client):
    """RANDOMKEY: return a random key of the selected database, or nil if empty."""
    self.debug(client, 'RANDOMKEY')
    if len(client.table) == 0:
        return EMPTY_SCALAR
    candidates = list(client.table.keys())
    return random.choice(candidates)
def handle_rename(self, client, old, new):
    """RENAME: move the value at *old* to *new*, overwriting any existing value."""
    table = client.table
    if old == new:
        return RedisError('source and destination objects are the same')
    if old not in table:
        return RedisError('no such key')
    self.debug(client, 'RENAME %s %s' % (old, new))
    value = table.pop(old)
    table[new] = value
    return True
def handle_renamenx(self, client, old, new):
    """RENAMENX: rename *old* to *new* only if *new* does not exist; return 1 or 0."""
    table = client.table
    if old == new:
        return RedisError('source and destination objects are the same')
    if old not in table:
        return RedisError('no such key')
    self.debug(client, 'RENAMENX %s %s' % (old, new))
    if new in table:
        return 0
    value = table.pop(old)
    table[new] = value
    return 1
def handle_rpop(self, client, key):
    """RPOP: remove and return the last element of the list at *key*."""
    if key not in client.table:
        return EMPTY_SCALAR
    items = client.table[key]
    if not islist(items):
        return BAD_VALUE
    data = items.pop() if len(items) > 0 else EMPTY_SCALAR
    self.debug(client, 'RPOP %s -> %s' % (key, data))
    return data
def handle_rpoplpush(self, client, src, dest):
    """RPOPLPUSH: pop the tail of list *src* and push it onto the head of *dest*."""
    table = client.table
    if src not in table:
        return EMPTY_SCALAR
    if dest not in table:
        table[dest] = deque()
    source, target = table[src], table[dest]
    if not (islist(source) and islist(target)):
        return BAD_VALUE
    if len(source) == 0:
        return EMPTY_SCALAR
    data = source.pop()
    target.appendleft(data)
    self.debug(client, 'RPOPLPUSH %s %s -> %s' % (src, dest, data))
    return data
def handle_rpush(self, client, key, data):
    """RPUSH: append *data* to the list at *key*; return the new length."""
    table = client.table
    if key not in table:
        table[key] = deque()
    elif not islist(table[key]):
        return BAD_VALUE
    items = table[key]
    items.append(data)
    self.debug(client, 'RPUSH %s %s' % (key, data))
    return len(items)
def handle_type(self, client, key):
    """TYPE: report the kind of value stored at *key* as a status message."""
    if key not in client.table:
        return RedisMessage('none')
    data = client.table[key]
    # Checked in this order; the first matching predicate wins.
    checks = (
        (islist, 'list'),
        (isset, 'set'),
        (ishash, 'hash'),
        (isscalar, 'string'),
    )
    for predicate, type_name in checks:
        if predicate(data):
            return RedisMessage(type_name)
    return RedisError('unknown data type')
def handle_quit(self, client):
    """QUIT: forget *client* and close its socket.

    Returns False, for which dump() sends no reply.
    """
    self.debug(client, 'QUIT')
    sock = client.socket
    self.clients.pop(sock, None)
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error:
        # The peer may already be gone.
        pass
    finally:
        try:
            sock.close()
        except socket.error:
            pass
    return False
def handle_sadd(self, client, key, data):
    """SADD: add *data* to the set at *key*; return 1 if it was not yet a member."""
    table = client.table
    if key not in table:
        table[key] = set()
    elif not isset(table[key]):
        return BAD_VALUE
    members = table[key]
    added = 0
    if data not in members:
        members.add(data)
        added = 1
    self.debug(client, 'SADD %s %s' % (key, data))
    return added
def handle_scard(self, client, key):
    """SCARD: return the number of members in the set at *key* (0 if absent)."""
    if key not in client.table:
        return 0
    members = client.table[key]
    if not isset(members):
        return BAD_VALUE
    size = len(members)
    self.debug(client, 'SCARD %s -> %d' % (key, size))
    return size
def handle_sdiff(self, client, key, *keys):
    """SDIFF: return the members of the set at *key* not found in the other sets."""
    table = client.table
    if key not in table:
        return []
    if not isset(table[key]):
        return BAD_VALUE
    remaining = table[key]
    for other in keys:
        if other not in table:
            # A missing set removes nothing.
            continue
        if not isset(table[other]):
            return BAD_VALUE
        remaining = remaining.difference(table[other])
    self.debug(client, 'SDIFF %s %s' % (key, ' '.join(keys)))
    return list(remaining)
def handle_sdiffstore(self, client, dest, key, *keys):
    """SDIFFSTORE: store the SDIFF result at *dest*; return its size."""
    members = self.handle_sdiff(client, key, *keys)
    if not isinstance(members, list):
        # handle_sdiff reported an error instead of a member list.
        return BAD_VALUE
    client.table[dest] = set(members)
    self.debug(client, 'SDIFFSTORE %s %s %s' % (dest, key, ' '.join(keys)))
    return len(members)
def handle_sinter(self, client, key, *keys):
    """SINTER: return the members common to all of the given sets.

    A missing key counts as an empty set, so the result is then empty.

    Fixes: missing keys after the first used to be skipped, which returned
    the intersection of only the sets that happened to exist.
    """
    sets = []
    for name in (key,) + keys:
        members = client.table.get(name)
        if not isset(members):
            return BAD_VALUE
        sets.append(members if members is not None else set())
    result = sets[0].intersection(*sets[1:])
    self.debug(client, 'SINTER %s %s' % (key, ' '.join(keys)))
    return list(result)
def handle_sinterstore(self, client, dest, key, *keys):
    """SINTERSTORE: store the SINTER result at *dest*; return its size."""
    members = self.handle_sinter(client, key, *keys)
    if not isinstance(members, list):
        # handle_sinter reported an error instead of a member list.
        return BAD_VALUE
    client.table[dest] = set(members)
    self.debug(client, 'SINTERSTORE %s %s %s' % (dest, key, ' '.join(keys)))
    return len(members)
def handle_sismember(self, client, key, data):
    """SISMEMBER: return 1 if *data* is in the set at *key*, else 0."""
    if key not in client.table:
        return 0
    members = client.table[key]
    if not isset(members):
        return BAD_VALUE
    self.debug(client, 'SISMEMBER %s %s' % (key, data))
    if data in members:
        return 1
    return 0
def handle_smembers(self, client, key):
    """SMEMBERS: return all members of the set at *key* (empty if absent).

    Fixes: reading a missing key used to create an empty set in the
    database, so the key then showed up in EXISTS, KEYS and DBSIZE.
    """
    members = client.table.get(key)
    if not isset(members):
        return BAD_VALUE
    self.debug(client, 'SMEMBERS %s' % (key,))
    return list(members) if members is not None else []
def handle_smove(self, client, src, dest, data):
    """SMOVE: move *data* from the set at *src* to the set at *dest*.

    Returns 1 if the member was moved and 0 if it was not in *src*.

    Fixes: a missing *src* used to answer with a nil multi-bulk reply
    instead of the integer 0, and *dest* was created even when nothing was
    moved.
    """
    source = client.table.get(src)
    target = client.table.get(dest)
    if not isset(source) or not isset(target):
        return BAD_VALUE
    moved = 0
    if source is not None and data in source:
        source.remove(data)
        if target is None:
            target = client.table[dest] = set()
        target.add(data)
        moved = 1
    self.debug(client, 'SMOVE %s %s %s' % (src, dest, data,))
    return moved
def handle_spop(self, client, key):
    """SPOP: remove and return a random member of the set at *key*.

    Returns the nil bulk reply when the key is missing or the set is empty.

    Fixes: the "nothing to pop" cases used to answer with the nil
    *multi-bulk* reply although SPOP is a bulk command. The member is now
    picked with random.choice over a list, because random.sample() on a set
    is not supported by newer Python versions.
    """
    members = client.table.get(key)
    if members is None:
        return EMPTY_SCALAR
    if not isset(members):
        return BAD_VALUE
    if members:
        data = random.choice(list(members))
        members.remove(data)
    else:
        data = EMPTY_SCALAR
    self.debug(client, 'SPOP %s -> %s' % (key, data))
    return data
def handle_srandmember(self, client, key):
    """SRANDMEMBER: return a random member of the set at *key* without removing it.

    Returns the nil bulk reply when the key is missing or the set is empty.

    Fixes: those cases used to answer with the nil *multi-bulk* reply
    although SRANDMEMBER is a bulk command. The member is now picked with
    random.choice over a list, because random.sample() on a set is not
    supported by newer Python versions.
    """
    members = client.table.get(key)
    if members is None:
        return EMPTY_SCALAR
    if not isset(members):
        return BAD_VALUE
    data = random.choice(list(members)) if members else EMPTY_SCALAR
    self.debug(client, 'SRANDMEMBER %s -> %s' % (key, data))
    return data
def handle_srem(self, client, key, data):
    """SREM: remove *data* from the set at *key*; return 1 if it was a member.

    Fixes: removing from a missing key used to create an empty set in the
    database as a side effect.
    """
    members = client.table.get(key)
    if not isset(members):
        return BAD_VALUE
    removed = 0
    if members is not None and data in members:
        members.remove(data)
        removed = 1
    self.debug(client, 'SREM %s %s' % (key, data))
    return removed
def handle_substr(self, client, key, start, end):
    """SUBSTR: return characters start..end (inclusive) of the string at *key*."""
    stored = client.table.get(key, '')
    if not isscalar(stored):
        return BAD_VALUE
    try:
        start, end = int(start), int(end)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    piece = slice(str(stored), start, end)
    self.debug(client, 'SUBSTR %s %s %s -> %s' % (key, start, end, piece))
    return piece
def handle_sunion(self, client, key, *keys):
    """SUNION: return the members found in any of the given sets.

    Missing keys count as empty sets.

    Fixes: when the *first* key was missing the result used to be empty,
    even if the other keys held members.
    """
    result = set()
    for name in (key,) + keys:
        members = client.table.get(name)
        if members is None:
            continue
        if not isset(members):
            return BAD_VALUE
        result = result.union(members)
    self.debug(client, 'SUNION %s %s' % (key, ' '.join(keys)))
    return list(result)
def handle_sunionstore(self, client, dest, key, *keys):
    """SUNIONSTORE: store the SUNION result at *dest*; return its size."""
    members = self.handle_sunion(client, key, *keys)
    if not isinstance(members, list):
        # handle_sunion reported an error instead of a member list.
        return BAD_VALUE
    client.table[dest] = set(members)
    self.debug(client, 'SUNIONSTORE %s %s %s' % (dest, key, ' '.join(keys)))
    return len(members)
def handle_save(self, client):
    """SAVE: write the database to disk synchronously."""
    self.save()
    self.debug(client, 'SAVE')
    ok = True
    return ok
def handle_select(self, client, db):
    """SELECT: switch *client* to database number *db*."""
    number = int(db)
    self.select(client, number)
    self.debug(client, 'SELECT %s' % number)
    return True
def handle_set(self, client, key, data):
    """SET: store *data* at *key*, replacing any previous value."""
    table = client.table
    table[key] = data
    self.debug(client, 'SET %s -> %s' % (key, data))
    return True
def handle_setnx(self, client, key, data):
    """SETNX: store *data* at *key* only if the key is absent; return 1 or 0."""
    table = client.table
    if key in table:
        self.debug(client, 'SETNX %s -> %s FAILED' % (key, data))
        return 0
    table[key] = data
    self.debug(client, 'SETNX %s -> %s' % (key, data))
    return 1
def handle_shutdown(self, client):
    """SHUTDOWN: flag the server loop to stop, save, and drop the client."""
    self.debug(client, 'SHUTDOWN')
    self.halt = True
    self.save()
    result = self.handle_quit(client)
    return result
def handle_zadd(self, client, key, score, member):
    """ZADD: add *member* with *score* to the sorted set at *key*.

    Returns 1 if the member is new and 0 if it already existed, in which
    case its score is updated.

    Fixes: the score of an existing member was never updated, and the key
    was created before the score had been validated.
    """
    zset = client.table.get(key)
    if not issortedset(zset):
        return BAD_VALUE
    try:
        score = float(score)
    except (TypeError, ValueError):
        return BAD_VALUE
    if zset is None:
        zset = client.table[key] = dict()
    added = 0 if member in zset else 1
    zset[member] = score
    self.debug(client, 'ZADD %s %s %s' % (key, member, score))
    return added
def handle_zrem(self, client, key, member):
    """ZREM: remove *member* from the sorted set at *key*; return 1 if it existed.

    Fixes: removing from a missing key used to create an empty sorted set
    in the database as a side effect.
    """
    zset = client.table.get(key)
    if not issortedset(zset):
        return BAD_VALUE
    removed = 0
    if zset is not None and member in zset:
        zset.pop(member)
        removed = 1
    self.debug(client, 'ZREM %s %s' % (key, member))
    return removed
def handle_zincrby(self, client, key, score, member):
    """ZINCRBY: add *score* to the score of *member* in the sorted set at *key*.

    A missing member starts from 0.0. Returns the new score.

    Fixes: the key used to be created before the increment was validated,
    so a bad increment left an empty sorted set behind.
    """
    zset = client.table.get(key)
    if not issortedset(zset):
        return BAD_VALUE
    try:
        score = float(score)
    except (TypeError, ValueError):
        return RedisError('value is not an integer')
    if zset is None:
        zset = client.table[key] = dict()
    zset[member] = zset.get(member, 0.0) + score
    self.debug(client, 'ZINCRBY %s %s -> %s' % (key, score, zset))
    return zset[member]
def handle_zrank(self, client, key, member, reverse=False):
    """ZRANK: return the 0-based rank of *member* by ascending score.

    reverse -- rank by descending score instead (used by handle_zrevrank).

    Returns the nil bulk reply when the key or the member does not exist.
    Members with equal scores share the rank of the first of them.

    Fixes: looking up a missing key used to create an empty sorted set in
    the database as a side effect.
    """
    zset = client.table.get(key)
    if not issortedset(zset):
        return BAD_VALUE
    if zset is None or member not in zset:
        return EMPTY_SCALAR
    rank = sorted(zset.values(), reverse=reverse).index(zset[member])
    self.debug(client, 'Z%sRANK %s %s -> %s' % ('REV' if reverse else '', key, member, rank))
    return rank
def handle_zrevrank(self, client, key, member):
    """ZREVRANK: rank of *member* when ordered by descending score."""
    descending = True
    return self.handle_zrank(client, key, member, descending)
def handle_zrange(self, client, key, low, high, withscores=False, reverse=False):
    """ZRANGE: return members ranked low..high (inclusive) by score.

    withscores -- any true value interleaves each member with its score.
    reverse    -- order by descending score (used by handle_zrevrange).
    """
    try:
        low = int(low)
        high = int(high)
    except (TypeError, ValueError):
        return BAD_VALUE
    if key not in client.table:
        return []
    zset = client.table[key]
    if not issortedset(zset):
        return BAD_VALUE
    ordered = sorted(zset.items(), key=key_x(1), reverse=reverse)
    pairs = slice(ordered, low, high)
    shown_high = high - 1 if high else -1
    self.debug(client, 'Z%sRANGE %s %s %s -> %s' % ('REV' if reverse else '', key, low, shown_high, pairs))
    if withscores:
        return [item for pair in pairs for item in pair]
    return [pair[0] for pair in pairs]
def handle_zrevrange(self, client, key, low, high, withscores=False):
    """ZREVRANGE: like ZRANGE, but ordered by descending score."""
    options = {'withscores': withscores, 'reverse': True}
    return self.handle_zrange(client, key, low, high, **options)
def handle_zcount(self, client, key, low, high):
    """ZCOUNT: return how many members of the sorted set have low <= score <= high.

    *low* and *high* are parsed by xfloat().

    Fixes: a missing key used to return an empty list (a multi-bulk reply)
    instead of the integer 0. Counting no longer sorts the members first.
    """
    try:
        low, high = xfloat(low), xfloat(high)
    except (TypeError, ValueError):
        return BAD_VALUE
    if key not in client.table:
        return 0
    zset = client.table[key]
    if not issortedset(zset):
        return BAD_VALUE
    matches = sum(1 for score in zset.values() if low <= score <= high)
    self.debug(client, 'ZCOUNT %s %s %s -> %s' % (key, low, high, matches))
    return matches
def handle_zrangebyscore(self, client, key, low, high, limit=False, offset=None, count=None, withscores=False):
    """ZRANGEBYSCORE: return members whose score lies in [low, high].

    low, high     -- bounds parsed by xfloat().
    limit         -- any true value enables the extra arguments. When it is
                     given without offset and count it is taken to be the
                     WITHSCORES flag.
    offset, count -- skip *offset* matches, then return at most *count*
                     (a negative count returns all that remain).
    withscores    -- any true value interleaves members with their scores.

    Fixes: *count* used to be handed to slice() as an inclusive end index,
    so 'LIMIT 0 10' returned 11 elements and 'LIMIT 5 2' returned nothing.
    """
    try:
        low, high = xfloat(low), xfloat(high)
    except (TypeError, ValueError):
        return BAD_VALUE
    if key not in client.table:
        return []
    zset = client.table[key]
    if not issortedset(zset):
        return BAD_VALUE
    pairs = sorted(filter(score_in_range(low, high), zset.items()), key=key_x(1))
    if limit:
        if offset is not None and count is not None:
            try:
                offset, count = int(offset), int(count)
            except (TypeError, ValueError):
                return BAD_VALUE
            if offset < 0:
                pairs = []
            elif count < 0:
                pairs = pairs[offset:]
            else:
                pairs = pairs[offset:offset + count]
        else:
            withscores = True
    self.debug(client, 'ZRANGEBYSCORE %s %s %s -> %s' % (key, low, high, pairs))
    if withscores:
        return [item for pair in pairs for item in pair]
    return [pair[0] for pair in pairs]
def handle_zremrangebyscore(self, client, key, low, high):
    """ZREMRANGEBYSCORE: remove members with low <= score <= high; return the count.

    *low* and *high* are parsed by xfloat().

    Fixes: a missing key used to return an empty list (a multi-bulk reply)
    instead of the integer 0.
    """
    try:
        low, high = xfloat(low), xfloat(high)
    except (TypeError, ValueError):
        return BAD_VALUE
    if key not in client.table:
        return 0
    zset = client.table[key]
    if not issortedset(zset):
        return BAD_VALUE
    # Collect first; the dict must not change size while it is iterated.
    doomed = [member for member, score in zset.items() if low <= score <= high]
    self.debug(client, 'ZREMRANGEBYSCORE %s %s %s -> %s' % (key, low, high, len(doomed)))
    for member in doomed:
        zset.pop(member)
    return len(doomed)
def handle_zremrangebyrank(self, client, key, low, high, withscores=False, reverse=False):
    """ZREMRANGEBYRANK: remove members ranked low..high (inclusive); return the count.

    withscores -- accepted for signature compatibility; not used.
    reverse    -- rank by descending score instead of ascending.

    Fixes: a missing key used to return an empty list (a multi-bulk reply)
    instead of the integer 0, and the debug line was labelled 'ZRANGE'.
    """
    try:
        low, high = int(low), int(high)
    except (TypeError, ValueError):
        return BAD_VALUE
    if key not in client.table:
        return 0
    zset = client.table[key]
    if not issortedset(zset):
        return BAD_VALUE
    ordered = sorted(zset.items(), key=key_x(1), reverse=reverse)
    doomed = [pair[0] for pair in slice(ordered, low, high)]
    self.debug(client, 'ZREMRANGEBYRANK %s %s %s -> %s' % (key, low, high, doomed))
    for member in doomed:
        zset.pop(member)
    return len(doomed)
def handle_zcard(self, client, key):
    """ZCARD: return the number of members in the sorted set at *key* (0 if absent).

    Fixes: reading a missing key used to create an empty sorted set in the
    database as a side effect.
    """
    zset = client.table.get(key)
    if not issortedset(zset):
        return BAD_VALUE
    size = len(zset) if zset is not None else 0
    self.debug(client, 'ZCARD %s -> %s' % (key, size))
    return size
def handle_zscore(self, client, key, member):
    """ZSCORE: return the score of *member* in the sorted set at *key*.

    Returns the nil bulk reply when the key or the member does not exist.

    Fixes: reading a missing key used to create an empty sorted set in the
    database as a side effect.
    """
    zset = client.table.get(key)
    if not issortedset(zset):
        return BAD_VALUE
    if zset is None or member not in zset:
        return EMPTY_SCALAR
    data = zset[member]
    self.debug(client, 'ZSCORE %s %s -> %s' % (key, member, data))
    return data
def create_daemon():
    """Detach a process from the controlling terminal and run it in the
    background as a daemon.

    Returns None without doing anything when ``os.name`` is not 'posix';
    otherwise forks twice, closes the inherited file descriptors, reopens the
    standard streams on os.devnull and returns 0 in the surviving process.
    """
    if os.name != 'posix':
        # sorry, you can't daemonize on Windows
        return
    import resource # Import here, because Windows doesn't have it.
    try:
        pid = os.fork()
    except OSError, e:
        raise Exception, "%s [%d]" % (e.strerror, e.errno)
    if pid == 0:
        # First child: start a new session.
        os.setsid()
        try:
            # Fork a second child and exit immediately to prevent zombies. This
            # causes the second child process to be orphaned, making the init
            # process responsible for its cleanup. And, since the first child is
            # a session leader without a controlling terminal, it's possible for
            # it to acquire one by opening a terminal in the future (System V-
            # based systems). This second fork guarantees that the child is no
            # longer a session leader, preventing the daemon from ever acquiring
            # a controlling terminal.
            pid = os.fork()
        except OSError, e:
            raise Exception, "%s [%d]" % (e.strerror, e.errno)
        if pid == 0:
            # Pick sane working dir and umask
            os.chdir('/')
            os.umask(0022)
        else:
            # use os._exit() to avoid double-flushing stdin/stdout
            os._exit(0) # Exit parent (the first child) of the second child.
    else:
        os._exit(0) # Exit parent of the first child.
    # Close all open file descriptors. This prevents the child from keeping
    # open any file descriptors inherited from the parent.
    #
    # Use the getrlimit method to retrieve the maximum file descriptor number
    # that can be opened by this process. If there is no limit on the
    # resource, use 1024.
    #
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = 1024
    for fd in xrange(maxfd):
        try:
            os.close(fd)
        except OSError:
            # The descriptor was not open; nothing to close.
            pass
    # Open os.devnull; with every descriptor closed above this is presumably
    # handed descriptor 0 (standard input), which the dup2 calls below rely on.
    os.open(os.devnull, os.O_RDWR)
    # Duplicate standard input to standard output and standard error.
    os.dup2(0, 1)
    os.dup2(0, 2)
    return 0
def main(args):
    """Parse command-line options, optionally daemonize, and run the server.

    args -- the command-line arguments without the program name.

    Exits the process with status 0 when the server stops.

    Fixes: the options were parsed from sys.argv instead of *args*; the
    privilege-drop block used a bare except; the signal handlers were
    installed before the server object existed, so an early SIGTERM or
    SIGHUP raised NameError inside the handler.
    """
    global logger
    logger = logging.getLogger(LOGGER_NAME)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())

    parser = optparse.OptionParser()
    parser.add_option('--host', help='address to listen on',
                      default='127.0.0.1')
    parser.add_option('-p', '--port', type='int', help='port to listen on',
                      default=6379)
    parser.add_option('-d', '--dbfile', help='database file name',
                      dest='db_file')
    parser.add_option('-l', '--logfile', help='file to log to',
                      dest='log_file')
    parser.add_option('--daemon', action='store_true', dest='daemonize',
                      help='run as a daemon (Unix only)')
    parser.add_option('--user', help='user for daemon to run as')
    parser.add_option('--group', help='group for daemon to run as')
    parser.add_option('--pid', help='name of pid file', dest='pid_file')
    opts, args = parser.parse_args(args)

    host = opts.host
    port = opts.port
    # Resolve paths now: create_daemon() changes the working directory to '/'.
    log_file = os.path.abspath(opts.log_file) if opts.log_file else None
    pid_file = os.path.abspath(opts.pid_file) if opts.pid_file else None
    db_file = os.path.abspath(opts.db_file) if opts.db_file else None
    user = opts.user
    group = opts.group
    daemonize = opts.daemonize
    unix = os.name == 'posix'

    if user or group:
        if not unix:
            logger.error('impersonation only works on Unix')
        else:
            try:
                # Group first: after setuid() the process may no longer be
                # allowed to change its group.
                if group:
                    import grp
                    os.setgid(grp.getgrnam(group)[2])
                if user:
                    import pwd
                    os.setuid(pwd.getpwnam(user)[2])
            except (KeyError, OSError):
                # KeyError: unknown user/group name; OSError: not permitted.
                logger.error('insufficient permissions to run as %s:%s', user, group)

    if daemonize:
        create_daemon()
    if pid_file:
        with open(pid_file, 'w') as f:
            f.write('%s\n' % os.getpid())

    m = MiniRedis(host=host, port=port, log_file=log_file, db_file=db_file)
    if unix:
        def sigterm(signum, frame):
            m.stop()

        def sighup(signum, frame):
            m.rotate()

        signal.signal(signal.SIGTERM, sigterm)
        signal.signal(signal.SIGHUP, sighup)

    try:
        m.run()
    except KeyboardInterrupt:
        m.stop()
    except Exception as e:
        m.scout.report_exception(e)
    if pid_file:
        os.unlink(pid_file)
    sys.exit(0)
# Script entry point: hand the command-line arguments (minus the program
# name) to main().
if __name__ == '__main__':
    main(sys.argv[1:])
|
Loading...