file_change_notify.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package filehelper
  2. import (
  3. "github.com/fsnotify/fsnotify"
  4. log "github.com/sirupsen/logrus"
  5. "path/filepath"
  6. "time"
  7. "sync"
  8. )
  9. var (
  10. debounceWriteLog map[string]*FsNotifyLogEntry
  11. debounceWriteLogMutex = sync.Mutex{}
  12. )
  13. func init() {
  14. debounceWriteLog = make(map[string]*FsNotifyLogEntry)
  15. }
  // FsNotifyLogEntry is the debounce bookkeeping stored as a value of
  // debounceWriteLog.
  type FsNotifyLogEntry struct {
  	// callbackWrapper is the timer returned by time.AfterFunc for the pending
  	// callback; it stays nil until a callback has been scheduled.
  	callbackWrapper *time.Timer
  	// callbackComplete is set to true after the scheduled callback has run and
  	// reset to false when a new callback is scheduled.
  	callbackComplete bool
  }
  // debounceDelay is the delay handed to time.AfterFunc between a relevant
  // event being accepted and its callback being fired.
  const debounceDelay = 300 * time.Millisecond
  23. type watchContext struct {
  24. filename string
  25. filedir string
  26. callback func(filename string)
  27. interestedEvent fsnotify.Op
  28. event *fsnotify.Event
  29. }
  30. func WatchDirectoryCreate(fullpath string, callback func(filename string)) {
  31. watchPath(&watchContext{
  32. filedir: fullpath,
  33. filename: "",
  34. callback: callback,
  35. interestedEvent: fsnotify.Create,
  36. })
  37. }
  38. func WatchDirectoryWrite(fullpath string, callback func(filename string)) {
  39. watchPath(&watchContext{
  40. filedir: fullpath,
  41. filename: "",
  42. callback: callback,
  43. interestedEvent: fsnotify.Write,
  44. })
  45. }
  46. func WatchFileWrite(fullpath string, callback func(filename string)) {
  47. filename := filepath.Base(fullpath)
  48. filedir := filepath.Dir(fullpath)
  49. watchPath(&watchContext{
  50. filedir: filedir,
  51. filename: filename,
  52. callback: callback,
  53. interestedEvent: fsnotify.Write,
  54. })
  55. }
  56. func watchPath(ctx *watchContext) {
  57. watcher, err := fsnotify.NewWatcher()
  58. if err != nil {
  59. log.Errorf("Could not watch for files being created: %v", err)
  60. return
  61. }
  62. defer watcher.Close()
  63. done := make(chan bool)
  64. go func() {
  65. for {
  66. processEvent(ctx, watcher)
  67. }
  68. }()
  69. err = watcher.Add(ctx.filedir)
  70. if err != nil {
  71. log.Errorf("Could not create watcher: %v", err)
  72. }
  73. <-done
  74. }
  75. func processEvent(ctx *watchContext, watcher *fsnotify.Watcher) {
  76. select {
  77. case event, ok := <-watcher.Events:
  78. ctx.event = &event
  79. if !consumeEvent(ok, ctx) {
  80. return
  81. }
  82. break
  83. case err := <-watcher.Errors:
  84. log.Errorf("Error in fsnotify: %v", err)
  85. return
  86. }
  87. }
  88. func consumeEvent(ok bool, ctx *watchContext) bool {
  89. if !ok {
  90. return false
  91. }
  92. if ctx.filename != "" && filepath.Base(ctx.event.Name) != ctx.filename {
  93. log.Tracef("fsnotify irreleventa event different file %+v", ctx.event)
  94. return true
  95. }
  96. consumeRelevantEvents(ctx)
  97. return true
  98. }
  99. func consumeRelevantEvents(ctx *watchContext) {
  100. if ctx.event.Has(ctx.interestedEvent) {
  101. log.Debugf("fsnotify event relevant: %v", ctx.event)
  102. processDebounce(ctx)
  103. } else {
  104. log.Debugf("fsnotify event irrelevant: %v", ctx.event)
  105. }
  106. }
  107. func processDebounce(ctx *watchContext) {
  108. debounceWriteLogMutex.Lock()
  109. logEntry, found := debounceWriteLog[ctx.filename]
  110. if !found {
  111. logEntry = &FsNotifyLogEntry{
  112. callbackComplete: false,
  113. callbackWrapper: nil,
  114. }
  115. debounceWriteLog[ctx.filename] = logEntry
  116. }
  117. log.Debugf("fsnotify event %+v", logEntry)
  118. if logEntry.callbackComplete || logEntry.callbackWrapper == nil {
  119. log.Debugf("fsnotify event callback queued within debounce delay: %v", ctx.filename)
  120. logEntry.callbackComplete = false
  121. logEntry.callbackWrapper = time.AfterFunc(debounceDelay, func() {
  122. log.Debugf("fsnotify event callback being fired: %v", ctx.filename)
  123. ctx.callback(ctx.event.Name)
  124. logEntry.callbackComplete = true
  125. })
  126. } else {
  127. log.Debugf("fsnotify event suppressed because it's within the debounce delay: %v", ctx.filename)
  128. }
  129. debounceWriteLogMutex.Unlock()
  130. }