Life of xhu

gin 源码阅读笔记

Dec 21, 2017  |  Go

今天来看一个 Go 项目的源码: gin: Live reload utility for Go web servers.

这个项目的简介是实现 Go web server 的实时重载, 现在这个博客的 dev 模式就是使用这个项目启动的, 启动脚本如下:

gin --excludeDir archives --excludeDir node_modules --excludeDir app/assets --all --port 8283 --appPort 13109

忽略命令中的一串参数, 这行脚本的作用是, 整个项目对外暴露 8283, 请求会被重定向到 13109 端口上, 然后 main.go 是 go server 入口并且实现热重载, 这样分析之后我们我们可以把这个问题分成两个部分:

  1. 怎么在内部启动 go server 并做 http 数据包的转发
  2. 怎么一个检测文件改动并重启内部服务器

带着这两个问题, 我们直接开始看源码吧, 以下代码都省略了无关代码:

// main.go
func MainAction(c *cli.Context) {
    os.Setenv("PORT", appPort)

  wd, err := os.Getwd()

  buildArgs, err := shellwords.Parse(c.GlobalString("buildArgs"))

  buildPath := c.GlobalString("build")
  builder := gin.NewBuilder(buildPath, c.GlobalString("bin"), c.GlobalBool("godep"), wd, buildArgs)
  runner := gin.NewRunner(filepath.Join(wd, builder.Binary()), c.Args()...)
  runner.SetWriter(os.Stdout)
  proxy := gin.NewProxy(builder, runner)

  config := &gin.Config{
    Laddr:    laddr,
    Port:     port,
    ProxyTo:  "http://localhost:" + appPort,
    KeyFile:  keyFile,
    CertFile: certFile,
  }

  err = proxy.Run(config)

  shutdown(runner)

  build(builder, runner, logger)

  // scan for changes
  scanChanges(c.GlobalString("path"), c.GlobalStringSlice("excludeDir"), all, func(path string) {
    runner.Kill()
    build(builder, runner, logger)
  })
}

在这段入口里, 首先把需要转发的端口放到了环境变量里, 然后取了三个在编译 go server 时需要用到的常量:

  1. wd: 当前的工作目录;
  2. buildArgs: 构建参数;
  3. buildPath: 构建 go server 的路径.

接下来我们可以看到, 整个 gin 项目把代码分成了三个模块, 分别是:

  1. builder: 使用上面的三个常量来构建内部服务器;

     // lib/builder.go
     type builder struct {
       dir       string                  // 构建的目录
       binary    string                  // 构建得到的二进制文件
       wd        string                  // 当前工作目录
       buildArgs []string                // 构建参数
     }
  2. runner: 负责运行和停止内部服务器;

     // lib/runner.go
     type runner struct {
       bin       string                  // builder 构建的二进制文件路径
       command   *exec.Cmd               // 使用二进制文件得到的 Command 实例
       starttime time.Time               // 当前内部服务器 进程开始的时间
     }
  3. proxy: 将外部的 http/https 请求转发到内部的 go server 上.

     // lib/proxy.go
     type Proxy struct {
       listener net.Listener             // 监听网络请求
       proxy    *httputil.ReverseProxy   // ReverseProxy 实例, 实现反响代码数据转发
       builder  Builder                  // Builder 接口实例
       runner   Runner                   // Runner 接口实例
       to       *url.URL                 // 反响代理的地址
     }

下面就是针对这三个模块的 new 函数:

  1. NewBuilder: 编译内部 server, 获得二进制文件信息, 返回实现了 Builder 接口的 builder 实例;
  2. NewRunner: 使用 builder 信息生成 exec.Command 实例, 返回实现了 Runner 接口的 runner 实例;
  3. NewProxy: 使用 builderrunner 生成 Proxy 实例, 其他字段暂时置为空.

接下俩就是调用 Run 方法来启动 proxy, 实现网络请求的转发:

config := &gin.Config{
  Laddr:    laddr,
  Port:     port,
  ProxyTo:  "http://localhost:" + appPort,
  KeyFile:  keyFile,
  CertFile: certFile,
}

err = proxy.Run(config)

下面我们来看一下 Run 方法的具体实现:

// proxy.go
func (p *Proxy) Run(config *Config) error {
  url, err := url.Parse(config.ProxyTo)
  p.proxy = httputil.NewSingleHostReverseProxy(url)
  p.to = url

  server := http.Server{Handler: http.HandlerFunc(p.defaultHandler)}

  // 省略 https 的处理代码
  p.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", config.Laddr, config.Port))

  go server.Serve(p.listener)

  return nil
}

func (p *Proxy) defaultHandler(res http.ResponseWriter, req *http.Request) {
  errors := p.builder.Errors()
  if len(errors) > 0 {
    res.Write([]byte(errors))
  } else {
    p.runner.Run()
    p.proxy.ServeHTTP(res, req)
  }
}

也就是说, proxy 实例本质上是一个简单的 http 服务器, 这个服务器的请求都会打到 defaultHanlder 上, 而这个 handler 的作用有两个, 那就是在有请求到达的时候:

  1. 通过 runner.Run 方法, 确保内部服务器 已经在运行;
  2. 通过 *httputil.ReverseProxy#ServeHTTP 方法, 将请求转发到内部服务器 上.

我们在来看一下运行内部服务器 的 runner.Run 方法:

func (r *runner) Run() (*exec.Cmd, error) {
  if r.command == nil || r.Exited() {
    err := r.runBin()
    time.Sleep(250 * time.Millisecond)
    return r.command, err
  } else {
    return r.command, nil
  }

}

func (r *runner) runBin() error {
  r.command = exec.Command(r.bin, r.args...)
  err = r.command.Start()
  r.starttime = time.Now()

  go r.command.Wait()

  return nil
}

我们可以看到 runner.Run 方法其实是调用了内部的 runBin 方法, 在 runBin 方法里通过 os/exec 包生成了 *exec.CMD 对象, 通过 Start 方法执行之后, 会在一个新的协程里执行 Wait 方法, 使后台的服务器进程不会阻塞主进程.

回到 Run 方法中, 在启动内部服务器 之后还有一个 250ms 的停顿, 应该是等待服务器启动的时间.

