#!/usr/bin/env python
from __future__ import with_statement
import os
import sys
import settings
# We might need to activate a virtualenv.
# settings may define VIRTUALENV_ACTIVATE, the path to a virtualenv's
# bin/activate_this.py; executing it rewires sys.path into that environment.
activate_this = getattr(settings, 'VIRTUALENV_ACTIVATE', None)
if activate_this and os.path.exists(activate_this):
    # The activate_this.py script expects __file__ to point at itself so it
    # can locate the virtualenv's site-packages (Python 2 execfile idiom).
    execfile(activate_this, dict(__file__=activate_this))
if __file__:
    # Put this script's parent directory on sys.path so sibling modules
    # (kilnconfig, lock, tasks, ...) are importable when run directly.
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import getopt
import logging
import random
import signal
import time
from threading import Thread
from redis import Redis
from redis.exceptions import ResponseError
from kilnconfig import KilnConfig
from lock import RedisLock, RedisLockTimeout
import tasks
import indextasks
import queueutils
import ogconfig
from bugzscout import BugzScout
# Map config-file LogLevel strings (compared lowercased) to stdlib log levels.
LOG_LEVELS = {'debug': logging.DEBUG,
              'info': logging.INFO,
              'warning': logging.WARNING,
              'error': logging.ERROR,
              'critical': logging.CRITICAL}
