scheduler.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. // Copyright 2018 Frédéric Guillot. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. package scheduler // import "miniflux.app/service/scheduler"
  5. import (
  6. "time"
  7. "miniflux.app/config"
  8. "miniflux.app/logger"
  9. "miniflux.app/metric"
  10. "miniflux.app/model"
  11. "miniflux.app/storage"
  12. "miniflux.app/worker"
  13. )
  14. // Serve starts the internal scheduler.
  15. func Serve(store *storage.Storage, pool *worker.Pool) {
  16. logger.Info(`Starting scheduler...`)
  17. go feedScheduler(
  18. store,
  19. pool,
  20. config.Opts.PollingFrequency(),
  21. config.Opts.BatchSize(),
  22. )
  23. go cleanupScheduler(
  24. store,
  25. config.Opts.CleanupFrequencyHours(),
  26. config.Opts.CleanupArchiveReadDays(),
  27. config.Opts.CleanupArchiveUnreadDays(),
  28. config.Opts.CleanupArchiveBatchSize(),
  29. config.Opts.CleanupRemoveSessionsDays(),
  30. )
  31. }
  32. func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize int) {
  33. for range time.Tick(time.Duration(frequency) * time.Minute) {
  34. jobs, err := store.NewBatch(batchSize)
  35. if err != nil {
  36. logger.Error("[Scheduler:Feed] %v", err)
  37. } else {
  38. logger.Debug("[Scheduler:Feed] Pushing %d jobs", len(jobs))
  39. pool.Push(jobs)
  40. }
  41. }
  42. }
  43. func cleanupScheduler(store *storage.Storage, frequency, archiveReadDays, archiveUnreadDays, archiveBatchSize, sessionsDays int) {
  44. for range time.Tick(time.Duration(frequency) * time.Hour) {
  45. nbSessions := store.CleanOldSessions(sessionsDays)
  46. nbUserSessions := store.CleanOldUserSessions(sessionsDays)
  47. logger.Info("[Scheduler:Cleanup] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
  48. startTime := time.Now()
  49. if rowsAffected, err := store.ArchiveEntries(model.EntryStatusRead, archiveReadDays, archiveBatchSize); err != nil {
  50. logger.Error("[Scheduler:ArchiveReadEntries] %v", err)
  51. } else {
  52. logger.Info("[Scheduler:ArchiveReadEntries] %d entries changed", rowsAffected)
  53. if config.Opts.HasMetricsCollector() {
  54. metric.ArchiveEntriesDuration.WithLabelValues(model.EntryStatusRead).Observe(time.Since(startTime).Seconds())
  55. }
  56. }
  57. startTime = time.Now()
  58. if rowsAffected, err := store.ArchiveEntries(model.EntryStatusUnread, archiveUnreadDays, archiveBatchSize); err != nil {
  59. logger.Error("[Scheduler:ArchiveUnreadEntries] %v", err)
  60. } else {
  61. logger.Info("[Scheduler:ArchiveUnreadEntries] %d entries changed", rowsAffected)
  62. if config.Opts.HasMetricsCollector() {
  63. metric.ArchiveEntriesDuration.WithLabelValues(model.EntryStatusUnread).Observe(time.Since(startTime).Seconds())
  64. }
  65. }
  66. }
  67. }