Source code for klotho.chronos.temporal_units.temporal

# ------------------------------------------------------------------------
# Klotho/klotho/chronos/temporal_units/ut.py
# ------------------------------------------------------------------------
"""
Temporal units.

A temporal unit binds a rhythm tree to a tempo and beat reference, producing
concrete onset times and durations in seconds. Temporal units can be
collected into sequences and blocks for polyphonic or multi-layered timing
structures.
"""
from dataclasses import dataclass
from fractions import Fraction
from typing import Any, Callable, Iterable, Iterator, Optional, Union
from ..rhythm_trees import Meas, RhythmTree
from ..rhythm_trees.algorithms import auto_subdiv
from klotho.chronos.utils import calc_onsets, beat_duration, seconds_to_hmsms

from enum import Enum
import pandas as pd
import copy

class ProlatioTypes(Enum):
    """
    Enum of prolatio (subdivision) types for a temporal unit.

    The four types describe how a time signature is subdivided:

    - **DURATION** -- a single sustained note spanning the entire measure.
    - **REST** -- a single rest spanning the entire measure.
    - **PULSE** -- evenly spaced pulses matching the numerator.
    - **SUBDIVISION** -- a custom subdivision tuple.

    Each type also carries a set of string aliases for convenient parsing.
    """
    DURATION    = 'Duration'
    REST        = 'Rest'
    PULSE       = 'Pulse'
    SUBDIVISION = 'Subdivision'
    DURTYPES    = {'d', 'duration', 'dur'}
    RESTYPES    = {'r', 'rest', 'silence'}
    PULSTYPES   = {'p', 'pulse', 'phase'}
    SUBTYPES    = {'s', 'subdivision', 'subdivisions'}


class TemporalMeta(type):
    """Metaclass for all temporal structures."""
    pass


class UTNodeHandle:
    __slots__ = ("_owner", "_node_id")

    def __init__(self, owner: Any, node_id: int):
        self._owner = owner
        self._node_id = node_id

    def __repr__(self) -> str:
        return f"{type(self).__name__}(id={self._node_id})"

    def __hash__(self) -> int:
        return hash((id(self._owner), self._node_id))

    def __eq__(self, other):
        if isinstance(other, UTNodeHandle):
            return self._owner is other._owner and self._node_id == other._node_id
        if isinstance(other, int):
            return self._node_id == other
        return NotImplemented

    @property
    def id(self) -> int:
        return self._node_id

    @property
    def node_id(self) -> int:
        return self._node_id

    def _rt_node(self):
        return self._owner._rt[self._node_id]

    @property
    def depth(self) -> int:
        return self._owner._rt.depth_of(self._node_id)

    @property
    def sibling_index(self) -> int:
        parent = self._owner._rt.parent(self._node_id)
        siblings = list(self._owner._rt.successors(parent)) if parent is not None else [self._node_id]
        return siblings.index(self._node_id)

    @property
    def sibling_total(self) -> int:
        parent = self._owner._rt.parent(self._node_id)
        siblings = list(self._owner._rt.successors(parent)) if parent is not None else [self._node_id]
        return len(siblings)

    @property
    def parent(self) -> Optional["UTNodeHandle"]:
        parent_id = self._owner._rt.parent(self._node_id)
        if parent_id is None:
            return None
        return self._owner._build_node_handle(parent_id)

    @property
    def proportion(self):
        return self._rt_node().get("proportion")

    @property
    def metric_onset(self):
        return self._rt_node().get("metric_onset")

    @property
    def metric_duration(self):
        return self._rt_node().get("metric_duration")

    @property
    def real_onset(self):
        self._owner._ensure_timing_cache()
        return self._owner._real_times[self._node_id]["real_onset"]

    @property
    def real_duration(self):
        self._owner._ensure_timing_cache()
        return self._owner._real_times[self._node_id]["real_duration"]

    @property
    def leaves(self) -> "UTNodeSelector":
        rt = self._owner._rt
        if self._node_id in rt.leaf_nodes:
            ids = (self._node_id,)
        else:
            ids = tuple(rt.subtree_leaves(self._node_id))
        return self._owner._node_selector_class(self._owner, ids)

    @property
    def children(self) -> "UTNodeSelector":
        return self._owner._node_selector_class(
            self._owner, tuple(self._owner._rt.successors(self._node_id))
        )

    @property
    def first_leaf(self):
        return self.leaves.first

    @property
    def last_leaf(self):
        return self.leaves.last

    @property
    def first_child(self):
        return self.children.first

    @property
    def last_child(self):
        return self.children.last

    def make_rest(self):
        return self._owner.make_rest(self._node_id)

    def subdivide(self, S):
        self._owner.subdivide(self._node_id, S)
        return self._owner

    def sparsify(self, probability):
        return self._owner.sparsify(probability, node=self._node_id)

    def __getitem__(self, key):
        if key in ("real_onset", "real_duration"):
            return getattr(self, key)
        return self._rt_node()[key]

    def get(self, key, default=None):
        if key in ("real_onset", "real_duration"):
            return getattr(self, key)
        return self._rt_node().get(key, default)

    def __contains__(self, key):
        if key in ("real_onset", "real_duration"):
            return True
        return key in self._rt_node()


@dataclass(frozen=True)
class NodeContext:
    ref: UTNodeHandle
    index: int
    total: int

    @property
    def id(self) -> int:
        return self.ref.id

    @property
    def parent(self) -> Optional[UTNodeHandle]:
        return self.ref.parent

    def __getattr__(self, key):
        return getattr(self.ref, key)


