Timing Report Parserプロト

# Project layout (save files as shown)
#
# trparser/
#   __init__.py
#   models.py
#   utils.py
#   parser.py
#   cli.py
# tests/
#   test_smoke.py
#   fixtures/
#       sample_basic.rpt
#       sample_nets_cap_tran.rpt
# pyproject.toml
# README.md

# ============================
# trparser/__init__.py
# ============================
"""Timing Report Parser — modularized package (v0.4).
Implements items (1)〜(6) with PEP 8 cleanups and test skeleton.
"""
from .models import DetectedFeatures, TimingPath, PathPoint
from .parser import detect_features, parse_report_stream

# Public API of the package: exactly the names imported above from
# .models and .parser.
__all__ = [
    "DetectedFeatures",
    "TimingPath",
    "PathPoint",
    "detect_features",
    "parse_report_stream",
]

# ============================
# trparser/models.py
# ============================
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional, List


@dataclass(slots=True)
class DetectedFeatures:
    """Report-wide options and column availability inferred from a report.

    Filled in by ``detect_features``; every field keeps its default when the
    corresponding hint is not found.
    """

    has_nets: bool = False  # a "net ..." row was seen in the point table
    has_cap: bool = False  # a pin row carried a value in the optional cap column
    has_tran: bool = False  # a pin row carried a value in the optional tran column
    has_input_pins: bool = False  # a line starting with "Input pin" was seen
    delay_type: str = "max"  # or "min"
    edge: Optional[str] = None  # "rise" / "fall", None when not detected
    unit: str = "ns"  # detected time unit of the report ("ns" or "ps")


@dataclass(slots=True)
class TimingPath:
    """Summary record for one timing path (one ``Startpoint:`` section)."""

    path_id: int  # 1-based sequence number assigned by the parser
    file_name: str  # base name of the report the path came from
    unit: str  # unit label for the timing values below (the parser stores "ns")
    detected_options: List[str] = field(default_factory=list)  # e.g. "nets", "capacitance"
    startpoint: Optional[str] = None  # text after "Startpoint:"
    endpoint: Optional[str] = None  # text after "Endpoint:"
    arrival: Optional[float] = None  # "data arrival time" value
    required: Optional[float] = None  # "data required time" value
    slack: Optional[float] = None  # "slack" value; negative in the VIOLATED fixture
    edge: Optional[str] = None  # report-wide rise/fall hint, if any
    path_type: Optional[str] = None  # report-wide delay type ("min" / "max")
    data_stages: int = 0  # number of cell-pin rows seen for this path
    buf_stages: int = 0  # cell-pin rows classified as BUF via the cell map
    inv_stages: int = 0  # cell-pin rows classified as INV via the cell map
    max_tran: Optional[float] = None  # largest tran value over the path's rows
    max_cap: Optional[float] = None  # largest cap value over the path's rows


@dataclass(slots=True)
class PathPoint:
    """One row of a path's point table (a net row or a cell-pin row)."""

    path_id: int  # path_id of the TimingPath this row belongs to
    row_index: int  # 1-based position of the row within its table
    point_type: str  # "net" | "cell_pin" | other
    inst: str  # instance part of "inst/pin"; empty for net rows
    pin: str  # pin part of "inst/pin"; empty for net rows
    net: str  # net name; empty for cell-pin rows
    incr_ns: Optional[float]  # incremental delay, in ns
    path_ns: Optional[float]  # cumulative path delay, in ns
    cap: Optional[float]  # value of the optional cap column, None when absent
    tran: Optional[float]  # value of the optional tran column, None when absent
    derate: Optional[float]  # the parser in this file always stores None here
    edge: str  # report-wide rise/fall hint, "" when unknown
    unit: str  # unit label for the timing values (the parser stores "ns")
    raw: str  # the original report line, without its trailing newline


# ============================
# trparser/utils.py
# ============================
from __future__ import annotations
import csv
import logging
import re
from pathlib import Path
from typing import Dict, Optional

logger = logging.getLogger("trparser")


def to_float(s: Optional[str]) -> Optional[float]:
    """Convert report text to a float, tolerating thousands separators.

    Args:
        s: Text such as ``"0.040"`` or ``"1,234.5"``, or ``None``.

    Returns:
        The parsed value, or ``None`` when *s* is ``None`` or not a number.
    """
    if s is None:
        return None
    try:
        # Commas are thousands separators only; drop them before parsing.
        return float(s.replace(",", ""))
    except ValueError:
        # float() on a str raises nothing else, so keep the handler narrow.
        return None


