diff options
Diffstat (limited to 'tools/perf/python')
| -rwxr-xr-x | tools/perf/python/ilist.py | 495 | 
1 files changed, 495 insertions, 0 deletions
| diff --git a/tools/perf/python/ilist.py b/tools/perf/python/ilist.py new file mode 100755 index 000000000000..9d6465c60df3 --- /dev/null +++ b/tools/perf/python/ilist.py @@ -0,0 +1,495 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) +"""Interactive perf list.""" + +from abc import ABC, abstractmethod +import argparse +from dataclasses import dataclass +import math +from typing import Any, Dict, Optional, Tuple +import perf +from textual import on +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll +from textual.css.query import NoMatches +from textual.command import SearchIcon +from textual.screen import ModalScreen +from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree +from textual.widgets.tree import TreeNode + + +def get_info(info: Dict[str, str], key: str): +    return (info[key] + "\n") if key in info else "" + + +class TreeValue(ABC): +    """Abstraction for the data of value in the tree.""" + +    @abstractmethod +    def name(self) -> str: +        pass + +    @abstractmethod +    def description(self) -> str: +        pass + +    @abstractmethod +    def matches(self, query: str) -> bool: +        pass + +    @abstractmethod +    def parse(self) -> perf.evlist: +        pass + +    @abstractmethod +    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: +        pass + + +@dataclass +class Metric(TreeValue): +    """A metric in the tree.""" +    metric_name: str + +    def name(self) -> str: +        return self.metric_name + +    def description(self) -> str: +        """Find and format metric description.""" +        for metric in perf.metrics(): +            if metric["MetricName"] != self.metric_name: +                continue +            desc = get_info(metric, "BriefDescription") +            desc += get_info(metric, "PublicDescription") +            desc += get_info(metric, "MetricExpr") +            desc += get_info(metric, "MetricThreshold") +            return desc +        return "description" + +    def matches(self, query: str) -> bool: +        return query in self.metric_name + +    def parse(self) -> perf.evlist: +        return perf.parse_metrics(self.metric_name) + +    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: +        val = evlist.compute_metric(self.metric_name, cpu, thread) +        return 0 if math.isnan(val) else val + + +@dataclass +class PmuEvent(TreeValue): +    """A PMU and event within the tree.""" +    pmu: str +    event: str + +    def name(self) -> str: +        if self.event.startswith(self.pmu) or ':' in self.event: +            return self.event +        else: +            return f"{self.pmu}/{self.event}/" + +    def description(self) -> str: +        """Find and format event description for {pmu}/{event}/.""" +        for p in perf.pmus(): +            if p.name() != self.pmu: +                continue +            for info in p.events(): +                if "name" not in info or info["name"] != self.event: +                    continue + +                desc = get_info(info, "topic") +                desc += get_info(info, "event_type_desc") +                desc += get_info(info, "desc") +                desc += get_info(info, "long_desc") +                desc += get_info(info, "encoding_desc") +                return desc +        return "description" + +    def matches(self, query: str) -> bool: +        return query in self.pmu or query in self.event + +    def parse(self) -> perf.evlist: +        return perf.parse_events(self.name()) + +    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: +        return evsel.read(cpu, thread).val + + +class ErrorScreen(ModalScreen[bool]): +    """Pop up dialog for errors.""" + +    CSS = """ +    ErrorScreen { +        align: center middle; +    } +    """ + +    def __init__(self, error: str): +        self.error = error +        super().__init__() + +    def compose(self) -> ComposeResult: +        yield Button(f"Error: {self.error}", variant="primary", id="error") + +    def on_button_pressed(self, event: Button.Pressed) -> None: +        self.dismiss(True) + + +class SearchScreen(ModalScreen[str]): +    """Pop up dialog for search.""" + +    CSS = """ +    SearchScreen Horizontal { +        align: center middle; +        margin-top: 1; +    } +    SearchScreen Input { +        width: 1fr; +    } +    """ + +    def compose(self) -> ComposeResult: +        yield Horizontal(SearchIcon(), Input(placeholder="Event name")) + +    def on_input_submitted(self, event: Input.Submitted) -> None: +        """Handle the user pressing Enter in the input field.""" +        self.dismiss(event.value) + + +class Counter(HorizontalGroup): +    """Two labels for a CPU and its counter value.""" + +    CSS = """ +    Label { +        gutter: 1; +    } +    """ + +    def __init__(self, cpu: int) -> None: +        self.cpu = cpu +        super().__init__() + +    def compose(self) -> ComposeResult: +        label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" +        yield Label(label + " ") +        yield Label("0", id=f"counter_{label}") + + +class CounterSparkline(HorizontalGroup): +    """A Sparkline for a performance counter.""" + +    def __init__(self, cpu: int) -> None: +        self.cpu = cpu +        super().__init__() + +    def compose(self) -> ComposeResult: +        label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" +        yield Label(label) +        yield Sparkline([], summary_function=max, id=f"sparkline_{label}") + + +class IListApp(App): +    TITLE = "Interactive Perf List" + +    BINDINGS = [ +        Binding(key="s", action="search", description="Search", +                tooltip="Search events and PMUs"), +        Binding(key="n", action="next", description="Next", +                tooltip="Next search result or item"), +        Binding(key="p", action="prev", description="Previous", +                tooltip="Previous search result or item"), +        Binding(key="c", action="collapse", description="Collapse", +                tooltip="Collapse the current PMU"), +        Binding(key="^q", action="quit", description="Quit", +                tooltip="Quit the app"), +    ] + +    CSS = """ +        /* Make the 'total' sparkline a different color. */ +        #sparkline_total > .sparkline--min-color { +            color: $accent; +        } +        #sparkline_total > .sparkline--max-color { +            color: $accent 30%; +        } +        /* +         * Make the active_search initially not displayed with the text in +         * the middle of the line. +         */ +        #active_search { +            display: none; +            width: 100%; +            text-align: center; +        } +    """ + +    def __init__(self, interval: float) -> None: +        self.interval = interval +        self.evlist = None +        self.selected: Optional[TreeValue] = None +        self.search_results: list[TreeNode[TreeValue]] = [] +        self.cur_search_result: TreeNode[TreeValue] | None = None +        super().__init__() + +    def expand_and_select(self, node: TreeNode[Any]) -> None: +        """Expand select a node in the tree.""" +        if node.parent: +            node.parent.expand() +            if node.parent.parent: +                node.parent.parent.expand() +        node.expand() +        node.tree.select_node(node) +        node.tree.scroll_to_node(node) + +    def set_searched_tree_node(self, previous: bool) -> None: +        """Set the cur_search_result node to either the next or previous.""" +        l = len(self.search_results) + +        if l < 1: +            tree: Tree[TreeValue] = self.query_one("#root", Tree) +            if previous: +                tree.action_cursor_up() +            else: +                tree.action_cursor_down() +            return + +        if self.cur_search_result: +            idx = self.search_results.index(self.cur_search_result) +            if previous: +                idx = idx - 1 if idx > 0 else l - 1 +            else: +                idx = idx + 1 if idx < l - 1 else 0 +        else: +            idx = l - 1 if previous else 0 + +        node = self.search_results[idx] +        if node == self.cur_search_result: +            return + +        self.cur_search_result = node +        self.expand_and_select(node) + +    def action_search(self) -> None: +        """Search was chosen.""" +        def set_initial_focus(event: str | None) -> None: +            """Sets the focus after the SearchScreen is dismissed.""" + +            search_label = self.query_one("#active_search", Label) +            search_label.display = True if event else False +            if not event: +                return +            event = event.lower() +            search_label.update(f'Searching for events matching "{event}"') + +            tree: Tree[str] = self.query_one("#root", Tree) + +            def find_search_results(event: str, node: TreeNode[str], +                                    cursor_seen: bool = False, +                                    match_after_cursor: Optional[TreeNode[str]] = None +                                    ) -> Tuple[bool, Optional[TreeNode[str]]]: +                """Find nodes that match the search remembering the one after the cursor.""" +                if not cursor_seen and node == tree.cursor_node: +                    cursor_seen = True +                if node.data and node.data.matches(event): +                    if cursor_seen and not match_after_cursor: +                        match_after_cursor = node +                    self.search_results.append(node) + +                if node.children: +                    for child in node.children: +                        (cursor_seen, match_after_cursor) = \ +                            find_search_results(event, child, cursor_seen, match_after_cursor) +                return (cursor_seen, match_after_cursor) + +            self.search_results.clear() +            (_, self.cur_search_result) = find_search_results(event, tree.root) +            if len(self.search_results) < 1: +                self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}")) +                search_label.display = False +            elif self.cur_search_result: +                self.expand_and_select(self.cur_search_result) +            else: +                self.set_searched_tree_node(previous=False) + +        self.push_screen(SearchScreen(), set_initial_focus) + +    def action_next(self) -> None: +        """Next was chosen.""" +        self.set_searched_tree_node(previous=False) + +    def action_prev(self) -> None: +        """Previous was chosen.""" +        self.set_searched_tree_node(previous=True) + +    def action_collapse(self) -> None: +        """Collapse the part of the tree currently on.""" +        tree: Tree[str] = self.query_one("#root", Tree) +        node = tree.cursor_node +        if node and node.parent: +            node.parent.collapse_all() +            node.tree.scroll_to_node(node.parent) + +    def update_counts(self) -> None: +        """Called every interval to update counts.""" +        if not self.selected or not self.evlist: +            return + +        def update_count(cpu: int, count: int): +            # Update the raw count display. +            counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total") +            if not counter: +                return +            counter = counter.first(Label) +            counter.update(str(count)) + +            # Update the sparkline. +            line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total") +            if not line: +                return +            line = line.first(Sparkline) +            # If there are more events than the width, remove the front event. +            if len(line.data) > line.size.width: +                line.data.pop(0) +            line.data.append(count) +            line.mutate_reactive(Sparkline.data) + +        # Update the total and each CPU counts, assume there's just 1 evsel. +        total = 0 +        self.evlist.disable() +        for evsel in self.evlist: +            for cpu in evsel.cpus(): +                aggr = 0 +                for thread in evsel.threads(): +                    aggr += self.selected.value(self.evlist, evsel, cpu, thread) +                update_count(cpu, aggr) +                total += aggr +        update_count(-1, total) +        self.evlist.enable() + +    def on_mount(self) -> None: +        """When App starts set up periodic event updating.""" +        self.update_counts() +        self.set_interval(self.interval, self.update_counts) + +    def set_selected(self, value: TreeValue) -> None: +        """Updates the event/description and starts the counters.""" +        try: +            label_name = self.query_one("#event_name", Label) +            event_description = self.query_one("#event_description", Static) +            lines = self.query_one("#lines") +        except NoMatches: +            # A race with rendering, ignore the update as we can't +            # mount the assumed output widgets. +            return + +        self.selected = value + +        # Remove previous event information. +        if self.evlist: +            self.evlist.disable() +            self.evlist.close() +            old_lines = self.query(CounterSparkline) +            for line in old_lines: +                line.remove() +            old_counters = self.query(Counter) +            for counter in old_counters: +                counter.remove() + +        # Update event/metric text and description. +        label_name.update(value.name()) +        event_description.update(value.description()) + +        # Open the event. +        try: +            self.evlist = value.parse() +            if self.evlist: +                self.evlist.open() +                self.evlist.enable() +        except: +            self.evlist = None + +        if not self.evlist: +            self.push_screen(ErrorScreen(f"Failed to open {value.name()}")) +            return + +        # Add spark lines for all the CPUs. Note, must be done after +        # open so that the evlist CPUs have been computed by propagate +        # maps. +        line = CounterSparkline(cpu=-1) +        lines.mount(line) +        for cpu in self.evlist.all_cpus(): +            line = CounterSparkline(cpu) +            lines.mount(line) +        line = Counter(cpu=-1) +        lines.mount(line) +        for cpu in self.evlist.all_cpus(): +            line = Counter(cpu) +            lines.mount(line) + +    def compose(self) -> ComposeResult: +        """Draws the app.""" +        def metric_event_tree() -> Tree: +            """Create tree of PMUs and metricgroups with events or metrics under.""" +            tree: Tree[TreeValue] = Tree("Root", id="root") +            pmus = tree.root.add("PMUs") +            for pmu in perf.pmus(): +                pmu_name = pmu.name().lower() +                pmu_node = pmus.add(pmu_name) +                try: +                    for event in sorted(pmu.events(), key=lambda x: x["name"]): +                        if "name" in event: +                            e = event["name"].lower() +                            if "alias" in event: +                                pmu_node.add_leaf(f'{e} ({event["alias"]})', +                                                  data=PmuEvent(pmu_name, e)) +                            else: +                                pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e)) +                except: +                    # Reading events may fail with EPERM, ignore. +                    pass +            metrics = tree.root.add("Metrics") +            groups = set() +            for metric in perf.metrics(): +                groups.update(metric["MetricGroup"]) + +            def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str): +                for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]): +                    if parent in metric["MetricGroup"]: +                        name = metric["MetricName"] +                        node.add_leaf(name, data=Metric(name)) +                        child_group_name = f'{name}_group' +                        if child_group_name in groups: +                            add_metrics_to_tree(node.add(child_group_name), child_group_name) + +            for group in sorted(groups): +                if group.endswith('_group'): +                    continue +                add_metrics_to_tree(metrics.add(group), group) + +            tree.root.expand() +            return tree + +        yield Header(id="header") +        yield Horizontal(Vertical(metric_event_tree(), id="events"), +                         Vertical(Label("event name", id="event_name"), +                                  Static("description", markup=False, id="event_description"), +                                  )) +        yield Label(id="active_search") +        yield VerticalScroll(id="lines") +        yield Footer(id="footer") + +    @on(Tree.NodeSelected) +    def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None: +        """Called when a tree node is selected, selecting the event.""" +        if event.node.data: +            self.set_selected(event.node.data) + + +if __name__ == "__main__": +    ap = argparse.ArgumentParser() +    ap.add_argument('-I', '--interval', help="Counter update interval in seconds", default=0.1) +    args = ap.parse_args() +    app = IListApp(float(args.interval)) +    app.run() | 
