reports.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. import inspect
  2. import logging
  3. import traceback
  4. from datetime import timedelta
  5. from django.utils import timezone
  6. from django.utils.functional import classproperty
  7. from django_rq import job
  8. from core.choices import JobStatusChoices
  9. from core.models import Job
  10. from .choices import LogLevelChoices
  11. from .models import ReportModule
  12. __all__ = (
  13. 'Report',
  14. 'get_module_and_report',
  15. 'run_report',
  16. )
  17. logger = logging.getLogger(__name__)
  18. def get_module_and_report(module_name, report_name):
  19. module = ReportModule.objects.get(file_path=f'{module_name}.py')
  20. report = module.reports.get(report_name)
  21. return module, report
  22. @job('default')
  23. def run_report(job, *args, **kwargs):
  24. """
  25. Helper function to call the run method on a report. This is needed to get around the inability to pickle an instance
  26. method for queueing into the background processor.
  27. """
  28. job.start()
  29. module = ReportModule.objects.get(pk=job.object_id)
  30. report = module.reports.get(job.name)()
  31. try:
  32. report.run(job)
  33. except Exception:
  34. job.terminate(status=JobStatusChoices.STATUS_ERRORED)
  35. logging.error(f"Error during execution of report {job.name}")
  36. finally:
  37. # Schedule the next job if an interval has been set
  38. if job.interval:
  39. new_scheduled_time = job.scheduled + timedelta(minutes=job.interval)
  40. Job.enqueue(
  41. run_report,
  42. instance=job.object,
  43. name=job.name,
  44. user=job.user,
  45. job_timeout=report.job_timeout,
  46. schedule_at=new_scheduled_time,
  47. interval=job.interval
  48. )
  49. class Report(object):
  50. """
  51. NetBox users can extend this object to write custom reports to be used for validating data within NetBox. Each
  52. report must have one or more test methods named `test_*`.
  53. The `_results` attribute of a completed report will take the following form:
  54. {
  55. 'test_bar': {
  56. 'failures': 42,
  57. 'log': [
  58. (<datetime>, <level>, <object>, <message>),
  59. ...
  60. ]
  61. },
  62. 'test_foo': {
  63. 'failures': 0,
  64. 'log': [
  65. (<datetime>, <level>, <object>, <message>),
  66. ...
  67. ]
  68. }
  69. }
  70. """
  71. description = None
  72. job_timeout = None
  73. def __init__(self):
  74. self._results = {}
  75. self.active_test = None
  76. self.failed = False
  77. self.logger = logging.getLogger(f"netbox.reports.{self.__module__}.{self.__class__.__name__}")
  78. # Compile test methods and initialize results skeleton
  79. test_methods = []
  80. for method in dir(self):
  81. if method.startswith('test_') and callable(getattr(self, method)):
  82. test_methods.append(method)
  83. self._results[method] = {
  84. 'success': 0,
  85. 'info': 0,
  86. 'warning': 0,
  87. 'failure': 0,
  88. 'log': [],
  89. }
  90. if not test_methods:
  91. raise Exception("A report must contain at least one test method.")
  92. self.test_methods = test_methods
  93. @classproperty
  94. def module(self):
  95. return self.__module__
  96. @classproperty
  97. def class_name(self):
  98. return self.__name__
  99. @classproperty
  100. def full_name(self):
  101. return f'{self.module}.{self.class_name}'
  102. @property
  103. def name(self):
  104. """
  105. Override this attribute to set a custom display name.
  106. """
  107. return self.class_name
  108. @property
  109. def filename(self):
  110. return inspect.getfile(self.__class__)
  111. @property
  112. def source(self):
  113. return inspect.getsource(self.__class__)
  114. #
  115. # Logging methods
  116. #
  117. def _log(self, obj, message, level=LogLevelChoices.LOG_DEFAULT):
  118. """
  119. Log a message from a test method. Do not call this method directly; use one of the log_* wrappers below.
  120. """
  121. if level not in LogLevelChoices.values():
  122. raise Exception(f"Unknown logging level: {level}")
  123. self._results[self.active_test]['log'].append((
  124. timezone.now().isoformat(),
  125. level,
  126. str(obj) if obj else None,
  127. obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
  128. message,
  129. ))
  130. def log(self, message):
  131. """
  132. Log a message which is not associated with a particular object.
  133. """
  134. self._log(None, message, level=LogLevelChoices.LOG_DEFAULT)
  135. self.logger.info(message)
  136. def log_success(self, obj, message=None):
  137. """
  138. Record a successful test against an object. Logging a message is optional.
  139. """
  140. if message:
  141. self._log(obj, message, level=LogLevelChoices.LOG_SUCCESS)
  142. self._results[self.active_test]['success'] += 1
  143. self.logger.info(f"Success | {obj}: {message}")
  144. def log_info(self, obj, message):
  145. """
  146. Log an informational message.
  147. """
  148. self._log(obj, message, level=LogLevelChoices.LOG_INFO)
  149. self._results[self.active_test]['info'] += 1
  150. self.logger.info(f"Info | {obj}: {message}")
  151. def log_warning(self, obj, message):
  152. """
  153. Log a warning.
  154. """
  155. self._log(obj, message, level=LogLevelChoices.LOG_WARNING)
  156. self._results[self.active_test]['warning'] += 1
  157. self.logger.info(f"Warning | {obj}: {message}")
  158. def log_failure(self, obj, message):
  159. """
  160. Log a failure. Calling this method will automatically mark the report as failed.
  161. """
  162. self._log(obj, message, level=LogLevelChoices.LOG_FAILURE)
  163. self._results[self.active_test]['failure'] += 1
  164. self.logger.info(f"Failure | {obj}: {message}")
  165. self.failed = True
  166. #
  167. # Run methods
  168. #
  169. def run(self, job):
  170. """
  171. Run the report and save its results. Each test method will be executed in order.
  172. """
  173. self.logger.info(f"Running report")
  174. # Perform any post-run tasks
  175. self.pre_run()
  176. try:
  177. for method_name in self.test_methods:
  178. self.active_test = method_name
  179. test_method = getattr(self, method_name)
  180. test_method()
  181. if self.failed:
  182. self.logger.warning("Report failed")
  183. job.status = JobStatusChoices.STATUS_FAILED
  184. else:
  185. self.logger.info("Report completed successfully")
  186. job.status = JobStatusChoices.STATUS_COMPLETED
  187. except Exception as e:
  188. stacktrace = traceback.format_exc()
  189. self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e} <pre>{stacktrace}</pre>")
  190. logger.error(f"Exception raised during report execution: {e}")
  191. job.terminate(status=JobStatusChoices.STATUS_ERRORED)
  192. finally:
  193. job.data = self._results
  194. job.terminate()
  195. # Perform any post-run tasks
  196. self.post_run()
  197. def pre_run(self):
  198. """
  199. Extend this method to include any tasks which should execute *before* the report is run.
  200. """
  201. pass
  202. def post_run(self):
  203. """
  204. Extend this method to include any tasks which should execute *after* the report is run.
  205. """
  206. pass