def convert_to_ns(val: Optional[float], unit: str) -> Optional[float]:
    """Express *val* in nanoseconds.

    Only picoseconds are rescaled (unit compared case-insensitively); a
    value in any other unit, and ``None``, is returned unchanged.
    """
    if val is not None and unit.lower() == "ps":
        return val / 1000.0
    return val


def load_cell_map(path: Optional[str]) -> Dict[str, str]:
    """Load cell mapping CSV: pattern,type (BUF|INV|OTHER).

    pattern: substring (case-insensitive) or regex with prefix 're:'.

    Args:
        path: CSV file with a ``pattern,type`` header row, or ``None``/empty
            to get an empty mapping.

    Returns:
        Mapping of pattern -> upper-cased type. Rows with an empty pattern or
        type are skipped. A missing file yields an empty mapping and a
        warning instead of an exception.
    """
    mapping: Dict[str, str] = {}
    if not path:
        return mapping
    p = Path(path)
    if not p.is_file():
        logger.warning("cell map not found: %s", path)
        return mapping
    # utf-8-sig strips the BOM that spreadsheet exports prepend; with plain
    # utf-8 the first header becomes "\ufeffpattern" and every row is skipped.
    # newline="" is what the csv module asks for on files it reads.
    with p.open(encoding="utf-8-sig", newline="") as f:
        for row in csv.DictReader(f):
            pat = (row.get("pattern") or "").strip()
            typ = (row.get("type") or "").strip().upper()
            if pat and typ:
                mapping[pat] = typ
    return mapping


def _match_kind(cell: str, cmap: Dict[str, str], want: str) -> bool:
    cell_u = cell.upper()
    for pat, typ in cmap.items():
        if typ != want:
            continue
        if pat.startswith("re:"):
            if re.search(pat[3:], cell_u):
                return True
        elif pat.upper() in cell_u:
            return True
    return False


def is_buf(cell: str, cmap: Dict[str, str]) -> bool:
    """Return True if *cell* matches a pattern mapped to type ``BUF``."""
    return _match_kind(cell, cmap, want="BUF")


def is_inv(cell: str, cmap: Dict[str, str]) -> bool:
    """Return True if *cell* matches a pattern mapped to type ``INV``."""
    return _match_kind(cell, cmap, want="INV")


# ============================
# trparser/parser.py
# ============================
from __future__ import annotations
import logging
import re
from pathlib import Path
from typing import Iterator, Optional, Tuple

from .models import DetectedFeatures, PathPoint, TimingPath
from .utils import convert_to_ns, is_buf, is_inv, to_float

logger = logging.getLogger("trparser")

# ------------- precompiled regex (PEP8-friendly with VERBOSE) -------------
# Number token: optional sign, then either comma-grouped digits ("1,234"),
# plain digits of any length ("1234"), or a bare fraction (".5"), with an
# optional fraction and exponent. The plain-digit branch matters: limiting
# the integer part to three digits truncated "1234.5" to "123".
RE_FLOAT = (
    r"[-+]?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?(?:[eE][-+]?\d+)?"
    r"|[-+]?\.\d+(?:[eE][-+]?\d+)?"
)

RE_START = re.compile(r"^\s*Startpoint:\s*(.+)$", re.I)
RE_END = re.compile(r"^\s*Endpoint:\s*(.+)$", re.I)
RE_ARR = re.compile(rf"^\s*data\s+arrival\s+time\s*[:=]?\s*(?P<val>{RE_FLOAT})", re.I)
RE_REQ = re.compile(rf"^\s*data\s+required\s+time\s*[:=]?\s*(?P<val>{RE_FLOAT})", re.I)
# "-" must come last in the class: "[^\d-+]" is read as a range starting at
# \d and fails to compile ("bad character range").
RE_SLACK = re.compile(rf"^\s*slack\b[^\d+-]*?(?P<val>{RE_FLOAT})", re.I)
RE_UNIT_PS = re.compile(r"\bps\b", re.I)
RE_UNIT_NS = re.compile(r"\bns\b", re.I)
RE_HEADER = re.compile(r"^\s*Point\s+Incr\s+Path", re.I)
RE_RULE = re.compile(r"^\s*-{5,}")
RE_INPUTPIN = re.compile(r"^\s*Input\s*pin\b", re.I)
RE_EDGE = re.compile(r"\b(rise|fall)\b", re.I)
RE_MINMAX = re.compile(r"\b(min|max)\b", re.I)

