Step 2 - LDF Loading:
- ldfparser integration (Python) / custom regex parser (C++)
- QTreeWidget with expandable signal rows, merged Value column
- Hex/Dec toggle, FreeFormat schedule entries, auto-reload
- Baud rate auto-detection from LDF

Step 3 - Signal Editing:
- Bit packing/unpacking (signal value ↔ frame bytes)
- ReadOnlyColumnDelegate for per-column editability
- Value clamping to signal width, recursion guard

Step 4 - Rx Panel:
- receive_rx_frame() API with timestamp, signal unpacking
- Change highlighting (yellow), auto-scroll toggle, clear button
- Dashboard view (in-place update per frame_id)

Step 5 - Connection Panel:
- ConnectionManager with state machine (Disconnected/Connecting/Connected/Error)
- Port scanning (pyserial / QSerialPort), connect/disconnect with UI mapping

Step 6 - BabyLIN Backend:
- BabyLinBackend wrapping Lipowsky BabyLIN_library.py DLL
- Mock mode for macOS/CI, device scan, SDF loading, signal access
- Frame callbacks, raw command access

Step 7 - Master Scheduler:
- QTimer-based schedule execution with start/stop/pause
- Frame sent callback with visual highlighting
- Mock Rx simulation, manual send, global rate override

Tests: Python 171 | C++ 124 (Steps 1-5 parity, Steps 6-7 Python-first)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
258 lines
7.6 KiB
Python
258 lines
7.6 KiB
Python
"""
|
||
test_scheduler.py — Tests for Step 7: Master scheduler.

Tests schedule execution, start/stop/pause, frame callbacks,
mock Rx simulation, and GUI integration.
|
||
"""
|
||
|
||
import sys
|
||
import pytest
|
||
from pathlib import Path
|
||
|
||
# Prepend "<parent of this file's directory>/src" to the import search path.
# Presumably this is where scheduler, ldf_handler and main_window (imported
# below / inside fixtures) live — TODO confirm against the repository layout.
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||
|
||
from PyQt6.QtWidgets import QApplication
|
||
from PyQt6.QtCore import Qt, QTimer
|
||
from PyQt6.QtTest import QTest
|
||
|
||
from scheduler import Scheduler
|
||
from ldf_handler import parse_ldf, ScheduleEntryInfo
|
||
|
||
# Path (as a plain string) of the LDF file every fixture in this module loads:
# "<two levels above this file's directory>/resources/sample.ldf".
# NOTE(review): this climbs one directory level higher than the "src" path
# added to sys.path above — confirm that is the intended repository layout.
SAMPLE_LDF = str(Path(__file__).parent.parent.parent / "resources" / "sample.ldf")
|
||
|
||
|
||
@pytest.fixture(scope="session")
def app():
    """Provide one QApplication for the whole test session.

    An already-existing instance is reused; a new one is constructed from
    ``sys.argv`` only when ``QApplication.instance()`` yields nothing.
    """
    existing = QApplication.instance()
    qt_app = existing if existing else QApplication(sys.argv)
    yield qt_app
|
||
|
||
|
||
@pytest.fixture
def ldf_data():
    """Parse ``SAMPLE_LDF`` and hand the result to the requesting test.

    Function-scoped, so the file is parsed again for every test that uses it.
    """
    parsed = parse_ldf(SAMPLE_LDF)
    return parsed
|
||
|
||
|
||
@pytest.fixture
def scheduler(app, ldf_data):
    """Yield a Scheduler loaded with the sample LDF's "NormalSchedule" table.

    ``app`` is requested but not used directly; presumably it only ensures
    that a QApplication exists before the Scheduler is created.

    Teardown stops the scheduler if it is still running, so a test whose
    assertion fails before reaching its own ``stop()`` call cannot leave the
    schedule running while later tests execute.
    """
    # Give next() a default: a bare StopIteration escaping from inside a
    # generator fixture would surface as an opaque RuntimeError (PEP 479).
    normal = next(
        (st for st in ldf_data.schedule_tables if st.name == "NormalSchedule"),
        None,
    )
    if normal is None:
        pytest.fail(f'Schedule table "NormalSchedule" not found in {SAMPLE_LDF}')

    s = Scheduler()
    s.set_schedule(normal.entries, ldf_data)
    yield s

    # Runs whether the test passed or failed.
    if s.is_running:
        s.stop()
