BVB Source Codes

rq Show job.py Source code

Return Download rq: download job.py Source code - Download rq Source code - Type:.py
  1. # -*- coding: utf-8 -*-
  2. from __future__ import (absolute_import, division, print_function,
  3.                         unicode_literals)
  4.  
  5. import inspect
  6. import warnings
  7. from functools import partial
  8. from uuid import uuid4
  9.  
  10. from rq.compat import as_text, decode_redis_hash, string_types, text_type
  11.  
  12. from .connections import resolve_connection
  13. from .exceptions import NoSuchJobError, UnpickleError
  14. from .local import LocalStack
  15. from .utils import enum, import_attribute, utcformat, utcnow, utcparse
  16.  
  17. try:
  18.     import cPickle as pickle
  19. except ImportError:  # noqa
  20.     import pickle
  21.  
  22. # Serialize pickle dumps using the highest pickle protocol (binary, default
  23. # uses ascii)
  24. dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
  25. loads = pickle.loads
  26.  
  27.  
# Enumeration of the lifecycle states a job can be in.  The values are the
# plain strings that get stored in the job's Redis hash under 'status'.
JobStatus = enum(
    'JobStatus',
    QUEUED='queued',
    FINISHED='finished',
    FAILED='failed',
    STARTED='started',
    DEFERRED='deferred'
)

# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object()
  40.  
  41.  
  42. def unpickle(pickled_string):
  43.     """Unpickles a string, but raises a unified UnpickleError in case anything
  44.    fails.
  45.  
  46.    This is a helper method to not have to deal with the fact that `loads()`
  47.    potentially raises many types of exceptions (e.g. AttributeError,
  48.    IndexError, TypeError, KeyError, etc.)
  49.    """
  50.     try:
  51.         obj = loads(pickled_string)
  52.     except Exception as e:
  53.         raise UnpickleError('Could not unpickle', pickled_string, e)
  54.     return obj
  55.  
  56.  
  57. def cancel_job(job_id, connection=None):
  58.     """Cancels the job with the given job ID, preventing execution.  Discards
  59.    any job info (i.e. it can't be requeued later).
  60.    """
  61.     Job.fetch(job_id, connection=connection).cancel()
  62.  
  63.  
  64. def requeue_job(job_id, connection=None):
  65.     """Requeues the job with the given job ID.  If no such job exists, just
  66.    remove the job ID from the failed queue, otherwise the job ID should refer
  67.    to a failed job (i.e. it should be on the failed queue).
  68.    """
  69.     from .queue import get_failed_queue
  70.     fq = get_failed_queue(connection=connection)
  71.     fq.requeue(job_id)
  72.  
  73.  
  74. def get_current_job(connection=None):
  75.     """Returns the Job instance that is currently being executed.  If this
  76.    function is invoked from outside a job context, None is returned.
  77.    """
  78.     job_id = _job_stack.top
  79.     if job_id is None:
  80.         return None
  81.     return Job.fetch(job_id, connection=connection)
  82.  
  83.  
  84. class Job(object):
  85.     """A Job is just a convenient datastructure to pass around job (meta) data.
  86.    """
  87.  
    # Job construction
    @classmethod
    def create(cls, func, args=None, kwargs=None, connection=None,
               result_ttl=None, ttl=None, status=None, description=None,
               depends_on=None, timeout=None, id=None, origin=None, meta=None):
        """Creates a new Job instance for the given function, arguments, and
        keyword arguments.

        `func` may be a bound method, a plain/builtin function, a dotted-path
        string ('module.func'), or a callable class instance; anything else
        raises TypeError.  `depends_on` may be a Job instance or a job id.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        # Fail fast on clearly invalid call data before any Redis interaction.
        if not isinstance(args, (tuple, list)):
            raise TypeError('{0!r} is not a valid args list'.format(args))
        if not isinstance(kwargs, dict):
            raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))

        job = cls(connection=connection)
        if id is not None:
            job.set_id(id)

        if origin is not None:
            job.origin = origin

        # Set the core job tuple properties.  The callable is stored by name
        # (plus an optional instance) so the worker can re-import it later.
        job._instance = None
        if inspect.ismethod(func):
            # Bound method: keep the instance and the bare method name.
            job._instance = func.__self__
            job._func_name = func.__name__
        elif inspect.isfunction(func) or inspect.isbuiltin(func):
            job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
        elif isinstance(func, string_types):
            # Already a dotted-path string; normalize to text.
            job._func_name = as_text(func)
        elif not inspect.isclass(func) and hasattr(func, '__call__'):  # a callable class instance
            job._instance = func
            job._func_name = '__call__'
        else:
            raise TypeError('Expected a callable or a string, but got: {}'.format(func))
        job._args = args
        job._kwargs = kwargs

        # Extra meta data
        job.description = description or job.get_call_string()
        job.result_ttl = result_ttl
        job.ttl = ttl
        job.timeout = timeout
        job._status = status
        job.meta = meta or {}

        # dependency could be job instance or id
        if depends_on is not None:
            job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
        return job
  142.  
  143.     def get_status(self):
  144.         self._status = as_text(self.connection.hget(self.key, 'status'))
  145.         return self._status
  146.  
  147.     def _get_status(self):
  148.         warnings.warn(
  149.             "job.status is deprecated. Use job.get_status() instead",
  150.             DeprecationWarning
  151.         )
  152.         return self.get_status()
  153.  
  154.     def set_status(self, status, pipeline=None):
  155.         self._status = status
  156.         self.connection._hset(self.key, 'status', self._status, pipeline)
  157.  
  158.     def _set_status(self, status):
  159.         warnings.warn(
  160.             "job.status is deprecated. Use job.set_status() instead",
  161.             DeprecationWarning
  162.         )
  163.         self.set_status(status)
  164.  
  165.     status = property(_get_status, _set_status)
  166.  
    @property
    def is_finished(self):
        """True when the job's current status in Redis is FINISHED."""
        return self.get_status() == JobStatus.FINISHED

    @property
    def is_queued(self):
        """True when the job's current status in Redis is QUEUED."""
        return self.get_status() == JobStatus.QUEUED

    @property
    def is_failed(self):
        """True when the job's current status in Redis is FAILED."""
        return self.get_status() == JobStatus.FAILED

    @property
    def is_started(self):
        """True when the job's current status in Redis is STARTED."""
        return self.get_status() == JobStatus.STARTED
  182.  
  183.     @property
  184.     def dependency(self):
  185.         """Returns a job's dependency. To avoid repeated Redis fetches, we cache
  186.        job.dependency as job._dependency.
  187.        """
  188.         if self._dependency_id is None:
  189.             return None
  190.         if hasattr(self, '_dependency'):
  191.             return self._dependency
  192.         job = Job.fetch(self._dependency_id, connection=self.connection)
  193.         job.refresh()
  194.         self._dependency = job
  195.         return job
  196.  
  197.     @property
  198.     def func(self):
  199.         func_name = self.func_name
  200.         if func_name is None:
  201.             return None
  202.  
  203.         if self.instance:
  204.             return getattr(self.instance, func_name)
  205.  
  206.         return import_attribute(self.func_name)
  207.  
  208.     def _unpickle_data(self):
  209.         self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)
  210.  
  211.     @property
  212.     def data(self):
  213.         if self._data is UNEVALUATED:
  214.             if self._func_name is UNEVALUATED:
  215.                 raise ValueError('Cannot build the job data')
  216.  
  217.             if self._instance is UNEVALUATED:
  218.                 self._instance = None
  219.  
  220.             if self._args is UNEVALUATED:
  221.                 self._args = ()
  222.  
  223.             if self._kwargs is UNEVALUATED:
  224.                 self._kwargs = {}
  225.  
  226.             job_tuple = self._func_name, self._instance, self._args, self._kwargs
  227.             self._data = dumps(job_tuple)
  228.         return self._data
  229.  
  230.     @data.setter
  231.     def data(self, value):
  232.         self._data = value
  233.         self._func_name = UNEVALUATED
  234.         self._instance = UNEVALUATED
  235.         self._args = UNEVALUATED
  236.         self._kwargs = UNEVALUATED
  237.  
  238.     @property
  239.     def func_name(self):
  240.         if self._func_name is UNEVALUATED:
  241.             self._unpickle_data()
  242.         return self._func_name
  243.  
  244.     @func_name.setter
  245.     def func_name(self, value):
  246.         self._func_name = value
  247.         self._data = UNEVALUATED
  248.  
  249.     @property
  250.     def instance(self):
  251.         if self._instance is UNEVALUATED:
  252.             self._unpickle_data()
  253.         return self._instance
  254.  
  255.     @instance.setter
  256.     def instance(self, value):
  257.         self._instance = value
  258.         self._data = UNEVALUATED
  259.  
  260.     @property
  261.     def args(self):
  262.         if self._args is UNEVALUATED:
  263.             self._unpickle_data()
  264.         return self._args
  265.  
  266.     @args.setter
  267.     def args(self, value):
  268.         self._args = value
  269.         self._data = UNEVALUATED
  270.  
  271.     @property
  272.     def kwargs(self):
  273.         if self._kwargs is UNEVALUATED:
  274.             self._unpickle_data()
  275.         return self._kwargs
  276.  
  277.     @kwargs.setter
  278.     def kwargs(self, value):
  279.         self._kwargs = value
  280.         self._data = UNEVALUATED
  281.  
  282.     @classmethod
  283.     def exists(cls, job_id, connection=None):
  284.         """Returns whether a job hash exists for the given job ID."""
  285.         conn = resolve_connection(connection)
  286.         return conn.exists(cls.key_for(job_id))
  287.  
  288.     @classmethod
  289.     def fetch(cls, id, connection=None):
  290.         """Fetches a persisted job from its corresponding Redis key and
  291.        instantiates it.
  292.        """
  293.         job = cls(id, connection=connection)
  294.         job.refresh()
  295.         return job
  296.  
    def __init__(self, id=None, connection=None):
        """Create an empty job shell; create()/refresh() fill in the fields."""
        self.connection = resolve_connection(connection)
        self._id = id  # lazily generated by get_id() when None
        self.created_at = utcnow()
        # Lazily-evaluated call payload; see the `data` property.
        self._data = UNEVALUATED
        self._func_name = UNEVALUATED
        self._instance = UNEVALUATED
        self._args = UNEVALUATED
        self._kwargs = UNEVALUATED
        # Bookkeeping fields, populated by create()/refresh()/the worker.
        self.description = None
        self.origin = None
        self.enqueued_at = None
        self.started_at = None
        self.ended_at = None
        self._result = None
        self.exc_info = None
        self.timeout = None
        self.result_ttl = None
        self.ttl = None
        self._status = None
        self._dependency_id = None
        self.meta = {}
  319.  
    def __repr__(self):  # noqa
        # Uses the raw _id (may be None) instead of the `id` property so
        # repr() never mutates the job by generating an id.
        return 'Job({0!r}, enqueued_at={1!r})'.format(self._id, self.enqueued_at)
  322.  
  323.     # Data access
  324.     def get_id(self):  # noqa
  325.         """The job ID for this job instance. Generates an ID lazily the
  326.        first time the ID is requested.
  327.        """
  328.         if self._id is None:
  329.             self._id = text_type(uuid4())
  330.         return self._id
  331.  
  332.     def set_id(self, value):
  333.         """Sets a job ID for the given job."""
  334.         if not isinstance(value, string_types):
  335.             raise TypeError('id must be a string, not {0}'.format(type(value)))
  336.         self._id = value
  337.  
  338.     id = property(get_id, set_id)
  339.  
    @classmethod
    def key_for(cls, job_id):
        """The Redis key that is used to store job hash under."""
        # NOTE(review): returns bytes while dependents_key_for returns text;
        # callers appear to depend on this asymmetry, so it is left as-is.
        return b'rq:job:' + job_id.encode('utf-8')

    @classmethod
    def dependents_key_for(cls, job_id):
        """The Redis key that is used to store the job's dependents set under."""
        return 'rq:job:{0}:dependents'.format(job_id)
  349.  
    @property
    def key(self):
        """The Redis key that is used to store job hash under."""
        return self.key_for(self.id)

    @property
    def dependents_key(self):
        """The Redis key that is used to store the job's dependents set under."""
        return self.dependents_key_for(self.id)
  359.  
  360.     @property
  361.     def result(self):
  362.         """Returns the return value of the job.
  363.  
  364.        Initially, right after enqueueing a job, the return value will be
  365.        None.  But when the job has been executed, and had a return value or
  366.        exception, this will return that value or exception.
  367.  
  368.        Note that, when the job has no return value (i.e. returns None), the
  369.        ReadOnlyJob object is useless, as the result won't be written back to
  370.        Redis.
  371.  
  372.        Also note that you cannot draw the conclusion that a job has _not_
  373.        been executed when its return value is None, since return values
  374.        written back to Redis will expire after a given amount of time (500
  375.        seconds by default).
  376.        """
  377.         if self._result is None:
  378.             rv = self.connection.hget(self.key, 'result')
  379.             if rv is not None:
  380.                 # cache the result
  381.                 self._result = loads(rv)
  382.         return self._result
  383.  
  384.     """Backwards-compatibility accessor property `return_value`."""
  385.     return_value = result
  386.  
    # Persistence
    def refresh(self):  # noqa
        """Overwrite the current instance's properties with the values in the
        corresponding Redis key.

        Will raise a NoSuchJobError if no corresponding Redis key exists.
        """
        key = self.key
        obj = decode_redis_hash(self.connection.hgetall(key))
        if len(obj) == 0:
            raise NoSuchJobError('No such job: {0}'.format(key))

        def to_date(date_str):
            # None-safe parse helper for the optional timestamp fields.
            if date_str is None:
                return
            else:
                return utcparse(as_text(date_str))

        # 'data' is the one mandatory field; anything else is optional.
        try:
            self.data = obj['data']
        except KeyError:
            raise NoSuchJobError('Unexpected job format: {0}'.format(obj))

        # Missing/empty optional fields fall back to None (or {} for meta);
        # numeric fields are guarded by truthiness before int() conversion.
        self.created_at = to_date(as_text(obj.get('created_at')))
        self.origin = as_text(obj.get('origin'))
        self.description = as_text(obj.get('description'))
        self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
        self.started_at = to_date(as_text(obj.get('started_at')))
        self.ended_at = to_date(as_text(obj.get('ended_at')))
        self._result = unpickle(obj.get('result')) if obj.get('result') else None  # noqa
        self.exc_info = as_text(obj.get('exc_info'))
        self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
        self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
        self._status = as_text(obj.get('status') if obj.get('status') else None)
        self._dependency_id = as_text(obj.get('dependency_id', None))
        self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
        self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
  424.  
  425.     def to_dict(self):
  426.         """Returns a serialization of the current job instance"""
  427.         obj = {}
  428.         obj['created_at'] = utcformat(self.created_at or utcnow())
  429.         obj['data'] = self.data
  430.  
  431.         if self.origin is not None:
  432.             obj['origin'] = self.origin
  433.         if self.description is not None:
  434.             obj['description'] = self.description
  435.         if self.enqueued_at is not None:
  436.             obj['enqueued_at'] = utcformat(self.enqueued_at)
  437.         if self.started_at is not None:
  438.             obj['started_at'] = utcformat(self.started_at)
  439.         if self.ended_at is not None:
  440.             obj['ended_at'] = utcformat(self.ended_at)
  441.         if self._result is not None:
  442.             obj['result'] = dumps(self._result)
  443.         if self.exc_info is not None:
  444.             obj['exc_info'] = self.exc_info
  445.         if self.timeout is not None:
  446.             obj['timeout'] = self.timeout
  447.         if self.result_ttl is not None:
  448.             obj['result_ttl'] = self.result_ttl
  449.         if self._status is not None:
  450.             obj['status'] = self._status
  451.         if self._dependency_id is not None:
  452.             obj['dependency_id'] = self._dependency_id
  453.         if self.meta:
  454.             obj['meta'] = dumps(self.meta)
  455.         if self.ttl:
  456.             obj['ttl'] = self.ttl
  457.  
  458.         return obj
  459.  
  460.     def save(self, pipeline=None):
  461.         """Persists the current job instance to its corresponding Redis key."""
  462.         key = self.key
  463.         connection = pipeline if pipeline is not None else self.connection
  464.  
  465.         connection.hmset(key, self.to_dict())
  466.         self.cleanup(self.ttl, pipeline=connection)
  467.  
    def cancel(self):
        """Cancels the given job, which will prevent the job from ever being
        ran (or inspected).

        This method merely exists as a high-level API call to cancel jobs
        without worrying about the internals required to implement job
        cancellation.
        """
        # Imported lazily to avoid a circular import with the queue module.
        from .queue import Queue, get_failed_queue
        pipeline = self.connection._pipeline()
        if self.origin:
            # Failed jobs live on the failed queue rather than their origin
            # queue, so pick the right one before removing.
            q = (get_failed_queue(connection=self.connection)
                 if self.is_failed
                 else Queue(name=self.origin, connection=self.connection))
            q.remove(self, pipeline=pipeline)
        pipeline.execute()
  484.  
  485.     def delete(self, pipeline=None, remove_from_queue=True):
  486.         """Cancels the job and deletes the job hash from Redis."""
  487.         if remove_from_queue:
  488.             self.cancel()
  489.         connection = pipeline if pipeline is not None else self.connection
  490.         connection.delete(self.key)
  491.         connection.delete(self.dependents_key)
  492.  
    # Job execution
    def perform(self):  # noqa
        """Invokes the job function with the job arguments.

        Removes any expiry from the job hash first (so a running job cannot
        vanish mid-flight) and tracks the active job id on _job_stack so
        get_current_job() works from inside the job function.
        """
        self.connection.persist(self.key)
        self.ttl = -1
        _job_stack.push(self.id)
        try:
            self._result = self.func(*self.args, **self.kwargs)
        finally:
            # Always unwind the job stack, even when the job raised.
            assert self.id == _job_stack.pop()
        return self._result
  504.  
  505.     def get_ttl(self, default_ttl=None):
  506.         """Returns ttl for a job that determines how long a job will be
  507.        persisted. In the future, this method will also be responsible
  508.        for determining ttl for repeated jobs.
  509.        """
  510.         return default_ttl if self.ttl is None else self.ttl
  511.  
  512.     def get_result_ttl(self, default_ttl=None):
  513.         """Returns ttl for a job that determines how long a jobs result will
  514.        be persisted. In the future, this method will also be responsible
  515.        for determining ttl for repeated jobs.
  516.        """
  517.         return default_ttl if self.result_ttl is None else self.result_ttl
  518.  
  519.     # Representation
  520.     def get_call_string(self):  # noqa
  521.         """Returns a string representation of the call, formatted as a regular
  522.        Python function invocation statement.
  523.        """
  524.         if self.func_name is None:
  525.             return None
  526.  
  527.         arg_list = [as_text(repr(arg)) for arg in self.args]
  528.  
  529.         kwargs = ['{0}={1}'.format(k, as_text(repr(v))) for k, v in self.kwargs.items()]
  530.         # Sort here because python 3.3 & 3.4 makes different call_string
  531.         arg_list += sorted(kwargs)
  532.         args = ', '.join(arg_list)
  533.  
  534.         return '{0}({1})'.format(self.func_name, args)
  535.  
  536.     def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True):
  537.         """Prepare job for eventual deletion (if needed). This method is usually
  538.        called after successful execution. How long we persist the job and its
  539.        result depends on the value of ttl:
  540.        - If ttl is 0, cleanup the job immediately.
  541.        - If it's a positive number, set the job to expire in X seconds.
  542.        - If ttl is negative, don't set an expiry to it (persist
  543.          forever)
  544.        """
  545.         if ttl == 0:
  546.             self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue)
  547.         elif not ttl:
  548.             return
  549.         elif ttl > 0:
  550.             connection = pipeline if pipeline is not None else self.connection
  551.             connection.expire(self.key, ttl)
  552.  
    def register_dependency(self, pipeline=None):
        """Jobs may have dependencies. Jobs are enqueued only if the job they
        depend on is successfully performed. We record this relation as
        a reverse dependency (a Redis set), with a key that looks something
        like:

            rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}

        This method adds the job in its dependency's dependents set
        and adds the job to DeferredJobRegistry.
        """
        # Imported lazily to avoid a circular import with the registry module.
        from .registry import DeferredJobRegistry

        # Track this job as deferred until its dependency completes.
        registry = DeferredJobRegistry(self.origin, connection=self.connection)
        registry.add(self, pipeline=pipeline)

        # Record the reverse dependency on the dependency's dependents set.
        connection = pipeline if pipeline is not None else self.connection
        connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
  571.  
  572.     def __str__(self):
  573.         return '<Job {0}: {1}>'.format(self.id, self.description)
  574.  
  575.     # Job equality
  576.     def __eq__(self, other):  # noqa
  577.         return isinstance(other, self.__class__) and self.id == other.id
  578.  
  579.     def __hash__(self):
  580.         return hash(self.id)
  581.  
  582.  
  583. _job_stack = LocalStack()
  584.  