# Table row for a net: "net <name> <incr> <path> [cap] [tran]".
RE_NET = re.compile(
    rf"""
    ^\s*
    net\s*(?P<net>\S+|\(.+?\))    # net name or (N123)
    \s+(?P<incr>{RE_FLOAT})         # incr
    \s+(?P<path>{RE_FLOAT})         # path
    (?: \s+(?P<cap>{RE_FLOAT}) )?   # optional cap
    (?: \s+(?P<tran>{RE_FLOAT}) )?  # optional tran
    """,
    re.I | re.VERBOSE,
)

# Table row for a cell pin: "<inst>/<pin> <incr> <path> [cap] [tran]".
RE_PIN = re.compile(
    rf"""
    ^\s*
    (?P<inst>[A-Za-z0-9_./$]+)   # instance
    /(?P<pin>[A-Za-z0-9_./$]+)   # pin
    \s+(?P<incr>{RE_FLOAT})     # incr
    \s+(?P<path>{RE_FLOAT})     # path
    (?: \s+(?P<cap>{RE_FLOAT}) )?  # optional cap
    (?: \s+(?P<tran>{RE_FLOAT}) )? # optional tran
    """,
    re.I | re.VERBOSE,
)

# Type alias to shorten signatures. parse_report_stream yields either
# (TimingPath, None) for a finished path or (None, PathPoint) for a table row.
ParseEvent = Tuple[Optional[TimingPath], Optional[PathPoint]]


def detect_features(lines: list[str]) -> DetectedFeatures:
    """Infer units, options and available columns from the report head.

    Only the first 80 lines are inspected. The unit comes from the first line
    that mentions ``ps`` or ``ns`` as a word (``ps`` wins on the same line).
    For edge and delay type the last matching line wins.
    """
    features = DetectedFeatures()
    preview = lines[:80]

    # Pass 1: time unit, decided by the first line that names one.
    for text in preview:
        if RE_UNIT_PS.search(text):
            found_unit = "ps"
        elif RE_UNIT_NS.search(text):
            found_unit = "ns"
        else:
            continue
        features.unit = found_unit
        break

    # Pass 2: table columns and report-wide options.
    header_seen = False
    for text in preview:
        if RE_HEADER.search(text):
            header_seen = True
            continue
        if header_seen:
            if RE_RULE.match(text):
                continue
            if text.strip():
                if RE_NET.match(text):
                    features.has_nets = True
                pin_row = RE_PIN.match(text)
                if pin_row:
                    # Optional columns count as present once any pin row has them.
                    if pin_row.group("cap"):
                        features.has_cap = True
                    if pin_row.group("tran"):
                        features.has_tran = True
        if RE_INPUTPIN.search(text):
            features.has_input_pins = True
        edge_hit = RE_EDGE.search(text)
        if edge_hit:
            features.edge = edge_hit.group(1).lower()
        minmax_hit = RE_MINMAX.search(text)
        if minmax_hit:
            features.delay_type = minmax_hit.group(1).lower()
    return features