class UTNodeSelector:
    """An ordered, owner-bound collection of node IDs with fluent selection ops.

    Carries UT-level structural verbs (``make_rest``, ``subdivide``,
    ``sparsify``). Subclasses add domain-specific action verbs; in particular,
    :class:`~klotho.thetos.composition.compositional.UCNodeSelector` adds
    parameter/envelope/slur verbs.

    The selector preserves ownership identity: operations that return another
    selector always point at the same owner (UT/UC) so that subsequent verb
    calls dispatch to the owning object's mutators.

    Indexing, slicing, fancy-indexing, ``filter``, ``where``, and set-algebra
    operations all return a new selector of the same concrete subclass.

    Equality is strict: two selectors compare equal iff they share the same
    owner (``is``) and hold the same ids in the same order; a selector also
    compares equal to a ``tuple`` or ``list`` with matching ids (enabling
    tuple-based test assertions). Any other type returns ``NotImplemented`` -
    in particular, ``selector == int`` is always False via Python's fallback,
    surfacing mistakes loudly rather than silently.
    """

    __slots__ = ('_owner', '_ids')

    def __init__(self, owner: Any, ids: Iterable[int]):
        object.__setattr__(self, '_owner', owner)
        object.__setattr__(self, '_ids', tuple(ids))

    # --- Sequence protocol ---
    def __len__(self) -> int:
        return len(self._ids)

    def __iter__(self) -> Iterator[UTNodeHandle]:
        owner = self._owner
        return (owner._build_node_handle(n) for n in self._ids)

    def __contains__(self, node) -> bool:
        if isinstance(node, UTNodeHandle):
            if node._owner is not self._owner:
                return False
            node = node.id
        return node in self._ids

    def __bool__(self) -> bool:
        return bool(self._ids)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({list(self._ids)})"

    def __eq__(self, other):
        if isinstance(other, UTNodeSelector):
            return self._owner is other._owner and self._ids == other._ids
        if isinstance(other, (tuple, list)):
            return self._ids == tuple(other)
        return NotImplemented

    def __hash__(self) -> int:
        return hash((id(self._owner), self._ids))

    # --- Raw access ---
    @property
    def ids(self) -> tuple:
        """Underlying tuple of node IDs."""
        return self._ids

    @property
    def first(self) -> UTNodeHandle:
        """The first node handle in the selection."""
        return self._owner._build_node_handle(self._ids[0])

    @property
    def last(self) -> UTNodeHandle:
        """The last node handle in the selection."""
        return self._owner._build_node_handle(self._ids[-1])

    @property
    def first_id(self) -> int:
        return self._ids[0]

    @property
    def last_id(self) -> int:
        return self._ids[-1]

    @property
    def owner(self):
        """The UT/UC this selector is bound to."""
        return self._owner

    # --- Indexing (always returns same subclass) ---
    def __getitem__(self, key):
        if isinstance(key, int):
            return self._owner._build_node_handle(self._ids[key])
        if isinstance(key, slice):
            return type(self)(self._owner, self._ids[key])
        if isinstance(key, (list, tuple)):
            return type(self)(self._owner, tuple(self._ids[i] for i in key))
        raise TypeError(
            f"Invalid selector index: {type(key).__name__}; "
            f"expected int, slice, or list/tuple of ints"
        )

    # --- Sub-selection on the underlying tree ---
    def _require_singleton(self, name: str) -> int:
        if len(self._ids) != 1:
            raise ValueError(
                f"{name} requires a single-node selector; got {len(self._ids)} nodes. "
                f"Iterate (for branch in sel:) or use {type(self._owner).__name__.lower()}.select(...)"
            )
        return self._ids[0]

    def selectors(self):
        cls = type(self)
        owner = self._owner
        return tuple(cls(owner, (n,)) for n in self._ids)

    def singletons(self):
        return self.selectors()

    @property
    def leaves(self) -> 'UTNodeSelector':
        """Leaves of the subtree rooted at this single selected node."""
        n = self._require_singleton("leaves")
        rt = self._owner._rt
        if n in rt.leaf_nodes:
            ids = (n,)
        else:
            ids = tuple(rt.subtree_leaves(n))
        return type(self)(self._owner, ids)

    @property
    def children(self) -> 'UTNodeSelector':
        """Direct children of this single selected node."""
        n = self._require_singleton("children")
        return type(self)(self._owner, tuple(self._owner._rt.successors(n)))

    @property
    def first_leaf(self) -> 'UTNodeSelector':
        return self.leaves[0]

    @property
    def last_leaf(self) -> 'UTNodeSelector':
        return self.leaves[-1]

    @property
    def first_child(self) -> 'UTNodeSelector':
        return self.children[0]

    @property
    def last_child(self) -> 'UTNodeSelector':
        return self.children[-1]

    # --- Composition (all preserve subclass) ---
    def filter(self, predicate: Callable[['NodeContext'], bool]) -> 'UTNodeSelector':
        """Keep nodes for which ``predicate(NodeContext)`` returns truthy."""
        total = len(self._ids)
        return type(self)(self._owner, tuple(
            n for i, n in enumerate(self._ids)
            if predicate(self._owner._build_node_context(n, i, total))
        ))

    def where(self, mask: Iterable[bool]) -> 'UTNodeSelector':
        """Keep nodes where the corresponding mask entry is truthy."""
        mask_list = list(mask)
        if len(mask_list) != len(self._ids):
            raise ValueError(
                f"where() mask length mismatch: got {len(mask_list)}, "
                f"expected {len(self._ids)}"
            )
        return type(self)(
            self._owner,
            tuple(n for n, m in zip(self._ids, mask_list) if m),
        )

    def __or__(self, other):
        if not isinstance(other, UTNodeSelector) or other._owner is not self._owner:
            return NotImplemented
        seen = set(self._ids)
        tail = tuple(n for n in other._ids if n not in seen)
        return type(self)(self._owner, self._ids + tail)

    def __and__(self, other):
        if not isinstance(other, UTNodeSelector) or other._owner is not self._owner:
            return NotImplemented
        other_set = set(other._ids)
        return type(self)(self._owner, tuple(n for n in self._ids if n in other_set))

    def __sub__(self, other):
        if not isinstance(other, UTNodeSelector) or other._owner is not self._owner:
            return NotImplemented
        other_set = set(other._ids)
        return type(self)(self._owner, tuple(n for n in self._ids if n not in other_set))

    # --- UT-level mutators ---
    def make_rest(self):
        """Rest every node in the selection (and its subtree)."""
        return self._owner.make_rest(self)

    def subdivide(self, S):
        """Subdivide every node in the selection with structure ``S``."""
        for n in self._ids:
            self._owner.subdivide(n, S)
        return self._owner

    def sparsify(self, probability):
        """Sparsify leaves under the selection's nodes with ``probability``."""
        return self._owner.sparsify(probability, node=self)