到这里我们就算是弄明白了上文中的第一个问题, 简单的说, 就是通过 os/exec 来进行内部服务器启动, 通过 net/http/httputil 进行 http 请求转发, 当然通过阅读源码我们也可以发现一些实现上的小瑕疵:

  1. 在 build 完二进制文件, 只有当有 http 请求进来的时候, 才会执行这个二进制文件启动内部服务器, 而如果内部服务器启动时间大于 250ms, 那么修改文件之后的第一次请求总是会失败, 这也符合实际使用时的表现;
  2. runner 中的 runBin 只适用于内部服务器不带参数执行的情况, 因为 r.args 使用的是 gin 本身的参数列表, 并不一定能被内部服务器识别, 如果要实现这个, 只能给 gin 加一个新的参数了比如 executeArgs.

那么我们看第二个问题, 再次回到 main.go 文件:

// main.go
scanChanges(c.GlobalString("path"), c.GlobalStringSlice("excludeDir"), all, func(path string) {
  runner.Kill()
  build(builder, runner, logger)
}}

func scanChanges(watchPath string, excludeDirs []string, allFiles bool, cb scanCallback) {
  for {
    filepath.Walk(watchPath, func(path string, info os.FileInfo, err error) error {
      if path == ".git" && info.IsDir() {
        return filepath.SkipDir
      }
      for _, x := range excludeDirs {
        if x == path {
          return filepath.SkipDir
        }
      }

      // ignore hidden files
      if filepath.Base(path)[0] == '.' {
        return nil
      }

      if (allFiles || filepath.Ext(path) == ".go") && info.ModTime().After(startTime) {
        cb(path)
        startTime = time.Now()
        return errors.New("done")
      }

      return nil
    })
    time.Sleep(500 * time.Millisecond)
  }
}

这里其实就比较简单了, scanChanges 的内部实现其实是用一个间隔为半秒的死循环在不停的通过 filepath.Walk 方法来遍历参数 path 设定的目录, 如果一个文件满足下列条件:

  1. 不是 .git 目录;
  2. 不在 execludeDir 参数中;
  3. 不是隐藏文件;
  4. 扩展名是 .go 或者运行时带了 --all 参数;
  5. 文件在内部服务器启动后被修改过.

那么我们就执行回调函数 cb 并重置 startTime. 而回调函数中的内容就是终止当前内部服务器进程和重新 build. 而终止进程的 Kill 方法实现如下:

func (r *runner) Kill() error {
  if r.command != nil && r.command.Process != nil {
    done := make(chan error)
    go func() {
      r.command.Wait()
      close(done)
    }()

    select {
    case <-time.After(3 * time.Second):
      if err := r.command.Process.Kill(); err != nil {
        log.Println("failed to kill: ", err)
      }
    case <-done:
    }
    r.command = nil
  }

  return nil
}

这里做了一个超时处理, 如果进程在调用 Wait 方法 3 秒之后仍然没有响应, 就会被 Kill 方法来终止, 并且打印出命令执行的错误. 而回调中的下一步 build 就会重新生成内部服务器的二进制文件, 接下来有 http 请求的话, 就会进入上面 proxy 中的 defaultHandler, 进而执行 runner.Run 方法重新启动内部进服务器.

具体的流程图可以用下图来表示:

而这次阅读我们也学到了一些非常有用的内部库的用法:

package struct func description
net/http/httputil ReverseProxy ServeHTTP 反向代理 http 请求
path/filepath - Walk 遍历一个目录
os/exec CMD Start/Wait 执行一个命令并且等待输出, 可以用来执行耗时或者被挂起的命令

refs:

Copyright © 2018 - xhu - Powered by Gin,jQuery,Animate.css,Semantic UI