def parse_report_stream(
    path: str,
    cell_map: Optional[dict[str, str]] = None,
    strict: bool = False,
) -> Iterator[ParseEvent]:
    """Parse a timing report and stream its paths and path points.

    The report is read fully into memory, report-wide features are detected
    once, and the lines are then walked in order. Every recognised table row
    is yielded immediately as ``(None, PathPoint)``. A path is yielded as
    ``(TimingPath, None)`` once it is complete, i.e. when the next
    ``Startpoint:`` line or the end of the file is reached.

    Args:
        path: Report file to read (UTF-8).
        cell_map: Optional pattern -> type mapping used to count BUF/INV
            stages; without it both counters stay at 0.
        strict: When true, table lines that match no row pattern are logged
            at DEBUG level. They are skipped either way.

    Yields:
        ``(TimingPath, None)`` or ``(None, PathPoint)`` tuples. Timing values
        (incr, path, tran, arrival, required, slack) are converted to ns;
        cap is passed through unscaled. Nothing is yielded when the file
        cannot be read or decoded; the error is logged instead.
    """
    file_name = Path(path).name
    try:
        with open(path, encoding="utf-8") as f:
            lines = f.readlines()
    except (OSError, UnicodeDecodeError) as e:
        logger.error("failed to read report: %s", e)
        return

    ft = detect_features(lines)
    unit = ft.unit
    edge = ft.edge or ""

    # Feature flags are report-wide, so every path gets the same option list
    # (previously only the last path of a report received it).
    option_flags = (
        ("nets", ft.has_nets),
        ("capacitance", ft.has_cap),
        ("transition", ft.has_tran),
        ("input_pins", ft.has_input_pins),
    )
    detected_options = [name for name, present in option_flags if present]

    # Summary lines below the table: TimingPath attribute -> line pattern.
    summary_patterns = (
        ("arrival", RE_ARR),
        ("required", RE_REQ),
        ("slack", RE_SLACK),
    )

    def make_point(m, point_type: str, owner_id: int, row_index: int, raw: str) -> PathPoint:
        """Build a PathPoint from a RE_NET / RE_PIN match."""
        is_net = point_type == "net"
        return PathPoint(
            path_id=owner_id,
            row_index=row_index,
            point_type=point_type,
            inst="" if is_net else m.group("inst"),
            pin="" if is_net else m.group("pin"),
            net=m.group("net").strip("() ") if is_net else "",
            incr_ns=convert_to_ns(to_float(m.group("incr")), unit),
            path_ns=convert_to_ns(to_float(m.group("path")), unit),
            # cap is a capacitance, not a time: never apply the ps -> ns factor.
            cap=to_float(m.group("cap")),
            tran=convert_to_ns(to_float(m.group("tran")), unit),
            derate=None,
            edge=edge,
            unit="ns",
            raw=raw,
        )

    def record_point(tp: TimingPath, pp: PathPoint) -> None:
        """Fold one row into the per-path stage counters and maxima."""
        if pp.point_type == "cell_pin":
            tp.data_stages += 1
            if cell_map and is_buf(pp.inst, cell_map):
                tp.buf_stages += 1
            if cell_map and is_inv(pp.inst, cell_map):
                tp.inv_stages += 1
        if pp.tran is not None:
            tp.max_tran = pp.tran if tp.max_tran is None else max(tp.max_tran, pp.tran)
        if pp.cap is not None:
            tp.max_cap = pp.cap if tp.max_cap is None else max(tp.max_cap, pp.cap)

    path_id = 0
    cur: Optional[TimingPath] = None
    in_table = False
    row_idx = 0

    idx = 0
    n = len(lines)
    while idx < n:
        ln = lines[idx].rstrip("\n")
        idx += 1

        m = RE_START.match(ln)
        if m:
            # A new Startpoint closes the previous path.
            if cur is not None:
                yield (cur, None)
            path_id += 1
            cur = TimingPath(
                path_id=path_id,
                file_name=file_name,
                unit="ns",
                detected_options=list(detected_options),
                startpoint=m.group(1).strip(),
                edge=ft.edge,
                path_type=ft.delay_type,
            )
            in_table = False
            row_idx = 0
            continue

        # Everything before the first Startpoint is report preamble.
        if cur is None:
            continue

        m = RE_END.match(ln)
        if m:
            cur.endpoint = m.group(1).strip()
            continue

        matched_summary = False
        for attr, pattern in summary_patterns:
            m = pattern.match(ln)
            if m:
                setattr(cur, attr, convert_to_ns(to_float(m.group("val")), unit))
                matched_summary = True
                break
        if matched_summary:
            continue

        if RE_HEADER.search(ln):
            in_table = True
            # Swallow the dashed rule under the header so that strict mode
            # does not report it as an unrecognized row.
            if idx < n and RE_RULE.match(lines[idx]):
                idx += 1
            row_idx = 0
            continue

        if not in_table:
            continue

        # A blank line terminates the point table.
        if not ln.strip():
            in_table = False
            continue

        # Net rows are only tried when the report was detected to have them.
        m = RE_NET.match(ln) if ft.has_nets else None
        point_type = "net"
        if m is None:
            m = RE_PIN.match(ln)
            point_type = "cell_pin"
        if m is None:
            if strict:
                logger.debug("unrecognized table line: %s", ln)
            continue

        row_idx += 1
        pp = make_point(m, point_type, cur.path_id, row_idx, ln)
        record_point(cur, pp)
        yield (None, pp)

    # The last path has no following Startpoint to close it.
    if cur is not None:
        yield (cur, None)