|
||
|
||
|
||
@pytest.fixture
def window(app):
    """Yield a MainWindow that has already loaded ``SAMPLE_LDF``.

    ``app`` is requested but not used directly; presumably it only ensures
    that a QApplication exists before any widget is constructed.

    Teardown stops the scheduler if the test left it running (for example
    because an assertion failed before the test's own
    ``_on_stop_scheduler()`` call), so it cannot keep running during later
    tests.
    """
    # Import kept inside the fixture so that only tests requesting ``window``
    # import the GUI module.
    from main_window import MainWindow

    w = MainWindow()
    w._load_ldf_file(SAMPLE_LDF)
    yield w

    # The GUI tests in this module assert that btn_stop is enabled after a
    # start and disabled after a stop, so its state is used here as the
    # "scheduler still running" indicator.
    if w.btn_stop.isEnabled():
        w._on_stop_scheduler()
|
||
|
||
|
||
class TestSchedulerInit:
    """State of a scheduler that has a schedule set but was never started."""

    def test_not_running_initially(self, scheduler):
        """``is_running`` is falsy before any ``start()`` call."""
        running = scheduler.is_running
        assert not running

    def test_not_paused_initially(self, scheduler):
        """``is_paused`` is falsy before any ``pause()`` call."""
        paused = scheduler.is_paused
        assert not paused
|
||
|
||
|
||
class TestSchedulerControl:
    """Exercise the start / stop / pause / resume transitions."""

    def test_start(self, scheduler):
        """After start(): running and not paused."""
        scheduler.start()
        assert scheduler.is_running and not scheduler.is_paused
        scheduler.stop()

    def test_stop(self, scheduler):
        """After start() followed by stop(): no longer running."""
        scheduler.start()
        scheduler.stop()
        still_running = scheduler.is_running
        assert not still_running

    def test_pause(self, scheduler):
        """After pause(): still reported as running, and also paused."""
        scheduler.start()
        scheduler.pause()
        assert scheduler.is_running and scheduler.is_paused
        scheduler.stop()

    def test_resume(self, scheduler):
        """After pause() then resume(): running and not paused."""
        scheduler.start()
        scheduler.pause()
        scheduler.resume()
        assert scheduler.is_running and not scheduler.is_paused
        scheduler.stop()

    def test_stop_resets_to_beginning(self, scheduler):
        """stop() leaves the private entry index back at 0."""
        scheduler.start()
        # Let the schedule advance for a moment before stopping it.
        QTest.qWait(50)
        scheduler.stop()
        index_after_stop = scheduler._current_index
        assert index_after_stop == 0
|
||
|
||
|
||
class TestFrameCallback:
    """The frame-sent callback must be invoked while the schedule runs."""

    @staticmethod
    def _run_and_record(scheduler, wait_ms):
        """Run the schedule for ``wait_ms`` and return the reported frames.

        Each element is the ``(name, fid, is_tx)`` triple passed to the
        frame-sent callback, in the order the callback was invoked.
        """
        recorded = []

        def on_frame(name, fid, is_tx):
            recorded.append((name, fid, is_tx))

        scheduler.set_frame_sent_callback(on_frame)
        scheduler.start()
        QTest.qWait(wait_ms)
        scheduler.stop()
        return recorded

    def test_frame_sent_callback_called(self, scheduler):
        """At least one frame is reported within the wait window."""
        # Long enough for at least one full cycle (4 entries × ~10ms each).
        sent_frames = self._run_and_record(scheduler, 100)

        assert sent_frames

    def test_tx_frames_reported(self, scheduler):
        """Some frame flagged is_tx=True carries a *_Command name."""
        sent_frames = self._run_and_record(scheduler, 100)

        wanted = ("Motor_Command", "Door_Command")
        assert any(
            tag in name
            for name, _fid, is_tx in sent_frames
            if is_tx
            for tag in wanted
        )

    def test_rx_frames_reported(self, scheduler):
        """Some frame flagged is_tx=False carries a *_Status name."""
        sent_frames = self._run_and_record(scheduler, 100)

        wanted = ("Motor_Status", "Door_Status")
        assert any(
            tag in name
            for name, _fid, is_tx in sent_frames
            if not is_tx
            for tag in wanted
        )