iBSbYp hKTLac jMfvbc gFwJbt ikkyVr bPjPUf cofShm ekvxDO a"> /* sc-component-id: sc-bdVaJa */ .sc-bdVaJa {} .jTcPxY{height:100%;overflow:auto;} /* sc-component-id: sc-bwzfXH */ .sc-bwzfXH {} .iYgZUj{color:#BBB;margin:10px 0 20px 0;} /* sc-component-id: sc-htpNat */ .sc-htpNat {} .knYzzU{position:relative;padding:10px 0 10px 0;width:calc(100% - 10px);height:50px;-webkit-transition:-webkit-transform 0.6s;-webkit-transition:transform 0.6s;transition:transform 0.6s;} .knYzzU:hover{background-color:#EEE;-webkit-transform:translate(10px,0);-ms-transform:translate(10px,0);transform:translate(10px,0);} /* sc-component-id: sc-bxivhb */ .sc-bxivhb {} .fpBhvp{position:absolute;color:#888;} /* sc-component-id: sc-ifAKCX */ .sc-ifAKCX {} .eMFtQj{position:absolute;top:7px;left:60px;color:#666;font-size:18px;line-height:24px;-webkit-text-decoration:underline;text-decoration:underline;} .eMFtQj:hover{color:#666;-webkit-text-decoration:underline;text-decoration:underline;}.kDHEfQ{position:absolute;top:7px;left:60px;color:#666;font-size:18px;line-height:24px;-webkit-text-decoration:underline;text-decoration:underline;} .kDHEfQ:hover{color:#666;-webkit-text-decoration:underline;text-decoration:underline;} /* sc-component-id: sc-EHOje */ .sc-EHOje {} .dwejLW{display:inline-block;text-align:center;width:110px;font-size:14px;-webkit-letter-spacing:2px;-moz-letter-spacing:2px;-ms-letter-spacing:2px;letter-spacing:2px;color:#666;border:1px solid #DADADA;background:#FFF;padding:7px 8px 7px 10px;margin:30px 20px 50px 0;} .dwejLW:hover{background:#EEE;} /* sc-component-id: sc-bZQynM */ .sc-bZQynM {} .jMfvbc{height:100%;overflow:auto;} /* sc-component-id: sc-gzVnrw */ .sc-gzVnrw {} .gFwJbt{margin:40px 0;text-align:center;font-weight:500;color:#646464;font-family:"Lato",sans-serif;} /* sc-component-id: sc-htoDjs */ .sc-htoDjs {} .ikkyVr{margin:20px 0 40px 0 !important;} /* sc-component-id: sc-dnqmqq */ .sc-dnqmqq {} .bPjPUf a{color:#A3717F;} /* sc-component-id: sc-iwsKbI */ .sc-iwsKbI {} .cofShm{-webkit-letter-spacing:.2px;-moz-letter-spacing:.2px;-ms-letter-spacing:.2px;letter-spacing:.2px;font-size:15px;color:#555;} .cofShm h1,.cofShm h2,.cofShm h3,.cofShm h4,.cofShm h5,.cofShm h6{margin:20px 0 15px;font-weight:500;color:#646464;} .cofShm p,.cofShm li{line-height:1.9;} .cofShm blockquote{padding:15px 0 15px 15px;margin:0 0 18px;border-left:5px solid #D1D0CE;line-height:28px;font-weight:normal;font-size:15px;font-style:italic;color:#696969;} .cofShm img{max-width:100%;} .cofShm a{color:#4183c4;-webkit-text-decoration:none;text-decoration:none;} .cofShm hr{border:0;color:#ddd;background-color:#ddd;height:2px;margin:5px 0 19px 0;} .cofShm code{display:inline;word-wrap:break-word;font-size:14px;color:rgb(85,85,85);background:rgb(255,255,255);border-width:1px;border-style:solid;border-color:rgb(221,221,221);border-image:initial;border-radius:4px;padding:1px 3px;margin:-1px 1px 0px;} .cofShm pre code{display:block;font-size:11.8px;line-height:18px;font-weight:12px;-webkit-letter-spacing:.5px;-moz-letter-spacing:.5px;-ms-letter-spacing:.5px;letter-spacing:.5px;margin:0 0 20px 0;padding:15px !important;background-color:#f7f7f7 !important;border-width:0;} /* sc-component-id: sc-gZMcBi */ .sc-gZMcBi {} .ekvxDO{padding:18px 0 54px 0;}.a{padding:18px 0 54px 0;} /* sc-component-id: sc-VigVT */ .sc-VigVT {} .hbtmav{width:100%;height:100%;} /* sc-component-id: sc-jTzLTM */ .sc-jTzLTM {} .bchbfv{padding:60px 0 0 0;min-height:calc(100% - 60px);width:720px;margin-left:calc((100% - 720px) / 2);margin-right:calc((100% - 720px) / 2);} @media screen and (max-width:720px){.bchbfv{width:100%;margin-left:0;margin-right:0;padding:60px 15px 0 15px;}} /* sc-component-id: sc-fjdhpX */ .sc-fjdhpX {} .hzPhWj{margin:0 0 45px 0;font-size:18px;} /* sc-component-id: sc-keyframes-blDzHL */ @-webkit-keyframes blDzHL{0%{opacity:1;}50%{opacity:0;}100%{opacity:1;}} @keyframes blDzHL{0%{opacity:1;}50%{opacity:0;}100%{opacity:1;}} /* sc-component-id: sc-jzJRlG */ .sc-jzJRlG {} .kkvUer{margin:0 0 0 6px;-webkit-animation:blDzHL 1.2s infinite linear;animation:blDzHL 1.2s infinite linear;} /* sc-component-id: sc-cSHVUG */ .sc-cSHVUG {} .iBSbYp{background-color:#FFFEEC;height:60px;padding:18px 0 0 0;text-align:center;color:#444444;opacity:.8;-webkit-letter-spacing:.8px;-moz-letter-spacing:.8px;-ms-letter-spacing:.8px;letter-spacing:.8px;font-family:Lato,sans-serif;} @media screen and (max-width:720px){.iBSbYp{height:60px;padding:10px 0 18px 0;}}.hKTLac{background-color:#FFFEEC;height:60px;padding:18px 0 0 0;text-align:center;color:#444444;opacity:.8;-webkit-letter-spacing:.8px;-moz-letter-spacing:.8px;-ms-letter-spacing:.8px;letter-spacing:.8px;font-family:Lato,sans-serif;} @media screen and (max-width:720px){.hKTLac{height:60px;padding:10px 0 18px 0;}} /* sc-component-id: sc-kAzzGY */ .sc-kAzzGY {} .bpqJVJ{position:fixed;right:-10px;bottom:70px;} /* sc-component-id: sc-chPdSV */ .sc-chPdSV {} .iGXldS{display:none;cursor:pointer;width:60px;height:60px;text-indent:100%;margin:0 0 0 -3px;box-shadow:0 0 10px rgba(0,0,0,0.05);background:rgba(87,218,178,0.8) url(data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBB%0D%0AZG9iZSBJbGx1c3RyYXRvciAxNy4xLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9u%0D%0AOiA2LjAwIEJ1aWxkIDApICAtLT4NCjwhRE9DVFlQRSBzdmcgUFVCTElDICItLy9XM0MvL0RURCBT%0D%0AVkcgMS4xLy9FTiIgImh0dHA6Ly93d3cudzMub3JnL0dyYXBoaWNzL1NWRy8xLjEvRFREL3N2ZzEx%0D%0ALmR0ZCI+DQo8c3ZnIHZlcnNpb249IjEuMSIgaWQ9IkxheWVyXzEiIHhtbG5zPSJodHRwOi8vd3d3%0D%0ALnczLm9yZy8yMDAwL3N2ZyIgeG1sbnM6eGxpbms9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkveGxp%0D%0AbmsiIHg9IjBweCIgeT0iMHB4Ig0KCSB3aWR0aD0iMTZweCIgaGVpZ2h0PSIxNnB4IiB2aWV3Qm94%0D%0APSIwIDAgMTYgMTYiIGVuYWJsZS1iYWNrZ3JvdW5kPSJuZXcgMCAwIDE2IDE2IiB4bWw6c3BhY2U9%0D%0AInByZXNlcnZlIj4NCjxwb2x5Z29uIGZpbGw9IiNGRkZGRkYiIHBvaW50cz0iOCwyLjggMTYsMTAu%0D%0ANyAxMy42LDEzLjEgOC4xLDcuNiAyLjUsMTMuMiAwLDEwLjcgIi8+DQo8L3N2Zz4NCg==) no-repeat center 50%;} .iGXldS:hover{background:rgba(87,218,178,0.6) url(data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBB%0D%0AZG9iZSBJbGx1c3RyYXRvciAxNy4xLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9u%0D%0AOiA2LjAwIEJ1aWxkIDApICAtLT4NCjwhRE9DVFlQRSBzdmcgUFVCTElDICItLy9XM0MvL0RURCBT%0D%0AVkcgMS4xLy9FTiIgImh0dHA6Ly93d3cudzMub3JnL0dyYXBoaWNzL1NWRy8xLjEvRFREL3N2ZzEx%0D%0ALmR0ZCI+DQo8c3ZnIHZlcnNpb249IjEuMSIgaWQ9IkxheWVyXzEiIHhtbG5zPSJodHRwOi8vd3d3%0D%0ALnczLm9yZy8yMDAwL3N2ZyIgeG1sbnM6eGxpbms9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkveGxp%0D%0AbmsiIHg9IjBweCIgeT0iMHB4Ig0KCSB3aWR0aD0iMTZweCIgaGVpZ2h0PSIxNnB4IiB2aWV3Qm94%0D%0APSIwIDAgMTYgMTYiIGVuYWJsZS1iYWNrZ3JvdW5kPSJuZXcgMCAwIDE2IDE2IiB4bWw6c3BhY2U9%0D%0AInByZXNlcnZlIj4NCjxwb2x5Z29uIGZpbGw9IiNGRkZGRkYiIHBvaW50cz0iOCwyLjggMTYsMTAu%0D%0ANyAxMy42LDEzLjEgOC4xLDcuNiAyLjUsMTMuMiAwLDEwLjcgIi8+DQo8L3N2Zz4NCg==) no-repeat center 50%;}.
Life of xhu

