reports.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import inspect
  2. import logging
  3. import traceback
  4. from datetime import timedelta
  5. from django.utils import timezone
  6. from django.utils.functional import classproperty
  7. from django_rq import job
  8. from core.choices import JobStatusChoices
  9. from core.models import Job
  10. from .choices import LogLevelChoices
  11. from .models import ReportModule
  12. __all__ = (
  13. 'Report',
  14. 'get_module_and_report',
  15. 'run_report',
  16. )
  17. logger = logging.getLogger(__name__)
  18. def get_module_and_report(module_name, report_name):
  19. module = ReportModule.objects.get(file_path=f'{module_name}.py')
  20. report = module.reports.get(report_name)()
  21. return module, report
  22. @job('default')
  23. def run_report(job, *args, **kwargs):
  24. """
  25. Helper function to call the run method on a report. This is needed to get around the inability to pickle an instance
  26. method for queueing into the background processor.
  27. """
  28. job.start()
  29. module = ReportModule.objects.get(pk=job.object_id)
  30. report = module.reports.get(job.name)()
  31. try:
  32. report.run(job)
  33. except Exception as e:
  34. job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
  35. logging.error(f"Error during execution of report {job.name}")
  36. finally:
  37. # Schedule the next job if an interval has been set
  38. if job.interval:
  39. new_scheduled_time = job.scheduled + timedelta(minutes=job.interval)
  40. Job.enqueue(
  41. run_report,
  42. instance=job.object,
  43. name=job.name,
  44. user=job.user,
  45. job_timeout=report.job_timeout,
  46. schedule_at=new_scheduled_time,
  47. interval=job.interval
  48. )
  49. class Report(object):
  50. """
  51. NetBox users can extend this object to write custom reports to be used for validating data within NetBox. Each
  52. report must have one or more test methods named `test_*`.
  53. The `_results` attribute of a completed report will take the following form:
  54. {
  55. 'test_bar': {
  56. 'failures': 42,
  57. 'log': [
  58. (<datetime>, <level>, <object>, <message>),
  59. ...
  60. ]
  61. },
  62. 'test_foo': {
  63. 'failures': 0,
  64. 'log': [
  65. (<datetime>, <level>, <object>, <message>),
  66. ...
  67. ]
  68. }
  69. }
  70. """
  71. description = None
  72. scheduling_enabled = True
  73. job_timeout = None
  74. def __init__(self):
  75. self._results = {}
  76. self.active_test = None
  77. self.failed = False
  78. self.logger = logging.getLogger(f"netbox.reports.{self.__module__}.{self.__class__.__name__}")
  79. # Compile test methods and initialize results skeleton
  80. test_methods = []
  81. for method in dir(self):
  82. if method.startswith('test_') and callable(getattr(self, method)):
  83. test_methods.append(method)
  84. self._results[method] = {
  85. 'success': 0,
  86. 'info': 0,
  87. 'warning': 0,
  88. 'failure': 0,
  89. 'log': [],
  90. }
  91. self.test_methods = test_methods
  92. @classproperty
  93. def module(self):
  94. return self.__module__
  95. @classproperty
  96. def class_name(self):
  97. return self.__name__
  98. @classproperty
  99. def full_name(self):
  100. return f'{self.module}.{self.class_name}'
  101. @property
  102. def name(self):
  103. """
  104. Override this attribute to set a custom display name.
  105. """
  106. return self.class_name
  107. @property
  108. def filename(self):
  109. return inspect.getfile(self.__class__)
  110. @property
  111. def source(self):
  112. return inspect.getsource(self.__class__)
  113. @property
  114. def is_valid(self):
  115. """
  116. Indicates whether the report can be run.
  117. """
  118. return bool(self.test_methods)
  119. #
  120. # Logging methods
  121. #
  122. def _log(self, obj, message, level=LogLevelChoices.LOG_DEFAULT):
  123. """
  124. Log a message from a test method. Do not call this method directly; use one of the log_* wrappers below.
  125. """
  126. if level not in LogLevelChoices.values():
  127. raise Exception(f"Unknown logging level: {level}")
  128. self._results[self.active_test]['log'].append((
  129. timezone.now().isoformat(),
  130. level,
  131. str(obj) if obj else None,
  132. obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
  133. message,
  134. ))
  135. def log(self, message):
  136. """
  137. Log a message which is not associated with a particular object.
  138. """
  139. self._log(None, message, level=LogLevelChoices.LOG_DEFAULT)
  140. self.logger.info(message)
  141. def log_success(self, obj, message=None):
  142. """
  143. Record a successful test against an object. Logging a message is optional.
  144. """
  145. if message:
  146. self._log(obj, message, level=LogLevelChoices.LOG_SUCCESS)
  147. self._results[self.active_test]['success'] += 1
  148. self.logger.info(f"Success | {obj}: {message}")
  149. def log_info(self, obj, message):
  150. """
  151. Log an informational message.
  152. """
  153. self._log(obj, message, level=LogLevelChoices.LOG_INFO)
  154. self._results[self.active_test]['info'] += 1
  155. self.logger.info(f"Info | {obj}: {message}")
  156. def log_warning(self, obj, message):
  157. """
  158. Log a warning.
  159. """
  160. self._log(obj, message, level=LogLevelChoices.LOG_WARNING)
  161. self._results[self.active_test]['warning'] += 1
  162. self.logger.info(f"Warning | {obj}: {message}")
  163. def log_failure(self, obj, message):
  164. """
  165. Log a failure. Calling this method will automatically mark the report as failed.
  166. """
  167. self._log(obj, message, level=LogLevelChoices.LOG_FAILURE)
  168. self._results[self.active_test]['failure'] += 1
  169. self.logger.info(f"Failure | {obj}: {message}")
  170. self.failed = True
  171. #
  172. # Run methods
  173. #
  174. def run(self, job):
  175. """
  176. Run the report and save its results. Each test method will be executed in order.
  177. """
  178. self.logger.info(f"Running report")
  179. # Perform any post-run tasks
  180. self.pre_run()
  181. try:
  182. for method_name in self.test_methods:
  183. self.active_test = method_name
  184. test_method = getattr(self, method_name)
  185. test_method()
  186. job.data = self._results
  187. if self.failed:
  188. self.logger.warning("Report failed")
  189. job.terminate(status=JobStatusChoices.STATUS_FAILED)
  190. else:
  191. self.logger.info("Report completed successfully")
  192. job.terminate()
  193. except Exception as e:
  194. stacktrace = traceback.format_exc()
  195. self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e} <pre>{stacktrace}</pre>")
  196. logger.error(f"Exception raised during report execution: {e}")
  197. job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
  198. # Perform any post-run tasks
  199. self.post_run()
  200. def pre_run(self):
  201. """
  202. Extend this method to include any tasks which should execute *before* the report is run.
  203. """
  204. pass
  205. def post_run(self):
  206. """
  207. Extend this method to include any tasks which should execute *after* the report is run.
  208. """
  209. pass