"""Visual state, flash subscriptions, and styling roles for manager list rows."""
from __future__ import annotations
import logging
from typing import Any, Optional
from PyQt6.QtCore import QRect
from PyQt6.QtWidgets import QListWidgetItem, QWidget
from objectstate import ObjectStateRegistry
from pyqt_reactive.animation import FlashElement, WindowFlashOverlay
from pyqt_reactive.widgets.shared.list_item_delegate import (
DIRTY_FIELDS_ROLE,
FLASH_KEY_ROLE,
LAYOUT_ROLE,
SIG_DIFF_FIELDS_ROLE,
StyledText,
StyledTextLayout,
)
from pyqt_reactive.widgets.shared.scope_color_utils import get_scope_color_scheme
logger = logging.getLogger(__name__)
class ManagerListVisualState:
    """Owns row visual roles, ObjectState subscriptions, and flash geometry.

    One instance serves one manager widget. It keeps three kinds of
    bookkeeping, all keyed by scope id:

    * the list item currently showing each scope (used to compute flash
      rectangles and model indexes on demand),
    * the ``ObjectState`` callbacks registered for flash and dirty updates,
    * the scope ids whose ``FlashElement`` was handed to the window overlay.

    ``cleanup()`` undoes everything recorded here.
    """

    def __init__(self, manager: Any, scope_border_role: int, item_access: Any) -> None:
        """Store collaborators and start with empty bookkeeping.

        Args:
            manager: Owning widget. This class reads ``item_list`` and calls
                ``window()``, ``queue_flash()`` and ``queue_visual_update()``
                on it.
            scope_border_role: Item data role under which the scope colour
                scheme is stored by ``apply_scope_color``.
            item_access: Helper providing ``scope_for_item(item)`` and
                ``list_item_scope(item, index)``.
        """
        self._manager = manager
        self._scope_border_role = scope_border_role
        self._item_access = item_access
        # scope_id -> (state, callback) pairs, kept so cleanup() can detach them.
        self._flash_subscriptions: dict[str, tuple[Any, Any]] = {}
        self._dirty_subscriptions: dict[str, tuple[Any, Any]] = {}
        # scope_id -> list item currently displaying that scope.
        self._scope_to_list_item: dict[str, QListWidgetItem] = {}
        # Scope ids whose FlashElement was registered with the overlay. Tracked
        # separately because registration happens even when the scope has no
        # ObjectState (and therefore no entry in _flash_subscriptions).
        self._registered_flash_keys: set[str] = set()

    @property
    def has_navigation_items(self) -> bool:
        """Whether at least one scope is currently mapped to a list item."""
        return bool(self._scope_to_list_item)

    def subscribed_scope_ids(self) -> set[str]:
        """Return a new set with the scope ids that have a flash subscription."""
        return set(self._flash_subscriptions)

    def clear_scope_to_list_item(self) -> None:
        """Forget every scope -> list item mapping (subscriptions are kept)."""
        self._scope_to_list_item.clear()

    def subscribe_flash(
        self,
        item: Any,
        list_item: QListWidgetItem,
        scope_id: Optional[str] = None,
    ) -> None:
        """Map ``list_item`` to the item's scope and wire up flash/dirty updates.

        The scope -> list item mapping is refreshed on every call. The flash
        element registration and the ObjectState subscriptions are only set
        up while the scope has no flash subscription yet.

        Args:
            item: Backing object of the row.
            list_item: Widget item that displays ``item``.
            scope_id: Scope to use; resolved through
                ``item_access.scope_for_item(item)`` when ``None``.
        """
        if scope_id is None:
            scope_id = self._item_access.scope_for_item(item)
        logger.debug(
            "FLASH_DEBUG subscribe_flash: item=%s, scope_id=%s",
            type(item).__name__,
            scope_id,
        )
        if not scope_id:
            logger.debug("FLASH_DEBUG: No scope_id for item %s, returning", item)
            return

        # Refresh the mapping before the "already subscribed" check: the
        # geometry lookups read this dict each time they are called.
        self._scope_to_list_item[scope_id] = list_item
        if scope_id in self._flash_subscriptions:
            logger.debug("FLASH_DEBUG: Already subscribed to %s, skipping", scope_id)
            return

        self._register_flash_element(scope_id)

        state = ObjectStateRegistry.get_by_scope(scope_id)
        logger.debug("FLASH_DEBUG: ObjectStateRegistry.get_by_scope(%s) = %s", scope_id, state)
        if not state:
            logger.debug("FLASH_DEBUG: No ObjectState for scope %s, returning", scope_id)
            return

        self._subscribe_state(scope_id, state)

    def _register_flash_element(self, scope_id: str) -> None:
        """Create the FlashElement for ``scope_id`` and register it with the overlay."""
        element = FlashElement(
            key=scope_id,
            # Both lookups go through the scope map at call time, so they
            # follow whichever list item is mapped to the scope at that moment.
            get_rect_in_window=lambda window: self._list_item_rect(scope_id, window),
            needs_scroll_clipping=False,
            source_id=f"list_item:{id(self._manager)}:{scope_id}",
            skip_overlay_paint=True,
            delegate_widget=self._manager.item_list,
            get_model_index=lambda: self._model_index(scope_id),
        )
        overlay = WindowFlashOverlay.get_for_window(self._manager)
        logger.debug(
            "FLASH_DEBUG: get_for_window returned overlay=%s, window=%s",
            overlay,
            self._manager.window(),
        )
        if not overlay:
            logger.debug(
                "FLASH_DEBUG: No overlay for window, cannot register list item %s",
                scope_id,
            )
            return

        overlay.register_element(element)
        # Remember the key so cleanup() unregisters it even if the scope never
        # gets an ObjectState subscription.
        self._registered_flash_keys.add(scope_id)
        logger.debug(
            "FLASH_DEBUG: Registered element for %s, overlay has %s keys",
            scope_id,
            # Private attribute, read for diagnostics only; getattr keeps a
            # debug line from raising if the attribute is absent.
            len(getattr(overlay, "_elements", ())),
        )

    def _subscribe_state(self, scope_id: str, state: Any) -> None:
        """Attach the flash callback and, if missing, the dirty callback to ``state``."""

        def on_change(changed_paths):
            logger.debug(
                "FLASH_DEBUG on_change CALLBACK FIRED: scope=%s, paths=%s",
                scope_id,
                changed_paths,
            )
            self._manager.queue_flash(scope_id)
            self._manager.queue_visual_update()

        state.on_resolved_changed(on_change)
        self._flash_subscriptions[scope_id] = (state, on_change)
        logger.debug(
            "FLASH_DEBUG: Subscribed to %s, total subscriptions=%s",
            scope_id,
            len(self._flash_subscriptions),
        )

        if scope_id in self._dirty_subscriptions:
            return

        def on_state_changed():
            logger.debug("DIRTY_DEBUG on_state_changed: scope=%s", scope_id)
            self._manager.queue_visual_update()

        state.on_state_changed(on_state_changed)
        self._dirty_subscriptions[scope_id] = (state, on_state_changed)
        logger.debug("DIRTY_DEBUG: Subscribed to dirty changes for %s", scope_id)

    def cleanup(self) -> None:
        """Detach every callback, unregister flash elements, reset bookkeeping.

        Each detach/unregister step is best-effort: a failure is logged at
        debug level and the remaining steps still run. The bookkeeping dicts
        are cleared even if the overlay lookup itself raises.
        """
        logger.debug(
            "FLASH_DEBUG cleanup: manager=%s, clearing %s flash + %s dirty subscriptions",
            type(self._manager).__name__,
            len(self._flash_subscriptions),
            len(self._dirty_subscriptions),
        )
        try:
            # Iterate over snapshots so the dicts are not walked live while
            # external callbacks run.
            for scope_id, (state, on_change_callback) in list(self._flash_subscriptions.items()):
                logger.debug("FLASH_DEBUG: Unsubscribing from %s", scope_id)
                try:
                    state.off_resolved_changed(on_change_callback)
                except Exception as error:
                    logger.debug("FLASH_DEBUG: Error unsubscribing from %s: %s", scope_id, error)

            for scope_id, (state, on_dirty_callback) in list(self._dirty_subscriptions.items()):
                try:
                    state.off_state_changed(on_dirty_callback)
                except Exception as error:
                    logger.debug(
                        "DIRTY_DEBUG: Error unsubscribing dirty from %s: %s", scope_id, error
                    )

            # Subscribed scopes first (insertion order), then scopes whose
            # element was registered although no ObjectState existed.
            pending = list(self._flash_subscriptions)
            pending.extend(
                key for key in self._registered_flash_keys
                if key not in self._flash_subscriptions
            )
            if pending:
                # The overlay does not depend on the scope: look it up once.
                overlay = WindowFlashOverlay.get_for_window(self._manager)
                if overlay:
                    for scope_id in pending:
                        logger.debug("FLASH_DEBUG: Unregistering FlashElement for %s", scope_id)
                        try:
                            overlay.unregister_element(scope_id)
                        except Exception as error:
                            logger.debug(
                                "FLASH_DEBUG: Error unregistering FlashElement for %s: %s",
                                scope_id,
                                error,
                            )
        finally:
            self._flash_subscriptions.clear()
            self._dirty_subscriptions.clear()
            self._scope_to_list_item.clear()
            self._registered_flash_keys.clear()
            logger.debug("FLASH_DEBUG: Subscriptions cleared")

    def _state_field_set(self, item: Any, attribute: str) -> set:
        """Return ``getattr(state, attribute)`` for the item's ObjectState.

        Best-effort: an empty set is returned when the scope has no state or
        when any step raises; the failure is logged at debug level.
        """
        try:
            scope_id = self._item_access.scope_for_item(item)
            state = ObjectStateRegistry.get_by_scope(scope_id)
            return getattr(state, attribute) if state else set()
        except Exception:
            logger.debug(
                "VISUAL_DEBUG: could not read %s for item of type %s",
                attribute,
                type(item).__name__,
                exc_info=True,
            )
            return set()

    def dirty_fields(self, item: Any) -> set:
        """Return ``state.dirty_fields`` for the item's scope, or an empty set."""
        return self._state_field_set(item, "dirty_fields")

    def signature_diff_fields(self, item: Any) -> set:
        """Return ``state.signature_diff_fields`` for the item's scope, or an empty set."""
        return self._state_field_set(item, "signature_diff_fields")

    def set_item_styling_roles(
        self,
        list_item: QListWidgetItem,
        display_text: Any,
        item_obj: Any,
    ) -> None:
        """Store layout, dirty-field and signature-diff data on ``list_item``.

        ``display_text`` must be a ``StyledText`` (its ``layout`` is used) or
        a ``StyledTextLayout``. Anything else is logged as an error and the
        item is left untouched.
        """
        if isinstance(display_text, StyledText):
            layout = display_text.layout
        elif isinstance(display_text, StyledTextLayout):
            layout = display_text
        else:
            layout = None

        if layout is None:
            logger.error(
                "Cannot set LAYOUT_ROLE: display_text=%s, is_StyledText=%s",
                type(display_text),
                isinstance(display_text, StyledText),
            )
            return

        list_item.setData(LAYOUT_ROLE, layout)
        list_item.setData(DIRTY_FIELDS_ROLE, self.dirty_fields(item_obj))
        list_item.setData(SIG_DIFF_FIELDS_ROLE, self.signature_diff_fields(item_obj))

    def apply_scope_color(self, list_item: QListWidgetItem, item: Any, index: int) -> None:
        """Apply scope-derived background, border scheme and flash key to a row.

        Does nothing when ``item_access.list_item_scope`` yields no scope info.
        """
        scope_info = self._list_item_scope(item, index)
        if not scope_info:
            return
        scope_id, item_type = scope_info
        scheme = get_scope_color_scheme(scope_id, step_index=index)
        bg_color = item_type.get_background_color(scheme)
        if bg_color:
            list_item.setBackground(bg_color)
        list_item.setData(self._scope_border_role, scheme)
        list_item.setData(FLASH_KEY_ROLE, scope_id)

    def _list_item_scope(self, item: Any, index: int) -> Optional[tuple[str, Any]]:
        """Delegate to ``item_access.list_item_scope``."""
        return self._item_access.list_item_scope(item, index)

    def _list_item_rect(self, scope_id: str, window: QWidget) -> Optional[QRect]:
        """Return the visible part of the scope's row in ``window`` coordinates.

        Returns ``None`` when the scope is not mapped, the mapped item is
        ``None``, the item has an empty visual rect, the list has no
        viewport, or the row lies entirely outside the viewport.
        """
        if scope_id not in self._scope_to_list_item:
            logger.debug(
                "FLASH_DEBUG list_item_rect: scope_id %s not in scope map (has %s keys)",
                scope_id,
                len(self._scope_to_list_item),
            )
            return None
        item = self._scope_to_list_item[scope_id]
        if item is None:
            logger.debug("FLASH_DEBUG list_item_rect: item is None for %s", scope_id)
            return None

        item_list = self._manager.item_list
        visual_rect = item_list.visualItemRect(item)
        if visual_rect.isEmpty():
            logger.debug("FLASH_DEBUG list_item_rect: visual_rect is empty for %s", scope_id)
            return None
        viewport = item_list.viewport()
        if viewport is None:
            logger.debug("FLASH_DEBUG list_item_rect: viewport is None for %s", scope_id)
            return None

        # Keep only the part of the row that is inside the viewport.
        clipped_rect = visual_rect.intersected(viewport.rect())
        if clipped_rect.isEmpty():
            logger.debug("FLASH_DEBUG list_item_rect: clipped_rect is empty for %s", scope_id)
            return None

        # viewport coordinates -> global -> coordinates of the given window.
        global_pos = viewport.mapToGlobal(clipped_rect.topLeft())
        local_pos = window.mapFromGlobal(global_pos)
        result = QRect(local_pos, clipped_rect.size())
        logger.debug("FLASH_DEBUG list_item_rect: SUCCESS for %s, rect=%s", scope_id, result)
        return result

    def _model_index(self, scope_id: str) -> Any:
        """Return ``item_list.indexFromItem`` for the scope's row, or ``None`` if unmapped."""
        item = self._scope_to_list_item.get(scope_id)
        if item is None:
            return None
        return self._manager.item_list.indexFromItem(item)