refresh_feeds.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. // SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
  2. // SPDX-License-Identifier: Apache-2.0
  3. package cli // import "miniflux.app/v2/internal/cli"
  4. import (
  5. "log/slog"
  6. "sync"
  7. "time"
  8. "miniflux.app/v2/internal/config"
  9. "miniflux.app/v2/internal/model"
  10. feedHandler "miniflux.app/v2/internal/reader/handler"
  11. "miniflux.app/v2/internal/storage"
  12. )
  13. func refreshFeeds(store *storage.Storage) {
  14. var wg sync.WaitGroup
  15. startTime := time.Now()
  16. // Generate a batch of feeds for any user that has feeds to refresh.
  17. batchBuilder := store.NewBatchBuilder()
  18. batchBuilder.WithBatchSize(config.Opts.BatchSize())
  19. batchBuilder.WithErrorLimit(config.Opts.PollingParsingErrorLimit())
  20. batchBuilder.WithoutDisabledFeeds()
  21. batchBuilder.WithNextCheckExpired()
  22. jobs, err := batchBuilder.FetchJobs()
  23. if err != nil {
  24. slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
  25. return
  26. }
  27. nbJobs := len(jobs)
  28. slog.Info("Created a batch of feeds",
  29. slog.Int("nb_jobs", nbJobs),
  30. slog.Int("batch_size", config.Opts.BatchSize()),
  31. )
  32. var jobQueue = make(chan model.Job, nbJobs)
  33. slog.Info("Starting a pool of workers",
  34. slog.Int("nb_workers", config.Opts.WorkerPoolSize()),
  35. )
  36. for i := range config.Opts.WorkerPoolSize() {
  37. wg.Add(1)
  38. go func(workerID int) {
  39. defer wg.Done()
  40. for job := range jobQueue {
  41. slog.Info("Refreshing feed",
  42. slog.Int64("feed_id", job.FeedID),
  43. slog.Int64("user_id", job.UserID),
  44. slog.Int("worker_id", workerID),
  45. )
  46. if localizedError := feedHandler.RefreshFeed(store, job.UserID, job.FeedID, false); localizedError != nil {
  47. slog.Warn("Unable to refresh feed",
  48. slog.Int64("feed_id", job.FeedID),
  49. slog.Int64("user_id", job.UserID),
  50. slog.Any("error", localizedError.Error()),
  51. )
  52. }
  53. }
  54. }(i)
  55. }
  56. for _, job := range jobs {
  57. jobQueue <- job
  58. }
  59. close(jobQueue)
  60. wg.Wait()
  61. slog.Info("Refreshed a batch of feeds",
  62. slog.Int("nb_feeds", nbJobs),
  63. slog.String("duration", time.Since(startTime).String()),
  64. )
  65. }