class UTNodeView:
    """View of UT nodes; subscripting returns a Chronon for that node."""

    def __init__(self, ut):
        self._ut = ut

    def __getitem__(self, node):
        self._ut._ensure_timing_cache()
        node_id = self._ut._coerce_singleton_node_target(node, "nodes")
        return self._ut._make_node_proxy(node_id)

    def __iter__(self):
        return iter(self._ut._rt.nodes)

    def __contains__(self, node):
        try:
            node_id = self._ut._coerce_singleton_node_target(node, "nodes")
        except (TypeError, ValueError):
            return False
        return node_id in self._ut._rt

    def __len__(self):
        return len(self._ut._rt)

    def __call__(self, data=False):
        self._ut._ensure_timing_cache()
        if data:
            for node in self._ut._rt.nodes:
                yield (node, self._ut._make_node_proxy(node))
        else:
            for node in self._ut._rt.nodes:
                yield node


class Chronon(metaclass=TemporalMeta):
    """
    A node in its temporal context within a :class:`TemporalUnit`.

    Exposes real-time onset/duration and metric data for any node (leaf or branch).
    Supports dict-like access (e.g. chronon['real_onset']) for compatibility.

    Parameters
    ----------
    node_id : int
        The node identifier within the rhythm tree.
    ut : TemporalUnit
        The parent temporal unit that owns this node.
    """
    __slots__ = ('_node_id', '_ut')

    def __init__(self, node_id: int, ut: 'TemporalUnit'):
        self._node_id = node_id
        self._ut = ut

    def _rt_node(self):
        return self._ut._rt[self._node_id]

    def _real_data(self):
        self._ut._ensure_timing_cache()
        return self._ut._real_times.get(self._node_id, {})

    def __getattr__(self, key):
        if key in ('real_onset', 'real_duration'):
            return self._real_data()[key]
        try:
            return self._rt_node()[key]
        except KeyError:
            raise AttributeError(f"'{type(self).__name__}' has no attribute '{key}'")

    def __getitem__(self, key):
        if key in ('real_onset', 'real_duration'):
            return self._real_data()[key]
        return self._rt_node()[key]

    def get(self, key, default=None):
        if key in ('real_onset', 'real_duration'):
            return self._real_data().get(key, default)
        return self._rt_node().get(key, default)

    def __contains__(self, key):
        if key in ('real_onset', 'real_duration'):
            return key in self._real_data()
        return key in self._rt_node()

    @property
    def start(self):
        """The absolute start time in seconds."""
        return abs(self.real_onset)

    @property
    def duration(self):
        """The absolute duration in seconds."""
        return abs(self.real_duration)

    @property
    def end(self):
        """The absolute end time in seconds."""
        return self.start + abs(self.duration)

    @property
    def proportion(self):
        """The integer proportion value from the rhythm tree."""
        return self._rt_node()['proportion']

    @property
    def metric_duration(self):
        """The fractional metric duration relative to the measure."""
        return self._rt_node()['metric_duration']

    @property
    def metric_onset(self):
        """The fractional metric onset relative to the measure."""
        return self._rt_node()['metric_onset']

    @property
    def node_id(self):
        """The node identifier within the parent rhythm tree."""
        return self._node_id

    @property
    def is_rest(self):
        """Whether this event is a rest (negative proportion)."""
        return self._rt_node()['proportion'] < 0

    def __str__(self):
        return pd.DataFrame({
            'node_id': [self.node_id],
            'start': [self.start],
            'duration': [self.duration], 
            'end': [self.end],
            'is_rest': [self.is_rest],
            'proportion': [self.proportion],
            'metric_onset': [self.metric_onset],
            'metric_duration': [self.metric_duration],
        }, index=['']).__str__()
    
    def __repr__(self):
        return self.__str__()


