Skip to content

Commit

Permalink
address review notes
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Oct 2, 2020
1 parent 7dd231b commit fb3dbab
Showing 1 changed file with 21 additions and 33 deletions.
54 changes: 21 additions & 33 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,10 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
w.log.Info("Start next scan")

paths := w.scanner.GetFiles()
files := getKeys(paths)

newFiles := make(map[string]os.FileInfo)

for i := 0; i < len(paths); i++ {
path := files[i]
info := paths[path]
for path, info := range paths {

prevInfo, ok := w.prev[path]
if !ok {
Expand All @@ -149,7 +146,7 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
select {
case <-ctx.Done():
return
case w.events <- w.writeEvent(path, info):
case w.events <- writeEvent(path, info):
}
}

Expand All @@ -159,33 +156,33 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {

// remaining files are in the prev map are the ones that are missing
// either because they have been deleted or renamed
for deletedPath, deletedInfo := range w.prev {
for removedPath, removedInfo := range w.prev {
for newPath, newInfo := range newFiles {
if os.SameFile(deletedInfo, newInfo) {
if os.SameFile(removedInfo, newInfo) {
select {
case <-ctx.Done():
return
case w.events <- w.renamedEvent(deletedPath, newPath, newInfo):
case w.events <- renamedEvent(removedPath, newPath, newInfo):
delete(newFiles, newPath)
goto CHECK_NEXT_DELETED
goto CHECK_NEXT_REMOVED
}
}
}

select {
case <-ctx.Done():
return
case w.events <- w.deleteEvent(deletedPath, deletedInfo):
case w.events <- deleteEvent(removedPath, removedInfo):
}
CHECK_NEXT_DELETED:
CHECK_NEXT_REMOVED:
}

// remaining files in newFiles are new
for path, info := range newFiles {
select {
case <-ctx.Done():
return
case w.events <- w.createEvent(path, info):
case w.events <- createEvent(path, info):
}

}
Expand All @@ -194,27 +191,19 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
w.prev = paths
}

func getKeys(paths map[string]os.FileInfo) []string {
files := make([]string, 0)
for file := range paths {
files = append(files, file)
}
return files
}

func (w *fileWatcher) createEvent(path string, fi os.FileInfo) loginp.FSEvent {
func createEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: path, Info: fi}
}

func (w *fileWatcher) writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
func writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi}
}

func (w *fileWatcher) renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi}
}

func (w *fileWatcher) deleteEvent(path string, fi os.FileInfo) loginp.FSEvent {
func deleteEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpDelete, OldPath: path, NewPath: "", Info: fi}
}

Expand Down Expand Up @@ -295,7 +284,7 @@ func (s *fileScanner) normalizeGlobPatterns() error {
// GetFiles returns a map of files and fileinfos which
// match the configured paths.
func (s *fileScanner) GetFiles() map[string]os.FileInfo {
paths := map[string]os.FileInfo{}
pathInfo := map[string]os.FileInfo{}

for _, path := range s.paths {
matches, err := filepath.Glob(path)
Expand All @@ -304,31 +293,30 @@ func (s *fileScanner) GetFiles() map[string]os.FileInfo {
continue
}

OUTER:
for _, file := range matches {
if s.skippedFile(file) {
if s.shouldSkipFile(file) {
continue
}

// If symlink is enabled, it is checked that original is not part of same input
// If original is harvested by other input, states will potentially overwrite each other
if s.isOriginalAndSymlinkConfigured(file, paths) {
continue OUTER
if s.isOriginalAndSymlinkConfigured(file, pathInfo) {
continue
}

fileInfo, err := os.Stat(file)
if err != nil {
s.log.Debug("stat(%s) failed: %s", file, err)
continue
}
paths[file] = fileInfo
pathInfo[file] = fileInfo
}
}

return paths
return pathInfo
}

func (s *fileScanner) skippedFile(file string) bool {
func (s *fileScanner) shouldSkipFile(file string) bool {
if s.isFileExcluded(file) {
s.log.Debugf("Exclude file: %s", file)
return true
Expand Down Expand Up @@ -359,7 +347,7 @@ func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, paths map[stri
fileInfo, err := os.Stat(file)
if err != nil {
s.log.Debugf("stat(%s) failed: %s", file, err)
return true
return false
}

for _, finfo := range paths {
Expand Down

0 comments on commit fb3dbab

Please sign in to comment.