reports.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. import importlib
  2. import inspect
  3. import logging
  4. import pkgutil
  5. import traceback
  6. from collections import OrderedDict
  7. from django.conf import settings
  8. from django.utils import timezone
  9. from django_rq import job
  10. from .choices import JobResultStatusChoices, LogLevelChoices
  11. from .models import JobResult
  12. logger = logging.getLogger(__name__)
  13. def is_report(obj):
  14. """
  15. Returns True if the given object is a Report.
  16. """
  17. return obj in Report.__subclasses__()
  18. def get_report(module_name, report_name):
  19. """
  20. Return a specific report from within a module.
  21. """
  22. file_path = '{}/{}.py'.format(settings.REPORTS_ROOT, module_name)
  23. spec = importlib.util.spec_from_file_location(module_name, file_path)
  24. module = importlib.util.module_from_spec(spec)
  25. try:
  26. spec.loader.exec_module(module)
  27. except FileNotFoundError:
  28. return None
  29. report = getattr(module, report_name, None)
  30. if report is None:
  31. return None
  32. return report()
  33. def get_reports():
  34. """
  35. Compile a list of all reports available across all modules in the reports path. Returns a list of tuples:
  36. [
  37. (module_name, (report, report, report, ...)),
  38. (module_name, (report, report, report, ...)),
  39. ...
  40. ]
  41. """
  42. module_list = []
  43. # Iterate through all modules within the reports path. These are the user-created files in which reports are
  44. # defined.
  45. for importer, module_name, _ in pkgutil.iter_modules([settings.REPORTS_ROOT]):
  46. module = importer.find_module(module_name).load_module(module_name)
  47. report_list = [cls() for _, cls in inspect.getmembers(module, is_report)]
  48. module_list.append((module_name, report_list))
  49. return module_list
  50. @job('default')
  51. def run_report(job_result, *args, **kwargs):
  52. """
  53. Helper function to call the run method on a report. This is needed to get around the inability to pickle an instance
  54. method for queueing into the background processor.
  55. """
  56. module_name, report_name = job_result.name.split('.', 1)
  57. report = get_report(module_name, report_name)
  58. try:
  59. report.run(job_result)
  60. except Exception as e:
  61. print(e)
  62. job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
  63. job_result.save()
  64. logging.error(f"Error during execution of report {job_result.name}")
  65. # Delete any previous terminal state results
  66. JobResult.objects.filter(
  67. obj_type=job_result.obj_type,
  68. name=job_result.name,
  69. status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
  70. ).exclude(
  71. pk=job_result.pk
  72. ).delete()
  73. class Report(object):
  74. """
  75. NetBox users can extend this object to write custom reports to be used for validating data within NetBox. Each
  76. report must have one or more test methods named `test_*`.
  77. The `_results` attribute of a completed report will take the following form:
  78. {
  79. 'test_bar': {
  80. 'failures': 42,
  81. 'log': [
  82. (<datetime>, <level>, <object>, <message>),
  83. ...
  84. ]
  85. },
  86. 'test_foo': {
  87. 'failures': 0,
  88. 'log': [
  89. (<datetime>, <level>, <object>, <message>),
  90. ...
  91. ]
  92. }
  93. }
  94. """
  95. description = None
  96. def __init__(self):
  97. self._results = OrderedDict()
  98. self.active_test = None
  99. self.failed = False
  100. self.logger = logging.getLogger(f"netbox.reports.{self.full_name}")
  101. # Compile test methods and initialize results skeleton
  102. test_methods = []
  103. for method in dir(self):
  104. if method.startswith('test_') and callable(getattr(self, method)):
  105. test_methods.append(method)
  106. self._results[method] = OrderedDict([
  107. ('success', 0),
  108. ('info', 0),
  109. ('warning', 0),
  110. ('failure', 0),
  111. ('log', []),
  112. ])
  113. if not test_methods:
  114. raise Exception("A report must contain at least one test method.")
  115. self.test_methods = test_methods
  116. @property
  117. def module(self):
  118. return self.__module__
  119. @property
  120. def class_name(self):
  121. return self.__class__.__name__
  122. @property
  123. def name(self):
  124. """
  125. Override this attribute to set a custom display name.
  126. """
  127. return self.class_name
  128. @property
  129. def full_name(self):
  130. return f'{self.module}.{self.class_name}'
  131. def _log(self, obj, message, level=LogLevelChoices.LOG_DEFAULT):
  132. """
  133. Log a message from a test method. Do not call this method directly; use one of the log_* wrappers below.
  134. """
  135. if level not in LogLevelChoices.as_dict():
  136. raise Exception("Unknown logging level: {}".format(level))
  137. self._results[self.active_test]['log'].append((
  138. timezone.now().isoformat(),
  139. level,
  140. str(obj) if obj else None,
  141. obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
  142. message,
  143. ))
  144. def log(self, message):
  145. """
  146. Log a message which is not associated with a particular object.
  147. """
  148. self._log(None, message, level=LogLevelChoices.LOG_DEFAULT)
  149. self.logger.info(message)
  150. def log_success(self, obj, message=None):
  151. """
  152. Record a successful test against an object. Logging a message is optional.
  153. """
  154. if message:
  155. self._log(obj, message, level=LogLevelChoices.LOG_SUCCESS)
  156. self._results[self.active_test]['success'] += 1
  157. self.logger.info(f"Success | {obj}: {message}")
  158. def log_info(self, obj, message):
  159. """
  160. Log an informational message.
  161. """
  162. self._log(obj, message, level=LogLevelChoices.LOG_INFO)
  163. self._results[self.active_test]['info'] += 1
  164. self.logger.info(f"Info | {obj}: {message}")
  165. def log_warning(self, obj, message):
  166. """
  167. Log a warning.
  168. """
  169. self._log(obj, message, level=LogLevelChoices.LOG_WARNING)
  170. self._results[self.active_test]['warning'] += 1
  171. self.logger.info(f"Warning | {obj}: {message}")
  172. def log_failure(self, obj, message):
  173. """
  174. Log a failure. Calling this method will automatically mark the report as failed.
  175. """
  176. self._log(obj, message, level=LogLevelChoices.LOG_FAILURE)
  177. self._results[self.active_test]['failure'] += 1
  178. self.logger.info(f"Failure | {obj}: {message}")
  179. self.failed = True
  180. def run(self, job_result):
  181. """
  182. Run the report and save its results. Each test method will be executed in order.
  183. """
  184. self.logger.info(f"Running report")
  185. job_result.status = JobResultStatusChoices.STATUS_RUNNING
  186. job_result.save()
  187. try:
  188. for method_name in self.test_methods:
  189. self.active_test = method_name
  190. test_method = getattr(self, method_name)
  191. test_method()
  192. if self.failed:
  193. self.logger.warning("Report failed")
  194. job_result.status = JobResultStatusChoices.STATUS_FAILED
  195. else:
  196. self.logger.info("Report completed successfully")
  197. job_result.status = JobResultStatusChoices.STATUS_COMPLETED
  198. except Exception as e:
  199. stacktrace = traceback.format_exc()
  200. self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e} <pre>{stacktrace}</pre>")
  201. logger.error(f"Exception raised during report execution: {e}")
  202. job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
  203. job_result.data = self._results
  204. job_result.completed = timezone.now()
  205. job_result.save()
  206. # Perform any post-run tasks
  207. self.post_run()
  208. def post_run(self):
  209. """
  210. Extend this method to include any tasks which should execute after the report has been run.
  211. """
  212. pass