scheduler.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. // Copyright 2018 Frédéric Guillot. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. package scheduler // import "miniflux.app/service/scheduler"
  5. import (
  6. "time"
  7. "miniflux.app/config"
  8. "miniflux.app/logger"
  9. "miniflux.app/metric"
  10. "miniflux.app/model"
  11. "miniflux.app/storage"
  12. "miniflux.app/worker"
  13. )
  14. // Serve starts the internal scheduler.
  15. func Serve(store *storage.Storage, pool *worker.Pool) {
  16. logger.Info(`Starting scheduler...`)
  17. go feedScheduler(
  18. store,
  19. pool,
  20. config.Opts.PollingFrequency(),
  21. config.Opts.BatchSize(),
  22. )
  23. go cleanupScheduler(
  24. store,
  25. config.Opts.CleanupFrequencyHours(),
  26. config.Opts.CleanupArchiveReadDays(),
  27. config.Opts.CleanupArchiveUnreadDays(),
  28. config.Opts.CleanupRemoveSessionsDays(),
  29. )
  30. }
  31. func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize int) {
  32. for range time.Tick(time.Duration(frequency) * time.Minute) {
  33. jobs, err := store.NewBatch(batchSize)
  34. if err != nil {
  35. logger.Error("[Scheduler:Feed] %v", err)
  36. } else {
  37. logger.Debug("[Scheduler:Feed] Pushing %d jobs", len(jobs))
  38. pool.Push(jobs)
  39. }
  40. }
  41. }
  42. func cleanupScheduler(store *storage.Storage, frequency, archiveReadDays, archiveUnreadDays, sessionsDays int) {
  43. for range time.Tick(time.Duration(frequency) * time.Hour) {
  44. nbSessions := store.CleanOldSessions(sessionsDays)
  45. nbUserSessions := store.CleanOldUserSessions(sessionsDays)
  46. logger.Info("[Scheduler:Cleanup] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
  47. startTime := time.Now()
  48. if rowsAffected, err := store.ArchiveEntries(model.EntryStatusRead, archiveReadDays); err != nil {
  49. logger.Error("[Scheduler:ArchiveReadEntries] %v", err)
  50. } else {
  51. logger.Info("[Scheduler:ArchiveReadEntries] %d entries changed", rowsAffected)
  52. if config.Opts.HasMetricsCollector() {
  53. metric.ArchiveEntriesDuration.WithLabelValues(model.EntryStatusRead).Observe(time.Since(startTime).Seconds())
  54. }
  55. }
  56. startTime = time.Now()
  57. if rowsAffected, err := store.ArchiveEntries(model.EntryStatusUnread, archiveUnreadDays); err != nil {
  58. logger.Error("[Scheduler:ArchiveUnreadEntries] %v", err)
  59. } else {
  60. logger.Info("[Scheduler:ArchiveUnreadEntries] %d entries changed", rowsAffected)
  61. if config.Opts.HasMetricsCollector() {
  62. metric.ArchiveEntriesDuration.WithLabelValues(model.EntryStatusUnread).Observe(time.Since(startTime).Seconds())
  63. }
  64. }
  65. }
  66. }