# Logger name shared by every daemon thread and the BugzScout reporter.
LOG_NAME = 'kilnd'
class Daemon(Thread):
def __init__(self, host='localhost', port=56784, db=0, conf=None):
super(Daemon, self).__init__()
self.host = host
self.port = port
self.db = db
self.logger = logging.getLogger(LOG_NAME)
self.bugzscout = BugzScout(LOG_NAME)
self.halt = True
self.conf = conf or KilnConfig()
tasks.updaterepo.daemon = self
self.setup_logging() # We do this here because we don't always use main().
def init(self):
"""
Gets run right before we enter the loop.
"""
self.redis = Redis(self.host, self.port, self.db)
queueutils.upgrade(self.redis)
self.halt = False
def stop(self):
self.halt = True
self.join()
def setup_logging(self):
"""
Set up our logger based on the daemon's config.
"""
log_file = self.conf.read('Daemon', 'LogFile', default=None)
log_level = LOG_LEVELS.get(self.conf.read('Daemon', 'LogLevel', default='error').lower(), logging.ERROR)
self.logger.setLevel(log_level)
if (not hasattr(self.logger, 'log_file') or not self.logger.log_file) and log_file:
# We've not started any threads yet, so we should be able to do this just once.
try:
self.logger.log_file = log_file
fh = logging.FileHandler(log_file)
fh.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s\t%(message)s'))
self.logger.addHandler(fh)
except IOError, e:
# We can still log to stderr.
self.logger.error('Could not create log file.')
self.logger.error(e)
def run(self):
self.init()
while not self.halt:
try:
delay = self.step()
except Exception, e:
self.bugzscout.report_exception(e)
self.redis.connection.disconnect() # The connection will automatically reconnect.
delay = 1.0 # Something happened, give it a moment to cool off.
if delay:
time.sleep(delay)
def step(self):
raise NotImplemented()
class QueueDaemon(Daemon):
def failed(self, task, id, maxfail):
failcount = queueutils.mark_failed(self.redis, task, id)
if maxfail and failcount > maxfail:
self.logger.warning('%s:%s CANCELED after %s attempts', task, id, failcount)
queueutils.cancel(self.redis, task, id)
return
# :after must have already happened or we wouldn't have run, so we don't need
# to read the old value out
delay = 2 ** failcount
after = time.time() + delay + random.randint(0, delay / 2)
queueutils.reschedule(self.redis, task, id, after)
self.logger.warn('%s:%s failed %s time%s; rescheduled for %s',
task, id, failcount, 's' if failcount > 1 else '', after)
def succeeded(self, task, id):
self.logger.info('%s:%s succeeded', task, id)
queueutils.delete(self.redis, task, id)
def step(self):
bored = True
first_id = None
while True:
method, id = queueutils.next_task(self.redis)
if method is None:
break
if id == first_id:
queueutils.requeue(self.redis, method, id)
break
first_id = first_id or id
if not queueutils.should_run(self.redis, method, id):
queueutils.requeue(self.redis, method, id)
break
bored = False
f = getattr(tasks, method)
try:
result = f(self, **queueutils.data(self.redis, method, id))
except TypeError, e:
# We called with too few arguments, which means this job was
# requeued for some reason, and has already succeeded.
result = True
except Exception, e:
self.bugzscout.report_exception(e)
result = False
if result:
self.succeeded(method, id)
queueutils.delete(self.redis, method, id)
else:
self.failed(method, id, getattr(f, 'max_fail', None))
if bored:
return 0.5
# Redis keys used by IndexDaemon.
INIT_KEY = 'opengrok:firstrun:1'  # setnx guard: seed the repo queue exactly once
INDEX_LOCK = 'opengrok:index:lock'  # mutual exclusion around an indexing run
INDEX_QUEUE = 'opengrok:index'  # sorted set of repos awaiting indexing
INDEX_NEXT = 'opengrok:index:nextid'
UPDATE_QUEUE = 'opengrok:update'
CONFIG_LOCK = 'opengrok:config:lock'  # mutual exclusion around config regeneration
CONFIG_NEXTRUN = 'opengrok:config:nextrun'  # timestamp of next allowed config update
class IndexDaemon(Daemon):
    """Daemon that keeps the OpenGrok configuration and index up to date."""

    def __init__(self, *args, **kwargs):
        super(IndexDaemon, self).__init__(*args, **kwargs)
        # BUG FIX: conf.read may return None when the 'Jar' key is absent,
        # and os.path.exists(None) raises TypeError. Treat a missing setting
        # the same as a missing jar: we simply cannot index.
        jar = self.conf.read('OpenGrok', 'Jar', default=None)
        self.can_index = bool(jar) and os.path.exists(jar)
        if not self.can_index:
            self.bugzscout.report_error('Cannot find OpenGrok for indexing')

    def init(self):
        super(IndexDaemon, self).init()
        self.update_index_to_zsets()

    def update_index_to_zsets(self):
        """One-time migration: ensure INDEX_QUEUE is a sorted set, deleting
        any leftover value of the old (list) type."""
        try:
            # Test that INDEX_QUEUE has been converted to a sorted set.
            self.redis.zcard(INDEX_QUEUE)
        except ResponseError:
            # It was the wrong type. Delete so we can use it as a sorted set.
            self.redis.delete(INDEX_QUEUE)

    def step(self):
        """Do one unit of index maintenance.

        In order of priority: seed the repo queue on very first run, refresh
        the OpenGrok config when its timer expires, then index the next repo
        from INDEX_QUEUE. Returns None or a sleep delay in seconds.
        """
        # First run ever (setnx wins exactly once): enqueue every known repo.
        if self.redis.setnx(INIT_KEY, True):
            repos = indextasks.get_all_repos(self.conf)
            for repo in repos:
                queueutils.enqueue(self.redis, 'updaterepo', {'repo': repo}, 'low')
            return None
        # Periodically regenerate the OpenGrok configuration.
        try:
            with RedisLock(self.redis, CONFIG_LOCK):
                if self.can_index and time.time() > float(self.redis.get(CONFIG_NEXTRUN) or 0):
                    ogconfig.update_config()
                    next_run = time.time() + (float(self.conf.read('OpenGrok', 'ConfigUpdateInterval', default=15)) * 60)
                    self.redis.set(CONFIG_NEXTRUN, next_run)
                    return None
        except RedisLockTimeout:
            # Another daemon is updating the config; fall through to indexing.
            pass
        repo = None
        try:
            with RedisLock(self.redis, INDEX_LOCK, timeout=60):
                if self.can_index and time.time() > float(self.redis.get('opengrok:index:nextrun') or 0):
                    # Pop the lowest-scored (oldest) repo from the sorted set.
                    repos = self.redis.zrange(INDEX_QUEUE, 0, 0)
                    if repos:
                        repo = repos[0]
                        self.redis.zrem(INDEX_QUEUE, repo)
        except RedisLockTimeout:
            return None
        if repo:
            result = indextasks.index(repo, self.conf)
            self.save()
            if result:
                return None
            else:
                # Something happened, re-enqueue.
                self.redis.lpush('opengrok:cancelations', repo)
                return 0.250
        return 0.250

    def save(self):
        """Persist queue state back to Redis."""
        queueutils.save(self.redis)
