mirror of
git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
synced 2025-09-04 20:19:47 +08:00

Find out and record in env the name of the interface which remote host will use for the IP address provided via config. Interface name is useful for mausezahn and for setting up tunnels. Reviewed-by: Willem de Bruijn <willemb@google.com> Reviewed-by: Petr Machata <petrm@nvidia.com> Link: https://patch.msgid.link/20250218225426.77726-2-kuba@kernel.org Signed-off-by: Jakub Kicinski <kuba@kernel.org>
283 lines
9.5 KiB
Python
283 lines
9.5 KiB
Python
# SPDX-License-Identifier: GPL-2.0
|
|
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from lib.py import KsftSkipEx, KsftXfailEx
|
|
from lib.py import ksft_setup
|
|
from lib.py import cmd, ethtool, ip, CmdExitFailure
|
|
from lib.py import NetNS, NetdevSimDev
|
|
from .remote import Remote
|
|
|
|
|
|
class NetDrvEnvBase:
|
|
"""
|
|
Base class for a NIC / host envirnoments
|
|
"""
|
|
def __init__(self, src_path):
|
|
self.src_path = src_path
|
|
self.env = self._load_env_file()
|
|
|
|
def rpath(self, path):
|
|
"""
|
|
Get an absolute path to a file based on a path relative to the directory
|
|
containing the test which constructed env.
|
|
|
|
For example, if the test.py is in the same directory as
|
|
a binary (built from helper.c), the test can use env.rpath("helper")
|
|
to get the absolute path to the binary
|
|
"""
|
|
src_dir = Path(self.src_path).parent.resolve()
|
|
return (src_dir / path).as_posix()
|
|
|
|
def _load_env_file(self):
|
|
env = os.environ.copy()
|
|
|
|
src_dir = Path(self.src_path).parent.resolve()
|
|
if not (src_dir / "net.config").exists():
|
|
return ksft_setup(env)
|
|
|
|
with open((src_dir / "net.config").as_posix(), 'r') as fp:
|
|
for line in fp.readlines():
|
|
full_file = line
|
|
# Strip comments
|
|
pos = line.find("#")
|
|
if pos >= 0:
|
|
line = line[:pos]
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
pair = line.split('=', maxsplit=1)
|
|
if len(pair) != 2:
|
|
raise Exception("Can't parse configuration line:", full_file)
|
|
env[pair[0]] = pair[1]
|
|
return ksft_setup(env)
|
|
|
|
|
|
class NetDrvEnv(NetDrvEnvBase):
|
|
"""
|
|
Class for a single NIC / host env, with no remote end
|
|
"""
|
|
def __init__(self, src_path, **kwargs):
|
|
super().__init__(src_path)
|
|
|
|
self._ns = None
|
|
|
|
if 'NETIF' in self.env:
|
|
self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
|
|
else:
|
|
self._ns = NetdevSimDev(**kwargs)
|
|
self.dev = self._ns.nsims[0].dev
|
|
self.ifname = self.dev['ifname']
|
|
self.ifindex = self.dev['ifindex']
|
|
|
|
def __enter__(self):
|
|
ip(f"link set dev {self.dev['ifname']} up")
|
|
|
|
return self
|
|
|
|
def __exit__(self, ex_type, ex_value, ex_tb):
|
|
"""
|
|
__exit__ gets called at the end of a "with" block.
|
|
"""
|
|
self.__del__()
|
|
|
|
def __del__(self):
|
|
if self._ns:
|
|
self._ns.remove()
|
|
self._ns = None
|
|
|
|
|
|
class NetDrvEpEnv(NetDrvEnvBase):
|
|
"""
|
|
Class for an environment with a local device and "remote endpoint"
|
|
which can be used to send traffic in.
|
|
|
|
For local testing it creates two network namespaces and a pair
|
|
of netdevsim devices.
|
|
"""
|
|
|
|
# Network prefixes used for local tests
|
|
nsim_v4_pfx = "192.0.2."
|
|
nsim_v6_pfx = "2001:db8::"
|
|
|
|
def __init__(self, src_path, nsim_test=None):
|
|
super().__init__(src_path)
|
|
|
|
self._stats_settle_time = None
|
|
|
|
# Things we try to destroy
|
|
self.remote = None
|
|
# These are for local testing state
|
|
self._netns = None
|
|
self._ns = None
|
|
self._ns_peer = None
|
|
|
|
if "NETIF" in self.env:
|
|
if nsim_test is True:
|
|
raise KsftXfailEx("Test only works on netdevsim")
|
|
self._check_env()
|
|
|
|
self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
|
|
|
|
self.v4 = self.env.get("LOCAL_V4")
|
|
self.v6 = self.env.get("LOCAL_V6")
|
|
self.remote_v4 = self.env.get("REMOTE_V4")
|
|
self.remote_v6 = self.env.get("REMOTE_V6")
|
|
kind = self.env["REMOTE_TYPE"]
|
|
args = self.env["REMOTE_ARGS"]
|
|
else:
|
|
if nsim_test is False:
|
|
raise KsftXfailEx("Test does not work on netdevsim")
|
|
|
|
self.create_local()
|
|
|
|
self.dev = self._ns.nsims[0].dev
|
|
|
|
self.v4 = self.nsim_v4_pfx + "1"
|
|
self.v6 = self.nsim_v6_pfx + "1"
|
|
self.remote_v4 = self.nsim_v4_pfx + "2"
|
|
self.remote_v6 = self.nsim_v6_pfx + "2"
|
|
kind = "netns"
|
|
args = self._netns.name
|
|
|
|
self.remote = Remote(kind, args, src_path)
|
|
|
|
self.addr = self.v6 if self.v6 else self.v4
|
|
self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
|
|
|
|
self.addr_ipver = "6" if self.v6 else "4"
|
|
# Bracketed addresses, some commands need IPv6 to be inside []
|
|
self.baddr = f"[{self.v6}]" if self.v6 else self.v4
|
|
self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
|
|
|
|
self.ifname = self.dev['ifname']
|
|
self.ifindex = self.dev['ifindex']
|
|
|
|
# resolve remote interface name
|
|
self.remote_ifname = self.resolve_remote_ifc()
|
|
|
|
self._required_cmd = {}
|
|
|
|
def create_local(self):
|
|
self._netns = NetNS()
|
|
self._ns = NetdevSimDev()
|
|
self._ns_peer = NetdevSimDev(ns=self._netns)
|
|
|
|
with open("/proc/self/ns/net") as nsfd0, \
|
|
open("/var/run/netns/" + self._netns.name) as nsfd1:
|
|
ifi0 = self._ns.nsims[0].ifindex
|
|
ifi1 = self._ns_peer.nsims[0].ifindex
|
|
NetdevSimDev.ctrl_write('link_device',
|
|
f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
|
|
|
|
ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
|
|
ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
|
|
ip(f" link set dev {self._ns.nsims[0].ifname} up")
|
|
|
|
ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
|
|
ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
|
|
ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
|
|
|
|
def _check_env(self):
|
|
vars_needed = [
|
|
["LOCAL_V4", "LOCAL_V6"],
|
|
["REMOTE_V4", "REMOTE_V6"],
|
|
["REMOTE_TYPE"],
|
|
["REMOTE_ARGS"]
|
|
]
|
|
missing = []
|
|
|
|
for choice in vars_needed:
|
|
for entry in choice:
|
|
if entry in self.env:
|
|
break
|
|
else:
|
|
missing.append(choice)
|
|
# Make sure v4 / v6 configs are symmetric
|
|
if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
|
|
missing.append(["LOCAL_V6", "REMOTE_V6"])
|
|
if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
|
|
missing.append(["LOCAL_V4", "REMOTE_V4"])
|
|
if missing:
|
|
raise Exception("Invalid environment, missing configuration:", missing,
|
|
"Please see tools/testing/selftests/drivers/net/README.rst")
|
|
|
|
def resolve_remote_ifc(self):
|
|
v4 = v6 = None
|
|
if self.remote_v4:
|
|
v4 = ip("addr show to " + self.remote_v4, json=True, host=self.remote)
|
|
if self.remote_v6:
|
|
v6 = ip("addr show to " + self.remote_v6, json=True, host=self.remote)
|
|
if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
|
|
raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
|
|
if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
|
|
raise Exception("Can't resolve remote interface name, multiple interfaces match")
|
|
return v6[0]["ifname"] if v6 else v4[0]["ifname"]
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, ex_type, ex_value, ex_tb):
|
|
"""
|
|
__exit__ gets called at the end of a "with" block.
|
|
"""
|
|
self.__del__()
|
|
|
|
def __del__(self):
|
|
if self._ns:
|
|
self._ns.remove()
|
|
self._ns = None
|
|
if self._ns_peer:
|
|
self._ns_peer.remove()
|
|
self._ns_peer = None
|
|
if self._netns:
|
|
del self._netns
|
|
self._netns = None
|
|
if self.remote:
|
|
del self.remote
|
|
self.remote = None
|
|
|
|
def require_v4(self):
|
|
if not self.v4 or not self.remote_v4:
|
|
raise KsftSkipEx("Test requires IPv4 connectivity")
|
|
|
|
def require_v6(self):
|
|
if not self.v6 or not self.remote_v6:
|
|
raise KsftSkipEx("Test requires IPv6 connectivity")
|
|
|
|
def _require_cmd(self, comm, key, host=None):
|
|
cached = self._required_cmd.get(comm, {})
|
|
if cached.get(key) is None:
|
|
cached[key] = cmd("command -v -- " + comm, fail=False,
|
|
shell=True, host=host).ret == 0
|
|
self._required_cmd[comm] = cached
|
|
return cached[key]
|
|
|
|
def require_cmd(self, comm, local=True, remote=False):
|
|
if local:
|
|
if not self._require_cmd(comm, "local"):
|
|
raise KsftSkipEx("Test requires command: " + comm)
|
|
if remote:
|
|
if not self._require_cmd(comm, "remote"):
|
|
raise KsftSkipEx("Test requires (remote) command: " + comm)
|
|
|
|
def wait_hw_stats_settle(self):
|
|
"""
|
|
Wait for HW stats to become consistent, some devices DMA HW stats
|
|
periodically so events won't be reflected until next sync.
|
|
Good drivers will tell us via ethtool what their sync period is.
|
|
"""
|
|
if self._stats_settle_time is None:
|
|
data = {}
|
|
try:
|
|
data = ethtool("-c " + self.ifname, json=True)[0]
|
|
except CmdExitFailure as e:
|
|
if "Operation not supported" not in e.cmd.stderr:
|
|
raise
|
|
|
|
self._stats_settle_time = 0.025 + \
|
|
data.get('stats-block-usecs', 0) / 1000 / 1000
|
|
|
|
time.sleep(self._stats_settle_time)
|