|
|
@@ -5,6 +5,7 @@
|
|
|
package storage // import "miniflux.app/storage"
|
|
|
|
|
|
import (
|
|
|
+ "database/sql"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"time"
|
|
|
@@ -74,7 +75,7 @@ func (s *Storage) UpdateEntryContent(entry *model.Entry) error {
|
|
|
}
|
|
|
|
|
|
// createEntry add a new entry.
|
|
|
-func (s *Storage) createEntry(entry *model.Entry) error {
|
|
|
+func (s *Storage) createEntry(tx *sql.Tx, entry *model.Entry) error {
|
|
|
query := `
|
|
|
INSERT INTO entries
|
|
|
(title, hash, url, comments_url, published_at, content, author, user_id, feed_id, changed_at, document_vectors)
|
|
|
@@ -83,7 +84,7 @@ func (s *Storage) createEntry(entry *model.Entry) error {
|
|
|
RETURNING
|
|
|
id, status
|
|
|
`
|
|
|
- err := s.db.QueryRow(
|
|
|
+ err := tx.QueryRow(
|
|
|
query,
|
|
|
entry.Title,
|
|
|
entry.Hash,
|
|
|
@@ -103,7 +104,7 @@ func (s *Storage) createEntry(entry *model.Entry) error {
|
|
|
for i := 0; i < len(entry.Enclosures); i++ {
|
|
|
entry.Enclosures[i].EntryID = entry.ID
|
|
|
entry.Enclosures[i].UserID = entry.UserID
|
|
|
- err := s.CreateEnclosure(entry.Enclosures[i])
|
|
|
+ err := s.createEnclosure(tx, entry.Enclosures[i])
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -115,7 +116,7 @@ func (s *Storage) createEntry(entry *model.Entry) error {
|
|
|
// updateEntry updates an entry when a feed is refreshed.
|
|
|
// Note: we do not update the published date because some feeds do not contains any date,
|
|
|
// it default to time.Now() which could change the order of items on the history page.
|
|
|
-func (s *Storage) updateEntry(entry *model.Entry) error {
|
|
|
+func (s *Storage) updateEntry(tx *sql.Tx, entry *model.Entry) error {
|
|
|
query := `
|
|
|
UPDATE
|
|
|
entries
|
|
|
@@ -131,7 +132,7 @@ func (s *Storage) updateEntry(entry *model.Entry) error {
|
|
|
RETURNING
|
|
|
id
|
|
|
`
|
|
|
- err := s.db.QueryRow(
|
|
|
+ err := tx.QueryRow(
|
|
|
query,
|
|
|
entry.Title,
|
|
|
entry.URL,
|
|
|
@@ -152,15 +153,19 @@ func (s *Storage) updateEntry(entry *model.Entry) error {
|
|
|
enclosure.EntryID = entry.ID
|
|
|
}
|
|
|
|
|
|
- return s.UpdateEnclosures(entry.Enclosures)
|
|
|
+ return s.updateEnclosures(tx, entry.UserID, entry.ID, entry.Enclosures)
|
|
|
}
|
|
|
|
|
|
// entryExists checks if an entry already exists based on its hash when refreshing a feed.
|
|
|
-func (s *Storage) entryExists(entry *model.Entry) bool {
|
|
|
- var result int
|
|
|
- query := `SELECT 1 FROM entries WHERE user_id=$1 AND feed_id=$2 AND hash=$3`
|
|
|
- s.db.QueryRow(query, entry.UserID, entry.FeedID, entry.Hash).Scan(&result)
|
|
|
- return result == 1
|
|
|
+func (s *Storage) entryExists(tx *sql.Tx, entry *model.Entry) bool {
|
|
|
+ var result bool
|
|
|
+ tx.QueryRow(
|
|
|
+ `SELECT true FROM entries WHERE user_id=$1 AND feed_id=$2 AND hash=$3`,
|
|
|
+ entry.UserID,
|
|
|
+ entry.FeedID,
|
|
|
+ entry.Hash,
|
|
|
+ ).Scan(&result)
|
|
|
+ return result
|
|
|
}
|
|
|
|
|
|
// cleanupEntries deletes from the database entries marked as "removed" and not visible anymore in the feed.
|
|
|
@@ -180,31 +185,44 @@ func (s *Storage) cleanupEntries(feedID int64, entryHashes []string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// UpdateEntries updates a list of entries while refreshing a feed.
|
|
|
-func (s *Storage) UpdateEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (err error) {
|
|
|
+// RefreshFeedEntries updates feed entries while refreshing a feed.
|
|
|
+func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (err error) {
|
|
|
var entryHashes []string
|
|
|
+
|
|
|
for _, entry := range entries {
|
|
|
entry.UserID = userID
|
|
|
entry.FeedID = feedID
|
|
|
|
|
|
- if s.entryExists(entry) {
|
|
|
+ tx, err := s.db.Begin()
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf(`store: unable to start transaction: %v`, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if s.entryExists(tx, entry) {
|
|
|
if updateExistingEntries {
|
|
|
- err = s.updateEntry(entry)
|
|
|
+ err = s.updateEntry(tx, entry)
|
|
|
}
|
|
|
} else {
|
|
|
- err = s.createEntry(entry)
|
|
|
+ err = s.createEntry(tx, entry)
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
+ tx.Rollback()
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ if err := tx.Commit(); err != nil {
|
|
|
+ return fmt.Errorf(`store: unable to commit transaction: %v`, err)
|
|
|
+ }
|
|
|
+
|
|
|
entryHashes = append(entryHashes, entry.Hash)
|
|
|
}
|
|
|
|
|
|
- if err := s.cleanupEntries(feedID, entryHashes); err != nil {
|
|
|
- logger.Error(`store: feed #%d: %v`, feedID, err)
|
|
|
- }
|
|
|
+ go func() {
|
|
|
+ if err := s.cleanupEntries(feedID, entryHashes); err != nil {
|
|
|
+ logger.Error(`store: feed #%d: %v`, feedID, err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
return nil
|
|
|
}
|