def create_daemon():
    """Detach a process from the controlling terminal and run it in the
    background as a daemon.

    Uses the classic POSIX double-fork: both intermediate parents _exit(),
    so only the grandchild (the daemon) ever returns from this function,
    with a return value of 0. On non-POSIX platforms this is a no-op that
    returns None.
    """
    if os.name != 'posix':
        # sorry, you can't daemonize on Windows
        return
    import resource  # Import here, because Windows doesn't have it.
    try:
        pid = os.fork()
    except OSError, e:
        raise Exception, "%s [%d]" % (e.strerror, e.errno)
    if pid == 0:
        # First child: become a session leader, detaching from the
        # controlling terminal and the original process group.
        os.setsid()
        try:
            # Fork a second child and exit immediately to prevent zombies. This
            # causes the second child process to be orphaned, making the init
            # process responsible for its cleanup. And, since the first child is
            # a session leader without a controlling terminal, it's possible for
            # it to acquire one by opening a terminal in the future (System V-
            # based systems). This second fork guarantees that the child is no
            # longer a session leader, preventing the daemon from ever acquiring
            # a controlling terminal.
            pid = os.fork()
        except OSError, e:
            raise Exception, "%s [%d]" % (e.strerror, e.errno)
        if pid == 0:
            # Pick sane working dir and umask
            os.chdir('/')
            os.umask(0022)
        else:
            # use os._exit() to avoid double-flushing stdin/stdout
            os._exit(0)  # Exit parent (the first child) of the second child.
    else:
        os._exit(0)  # Exit parent of the first child.
    # Close all open file descriptors. This prevents the child from keeping
    # open any file descriptors inherited from the parent. There is a variety
    # of methods to accomplish this task. Three are listed below.
    #
    # Use the getrlimit method to retrieve the maximum file descriptor number
    # that can be opened by this process. If there is not limit on the
    # resource, use 1024.
    #
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = 1024
    for fd in xrange(maxfd):
        try:
            os.close(fd)
        except OSError:
            # fd wasn't open; ignore.
            pass
    # redirect stdout/stderr to /dev/null
    # Everything is closed, so this open() gets the lowest free fd: 0 (stdin).
    os.open(os.devnull, os.O_RDWR)
    # Duplicate standard input to standard output and standard error.
    os.dup2(0, 1)
    os.dup2(0, 2)
    return 0
# Polled by main()'s wait loop; the SIGTERM handler flips it to False.
RUN = True


def sigterm(_signum, _frame):
    """Signal handler: request a clean shutdown of main()'s wait loop."""
    global RUN
    RUN = False
def main(args):
global logger
sec = 'Daemon'
opts = dict(getopt.getopt(args, 'h:p:c:d')[0])
conf_file = opts.get('-c', None)
conf = KilnConfig(conf_file)
host = opts.get('-h', None) or conf.read(sec, 'host', default='localhost')
port = int(opts.get('-p', None) or conf.read(sec, 'port', default=6379))
daemonize = ('-d' in opts ) or (conf.read(sec, 'daemonize', default='false').lower() == 'true')
db = int(conf.read(sec, 'db', default=0))
pid_file = conf.read(sec, 'pid_file', default=None)
user = conf.read(sec, 'user', default=None)
group = conf.read(sec, 'group', default=None)
queue_threads = int(conf.read(sec, 'QueueThreads', default=1))
index_threads = int(conf.read(sec, 'IndexThreads', default=0))
if user or group:
try:
if group:
import grp
os.setgid(grp.getgrnam(group)[2])
if user:
import pwd
os.setuid(pwd.getpwnam(user)[2])
except Exception, e:
BugzScout(LOG_NAME).report_exception(e, 'Could not run as %s:%s' % (user, group))
sys.stderr.write(e)
sys.stderr.write('Could not run as %s:%s\n' % (user, group))
raise
if daemonize:
create_daemon()
# create_daemon automatically exits appropriately if necessary
logger = logging.getLogger(LOG_NAME)
if not logger.handlers:
logger.addHandler(logging.StreamHandler()) # At least log to stderr
if pid_file:
with open(pid_file, 'w') as f:
f.write('%s\n' % os.getpid())
logger.info('launching %s daemons for %s:%s (db %s)', (queue_threads + index_threads), host, port, db)
daemons = [QueueDaemon(host=host, port=port, db=db, conf=conf) for x in xrange(queue_threads)]
daemons += [IndexDaemon(host=host, port=port, db=db, conf=conf) for x in xrange(index_threads)]
for d in daemons:
d.start()
signal.signal(signal.SIGTERM, sigterm)
while RUN:
try:
signal.pause()
except KeyboardInterrupt:
break
logger.info('Stopping...')
for d in daemons:
d.stop()
logger.info('Stopped')
if pid_file:
os.unlink(pid_file)
if __name__ == '__main__':
    # Script entry point: everything after the script name goes to getopt.
    main(sys.argv[1:])