zoukankan      html  css  js  c++  java
  • go 源码学习之---Tail 源码分析

    已经有两个月没有写博客了,也有好几个月没有看go相关的内容了,由于工作原因最近在做java以及大数据相关的内容,导致最近工作较忙,博客停止了更新,正好想捡起之前go的东西,所以找了一个源码学习

    这个也是之前用go写日志收集的时候用到的一个包 :github.com/hpcloud/tail, 这次就学习一下人家的源码,为了方便看这个代码,我将这个包进行了简化,也是用于方便理解,代码放到了:https://github.com/pythonsite/tail, 这个代码包可能无法正常用,只是为了方面理解tail这个包,以及学习人家的代码

    精简后的代码目录

    tail.go
    │
    └─watch
            filechanges.go
            inotify.go
            inotify_tracker.go
            watch.go

    tail.go: 这里包含着tail包的核心代码,主要的逻辑处理时在这个里面

    watch: 这个包主要用于对文件的监控,用于将文件的变化通知到tail.如:文件修改了,文件删除了,文件内容追加了

    tail.go 代码分析

    在tail.go中主要有几下几个结构体:

    // Line 结构体用于存读每行的时候的对象
    type Line struct {
        Text string            //当前行的内容
        Time time.Time        // 时间
        Err  error             // Error from tail
    }
    
    type SeekInfo struct {
        Offset         int64
        Whence        int    
    }
    
    // 关于配置的结构体
    type Config struct {
        Location     *SeekInfo
        ReOpen        bool
        MustExist    bool        // 要打开的文件是否必须存在
        Poll        bool
    
        Pipe        bool
    
        Follow        bool        // 是否继续读取新的一行,可以理解为tail -f 命令
        
    }
    
    // 核心的结构体Tail
    type Tail struct {
        Filename     string                    // 要打开的文件名
        Lines        chan *Line                // 用于存每行内容的Line结构体
    
        Config
    
        watcher     watch.FileWatcher
        changes     *watch.FileChanges
    
        tomb.Tomb    
    
        file         *os.File
        reader         *bufio.Reader
        lk sync.Mutex
    }
    Line 结构体用于存读取文件的每行内容
    Tail 是核心的结构体,我们使用tail这个包的时候其实就是会先调用初始化这个struct的方法TailFile,如我在写日志收集的时候的使用:
        tail,err := tail.TailFile(conf.LogPath,tail.Config{
            ReOpen:true,
            Follow:true,
            Location:&tail.SeekInfo{Offset:0,Whence:2},
            MustExist:false,
            Poll:true,
        })

    既然我们使用的时候就会在最开始的时候调用tail.TailFile方法,就直接看这个方法:

    // 主要用于Tail结构体的初始化
    func TailFile(filename string, config Config) (*Tail, error) {
        t := &Tail {
            Filename:    filename,
            Lines:        make(chan *Line),
            Config:        config,
        }
        t.watcher = watch.NewInotifyFileWatcher(filename)
        if t.MustExist {
            var err error
            t.file, err = OpenFile(t.Filename)
            if err != nil {
                return nil, err
            }
        }
        go t.tailFileSync()
    
        return t, nil
    }

    从这个代码里我们就可以看到它首先初始化了Tail结构体并且对Tail中的watcher进行的复制,先暂时不看watch相关的内容

    然后就是关于文件是否必须存在的判断处理,最后开启了一个一个线程执行tailFileSync()方法,我们接着看tailFileSync方法

    func (tail *Tail) tailFileSync(){
        defer tail.Done()
        defer tail.close()
    
        if !tail.MustExist {
            err := tail.reopen()
            if err != nil {
                if err != tomb.ErrDying {
                    tail.Kill(err)
                }
                return
            }
        }
    
        tail.openReader()
    
        var offset int64
        var err error
        // 一行行读文件内容
        for {
    
            if !tail.Pipe {
                offset,err = tail.Tell()
                if err != nil {
                    tail.Kill(err)
                    return
                }
            }
    
            line, err := tail.readLine()
            if err == nil {
                // 将读取的一行内容放到chan中
                tail.sendLine(line)
            } else if err == io.EOF {
                // 表示读到文件的最后了
                // 如果Follow 设置为false的话就不会继续读文件
                if !tail.Follow {
                    if line != "" {
                        tail.sendLine(line)
                    }
                    return
                }
                // 如果Follow设置为True则会继续读
                if tail.Follow && line != "" {
                    err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
                    if err != nil {
                        tail.Kill(err)
                        return
                    }
                }
                // 如果读到文件最后,文件并没有新的内容增加
                err := tail.waitForChanges()
                if err != nil {
                    if err != ErrStop {
                        tail.Kill(err)
                    }
                    return
                }
    
    
            } else {
                // 既不是文件结尾,也没有error
                tail.Killf("error reading %s :%s", tail.Filename, err)
                return
            }
    
    
            select {
            case <- tail.Dying():
                if tail.Err() == errStopAtEOF {
                    continue
                }
                return
            default:    
            }
        }
    }

    这个方法里主要是先调用了openReader方法,这个方法其实并没有做什么,只是对tail.reqader进行了赋值:tail.reader = bufio.NewReader(tail.file)

    接着就是循环一行行的读文件

    在循环里最开始判断了tail.Pipe的值,这个值一般开始我也并不会设置,所以默认就是false,所以就会执行tail.Tell()方法,这个方法主要是用于获取文件当前行的位置信息,下面是Tell的代码内容:

    // 获取文件当前行的位置信息
    func (tail *Tail) Tell()(offset int64, err error) {
        if tail.file == nil {
            return
        }
        offset, err = tail.file.Seek(0, os.SEEK_CUR)
        if err != nil {
            return
        }
        tail.lk.Lock()
        defer tail.lk.Unlock()
        if tail.reader == nil {
            return
        }
        offset -= int64(tail.reader.Buffered())
        return
    }

    接着会调用tail.readLine()方法,这个方法就是用于获取文件的一行内容,同时将一行内容实例化为Line对象,然后扔到管道tail.Lines中

    //将读取的文件的每行内容存入到Line结构体中,并最终存入到tail.Lines的chan中
    func (tail *Tail) sendLine(line string) bool {
        now := time.Now()
        lines := []string{line}
    
        for _, line := range lines {
            tail.Lines <- &Line {
                line,
                now,
                nil,
            }
        }
        return true
    }

    最后的大量if 判断其实主要是针对读到文件末尾后的一些操作,

    Tail结构体在最后定义的时候有一个参数:Follow, 这个参数的目的就是当读到文件最后的时候是否继续读文件, 如果最开始设置了false,那么读到最后之后就不会在读文件了

    如果设置为True,那么读到文件最后之后会保存文件的位置信息,并执行waitForChanges() 去等待文件的变化,waitForChanges()代码内容如下:

    // 等待文件的变化事件
    func (tail *Tail) waitForChanges() error {
        if tail.changes == nil {
            // 这里是获取文件指针的当前位置
            pos, err := tail.file.Seek(0,os.SEEK_CUR)
            if err != nil {
                return err
            }
            tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
            if err != nil {
                return err
            }
        }
        // 和inotify中进行很巧妙的配合,这里通过select 来进行查看那个chan变化了,来知道文件的变化
        select {
        case <- tail.changes.Modified:            // 文件被修改
            return nil
        case <- tail.changes.Deleted:            // 文件被删除或者移动到其他目录
            tail.changes = nil
            // 如果文件被删除或者被移动到其他目录,则会尝试重新打开文件
            if tail.ReOpen {
                fmt.Printf("Re-opening moved/deleted file %s...",tail.Filename)
                if err := tail.reopen();err != nil {
                    return err
                }
                fmt.Printf("Successfully reopened %s", tail.Filename)
                tail.openReader()
                return nil
            } else {
                fmt.Printf("Stoping tail as file not longer exists: %s", tail.Filename)
                return ErrStop
            }
        case <- tail.changes.Truncated:            // 文件被追加新的内容
            fmt.Printf("Re-opening truncated file %s....", tail.Filename)
            if err := tail.reopen();err != nil {
                return err
            }
            fmt.Printf("SuccessFuly reopend truncated %s", tail.Filename)
            tail.openReader()
            return nil
        case <- tail.Dying():
            return nil
        }
        panic("unreachable")
    }

    看到这里的时候其实就能感觉到,别人写的代码其实也并不是非常复杂,也是很普通的代码,但是你会觉得人家很多地方用的非常巧妙,

    这段代码中主要的是的内容就是select部分,这个部分通过select监控

    tail.changes.Modified
    tail.changes.Deleted
    tail.changes.Truncated

    从而知道文件的变化,是修改了,还是删除了,还是追加内容了,这几个其实都是一个channel,这几个channel中的内容是怎么放进去的呢,接下来看watch包中的内容

    watch包代码分析

    首先先看一下watch包中的watch.go,这个里面其实就是定一个了一个FileWatcher的接口

    type FileWatcher interface {
        BlockUntilExists(*tomb.Tomb) error
    
        ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
    }

    接着我们看一下inotify.go文件,这个里面我们就可以看到定一个InotifyFileWatcher结构体,并且实现了FileWatcher 这个接口

    type InotifyFileWatcher struct {
        Filename     string
        Size        int64
    }
    
    func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
        fw := &InotifyFileWatcher {
            filepath.Clean(filename),
            0,
        }
        return fw
    }
    
    
    // 关于文件改变事件的处理,当文件被修改了或者文件内容被追加了,进行通知
    func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
        err := Watch(fw.Filename)
        if err != nil {
            return nil, err
        }
    
        changes := NewFileChanges()
        fw.Size = pos
    
        go func() {
            events := Events(fw.Filename)
    
            for {
                prevSize := fw.Size
    
                var evt fsnotify.Event
                var ok bool
                
                select {
                case evt, ok = <- events:
                    if !ok {
                        RemoveWatch(fw.Filename)
                        return
                    }
                case <- t.Dying():
                    RemoveWatch(fw.Filename)
                    return
                }
                switch {
                case evt.Op & fsnotify.Remove == fsnotify.Remove:
                    fallthrough
                case evt.Op & fsnotify.Rename == fsnotify.Rename:
                    RemoveWatch(fw.Filename)
                    changes.NotifyDeleted()
                    return
    
                case evt.Op & fsnotify.Chmod == fsnotify.Chmod:
                    fallthrough
                case evt.Op & fsnotify.Write == fsnotify.Write:
                    fi, err := os.Stat(fw.Filename)
                    if err != nil {
                        // 文件如果被删除了通知文件删除到chan
                        if os.IsNotExist(err) {
                            RemoveWatch(fw.Filename)
                            changes.NotifyDeleted()
                            return
                        }
    
                    }
                    fw.Size = fi.Size()
    
                    if prevSize > 0 && prevSize > fw.Size {
                        // 表示文件内容增加了
                        changes.NotifyTruncated()
                    } else {
                        // 表示文件被修改了
                        changes.NotifyModified()
                    }
    
                    prevSize = fw.Size
                }
    
            }
        }()
        return changes, nil
    }
    
    func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
        err := WatchCreate(fw.Filename)
        if err != nil {
            return err
        }
        defer RemoveWatchCreate(fw.Filename)
        if _, err := os.Stat(fw.Filename);!os.IsNotExist(err) {
            return err
        }
        events := Events(fw.Filename)
        for {
            select {
            case evt, ok := <- events:
                if !ok {
                    return fmt.Errorf("inotify watcher has been closed")
                }
                evtName, err := filepath.Abs(evt.Name)
                if err != nil {
                    return err
                }
                fwFilename, err := filepath.Abs(fw.Filename)
                if err != nil {
                    return err
                }
                if evtName == fwFilename {
                    return nil
                }
            case <- t.Dying():
                return tomb.ErrDying
            }
        }
        panic("unreachable")
    }

    实现的接口就两个方法:

    ChangeEvents: 这个主要是监控文件的变化,是删除了,还是被修改了,或者是文件,然后将状态信息通过调用:changes.NotifyTruncated()或者
    changes.NotifyDeleted() 或者changes.NotifyModified() 将状态信息更新到channel中,这样我们在分析tail.go 中最后的分析的那部分channel中的数据,就是在这里
    放进去的
    BlockUntilExists:这个主要是关于文件不存在的时候,如果最开始的时候可以允许文件不存在,那么就会 在这里通过for循环一直等待,知道文件存在
     
    再看看filechanges.go 文件,代码内容如下:
    type FileChanges struct {
        Modified  chan bool        // 修改
        Truncated chan bool        // 增加
        Deleted   chan bool        // 删除
    }
    
    func NewFileChanges() *FileChanges {
        return &FileChanges{
            make(chan bool, 1),
            make(chan bool, 1),
            make(chan bool, 1),
        }
    }
    
    func (fc *FileChanges) NotifyModified() {
        sendOnlyIfEmpty(fc.Modified)
    }
    
    func (fc *FileChanges) NotifyTruncated() {
        sendOnlyIfEmpty(fc.Truncated)
    }
    
    func (fc *FileChanges) NotifyDeleted() {
        sendOnlyIfEmpty(fc.Deleted)
    }
    
    func sendOnlyIfEmpty(ch chan bool) {
        select {
        case ch <- true:
        default:
        }
    }

    在这个里面也是可以学习到人家写的这个地方非常巧妙,虽然谈不上代码高达上,但是看着会让你很舒服,通过这个结构体,当文件被删除,修改和增加的时候就会让对应的channel中插入一个true,并且这里

    的channel都是不带缓冲区的,只有当tail中触发一次之后,channel中的内容就会被获取出来,从而触发tail继续读文件的内容

  • 相关阅读:
    git的突出解决--git rebase之abort、continue、skip
    servlet中service() 和doGet() 、doPost() 学习笔记
    Spring IoC容器初始化过程学习
    浅探SpringMVC中HandlerExecutionChain之handler、interceptor
    常用路由命令
    路由配置命令
    cout 计算顺序问题
    第一次作业
    记录一个微信网页授权中不小心踩到的坑(Curl请求返回false)
    善用mysql中的FROM_UNIXTIME()函数和UNIX_TIMESTAMP()函数
  • 原文地址:https://www.cnblogs.com/zhaof/p/9663350.html
Copyright © 2011-2022 走看看