"""
Simple multiprocessing server for gearman.
"""
__version__ = '0.2'
import time
import sys, traceback
import gearman
import signal
from multiprocessing import Process, Queue, cpu_count, active_children
from Queue import Empty
# Logging
import logging
log = logging.getLogger(__name__)
#
# A simple task function wrapper designed to exit gracefully and re-queue jobs
# when interrupted.
#
class Task(object):
    """
    Simple wrapper around worker callback functions.

    Does its best to return a job to the gearman queue if the callback
    raises an exception (or receives a KeyboardInterrupt): the worker is
    disconnected so the server requeues the job, then the exception is
    re-raised to the caller.

    Enable verbose to call log.error() with exception details.
    """
    def __init__(self, task, callback, verbose=False):
        # task: gearman task name; callback: callable(worker, job).
        self.task = task
        self.callback = callback
        self.verbose = verbose

    def __call__(self, worker, job):
        try:
            return self.callback(worker, job)
        # KeyboardInterrupt is not a subclass of Exception, so it must be
        # listed explicitly for the requeue behavior the docstring promises.
        # `as e` replaces the Py2-only `Exception, e` form.
        except (KeyboardInterrupt, Exception) as e:
            if self.verbose:
                log.error('WORKER FAILED: {0}, {1}\n{2}'.format(
                    self.task,
                    str(e),
                    traceback.format_exc()
                ))
            # Disconnect so this job goes back into the queue
            worker.shutdown()
            # Continue with the exception
            raise
#
# Main server class
#
class GearmanTaskServer(object):
    """
    The main task server class.

    **host_list**
        List of gearman hosts to connect to. See ``gearman`` for full
        documentation.
    **tasks**
        List of tasks. Tasks may be Task() objects, dicts, lists or tuples.
    **max_workers**
        Number of worker processes to launch. Defaults to
        ``multiprocessing.cpu_count()``
    **id_prefix**
        If you want your workers to register a client_id with gearman, provide
        a prefix here. GearmanTaskServer will append an incrementing number
        to the end of this, representing the total number of subprocesses
        started in this run.
    **GMWorker**
        GearmanWorker class to use. Defaults to gearman.GearmanWorker.
    **sighandler**
        Set to False if you would prefer to use your own signal handlers
        instead of trapping SIGINT and SIGTERM as KeyboardInterrupt events.
    **verbose**
        Set to True to enable logger.
    """
    def __init__(self,
        host_list, tasks, max_workers=None,
        id_prefix=None, GMWorker=None, sighandler=True, verbose=False
    ):
        self.host_list = host_list
        self.tasks = tasks
        # BUG FIX: the documented default is max_workers=None, but
        # int(None) raises TypeError. Treat None as "use cpu_count below".
        self.max_workers = 0 if max_workers is None else int(max_workers)
        self.worker = GMWorker
        self.id_prefix = id_prefix
        self.verbose = verbose
        if not self.worker:
            self.worker = gearman.GearmanWorker
        # Signal Handler override?
        if sighandler:
            self._setup_sighandler()
        # Sanity check: fall back to one process per CPU, or a single
        # process when the CPU count cannot be determined.
        if self.max_workers < 1:
            try:
                self.max_workers = int(cpu_count())
            except Exception:
                self.max_workers = 1

    def serve_forever(self):
        """
        Launch the multi-process server and process jobs until an interrupt
        is received.
        """
        # Initialize a queue designed to track child processes that exit.
        doneq = Queue()
        # Keep track of how many clients we have created, so they can have
        # unique IDs.
        process_counter = 0
        # Loop
        workers = []
        try:
            while True:
                # Top the pool back up to max_workers processes.
                while len(workers) < self.max_workers:
                    process_counter += 1
                    client_id = None
                    if self.id_prefix:
                        client_id = '{0}{1}'.format(self.id_prefix, process_counter)
                    p = Process(target=_worker_process, args=(
                        self.tasks,
                        doneq,
                        self.host_list,
                        self.worker,
                        client_id,
                        self.verbose
                    ))
                    p.start()
                    workers.append(p)
                    if self.verbose:
                        log.info("Num workers: {0} of {1}".format(len(workers), self.max_workers))
                # Use the queue as a poor man's wait/select against the first
                # child process to finish. Add a timeout so we can repopulate
                # processes that terminate abnormally.
                try:
                    r = doneq.get(True, 5)
                except Empty:
                    r = None
                if r is not None:
                    if isinstance(r, gearman.errors.ServerUnavailable):
                        #: @todo non-blocking doneq.get() to clear it out of
                        # similar errors? or maybe just reset doneq above
                        # when len(workers)==0
                        if self.verbose:
                            log.info("Reconnecting.")
                        time.sleep(2)
                    elif r is True:
                        # Job exited normally. Except this shouldn't actually happen.
                        log.info('Normal process exit (May actually be a problem)')
                # Give things a fraction of a second to catch up, then filter
                # out the finished process(es). A list comprehension is used
                # instead of filter() so `workers` stays a list (filter()
                # returns a lazy iterator on Python 3, which would break the
                # len(workers) check above on the next pass).
                time.sleep(0.1)
                workers = [w for w in active_children() if w in workers]
        except KeyboardInterrupt:
            log.error('EXIT. RECEIVED INTERRUPT')

    def _setup_sighandler(self):
        """
        Initialize our slight change to the signal handler setup
        Provided as a separate function so that users can opt in to this feature,
        or write their own.
        """
        signal.signal(signal.SIGINT, _interrupt_handler)
        signal.signal(signal.SIGTERM, _interrupt_handler)
#
# Our own interrupt handler
#
def _interrupt_handler(signum, frame):
"""
Python maps SIGINT to KeyboardInterrupt by default, but we need to
catch SIGTERM as well, so we can give jobs as much of a chance as
possible to alert the gearman server to requeue the job.
"""
raise KeyboardInterrupt()
#
# Worker process/function
#
def _worker_process(tasks, doneq, host_list, worker_class=None, client_id=None, verbose=False):
try:
if not worker_class:
worker_class = gearman.GearmanWorker
try:
# Connect
gm_worker = worker_class(host_list = host_list)
if client_id:
gm_worker.set_client_id(client_id)
# Load the tasks
for task in tasks:
taskname = callback = None
if isinstance(task, dict):
taskname = task['task']
callback = task['callback']
elif isinstance(task, list) or isinstance(task, tuple):
taskname, callback = task
else:
taskname = task.task
callback = task
if verbose:
log.info("Registering {0} task {1}".format(client_id, taskname))
gm_worker.register_task(taskname, callback)
# Enter the work loop
gm_worker.work()
except gearman.errors.ServerUnavailable, e:
doneq.put(e)
return
# Really should never touch this code but it's here just in case.
doneq.put(True)
except KeyboardInterrupt:
# Prevent this child process from printing a traceback
pass