gin 源码阅读笔记

Dec 21, 2017  |  Go

今天来看一个 Go 项目的源码: gin: Live reload utility for Go web servers.

这个项目的简介是实现 Go web server 的实时重载, 现在这个博客的 dev 模式就是使用这个项目启动的, 启动脚本如下:

gin --excludeDir archives --excludeDir node_modules --excludeDir app/assets --all --port 8283 --appPort 13109

忽略命令中的一串参数, 这行脚本的作用是, 整个项目对外暴露 8283, 请求会被重定向到 13109 端口上, 然后 main.go 是 go server 入口并且实现热重载, 这样分析之后我们我们可以把这个问题分成两个部分:

  1. 怎么在内部启动 go server 并做 http 数据包的转发
  2. 怎么一个检测文件改动并重启内部服务器

带着这两个问题, 我们直接开始看源码吧, 以下代码都省略了无关代码:

// main.go
func MainAction(c *cli.Context) {
    os.Setenv("PORT", appPort)

  wd, err := os.Getwd()

  buildArgs, err := shellwords.Parse(c.GlobalString("buildArgs"))

  buildPath := c.GlobalString("build")
  builder := gin.NewBuilder(buildPath, c.GlobalString("bin"), c.GlobalBool("godep"), wd, buildArgs)
  runner := gin.NewRunner(filepath.Join(wd, builder.Binary()), c.Args()...)
  runner.SetWriter(os.Stdout)
  proxy := gin.NewProxy(builder, runner)

  config := &gin.Config{
    Laddr:    laddr,
    Port:     port,
    ProxyTo:  "http://localhost:" + appPort,
    KeyFile:  keyFile,
    CertFile: certFile,
  }

  err = proxy.Run(config)

  shutdown(runner)

  build(builder, runner, logger)

  // scan for changes
  scanChanges(c.GlobalString("path"), c.GlobalStringSlice("excludeDir"), all, func(path string) {
    runner.Kill()
    build(builder, runner, logger)
  })
}

在这段入口里, 首先把需要转发的端口放到了环境变量里, 然后取了三个在编译 go server 时需要用到的常量:

  1. wd: 当前的工作目录;
  2. buildArgs: 构建参数;
  3. buildPath: 构建 go server 的路径.

接下来我们可以看到, 整个 gin 项目把代码分成了三个模块, 分别是:

  1. builder: 使用上面的三个常量来构建内部服务器;

     // lib/builder.go
     type builder struct {
       dir       string                  // 构建的目录
       binary    string                  // 构建得到的二进制文件
       wd        string                  // 当前工作目录
       buildArgs []string                // 构建参数
     }
  2. runner: 负责运行和停止内部服务器;

     // lib/runner.go
     type runner struct {
       bin       string                  // builder 构建的二进制文件路径
       command   *exec.Cmd               // 使用二进制文件得到的 Command 实例
       starttime time.Time               // 当前内部服务器 进程开始的时间
     }
  3. proxy: 将外部的 http/https 请求转发到内部的 go server 上.

     // lib/proxy.go
     type Proxy struct {
       listener net.Listener             // 监听网络请求
       proxy    *httputil.ReverseProxy   // ReverseProxy 实例, 实现反响代码数据转发
       builder  Builder                  // Builder 接口实例
       runner   Runner                   // Runner 接口实例
       to       *url.URL                 // 反响代理的地址
     }

下面就是针对这三个模块的 new 函数:

  1. NewBuilder: 编译内部 server, 获得二进制文件信息, 返回实现了 Builder 接口的 builder 实例;
  2. NewRunner: 使用 builder 信息生成 exec.Command 实例, 返回实现了 Runner 接口的 runner 实例;
  3. NewProxy: 使用 builderrunner 生成 Proxy 实例, 其他字段暂时置为空.

接下俩就是调用 Run 方法来启动 proxy, 实现网络请求的转发:

config := &gin.Config{
  Laddr:    laddr,
  Port:     port,
  ProxyTo:  "http://localhost:" + appPort,
  KeyFile:  keyFile,
  CertFile: certFile,
}

err = proxy.Run(config)

下面我们来看一下 Run 方法的具体实现:

// proxy.go
func (p *Proxy) Run(config *Config) error {
  url, err := url.Parse(config.ProxyTo)
  p.proxy = httputil.NewSingleHostReverseProxy(url)
  p.to = url

  server := http.Server{Handler: http.HandlerFunc(p.defaultHandler)}

  // 省略 https 的处理代码
  p.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", config.Laddr, config.Port))

  go server.Serve(p.listener)

  return nil
}

func (p *Proxy) defaultHandler(res http.ResponseWriter, req *http.Request) {
  errors := p.builder.Errors()
  if len(errors) > 0 {
    res.Write([]byte(errors))
  } else {
    p.runner.Run()
    p.proxy.ServeHTTP(res, req)
  }
}