Download job.py Source code - Download rq Source code
Related Source Codes/Software:
amazon-dsstne - Deep Scalable Sparse Tensor Network Engine (DSSTNE... 2017-01-08
webpack-demos - a collection of simple demos of Webpack 2017-01-08
Squire - HTML5 rich text editor. Try the demo integration a... 2017-01-08
thor - Thor is a toolkit for building powerful command-li... 2017-01-08
glide - Package Management for Golang h... 2017-01-08
emmet-vim - emmet for vim: http://emmet.io/ ... 2017-01-08
prose - A Content Editor for GitHub. ht... 2017-01-08
sshrc - ring your .bashrc, .vimrc, etc. with you when you ... 2017-01-08
typed.js - A jQuery typing animation script. ... 2017-01-08
find - High-precision indoor positioning framework for mo... 2017-01-08
CRYENGINE - CRYENGINE is a powerful real-time game development... 2017-06-11
postal - 2017-06-11
reactide - Reactide is the first dedicated IDE for React web ... 2017-06-11
rkt - rkt is a pod-native container engine for Linux. It... 2017-06-11
uWebSockets - Tiny WebSockets https://for... 2017-06-11
realworld - TodoMVC for the RealWorld - Exemplary fullstack Me... 2017-06-11
goreplay - GoReplay is an open-source tool for capturing and ... 2017-06-10
pyenv - Simple Python version management 2017-06-10
redux-saga - An alternative side effect model for Redux apps ... 2017-06-10
angular-starter - 2017-06-10

 Back to top