2
0

utils.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. from django.http import Http404
  2. from django.utils.translation import gettext_lazy as _
  3. from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection
  4. from django_rq.settings import QUEUES_LIST, QUEUES_MAP
  5. from django_rq.utils import get_jobs, stop_jobs
  6. from rq import requeue_job
  7. from rq.exceptions import NoSuchJobError
  8. from rq.job import Job as RQ_Job
  9. from rq.job import JobStatus as RQJobStatus
  10. from rq.registry import (
  11. DeferredJobRegistry,
  12. FailedJobRegistry,
  13. FinishedJobRegistry,
  14. ScheduledJobRegistry,
  15. StartedJobRegistry,
  16. )
  17. __all__ = (
  18. 'delete_rq_job',
  19. 'enqueue_rq_job',
  20. 'get_rq_jobs',
  21. 'get_rq_jobs_from_status',
  22. 'requeue_rq_job',
  23. 'stop_rq_job',
  24. )
  25. def get_rq_jobs():
  26. """
  27. Return a list of all RQ jobs.
  28. """
  29. jobs = set()
  30. for queue in QUEUES_LIST:
  31. queue = get_queue(queue['name'])
  32. jobs.update(queue.get_jobs())
  33. return list(jobs)
  34. def get_rq_jobs_from_status(queue, status):
  35. """
  36. Return the RQ jobs with the given status.
  37. """
  38. jobs = []
  39. try:
  40. registry_cls = {
  41. RQJobStatus.STARTED: StartedJobRegistry,
  42. RQJobStatus.DEFERRED: DeferredJobRegistry,
  43. RQJobStatus.FINISHED: FinishedJobRegistry,
  44. RQJobStatus.FAILED: FailedJobRegistry,
  45. RQJobStatus.SCHEDULED: ScheduledJobRegistry,
  46. }[status]
  47. except KeyError:
  48. raise Http404
  49. registry = registry_cls(queue.name, queue.connection)
  50. job_ids = registry.get_job_ids()
  51. if status != RQJobStatus.DEFERRED:
  52. jobs = get_jobs(queue, job_ids, registry)
  53. else:
  54. # Deferred jobs require special handling
  55. for job_id in job_ids:
  56. try:
  57. jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
  58. except NoSuchJobError:
  59. pass
  60. if jobs and status == RQJobStatus.SCHEDULED:
  61. for job in jobs:
  62. job.scheduled_at = registry.get_scheduled_time(job)
  63. return jobs
  64. def delete_rq_job(job_id):
  65. """
  66. Delete the specified RQ job.
  67. """
  68. config = QUEUES_LIST[0]
  69. try:
  70. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  71. except NoSuchJobError:
  72. raise Http404(_("Job {job_id} not found").format(job_id=job_id))
  73. queue_index = QUEUES_MAP[job.origin]
  74. queue = get_queue_by_index(queue_index)
  75. # Remove job id from queue and delete the actual job
  76. queue.connection.lrem(queue.key, 0, job.id)
  77. job.delete()
  78. def requeue_rq_job(job_id):
  79. """
  80. Requeue the specified RQ job.
  81. """
  82. config = QUEUES_LIST[0]
  83. try:
  84. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  85. except NoSuchJobError:
  86. raise Http404(_("Job {id} not found.").format(id=job_id))
  87. queue_index = QUEUES_MAP[job.origin]
  88. queue = get_queue_by_index(queue_index)
  89. requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
  90. def enqueue_rq_job(job_id):
  91. """
  92. Enqueue the specified RQ job.
  93. """
  94. config = QUEUES_LIST[0]
  95. try:
  96. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  97. except NoSuchJobError:
  98. raise Http404(_("Job {id} not found.").format(id=job_id))
  99. queue_index = QUEUES_MAP[job.origin]
  100. queue = get_queue_by_index(queue_index)
  101. try:
  102. # _enqueue_job is new in RQ 1.14, this is used to enqueue
  103. # job regardless of its dependencies
  104. queue._enqueue_job(job)
  105. except AttributeError:
  106. queue.enqueue_job(job)
  107. # Remove job from correct registry if needed
  108. if job.get_status() == RQJobStatus.DEFERRED:
  109. registry = DeferredJobRegistry(queue.name, queue.connection)
  110. registry.remove(job)
  111. elif job.get_status() == RQJobStatus.FINISHED:
  112. registry = FinishedJobRegistry(queue.name, queue.connection)
  113. registry.remove(job)
  114. elif job.get_status() == RQJobStatus.SCHEDULED:
  115. registry = ScheduledJobRegistry(queue.name, queue.connection)
  116. registry.remove(job)
  117. def stop_rq_job(job_id):
  118. """
  119. Stop the specified RQ job.
  120. """
  121. config = QUEUES_LIST[0]
  122. try:
  123. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  124. except NoSuchJobError:
  125. raise Http404(_("Job {job_id} not found").format(job_id=job_id))
  126. queue_index = QUEUES_MAP[job.origin]
  127. queue = get_queue_by_index(queue_index)
  128. return stop_jobs(queue, job_id)[0]