2
0
mirror of git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git synced 2025-09-04 20:19:47 +08:00
linux/tools/testing/selftests/drivers/net/hw/devmem.py
Mina Almasry 2f1a805f32 selftests: ncdevmem: Implement devmem TCP TX
Add support for devmem TX in ncdevmem.

This is a combination of the ncdevmem from the devmem TCP series RFCv1
which included the TX path, and work by Stan to include the netlink API
and refactored on top of his generic memory_provider support.

Signed-off-by: Mina Almasry <almasrymina@google.com>
Signed-off-by: Stanislav Fomichev <sdf@fomichev.me>
Acked-by: Stanislav Fomichev <sdf@fomichev.me>
Link: https://patch.msgid.link/20250508004830.4100853-10-almasrymina@google.com
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
2025-05-13 11:12:49 +02:00

64 lines
1.8 KiB
Python
Executable File

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
from os import path
from lib.py import ksft_run, ksft_exit
from lib.py import ksft_eq, KsftSkipEx
from lib.py import NetDrvEpEnv
from lib.py import bkg, cmd, rand_port, wait_port_listen
from lib.py import ksft_disruptive
def require_devmem(cfg):
if not hasattr(cfg, "_devmem_probed"):
probe_command = f"{cfg.bin_local} -f {cfg.ifname}"
cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 0
cfg._devmem_probed = True
if not cfg._devmem_supported:
raise KsftSkipEx("Test requires devmem support")
@ksft_disruptive
def check_rx(cfg) -> None:
cfg.require_ipver("6")
require_devmem(cfg)
port = rand_port()
listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port}"
with bkg(listen_cmd) as socat:
wait_port_listen(port)
cmd(f"echo -e \"hello\\nworld\"| socat -u - TCP6:[{cfg.addr_v['6']}]:{port}", host=cfg.remote, shell=True)
ksft_eq(socat.stdout.strip(), "hello\nworld")
@ksft_disruptive
def check_tx(cfg) -> None:
cfg.require_ipver("6")
require_devmem(cfg)
port = rand_port()
listen_cmd = f"socat -U - TCP6-LISTEN:{port}"
with bkg(listen_cmd, exit_wait=True) as socat:
wait_port_listen(port)
cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port}", host=cfg.remote, shell=True)
ksft_eq(socat.stdout.strip(), "hello\nworld")
def main() -> None:
with NetDrvEpEnv(__file__) as cfg:
cfg.bin_local = path.abspath(path.dirname(__file__) + "/ncdevmem")
cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)
ksft_run([check_rx, check_tx],
args=(cfg, ))
ksft_exit()
if __name__ == "__main__":
main()