# ============================
# trparser/cli.py
# ============================
from __future__ import annotations
import argparse
import csv
import json
import logging
from pathlib import Path
from typing import Optional

try:
    import pandas as pd  # type: ignore
except Exception:  # pragma: no cover
    pd = None

from .models import PathPoint, TimingPath
from .parser import parse_report_stream
from .utils import load_cell_map

LOGGER = logging.getLogger("trparser")

# Column order of <base>.points.csv; names are PathPoint attributes.
POINT_FIELDS = [
    # identity
    "path_id", "row_index", "point_type",
    # location
    "inst", "pin", "net",
    # measurements
    "incr_ns", "path_ns", "cap", "tran", "derate",
    # context
    "edge", "unit", "raw",
]

# Column order of <base>.paths.csv; names are TimingPath attributes
# (TimingPath.unit is not exported).
PATH_FIELDS = [
    # identity
    "path_id", "file_name", "startpoint", "endpoint",
    # timing summary
    "arrival", "required", "slack", "edge", "path_type",
    # per-path metrics
    "data_stages", "buf_stages", "inv_stages", "max_tran", "max_cap",
    # report-wide options
    "detected_options",
]


def setup_logging(level: str = "WARNING") -> None:
    """Configure root logging at *level*.

    *level* is looked up case-insensitively as an attribute of the
    ``logging`` module; a name that does not exist there falls back to
    WARNING.
    """
    resolved = getattr(logging, level.upper(), logging.WARNING)
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=resolved,
    )