也就是说, proxy 实例本质上是一个简单的 http 服务器, 这个服务器的请求都会打到 defaultHanlder 上, 而这个 handler 的作用有两个, 那就是在有请求到达的时候:

  1. 通过 runner.Run 方法, 确保内部服务器 已经在运行;
  2. 通过 *httputil.ReverseProxy#ServeHTTP 方法, 将请求转发到内部服务器 上.

我们在来看一下运行内部服务器 的 runner.Run 方法:

func (r *runner) Run() (*exec.Cmd, error) {
  if r.command == nil || r.Exited() {
    err := r.runBin()
    time.Sleep(250 * time.Millisecond)
    return r.command, err
  } else {
    return r.command, nil
  }

}

func (r *runner) runBin() error {
  r.command = exec.Command(r.bin, r.args...)
  err = r.command.Start()
  r.starttime = time.Now()

  go r.command.Wait()

  return nil
}

我们可以看到 runner.Run 方法其实是调用了内部的 runBin 方法, 在 runBin 方法里通过 os/exec 包生成了 *exec.CMD 对象, 通过 Start 方法执行之后, 会在一个新的协程里执行 Wait 方法, 使后台的服务器进程不会阻塞主进程.

回到 Run 方法中, 在启动内部服务器 之后还有一个 250ms 的停顿, 应该是等待服务器启动的时间.

到这里我们就算是弄明白了上文中的第一个问题, 简单的说, 就是通过 os/exec 来进行内部服务器启动, 通过 net/http/httputil 进行 http 请求转发, 当然通过阅读源码我们也可以发现一些实现上的小瑕疵:

  1. 在 build 完二进制文件, 只有当有 http 请求进来的时候, 才会执行这个二进制文件启动内部服务器, 而如果内部服务器启动时间大于 250ms, 那么修改文件之后的第一次请求总是会失败, 这也符合实际使用时的表现;
  2. runner 中的 runBin 只适用于内部服务器不带参数执行的情况, 因为 r.args 使用的是 gin 本身的参数列表, 并不一定能被内部服务器识别, 如果要实现这个, 只能给 gin 加一个新的参数了比如 executeArgs.

那么我们看第二个问题, 再次回到 main.go 文件:

// main.go
scanChanges(c.GlobalString("path"), c.GlobalStringSlice("excludeDir"), all, func(path string) {
  runner.Kill()
  build(builder, runner, logger)
}}

func scanChanges(watchPath string, excludeDirs []string, allFiles bool, cb scanCallback) {
  for {
    filepath.Walk(watchPath, func(path string, info os.FileInfo, err error) error {
      if path == ".git" && info.IsDir() {
        return filepath.SkipDir
      }
      for _, x := range excludeDirs {
        if x == path {
          return filepath.SkipDir
        }
      }

      // ignore hidden files
      if filepath.Base(path)[0] == '.' {
        return nil
      }

      if (allFiles || filepath.Ext(path) == ".go") && info.ModTime().After(startTime) {
        cb(path)
        startTime = time.Now()
        return errors.New("done")
      }

      return nil
    })
    time.Sleep(500 * time.Millisecond)
  }
}

这里其实就比较简单了, scanChanges 的内部实现其实是用一个间隔为半秒的死循环在不停的通过 filepath.Walk 方法来遍历参数 path 设定的目录, 如果一个文件满足下列条件:

  1. 不是 .git 目录;
  2. 不在 execludeDir 参数中;
  3. 不是隐藏文件;
  4. 扩展名是 .go 或者运行时带了 --all 参数;
  5. 文件在内部服务器启动后被修改过.

那么我们就执行回调函数 cb 并重置 startTime. 而回调函数中的内容就是终止当前内部服务器进程和重新 build. 而终止进程的 Kill 方法实现如下:

func (r *runner) Kill() error {
  if r.command != nil && r.command.Process != nil {
    done := make(chan error)
    go func() {
      r.command.Wait()
      close(done)
    }()

    select {
    case <-time.After(3 * time.Second):
      if err := r.command.Process.Kill(); err != nil {
        log.Println("failed to kill: ", err)
      }
    case <-done:
    }
    r.command = nil
  }

  return nil
}

这里做了一个超时处理, 如果进程在调用 Wait 方法 3 秒之后仍然没有响应, 就会被 Kill 方法来终止, 并且打印出命令执行的错误. 而回调中的下一步 build 就会重新生成内部服务器的二进制文件, 接下来有 http 请求的话, 就会进入上面 proxy 中的 defaultHandler, 进而执行 runner.Run 方法重新启动内部进服务器.

具体的流程图可以用下图来表示:

而这次阅读我们也学到了一些非常有用的内部库的用法:

package struct func description
net/http/httputil ReverseProxy ServeHTTP 反向代理 http 请求
path/filepath - Walk 遍历一个目录
os/exec CMD Start/Wait 执行一个命令并且等待输出, 可以用来执行耗时或者被挂起的命令

refs:

Copyright © 2018 - xhu - Powered by Gin,jQuery,Animate.css,Semantic UI

{display:none;cursor:pointer;width:60px;height:60px;text-indent:100%;margin:0 0 0 -3px;box-shadow:0 0 10px rgba(0,0,0,0.05);background:rgba(87,218,178,0.8) url(data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBB%0D%0AZG9iZSBJbGx1c3RyYXRvciAxNy4xLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9u%0D%0AOiA2LjAwIEJ1aWxkIDApICAtLT4NCjwhRE9DVFlQRSBzdmcgUFVCTElDICItLy9XM0MvL0RURCBT%0D%0AVkcgMS4xLy9FTiIgImh0dHA6Ly93d3cudzMub3JnL0dyYXBoaWNzL1NWRy8xLjEvRFREL3N2ZzEx%0D%0ALmR0ZCI+DQo8c3ZnIHZlcnNpb249IjEuMSIgaWQ9IkxheWVyXzEiIHhtbG5zPSJodHRwOi8vd3d3%0D%0ALnczLm9yZy8yMDAwL3N2ZyIgeG1sbnM6eGxpbms9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkveGxp%0D%0AbmsiIHg9IjBweCIgeT0iMHB4Ig0KCSB3aWR0aD0iMTZweCIgaGVpZ2h0PSIxNnB4IiB2aWV3Qm94%0D%0APSIwIDAgMTYgMTYiIGVuYWJsZS1iYWNrZ3JvdW5kPSJuZXcgMCAwIDE2IDE2IiB4bWw6c3BhY2U9%0D%0AInByZXNlcnZlIj4NCjxwb2x5Z29uIGZpbGw9IiNGRkZGRkYiIHBvaW50cz0iOCwyLjggMTYsMTAu%0D%0ANyAxMy42LDEzLjEgOC4xLDcuNiAyLjUsMTMuMiAwLDEwLjcgIi8+DQo8L3N2Zz4NCg==) no-repeat center 50%;} .
Life of xhu

