Преглед на файлове

refactor(worker): add graceful shutdown to worker pool

Close the job channel and use a WaitGroup so workers drain in-flight
jobs before the process exits. Also fix godoc comments.
Frédéric Guillot преди 1 седмица
родител
ревизия
2572809998
променени са 3 файла, в които са добавени 23 реда и са изтрити 8 реда
  1. 4 0
      internal/cli/daemon.go
  2. 13 3
      internal/worker/pool.go
  3. 6 5
      internal/worker/worker.go

+ 4 - 0
internal/cli/daemon.go

@@ -94,5 +94,9 @@ func startDaemon(store *storage.Storage) {
 		slog.Debug("No HTTP servers to shut down.")
 	}
 
+	slog.Debug("Shutting down worker pool...")
+	pool.Shutdown()
+	slog.Debug("Worker pool shut down.")
+
 	slog.Debug("Process gracefully stopped")
 }

+ 13 - 3
internal/worker/pool.go

@@ -4,22 +4,31 @@
 package worker // import "miniflux.app/v2/internal/worker"
 
 import (
+	"sync"
+
 	"miniflux.app/v2/internal/model"
 	"miniflux.app/v2/internal/storage"
 )
 
-// Pool handles a pool of workers.
+// Pool manages a set of background workers that process feed refresh jobs.
 type Pool struct {
 	queue chan model.Job
+	wg    sync.WaitGroup
 }
 
-// Push send a list of jobs to the queue.
+// Push sends a list of jobs to the queue.
 func (p *Pool) Push(jobs model.JobList) {
 	for _, job := range jobs {
 		p.queue <- job
 	}
 }
 
+// Shutdown closes the job queue and waits for all workers to finish their current jobs.
+func (p *Pool) Shutdown() {
+	close(p.queue)
+	p.wg.Wait()
+}
+
 // NewPool creates a pool of background workers.
 func NewPool(store *storage.Storage, nbWorkers int) *Pool {
 	workerPool := &Pool{
@@ -27,8 +36,9 @@ func NewPool(store *storage.Storage, nbWorkers int) *Pool {
 	}
 
 	for i := range nbWorkers {
+		workerPool.wg.Add(1)
 		worker := &worker{id: i, store: store}
-		go worker.Run(workerPool.queue)
+		go worker.Run(workerPool.queue, &workerPool.wg)
 	}
 
 	return workerPool

+ 6 - 5
internal/worker/worker.go

@@ -5,6 +5,7 @@ package worker // import "miniflux.app/v2/internal/worker"
 
 import (
 	"log/slog"
+	"sync"
 	"time"
 
 	"miniflux.app/v2/internal/config"
@@ -14,20 +15,20 @@ import (
 	"miniflux.app/v2/internal/storage"
 )
 
-// worker refreshes a feed in the background.
 type worker struct {
 	id    int
 	store *storage.Storage
 }
 
-// Run wait for a job and refresh the given feed.
-func (w *worker) Run(c <-chan model.Job) {
+// Run processes feed refresh jobs from the channel until it is closed.
+func (w *worker) Run(c <-chan model.Job, wg *sync.WaitGroup) {
+	defer wg.Done()
+
 	slog.Debug("Worker started",
 		slog.Int("worker_id", w.id),
 	)
 
-	for {
-		job := <-c
+	for job := range c {
 		slog.Debug("Job received by worker",
 			slog.Int("worker_id", w.id),
 			slog.Int64("user_id", job.UserID),