Update dependencies

bluepython508
2025-04-09 01:00:12 +01:00
parent f0641ffd6e
commit 5a9cfc022c
882 changed files with 68930 additions and 24201 deletions

View File

@@ -102,8 +102,7 @@ func (c *Conn) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
sort.Slice(ent, func(i, j int) bool { return ent[i].pub.Less(ent[j].pub) })
peers := map[key.NodePublic]tailcfg.NodeView{}
for i := range c.peers.Len() {
p := c.peers.At(i)
for _, p := range c.peers.All() {
peers[p.Key()] = p
}

View File

@@ -64,10 +64,30 @@ func (c *Conn) removeDerpPeerRoute(peer key.NodePublic, regionID int, dc *derpht
// addDerpPeerRoute adds a DERP route entry, noting that peer was seen
// on DERP node derpID, at least on the connection identified by dc.
// See issue 150 for details.
func (c *Conn) addDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
func (c *Conn) addDerpPeerRoute(peer key.NodePublic, regionID int, dc *derphttp.Client) {
c.mu.Lock()
defer c.mu.Unlock()
mak.Set(&c.derpRoute, peer, derpRoute{derpID, dc})
mak.Set(&c.derpRoute, peer, derpRoute{regionID, dc})
}
// fallbackDERPRegionForPeer returns the DERP region ID we might be able to use
// to contact peer, learned from observing recent DERP traffic from them.
//
// This is used as a fallback when we receive a packet from a peer
// over DERP but don't know that peer's home DERP or any UDP endpoints.
// This is particularly useful for large one-way nodes (such as hello.ts.net)
// that don't actively reach out to other nodes, so don't need to be told
// the DERP home of peers. They can instead learn the DERP home upon getting the
// first connection.
//
// This can also help nodes suffering from a slow or misbehaving control plane.
func (c *Conn) fallbackDERPRegionForPeer(peer key.NodePublic) (regionID int) {
c.mu.Lock()
defer c.mu.Unlock()
if dr, ok := c.derpRoute[peer]; ok {
return dr.regionID
}
return 0
}
// activeDerp contains fields for an active DERP connection.
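
A side note on the convention the new fallback relies on: a DERP destination is represented as a netip.AddrPort whose address is tailcfg.DerpMagicIPAddr and whose port carries the region ID, which is how endpoint.send later in this diff turns the fallback region into a usable derpAddr. The sketch below is illustrative only and not part of the commit.

// Illustrative sketch, not part of the commit: a DERP region ID round-trips
// through a netip.AddrPort via the DerpMagicIPAddr convention used in this diff.
package main

import (
	"fmt"
	"net/netip"

	"tailscale.com/tailcfg"
)

func main() {
	const regionID = 2
	derpAddr := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID))
	if derpAddr.Addr() == tailcfg.DerpMagicIPAddr {
		// The port field is the region ID, mirroring sendAddr's regionID := int(addr.Port()).
		fmt.Println("DERP region:", int(derpAddr.Port()))
	}
}
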
@@ -158,10 +178,10 @@ func (c *Conn) maybeSetNearestDERP(report *netcheck.Report) (preferredDERP int)
} else {
connectedToControl = c.health.GetInPollNetMap()
}
c.mu.Lock()
myDerp := c.myDerp
c.mu.Unlock()
if !connectedToControl {
c.mu.Lock()
myDerp := c.myDerp
c.mu.Unlock()
if myDerp != 0 {
metricDERPHomeNoChangeNoControl.Add(1)
return myDerp
@@ -178,6 +198,11 @@ func (c *Conn) maybeSetNearestDERP(report *netcheck.Report) (preferredDERP int)
// one.
preferredDERP = c.pickDERPFallback()
}
if preferredDERP != myDerp {
c.logf(
"magicsock: home DERP changing from derp-%d [%dms] to derp-%d [%dms]",
c.myDerp, report.RegionLatency[myDerp].Milliseconds(), preferredDERP, report.RegionLatency[preferredDERP].Milliseconds())
}
if !c.setNearestDERP(preferredDERP) {
preferredDERP = 0
}
@@ -627,7 +652,7 @@ func (c *Conn) runDerpReader(ctx context.Context, regionID int, dc *derphttp.Cli
// Do nothing.
case derp.PeerGoneReasonNotHere:
metricRecvDiscoDERPPeerNotHere.Add(1)
c.logf("[unexpected] magicsock: derp-%d does not know about peer %s, removing route",
c.logf("magicsock: derp-%d does not know about peer %s, removing route",
regionID, key.NodePublic(m.Peer).ShortString())
default:
metricRecvDiscoDERPPeerGoneUnknown.Add(1)
@@ -644,9 +669,10 @@ func (c *Conn) runDerpReader(ctx context.Context, regionID int, dc *derphttp.Cli
}
type derpWriteRequest struct {
addr netip.AddrPort
pubKey key.NodePublic
b []byte // copied; ownership passed to receiver
addr netip.AddrPort
pubKey key.NodePublic
b []byte // copied; ownership passed to receiver
isDisco bool
}
// runDerpWriter runs in a goroutine for the life of a DERP
@@ -668,8 +694,12 @@ func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan
if err != nil {
c.logf("magicsock: derp.Send(%v): %v", wr.addr, err)
metricSendDERPError.Add(1)
} else {
metricSendDERP.Add(1)
if !wr.isDisco {
c.metrics.outboundPacketsDroppedErrors.Add(1)
}
} else if !wr.isDisco {
c.metrics.outboundPacketsDERPTotal.Add(1)
c.metrics.outboundBytesDERPTotal.Add(int64(len(wr.b)))
}
}
}
@@ -690,7 +720,6 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint)
// No data read occurred. Wait for another packet.
continue
}
metricRecvDataDERP.Add(1)
sizes[0] = n
eps[0] = ep
return 1, nil
@@ -728,8 +757,11 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en
ep.noteRecvActivity(ipp, mono.Now())
if stats := c.stats.Load(); stats != nil {
stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n)
stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, dm.n)
}
c.metrics.inboundPacketsDERPTotal.Add(1)
c.metrics.inboundBytesDERPTotal.Add(int64(n))
return n, ep
}

View File

@@ -9,6 +9,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"iter"
"math"
"math/rand/v2"
"net"
@@ -20,7 +21,6 @@ import (
"sync/atomic"
"time"
xmaps "golang.org/x/exp/maps"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"tailscale.com/disco"
@@ -33,6 +33,7 @@ import (
"tailscale.com/types/logger"
"tailscale.com/util/mak"
"tailscale.com/util/ringbuffer"
"tailscale.com/util/slicesx"
)
var mtuProbePingSizesV4 []int
@@ -586,7 +587,7 @@ func (de *endpoint) addrForWireGuardSendLocked(now mono.Time) (udpAddr netip.Add
needPing := len(de.endpointState) > 1 && now.Sub(oldestPing) > wireguardPingInterval
if !udpAddr.IsValid() {
candidates := xmaps.Keys(de.endpointState)
candidates := slicesx.MapKeys(de.endpointState)
// Randomly select an address to use until we retrieve latency information
// and give it a short trustBestAddrUntil time so we avoid flapping between
@@ -947,7 +948,15 @@ func (de *endpoint) send(buffs [][]byte) error {
de.mu.Unlock()
if !udpAddr.IsValid() && !derpAddr.IsValid() {
return errNoUDPOrDERP
// Make a last ditch effort to see if we have a DERP route for them. If
// they contacted us over DERP and we don't know their UDP endpoints or
// their DERP home, we can at least assume they're reachable over the
// DERP they used to contact us.
if rid := de.c.fallbackDERPRegionForPeer(de.publicKey); rid != 0 {
derpAddr = netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(rid))
} else {
return errNoUDPOrDERP
}
}
var err error
if udpAddr.IsValid() {
@@ -960,26 +969,40 @@ func (de *endpoint) send(buffs [][]byte) error {
de.noteBadEndpoint(udpAddr)
}
var txBytes int
for _, b := range buffs {
txBytes += len(b)
}
switch {
case udpAddr.Addr().Is4():
de.c.metrics.outboundPacketsIPv4Total.Add(int64(len(buffs)))
de.c.metrics.outboundBytesIPv4Total.Add(int64(txBytes))
case udpAddr.Addr().Is6():
de.c.metrics.outboundPacketsIPv6Total.Add(int64(len(buffs)))
de.c.metrics.outboundBytesIPv6Total.Add(int64(txBytes))
}
// TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends.
if stats := de.c.stats.Load(); err == nil && stats != nil {
var txBytes int
for _, b := range buffs {
txBytes += len(b)
}
stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes)
stats.UpdateTxPhysical(de.nodeAddr, udpAddr, len(buffs), txBytes)
}
}
if derpAddr.IsValid() {
allOk := true
var txBytes int
for _, buff := range buffs {
ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff)
if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff))
}
const isDisco = false
ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff, isDisco)
txBytes += len(buff)
if !ok {
allOk = false
}
}
if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buffs), txBytes)
}
if allOk {
return nil
}
@@ -1344,7 +1367,7 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p
})
de.resetLocked()
}
if n.DERP() == "" {
if n.HomeDERP() == 0 {
if de.derpAddr.IsValid() {
de.debugUpdates.Add(EndpointChange{
When: time.Now(),
@@ -1354,7 +1377,7 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p
}
de.derpAddr = netip.AddrPort{}
} else {
newDerp, _ := netip.ParseAddrPort(n.DERP())
newDerp := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(n.HomeDERP()))
if de.derpAddr != newDerp {
de.debugUpdates.Add(EndpointChange{
When: time.Now(),
@@ -1370,20 +1393,18 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p
}
func (de *endpoint) setEndpointsLocked(eps interface {
Len() int
At(i int) netip.AddrPort
All() iter.Seq2[int, netip.AddrPort]
}) {
for _, st := range de.endpointState {
st.index = indexSentinelDeleted // assume deleted until updated in next loop
}
var newIpps []netip.AddrPort
for i := range eps.Len() {
for i, ipp := range eps.All() {
if i > math.MaxInt16 {
// Seems unlikely.
break
}
ipp := eps.At(i)
if !ipp.IsValid() {
de.c.logf("magicsock: bogus netmap endpoint from %v", eps)
continue
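
The interface passed to setEndpointsLocked now requires a Go 1.23 iterator instead of the Len/At pair. A minimal sketch of a type satisfying the new shape (hypothetical slice-backed type with assumed names, not from this commit):

// Sketch only, not part of the commit: a hypothetical slice-backed type
// satisfying the All() iter.Seq2[int, netip.AddrPort] shape expected above.
package main

import (
	"fmt"
	"iter"
	"net/netip"
)

type addrPorts []netip.AddrPort

// All yields (index, value) pairs, mirroring the views.Slice iterator shape.
func (s addrPorts) All() iter.Seq2[int, netip.AddrPort] {
	return func(yield func(int, netip.AddrPort) bool) {
		for i, ap := range s {
			if !yield(i, ap) {
				return
			}
		}
	}
}

func main() {
	eps := addrPorts{netip.MustParseAddrPort("192.0.2.1:41641")}
	for i, ipp := range eps.All() {
		fmt.Println(i, ipp)
	}
}
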

View File

@@ -10,17 +10,18 @@ import (
"bytes"
"context"
"errors"
"expvar"
"fmt"
"io"
"net"
"net/netip"
"reflect"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/tailscale/wireguard-go/conn"
@@ -59,9 +60,7 @@ import (
"tailscale.com/util/ringbuffer"
"tailscale.com/util/set"
"tailscale.com/util/testenv"
"tailscale.com/util/uniq"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/capture"
"tailscale.com/wgengine/wgint"
)
@@ -80,6 +79,58 @@ const (
socketBufferSize = 7 << 20
)
// Path is a label indicating the type of path a packet took.
type Path string
const (
PathDirectIPv4 Path = "direct_ipv4"
PathDirectIPv6 Path = "direct_ipv6"
PathDERP Path = "derp"
)
type pathLabel struct {
// Path indicates the path that the packet took:
// - direct_ipv4
// - direct_ipv6
// - derp
Path Path
}
// metrics contains the usermetric counters for magicsock. It is a bit
// special: all of the metrics are labeled, but looking up a metric in the
// MultiLabelMap every time we record it adds overhead and takes a lock.
// The metrics are therefore created with wgengine and the underlying
// expvar.Int values are stored here to be used directly.
type metrics struct {
// inboundPacketsTotal is the total number of inbound packets received,
// labeled by the path the packet took.
inboundPacketsIPv4Total expvar.Int
inboundPacketsIPv6Total expvar.Int
inboundPacketsDERPTotal expvar.Int
// inboundBytesTotal is the total number of inbound bytes received,
// labeled by the path the packet took.
inboundBytesIPv4Total expvar.Int
inboundBytesIPv6Total expvar.Int
inboundBytesDERPTotal expvar.Int
// outboundPacketsTotal is the total number of outbound packets sent,
// labeled by the path the packet took.
outboundPacketsIPv4Total expvar.Int
outboundPacketsIPv6Total expvar.Int
outboundPacketsDERPTotal expvar.Int
// outboundBytesTotal is the total number of outbound bytes sent,
// labeled by the path the packet took.
outboundBytesIPv4Total expvar.Int
outboundBytesIPv6Total expvar.Int
outboundBytesDERPTotal expvar.Int
// outboundPacketsDroppedErrors is the total number of outbound packets
// dropped due to errors.
outboundPacketsDroppedErrors expvar.Int
}
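
As the comment above notes, recording through the labeled MultiLabelMap on every packet would take a lock per lookup, so the struct caches the underlying expvar.Int counters and the hot paths add to them directly. A standard-library-only sketch of the same caching pattern (illustrative, assumed names, not the commit's code):

// Sketch only, not part of the commit: cache the *expvar.Int once, then Add
// directly on the hot path instead of looking it up through a labeled map.
package main

import (
	"expvar"
	"fmt"
)

type pathMetrics struct {
	inboundPacketsDERPTotal expvar.Int // cached counter, added to directly
}

func main() {
	m := new(pathMetrics)
	// One-time wiring: publish the counter under its label.
	expvar.Publish("inbound_packets_derp_total", &m.inboundPacketsDERPTotal)

	// Hot path: no map lookup, just the counter's atomic add.
	m.inboundPacketsDERPTotal.Add(1)
	fmt.Println(m.inboundPacketsDERPTotal.Value())
}
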
// A Conn routes UDP packets and actively manages a list of its endpoints.
type Conn struct {
// This block mirrors the contents and field order of the Options
@@ -126,6 +177,10 @@ type Conn struct {
// port mappings from NAT devices.
portMapper *portmapper.Client
// portMapperLogfUnregister is the function to call to unregister
// the portmapper log limiter.
portMapperLogfUnregister func()
// derpRecvCh is used by receiveDERP to read DERP messages.
// It must have buffer size > 0; see issue 3736.
derpRecvCh chan derpReadResult
@@ -186,7 +241,7 @@ type Conn struct {
stats atomic.Pointer[connstats.Statistics]
// captureHook, if non-nil, is the pcap logging callback when capturing.
captureHook syncs.AtomicValue[capture.Callback]
captureHook syncs.AtomicValue[packet.CaptureCallback]
// discoPrivate is the private naclbox key used for active
// discovery traffic. It is always present, and immutable.
@@ -312,15 +367,18 @@ type Conn struct {
// wireguard state by its public key. If nil, it's not used.
getPeerByKey func(key.NodePublic) (_ wgint.Peer, ok bool)
// lastEPERMRebind tracks the last time a rebind was performed
// after experiencing a syscall.EPERM.
lastEPERMRebind syncs.AtomicValue[time.Time]
// lastErrRebind tracks the last time a rebind was performed after
// experiencing a write error, and is used to throttle the rate of rebinds.
lastErrRebind syncs.AtomicValue[time.Time]
// staticEndpoints are user set endpoints that this node should
// advertise amongst its wireguard endpoints. It is user's
// responsibility to ensure that traffic from these endpoints is routed
// to the node.
staticEndpoints views.Slice[netip.AddrPort]
// metrics contains the metrics for the magicsock instance.
metrics *metrics
}
// SetDebugLoggingEnabled controls whether spammy debug logging is enabled.
@@ -478,10 +536,15 @@ func NewConn(opts Options) (*Conn, error) {
c.idleFunc = opts.IdleFunc
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
// Don't log the same log messages possibly every few seconds in our
// portmapper.
portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ")
portmapperLogf, c.portMapperLogfUnregister = netmon.LinkChangeLogLimiter(portmapperLogf, opts.NetMon)
portMapOpts := &portmapper.DebugKnobs{
DisableAll: func() bool { return opts.DisablePortMapper || c.onlyTCP443.Load() },
}
c.portMapper = portmapper.NewClient(logger.WithPrefix(c.logf, "portmapper: "), opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
c.portMapper = portmapper.NewClient(portmapperLogf, opts.NetMon, portMapOpts, opts.ControlKnobs, c.onPortMapChanged)
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
c.netMon = opts.NetMon
c.health = opts.HealthTracker
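
The portmapper logger is now wrapped so repeated messages are limited across link changes, and the returned unregister function is stored on the Conn and called from Close (see the portMapperLogfUnregister() call later in this diff). A rough sketch of that wrap-and-unregister shape, with a hypothetical limiter standing in for netmon.LinkChangeLogLimiter:

// Sketch only, not part of the commit: a hypothetical stand-in showing the
// (wrapped logf, unregister func) pair that LinkChangeLogLimiter returns above.
package main

import "log"

type logf func(format string, args ...any)

func newLimitedLogf(lf logf) (logf, func()) {
	enabled := true // simplification: the real limiter tracks repeated messages per link change
	wrapped := func(format string, args ...any) {
		if enabled {
			lf(format, args...)
		}
	}
	return wrapped, func() { enabled = false }
}

func main() {
	plf, unregister := newLimitedLogf(log.Printf)
	defer unregister() // paired with shutdown, as Close pairs with portMapperLogfUnregister
	plf("portmapper: %s", "starting")
}
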
@@ -503,6 +566,8 @@ func NewConn(opts Options) (*Conn, error) {
UseDNSCache: true,
}
c.metrics = registerMetrics(opts.Metrics)
if d4, err := c.listenRawDisco("ip4"); err == nil {
c.logf("[v1] using BPF disco receiver for IPv4")
c.closeDisco4 = d4
@@ -520,11 +585,85 @@ func NewConn(opts Options) (*Conn, error) {
return c, nil
}
// registerMetrics wires up the metrics for wgengine. Instead of registering
// the labeled metric directly, the underlying expvar is exposed.
// See the metrics type for more info.
func registerMetrics(reg *usermetric.Registry) *metrics {
pathDirectV4 := pathLabel{Path: PathDirectIPv4}
pathDirectV6 := pathLabel{Path: PathDirectIPv6}
pathDERP := pathLabel{Path: PathDERP}
inboundPacketsTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel](
reg,
"tailscaled_inbound_packets_total",
"counter",
"Counts the number of packets received from other peers",
)
inboundBytesTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel](
reg,
"tailscaled_inbound_bytes_total",
"counter",
"Counts the number of bytes received from other peers",
)
outboundPacketsTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel](
reg,
"tailscaled_outbound_packets_total",
"counter",
"Counts the number of packets sent to other peers",
)
outboundBytesTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel](
reg,
"tailscaled_outbound_bytes_total",
"counter",
"Counts the number of bytes sent to other peers",
)
outboundPacketsDroppedErrors := reg.DroppedPacketsOutbound()
m := new(metrics)
// Map clientmetrics to the usermetric counters.
metricRecvDataPacketsIPv4.Register(&m.inboundPacketsIPv4Total)
metricRecvDataPacketsIPv6.Register(&m.inboundPacketsIPv6Total)
metricRecvDataPacketsDERP.Register(&m.inboundPacketsDERPTotal)
metricSendUDP.Register(&m.outboundPacketsIPv4Total)
metricSendUDP.Register(&m.outboundPacketsIPv6Total)
metricSendDERP.Register(&m.outboundPacketsDERPTotal)
inboundPacketsTotal.Set(pathDirectV4, &m.inboundPacketsIPv4Total)
inboundPacketsTotal.Set(pathDirectV6, &m.inboundPacketsIPv6Total)
inboundPacketsTotal.Set(pathDERP, &m.inboundPacketsDERPTotal)
inboundBytesTotal.Set(pathDirectV4, &m.inboundBytesIPv4Total)
inboundBytesTotal.Set(pathDirectV6, &m.inboundBytesIPv6Total)
inboundBytesTotal.Set(pathDERP, &m.inboundBytesDERPTotal)
outboundPacketsTotal.Set(pathDirectV4, &m.outboundPacketsIPv4Total)
outboundPacketsTotal.Set(pathDirectV6, &m.outboundPacketsIPv6Total)
outboundPacketsTotal.Set(pathDERP, &m.outboundPacketsDERPTotal)
outboundBytesTotal.Set(pathDirectV4, &m.outboundBytesIPv4Total)
outboundBytesTotal.Set(pathDirectV6, &m.outboundBytesIPv6Total)
outboundBytesTotal.Set(pathDERP, &m.outboundBytesDERPTotal)
outboundPacketsDroppedErrors.Set(usermetric.DropLabels{Reason: usermetric.ReasonError}, &m.outboundPacketsDroppedErrors)
return m
}
// deregisterMetrics unregisters the underlying usermetrics expvar counters
// from clientmetrics.
func deregisterMetrics(m *metrics) {
metricRecvDataPacketsIPv4.UnregisterAll()
metricRecvDataPacketsIPv6.UnregisterAll()
metricRecvDataPacketsDERP.UnregisterAll()
metricSendUDP.UnregisterAll()
metricSendDERP.UnregisterAll()
}
// InstallCaptureHook installs a callback which is called to
// log debug information into the pcap stream. This function
// can be called with a nil argument to uninstall the capture
// hook.
func (c *Conn) InstallCaptureHook(cb capture.Callback) {
func (c *Conn) InstallCaptureHook(cb packet.CaptureCallback) {
c.captureHook.Store(cb)
}
@@ -988,8 +1127,8 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro
// re-run.
eps = c.endpointTracker.update(time.Now(), eps)
for i := range c.staticEndpoints.Len() {
addAddr(c.staticEndpoints.At(i), tailcfg.EndpointExplicitConf)
for _, ep := range c.staticEndpoints.All() {
addAddr(ep, tailcfg.EndpointExplicitConf)
}
if localAddr := c.pconn4.LocalAddr(); localAddr.IP.IsUnspecified() {
@@ -1078,8 +1217,13 @@ func (c *Conn) networkDown() bool { return !c.networkUp.Load() }
// Send implements conn.Bind.
//
// See https://pkg.go.dev/golang.zx2c4.com/wireguard/conn#Bind.Send
func (c *Conn) Send(buffs [][]byte, ep conn.Endpoint) error {
func (c *Conn) Send(buffs [][]byte, ep conn.Endpoint) (err error) {
n := int64(len(buffs))
defer func() {
if err != nil {
c.metrics.outboundPacketsDroppedErrors.Add(n)
}
}()
metricSendData.Add(n)
if c.networkDown() {
metricSendDataNetworkDown.Add(n)
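
Send now uses a named error return plus a deferred check so that any failing batch, whichever branch produced the error, bumps the dropped-packets counter exactly once. A small sketch of the pattern (illustrative, not the commit's code):

// Sketch only, not part of the commit: named return + defer so every error
// path increments the drop counter by the batch size.
package main

import (
	"errors"
	"expvar"
	"fmt"
)

var outboundDropped expvar.Int

func send(buffs [][]byte) (err error) {
	n := int64(len(buffs))
	defer func() {
		if err != nil {
			outboundDropped.Add(n)
		}
	}()
	return errors.New("network down") // any early return is still counted
}

func main() {
	_ = send(make([][]byte, 3))
	fmt.Println(outboundDropped.Value()) // 3
}
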
@@ -1122,7 +1266,7 @@ func (c *Conn) sendUDPBatch(addr netip.AddrPort, buffs [][]byte) (sent bool, err
c.logf("magicsock: %s", errGSO.Error())
err = errGSO.RetryErr
} else {
_ = c.maybeRebindOnError(runtime.GOOS, err)
c.maybeRebindOnError(err)
}
}
return err == nil, err
@@ -1130,48 +1274,44 @@ func (c *Conn) sendUDPBatch(addr netip.AddrPort, buffs [][]byte) (sent bool, err
// sendUDP sends UDP packet b to ipp.
// See sendAddr's docs on the return value meanings.
func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte) (sent bool, err error) {
func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte, isDisco bool) (sent bool, err error) {
if runtime.GOOS == "js" {
return false, errNoUDP
}
sent, err = c.sendUDPStd(ipp, b)
if err != nil {
metricSendUDPError.Add(1)
_ = c.maybeRebindOnError(runtime.GOOS, err)
c.maybeRebindOnError(err)
} else {
if sent {
metricSendUDP.Add(1)
if sent && !isDisco {
switch {
case ipp.Addr().Is4():
c.metrics.outboundPacketsIPv4Total.Add(1)
c.metrics.outboundBytesIPv4Total.Add(int64(len(b)))
case ipp.Addr().Is6():
c.metrics.outboundPacketsIPv6Total.Add(1)
c.metrics.outboundBytesIPv6Total.Add(int64(len(b)))
}
}
}
return
}
// maybeRebindOnError performs a rebind and restun if the error is defined and
// any conditionals are met.
func (c *Conn) maybeRebindOnError(os string, err error) bool {
switch {
case errors.Is(err, syscall.EPERM):
why := "operation-not-permitted-rebind"
switch os {
// We currently will only rebind and restun on a syscall.EPERM if it is experienced
// on a client running darwin.
// TODO(charlotte, raggi): expand os options if required.
case "darwin":
// TODO(charlotte): implement a backoff, so we don't end up in a rebind loop for persistent
// EPERMs.
if c.lastEPERMRebind.Load().Before(time.Now().Add(-5 * time.Second)) {
c.logf("magicsock: performing %q", why)
c.lastEPERMRebind.Store(time.Now())
c.Rebind()
go c.ReSTUN(why)
return true
}
default:
c.logf("magicsock: not performing %q", why)
return false
}
// maybeRebindOnError performs a rebind and restun if the error is one that is
// known to be healed by a rebind, and the rebind is not throttled.
func (c *Conn) maybeRebindOnError(err error) {
ok, reason := shouldRebind(err)
if !ok {
return
}
if c.lastErrRebind.Load().Before(time.Now().Add(-5 * time.Second)) {
c.logf("magicsock: performing rebind due to %q", reason)
c.Rebind()
go c.ReSTUN(reason)
} else {
c.logf("magicsock: not performing %q rebind due to throttle", reason)
}
return false
}
// sendUDPNetcheck sends b via UDP to addr. It is used exclusively by netcheck.
@@ -1225,9 +1365,9 @@ func (c *Conn) sendUDPStd(addr netip.AddrPort, b []byte) (sent bool, err error)
// An example of when they might be different: sending to an
// IPv6 address when the local machine doesn't have IPv6 support
// returns (false, nil); it's not an error, but nothing was sent.
func (c *Conn) sendAddr(addr netip.AddrPort, pubKey key.NodePublic, b []byte) (sent bool, err error) {
func (c *Conn) sendAddr(addr netip.AddrPort, pubKey key.NodePublic, b []byte, isDisco bool) (sent bool, err error) {
if addr.Addr() != tailcfg.DerpMagicIPAddr {
return c.sendUDP(addr, b)
return c.sendUDP(addr, b, isDisco)
}
regionID := int(addr.Port())
@@ -1248,7 +1388,7 @@ func (c *Conn) sendAddr(addr netip.AddrPort, pubKey key.NodePublic, b []byte) (s
case <-c.donec:
metricSendDERPErrorClosed.Add(1)
return false, errConnClosed
case ch <- derpWriteRequest{addr, pubKey, pkt}:
case ch <- derpWriteRequest{addr, pubKey, pkt, isDisco}:
metricSendDERPQueued.Add(1)
return true, nil
default:
@@ -1278,19 +1418,24 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) {
c.receiveBatchPool.Put(batch)
}
// receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4.
func (c *Conn) receiveIPv4() conn.ReceiveFunc {
return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), metricRecvDataIPv4)
return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4),
&c.metrics.inboundPacketsIPv4Total,
&c.metrics.inboundBytesIPv4Total,
)
}
// receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6.
func (c *Conn) receiveIPv6() conn.ReceiveFunc {
return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), metricRecvDataIPv6)
return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6),
&c.metrics.inboundPacketsIPv6Total,
&c.metrics.inboundBytesIPv6Total,
)
}
// mkReceiveFunc creates a ReceiveFunc reading from ruc.
// The provided healthItem and metric are updated if non-nil.
func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, metric *clientmetric.Metric) conn.ReceiveFunc {
// The provided healthItem and metrics are updated if non-nil.
func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetric, bytesMetric *expvar.Int) conn.ReceiveFunc {
// epCache caches an IPPort->endpoint for hot flows.
var epCache ippEndpointCache
@@ -1327,8 +1472,11 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu
}
ipp := msg.Addr.(*net.UDPAddr).AddrPort()
if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok {
if metric != nil {
metric.Add(1)
if packetMetric != nil {
packetMetric.Add(1)
}
if bytesMetric != nil {
bytesMetric.Add(int64(msg.N))
}
eps[i] = ep
sizes[i] = msg.N
@@ -1384,7 +1532,7 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache)
ep.lastRecvUDPAny.StoreAtomic(now)
ep.noteRecvActivity(ipp, now)
if stats := c.stats.Load(); stats != nil {
stats.UpdateRxPhysical(ep.nodeAddr, ipp, len(b))
stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, len(b))
}
return ep, true
}
@@ -1438,7 +1586,8 @@ func (c *Conn) sendDiscoMessage(dst netip.AddrPort, dstKey key.NodePublic, dstDi
box := di.sharedKey.Seal(m.AppendMarshal(nil))
pkt = append(pkt, box...)
sent, err = c.sendAddr(dst, dstKey, pkt)
const isDisco = true
sent, err = c.sendAddr(dst, dstKey, pkt, isDisco)
if sent {
if logLevel == discoLog || (logLevel == discoVerboseLog && debugDisco()) {
node := "?"
@@ -1568,7 +1717,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke
// Emit information about the disco frame into the pcap stream
// if a capture hook is installed.
if cb := c.captureHook.Load(); cb != nil {
cb(capture.PathDisco, time.Now(), disco.ToPCAPFrame(src, derpNodeSrc, payload), packet.CaptureMeta{})
cb(packet.PathDisco, time.Now(), disco.ToPCAPFrame(src, derpNodeSrc, payload), packet.CaptureMeta{})
}
dm, err := disco.Parse(payload)
@@ -2196,10 +2345,7 @@ func devPanicf(format string, a ...any) {
func (c *Conn) logEndpointCreated(n tailcfg.NodeView) {
c.logf("magicsock: created endpoint key=%s: disco=%s; %v", n.Key().ShortString(), n.DiscoKey().ShortString(), logger.ArgWriter(func(w *bufio.Writer) {
const derpPrefix = "127.3.3.40:"
if strings.HasPrefix(n.DERP(), derpPrefix) {
ipp, _ := netip.ParseAddrPort(n.DERP())
regionID := int(ipp.Port())
if regionID := n.HomeDERP(); regionID != 0 {
code := c.derpRegionCodeLocked(regionID)
if code != "" {
code = "(" + code + ")"
@@ -2207,16 +2353,14 @@ func (c *Conn) logEndpointCreated(n tailcfg.NodeView) {
fmt.Fprintf(w, "derp=%v%s ", regionID, code)
}
for i := range n.AllowedIPs().Len() {
a := n.AllowedIPs().At(i)
for _, a := range n.AllowedIPs().All() {
if a.IsSingleIP() {
fmt.Fprintf(w, "aip=%v ", a.Addr())
} else {
fmt.Fprintf(w, "aip=%v ", a)
}
}
for i := range n.Endpoints().Len() {
ep := n.Endpoints().At(i)
for _, ep := range n.Endpoints().All() {
fmt.Fprintf(w, "ep=%v ", ep)
}
}))
@@ -2346,6 +2490,7 @@ func (c *Conn) Close() error {
}
c.stopPeriodicReSTUNTimerLocked()
c.portMapper.Close()
c.portMapperLogfUnregister()
c.peerMap.forEachEndpoint(func(ep *endpoint) {
ep.stopAndReset()
@@ -2377,6 +2522,8 @@ func (c *Conn) Close() error {
pinger.Close()
}
deregisterMetrics(c.metrics)
return nil
}
@@ -2525,7 +2672,7 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur
}
ports = append(ports, 0)
// Remove duplicates. (All duplicates are consecutive.)
uniq.ModifySlice(&ports)
ports = slices.Compact(ports)
if debugBindSocket() {
c.logf("magicsock: bindSocket: candidate ports: %+v", ports)
@@ -2860,6 +3007,14 @@ func (c *Conn) DebugPickNewDERP() error {
return errors.New("too few regions")
}
func (c *Conn) DebugForcePreferDERP(n int) {
c.mu.Lock()
defer c.mu.Unlock()
c.logf("magicsock: [debug] force preferred DERP set to: %d", n)
c.netChecker.SetForcePreferredDERP(n)
}
// portableTrySetSocketBuffer sets SO_SNDBUF and SO_RECVBUF on pconn to socketBufferSize,
// logging an error if it occurs.
func portableTrySetSocketBuffer(pconn nettype.PacketConn, logf logger.Logf) {
@@ -2930,17 +3085,17 @@ var (
metricSendDERPErrorChan = clientmetric.NewCounter("magicsock_send_derp_error_chan")
metricSendDERPErrorClosed = clientmetric.NewCounter("magicsock_send_derp_error_closed")
metricSendDERPErrorQueue = clientmetric.NewCounter("magicsock_send_derp_error_queue")
metricSendUDP = clientmetric.NewCounter("magicsock_send_udp")
metricSendUDP = clientmetric.NewAggregateCounter("magicsock_send_udp")
metricSendUDPError = clientmetric.NewCounter("magicsock_send_udp_error")
metricSendDERP = clientmetric.NewCounter("magicsock_send_derp")
metricSendDERP = clientmetric.NewAggregateCounter("magicsock_send_derp")
metricSendDERPError = clientmetric.NewCounter("magicsock_send_derp_error")
// Data packets (non-disco)
metricSendData = clientmetric.NewCounter("magicsock_send_data")
metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down")
metricRecvDataDERP = clientmetric.NewCounter("magicsock_recv_data_derp")
metricRecvDataIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4")
metricRecvDataIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6")
metricRecvDataPacketsDERP = clientmetric.NewAggregateCounter("magicsock_recv_data_derp")
metricRecvDataPacketsIPv4 = clientmetric.NewAggregateCounter("magicsock_recv_data_ipv4")
metricRecvDataPacketsIPv6 = clientmetric.NewAggregateCounter("magicsock_recv_data_ipv6")
// Disco packets
metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp")

View File

@@ -0,0 +1,31 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package magicsock
import (
"errors"
"syscall"
)
// shouldRebind reports whether the error is one that is known to be healed
// by a rebind, and if so also returns a reason string for the rebind.
func shouldRebind(err error) (ok bool, reason string) {
switch {
// EPIPE/ENOTCONN are common errors when a send fails due to a closed
// socket. There is some platform and version inconsistency in which
// error is returned, but the meaning is the same.
case errors.Is(err, syscall.EPIPE), errors.Is(err, syscall.ENOTCONN):
return true, "broken-pipe"
// EPERM is typically caused by EDR software, and has been observed to be
// transient, it seems that some versions of some EDR lose track of sockets
// at times, and return EPERM, but reconnects will establish appropriate
// rights associated with a new socket.
case errors.Is(err, syscall.EPERM):
return true, "operation-not-permitted"
}
return false, ""
}
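
A quick test-style sketch of the contract (assumed test name, not part of the commit), relying only on the cases listed above:

// Sketch only, not part of the commit: shouldRebind sees through wrapped
// syscall errors via errors.Is and names the rebind reason.
package magicsock

import (
	"fmt"
	"os"
	"syscall"
	"testing"
)

func TestShouldRebindSketch(t *testing.T) {
	err := fmt.Errorf("write udp4: %w", &os.SyscallError{Syscall: "sendmsg", Err: syscall.EPERM})
	ok, reason := shouldRebind(err)
	if !ok || reason != "operation-not-permitted" {
		t.Fatalf("got ok=%v reason=%q", ok, reason)
	}
}
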

View File

@@ -0,0 +1,12 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build plan9
package magicsock
// shouldRebind reports whether the error is one that is known to be healed
// by a rebind, and if so also returns a reason string for the rebind.
func shouldRebind(err error) (ok bool, reason string) {
return false, ""
}