BVB Source Codes

rq Show test_cli.py Source code

Return Download rq: download test_cli.py Source code - Download rq Source code - Type:.py
  1. # -*- coding: utf-8 -*-
  2. from __future__ import (absolute_import, division, print_function,
  3.                         unicode_literals)
  4.  
  5. from click.testing import CliRunner
  6. from rq import get_failed_queue, Queue
  7. from rq.compat import is_python_version
  8. from rq.job import Job
  9. from rq.cli import main
  10. from rq.cli.helpers import read_config_file
  11.  
  12. from tests import RQTestCase
  13. from tests.fixtures import div_by_zero
  14.  
  15. if is_python_version((2, 7), (3, 2)):
  16.     from unittest import TestCase
  17. else:
  18.     from unittest2 import TestCase  # noqa
  19.  
  20.  
  21. class TestCommandLine(TestCase):
  22.     def test_config_file(self):
  23.         settings = read_config_file("tests.dummy_settings")
  24.         self.assertIn("REDIS_HOST", settings)
  25.         self.assertEqual(settings['REDIS_HOST'], "testhost.example.com")
  26.  
  27.  
  28. class TestRQCli(RQTestCase):
  29.  
  30.     def assert_normal_execution(self, result):
  31.         if result.exit_code == 0:
  32.             return True
  33.         else:
  34.             print("Non normal execution")
  35.             print("Exit Code: {}".format(result.exit_code))
  36.             print("Output: {}".format(result.output))
  37.             print("Exception: {}".format(result.exception))
  38.             self.assertEqual(result.exit_code, 0)
  39.  
  40.     """Test rq_cli script"""
  41.     def setUp(self):
  42.         super(TestRQCli, self).setUp()
  43.         db_num = self.testconn.connection_pool.connection_kwargs['db']
  44.         self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
  45.  
  46.         job = Job.create(func=div_by_zero, args=(1, 2, 3))
  47.         job.origin = 'fake'
  48.         job.save()
  49.         get_failed_queue().quarantine(job, Exception('Some fake error'))  # noqa
  50.  
  51.     def test_empty(self):
  52.         """rq empty -u <url> failed"""
  53.         runner = CliRunner()
  54.         result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
  55.         self.assert_normal_execution(result)
  56.         self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
  57.  
  58.     def test_requeue(self):
  59.         """rq requeue -u <url> --all"""
  60.         runner = CliRunner()
  61.         result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
  62.         self.assert_normal_execution(result)
  63.         self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
  64.  
  65.     def test_info(self):
  66.         """rq info -u <url>"""
  67.         runner = CliRunner()
  68.         result = runner.invoke(main, ['info', '-u', self.redis_url])
  69.         self.assert_normal_execution(result)
  70.         self.assertIn('1 queues, 1 jobs total', result.output)
  71.  
  72.     def test_worker(self):
  73.         """rq worker -u <url> -b"""
  74.         runner = CliRunner()
  75.         result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
  76.         self.assert_normal_execution(result)
  77.  
  78.     def test_exception_handlers(self):
  79.         """rq worker -u <url> -b --exception-handler <handler>"""
  80.         q = Queue()
  81.         failed_q = get_failed_queue()
  82.         failed_q.empty()
  83.  
  84.         runner = CliRunner()
  85.  
  86.         # If exception handler is not given, failed job goes to FailedQueue
  87.         q.enqueue(div_by_zero)
  88.         runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
  89.         self.assertEqual(failed_q.count, 1)
  90.  
  91.         # Black hole exception handler doesn't add failed jobs to FailedQueue
  92.         q.enqueue(div_by_zero)
  93.         runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
  94.                              '--exception-handler', 'tests.fixtures.black_hole'])
  95.         self.assertEqual(failed_q.count, 1)
  96.  
  97.     def test_suspend_and_resume(self):
  98.         """rq suspend -u <url>
  99.           rq resume -u <url>
  100.        """
  101.         runner = CliRunner()
  102.         result = runner.invoke(main, ['suspend', '-u', self.redis_url])
  103.         self.assert_normal_execution(result)
  104.  
  105.         result = runner.invoke(main, ['resume', '-u', self.redis_url])
  106.         self.assert_normal_execution(result)
  107.  
  108.     def test_suspend_with_ttl(self):
  109.         """rq suspend -u <url> --duration=2
  110.        """
  111.         runner = CliRunner()
  112.         result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
  113.         self.assert_normal_execution(result)
  114.  
  115.     def test_suspend_with_invalid_ttl(self):
  116.         """rq suspend -u <url> --duration=0
  117.        """
  118.         runner = CliRunner()
  119.         result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
  120.  
  121.         self.assertEqual(result.exit_code, 1)
  122.         self.assertIn("Duration must be an integer greater than 1", result.output)
  123.  
Download test_cli.py Source code - Download rq Source code
Related Source Codes/Software:
amazon-dsstne - Deep Scalable Sparse Tensor Network Engine (DSSTNE... 2017-01-08
webpack-demos - a collection of simple demos of Webpack 2017-01-08
Squire - HTML5 rich text editor. Try the demo integration a... 2017-01-08
thor - Thor is a toolkit for building powerful command-li... 2017-01-08
glide - Package Management for Golang h... 2017-01-08
emmet-vim - emmet for vim: http://emmet.io/ ... 2017-01-08
prose - A Content Editor for GitHub. ht... 2017-01-08
sshrc - Bring your .bashrc, .vimrc, etc. with you when you ... 2017-01-08
typed.js - A jQuery typing animation script. ... 2017-01-08
find - High-precision indoor positioning framework for mo... 2017-01-08
CRYENGINE - CRYENGINE is a powerful real-time game development... 2017-06-11
postal - 2017-06-11
reactide - Reactide is the first dedicated IDE for React web ... 2017-06-11
rkt - rkt is a pod-native container engine for Linux. It... 2017-06-11
uWebSockets - Tiny WebSockets https://for... 2017-06-11
realworld - TodoMVC for the RealWorld - Exemplary fullstack Me... 2017-06-11
goreplay - GoReplay is an open-source tool for capturing and ... 2017-06-10
pyenv - Simple Python version management 2017-06-10
redux-saga - An alternative side effect model for Redux apps ... 2017-06-10
angular-starter - 2017-06-10

 Back to top