gin 源码阅读笔记

Dec 21, 2017  |  Go

今天来看一个 Go 项目的源码: gin: Live reload utility for Go web servers.

这个项目的简介是实现 Go web server 的实时重载, 现在这个博客的 dev 模式就是使用这个项目启动的, 启动脚本如下:

gin --excludeDir archives --excludeDir node_modules --excludeDir app/assets --all --port 8283 --appPort 13109

忽略命令中的一串参数, 这行脚本的作用是, 整个项目对外暴露 8283, 请求会被重定向到 13109 端口上, 然后 main.go 是 go server 入口并且实现热重载, 这样分析之后我们我们可以把这个问题分成两个部分:

  1. 怎么在内部启动 go server 并做 http 数据包的转发
  2. 怎么一个检测文件改动并重启内部服务器

带着这两个问题, 我们直接开始看源码吧, 以下代码都省略了无关代码:

// main.go
func MainAction(c *cli.Context) {
    os.Setenv("PORT", appPort)

  wd, err := os.Getwd()

  buildArgs, err := shellwords.Parse(c.GlobalString("buildArgs"))

  buildPath := c.GlobalString("build")
  builder := gin.NewBuilder(buildPath, c.GlobalString("bin"), c.GlobalBool("godep"), wd, buildArgs)
  runner := gin.NewRunner(filepath.Join(wd, builder.Binary()), c.Args()...)
  runner.SetWriter(os.Stdout)
  proxy := gin.NewProxy(builder, runner)

  config := &gin.Config{
    Laddr:    laddr,
    Port:     port,
    ProxyTo:  "http://localhost:" + appPort,
    KeyFile:  keyFile,
    CertFile: certFile,
  }

  err = proxy.Run(config)

  shutdown(runner)

  build(builder, runner, logger)

  // scan for changes
  scanChanges(c.GlobalString("path"), c.GlobalStringSlice("excludeDir"), all, func(path string) {
    runner.Kill()
    build(builder, runner, logger)
  })
}

在这段入口里, 首先把需要转发的端口放到了环境变量里, 然后取了三个在编译 go server 时需要用到的常量:

  1. wd: 当前的工作目录;
  2. buildArgs: 构建参数;
  3. buildPath: 构建 go server 的路径.

接下来我们可以看到, 整个 gin 项目把代码分成了三个模块, 分别是:

  1. builder: 使用上面的三个常量来构建内部服务器;

     // lib/builder.go
     type builder struct {
       dir       string                  // 构建的目录
       binary    string                  // 构建得到的二进制文件
       wd        string                  // 当前工作目录
       buildArgs []string                // 构建参数
     }
  2. runner: 负责运行和停止内部服务器;

     // lib/runner.go
     type runner struct {
       bin       string                  // builder 构建的二进制文件路径
       command   *exec.Cmd               // 使用二进制文件得到的 Command 实例
       starttime time.Time               // 当前内部服务器 进程开始的时间
     }
  3. proxy: 将外部的 http/https 请求转发到内部的 go server 上.

     // lib/proxy.go
     type Proxy struct {
       listener net.Listener             // 监听网络请求
       proxy    *httputil.ReverseProxy   // ReverseProxy 实例, 实现反响代码数据转发
       builder  Builder                  // Builder 接口实例
       runner   Runner                   // Runner 接口实例
       to       *url.URL                 // 反响代理的地址
     }

下面就是针对这三个模块的 new 函数:

  1. NewBuilder: 编译内部 server, 获得二进制文件信息, 返回实现了 Builder 接口的 builder 实例;
  2. NewRunner: 使用 builder 信息生成 exec.Command 实例, 返回实现了 Runner 接口的 runner 实例;
  3. NewProxy: 使用 builderrunner 生成 Proxy 实例, 其他字段暂时置为空.

接下俩就是调用 Run 方法来启动 proxy, 实现网络请求的转发:

config := &gin.Config{
  Laddr:    laddr,
  Port:     port,
  ProxyTo:  "http://localhost:" + appPort,
  KeyFile:  keyFile,
  CertFile: certFile,
}

err = proxy.Run(config)

下面我们来看一下 Run 方法的具体实现:

// proxy.go
func (p *Proxy) Run(config *Config) error {
  url, err := url.Parse(config.ProxyTo)
  p.proxy = httputil.NewSingleHostReverseProxy(url)
  p.to = url

  server := http.Server{Handler: http.HandlerFunc(p.defaultHandler)}

  // 省略 https 的处理代码
  p.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", config.Laddr, config.Port))

  go server.Serve(p.listener)

  return nil
}

func (p *Proxy) defaultHandler(res http.ResponseWriter, req *http.Request) {
  errors := p.builder.Errors()
  if len(errors) > 0 {
    res.Write([]byte(errors))
  } else {
    p.runner.Run()
    p.proxy.ServeHTTP(res, req)
  }
}

也就是说, proxy 实例本质上是一个简单的 http 服务器, 这个服务器的请求都会打到 defaultHanlder 上, 而这个 handler 的作用有两个, 那就是在有请求到达的时候:

  1. 通过 runner.Run 方法, 确保内部服务器 已经在运行;
  2. 通过 *httputil.ReverseProxy#ServeHTTP 方法, 将请求转发到内部服务器 上.

我们在来看一下运行内部服务器 的 runner.Run 方法:

func (r *runner) Run() (*exec.Cmd, error) {
  if r.command == nil || r.Exited() {
    err := r.runBin()
    time.Sleep(250 * time.Millisecond)
    return r.command, err
  } else {
    return r.command, nil
  }

}

func (r *runner) runBin() error {
  r.command = exec.Command(r.bin, r.args...)
  err = r.command.Start()
  r.starttime = time.Now()

  go r.command.Wait()

  return nil
}

