BVB Source Codes

rq Show worker.py Source code

Return Download rq: download worker.py Source code - Download rq Source code - Type:.py
  1. # -*- coding: utf-8 -*-
  2. from __future__ import (absolute_import, division, print_function,
  3.                         unicode_literals)
  4.  
  5. import errno
  6. import logging
  7. import os
  8. import random
  9. import signal
  10. import socket
  11. import sys
  12. import time
  13. import traceback
  14. import warnings
  15. from datetime import timedelta
  16.  
  17. from redis import WatchError
  18.  
  19. from rq.compat import as_text, string_types, text_type
  20.  
  21. from .compat import PY2
  22. from .connections import get_current_connection, push_connection, pop_connection
  23. from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
  24. from .exceptions import DequeueTimeout, ShutDownImminentException
  25. from .job import Job, JobStatus
  26. from .logutils import setup_loghandlers
  27. from .queue import Queue, get_failed_queue
  28. from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
  29. from .suspension import is_suspended
  30. from .timeouts import UnixSignalDeathPenalty
  31. from .utils import (ensure_list, enum, import_attribute, make_colorizer,
  32.                     utcformat, utcnow, utcparse)
  33. from .version import VERSION
  34.  
  35. try:
  36.     from procname import setprocname
  37. except ImportError:
  38.     def setprocname(*args, **kwargs):  # noqa
  39.         pass
  40.  
  41. green = make_colorizer('darkgreen')
  42. yellow = make_colorizer('darkyellow')
  43. blue = make_colorizer('darkblue')
  44.  
  45.  
  46. logger = logging.getLogger(__name__)
  47.  
  48.  
  49. class StopRequested(Exception):
  50.     pass
  51.  
  52.  
  53. def iterable(x):
  54.     return hasattr(x, '__iter__')
  55.  
  56.  
  57. def compact(l):
  58.     return [x for x in l if x is not None]
  59.  
  60.  
  61. _signames = dict((getattr(signal, signame), signame)
  62.                  for signame in dir(signal)
  63.                  if signame.startswith('SIG') and '_' not in signame)
  64.  
  65.  
  66. def signal_name(signum):
  67.     try:
  68.         if sys.version_info[:2] >= (3, 5):
  69.             return signal.Signals(signum).name
  70.         else:
  71.             return _signames[signum]
  72.  
  73.     except KeyError:
  74.         return 'SIG_UNKNOWN'
  75.     except ValueError:
  76.         return 'SIG_UNKNOWN'
  77.  
  78.  
  79. WorkerStatus = enum(
  80.     'WorkerStatus',
  81.     STARTED='started',
  82.     SUSPENDED='suspended',
  83.     BUSY='busy',
  84.     IDLE='idle'
  85. )
  86.  
  87.  
  88. class Worker(object):
  89.     redis_worker_namespace_prefix = 'rq:worker:'
  90.     redis_workers_keys = 'rq:workers'
  91.     death_penalty_class = UnixSignalDeathPenalty
  92.     queue_class = Queue
  93.     job_class = Job
  94.  
  95.     @classmethod
  96.     def all(cls, connection=None):
  97.         """Returns an iterable of all Workers.
  98.        """
  99.         if connection is None:
  100.             connection = get_current_connection()
  101.         reported_working = connection.smembers(cls.redis_workers_keys)
  102.         workers = [cls.find_by_key(as_text(key), connection)
  103.                    for key in reported_working]
  104.         return compact(workers)
  105.  
  106.     @classmethod
  107.     def find_by_key(cls, worker_key, connection=None):
  108.         """Returns a Worker instance, based on the naming conventions for
  109.        naming the internal Redis keys.  Can be used to reverse-lookup Workers
  110.        by their Redis keys.
  111.        """
  112.         prefix = cls.redis_worker_namespace_prefix
  113.         if not worker_key.startswith(prefix):
  114.             raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key))
  115.  
  116.         if connection is None:
  117.             connection = get_current_connection()
  118.         if not connection.exists(worker_key):
  119.             connection.srem(cls.redis_workers_keys, worker_key)
  120.             return None
  121.  
  122.         name = worker_key[len(prefix):]
  123.         worker = cls([], name, connection=connection)
  124.         queues = as_text(connection.hget(worker.key, 'queues'))
  125.         worker._state = as_text(connection.hget(worker.key, 'state') or '?')
  126.         worker._job_id = connection.hget(worker.key, 'current_job') or None
  127.         if queues:
  128.             worker.queues = [cls.queue_class(queue, connection=connection)
  129.                              for queue in queues.split(',')]
  130.         return worker
  131.  
  132.     def __init__(self, queues, name=None,
  133.                  default_result_ttl=None, connection=None, exc_handler=None,
  134.                  exception_handlers=None, default_worker_ttl=None,
  135.                  job_class=None, queue_class=None):  # noqa
  136.         if connection is None:
  137.             connection = get_current_connection()
  138.         self.connection = connection
  139.  
  140.         if job_class is not None:
  141.             if isinstance(job_class, string_types):
  142.                 job_class = import_attribute(job_class)
  143.             self.job_class = job_class
  144.  
  145.         if queue_class is not None:
  146.             if isinstance(queue_class, string_types):
  147.                 queue_class = import_attribute(queue_class)
  148.             self.queue_class = queue_class
  149.  
  150.         queues = [self.queue_class(name=q, connection=connection)
  151.                   if isinstance(q, string_types) else q
  152.                   for q in ensure_list(queues)]
  153.         self._name = name
  154.         self.queues = queues
  155.         self.validate_queues()
  156.         self._exc_handlers = []
  157.  
  158.         if default_result_ttl is None:
  159.             default_result_ttl = DEFAULT_RESULT_TTL
  160.         self.default_result_ttl = default_result_ttl
  161.  
  162.         if default_worker_ttl is None:
  163.             default_worker_ttl = DEFAULT_WORKER_TTL
  164.         self.default_worker_ttl = default_worker_ttl
  165.  
  166.         self._state = 'starting'
  167.         self._is_horse = False
  168.         self._horse_pid = 0
  169.         self._stop_requested = False
  170.         self.log = logger
  171.         self.failed_queue = get_failed_queue(connection=self.connection)
  172.         self.last_cleaned_at = None
  173.  
  174.         # By default, push the "move-to-failed-queue" exception handler onto
  175.         # the stack
  176.         if exception_handlers is None:
  177.             self.push_exc_handler(self.move_to_failed_queue)
  178.             if exc_handler is not None:
  179.                 self.push_exc_handler(exc_handler)
  180.                 warnings.warn(
  181.                     "use of exc_handler is deprecated, pass a list to exception_handlers instead.",
  182.                     DeprecationWarning
  183.                 )
  184.         elif isinstance(exception_handlers, list):
  185.             for h in exception_handlers:
  186.                 self.push_exc_handler(h)
  187.         elif exception_handlers is not None:
  188.             self.push_exc_handler(exception_handlers)
  189.  
  190.     def validate_queues(self):
  191.         """Sanity check for the given queues."""
  192.         for queue in self.queues:
  193.             if not isinstance(queue, self.queue_class):
  194.                 raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class))
  195.  
  196.     def queue_names(self):
  197.         """Returns the queue names of this worker's queues."""
  198.         return list(map(lambda q: q.name, self.queues))
  199.  
  200.     def queue_keys(self):
  201.         """Returns the Redis keys representing this worker's queues."""
  202.         return list(map(lambda q: q.key, self.queues))
  203.  
  204.     @property
  205.     def name(self):
  206.         """Returns the name of the worker, under which it is registered to the
  207.        monitoring system.
  208.  
  209.        By default, the name of the worker is constructed from the current
  210.        (short) host name and the current PID.
  211.        """
  212.         if self._name is None:
  213.             hostname = socket.gethostname()
  214.             shortname, _, _ = hostname.partition('.')
  215.             self._name = '{0}.{1}'.format(shortname, self.pid)
  216.         return self._name
  217.  
  218.     @property
  219.     def key(self):
  220.         """Returns the worker's Redis hash key."""
  221.         return self.redis_worker_namespace_prefix + self.name
  222.  
  223.     @property
  224.     def pid(self):
  225.         """The current process ID."""
  226.         return os.getpid()
  227.  
  228.     @property
  229.     def horse_pid(self):
  230.         """The horse's process ID.  Only available in the worker.  Will return
  231.        0 in the horse part of the fork.
  232.        """
  233.         return self._horse_pid
  234.  
  235.     @property
  236.     def is_horse(self):
  237.         """Returns whether or not this is the worker or the work horse."""
  238.         return self._is_horse
  239.  
  240.     def procline(self, message):
  241.         """Changes the current procname for the process.
  242.  
  243.        This can be used to make `ps -ef` output more readable.
  244.        """
  245.         setprocname('rq: {0}'.format(message))
  246.  
  247.     def register_birth(self):
  248.         """Registers its own birth."""
  249.         self.log.debug('Registering birth of worker {0}'.format(self.name))
  250.         if self.connection.exists(self.key) and \
  251.                 not self.connection.hexists(self.key, 'death'):
  252.             msg = 'There exists an active worker named {0!r} already'
  253.             raise ValueError(msg.format(self.name))
  254.         key = self.key
  255.         queues = ','.join(self.queue_names())
  256.         with self.connection._pipeline() as p:
  257.             p.delete(key)
  258.             p.hset(key, 'birth', utcformat(utcnow()))
  259.             p.hset(key, 'queues', queues)
  260.             p.sadd(self.redis_workers_keys, key)
  261.             p.expire(key, self.default_worker_ttl)
  262.             p.execute()
  263.  
  264.     def register_death(self):
  265.         """Registers its own death."""
  266.         self.log.debug('Registering death')
  267.         with self.connection._pipeline() as p:
  268.             # We cannot use self.state = 'dead' here, because that would
  269.             # rollback the pipeline
  270.             p.srem(self.redis_workers_keys, self.key)
  271.             p.hset(self.key, 'death', utcformat(utcnow()))
  272.             p.expire(self.key, 60)
  273.             p.execute()
  274.  
  275.     def set_shutdown_requested_date(self):
  276.         """Sets the date on which the worker received a (warm) shutdown request"""
  277.         self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow()))
  278.  
  279.     @property
  280.     def birth_date(self):
  281.         """Fetches birth date from Redis."""
  282.         birth_timestamp = self.connection.hget(self.key, 'birth')
  283.         if birth_timestamp is not None:
  284.             return utcparse(as_text(birth_timestamp))
  285.  
  286.     @property
  287.     def shutdown_requested_date(self):
  288.         """Fetches shutdown_requested_date from Redis."""
  289.         shutdown_requested_timestamp = self.connection.hget(self.key, 'shutdown_requested_date')
  290.         if shutdown_requested_timestamp is not None:
  291.             return utcparse(as_text(shutdown_requested_timestamp))
  292.  
  293.     @property
  294.     def death_date(self):
  295.         """Fetches death date from Redis."""
  296.         death_timestamp = self.connection.hget(self.key, 'death')
  297.         if death_timestamp is not None:
  298.             return utcparse(as_text(death_timestamp))
  299.  
  300.     def set_state(self, state, pipeline=None):
  301.         self._state = state
  302.         connection = pipeline if pipeline is not None else self.connection
  303.         connection.hset(self.key, 'state', state)
  304.  
  305.     def _set_state(self, state):
  306.         """Raise a DeprecationWarning if ``worker.state = X`` is used"""
  307.         warnings.warn(
  308.             "worker.state is deprecated, use worker.set_state() instead.",
  309.             DeprecationWarning
  310.         )
  311.         self.set_state(state)
  312.  
  313.     def get_state(self):
  314.         return self._state
  315.  
  316.     def _get_state(self):
  317.         """Raise a DeprecationWarning if ``worker.state == X`` is used"""
  318.         warnings.warn(
  319.             "worker.state is deprecated, use worker.get_state() instead.",
  320.             DeprecationWarning
  321.         )
  322.         return self.get_state()
  323.  
  324.     state = property(_get_state, _set_state)
  325.  
  326.     def set_current_job_id(self, job_id, pipeline=None):
  327.         connection = pipeline if pipeline is not None else self.connection
  328.  
  329.         if job_id is None:
  330.             connection.hdel(self.key, 'current_job')
  331.         else:
  332.             connection.hset(self.key, 'current_job', job_id)
  333.  
  334.     def get_current_job_id(self, pipeline=None):
  335.         connection = pipeline if pipeline is not None else self.connection
  336.         return as_text(connection.hget(self.key, 'current_job'))
  337.  
  338.     def get_current_job(self):
  339.         """Returns the job id of the currently executing job."""
  340.         job_id = self.get_current_job_id()
  341.  
  342.         if job_id is None:
  343.             return None
  344.  
  345.         return self.job_class.fetch(job_id, self.connection)
  346.  
  347.     def _install_signal_handlers(self):
  348.         """Installs signal handlers for handling SIGINT and SIGTERM
  349.        gracefully.
  350.        """
  351.  
  352.         signal.signal(signal.SIGINT, self.request_stop)
  353.         signal.signal(signal.SIGTERM, self.request_stop)
  354.  
  355.     def request_force_stop(self, signum, frame):
  356.         """Terminates the application (cold shutdown).
  357.        """
  358.         self.log.warning('Cold shut down')
  359.  
  360.         # Take down the horse with the worker
  361.         if self.horse_pid:
  362.             msg = 'Taking down horse {0} with me'.format(self.horse_pid)
  363.             self.log.debug(msg)
  364.             try:
  365.                 os.kill(self.horse_pid, signal.SIGKILL)
  366.             except OSError as e:
  367.                 # ESRCH ("No such process") is fine with us
  368.                 if e.errno != errno.ESRCH:
  369.                     self.log.debug('Horse already down')
  370.                     raise
  371.         raise SystemExit()
  372.  
  373.     def request_stop(self, signum, frame):
  374.         """Stops the current worker loop but waits for child processes to
  375.        end gracefully (warm shutdown).
  376.        """
  377.         self.log.debug('Got signal {0}'.format(signal_name(signum)))
  378.  
  379.         signal.signal(signal.SIGINT, self.request_force_stop)
  380.         signal.signal(signal.SIGTERM, self.request_force_stop)
  381.  
  382.         self.handle_warm_shutdown_request()
  383.  
  384.         # If shutdown is requested in the middle of a job, wait until
  385.         # finish before shutting down and save the request in redis
  386.         if self.get_state() == 'busy':
  387.             self._stop_requested = True
  388.             self.set_shutdown_requested_date()
  389.             self.log.debug('Stopping after current horse is finished. '
  390.                            'Press Ctrl+C again for a cold shutdown.')
  391.         else:
  392.             raise StopRequested()
  393.  
  394.     def handle_warm_shutdown_request(self):
  395.         self.log.warning('Warm shut down requested')
  396.  
  397.     def check_for_suspension(self, burst):
  398.         """Check to see if workers have been suspended by `rq suspend`"""
  399.  
  400.         before_state = None
  401.         notified = False
  402.  
  403.         while not self._stop_requested and is_suspended(self.connection):
  404.  
  405.             if burst:
  406.                 self.log.info('Suspended in burst mode, exiting')
  407.                 self.log.info('Note: There could still be unfinished jobs on the queue')
  408.                 raise StopRequested
  409.  
  410.             if not notified:
  411.                 self.log.info('Worker suspended, run `rq resume` to resume')
  412.                 before_state = self.get_state()
  413.                 self.set_state(WorkerStatus.SUSPENDED)
  414.                 notified = True
  415.             time.sleep(1)
  416.  
  417.         if before_state:
  418.             self.set_state(before_state)
  419.  
  420.     def work(self, burst=False, logging_level="INFO"):
  421.         """Starts the work loop.
  422.  
  423.        Pops and performs all jobs on the current list of queues.  When all
  424.        queues are empty, block and wait for new jobs to arrive on any of the
  425.        queues, unless `burst` mode is enabled.
  426.  
  427.        The return value indicates whether any jobs were processed.
  428.        """
  429.         setup_loghandlers(logging_level)
  430.         self._install_signal_handlers()
  431.  
  432.         did_perform_work = False
  433.         self.register_birth()
  434.         self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
  435.         self.set_state(WorkerStatus.STARTED)
  436.  
  437.         try:
  438.             while True:
  439.                 try:
  440.                     self.check_for_suspension(burst)
  441.  
  442.                     if self.should_run_maintenance_tasks:
  443.                         self.clean_registries()
  444.  
  445.                     if self._stop_requested:
  446.                         self.log.info('Stopping on request')
  447.                         break
  448.  
  449.                     timeout = None if burst else max(1, self.default_worker_ttl - 60)
  450.  
  451.                     result = self.dequeue_job_and_maintain_ttl(timeout)
  452.                     if result is None:
  453.                         if burst:
  454.                             self.log.info("RQ worker {0!r} done, quitting".format(self.key))
  455.                         break
  456.                 except StopRequested:
  457.                     break
  458.  
  459.                 job, queue = result
  460.                 self.execute_job(job, queue)
  461.                 self.heartbeat()
  462.  
  463.                 did_perform_work = True
  464.  
  465.         finally:
  466.             if not self.is_horse:
  467.                 self.register_death()
  468.         return did_perform_work
  469.  
  470.     def dequeue_job_and_maintain_ttl(self, timeout):
  471.         result = None
  472.         qnames = self.queue_names()
  473.  
  474.         self.set_state(WorkerStatus.IDLE)
  475.         self.procline('Listening on {0}'.format(','.join(qnames)))
  476.         self.log.info('')
  477.         self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))
  478.  
  479.         while True:
  480.             self.heartbeat()
  481.  
  482.             try:
  483.                 result = self.queue_class.dequeue_any(self.queues, timeout,
  484.                                                       connection=self.connection)
  485.                 if result is not None:
  486.                     job, queue = result
  487.                     self.log.info('{0}: {1} ({2})'.format(green(queue.name),
  488.                                                           blue(job.description), job.id))
  489.  
  490.                 break
  491.             except DequeueTimeout:
  492.                 pass
  493.  
  494.         self.heartbeat()
  495.         return result
  496.  
  497.     def heartbeat(self, timeout=0, pipeline=None):
  498.         """Specifies a new worker timeout, typically by extending the
  499.        expiration time of the worker, effectively making this a "heartbeat"
  500.        to not expire the worker until the timeout passes.
  501.  
  502.        The next heartbeat should come before this time, or the worker will
  503.        die (at least from the monitoring dashboards).
  504.  
  505.        The effective timeout can never be shorter than default_worker_ttl,
  506.        only larger.
  507.        """
  508.         timeout = max(timeout, self.default_worker_ttl)
  509.         connection = pipeline if pipeline is not None else self.connection
  510.         connection.expire(self.key, timeout)
  511.         self.log.debug('Sent heartbeat to prevent worker timeout. '
  512.                        'Next one should arrive within {0} seconds.'.format(timeout))
  513.  
  514.     def fork_work_horse(self, job, queue):
  515.         """Spawns a work horse to perform the actual work and passes it a job.
  516.        """
  517.         child_pid = os.fork()
  518.         os.environ['RQ_WORKER_ID'] = self.name
  519.         os.environ['RQ_JOB_ID'] = job.id
  520.         if child_pid == 0:
  521.             self.main_work_horse(job, queue)
  522.         else:
  523.             self._horse_pid = child_pid
  524.             self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
  525.  
  526.     def monitor_work_horse(self, job):
  527.         """The worker will monitor the work horse and make sure that it
  528.        either executes successfully or the status of the job is set to
  529.        failed
  530.        """
  531.         while True:
  532.             try:
  533.                 _, ret_val = os.waitpid(self._horse_pid, 0)
  534.                 if ret_val != os.EX_OK:
  535.                     job_status = job.get_status()
  536.                     if job_status is None:
  537.                         # Job completed and its ttl has expired
  538.                         break
  539.                     if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
  540.                         self.handle_job_failure(
  541.                             job=job
  542.                         )
  543.  
  544.                         # Unhandled failure: move the job to the failed queue
  545.                         self.log.warning(
  546.                             'Moving job to {0!r} queue'.format(
  547.                                 self.failed_queue.name
  548.                             )
  549.                         )
  550.                         self.failed_queue.quarantine(
  551.                             job,
  552.                             exc_info=(
  553.                                 "Work-horse proccess "
  554.                                 "was terminated unexpectedly"
  555.                             )
  556.                         )
  557.                 break
  558.             except OSError as e:
  559.                 # In case we encountered an OSError due to EINTR (which is
  560.                 # caused by a SIGINT or SIGTERM signal during
  561.                 # os.waitpid()), we simply ignore it and enter the next
  562.                 # iteration of the loop, waiting for the child to end.  In
  563.                 # any other case, this is some other unexpected OS error,
  564.                 # which we don't want to catch, so we re-raise those ones.
  565.                 if e.errno != errno.EINTR:
  566.                     raise
  567.  
  568.     def execute_job(self, job, queue):
  569.         """Spawns a work horse to perform the actual work and passes it a job.
  570.        The worker will wait for the work horse and make sure it executes
  571.        within the given timeout bounds, or will end the work horse with
  572.        SIGALRM.
  573.        """
  574.         self.set_state('busy')
  575.         self.fork_work_horse(job, queue)
  576.         self.monitor_work_horse(job)
  577.         self.set_state('idle')
  578.  
  579.     def main_work_horse(self, job, queue):
  580.         """This is the entry point of the newly spawned work horse."""
  581.         # After fork()'ing, always assure we are generating random sequences
  582.         # that are different from the worker.
  583.         random.seed()
  584.  
  585.         self.setup_work_horse_signals()
  586.  
  587.         self._is_horse = True
  588.         self.log = logger
  589.  
  590.         success = self.perform_job(job, queue)
  591.  
  592.         # os._exit() is the way to exit from childs after a fork(), in
  593.         # constrast to the regular sys.exit()
  594.         os._exit(int(not success))
  595.  
  596.     def setup_work_horse_signals(self):
  597.         """Setup signal handing for the newly spawned work horse."""
  598.         # Always ignore Ctrl+C in the work horse, as it might abort the
  599.         # currently running job.
  600.         # The main worker catches the Ctrl+C and requests graceful shutdown
  601.         # after the current work is done.  When cold shutdown is requested, it
  602.         # kills the current job anyway.
  603.         signal.signal(signal.SIGINT, signal.SIG_IGN)
  604.         signal.signal(signal.SIGTERM, signal.SIG_DFL)
  605.  
  606.     def prepare_job_execution(self, job):
  607.         """Performs misc bookkeeping like updating states prior to
  608.        job execution.
  609.        """
  610.         timeout = (job.timeout or 180) + 60
  611.  
  612.         with self.connection._pipeline() as pipeline:
  613.             self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
  614.             self.set_current_job_id(job.id, pipeline=pipeline)
  615.             self.heartbeat(timeout, pipeline=pipeline)
  616.             registry = StartedJobRegistry(job.origin, self.connection)
  617.             registry.add(job, timeout, pipeline=pipeline)
  618.             job.set_status(JobStatus.STARTED, pipeline=pipeline)
  619.             self.connection._hset(job.key, 'started_at',
  620.                                   utcformat(utcnow()), pipeline)
  621.             pipeline.execute()
  622.  
  623.         msg = 'Processing {0} from {1} since {2}'
  624.         self.procline(msg.format(job.func_name, job.origin, time.time()))
  625.  
  626.     def handle_job_failure(
  627.         self,
  628.         job,
  629.         started_job_registry=None
  630.     ):
  631.         """Handles the failure or an executing job by:
  632.            1. Setting the job status to failed
  633.            2. Removing the job from the started_job_registry
  634.            3. Setting the workers current job to None
  635.        """
  636.  
  637.         with self.connection._pipeline() as pipeline:
  638.             if started_job_registry is None:
  639.                 started_job_registry = StartedJobRegistry(
  640.                     job.origin,
  641.                     self.connection
  642.                 )
  643.             job.set_status(JobStatus.FAILED, pipeline=pipeline)
  644.             started_job_registry.remove(job, pipeline=pipeline)
  645.             self.set_current_job_id(None, pipeline=pipeline)
  646.             try:
  647.                 pipeline.execute()
  648.             except Exception:
  649.                 # Ensure that custom exception handlers are called
  650.                 # even if Redis is down
  651.                 pass
  652.  
  653.     def handle_job_success(
  654.         self,
  655.         job,
  656.         queue,
  657.         started_job_registry
  658.     ):
  659.         with self.connection._pipeline() as pipeline:
  660.             while True:
  661.                 try:
  662.                     # if dependencies are inserted after enqueue_dependents
  663.                     # a WatchError is thrown by execute()
  664.                     pipeline.watch(job.dependents_key)
  665.                     # enqueue_dependents calls multi() on the pipeline!
  666.                     queue.enqueue_dependents(job, pipeline=pipeline)
  667.  
  668.                     self.set_current_job_id(None, pipeline=pipeline)
  669.  
  670.                     result_ttl = job.get_result_ttl(self.default_result_ttl)
  671.                     if result_ttl != 0:
  672.                         job.set_status(JobStatus.FINISHED, pipeline=pipeline)
  673.                         job.save(pipeline=pipeline)
  674.  
  675.                         finished_job_registry = FinishedJobRegistry(job.origin,
  676.                                                                     self.connection)
  677.                         finished_job_registry.add(job, result_ttl, pipeline)
  678.  
  679.                     job.cleanup(result_ttl, pipeline=pipeline,
  680.                                 remove_from_queue=False)
  681.                     started_job_registry.remove(job, pipeline=pipeline)
  682.  
  683.                     pipeline.execute()
  684.                     break
  685.                 except WatchError:
  686.                     continue
  687.  
  688.     def perform_job(self, job, queue):
  689.         """Performs the actual work of a job.  Will/should only be called
  690.        inside the work horse's process.
  691.        """
  692.         self.prepare_job_execution(job)
  693.  
  694.         push_connection(self.connection)
  695.  
  696.         started_job_registry = StartedJobRegistry(job.origin, self.connection)
  697.  
  698.         try:
  699.             with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
  700.                 rv = job.perform()
  701.  
  702.             job.ended_at = utcnow()
  703.  
  704.             # Pickle the result in the same try-except block since we need
  705.             # to use the same exc handling when pickling fails
  706.             job._result = rv
  707.  
  708.             self.handle_job_success(
  709.                 job=job,
  710.                 queue=queue,
  711.                 started_job_registry=started_job_registry
  712.             )
  713.         except Exception:
  714.             self.handle_job_failure(
  715.                 job=job,
  716.                 started_job_registry=started_job_registry
  717.             )
  718.             self.handle_exception(job, *sys.exc_info())
  719.             return False
  720.  
  721.         finally:
  722.             pop_connection()
  723.  
  724.         self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
  725.         if rv is not None:
  726.             log_result = "{0!r}".format(as_text(text_type(rv)))
  727.             self.log.debug('Result: {0}'.format(yellow(log_result)))
  728.  
  729.         result_ttl = job.get_result_ttl(self.default_result_ttl)
  730.         if result_ttl == 0:
  731.             self.log.info('Result discarded immediately')
  732.         elif result_ttl > 0:
  733.             self.log.info('Result is kept for {0} seconds'.format(result_ttl))
  734.         else:
  735.             self.log.warning('Result will never expire, clean up result key manually')
  736.  
  737.         return True
  738.  
  739.     def handle_exception(self, job, *exc_info):
  740.         """Walks the exception handler stack to delegate exception handling."""
  741.         exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
  742.                              traceback.format_exception(*exc_info))
  743.         self.log.error(exc_string, exc_info=True, extra={
  744.             'func': job.func_name,
  745.             'arguments': job.args,
  746.             'kwargs': job.kwargs,
  747.             'queue': job.origin,
  748.         })
  749.  
  750.         for handler in reversed(self._exc_handlers):
  751.             self.log.debug('Invoking exception handler {0}'.format(handler))
  752.             fallthrough = handler(job, *exc_info)
  753.  
  754.             # Only handlers with explicit return values should disable further
  755.             # exc handling, so interpret a None return value as True.
  756.             if fallthrough is None:
  757.                 fallthrough = True
  758.  
  759.             if not fallthrough:
  760.                 break
  761.  
  762.     def move_to_failed_queue(self, job, *exc_info):
  763.         """Default exception handler: move the job to the failed queue."""
  764.         exc_string = ''.join(traceback.format_exception(*exc_info))
  765.         self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
  766.         self.failed_queue.quarantine(job, exc_info=exc_string)
  767.  
  768.     def push_exc_handler(self, handler_func):
  769.         """Pushes an exception handler onto the exc handler stack."""
  770.         self._exc_handlers.append(handler_func)
  771.  
  772.     def pop_exc_handler(self):
  773.         """Pops the latest exception handler off of the exc handler stack."""
  774.         return self._exc_handlers.pop()
  775.  
  776.     def __eq__(self, other):
  777.         """Equality does not take the database/connection into account"""
  778.         if not isinstance(other, self.__class__):
  779.             raise TypeError('Cannot compare workers to other types (of workers)')
  780.         return self.name == other.name
  781.  
  782.     def __hash__(self):
  783.         """The hash does not take the database/connection into account"""
  784.         return hash(self.name)
  785.  
  786.     def clean_registries(self):
  787.         """Runs maintenance jobs on each Queue's registries."""
  788.         for queue in self.queues:
  789.             self.log.info('Cleaning registries for queue: {0}'.format(queue.name))
  790.             clean_registries(queue)
  791.         self.last_cleaned_at = utcnow()
  792.  
  793.     @property
  794.     def should_run_maintenance_tasks(self):
  795.         """Maintenance tasks should run on first startup or every hour."""
  796.         if self.last_cleaned_at is None:
  797.             return True
  798.         if (utcnow() - self.last_cleaned_at) > timedelta(hours=1):
  799.             return True
  800.         return False
  801.  
  802.  
  803. class SimpleWorker(Worker):
  804.     def main_work_horse(self, *args, **kwargs):
  805.         raise NotImplementedError("Test worker does not implement this method")
  806.  
  807.     def execute_job(self, *args, **kwargs):
  808.         """Execute job in same thread/process, do not fork()"""
  809.         return self.perform_job(*args, **kwargs)
  810.  
  811.  
  812. class HerokuWorker(Worker):
  813.     """
  814.    Modified version of rq worker which:
  815.    * stops work horses getting killed with SIGTERM
  816.    * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
  817.    causes the horse to crash `imminent_shutdown_delay` seconds later
  818.    """
  819.     imminent_shutdown_delay = 6
  820.  
  821.     frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
  822.     if PY2:
  823.         frame_properties.extend(
  824.             ['f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_restricted']
  825.         )
  826.  
  827.     def setup_work_horse_signals(self):
  828.         """Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN"""
  829.         signal.signal(signal.SIGRTMIN, self.request_stop_sigrtmin)
  830.         signal.signal(signal.SIGINT, signal.SIG_IGN)
  831.         signal.signal(signal.SIGTERM, signal.SIG_IGN)
  832.  
  833.     def handle_warm_shutdown_request(self):
  834.         """If horse is alive send it SIGRTMIN"""
  835.         if self.horse_pid != 0:
  836.             self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal')
  837.             os.kill(self.horse_pid, signal.SIGRTMIN)
  838.         else:
  839.             self.log.warning('Warm shut down requested, no horse found')
  840.  
  841.     def request_stop_sigrtmin(self, signum, frame):
  842.         if self.imminent_shutdown_delay == 0:
  843.             self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately')
  844.             self.request_force_stop_sigrtmin(signum, frame)
  845.         else:
  846.             self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds',
  847.                              self.imminent_shutdown_delay)
  848.             signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
  849.             signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
  850.             signal.alarm(self.imminent_shutdown_delay)
  851.  
  852.     def request_force_stop_sigrtmin(self, signum, frame):
  853.         info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties)
  854.         self.log.warning('raising ShutDownImminentException to cancel job...')
  855.         raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)
  856.  