def write_outputs(in_path: str, outdir: str, cell_map_csv: Optional[str], strict: bool = False) -> tuple[str, str, str]:
    """Parse *in_path* and write points CSV, paths CSV and a summary JSON.

    Args:
        in_path: Timing report to parse.
        outdir: Output directory; created if missing.
        cell_map_csv: Optional cell-map CSV passed to ``load_cell_map``.
        strict: Forwarded to ``parse_report_stream``.

    Returns:
        ``(points_csv, paths_csv, summary_json)`` as path strings. The files
        are named after the stem of *in_path*.

    The summary always lists the input and the two CSV paths. With pandas
    available it also gets ``worst10`` (the ten smallest-slack paths);
    without pandas it gets a ``note`` instead. A failure while building the
    summary is logged and does not prevent the JSON from being written.
    """
    out_root = Path(outdir)
    out_root.mkdir(parents=True, exist_ok=True)
    base = Path(in_path).stem
    points_csv = str(out_root / f"{base}.points.csv")
    paths_csv = str(out_root / f"{base}.paths.csv")
    summary_json = str(out_root / f"{base}.summary.json")

    cmap = load_cell_map(cell_map_csv)

    def path_row(tp: TimingPath) -> dict:
        """CSV row for a path; the option list is flattened to one cell."""
        row = {name: getattr(tp, name) for name in PATH_FIELDS}
        row["detected_options"] = ",".join(tp.detected_options)
        return row

    def point_row(pp: PathPoint) -> dict:
        """CSV row for a path point."""
        return {name: getattr(pp, name) for name in POINT_FIELDS}

    with open(points_csv, "w", newline="", encoding="utf-8") as fpts, open(
        paths_csv, "w", newline="", encoding="utf-8"
    ) as fpth:
        wpts = csv.DictWriter(fpts, fieldnames=POINT_FIELDS)
        wpts.writeheader()
        wpth = csv.DictWriter(fpth, fieldnames=PATH_FIELDS)
        wpth.writeheader()

        # A path is held back until a path with a different id arrives (or
        # the stream ends), so a re-sent path would replace, not duplicate,
        # its earlier row.
        pending: Optional[TimingPath] = None
        for tp, pp in parse_report_stream(in_path, cmap, strict=strict):
            if tp is not None:
                if pending is not None and pending.path_id != tp.path_id:
                    wpth.writerow(path_row(pending))
                pending = tp
            if pp is not None:
                wpts.writerow(point_row(pp))
        if pending is not None:
            wpth.writerow(path_row(pending))

    summary: dict = {"input": in_path, "paths_csv": paths_csv, "points_csv": points_csv}
    try:
        if pd is not None:
            df = pd.read_csv(paths_csv)
            worst = df.dropna(subset=["slack"]).nsmallest(10, "slack")[
                [
                    "path_id",
                    "startpoint",
                    "endpoint",
                    "slack",
                    "data_stages",
                    "buf_stages",
                    "inv_stages",
                    "max_tran",
                    "max_cap",
                ]
            ]
            # Empty CSV cells come back from pandas as NaN, which json.dump
            # would emit as the bare token NaN (not valid JSON). Map them to
            # None; NaN is the only float that is unequal to itself.
            summary["worst10"] = [
                {
                    key: (None if isinstance(value, float) and value != value else value)
                    for key, value in record.items()
                }
                for record in worst.to_dict(orient="records")
            ]
        else:
            summary["note"] = "Install pandas to enrich summary"
    except Exception as e:
        # Best effort: the CSVs are already on disk, so only warn.
        LOGGER.warning("summary build failed: %s", e)

    with open(summary_json, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    return points_csv, paths_csv, summary_json


def build_parser() -> argparse.ArgumentParser:
    """Build the ``tr`` command-line parser.

    Sub-commands (selected name is stored in ``cmd``):
        parse: report -> CSVs (input, -o/--outdir, --cell-map,
            --strict/--lenient, --log-level)
        stats: worst-slack listing from a paths CSV (paths_csv, --top)
        graph: slack histogram from a paths CSV (paths_csv, --outfile)
    """
    root = argparse.ArgumentParser(prog="tr", description="Timing Report Parser (modular v0.4)")
    commands = root.add_subparsers(dest="cmd")

    # --- parse ---
    parse_cmd = commands.add_parser("parse", help="parse report to CSVs")
    parse_cmd.add_argument("input", help="input timing report (.rpt)")
    parse_cmd.add_argument("-o", "--outdir", default="out", help="output directory")
    parse_cmd.add_argument("--cell-map", help="CSV mapping: pattern,type (BUF/INV)")
    # --strict must be registered before --lenient: both write to "strict"
    # and the first registered action supplies the default (False).
    parse_cmd.add_argument("--strict", action="store_true", help="strict mode: log unknown lines")
    parse_cmd.add_argument("--lenient", dest="strict", action="store_false", help="lenient mode (default)")
    parse_cmd.add_argument(
        "--log-level",
        default="WARNING",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="logging level",
    )

    # --- stats ---
    stats_cmd = commands.add_parser("stats", help="quick stats from paths.csv (pandas required)")
    stats_cmd.add_argument("paths_csv", help="paths.csv produced by parse")
    stats_cmd.add_argument("--top", type=int, default=10, help="show worst-N slack")

    # --- graph ---
    graph_cmd = commands.add_parser("graph", help="plot slack histogram (pandas+matplotlib)")
    graph_cmd.add_argument("paths_csv", help="paths.csv produced by parse")
    graph_cmd.add_argument("--outfile", default="out/slack_hist.png", help="output image file")

    return root


def cmd_parse(args: argparse.Namespace) -> None:
    """Run the ``parse`` sub-command and print the three output locations."""
    setup_logging(args.log_level)
    outputs = write_outputs(args.input, args.outdir, args.cell_map, strict=args.strict)
    # write_outputs returns (points, paths, summary) in this order.
    for label, location in zip(("points:", "paths:", "summary:"), outputs):
        print(label, location)


def cmd_stats(args: argparse.Namespace) -> None:
    """Run the ``stats`` sub-command: slack statistics and worst-N paths.

    Prints an install hint and returns when pandas is unavailable.
    """
    setup_logging("WARNING")
    if pd is None:
        print("Install pandas to use stats command")
        return

    report_columns = [
        "path_id",
        "startpoint",
        "endpoint",
        "slack",
        "data_stages",
        "buf_stages",
        "inv_stages",
        "max_tran",
        "max_cap",
    ]
    frame = pd.read_csv(args.paths_csv)

    print("Total paths:", len(frame))
    print("Slack stats (ns):")
    print(frame["slack"].describe())

    # Paths without a slack value cannot be ranked, so drop them first.
    ranked = frame.dropna(subset=["slack"]).nsmallest(args.top, "slack")
    worst = ranked[report_columns]
    print("\nWorst", args.top, ":")
    print(worst.to_string(index=False))


def cmd_graph(args: argparse.Namespace) -> None:
    """Run the ``graph`` sub-command: save a slack histogram as an image.

    Reads ``args.paths_csv``, plots the non-null ``slack`` values in 40 bins
    and saves the figure to ``args.outfile`` (parent directories are
    created). Prints an install hint and returns when matplotlib or pandas
    is unavailable.
    """
    setup_logging("WARNING")
    # Imported lazily so the other sub-commands work without matplotlib.
    try:
        import matplotlib.pyplot as plt  # type: ignore
    except Exception:
        print("Install matplotlib to use graph command")
        return
    if pd is None:
        print("Install pandas to use graph command")
        return
    df = pd.read_csv(args.paths_csv)
    s = df["slack"].dropna()

    fig = plt.figure()
    try:
        plt.hist(s, bins=40)
        plt.xlabel("Slack (ns)")
        plt.ylabel("Count")
        plt.title("Slack distribution")
        plt.tight_layout()
        out = args.outfile
        Path(out).parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(out)
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)
    print("saved:", out)


