Tailscale: picking the nearest DERP

2022-12-08T20:54:00

1. Project address
https://github.com/tailscale/tailscale

2. Probe trigger and execution flow

Probing is triggered periodically by a timer.

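As a rough sketch of that trigger, assuming a plain ticker loop inside the magicsock package (the real scheduling goes through magicsock's ReSTUN machinery, with jitter, backoff, and extra triggers such as link changes, so this is an illustration of the flow, not the actual wiring):

// Sketch only: periodically re-run the netcheck probe so the report can
// drive DERP home selection. probeLoop is a hypothetical helper, not a
// function in magicsock.
func probeLoop(ctx context.Context, c *Conn, every time.Duration) {
    t := time.NewTicker(every)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            // updateNetInfo (section 4) runs netcheck.GetReport and, later
            // in the same function, feeds PreferredDERP into setNearestDERP.
            if _, err := c.updateNetInfo(ctx); err != nil {
                c.logf("netcheck: %v", err)
            }
        }
    }
}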

3. Implementation logic
Tailscale's dynamic path selection consists of two parts: the path-selection policy and the link-probing logic.

1. Dynamic path selection policy
2. Link probing logic
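
Mapping those two parts onto the functions examined in section 4, the overall call chain is roughly: updateNetInfo → GetReport → makeProbePlan / makeProbePlanInitial → runProbe → finishAndStoreReport → addReportHistoryAndSetPreferredDERP, after which the chosen region is applied via setNearestDERP (falling back to pickDERPFallback when UDP is blocked).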

4. Detailed code analysis

1. Dynamic path selection policy

Working from the probe history, the function keeps every report from the last five minutes. For each region that appears in any of those reports, the smallest latency seen is recorded in the bestRecent map. It then walks the regions in the newest report and picks the one whose best recent latency is smallest, recording that region's ID as the preferred DERP. Finally there is a hysteresis check: if this would change the preferred DERP but the old region is still reachable, and the new best latency is not at least about a third lower than the old region's current latency (i.e. 1.5 × new best > old latency), it sticks with the old region. For example, if the old region currently measures 60 ms, a new region only takes over if its best recent latency is 40 ms or less.

// addReportHistoryAndSetPreferredDERP adds r to the set of recent Reports
// and mutates r.PreferredDERP to contain the best recent one.
func (c *Client) addReportHistoryAndSetPreferredDERP(r *Report) {
    c.mu.Lock()
    defer c.mu.Unlock()

    var prevDERP int
    if c.last != nil {
        prevDERP = c.last.PreferredDERP
    }
    if c.prev == nil {
        c.prev = map[time.Time]*Report{}
    }
    now := c.timeNow()
    c.prev[now] = r
    c.last = r

    const maxAge = 5 * time.Minute

    // region ID => its best recent latency in last maxAge
    bestRecent := map[int]time.Duration{}

    for t, pr := range c.prev {
        if now.Sub(t) > maxAge {
            delete(c.prev, t)
            continue
        }
        for regionID, d := range pr.RegionLatency {
            if bd, ok := bestRecent[regionID]; !ok || d < bd {
                bestRecent[regionID] = d
            }
        }
    }

    // Then, pick which currently-alive DERP server from the
    // current report has the best latency over the past maxAge.
    var bestAny time.Duration
    var oldRegionCurLatency time.Duration
    for regionID, d := range r.RegionLatency {
        if regionID == prevDERP {
            oldRegionCurLatency = d
        }
        best := bestRecent[regionID]
        if r.PreferredDERP == 0 || best < bestAny {
            bestAny = best
            r.PreferredDERP = regionID
        }
    }

    // If we're changing our preferred DERP but the old one's still
    // accessible and the new one's not much better, just stick with
    // where we are.
    if prevDERP != 0 &&
        r.PreferredDERP != prevDERP &&
        oldRegionCurLatency != 0 &&
        bestAny > oldRegionCurLatency/3*2 {
        r.PreferredDERP = prevDERP
    }
}

After the best region has been picked: if UDP appears to be blocked, PreferredDERP is still 0, and pickDERPFallback returns a non-zero but deterministic, arbitrary DERP node. This is only used when netcheck could not find the nearest one (for example when UDP is blocked, so the STUN latency checks cannot work).

    if ni.PreferredDERP == 0 {
        // Perhaps UDP is blocked. Pick a deterministic but arbitrary
        // one.
        ni.PreferredDERP = c.pickDERPFallback()
    }
    if !c.setNearestDERP(ni.PreferredDERP) {
        ni.PreferredDERP = 0
    }
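
pickDERPFallback itself is not shown in this post. As a minimal sketch of the "deterministic but arbitrary" idea — the real magicsock implementation is more involved and differs in detail — one way is to hash a stable identifier onto the sorted region IDs (uses "sort" and "hash/fnv"; regionIDs and stableID are hypothetical inputs):

// Sketch only, not the actual pickDERPFallback: pick a region
// deterministically from a stable identifier.
func pickFallbackRegion(regionIDs []int, stableID string) int {
    if len(regionIDs) == 0 {
        return 0 // no DERP regions known
    }
    sort.Ints(regionIDs) // deterministic order regardless of map iteration
    h := fnv.New32a()
    h.Write([]byte(stableID))
    return regionIDs[h.Sum32()%uint32(len(regionIDs))]
}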

The newly chosen best region is then compared with the one currently in use; if they differ, the home DERP is updated. On a change, all DERP servers we are currently connected to are walked, and a connection whose region is no longer the preferred one will be closed via closeForReconnect, which closes the underlying network connection and zeroes the client's fields so that a future call to Connect will reconnect. The region ID is passed to goDerpConnect() (where it serves as the port of the synthetic DERP address) to connect to a DERP server in the chosen region; nodes in that region are tried in order and an available one is used, which keeps routing overhead low while staying within the same region.

func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if !c.wantDerpLocked() {
        c.myDerp = 0
        health.SetMagicSockDERPHome(0)
        return false
    }
    if derpNum == c.myDerp {
        // No change.
        return true
    }
    if c.myDerp != 0 && derpNum != 0 {
        metricDERPHomeChange.Add(1)
    }
    c.myDerp = derpNum
    health.SetMagicSockDERPHome(derpNum)

    if c.privateKey.IsZero() {
        // No private key yet, so DERP connections won't come up anyway.
        // Return early rather than ultimately log a couple lines of noise.
        return true
    }

    // On change, notify all currently connected DERP servers and
    // start connecting to our home DERP if we are not already.
    dr := c.derpMap.Regions[derpNum]
    if dr == nil {
        c.logf("[unexpected] magicsock: derpMap.Regions[%v] is nil", derpNum)
    } else {
        c.logf("magicsock: home is now derp-%v (%v)", derpNum, c.derpMap.Regions[derpNum].RegionCode)
    }
    for i, ad := range c.activeDerp {
        go ad.c.NotePreferred(i == c.myDerp)
    }
    c.goDerpConnect(derpNum)
    return true
}

2. Link probing logic

Entry point of the link probing. updateNetInfo binds the function that receives STUN replies; when a reply arrives and passes validation, the callback that was registered when GetReport() started the probe via runProbe() is invoked, and the elapsed time from sending the request until the callback fires is recorded as that node's latency.

func (c *Conn) updateNetInfo(ctx context.Context) (*netcheck.Report, error) {
    c.mu.Lock()
    dm := c.derpMap
    c.mu.Unlock()

    if dm == nil || c.networkDown() {
        return new(netcheck.Report), nil
    }

    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()

    c.stunReceiveFunc.Store(c.netChecker.ReceiveSTUNPacket)
    defer c.ignoreSTUNPackets()

    report, err := c.netChecker.GetReport(ctx, dm)
    if err != nil {
        return nil, err
    }

metricNumGetReport counts every probe run and metricNumGetReportError counts failed runs. GetReport first checks the DERP map to probe; if it is nil, nothing is probed. To prevent concurrent probing, curState is validated. Even when this run is a full (non-incremental) one, the preferred DERP region chosen by the previous probe is still remembered. Whether to probe all nodes (a full report) is decided by whether the last full probe was more than five minutes ago, or whether the previous probe was blocked by a captive portal.

func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap) (_ *Report, reterr error) {
    defer func() {
        if reterr != nil {
            metricNumGetReportError.Add(1) // every failed probe run increments the error counter
        }
    }()
    metricNumGetReport.Add(1) // every probe run increments this counter
    // Mask user context with ours that we guarantee to cancel so
    // we can depend on it being closed in goroutines later.
    // (User ctx might be context.Background, etc)
    ctx, cancel := context.WithTimeout(ctx, overallProbeTimeout) // cap each probe run at overallProbeTimeout (5s)
    defer cancel()

    if dm == nil { // nothing to probe without a DERP map
        return nil, errors.New("netcheck: GetReport: DERP map is nil")
    }

    c.mu.Lock()
    if c.curState != nil {
        c.mu.Unlock()
        return nil, errors.New("invalid concurrent call to GetReport")
    }
    rs := &reportState{
        c:           c,
        report:      newReport(),
        inFlight:    map[stun.TxID]func(netip.AddrPort){},
        hairTX:      stun.NewTxID(), // random payload
        gotHairSTUN: make(chan netip.AddrPort, 1),
        hairTimeout: make(chan struct{}),
        stopProbeCh: make(chan struct{}, 1),
    }
    c.curState = rs
    last := c.last 

    // Even if we're doing a non-incremental update, we may want to try our
    // preferred DERP region for captive portal detection. Save that, if we
    // have it.
    var preferredDERP int
    if last != nil {
        preferredDERP = last.PreferredDERP // even for a full update, remember the last chosen region
    }

    now := c.timeNow()

    doFull := false
    if c.nextFull || now.Sub(c.lastFull) > 5*time.Minute {
        doFull = true
    }
    // If the last report had a captive portal and reported no UDP access,
    // it's possible that we didn't get a useful netcheck due to the
    // captive portal blocking us. If so, make this report a full
    // (non-incremental) one.
    if !doFull && last != nil {
        doFull = !last.UDP && last.CaptivePortal.EqualBool(true)
    }
    if doFull {
        last = nil // causes makeProbePlan below to do a full (initial) plan
        c.nextFull = false
        c.lastFull = now
        metricNumGetReportFull.Add(1)
    }

    rs.incremental = last != nil
    c.mu.Unlock()

    defer func() {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.curState = nil // clear the in-progress state once the probe run finishes
    }()

Capture the state of the machine's network interfaces, routing table, and other network configuration:

    ifState, err := interfaces.GetState()
    if err != nil {
        c.logf("[v1] interfaces: %v", err)
        return nil, err
    }

Check whether the local environment supports IPv4/IPv6:

    // See if IPv6 works at all, or if it's been hard disabled at the
    // OS level.
    v6udp, err := nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf)).ListenPacket(ctx, "udp6", "[::1]:0")
    if err == nil {
        rs.report.OSHasIPv6 = true
        v6udp.Close()
    }

    // Create a UDP4 socket used for sending to our discovered IPv4 address.
    rs.pc4Hair, err = nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf)).ListenPacket(ctx, "udp4", ":0")
    if err != nil {
        c.logf("udp4: %v", err)
        return nil, err
    }
    defer rs.pc4Hair.Close()

Outside of tests SkipExternalNetwork is false, so when a PortMapper is configured, port-mapping probes are kicked off: UPnP IGD, NAT-PMP, and PCP requests are sent to the local default gateway. If any of these protocols responds, a public port mapping can be requested. Think of it as an enhanced STUN: besides discovering the machine's public ip:port mapping, any packet arriving at the mapped port from anywhere is forwarded back to this host.

    if !c.SkipExternalNetwork && c.PortMapper != nil {
        rs.waitPortMap.Add(1)
        go rs.probePortMapServices()
    }

Generate the probe plan for the derpMap:

    plan := makeProbePlan(dm, ifState, last)

When generating the probe plan, the previous report determines the probing strategy for this run:

func makeProbePlan(dm *tailcfg.DERPMap, ifState *interfaces.State, last *Report) (plan probePlan) {
    if last == nil || len(last.RegionLatency) == 0 {
        return makeProbePlanInitial(dm, ifState)
    }

When there is no previous report, or a full probe is required, makeProbePlanInitial picks up to three nodes per region to probe. If a region has fewer than three nodes, it loops over the region's nodes three times (round-robin); for example, a region with a single node gets three probes against that same node, with start delays of 0 ms, 100 ms, and 200 ms respectively (defaultInitialRetransmitTime is 100 ms).

func makeProbePlanInitial(dm *tailcfg.DERPMap, ifState *interfaces.State) (plan probePlan) {
    plan = make(probePlan)

    for _, reg := range dm.Regions {
        var p4 []probe
        var p6 []probe
        for try := 0; try < 3; try++ {
            n := reg.Nodes[try%len(reg.Nodes)]
            delay := time.Duration(try) * defaultInitialRetransmitTime
            if ifState.HaveV4 && nodeMight4(n) {
                p4 = append(p4, probe{delay: delay, node: n.Name, proto: probeIPv4})
            }
            if ifState.HaveV6 && nodeMight6(n) {
                p6 = append(p6, probe{delay: delay, node: n.Name, proto: probeIPv6})
            }
        }
        if len(p4) > 0 {
            plan[fmt.Sprintf("region-%d-v4", reg.RegionID)] = p4
        }
        if len(p6) > 0 {
            plan[fmt.Sprintf("region-%d-v6", reg.RegionID)] = p6
        }
    }
    return plan
}
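
To make the round-robin concrete, this is what the initial plan looks like for a hypothetical region 900 with a single IPv4-capable node named "900a" (values follow directly from the loop above; the region ID and node name are made up for illustration):

// Hypothetical example: region 900, one node "900a", IPv4 only.
// makeProbePlanInitial produces three probes against the same node,
// spaced defaultInitialRetransmitTime (100ms) apart:
//
//   plan["region-900-v4"] = []probe{
//       {delay: 0,                      node: "900a", proto: probeIPv4},
//       {delay: 100 * time.Millisecond, node: "900a", proto: probeIPv4},
//       {delay: 200 * time.Millisecond, node: "900a", proto: probeIPv4},
//   }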

If the previous report is non-empty, the regions are sorted by their last-seen latency from lowest to highest and only the three fastest regions are probed (numIncrementalRegions). The two fastest regions get two probes each; the third gets one. If a region was the preferred DERP last time, its probe count is raised to four so we do not flip-flop away from home too easily. Each probe's start delay is try × (1.2 × that region's last latency), plus try × 50 ms once try > 1, where try is the probe's index within the region; the staggering mainly serves as a retry against UDP loss or timeouts.

func makeProbePlan(dm *tailcfg.DERPMap, ifState *interfaces.State, last *Report) (plan probePlan) {
    if last == nil || len(last.RegionLatency) == 0 {
        return makeProbePlanInitial(dm, ifState)
    }
    have6if := ifState.HaveV6
    have4if := ifState.HaveV4
    plan = make(probePlan)
    if !have4if && !have6if {
        return plan
    }
    had4 := len(last.RegionV4Latency) > 0
    had6 := len(last.RegionV6Latency) > 0
    hadBoth := have6if && had4 && had6
    for ri, reg := range sortRegions(dm, last) {
        if ri == numIncrementalRegions {
            break
        }
        var p4, p6 []probe
        do4 := have4if
        do6 := have6if

        // By default, each node only gets one STUN packet sent,
        // except the fastest two from the previous round.
        tries := 1
        isFastestTwo := ri < 2

        if isFastestTwo {
            tries = 2
        } else if hadBoth {
            // For dual stack machines, make the 3rd & slower nodes alternate
            // between.
            if ri%2 == 0 {
                do4, do6 = true, false
            } else {
                do4, do6 = false, true
            }
        }
        if !isFastestTwo && !had6 {
            do6 = false
        }

        if reg.RegionID == last.PreferredDERP {
            // But if we already had a DERP home, try extra hard to
            // make sure it's there so we don't flip flop around.
            tries = 4
        }

        for try := 0; try < tries; try++ {
            if len(reg.Nodes) == 0 {
                // Shouldn't be possible.
                continue
            }
            if try != 0 && !had6 {
                do6 = false
            }
            n := reg.Nodes[try%len(reg.Nodes)]
            prevLatency := last.RegionLatency[reg.RegionID] * 120 / 100
            if prevLatency == 0 {
                prevLatency = defaultActiveRetransmitTime
            }
            delay := time.Duration(try) * prevLatency
            if try > 1 {
                delay += time.Duration(try) * 50 * time.Millisecond
            }
            if do4 {
                p4 = append(p4, probe{delay: delay, node: n.Name, proto: probeIPv4})
            }
            if do6 {
                p6 = append(p6, probe{delay: delay, node: n.Name, proto: probeIPv6})
            }
        }
        if len(p4) > 0 {
            plan[fmt.Sprintf("region-%d-v4", reg.RegionID)] = p4
        }
        if len(p6) > 0 {
            plan[fmt.Sprintf("region-%d-v6", reg.RegionID)] = p6
        }
    }
    return plan
}
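
As a concrete (hypothetical) example of the delay schedule: suppose the preferred region measured 20 ms last time, so prevLatency becomes 24 ms after the 1.2× scaling, and tries is 4. The four probes then start at:

// Hypothetical numbers: last.RegionLatency[region] = 20ms, tries = 4.
// prevLatency = 20ms * 120 / 100 = 24ms
//   try 0: delay = 0
//   try 1: delay = 1*24ms          = 24ms
//   try 2: delay = 2*24ms + 2*50ms = 148ms
//   try 3: delay = 3*24ms + 3*50ms = 222ms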

After the plan is built, all probe sets are iterated over and executed. A wrapped WaitGroup (syncs.NewWaitGroupChan) is used: when a probe set finishes, the counter is decremented, and once all probe goroutines have ended this becomes observable via wg.DoneChan().

    wg := syncs.NewWaitGroupChan()
    wg.Add(len(plan))
    for _, probeSet := range plan {
        setCtx, cancelSet := context.WithCancel(ctx)
        go func(probeSet []probe) {
            for _, probe := range probeSet {
                go rs.runProbe(setCtx, dm, probe, cancelSet)
            }
            <-setCtx.Done()
            wg.Decr()
        }(probeSet)
    }
    stunTimer := time.NewTimer(stunProbeTimeout)
    defer stunTimer.Stop()

    select {
    case <-stunTimer.C:
    case <-ctx.Done():
    case <-wg.DoneChan():
        // All of our probes finished, so if we have >0 responses, we
        // stop our captive portal check.
        if rs.anyUDP() {
            captivePortalStop()
        }
    case <-rs.stopProbeCh:
        // Saw enough regions.
        c.vlogf("saw enough regions; not waiting for rest")
        // We can stop the captive portal check since we know that we
        // got a bunch of STUN responses.
        captivePortalStop()
    }

Probe execution flow: before running, each probe waits out the delay that was assigned to it when the plan was generated (a timer fires only after that delay), then a STUN request datagram is built and sent to the node being probed.

func (rs *reportState) runProbe(ctx context.Context, dm *tailcfg.DERPMap, probe probe, cancelSet func()) {
    c := rs.c
    node := namedNode(dm, probe.node)
    if node == nil {
        c.logf("netcheck.runProbe: named node %q not found", probe.node)
        return
    }

    if probe.delay > 0 {
        delayTimer := time.NewTimer(probe.delay)
        select {
        case <-delayTimer.C:
        case <-ctx.Done():
            delayTimer.Stop()
            return
        }
    }

    if !rs.probeWouldHelp(probe, node) {
        cancelSet()
        return
    }

    addr := c.nodeAddr(ctx, node, probe.proto)
    if !addr.IsValid() {
        return
    }

    txID := stun.NewTxID()
    req := stun.Request(txID)

    sent := time.Now() // after DNS lookup above

    rs.mu.Lock()
    rs.inFlight[txID] = func(ipp netip.AddrPort) {
        rs.addNodeLatency(node, ipp, time.Since(sent))
        cancelSet() // abort other nodes in this set
    }
    rs.mu.Unlock()

    switch probe.proto {
    case probeIPv4:
        metricSTUNSend4.Add(1)
        n, err := rs.pc4.WriteToUDPAddrPort(req, addr)
        if n == len(req) && err == nil || neterror.TreatAsLostUDP(err) {
            rs.mu.Lock()
            rs.report.IPv4CanSend = true
            rs.mu.Unlock()
        }
    case probeIPv6:
        metricSTUNSend6.Add(1)
        n, err := rs.pc6.WriteToUDPAddrPort(req, addr)
        if n == len(req) && err == nil || neterror.TreatAsLostUDP(err) {
            rs.mu.Lock()
            rs.report.IPv6CanSend = true
            rs.mu.Unlock()
        }
    default:
        panic("bad probe proto " + fmt.Sprint(probe.proto))
    }
    c.vlogf("sent to %v", addr)
}
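
The receive side (netcheck's ReceiveSTUNPacket, bound in updateNetInfo above) is not reproduced in this post; conceptually, once a STUN reply has been parsed it looks up and fires the callback that runProbe registered in rs.inFlight. The helper below is a hypothetical sketch of that dispatch step, not the actual netcheck code:

// Hypothetical sketch: dispatch a parsed STUN reply to the in-flight
// callback registered by runProbe. tx is the reply's transaction ID and
// src the server-reflexive addr:port it reported.
func (rs *reportState) dispatchSTUNReply(tx stun.TxID, src netip.AddrPort) {
    rs.mu.Lock()
    onDone, ok := rs.inFlight[tx]
    if ok {
        delete(rs.inFlight, tx) // each transaction is answered at most once
    }
    rs.mu.Unlock()
    if ok {
        // Runs the closure from runProbe above:
        // addNodeLatency(node, src, time.Since(sent)) and cancelSet().
        onDone(src)
    }
}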

If all STUN probes failed, presumably because UDP is blocked, HTTPS and ICMP latency checks are attempted. ICMP is started alongside the HTTPS checks; the same WaitGroup is not reused for those probes, because the underlying Pinger has to be closed after a timeout or once all ICMP probes are done, regardless of whether the HTTPS probes have finished.

    // Try HTTPS and ICMP latency check if all STUN probes failed due to
    // UDP presumably being blocked.
    // TODO: this should be moved into the probePlan, using probeProto probeHTTPS.
    if !rs.anyUDP() && ctx.Err() == nil {
        var wg sync.WaitGroup
        var need []*tailcfg.DERPRegion
        for rid, reg := range dm.Regions {
            if !rs.haveRegionLatency(rid) && regionHasDERPNode(reg) {
                need = append(need, reg)
            }
        }
        if len(need) > 0 {
            // Kick off ICMP in parallel to HTTPS checks; we don't
            // reuse the same WaitGroup for those probes because we
            // need to close the underlying Pinger after a timeout
            // or when all ICMP probes are done, regardless of
            // whether the HTTPS probes have finished.
            wg.Add(1)
            go func() {
                defer wg.Done()
                if err := c.measureAllICMPLatency(ctx, rs, need); err != nil {
                    c.logf("[v1] measureAllICMPLatency: %v", err)
                }
            }()

            wg.Add(len(need))
            c.logf("netcheck: UDP is blocked, trying HTTPS")
        }
        for _, reg := range need {
            go func(reg *tailcfg.DERPRegion) {
                defer wg.Done()
                if d, ip, err := c.measureHTTPSLatency(ctx, reg); err != nil {
                    c.logf("[v1] netcheck: measuring HTTPS latency of %v (%d): %v", reg.RegionCode, reg.RegionID, err)
                } else {
                    rs.mu.Lock()
                    if l, ok := rs.report.RegionLatency[reg.RegionID]; !ok {
                        mak.Set(&rs.report.RegionLatency, reg.RegionID, d)
                    } else if l >= d {
                        rs.report.RegionLatency[reg.RegionID] = d
                    }
                    // We set these IPv4 and IPv6 but they're not really used
                    // and we don't necessarily set them both. If UDP is blocked
                    // and both IPv4 and IPv6 are available over TCP, it's basically
                    // random which fields end up getting set here.
                    // Since they're not needed, that's fine for now.
                    if ip.Is4() {
                        rs.report.IPv4 = true
                    }
                    if ip.Is6() {
                        rs.report.IPv6 = true
                    }
                    rs.mu.Unlock()
                }
            }(reg)
        }
        wg.Wait()
    }

Finally the probe run is finished, the report is stored, and the best region is selected:

func (c *Client) finishAndStoreReport(rs *reportState, dm *tailcfg.DERPMap) *Report {
    rs.mu.Lock()
    report := rs.report.Clone()
    rs.mu.Unlock()

    c.addReportHistoryAndSetPreferredDERP(report)
    c.logConciseReport(report, dm)

    return report
}