downloadworker.py Source code - Download rq Source code
Related Source Codes/Software:
amazon-dsstne - Deep Scalable Sparse Tensor Network Engine (DSSTNE... 2017-01-08
webpack-demos - a collection of simple demos of Webpack 2017-01-08
Squire - HTML5 rich text editor. Try the demo integration a... 2017-01-08
thor - Thor is a toolkit for building powerful command-li... 2017-01-08
glide - Package Management for Golang h... 2017-01-08
emmet-vim - emmet for vim: http://emmet.io/ ... 2017-01-08
prose - A Content Editor for GitHub. ht... 2017-01-08
sshrc - ring your .bashrc, .vimrc, etc. with you when you ... 2017-01-08
typed.js - A jQuery typing animation script. ... 2017-01-08
find - High-precision indoor positioning framework for mo... 2017-01-08
CRYENGINE - CRYENGINE is a powerful real-time game development... 2017-06-11
postal - 2017-06-11
reactide - Reactide is the first dedicated IDE for React web ... 2017-06-11
rkt - rkt is a pod-native container engine for Linux. It... 2017-06-11
uWebSockets - Tiny WebSockets https://for... 2017-06-11
realworld - TodoMVC for the RealWorld - Exemplary fullstack Me... 2017-06-11
goreplay - GoReplay is an open-source tool for capturing and ... 2017-06-10
pyenv - Simple Python version management 2017-06-10
redux-saga - An alternative side effect model for Redux apps ... 2017-06-10
angular-starter - 2017-06-10

 Back to top