1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# SPDX-License-Identifier: GPL-2.0
import re
import time
import json
from lib.py import ksft_pr, cmd, ip, rand_port, wait_port_listen
class Iperf3Runner:
"""
Sets up and runs iperf3 traffic.
"""
def __init__(self, env, port=None, server_ip=None, client_ip=None):
env.require_cmd("iperf3", local=True, remote=True)
self.env = env
self.port = rand_port() if port is None else port
self.server_ip = server_ip
self.client_ip = client_ip
def _build_server(self):
cmdline = f"iperf3 -s -1 -p {self.port}"
if self.server_ip:
cmdline += f" -B {self.server_ip}"
return cmdline
def _build_client(self, streams, duration, reverse):
host = self.env.addr if self.server_ip is None else self.server_ip
cmdline = f"iperf3 -c {host} -p {self.port} -P {streams} -t {duration} -J"
if self.client_ip:
cmdline += f" -B {self.client_ip}"
if reverse:
cmdline += " --reverse"
return cmdline
def start_server(self):
"""
Starts an iperf3 server with optional bind IP.
"""
cmdline = self._build_server()
proc = cmd(cmdline, background=True)
wait_port_listen(self.port)
time.sleep(0.1)
return proc
def start_client(self, background=False, streams=1, duration=10, reverse=False):
"""
Starts the iperf3 client with the configured options.
"""
cmdline = self._build_client(streams, duration, reverse)
return cmd(cmdline, background=background, host=self.env.remote)
def measure_bandwidth(self, reverse=False):
"""
Runs an iperf3 measurement and returns the average bandwidth (Gbps).
Discards the first and last few reporting intervals and uses only the
middle part of the run where throughput is typically stable.
"""
self.start_server()
result = self.start_client(duration=10, reverse=reverse)
if result.ret != 0:
raise RuntimeError("iperf3 failed to run successfully")
try:
out = json.loads(result.stdout)
except json.JSONDecodeError as exc:
raise ValueError("Failed to parse iperf3 JSON output") from exc
intervals = out.get("intervals", [])
samples = [i["sum"]["bits_per_second"] / 1e9 for i in intervals]
if len(samples) < 10:
raise ValueError(f"iperf3 returned too few intervals: {len(samples)}")
# Discard potentially unstable first and last 3 seconds.
stable = samples[3:-3]
avg = sum(stable) / len(stable)
return avg
class GenerateTraffic:
def __init__(self, env, port=None):
self.env = env
self.runner = Iperf3Runner(env, port)
self._iperf_server = self.runner.start_server()
self._iperf_client = self.runner.start_client(background=True, streams=16, duration=86400)
# Wait for traffic to ramp up
if not self._wait_pkts(pps=1000):
self.stop(verbose=True)
raise Exception("iperf3 traffic did not ramp up")
def _wait_pkts(self, pkt_cnt=None, pps=None):
"""
Wait until we've seen pkt_cnt or until traffic ramps up to pps.
Only one of pkt_cnt or pss can be specified.
"""
pkt_start = ip("-s link show dev " + self.env.ifname, json=True)[0]["stats64"]["rx"]["packets"]
for _ in range(50):
time.sleep(0.1)
pkt_now = ip("-s link show dev " + self.env.ifname, json=True)[0]["stats64"]["rx"]["packets"]
if pps:
if pkt_now - pkt_start > pps / 10:
return True
pkt_start = pkt_now
elif pkt_cnt:
if pkt_now - pkt_start > pkt_cnt:
return True
return False
def wait_pkts_and_stop(self, pkt_cnt):
failed = not self._wait_pkts(pkt_cnt=pkt_cnt)
self.stop(verbose=failed)
def stop(self, verbose=None):
self._iperf_client.process(terminate=True)
if verbose:
ksft_pr(">> Client:")
ksft_pr(self._iperf_client.stdout)
ksft_pr(self._iperf_client.stderr)
self._iperf_server.process(terminate=True)
if verbose:
ksft_pr(">> Server:")
ksft_pr(self._iperf_server.stdout)
ksft_pr(self._iperf_server.stderr)
self._wait_client_stopped()
def _wait_client_stopped(self, sleep=0.005, timeout=5):
end = time.monotonic() + timeout
live_port_pattern = re.compile(fr":{self.runner.port:04X} 0[^6] ")
while time.monotonic() < end:
data = cmd("cat /proc/net/tcp*", host=self.env.remote).stdout
if not live_port_pattern.search(data):
return
time.sleep(sleep)
raise Exception(f"Waiting for client to stop timed out after {timeout}s")
|