BVB Source Codes

rq Show registry.py Source code

Return Download rq: download registry.py Source code - Download rq Source code - Type:.py
  1. from .compat import as_text
  2. from .connections import resolve_connection
  3. from .exceptions import NoSuchJobError
  4. from .job import Job, JobStatus
  5. from .queue import FailedQueue
  6. from .utils import current_timestamp
  7.  
  8.  
  9. class BaseRegistry(object):
  10.     """
  11.    Base implementation of a job registry, implemented in Redis sorted set.
  12.    Each job is stored as a key in the registry, scored by expiration time
  13.    (unix timestamp).
  14.    """
  15.  
  16.     def __init__(self, name='default', connection=None):
  17.         self.name = name
  18.         self.connection = resolve_connection(connection)
  19.  
  20.     def __len__(self):
  21.         """Returns the number of jobs in this registry"""
  22.         return self.count
  23.  
  24.     @property
  25.     def count(self):
  26.         """Returns the number of jobs in this registry"""
  27.         self.cleanup()
  28.         return self.connection.zcard(self.key)
  29.  
  30.     def add(self, job, ttl=0, pipeline=None):
  31.         """Adds a job to a registry with expiry time of now + ttl."""
  32.         score = ttl if ttl < 0 else current_timestamp() + ttl
  33.         if pipeline is not None:
  34.             return pipeline.zadd(self.key, score, job.id)
  35.  
  36.         return self.connection._zadd(self.key, score, job.id)
  37.  
  38.     def remove(self, job, pipeline=None):
  39.         connection = pipeline if pipeline is not None else self.connection
  40.         return connection.zrem(self.key, job.id)
  41.  
  42.     def get_expired_job_ids(self, timestamp=None):
  43.         """Returns job ids whose score are less than current timestamp.
  44.  
  45.        Returns ids for jobs with an expiry time earlier than timestamp,
  46.        specified as seconds since the Unix epoch. timestamp defaults to call
  47.        time if unspecified.
  48.        """
  49.         score = timestamp if timestamp is not None else current_timestamp()
  50.         return [as_text(job_id) for job_id in
  51.                 self.connection.zrangebyscore(self.key, 0, score)]
  52.  
  53.     def get_job_ids(self, start=0, end=-1):
  54.         """Returns list of all job ids."""
  55.         self.cleanup()
  56.         return [as_text(job_id) for job_id in
  57.                 self.connection.zrange(self.key, start, end)]
  58.  
  59.  
  60. class StartedJobRegistry(BaseRegistry):
  61.     """
  62.    Registry of currently executing jobs. Each queue maintains a
  63.    StartedJobRegistry. Jobs in this registry are ones that are currently
  64.    being executed.
  65.  
  66.    Jobs are added to registry right before they are executed and removed
  67.    right after completion (success or failure).
  68.    """
  69.  
  70.     def __init__(self, name='default', connection=None):
  71.         super(StartedJobRegistry, self).__init__(name, connection)
  72.         self.key = 'rq:wip:{0}'.format(name)
  73.  
  74.     def cleanup(self, timestamp=None):
  75.         """Remove expired jobs from registry and add them to FailedQueue.
  76.  
  77.        Removes jobs with an expiry time earlier than timestamp, specified as
  78.        seconds since the Unix epoch. timestamp defaults to call time if
  79.        unspecified. Removed jobs are added to the global failed job queue.
  80.        """
  81.         score = timestamp if timestamp is not None else current_timestamp()
  82.         job_ids = self.get_expired_job_ids(score)
  83.  
  84.         if job_ids:
  85.             failed_queue = FailedQueue(connection=self.connection)
  86.  
  87.             with self.connection.pipeline() as pipeline:
  88.                 for job_id in job_ids:
  89.                     try:
  90.                         job = Job.fetch(job_id, connection=self.connection)
  91.                         job.set_status(JobStatus.FAILED)
  92.                         job.save(pipeline=pipeline)
  93.                         failed_queue.push_job_id(job_id, pipeline=pipeline)
  94.                     except NoSuchJobError:
  95.                         pass
  96.  
  97.                 pipeline.zremrangebyscore(self.key, 0, score)
  98.                 pipeline.execute()
  99.  
  100.         return job_ids
  101.  
  102.  
  103. class FinishedJobRegistry(BaseRegistry):
  104.     """
  105.    Registry of jobs that have been completed. Jobs are added to this
  106.    registry after they have successfully completed for monitoring purposes.
  107.    """
  108.  
  109.     def __init__(self, name='default', connection=None):
  110.         super(FinishedJobRegistry, self).__init__(name, connection)
  111.         self.key = 'rq:finished:{0}'.format(name)
  112.  
  113.     def cleanup(self, timestamp=None):
  114.         """Remove expired jobs from registry.
  115.  
  116.        Removes jobs with an expiry time earlier than timestamp, specified as
  117.        seconds since the Unix epoch. timestamp defaults to call time if
  118.        unspecified.
  119.        """
  120.         score = timestamp if timestamp is not None else current_timestamp()
  121.         self.connection.zremrangebyscore(self.key, 0, score)
  122.  
  123.  
  124. class DeferredJobRegistry(BaseRegistry):
  125.     """
  126.    Registry of deferred jobs (waiting for another job to finish).
  127.    """
  128.  
  129.     def __init__(self, name='default', connection=None):
  130.         super(DeferredJobRegistry, self).__init__(name, connection)
  131.         self.key = 'rq:deferred:{0}'.format(name)
  132.  
  133.     def cleanup(self):
  134.         """This method is only here to prevent errors because this method is
  135.        automatically called by `count()` and `get_job_ids()` methods
  136.        implemented in BaseRegistry."""
  137.         pass
  138.  
  139.  
  140. def clean_registries(queue):
  141.     """Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
  142.     registry = FinishedJobRegistry(name=queue.name, connection=queue.connection)
  143.     registry.cleanup()
  144.     registry = StartedJobRegistry(name=queue.name, connection=queue.connection)
  145.     registry.cleanup()
  146.  
downloadregistry.py Source code - Download rq Source code
Related Source Codes/Software:
amazon-dsstne - Deep Scalable Sparse Tensor Network Engine (DSSTNE... 2017-01-08
webpack-demos - a collection of simple demos of Webpack 2017-01-08
Squire - HTML5 rich text editor. Try the demo integration a... 2017-01-08
thor - Thor is a toolkit for building powerful command-li... 2017-01-08
glide - Package Management for Golang h... 2017-01-08
emmet-vim - emmet for vim: http://emmet.io/ ... 2017-01-08
prose - A Content Editor for GitHub. ht... 2017-01-08
sshrc - ring your .bashrc, .vimrc, etc. with you when you ... 2017-01-08
typed.js - A jQuery typing animation script. ... 2017-01-08
find - High-precision indoor positioning framework for mo... 2017-01-08
profiling - An interactive continuous Python profiler. 2017-02-18
AQGridView - A grid view for iPhone/iPad, designed to look simi... 2017-02-18
SCLAlertView - Beautiful animated Alert View. Written in Objectiv... 2017-02-18
csvkit - A suite of utilities for converting to and working... 2017-02-18
Messenger - This is a native iOS Messenger app, making realtim... 2017-02-18
meteor-up - Production Quality Meteor Deployment 2017-02-18
book-of-modern-frontend-tooling - The Front-end Tooling Book 2017-02-17
sorcery - Magical authentication for Rails 3 & 4 2017-02-17
iScript - Xiami.com script--about shrimp, Baidu's Web site, ... 2017-02-17
AndroidViewHover - An elegant way to show your menu or messages. 2017-02-17

 Back to top