def main() -> None:
    """CLI entry point: dispatch to the chosen sub-command.

    Prints the help text when no sub-command is given.
    """
    parser = build_parser()
    args = parser.parse_args()
    handlers = {
        "parse": cmd_parse,
        "stats": cmd_stats,
        "graph": cmd_graph,
    }
    handler = handlers.get(args.cmd)
    if handler is None:
        parser.print_help()
        return
    handler(args)


if __name__ == "__main__":
    main()

# ============================
# tests/test_smoke.py
# ============================
import os
from pathlib import Path
from trparser.cli import write_outputs


def test_parse_basic(tmp_path: Path):
    """Smoke test: parsing the basic fixture produces all three output files."""
    fixture = Path(__file__).parent / "fixtures" / "sample_basic.rpt"
    target_dir = tmp_path / "out"
    # Unpacking also checks that exactly three paths are returned.
    pts, pth, summ = write_outputs(str(fixture), str(target_dir), cell_map_csv=None, strict=False)
    for artifact in (pts, pth, summ):
        assert Path(artifact).exists()


# ============================
# tests/fixtures/sample_basic.rpt
# ============================
# Minimal synthetic sample (nosplit assumed)
# Start of report
Startpoint: U1/Q
Endpoint: U5/D
Path Group: default
Point     Incr       Path
--------------------------------
U1/Q      0.050      0.050
U2/Z      0.030      0.080
U5/D                 0.080

  data arrival time        0.080
  data required time       0.120
  slack (MET)              0.040

# ============================
# tests/fixtures/sample_nets_cap_tran.rpt
# ============================
Startpoint: U3/Q
Endpoint: U9/D
Path Group: default
Point     Incr       Path
--------------------------------
U3/Q      0.020      0.020     0.02   0.10
net (N1)  0.010      0.030     0.03   0.12
U4/Z      0.040      0.070     0.04   0.15
U9/D                 0.070

  data arrival time        0.070
  data required time       0.100
  slack (VIOLATED)        -0.030

# ============================
# pyproject.toml
# ============================
[project]
name = "trparser"
version = "0.4.0"
description = "Timing Report Parser (modularized)"
requires-python = ">=3.10"  # models.py uses @dataclass(slots=True), added in Python 3.10

[project.optional-dependencies]
full = ["pandas", "matplotlib"]

[tool.pytest.ini_options]
addopts = "-q"

[tool.black]
line-length = 88

[tool.ruff]
line-length = 88
select = ["E","F","I","W"]

# ============================
# README.md (excerpt)
# ============================
# trparser (v0.4)

```bash
# install (editable)
pip install -e .

# parse
python -m trparser.cli parse tests/fixtures/sample_basic.rpt -o out

# stats / graph (optional deps)
python -m trparser.cli stats out/sample_basic.paths.csv
python -m trparser.cli graph out/sample_basic.paths.csv --outfile out/hist.png
# 1) 仮想環境は任意
python -m venv .venv && source .venv/bin/activate  # Windowsは .venv\Scripts\activate

# 2) 依存
pip install -e .           # パッケージとして開発インストール
pip install -U pytest      # テスト用
pip install 'pandas; python_version>="3.9"' 'matplotlib; python_version>="3.9"'  # 任意
# 解析(CSV出力)
python -m trparser.cli parse tests/fixtures/sample_basic.rpt -o out

# 統計(pandas任意)
python -m trparser.cli stats out/sample_basic.paths.csv

# グラフ(pandas+matplotlib任意)
python -m trparser.cli graph out/sample_basic.paths.csv --outfile out/slack_hist.png
```