utils.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. from django.http import Http404
  2. from django.utils.translation import gettext_lazy as _
  3. from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection
  4. from django_rq.settings import QUEUES_MAP, QUEUES_LIST
  5. from django_rq.utils import get_jobs, stop_jobs
  6. from rq import requeue_job
  7. from rq.exceptions import NoSuchJobError
  8. from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
  9. from rq.registry import (
  10. DeferredJobRegistry,
  11. FailedJobRegistry,
  12. FinishedJobRegistry,
  13. ScheduledJobRegistry,
  14. StartedJobRegistry,
  15. )
  16. __all__ = (
  17. 'delete_rq_job',
  18. 'enqueue_rq_job',
  19. 'get_rq_jobs',
  20. 'get_rq_jobs_from_status',
  21. 'requeue_rq_job',
  22. 'stop_rq_job',
  23. )
  24. def get_rq_jobs():
  25. """
  26. Return a list of all RQ jobs.
  27. """
  28. jobs = set()
  29. for queue in QUEUES_LIST:
  30. queue = get_queue(queue['name'])
  31. jobs.update(queue.get_jobs())
  32. return list(jobs)
  33. def get_rq_jobs_from_status(queue, status):
  34. """
  35. Return the RQ jobs with the given status.
  36. """
  37. jobs = []
  38. try:
  39. registry_cls = {
  40. RQJobStatus.STARTED: StartedJobRegistry,
  41. RQJobStatus.DEFERRED: DeferredJobRegistry,
  42. RQJobStatus.FINISHED: FinishedJobRegistry,
  43. RQJobStatus.FAILED: FailedJobRegistry,
  44. RQJobStatus.SCHEDULED: ScheduledJobRegistry,
  45. }[status]
  46. except KeyError:
  47. raise Http404
  48. registry = registry_cls(queue.name, queue.connection)
  49. job_ids = registry.get_job_ids()
  50. if status != RQJobStatus.DEFERRED:
  51. jobs = get_jobs(queue, job_ids, registry)
  52. else:
  53. # Deferred jobs require special handling
  54. for job_id in job_ids:
  55. try:
  56. jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
  57. except NoSuchJobError:
  58. pass
  59. if jobs and status == RQJobStatus.SCHEDULED:
  60. for job in jobs:
  61. job.scheduled_at = registry.get_scheduled_time(job)
  62. return jobs
  63. def delete_rq_job(job_id):
  64. """
  65. Delete the specified RQ job.
  66. """
  67. config = QUEUES_LIST[0]
  68. try:
  69. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  70. except NoSuchJobError:
  71. raise Http404(_("Job {job_id} not found").format(job_id=job_id))
  72. queue_index = QUEUES_MAP[job.origin]
  73. queue = get_queue_by_index(queue_index)
  74. # Remove job id from queue and delete the actual job
  75. queue.connection.lrem(queue.key, 0, job.id)
  76. job.delete()
  77. def requeue_rq_job(job_id):
  78. """
  79. Requeue the specified RQ job.
  80. """
  81. config = QUEUES_LIST[0]
  82. try:
  83. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  84. except NoSuchJobError:
  85. raise Http404(_("Job {id} not found.").format(id=job_id))
  86. queue_index = QUEUES_MAP[job.origin]
  87. queue = get_queue_by_index(queue_index)
  88. requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
  89. def enqueue_rq_job(job_id):
  90. """
  91. Enqueue the specified RQ job.
  92. """
  93. config = QUEUES_LIST[0]
  94. try:
  95. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  96. except NoSuchJobError:
  97. raise Http404(_("Job {id} not found.").format(id=job_id))
  98. queue_index = QUEUES_MAP[job.origin]
  99. queue = get_queue_by_index(queue_index)
  100. try:
  101. # _enqueue_job is new in RQ 1.14, this is used to enqueue
  102. # job regardless of its dependencies
  103. queue._enqueue_job(job)
  104. except AttributeError:
  105. queue.enqueue_job(job)
  106. # Remove job from correct registry if needed
  107. if job.get_status() == RQJobStatus.DEFERRED:
  108. registry = DeferredJobRegistry(queue.name, queue.connection)
  109. registry.remove(job)
  110. elif job.get_status() == RQJobStatus.FINISHED:
  111. registry = FinishedJobRegistry(queue.name, queue.connection)
  112. registry.remove(job)
  113. elif job.get_status() == RQJobStatus.SCHEDULED:
  114. registry = ScheduledJobRegistry(queue.name, queue.connection)
  115. registry.remove(job)
  116. def stop_rq_job(job_id):
  117. """
  118. Stop the specified RQ job.
  119. """
  120. config = QUEUES_LIST[0]
  121. try:
  122. job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
  123. except NoSuchJobError:
  124. raise Http404(_("Job {job_id} not found").format(job_id=job_id))
  125. queue_index = QUEUES_MAP[job.origin]
  126. queue = get_queue_by_index(queue_index)
  127. return stop_jobs(queue, job_id)[0]