BVB Source Codes

rq Show test_decorator.py Source code

Return Download rq: download test_decorator.py Source code - Download rq Source code - Type:.py
  1. # -*- coding: utf-8 -*-
  2. from __future__ import (absolute_import, division, print_function,
  3.                         unicode_literals)
  4.  
  5. import mock
  6. from redis import StrictRedis
  7. from rq.decorators import job
  8. from rq.job import Job
  9. from rq.worker import DEFAULT_RESULT_TTL
  10.  
  11. from tests import RQTestCase
  12. from tests.fixtures import decorated_job
  13.  
  14.  
  15. class TestDecorator(RQTestCase):
  16.  
  17.     def setUp(self):
  18.         super(TestDecorator, self).setUp()
  19.  
  20.     def test_decorator_preserves_functionality(self):
  21.         """Ensure that a decorated function's functionality is still preserved.
  22.        """
  23.         self.assertEqual(decorated_job(1, 2), 3)
  24.  
  25.     def test_decorator_adds_delay_attr(self):
  26.         """Ensure that decorator adds a delay attribute to function that returns
  27.        a Job instance when called.
  28.        """
  29.         self.assertTrue(hasattr(decorated_job, 'delay'))
  30.         result = decorated_job.delay(1, 2)
  31.         self.assertTrue(isinstance(result, Job))
  32.         # Ensure that job returns the right result when performed
  33.         self.assertEqual(result.perform(), 3)
  34.  
  35.     def test_decorator_accepts_queue_name_as_argument(self):
  36.         """Ensure that passing in queue name to the decorator puts the job in
  37.        the right queue.
  38.        """
  39.         @job(queue='queue_name')
  40.         def hello():
  41.             return 'Hi'
  42.         result = hello.delay()
  43.         self.assertEqual(result.origin, 'queue_name')
  44.  
  45.     def test_decorator_accepts_result_ttl_as_argument(self):
  46.         """Ensure that passing in result_ttl to the decorator sets the
  47.        result_ttl on the job
  48.        """
  49.         # Ensure default
  50.         result = decorated_job.delay(1, 2)
  51.         self.assertEqual(result.result_ttl, DEFAULT_RESULT_TTL)
  52.  
  53.         @job('default', result_ttl=10)
  54.         def hello():
  55.             return 'Why hello'
  56.         result = hello.delay()
  57.         self.assertEqual(result.result_ttl, 10)
  58.  
  59.     def test_decorator_accepts_ttl_as_argument(self):
  60.         """Ensure that passing in ttl to the decorator sets the ttl on the job
  61.        """
  62.         # Ensure default
  63.         result = decorated_job.delay(1, 2)
  64.         self.assertEqual(result.ttl, None)
  65.  
  66.         @job('default', ttl=30)
  67.         def hello():
  68.             return 'Hello'
  69.         result = hello.delay()
  70.         self.assertEqual(result.ttl, 30)
  71.  
  72.     def test_decorator_accepts_result_depends_on_as_argument(self):
  73.         """Ensure that passing in depends_on to the decorator sets the
  74.        correct dependency on the job
  75.        """
  76.  
  77.         @job(queue='queue_name')
  78.         def foo():
  79.             return 'Firstly'
  80.  
  81.         @job(queue='queue_name')
  82.         def bar():
  83.             return 'Secondly'
  84.  
  85.         foo_job = foo.delay()
  86.         bar_job = bar.delay(depends_on=foo_job)
  87.  
  88.         self.assertIsNone(foo_job._dependency_id)
  89.  
  90.         self.assertEqual(bar_job.dependency, foo_job)
  91.  
  92.         self.assertEqual(bar_job._dependency_id, foo_job.id)
  93.  
  94.     @mock.patch('rq.queue.resolve_connection')
  95.     def test_decorator_connection_laziness(self, resolve_connection):
  96.         """Ensure that job decorator resolve connection in `lazy` way """
  97.  
  98.         resolve_connection.return_value = StrictRedis()
  99.  
  100.         @job(queue='queue_name')
  101.         def foo():
  102.             return 'do something'
  103.  
  104.         self.assertEqual(resolve_connection.call_count, 0)
  105.  
  106.         foo()
  107.  
  108.         self.assertEqual(resolve_connection.call_count, 0)
  109.  
  110.         foo.delay()
  111.  
  112.         self.assertEqual(resolve_connection.call_count, 1)
  113.  
download test_decorator.py Source code - Download rq Source code
Related Source Codes/Software:
amazon-dsstne - Deep Scalable Sparse Tensor Network Engine (DSSTNE... 2017-01-08
webpack-demos - a collection of simple demos of Webpack 2017-01-08
Squire - HTML5 rich text editor. Try the demo integration a... 2017-01-08
thor - Thor is a toolkit for building powerful command-li... 2017-01-08
glide - Package Management for Golang h... 2017-01-08
emmet-vim - emmet for vim: http://emmet.io/ ... 2017-01-08
prose - A Content Editor for GitHub. ht... 2017-01-08
sshrc - Bring your .bashrc, .vimrc, etc. with you when you ... 2017-01-08
typed.js - A jQuery typing animation script. ... 2017-01-08
find - High-precision indoor positioning framework for mo... 2017-01-08
profiling - An interactive continuous Python profiler. 2017-02-18
AQGridView - A grid view for iPhone/iPad, designed to look simi... 2017-02-18
SCLAlertView - Beautiful animated Alert View. Written in Objectiv... 2017-02-18
csvkit - A suite of utilities for converting to and working... 2017-02-18
Messenger - This is a native iOS Messenger app, making realtim... 2017-02-18
meteor-up - Production Quality Meteor Deployment 2017-02-18
book-of-modern-frontend-tooling - The Front-end Tooling Book 2017-02-17
sorcery - Magical authentication for Rails 3 & 4 2017-02-17
iScript - Xiami.com script--about shrimp, Baidu's Web site, ... 2017-02-17
AndroidViewHover - An elegant way to show your menu or messages. 2017-02-17

 Back to top