|
||
|
||
|
||
class TestMockRx:
    """Test mock Rx data generation.

    Every test asserts that data was actually captured, so none of them can
    pass vacuously when the scheduler delivers nothing.
    """

    @staticmethod
    def _collect_rx(scheduler, wait_ms):
        """Run the schedule for ``wait_ms`` and return ``[(fid, data), ...]``.

        ``data`` is copied into a list at callback time so that later
        comparisons see the bytes as they were when delivered, even if the
        scheduler were to reuse its buffer.
        """
        rx_frames = []
        scheduler.set_rx_data_callback(
            lambda fid, data: rx_frames.append((fid, list(data)))
        )
        scheduler.start()
        QTest.qWait(wait_ms)
        scheduler.stop()
        return rx_frames

    def test_rx_callback_receives_data(self, scheduler):
        """The Rx callback fires at least once while the schedule runs."""
        rx_frames = self._collect_rx(scheduler, 100)

        assert len(rx_frames) > 0

    def test_rx_data_has_correct_length(self, scheduler, ldf_data):
        """Every delivered Rx payload is 2 bytes long.

        ``ldf_data`` is requested but not read; the expected length is the
        hard-coded value below.
        """
        rx_frames = self._collect_rx(scheduler, 100)

        # Without this the loop below would pass on an empty capture.
        assert rx_frames, "No Rx frames were received within 100 ms"

        # All Rx frames in sample.ldf have length 2
        for fid, data in rx_frames:
            assert len(data) == 2, f"Rx frame {fid!r} has length {len(data)}"

    def test_rx_data_changes_over_time(self, scheduler):
        """Two consecutive deliveries of the same frame ID carry different data."""
        rx_frames = self._collect_rx(scheduler, 200)

        assert len(rx_frames) >= 2, (
            f"Expected at least 2 Rx frames within 200 ms, got {len(rx_frames)}"
        )

        # Compare the first delivery with the next one that has the same ID.
        first_fid, first_data = rx_frames[0]
        repeat = next(
            (data for fid, data in rx_frames[1:] if fid == first_fid),
            None,
        )
        # Previously a missing repeat made the test pass without checking anything.
        assert repeat is not None, (
            f"Rx frame {first_fid!r} was not received a second time within 200 ms"
        )
        assert repeat != first_data, "Mock Rx data should change over time"
|
||
|
||
|
||
class TestGlobalRate:
    """Test global rate setting."""

    def test_global_rate_affects_timing(self, scheduler):
        """Faster rate should produce more frames in the same time window.

        Runs the schedule for 100 ms with global rate 5 (the "fast" setting)
        and again with global rate 50 (the "slow" setting), then compares how
        many frames the frame-sent callback reported in each run.
        """

        def count_frames(rate, window_ms=100):
            """Run one window at ``rate`` and return the number of reported frames."""
            names = []
            scheduler.set_global_rate(rate)
            scheduler.set_frame_sent_callback(
                lambda name, fid, is_tx: names.append(name)
            )
            scheduler.start()
            QTest.qWait(window_ms)
            scheduler.stop()
            return len(names)

        fast_count = count_frames(5)
        slow_count = count_frames(50)

        # Without this, 0 >= 0 would pass even if the scheduler never fired.
        assert fast_count > 0, "No frames were sent during the fast-rate run"

        # Fast should have more frames (timing isn't exact, so allow margin)
        assert fast_count >= slow_count
|
||
|
||
|
||
class TestGuiIntegration:
    """Drive the scheduler through MainWindow's handlers and check the widgets."""

    def test_start_button_enables_stop(self, window):
        """Starting disables Start and enables Stop and Pause."""
        # The first schedule table must be selected before starting.
        window.combo_schedule.setCurrentIndex(0)
        window._on_start_scheduler()
        start_enabled = window.btn_start.isEnabled()
        assert not start_enabled
        stop_enabled = window.btn_stop.isEnabled()
        assert stop_enabled
        pause_enabled = window.btn_pause.isEnabled()
        assert pause_enabled
        window._on_stop_scheduler()

    def test_stop_button_enables_start(self, window):
        """Stopping re-enables Start and disables Stop and Pause."""
        window.combo_schedule.setCurrentIndex(0)
        window._on_start_scheduler()
        window._on_stop_scheduler()
        start_enabled = window.btn_start.isEnabled()
        assert start_enabled
        stop_enabled = window.btn_stop.isEnabled()
        assert not stop_enabled
        pause_enabled = window.btn_pause.isEnabled()
        assert not pause_enabled

    def test_pause_toggles_text(self, window):
        """The pause button's label contains "Resume" while paused, "Pause" after resuming."""
        window.combo_schedule.setCurrentIndex(0)
        window._on_start_scheduler()

        window._on_pause_scheduler()
        label_while_paused = window.btn_pause.text()
        assert "Resume" in label_while_paused

        # A second invocation of the same handler resumes.
        window._on_pause_scheduler()
        label_after_resume = window.btn_pause.text()
        assert "Pause" in label_after_resume

        window._on_stop_scheduler()

    def test_rx_frames_appear_during_run(self, window):
        """Some top-level Rx row shows something other than the "—" placeholder in column 0."""
        window.combo_schedule.setCurrentIndex(0)
        window._on_start_scheduler()
        QTest.qWait(150)
        window._on_stop_scheduler()

        # At least one Rx row must have received a timestamp.
        rx_tree = window.rx_table
        has_timestamp = any(
            rx_tree.topLevelItem(row).text(0) != "—"
            for row in range(rx_tree.topLevelItemCount())
        )
        assert has_timestamp, "Rx frames should receive mock data during scheduler run"
|