我们可以看到 runner.Run 方法其实是调用了内部的 runBin 方法, 在 runBin 方法里通过 os/exec 包生成了 *exec.CMD 对象, 通过 Start 方法执行之后, 会在一个新的协程里执行 Wait 方法, 使后台的服务器进程不会阻塞主进程.

回到 Run 方法中, 在启动内部服务器 之后还有一个 250ms 的停顿, 应该是等待服务器启动的时间.

到这里我们就算是弄明白了上文中的第一个问题, 简单的说, 就是通过 os/exec 来进行内部服务器启动, 通过 net/http/httputil 进行 http 请求转发, 当然通过阅读源码我们也可以发现一些实现上的小瑕疵:

  1. 在 build 完二进制文件, 只有当有 http 请求进来的时候, 才会执行这个二进制文件启动内部服务器, 而如果内部服务器启动时间大于 250ms, 那么修改文件之后的第一次请求总是会失败, 这也符合实际使用时的表现;
  2. runner 中的 runBin 只适用于内部服务器不带参数执行的情况, 因为 r.args 使用的是 gin 本身的参数列表, 并不一定能被内部服务器识别, 如果要实现这个, 只能给 gin 加一个新的参数了比如 executeArgs.

那么我们看第二个问题, 再次回到 main.go 文件:

// main.go
scanChanges(c.GlobalString("path"), c.GlobalStringSlice("excludeDir"), all, func(path string) {
  runner.Kill()
  build(builder, runner, logger)
}}

func scanChanges(watchPath string, excludeDirs []string, allFiles bool, cb scanCallback) {
  for {
    filepath.Walk(watchPath, func(path string, info os.FileInfo, err error) error {
      if path == ".git" && info.IsDir() {
        return filepath.SkipDir
      }
      for _, x := range excludeDirs {
        if x == path {
          return filepath.SkipDir
        }
      }

      // ignore hidden files
      if filepath.Base(path)[0] == '.' {
        return nil
      }

      if (allFiles || filepath.Ext(path) == ".go") && info.ModTime().After(startTime) {
        cb(path)
        startTime = time.Now()
        return errors.New("done")
      }

      return nil
    })
    time.Sleep(500 * time.Millisecond)
  }
}

这里其实就比较简单了, scanChanges 的内部实现其实是用一个间隔为半秒的死循环在不停的通过 filepath.Walk 方法来遍历参数 path 设定的目录, 如果一个文件满足下列条件:

  1. 不是 .git 目录;
  2. 不在 execludeDir 参数中;
  3. 不是隐藏文件;
  4. 扩展名是 .go 或者运行时带了 --all 参数;
  5. 文件在内部服务器启动后被修改过.

那么我们就执行回调函数 cb 并重置 startTime. 而回调函数中的内容就是终止当前内部服务器进程和重新 build. 而终止进程的 Kill 方法实现如下:

func (r *runner) Kill() error {
  if r.command != nil && r.command.Process != nil {
    done := make(chan error)
    go func() {
      r.command.Wait()
      close(done)
    }()

    select {
    case <-time.After(3 * time.Second):
      if err := r.command.Process.Kill(); err != nil {
        log.Println("failed to kill: ", err)
      }
    case <-done:
    }
    r.command = nil
  }

  return nil
}

这里做了一个超时处理, 如果进程在调用 Wait 方法 3 秒之后仍然没有响应, 就会被 Kill 方法来终止, 并且打印出命令执行的错误. 而回调中的下一步 build 就会重新生成内部服务器的二进制文件, 接下来有 http 请求的话, 就会进入上面 proxy 中的 defaultHandler, 进而执行 runner.Run 方法重新启动内部进服务器.

具体的流程图可以用下图来表示:

而这次阅读我们也学到了一些非常有用的内部库的用法:

package struct func description
net/http/httputil ReverseProxy ServeHTTP 反向代理 http 请求
path/filepath - Walk 遍历一个目录
os/exec CMD Start/Wait 执行一个命令并且等待输出, 可以用来执行耗时或者被挂起的命令

refs:

Copyright © 2018 - xhu - Powered by Gin,jQuery,Animate.css,Semantic UI

:hover{background:rgba(87,218,178,0.6) url(data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBB%0D%0AZG9iZSBJbGx1c3RyYXRvciAxNy4xLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9u%0D%0AOiA2LjAwIEJ1aWxkIDApICAtLT4NCjwhRE9DVFlQRSBzdmcgUFVCTElDICItLy9XM0MvL0RURCBT%0D%0AVkcgMS4xLy9FTiIgImh0dHA6Ly93d3cudzMub3JnL0dyYXBoaWNzL1NWRy8xLjEvRFREL3N2ZzEx%0D%0ALmR0ZCI+DQo8c3ZnIHZlcnNpb249IjEuMSIgaWQ9IkxheWVyXzEiIHhtbG5zPSJodHRwOi8vd3d3%0D%0ALnczLm9yZy8yMDAwL3N2ZyIgeG1sbnM6eGxpbms9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkveGxp%0D%0AbmsiIHg9IjBweCIgeT0iMHB4Ig0KCSB3aWR0aD0iMTZweCIgaGVpZ2h0PSIxNnB4IiB2aWV3Qm94%0D%0APSIwIDAgMTYgMTYiIGVuYWJsZS1iYWNrZ3JvdW5kPSJuZXcgMCAwIDE2IDE2IiB4bWw6c3BhY2U9%0D%0AInByZXNlcnZlIj4NCjxwb2x5Z29uIGZpbGw9IiNGRkZGRkYiIHBvaW50cz0iOCwyLjggMTYsMTAu%0D%0ANyAxMy42LDEzLjEgOC4xLDcuNiAyLjUsMTMuMiAwLDEwLjcgIi8+DQo8L3N2Zz4NCg==) no-repeat center 50%;}
Life of xhu

Go Context 学习

Apr 09, 2018  |  Go  |  Context

使用场景

在使用 goroutine 的时候, 我们经常需要对 goroutine 进行超时控制, 一般是通过在 select 中加上超时条件来完成.

那么假设我们需要对一组 goroutine 来进行控制呢? 这时就可以使用 context 包.

