refresh_feeds.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. // SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
  2. // SPDX-License-Identifier: Apache-2.0
  3. package cli // import "miniflux.app/v2/internal/cli"
  4. import (
  5. "log/slog"
  6. "sync"
  7. "time"
  8. "miniflux.app/v2/internal/config"
  9. "miniflux.app/v2/internal/model"
  10. feedHandler "miniflux.app/v2/internal/reader/handler"
  11. "miniflux.app/v2/internal/storage"
  12. )
  13. func refreshFeeds(store *storage.Storage) {
  14. var wg sync.WaitGroup
  15. startTime := time.Now()
  16. // Generate a batch of feeds for any user that has feeds to refresh.
  17. jobs, err := store.NewBatchBuilder().
  18. WithBatchSize(config.Opts.BatchSize()).
  19. WithErrorLimit(config.Opts.PollingParsingErrorLimit()).
  20. WithoutDisabledFeeds().
  21. WithNextCheckExpired().
  22. WithLimitPerHost(config.Opts.PollingLimitPerHost()).
  23. FetchJobs()
  24. if err != nil {
  25. slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
  26. return
  27. }
  28. slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
  29. nbJobs := len(jobs)
  30. jobQueue := make(chan model.Job, nbJobs)
  31. slog.Info("Starting a pool of workers",
  32. slog.Int("nb_workers", config.Opts.WorkerPoolSize()),
  33. )
  34. for i := range config.Opts.WorkerPoolSize() {
  35. wg.Add(1)
  36. go func(workerID int) {
  37. defer wg.Done()
  38. for job := range jobQueue {
  39. slog.Info("Refreshing feed",
  40. slog.Int64("feed_id", job.FeedID),
  41. slog.Int64("user_id", job.UserID),
  42. slog.Int("worker_id", workerID),
  43. )
  44. if localizedError := feedHandler.RefreshFeed(store, job.UserID, job.FeedID, false); localizedError != nil {
  45. slog.Warn("Unable to refresh feed",
  46. slog.Int64("feed_id", job.FeedID),
  47. slog.Int64("user_id", job.UserID),
  48. slog.Any("error", localizedError.Error()),
  49. )
  50. }
  51. }
  52. }(i)
  53. }
  54. for _, job := range jobs {
  55. jobQueue <- job
  56. }
  57. close(jobQueue)
  58. wg.Wait()
  59. slog.Info("Refreshed a batch of feeds",
  60. slog.Int("nb_feeds", nbJobs),
  61. slog.String("duration", time.Since(startTime).String()),
  62. )
  63. }