batch.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. // SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
  2. // SPDX-License-Identifier: Apache-2.0
  3. package storage // import "miniflux.app/v2/internal/storage"
  4. import (
  5. "database/sql"
  6. "fmt"
  7. "log/slog"
  8. "strconv"
  9. "strings"
  10. "miniflux.app/v2/internal/model"
  11. "miniflux.app/v2/internal/urllib"
  12. )
  // BatchBuilder accumulates the filters and limits used to select a batch of
  // rows from the feeds table. The With* methods mutate the builder in place and
  // return it so calls can be chained; FetchJobs executes the resulting query.
  type BatchBuilder struct {
  	// db is the database handle the query is executed on.
  	db *sql.DB

  	// conditions holds the SQL predicates that are joined with AND to form
  	// the WHERE clause.
  	conditions []string
  	// args holds the values bound to the positional $N placeholders that
  	// appear in conditions, in placeholder order.
  	args []any

  	// batchSize is emitted as the SQL LIMIT when it is greater than zero.
  	batchSize int
  	// limitPerHost caps how many jobs are kept per feed hostname when it is
  	// greater than zero.
  	limitPerHost int
  }
  20. func (s *Storage) NewBatchBuilder() *BatchBuilder {
  21. return &BatchBuilder{
  22. db: s.db,
  23. }
  24. }
  25. func (b *BatchBuilder) WithBatchSize(batchSize int) *BatchBuilder {
  26. b.batchSize = batchSize
  27. return b
  28. }
  29. func (b *BatchBuilder) WithUserID(userID int64) *BatchBuilder {
  30. b.conditions = append(b.conditions, "user_id = $"+strconv.Itoa(len(b.args)+1))
  31. b.args = append(b.args, userID)
  32. return b
  33. }
  34. func (b *BatchBuilder) WithCategoryID(categoryID int64) *BatchBuilder {
  35. b.conditions = append(b.conditions, "category_id = $"+strconv.Itoa(len(b.args)+1))
  36. b.args = append(b.args, categoryID)
  37. return b
  38. }
  39. func (b *BatchBuilder) WithErrorLimit(limit int) *BatchBuilder {
  40. if limit > 0 {
  41. b.conditions = append(b.conditions, "parsing_error_count < $"+strconv.Itoa(len(b.args)+1))
  42. b.args = append(b.args, limit)
  43. }
  44. return b
  45. }
  46. func (b *BatchBuilder) WithNextCheckExpired() *BatchBuilder {
  47. b.conditions = append(b.conditions, "next_check_at < now()")
  48. return b
  49. }
  50. func (b *BatchBuilder) WithoutDisabledFeeds() *BatchBuilder {
  51. b.conditions = append(b.conditions, "disabled IS false")
  52. return b
  53. }
  54. func (b *BatchBuilder) WithLimitPerHost(limit int) *BatchBuilder {
  55. if limit > 0 {
  56. b.limitPerHost = limit
  57. }
  58. return b
  59. }
  60. // FetchJobs retrieves a batch of jobs based on the conditions set in the builder.
  61. // When limitPerHost is set, it limits the number of jobs per feed hostname to prevent overwhelming a single host.
  62. func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
  63. query := `SELECT id, user_id, feed_url FROM feeds`
  64. if len(b.conditions) > 0 {
  65. query += " WHERE " + strings.Join(b.conditions, " AND ")
  66. }
  67. query += " ORDER BY next_check_at ASC"
  68. if b.batchSize > 0 {
  69. query += " LIMIT " + strconv.Itoa(b.batchSize)
  70. }
  71. rows, err := b.db.Query(query, b.args...)
  72. if err != nil {
  73. return nil, fmt.Errorf(`store: unable to fetch batch of jobs: %v`, err)
  74. }
  75. defer rows.Close()
  76. jobs := make(model.JobList, 0, b.batchSize)
  77. hosts := make(map[string]int)
  78. nbRows := 0
  79. nbSkippedFeeds := 0
  80. for rows.Next() {
  81. var job model.Job
  82. if err := rows.Scan(&job.FeedID, &job.UserID, &job.FeedURL); err != nil {
  83. return nil, fmt.Errorf(`store: unable to fetch job record: %v`, err)
  84. }
  85. nbRows++
  86. if b.limitPerHost > 0 {
  87. feedHostname := urllib.Domain(job.FeedURL)
  88. if hosts[feedHostname] >= b.limitPerHost {
  89. slog.Debug("Feed host limit reached for this batch",
  90. slog.String("feed_url", job.FeedURL),
  91. slog.String("feed_hostname", feedHostname),
  92. slog.Int("limit_per_host", b.limitPerHost),
  93. slog.Int("current", hosts[feedHostname]),
  94. )
  95. nbSkippedFeeds++
  96. continue
  97. }
  98. hosts[feedHostname]++
  99. }
  100. jobs = append(jobs, job)
  101. }
  102. if err := rows.Err(); err != nil {
  103. return nil, fmt.Errorf(`store: error iterating on job records: %v`, err)
  104. }
  105. slog.Info("Created a batch of feeds",
  106. slog.Int("batch_size", b.batchSize),
  107. slog.Int("rows_count", nbRows),
  108. slog.Int("skipped_feeds_count", nbSkippedFeeds),
  109. slog.Int("jobs_count", len(jobs)),
  110. )
  111. return jobs, nil
  112. }