[docs] class TemporalUnit(metaclass=TemporalMeta): """ A rhythmic structure bound to a tempo, producing real-time events. A ``TemporalUnit`` combines a :class:`RhythmTree` (defined by *tempus* and *prolatio*) with a tempo specification (*beat*, *bpm*) to produce concrete onset times and durations in seconds. Outside a :class:`~klotho.thetos.composition.score.Score`, a temporal unit always starts at time 0 and its duration is fixed after construction. Placement within a timeline and duration adjustment are handled by :class:`~klotho.thetos.composition.score.ScoreItem` after the unit has been added to a Score. Parameters ---------- span : int, float, or Fraction, optional Number of measures. Default is 1. tempus : Meas, Fraction, int, float, or str, optional The time signature. Default is ``'4/4'``. prolatio : tuple or str, optional The subdivision specification. A tuple gives explicit proportions; a string selects a preset (``'d'`` = duration, ``'r'`` = rest, ``'p'`` = pulse). Default is ``'d'``. beat : Fraction, int, float, str, or None, optional The beat reference for tempo calculation. When None, the denominator of the time signature is used. Default is None. bpm : int, float, or None, optional Beats per minute. Default is None (falls back to 60). Examples -------- >>> ut = TemporalUnit(tempus='4/4', prolatio='p', bpm=120) >>> len(ut) 4 """
[docs] def __init__(self, span : Union[int,float,Fraction] = 1, tempus : Union[Meas,Fraction,int,float,str] = '4/4', prolatio : Union[tuple,str] = 'd', beat : Union[None,Fraction,int,float,str] = None, bpm : Union[None,int,float] = None, ): self._type = None self._rt = self._set_rt(span, abs(Meas(tempus)), prolatio) self._real_times = {} self._beat = Fraction(beat) if beat else Fraction(1, self._rt.meas._denominator) self._bpm = bpm if bpm else 60 self._offset = 0.0 self._timing_dirty = True
[docs] @classmethod def from_rt(cls, rt:RhythmTree, beat = None, bpm = None): """ Construct a ``TemporalUnit`` from an existing :class:`RhythmTree`. Parameters ---------- rt : RhythmTree The rhythm tree to wrap. beat : Fraction, int, float, str, or None, optional Beat reference. Default is None. bpm : int, float, or None, optional Beats per minute. Default is None. Returns ------- TemporalUnit """ return cls(span = rt.span, tempus = rt.meas, prolatio = rt.subdivisions, beat = beat, bpm = bpm)
_node_selector_class = UTNodeSelector _node_handle_class = UTNodeHandle @property def nodes(self): return UTNodeView(self) def _coerce_node_targets(self, node) -> list[int]: def _append(item, out): if isinstance(item, int): out.append(item) elif isinstance(item, UTNodeHandle): if item._owner is not self: raise ValueError("node handle belongs to a different owner") out.append(item.id) elif isinstance(item, UTNodeSelector): if item.owner is not self: raise ValueError("selector belongs to a different owner") out.extend(item.ids) else: raise TypeError("node must be int, node handle, selector, or iterable thereof") if isinstance(node, (int, UTNodeHandle, UTNodeSelector)): ids = [] _append(node, ids) else: ids = [] for item in node: _append(item, ids) if not ids: raise ValueError("Selection cannot be empty") return ids def _coerce_singleton_node_target(self, node, name: str) -> int: ids = self._coerce_node_targets(node) if len(ids) != 1: raise ValueError( f"{name} requires a single-node selector; got {len(ids)} nodes. " "Iterate (for branch in sel:) and call subtree helpers on each singleton." ) return ids[0] def _build_node_handle(self, node_id: int) -> UTNodeHandle: self._ensure_timing_cache() if node_id not in self._rt.nodes: raise ValueError(f"Node {node_id} not found in tree") return self._node_handle_class(self, node_id) def _build_node_ref(self, node_id: int) -> UTNodeHandle: return self._build_node_handle(node_id) def _build_node_context(self, node_id: int, index: int, total: int) -> NodeContext: return NodeContext(ref=self._build_node_handle(node_id), index=index, total=total) # ------------------------------------------------------------------ # Node-returning traversal (returns selector bound to this UT/UC) # ------------------------------------------------------------------ @property def leaves(self): """All leaves in left-to-right order (selector form of RT.leaf_nodes).""" return self._node_selector_class(self, self._rt.leaf_nodes) @property def root(self): """1-element selector for the root node. Chain mutations: ``uc.root.set_pfields(amp=0.3)``. """ return self._node_selector_class(self, (self._rt.root,))
[docs] def leaves_of(self, node): """Leaves of the subtree rooted at ``node`` (selector form of RT.subtree_leaves).""" node_id = self._coerce_singleton_node_target(node, "leaves_of") return self._node_selector_class(self, self._rt.subtree_leaves(node_id))
[docs] def at_depth(self, d: int, operator: str = '=='): """Nodes at a specific depth (selector form of RT.at_depth).""" return self._node_selector_class(self, self._rt.at_depth(d, operator))
[docs] def successors(self, node): """Direct children of ``node`` (selector form of RT.successors).""" node_id = self._coerce_singleton_node_target(node, "successors") return self._node_selector_class(self, self._rt.successors(node_id))
[docs] def select(self, *ids): """Build an ad-hoc selector from ints/selectors or iterables thereof.""" if len(ids) == 1: selected = self._coerce_node_targets(ids[0]) else: selected = self._coerce_node_targets(ids) return self._node_selector_class(self, tuple(selected))
# ------------------------------------------------------------------ # Non-node scalar forwards (unchanged return types) # ------------------------------------------------------------------ @property def depth(self): """Maximum depth of the underlying RT.""" return self._rt.depth @property def k(self): """Maximum branching factor of the underlying RT.""" return self._rt.k
[docs] def depth_of(self, node): """Depth of ``node`` in the underlying RT.""" return self._rt.depth_of(node)
[docs] def out_degree(self, node): """Out-degree of ``node`` in the underlying RT.""" return self._rt.out_degree(node)
[docs] def topological_sort(self): """Topological sort of the underlying RT's nodes.""" return self._rt.topological_sort()
@property def span(self): """The number of measures that the TemporalUnit spans.""" return self._rt.span @property def tempus(self): """The time signature of the TemporalUnit.""" return self._rt.meas @property def prolationis(self): """The S-part of a RhythmTree which describes the subdivisions of the TemporalUnit.""" return self._rt.subdivisions # @prolationis.setter # def prolationis(self, prolatio: Union[tuple, str]): # self._rt = self._set_rt(self.span, self.tempus, prolatio) @property def rt(self): """The RhythmTree of the TemporalUnit (returns a copy).""" return self._rt.copy() @property def metric_durations(self): """The metric durations from the RhythmTree which describe the proportional durations of the TemporalUnit.""" return self._rt.durations @property def metric_onsets(self): """The metric onsets from the RhythmTree which describe the proportional onset times of the TemporalUnit.""" return self._rt.onsets @property def beat(self): """The rhythmic ratio that describes the beat of the TemporalUnit.""" return self._beat @property def bpm(self): """The beats per minute of the TemporalUnit.""" return self._bpm @property def type(self): """The type of the TemporalUnit.""" return self._type @property def start(self) -> float: """Absolute start time in seconds. Always ``0`` for a unit outside a Score. Inside a Score the start time is assigned by placement kwargs on :meth:`~klotho.thetos.composition.score.Score.add`. """ return self._offset @property def onsets(self): """The real-time onset of each leaf event in seconds.""" self._ensure_timing_cache() return tuple(self._real_times[n]['real_onset'] for n in self._rt.leaf_nodes) @property def durations(self): """The real-time duration of each leaf event in seconds.""" self._ensure_timing_cache() return tuple(self._real_times[n]['real_duration'] for n in self._rt.leaf_nodes) @property def duration(self): """The total duration (in seconds) of the TemporalUnit.""" return beat_duration(ratio = str(self._rt.meas * self._rt.span), beat_ratio = self.beat, bpm = self.bpm ) @property def end(self) -> float: """Absolute end time in seconds (``start + duration``).""" return self._offset + self.duration @property def time(self): """The absolute start and end times (in seconds) of the TemporalUnit.""" return self._offset, self._offset + self.duration @property def events(self): """ A :class:`~pandas.DataFrame` of all leaf events with timing and metric data. Returns ------- pandas.DataFrame """ events = self._materialize_events() return pd.DataFrame([{ 'node_id': c.node_id, 'start': c.start, 'duration': c.duration, 'end': c.end, 'is_rest': c.is_rest, 's': c.proportion, 'metric_onset': c.metric_onset, 'metric_duration': c.metric_duration, } for c in events], index=range(len(events))) def _scale_bpm(self, factor: float) -> None: """Multiply bpm by ``factor`` (private; used by ``ScoreItem``). A factor of ``0.5`` halves the bpm, doubling the resulting duration. This method is deliberately private: outside a Score, a unit's duration is immutable; duration editing is mediated by :meth:`klotho.thetos.composition.score.ScoreItem.set_duration`. """ self._bpm = self._bpm * factor self._invalidate_timing_cache()
[docs] def make_rest(self, node) -> None: """ Turn a node (or each node in an iterable) and all descendants into rests. Delegates to :meth:`RhythmTree.make_rest` and re-evaluates timing once at the end (batched across all provided nodes). Parameters ---------- node : int or iterable of int A single node ID, or an iterable of node IDs, to convert to rests. Raises ------ ValueError If any node is not found in the rhythm tree. """ nodes = self._coerce_node_targets(node) for n in nodes: self._rt.make_rest(n) self._invalidate_timing_cache()
[docs] def subdivide(self, node: int, S) -> None: """ Subdivide a leaf node with structure (D, S). Delegates to :meth:`RhythmTree.subdivide` and invalidates cached events. Parameters ---------- node : int The leaf node to subdivide. S : tuple Valid subdivisions tuple (integers or nested (D, S) tuples). Raises ------ ValueError If the node is not found or is not a leaf. """ self._rt.subdivide(node, S) self._invalidate_timing_cache()
[docs] def sparsify(self, probability, node=None): """ Randomly convert leaf events to rests with a given probability. Parameters ---------- probability : float Probability (0--1) that each eligible leaf becomes a rest. node : int, list of int, or None, optional Restrict to leaves under this node (or nodes). When None, all leaves are candidates. Default is None. """ import numpy as _np if node is None: targets = list(self._rt.leaf_nodes) else: seen = set() targets = [] for n in self._coerce_node_targets(node): for leaf in self._rt.subtree_leaves(n): if leaf not in seen: seen.add(leaf) targets.append(leaf) targets = [n for n in targets if self._rt[n].get('proportion', 1) >= 0] for leaf in targets: if _np.random.uniform() < probability: self.make_rest(leaf)
def _set_rt(self, span:int, tempus:Union[Meas,Fraction,str], prolatio:Union[tuple,str]) -> RhythmTree: match prolatio: case tuple(): self._type = ProlatioTypes.SUBDIVISION return RhythmTree(span = span, meas = tempus, subdivisions = prolatio) case str(): prolatio = prolatio.lower() match prolatio: case p if p.lower() in ProlatioTypes.PULSTYPES.value: self._type = ProlatioTypes.PULSE return RhythmTree( span = span, meas = tempus, subdivisions = (1,) * tempus._numerator ) case d if d.lower() in ProlatioTypes.DURTYPES.value: self._type = ProlatioTypes.DURATION return RhythmTree( span = span, meas = tempus, subdivisions = (1,) ) case r if r.lower() in ProlatioTypes.RESTYPES.value: self._type = ProlatioTypes.REST return RhythmTree( span = span, meas = tempus, subdivisions = (-1,) ) case _: raise ValueError(f'Invalid string: {prolatio}') case _: raise ValueError(f'Invalid prolatio type: {type(prolatio)}') def _compute_timing_cache(self): """Recompute real-time onset/duration cache for all nodes.""" self._real_times.clear() for node in self._rt.nodes: metric_duration = self._rt[node]['metric_duration'] metric_onset = self._rt[node]['metric_onset'] real_duration = beat_duration(ratio=metric_duration, bpm=self.bpm, beat_ratio=self.beat) real_onset = beat_duration(ratio=metric_onset, bpm=self.bpm, beat_ratio=self.beat) + self._offset self._real_times[node] = {'real_duration': real_duration, 'real_onset': real_onset} self._timing_dirty = False def _ensure_timing_cache(self): if self._timing_dirty or len(self._real_times) != len(self._rt): self._compute_timing_cache() def _make_node_proxy(self, node_id: int): return Chronon(node_id, self) def _event_context(self): self._ensure_timing_cache() return None def _make_event(self, node_id: int, event_context=None): return Chronon(node_id, self) def _materialize_events(self): """Materialize leaf Chronons lazily from current tree state.""" leaf_nodes = tuple(self._rt.leaf_nodes) event_context = self._event_context() return tuple(self._make_event(node_id, event_context) for node_id in leaf_nodes) def _invalidate_timing_cache(self): self._timing_dirty = True def __getitem__(self, idx): leaf_nodes = tuple(self._rt.leaf_nodes) event_context = self._event_context() if isinstance(idx, slice): return tuple(self._make_event(node_id, event_context) for node_id in leaf_nodes[idx]) return self._make_event(leaf_nodes[idx], event_context) def __iter__(self): leaf_nodes = tuple(self._rt.leaf_nodes) event_context = self._event_context() for node_id in leaf_nodes: yield self._make_event(node_id, event_context) def __len__(self): return len(self._rt.leaf_nodes) def __str__(self): result = ( f'Tempus: {self._rt.meas}' + (f' (x{self._rt.span})' if self._rt.span > 1 else '') + '\n' + f'Prolatio: {self._type.value}\n' + f'Events: {len(self)}\n' + f'Tempo: {self._beat} = {self._bpm}\n' + f'Time: {seconds_to_hmsms(self.time[0])} - {seconds_to_hmsms(self.time[1])} ({seconds_to_hmsms(self.duration)})\n' + f'{"-" * 50}\n' ) return result def __repr__(self): return self.__str__()
[docs] def repeat(self, n): """ Create a :class:`TemporalUnitSequence` of *n* copies of this unit. Parameters ---------- n : int Number of repetitions. Returns ------- TemporalUnitSequence """ uts = TemporalUnitSequence() uts.extend([self] * n) return uts
[docs] def copy(self): """Create a deep copy of this TemporalUnit. The copy preserves any internal placement (``_offset``) so that containers like :class:`TemporalUnitSequence` can rebuild cleanly. """ c = TemporalUnit( span=self.span, tempus=self.tempus, prolatio=self.prolationis, beat=self.beat, bpm=self.bpm, ) c._offset = self._offset c._invalidate_timing_cache() return c
[docs] class TemporalUnitSequence(metaclass=TemporalMeta): """ An ordered sequence of :class:`TemporalUnit` objects representing consecutive temporal events. Units are automatically offset so that each begins where the previous one ends. Outside a :class:`~klotho.thetos.composition.score.Score`, a sequence always starts at time 0 and its duration is fixed after construction. Parameters ---------- ut_seq : list of TemporalUnit, optional Initial sequence of temporal units. Default is an empty list. """
[docs] def __init__(self, ut_seq:Union[list[TemporalUnit], None]=None): if ut_seq is None: ut_seq = [] self._seq = [ut.copy() for ut in ut_seq] self._offset = 0.0 self._set_offsets()
def _set_offsets(self): """Updates the offsets of all members based on their position in the sequence. Members may be ``TemporalUnit``, ``CompositionalUnit``, ``TemporalUnitSequence``, or ``TemporalBlock``; ``_reoffset`` dispatches the correct cascade for each. """ running_offset = self._offset for ut in self._seq: _reoffset(ut, running_offset) running_offset += ut.duration @property def seq(self): """The list of TemporalUnit objects in the sequence.""" return self._seq @property def onsets(self): """A tuple of onset times (in seconds) for each TemporalUnit in the sequence.""" return calc_onsets(self.durations) @property def durations(self): """A tuple of durations (in seconds) for each TemporalUnit in the sequence.""" return tuple(ut.duration for ut in self._seq) @property def duration(self): """The total duration (in seconds) of the sequence.""" return sum(abs(d) for d in self.durations) @property def start(self) -> float: """Absolute start time in seconds (``0`` outside a Score).""" return self._offset @property def end(self) -> float: """Absolute end time in seconds (``start + duration``).""" return self._offset + self.duration @property def size(self): """The total number of events across all TemporalUnits in the sequence.""" return sum(len(ut) for ut in self._seq) @property def time(self): """The absolute start and end times (in seconds) of the sequence.""" return self._offset, self._offset + self.duration def _scale_bpm(self, factor: float) -> None: """Multiply every member's bpm by ``factor`` and recompute offsets. Private; used by :class:`~klotho.thetos.composition.score.ScoreItem` to stretch a sequence's total duration while preserving the relative durations between its members. """ for ut in self._seq: ut._scale_bpm(factor) self._set_offsets()
[docs] def append(self, ut: TemporalUnit, repeat: int = 1) -> None: """ Append a temporal unit to the end of the sequence. Parameters ---------- ut : TemporalUnit The unit to append. repeat : int, optional Number of independent copies to append. Default is 1. """ for _ in range(repeat): self._seq.append(ut.copy()) self._set_offsets()
[docs] def prepend(self, ut: TemporalUnit) -> None: """ Prepend a temporal unit to the beginning of the sequence. Parameters ---------- ut : TemporalUnit The unit to prepend. """ self._seq.insert(0, ut.copy()) self._set_offsets()
[docs] def insert(self, index: int, ut: TemporalUnit) -> None: """ Insert a temporal unit at the specified index. Parameters ---------- index : int The position at which to insert. ut : TemporalUnit The unit to insert. Raises ------ IndexError If the index is out of range. """ if not -len(self._seq) <= index <= len(self._seq): raise IndexError(f"Index {index} out of range for sequence of length {len(self._seq)}") self._seq.insert(index, ut.copy()) self._set_offsets()
[docs] def remove(self, index: int) -> None: """ Remove the temporal unit at the specified index. Parameters ---------- index : int The index of the unit to remove. Raises ------ IndexError If the index is out of range. """ if not -len(self._seq) <= index < len(self._seq): raise IndexError(f"Index {index} out of range for sequence of length {len(self._seq)}") self._seq.pop(index) self._set_offsets()
[docs] def replace(self, index: int, ut: TemporalUnit) -> None: """ Replace the temporal unit at the specified index. Parameters ---------- index : int The index of the unit to replace. ut : TemporalUnit The replacement unit. Raises ------ IndexError If the index is out of range. """ if not -len(self._seq) <= index < len(self._seq): raise IndexError(f"Index {index} out of range for sequence of length {len(self._seq)}") self._seq[index] = ut.copy() self._set_offsets()
[docs] def extend(self, other_seq, repeat: int = 1) -> None: """ Extend the sequence by appending all units from another iterable. Parameters ---------- other_seq : TemporalUnitSequence or iterable of TemporalUnit The source of units to append. repeat : int, optional Number of times to repeat the extension. Default is 1. """ for _ in range(repeat): for ut in other_seq: self._seq.append(ut.copy()) self._set_offsets()
def __getitem__(self, idx: int) -> TemporalUnit: return self._seq[idx] def __setitem__(self, idx: int, ut: TemporalUnit) -> None: self._seq[idx] = ut.copy() self._set_offsets() def __iter__(self): return iter(self._seq) def __len__(self): return len(self._seq) def __str__(self): return pd.DataFrame([{ 'Tempus': ut.tempus, 'Type': ut.type.name[0] if ut.type else '', 'Tempo': f'{ut.beat} = {round(ut.bpm, 3)}', 'Start': seconds_to_hmsms(ut.time[0]), 'End': seconds_to_hmsms(ut.time[1]), 'Duration': seconds_to_hmsms(ut.duration), } for ut in self._seq]).__str__() def __repr__(self): return self.__str__()
[docs] def copy(self): """Create a deep copy of this TemporalUnitSequence. Internal placement (``_offset``) is preserved on the copy so that :class:`TemporalBlock` and :class:`~klotho.thetos.composition.score.Score` can rebuild their layouts cleanly. """ c = TemporalUnitSequence(ut_seq=[ut.copy() for ut in self._seq]) c._offset = self._offset c._set_offsets() return c
[docs] class TemporalBlock(metaclass=TemporalMeta): """ A collection of parallel temporal structures representing simultaneous events. Each row can be a :class:`TemporalUnit`, :class:`TemporalUnitSequence`, or another ``TemporalBlock``. Rows are aligned according to the *axis* parameter and optionally sorted by duration. Parameters ---------- rows : list, optional Temporal structures (``TemporalUnit``, ``TemporalUnitSequence``, or ``TemporalBlock``). Default is an empty list. axis : float, optional Alignment axis from -1 (left) through 0 (center) to 1 (right). Default is -1. sort_rows : bool, optional Whether to sort rows by duration (longest first). Default is True. Notes ----- Outside a :class:`~klotho.thetos.composition.score.Score`, a block always starts at time 0 and its total duration is fixed after construction. """
[docs] def __init__(self, rows:Union[list[Union[TemporalUnit, TemporalUnitSequence, 'TemporalBlock']], None]=None, axis:float = -1, sort_rows:bool=True): if rows is None: rows = [] self._rows = [row.copy() for row in rows] if rows else [] self._axis = axis self._offset = 0.0 self._sort_rows = sort_rows self._align_rows()
# TODO: make free method in UT algos # Matrix to Block
[docs] @classmethod def from_tree_mat(cls, matrix, meas_denom:int=1, subdiv:bool=False, rotation_offset:int=1, beat=None, bpm=None): """ Create a ``TemporalBlock`` from a matrix of tree specifications. Parameters ---------- matrix : tuple of tuple Matrix where each element is a ``(D, S)`` pair. meas_denom : int, optional Denominator for measure fractions. Default is 1. subdiv : bool, optional Whether to apply automatic subdivision. Default is False. rotation_offset : int, optional Offset for rotation calculations. Default is 1. beat : Fraction, str, float, or None, optional Beat ratio specification. Default is None. bpm : int, float, or None, optional Beats per minute. Default is None. Returns ------- TemporalBlock """ tb = [] for i, row in enumerate(matrix): seq = [] for j, e in enumerate(row): offset = rotation_offset * i if subdiv: D, S = e[0], auto_subdiv(e[1][::-1], offset - j - i) else: D, S = e[0], e[1] seq.append(TemporalUnit(tempus = Meas(abs(D), meas_denom), prolatio = S if D > 0 else 'r', bpm = bpm, beat = beat)) tb.append(TemporalUnitSequence(seq)) return cls(tuple(tb))
def _align_rows(self): """ Aligns the rows based on the current axis value and optionally sorts them by duration. If sorting is enabled, the longest duration will be at the bottom (index 0), shortest at the top. If two rows have the same duration, their original order is preserved. """ if not self._rows: return row_duration_pairs = [(row, row.duration) for row in self._rows] if self._sort_rows: row_duration_pairs = sorted(row_duration_pairs, key=lambda pair: -pair[1], reverse=False) self._rows = [pair[0] for pair in row_duration_pairs] max_duration = max(duration for _, duration in row_duration_pairs) for row, row_duration in row_duration_pairs: if row_duration == max_duration: _reoffset(row, self._offset) continue duration_diff = max_duration - row_duration adjustment = duration_diff * (self._axis + 1) / 2 _reoffset(row, self._offset + adjustment) @property def height(self): """The number of rows in the block.""" return len(self._rows) @property def rows(self): """The list of temporal structures in the block.""" return self._rows @property def duration(self): """The total duration (in seconds) of the longest row in the block.""" return max(row.duration for row in self._rows) if self._rows else 0.0 @property def axis(self): """The temporal axis position of the block.""" return self._axis @property def start(self) -> float: """Absolute start time in seconds (``0`` outside a Score).""" return self._offset @property def end(self) -> float: """Absolute end time in seconds (``start + duration``).""" return self._offset + self.duration @property def sort_rows(self): """Whether to sort rows by duration (longest at index 0).""" return self._sort_rows @sort_rows.setter def sort_rows(self, sort_rows:bool): self._sort_rows = sort_rows self._align_rows() @axis.setter def axis(self, axis: float): """ Set the temporal axis and realign rows. Parameters ---------- axis : float Value between -1 and 1 controlling alignment: -1 = left-aligned, 0 = centered, 1 = right-aligned. Raises ------ ValueError If *axis* is outside [-1, 1]. """ if not -1 <= axis <= 1: raise ValueError("Axis must be between -1 and 1") self._axis = float(axis) self._align_rows() def _scale_bpm(self, factor: float) -> None: """Multiply every row's bpm(s) by ``factor`` and realign. Private; used by :class:`~klotho.thetos.composition.score.ScoreItem` to stretch a block's total duration while preserving the relative durations of its rows. """ for row in self._rows: row._scale_bpm(factor) self._align_rows()
[docs] def prepend(self, row: Union[TemporalUnit, TemporalUnitSequence, 'TemporalBlock']) -> None: """ Add a temporal structure at the beginning (index 0) of the block. Parameters ---------- row : TemporalUnit, TemporalUnitSequence, or TemporalBlock The temporal structure to prepend. """ self._rows.insert(0, row.copy()) self._align_rows()
[docs] def append(self, row: Union[TemporalUnit, TemporalUnitSequence, 'TemporalBlock']) -> None: """ Add a temporal structure at the end (highest index) of the block. Parameters ---------- row : TemporalUnit, TemporalUnitSequence, or TemporalBlock The temporal structure to append. """ self._rows.append(row.copy()) self._align_rows()
[docs] def insert(self, index: int, row: Union[TemporalUnit, TemporalUnitSequence, 'TemporalBlock']) -> None: """ Insert a temporal structure at the specified index. Parameters ---------- index : int The position at which to insert. row : TemporalUnit, TemporalUnitSequence, or TemporalBlock The temporal structure to insert. Raises ------ IndexError If the index is out of range. """ if not -len(self._rows) <= index <= len(self._rows): raise IndexError(f"Index {index} out of range for block of height {len(self._rows)}") self._rows.insert(index, row.copy()) self._align_rows()
[docs] def remove(self, index: int) -> None: """ Remove the row at the specified index. Parameters ---------- index : int The index of the row to remove. Raises ------ IndexError If the index is out of range. """ if not -len(self._rows) <= index < len(self._rows): raise IndexError(f"Index {index} out of range for block of height {len(self._rows)}") self._rows.pop(index) self._align_rows()
[docs] def replace(self, index: int, row: Union[TemporalUnit, TemporalUnitSequence, 'TemporalBlock']) -> None: """ Replace the row at the specified index. Parameters ---------- index : int The index of the row to replace. row : TemporalUnit, TemporalUnitSequence, or TemporalBlock The replacement temporal structure. Raises ------ IndexError If the index is out of range. """ if not -len(self._rows) <= index < len(self._rows): raise IndexError(f"Index {index} out of range for block of height {len(self._rows)}") self._rows[index] = row.copy() self._align_rows()
[docs] def extend(self, other_block: 'TemporalBlock') -> None: """ Extend the block by appending all rows from another block. Parameters ---------- other_block : TemporalBlock The block whose rows will be appended. """ for row in other_block: self._rows.append(row.copy()) self._align_rows()
def __getitem__(self, idx: int) -> Union[TemporalUnit, TemporalUnitSequence, 'TemporalBlock']: return self._rows[idx] def __iter__(self): return iter(self._rows) def __len__(self): return len(self._rows) def __str__(self): result = ( f'Rows: {len(self._rows)}\n' f'Axis: {self._axis}\n' f'Duration: {seconds_to_hmsms(self.duration)}\n' f'Time: {seconds_to_hmsms(self._offset)} - {seconds_to_hmsms(self._offset + self.duration)}\n' f'{"-" * 50}\n' ) return result def __repr__(self): return self.__str__()
[docs] def copy(self): """Create a deep copy of this TemporalBlock. Internal placement (``_offset``) is preserved on the copy so that :class:`~klotho.thetos.composition.score.Score` can rebuild its timeline cleanly. """ c = TemporalBlock( rows=[row.copy() for row in self._rows], axis=self._axis, sort_rows=self._sort_rows, ) c._offset = self._offset c._align_rows() return c
def _reoffset(unit, t: float) -> None: """Assign ``t`` as the internal offset of *unit* and cascade. Used by containers (``TemporalBlock``) and :class:`~klotho.thetos.composition.score.Score` to position a unit at an absolute time without going through a public setter. """ unit._offset = float(t) if isinstance(unit, TemporalUnitSequence): unit._set_offsets() elif isinstance(unit, TemporalBlock): unit._align_rows() elif hasattr(unit, '_invalidate_timing_cache'): unit._invalidate_timing_cache()