refresh_feeds.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. // SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
  2. // SPDX-License-Identifier: Apache-2.0
  3. package cli // import "miniflux.app/v2/internal/cli"
  4. import (
  5. "log/slog"
  6. "sync"
  7. "time"
  8. "miniflux.app/v2/internal/config"
  9. "miniflux.app/v2/internal/model"
  10. feedHandler "miniflux.app/v2/internal/reader/handler"
  11. "miniflux.app/v2/internal/storage"
  12. )
  13. func refreshFeeds(store *storage.Storage) {
  14. var wg sync.WaitGroup
  15. startTime := time.Now()
  16. // Generate a batch of feeds for any user that has feeds to refresh.
  17. batchBuilder := store.NewBatchBuilder()
  18. batchBuilder.WithBatchSize(config.Opts.BatchSize())
  19. batchBuilder.WithErrorLimit(config.Opts.PollingParsingErrorLimit())
  20. batchBuilder.WithoutDisabledFeeds()
  21. batchBuilder.WithNextCheckExpired()
  22. batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
  23. jobs, err := batchBuilder.FetchJobs()
  24. if err != nil {
  25. slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
  26. return
  27. }
  28. slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
  29. nbJobs := len(jobs)
  30. var jobQueue = make(chan model.Job, nbJobs)
  31. slog.Info("Starting a pool of workers",
  32. slog.Int("nb_workers", config.Opts.WorkerPoolSize()),
  33. )
  34. for i := range config.Opts.WorkerPoolSize() {
  35. wg.Add(1)
  36. go func(workerID int) {
  37. defer wg.Done()
  38. for job := range jobQueue {
  39. slog.Info("Refreshing feed",
  40. slog.Int64("feed_id", job.FeedID),
  41. slog.Int64("user_id", job.UserID),
  42. slog.Int("worker_id", workerID),
  43. )
  44. if localizedError := feedHandler.RefreshFeed(store, job.UserID, job.FeedID, false); localizedError != nil {
  45. slog.Warn("Unable to refresh feed",
  46. slog.Int64("feed_id", job.FeedID),
  47. slog.Int64("user_id", job.UserID),
  48. slog.Any("error", localizedError.Error()),
  49. )
  50. }
  51. }
  52. }(i)
  53. }
  54. for _, job := range jobs {
  55. jobQueue <- job
  56. }
  57. close(jobQueue)
  58. wg.Wait()
  59. slog.Info("Refreshed a batch of feeds",
  60. slog.Int("nb_feeds", nbJobs),
  61. slog.String("duration", time.Since(startTime).String()),
  62. )
  63. }