几个常见用法:

  1. 需要对一组 goroutine 进行手动取消控制, 使用 WithCancel 返回的 cancelFunc;
  2. 需要对一组 goroutine 进行超时控制, 使用 WithTimeout 或者 WithDeadline, 其实前者的底层实现是基于后者的;
  3. 需要像下传值, 使用 WithValue.

使用的时候, 一般是吧 context 作为 goroutine 的第一个参数, 然后使用 select 监听 Done() 方法, 然后就可以在外部对 goroutine 进行同一控制:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx1, cancel1 := context.WithCancel(context.Background())
    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
            println("the goroutine is terminated by the context1")
        }
    }(ctx1)
    cancel1()
    time.Sleep(time.Second / 10)

    ctx2, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
            println("the goroutine is terminated by the context2")
        }
    }(ctx2)
    time.Sleep(time.Second * 2)

    ctx3, _ := context.WithTimeout(context.Background(), time.Second)
    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
            println("the goroutine is terminated by the context3")
        }
    }(ctx3)
    time.Sleep(time.Second * 2)

    ctx4 := context.WithValue(context.Background(), "name", "xhu")
    go func(ctx context.Context) {
        fmt.Printf("the value of key %s is %s\n", "name", ctx.Value("name"))
    }(ctx4)
    time.Sleep(time.Second / 10)

    ctx5 := context.Background()
    ctx6, cancel2 := context.WithCancel(ctx5)
    ctx7 := ctx6
    ctx8, _ := context.WithCancel(ctx7)
    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
            println("the goroutine is terminated by the context6")
        }
    }(ctx8)
    cancel2()
    time.Sleep(time.Second / 10)
}

源码阅读

context 包在 1.7 版本就被加入 Go 标准库, 源码在 go/src/context/context.go , 我们今天重点看一下 cancel context 的实现.

首先是 Context 的定义:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

当我们来执行 context.Background() 的时候, 其实是创建了一个空的 context, 其定义如下:

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

至于这个地方为什么用 int 而不是一个空的 struct, 注释的解释是这样可以保证每个变量的地址不一样, 不过其实因为后面都是指针操作, 所以这块儿其实用 type emptyCtx struct{} 也是可以正常工作的.

然后我们可以看到 Done() 返回的是一个 nil, 通过上一篇我们可以知道, 使用 emptyCtx.Done() 做 select...case 分支的话, 是会一直阻塞下去的.

然后当我们用 WithCancel 来创造出一个可以 cancel 的 context 的时候, 调用的代码如下:

type cancelCtx struct {
    Context                         // 用来放父 context

    mu       sync.Mutex             // 给数据修改加锁
    done     chan struct{}          // 用来表示 Done 信号的 chnnel
    children map[canceler]struct{}  // 存储基于当前 context 的子 context
    err      error                  // context 已经 cancel, 或者其他出错情况, 置上这个字段, 否则为 nil
}

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

也就是说, 每次我们调用 withCancel 的时候, 都会创建出一个新的 cancelCtx 实例, 给这个实例嵌入了父 context, 这样一来的层层嵌套结构, 对于 cancelCtx/deadlineCtx 不需要的方法, 直接使用 emptyCtx 的默认实现就好了.

这段代码最重要的是通过 propagateCancel 来传递 parent 的 cancel 信号.

func propagateCancel(parent Context, child canceler) {
    if parent.Done() == nil {
        return // parent 无法被 cancel
    }
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent 已经被 cancel
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true
        case *timerCtx:
            return &c.cancelCtx, true
        case *valueCtx:
            parent = c.Context
        default:
            return nil, false
        }
    }
}

首先, 当 parent 无法被 cancel 的时候是不需要传递 cancel 信号的, 直接返回即可.

对于下面的条件语句, 最好结合 parentCancelCtx 来一起理解, 这个函数就是寻找 parent 所属的 cancelCtx. 对于 cancelCtx 实例, 最近的一个 cancelCtx 就是其本身, 对于 timerCtx 实例, 最近的是其成员变量 cancelCtx, 对于 valueCtx, 就通过 for 循环继续向上追溯. 如果都不是, 第二个返回值就是 false.

  1. 当我们可以找到这个 cancelCtx 时:

    首先给当前操作加锁.

    • 如果 parent 已经被 cancel, 直接 cancel 子 context 即可
    • 如果 parent 没有被 cancel, 将子 context 加入到 parent 的 children 成员变量里.
  2. 如果我们找不到 cancelCtx, 就起一个协程来监听 parent 的 Done(), 当有消息时直接 cancel 子 context.

那然后我们再看看 cancelCtx 上 cancel 这个函数:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // 已经被 cancel 了
    }
    c.err = err
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children {
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)
    }
}

func removeChild(parent Context, child canceler) {
    p, ok := parentCancelCtx(parent)
    if !ok {
        return
    }
    p.mu.Lock()
    if p.children != nil {
        delete(p.children, child)
    }
    p.mu.Unlock()
}

cancel 函数的操作, 就是在我们 cancel 一个 context 的时候, 首先将其自身的 done 给关掉, 然后将 children 的 context 给 cancel 掉, 然后根据 removeFromParent 参数决定是否需要从 parent 的 children 中移除当前 context.

所以当我们手动去 cancel 一个 context 的时候, 会有一些额外的逻辑需要解释一下:

  1. 需要将其从 parent 的 children 中移除的, 因为 cancel 掉 parent 的时候会再次递归的 cancel 这个 context, 重复 close 一个 channel 会导致 panic;
  2. 当前 context 的 children 不用移除, 已经 cancel 的 context 就算保留着 children 也没问题, 反正是无法再次被 cancel, 这样也避免了多余的内存操作.

进行上述操作的时候, 也别忘了加锁以避免并发冲突.

对于 cancelCtx 的讲解就到此为止, context 包的主体部分应该就差不多了, 这个包的设计还是很有意思的, 比如对 valueCtx:

type valueCtx struct {
    Context
    key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
        return c.val
    }
    return c.Context.Value(key)
}

并不是我想的会有一个 map[string]interface{}, 而也是通过一层层嵌套来构建, 在取值的时候用递归来查询, 不得不说这个包真的是把嵌套/递归这种数据组合和操作方式玩出花儿了.

Copyright © 2018 - xhu - Powered by Gin,jQuery,Animate.css,Semantic UI