Source code for klotho.thetos.composition.compositional

"""
Compositional units combining temporal structure with parameterized events.

This module provides ``CompositionalUnit``, which extends ``TemporalUnit``
with a synchronized ``ParameterTree`` for hierarchical parameter management,
envelope application, slur marking, and instrument assignment. The ``Parametron``
class extends ``Chronon`` with parameter field access.
"""

from typing import Union, Optional, Any, Literal
from fractions import Fraction
from dataclasses import dataclass, field
import inspect
import warnings
import pandas as pd

from klotho.chronos import TemporalUnit, RhythmTree, Meas
from klotho.chronos.temporal_units.temporal import Chronon, NodeContext, UTNodeHandle, UTNodeSelector
from klotho.thetos.parameters import ParameterTree
from klotho.thetos.instruments import Instrument
from klotho.thetos.instruments.base import Effect
from klotho.thetos.instruments.base import Kit
from klotho.dynatos.envelopes import Envelope
from klotho.topos.collections.sequences import Pattern


[docs] @dataclass(frozen=True) class ParentDistributionView: ref: UTNodeHandle is_rest: bool pfields: dict mfields: dict instrument: Any _owner: Any = field(repr=False, compare=False) @property def id(self) -> int: return self.ref.id @property def parent(self) -> Optional['ParentDistributionView']: return self._owner._build_parent_distribution_view(self.id) def __getattr__(self, key): return getattr(self.ref, key)
[docs] @dataclass(frozen=True) class DistributionContext(NodeContext): is_rest: bool pfields: dict mfields: dict instrument: Any _owner: Any = field(repr=False, compare=False) @property def parent(self) -> Optional[ParentDistributionView]: return self._owner._build_parent_distribution_view(self.id)
PFieldContext = DistributionContext def _resolve_kit_member(inst, pt, node): if isinstance(inst, Kit): selector_val = pt.get_pfield(node, inst.selector) return inst._resolve(selector_val) return inst def _build_pfield_context(uc, node: int, index: int, total: int, is_rest: bool) -> DistributionContext: _ = is_rest return uc._build_node_context(node, index, total) def _callable_arity(fn): try: sig = inspect.signature(fn) return len([p for p in sig.parameters.values() if p.default is inspect.Parameter.empty]) except (ValueError, TypeError): return 0
[docs] class Parametron(Chronon): """ An enhanced Chronon that includes parameter field access. Extends the basic temporal event data (start, duration, etc.) with access to musical parameters stored in a synchronized ParameterTree. """ __slots__ = ('_pt',)
[docs] def __init__(self, node_id: int, ut, pt: ParameterTree): """ Initialize a Parametron. Parameters ---------- node_id : int The node ID in the rhythm tree. ut : TemporalUnit or CompositionalUnit The temporal unit providing temporal data. pt : ParameterTree The parameter tree providing field values (including instrument via ``pt.get(node_id, 'instrument')``). """ super().__init__(node_id, ut) self._pt = pt
@property def pfields(self): """ Get parameter field values for this event (for playback, etc.). Returns pfield values with instrument fallback. When the governing instrument is a Kit, defaults come from the resolved member (based on the selector pfield at this node), not the Kit shell. Returns ------- dict Dictionary of parameter field names and values """ result = {} inst = self._resolve_instrument() effective = _resolve_kit_member(inst, self._pt, self._node_id) if inst is not None else inst if effective is not None and hasattr(effective, 'pfields'): result.update(dict(effective.pfields)) for k in self._pt._meta['pfields']: v = self._pt.get_pfield(self._node_id, k) if v is not None: result[k] = v elif effective is not None and hasattr(effective, 'pfields'): eff_pfields = effective.pfields if k in eff_pfields: result[k] = eff_pfields[k] return result @property def mfields(self): """ Get meta field values for this event. Returns ------- dict Dictionary of meta field names and values """ return {k: self._pt.get_mfield(self._node_id, k) for k in self._pt._meta['mfields']} def _resolve_instrument(self): return self._pt.get_instrument(self._node_id)
[docs] def get_pfield(self, key: str, default=None): value = self._pt.get_pfield(self._node_id, key) return default if value is None else value
[docs] def get_mfield(self, key: str, default=None): value = self._pt.get_mfield(self._node_id, key) return default if value is None else value
def __getitem__(self, key: str): temporal_attrs = {'start', 'duration', 'end', 'proportion', 'metric_duration', 'metric_onset', 'node_id', 'is_rest', 'real_onset', 'real_duration'} if key in temporal_attrs: return getattr(self, key) v = self.get_pfield(key) if v is not None: return v return self.get_mfield(key)
[docs] class UCNodeSelector(UTNodeSelector): """Selector for :class:`CompositionalUnit` owners. Extends :class:`UTNodeSelector` with UC-specific verbs that delegate to the owning UC's parameter / envelope / slur / instrument mutators. Existing UC setter semantics (callable-per-node, Pattern-cycling, static tuple-as-poly-event, ``include_rests`` filtering, ensemble-family side effects, slur/envelope healing) are preserved verbatim. """ # --- Parameter verbs ---
[docs] def set_pfields(self, include_rests: bool = False, **kwargs) -> None: """Set parameter field values on every selected node.""" return self._owner.set_pfields( self, include_rests=include_rests, **kwargs )
[docs] def set_mfields(self, include_rests: bool = False, **kwargs) -> None: """Set meta field values on every selected node.""" return self._owner.set_mfields( self, include_rests=include_rests, **kwargs )
[docs] def set_instrument(self, instrument): """Assign an instrument (or Pattern/callable thereof) to the selection.""" return self._owner.set_instrument(self, instrument)
[docs] def apply_envelope(self, envelope, pfields, *, offset=0, take=None, scope: str = 'span', control: bool = False, endpoint: bool = True): """Apply an envelope to the selection. See :meth:`CompositionalUnit.apply_envelope`.""" return self._owner.apply_envelope( envelope, pfields, node=self, offset=offset, take=take, scope=scope, control=control, endpoint=endpoint, )
[docs] def apply_slur(self, *, offset=0, take=None, mode: str = 'span'): """Apply a slur over the selection. See :meth:`CompositionalUnit.apply_slur`.""" return self._owner.apply_slur( node=self, offset=offset, take=take, mode=mode, )
[docs] def clear_parameters(self) -> None: """Clear parameter values on every selected node (and its subtree).""" for n in self._ids: self._owner.clear_parameters(n)
[docs] def set(self, *, inst=None, mfields=None, pfields=None, include_rests: bool = False): """Set instrument / mfields / pfields in one call across the selection.""" return self._owner.set( self, inst=inst, mfields=mfields, pfields=pfields, include_rests=include_rests, )
# --- Per-node getters (return list aligned with self._ids) ---
[docs] def get_pfield(self, key: str, default=None) -> list: return [self._owner.get_pfield(n, key, default) for n in self._ids]
[docs] def get_mfield(self, key: str, default=None) -> list: return [self._owner.get_mfield(n, key, default) for n in self._ids]
[docs] def get_instrument(self) -> list: return [self._owner.get_instrument(n) for n in self._ids]
[docs] class UCNodeHandle(UTNodeHandle):
[docs] def set_pfields(self, include_rests: bool = False, **kwargs): return self._owner.set_pfields(self.id, include_rests=include_rests, **kwargs)
[docs] def set_mfields(self, include_rests: bool = False, **kwargs): return self._owner.set_mfields(self.id, include_rests=include_rests, **kwargs)
[docs] def set_instrument(self, instrument): return self._owner.set_instrument(self.id, instrument)
[docs] def apply_envelope(self, envelope, pfields, *, offset=0, take=None, scope: str = 'span', control: bool = False, endpoint: bool = True): return self._owner.apply_envelope( envelope, pfields, node=self.id, offset=offset, take=take, scope=scope, control=control, endpoint=endpoint, )
[docs] def apply_slur(self, *, offset=0, take=None, mode: str = 'span'): return self._owner.apply_slur( node=self.id, offset=offset, take=take, mode=mode )
[docs] def clear_parameters(self): return self._owner.clear_parameters(self.id)
[docs] def set(self, *, inst=None, mfields=None, pfields=None, include_rests: bool = False): return self._owner.set( self.id, inst=inst, mfields=mfields, pfields=pfields, include_rests=include_rests )
@property def is_rest(self): return self._owner._rt[self.id].get('proportion', 1) < 0 @property def pfields(self): return { key: self._owner.get_pfield(self.id, key) for key in self._owner._pt._meta['pfields'] } @property def mfields(self): return { key: self._owner.get_mfield(self.id, key) for key in self._owner._pt._meta['mfields'] } @property def instrument(self): return self._owner.get_instrument(self.id)
[docs] def get_pfield(self, key: str, default=None): return self._owner.get_pfield(self.id, key, default)
[docs] def get_mfield(self, key: str, default=None): return self._owner.get_mfield(self.id, key, default)
[docs] def get_instrument(self): return self._owner.get_instrument(self.id)
[docs] class CompositionalUnit(TemporalUnit): """ A TemporalUnit enhanced with synchronized parameter management capabilities. Extends TemporalUnit to include a shadow ParameterTree that maintains identical structural form to the internal RhythmTree. This allows for hierarchical parameter organization where parameter values can be set at any level and automatically propagate to descendant events. Parameters ---------- span : Union[int, float, Fraction], default=1 Number of measures the unit spans tempus : Union[Meas, Fraction, int, float, str], default='4/4' Time signature (e.g., '4/4', Meas(4,4)) prolatio : Union[tuple, str], default='d' Subdivision pattern (tuple) or type ('d', 'r', 'p', 's') beat : Union[None, Fraction, int, float, str], optional Beat unit for tempo (e.g., Fraction(1,4) for quarter note) bpm : Union[None, int, float], optional Beats per minute pfields : Union[dict, list, None], optional Parameter fields to initialize. Can be: - dict: {field_name: default_value, ...} - list: [field_name1, field_name2, ...] (defaults to 0.0) - None: No parameter fields initially Notes ----- Outside a :class:`~klotho.thetos.composition.score.Score`, a ``CompositionalUnit`` always starts at time 0 and its duration is fixed after construction. Placement and duration editing are handled by :class:`~klotho.thetos.composition.score.ScoreItem` once the UC has been added to a Score. Attributes ---------- pt : ParameterTree The synchronized parameter tree matching RhythmTree structure (returns copy) pfields : list List of all available parameter field names """ _node_selector_class = UCNodeSelector _node_handle_class = UCNodeHandle
[docs] def __init__(self, span : Union[int, float, Fraction] = 1, tempus : Union[Meas, Fraction, int, float, str] = '4/4', prolatio : Union[tuple, str] = 'd', beat : Union[None, Fraction, int, float, str] = None, bpm : Union[None, int, float] = None, inst : Union[Instrument, None] = None, mfields : Union[dict, list, None] = None, pfields : Union[dict, list, None] = None): super().__init__(span, tempus, prolatio, beat, bpm) if mfields is None: mfields = {} if 'group' not in mfields: mfields['group'] = 'default' self._pt = self._create_synchronized_parameter_tree(pfields, mfields) if inst is not None: self.set_instrument(self._pt.root, inst) self._slur_specs = {} self._next_slur_id = 0 self._control_envelopes: dict[int, dict] = {} self._next_envelope_id = 0
[docs] @classmethod def from_rt(cls, rt: RhythmTree, beat: Union[None, Fraction, int, float, str] = None, bpm: Union[None, int, float] = None, pfields: Union[dict, list, None] = None, mfields: Union[dict, list, None] = None, inst: Union[Instrument, None] = None): """ Create a CompositionalUnit from an existing RhythmTree. Parameters ---------- rt : RhythmTree Source rhythm tree whose structure is adopted. beat : Fraction, int, float, str, or None, optional Beat unit for tempo calculation. bpm : int, float, or None, optional Beats per minute. pfields : dict, list, or None, optional Parameter fields to initialize. mfields : dict, list, or None, optional Meta fields to initialize. inst : Instrument or None, optional Instrument to assign to the root node. Returns ------- CompositionalUnit A new CompositionalUnit with the rhythm tree's structure. """ return cls(span = rt.span, tempus = rt.meas, prolatio = rt.subdivisions, beat = beat, bpm = bpm, pfields = pfields, mfields = mfields, inst = inst)
[docs] @classmethod def from_ut(cls, ut: TemporalUnit, pfields: Union[dict, list, None] = None, mfields: Union[dict, list, None] = None, inst: Union[Instrument, None] = None): """ Create a CompositionalUnit from an existing TemporalUnit. Parameters ---------- ut : TemporalUnit Source temporal unit whose timing and structure are adopted. pfields : dict, list, or None, optional Parameter fields to initialize. mfields : dict, list, or None, optional Meta fields to initialize. inst : Instrument or None, optional Instrument to assign to the root node. Returns ------- CompositionalUnit A new CompositionalUnit with the temporal unit's structure. """ new_uc = cls( span = ut.span, tempus = ut.tempus, prolatio = ut.prolationis, beat = ut.beat, bpm = ut.bpm, pfields = pfields, mfields = mfields, inst = inst, ) new_uc._offset = ut._offset new_uc._invalidate_timing_cache() return new_uc
def _create_synchronized_parameter_tree(self, pfields: Union[dict, list, None], mfields: Union[dict, list, None] = None) -> ParameterTree: """ Create a ParameterTree with identical structure to the RhythmTree but blank node data. Parameters ---------- pfields : Union[dict, list, None] Parameter fields to initialize inst : Union[Instrument, None], optional Instrument to set on the root node mfields : Union[dict, list, None], optional Meta fields to initialize Returns ------- ParameterTree A parameter tree matching the rhythm tree structure with clean nodes """ pt = ParameterTree.from_tree_structure(self._rt) if pfields is not None: self._initialize_parameter_fields(pt, pfields) if mfields is not None: self._initialize_meta_fields(pt, mfields) return pt def _initialize_parameter_fields(self, pt: ParameterTree, pfields: Union[dict, list]): """ Initialize parameter fields across all nodes in the parameter tree. Parameters ---------- pt : ParameterTree The parameter tree to initialize pfields : Union[dict, list] Parameter fields to set """ if isinstance(pfields, dict): pt.set_pfields(pt.root, **pfields) elif isinstance(pfields, list): default_values = {field: 0.0 for field in pfields} pt.set_pfields(pt.root, **default_values) def _initialize_meta_fields(self, pt: ParameterTree, mfields: Union[dict, list]): """ Initialize meta fields across all nodes in the parameter tree. Parameters ---------- pt : ParameterTree The parameter tree to initialize mfields : Union[dict, list] Meta fields to set """ if isinstance(mfields, dict): pt.set_mfields(pt.root, **mfields) elif isinstance(mfields, list): default_values = {field: '' for field in mfields} pt.set_mfields(pt.root, **default_values) def _copy_pt_node_data(self, target_cu: 'CompositionalUnit', mapping: dict[int, int]) -> None: for old_node, new_node in mapping.items(): target_cu._pt.replace_node_data(new_node, dict(self._pt.items(old_node))) target_cu._pt._meta['pfields'] = set(self._pt._meta.get('pfields', set())) target_cu._pt._meta['mfields'] = set(self._pt._meta.get('mfields', set())) def _copy_pt_instruments(self, target_cu: 'CompositionalUnit', mapping: dict[int, int]) -> None: for old_node, inst in self._pt._node_instruments.items(): new_node = mapping.get(old_node) if new_node is not None: target_cu._pt.set_instrument(new_node, inst) def _sync_pt_after_rt_subdivide(self, node: int, new_children: list[int], pfields: dict, mfields: dict) -> None: for _ in new_children: self._pt.add_child(node) self._pt._invalidate_caches() for child in new_children: if pfields: self._pt.set_pfields(child, **pfields) if mfields: self._pt.set_mfields(child, **mfields) def _resolve_governing_instrument_node(self, node: int): return self._pt._resolve_governing_instrument_node(node) def _resolve_distribution_fields(self, node_id: int): inst = self._pt.get_instrument(node_id) effective = _resolve_kit_member(inst, self._pt, node_id) if inst is not None else inst inst_pfields = effective.pfields if (effective is not None and hasattr(effective, "pfields")) else {} pfields = {} for key in self._pt._meta["pfields"]: value = self._pt.get_pfield(node_id, key) if value is None and key in inst_pfields: value = inst_pfields[key] pfields[key] = value mfields = {key: self._pt.get_mfield(node_id, key) for key in self._pt._meta["mfields"]} is_rest = self._rt[node_id].get("proportion", 1) < 0 return is_rest, pfields, mfields, inst def _build_parent_distribution_view(self, node_id: int) -> Optional[ParentDistributionView]: parent_id = self._rt.parent(node_id) if parent_id is None: return None is_rest, pfields, mfields, instrument = self._resolve_distribution_fields(parent_id) return ParentDistributionView( ref=self._build_node_handle(parent_id), is_rest=is_rest, pfields=pfields, mfields=mfields, instrument=instrument, _owner=self, ) def _build_node_context(self, node_id: int, index: int, total: int) -> DistributionContext: base = super()._build_node_context(node_id, index, total) is_rest, pfields, mfields, instrument = self._resolve_distribution_fields(node_id) return DistributionContext( ref=base.ref, index=index, total=total, is_rest=is_rest, pfields=pfields, mfields=mfields, instrument=instrument, _owner=self, ) def _normalize_node_input(self, node): if node is None: raise ValueError("node selection is required") try: return self._coerce_node_targets(node) except TypeError as exc: raise ValueError("node must be int, selector, or iterable thereof") from exc def _resolve_leaf_selection(self, node): source_nodes = self._normalize_node_input(node) leaf_order = list(self._rt.leaf_nodes) leaf_index = {leaf: i for i, leaf in enumerate(leaf_order)} leaf_set = set(leaf_order) selected = set() for selected_node in source_nodes: if selected_node not in self._rt.nodes: raise ValueError(f"Node {selected_node} not found in tree") if selected_node in leaf_set: selected.add(selected_node) else: selected.update(self._rt.subtree_leaves(selected_node)) if not selected: raise ValueError("Selection resolves to no leaf nodes") ordered = [leaf for leaf in leaf_order if leaf in selected] indices = [leaf_index[leaf] for leaf in ordered] if indices != list(range(indices[0], indices[-1] + 1)): raise ValueError("Selection must be contiguous in left-to-right tree order") return ordered def _resolve_per_node_leaf_groups(self, node): source_nodes = self._normalize_node_input(node) leaf_set = set(self._rt.leaf_nodes) groups = [] for selected_node in source_nodes: if selected_node not in self._rt.nodes: raise ValueError(f"Node {selected_node} not found in tree") if selected_node in leaf_set: groups.append((selected_node,)) else: groups.append(tuple(self._rt.subtree_leaves(selected_node))) return groups def _apply_offset_take(self, leaves, offset=0, take=None): if offset < 0: raise ValueError("offset must be >= 0") if offset > len(leaves): raise ValueError("offset exceeds selection bounds") if take is None: result = leaves[offset:] else: if take <= 0: raise ValueError("take must be > 0") end = offset + take if end > len(leaves): raise ValueError("offset/take exceeds UC boundaries") result = leaves[offset:end] if not result: raise ValueError("Resolved span is empty") return tuple(result) def _ranges_overlap(self, left, right): return not (left[1] < right[0] or right[1] < left[0]) def _selection_index_range(self, leaves): leaf_order = list(self._rt.leaf_nodes) leaf_index = {leaf: i for i, leaf in enumerate(leaf_order)} idx = [leaf_index[leaf] for leaf in leaves] return min(idx), max(idx) def _build_effective_parameter_tree(self): pt_snapshot = self._pt.copy() for slur_id, slur_spec in self._slur_specs.items(): leaves = list(slur_spec['leaf_nodes']) if not leaves: continue first, last = leaves[0], leaves[-1] for leaf in leaves: pt_snapshot.set_mfields( leaf, _slur_start=1 if leaf == first else 0, _slur_end=1 if leaf == last else 0, _slur_id=slur_id ) return pt_snapshot def _event_context(self): self._ensure_timing_cache() return self._build_effective_parameter_tree() def _make_node_proxy(self, node_id: int): self._ensure_timing_cache() return Parametron(node_id, self, self._pt) def _make_event(self, node_id: int, event_context=None): eval_pt = event_context if event_context is not None else self._build_effective_parameter_tree() return Parametron(node_id, self, eval_pt) @property def pt(self) -> ParameterTree: """ Effective ParameterTree snapshot for the current UC state. Returns ------- ParameterTree A copy of the parameter tree with UC overlays materialized for plotting and inspection (e.g., envelope-applied values and slur markers). """ return self._build_effective_parameter_tree() @property def pfields(self) -> list: """ List of all available parameter field names. Returns ------- list of str Sorted list of parameter field names """ return self._pt.pfields @property def mfields(self) -> list: """ List of all available meta field names. Returns ------- list of str Sorted list of meta field names """ return self._pt.mfields @staticmethod def _instrument_display(inst): if inst is None: return None if isinstance(inst, (str, int)): return inst if hasattr(inst, 'name') and inst.name not in (None, 'default'): return inst.name if hasattr(inst, 'defName'): return inst.defName if hasattr(inst, 'tonejs_class'): return inst.tonejs_class if hasattr(inst, 'prgm'): return inst.prgm return str(inst) @property def events(self): """ Flattened event DataFrame for inspection. Columns (left to right): ``node_id``, ``start``, ``dur``, ``metric_dur``, ``instrument``, then one column per pfield key, then one column per mfield key. Rests are indicated by negative ``metric_dur``. Pfield/mfield columns are the union across all events; missing keys are ``None``. Returns ------- pandas.DataFrame """ events = self._materialize_events() all_pf_keys: list[str] = [] all_mf_keys: list[str] = [] pf_seen: set[str] = set() mf_seen: set[str] = set() rows = [] for event in events: inst = self.get_instrument(event.node_id) pf = event.pfields mf = event.mfields for k in pf: if k not in pf_seen: pf_seen.add(k) all_pf_keys.append(k) for k in mf: if k not in mf_seen: mf_seen.add(k) all_mf_keys.append(k) rows.append((event, inst, pf, mf)) data = [] for event, inst, pf, mf in rows: row = { 'node_id': event.node_id, 'start': event.start, 'dur': event.duration, 'metric_dur': event.metric_duration, 'instrument': self._instrument_display(inst), } for k in all_pf_keys: row[k] = pf.get(k) for k in all_mf_keys: row[k] = mf.get(k) data.append(row) return pd.DataFrame(data, index=range(len(rows))) def _distribute_to_targets(self, targets, fields, include_rests, setter='pfields'): if not include_rests: targets = [n for n in targets if self._rt[n].get('proportion', 1) >= 0] total = len(targets) for i, n in enumerate(targets): ctx = _build_pfield_context( self, n, i, total, is_rest=self._rt[n].get('proportion', 1) < 0 ) resolved = {} for k, v in fields.items(): if callable(v): arity = _callable_arity(v) resolved[k] = v(ctx) if arity >= 1 else v() elif isinstance(v, Pattern): val = next(v) if val is not None: resolved[k] = val if resolved: if setter == 'pfields': self._pt.set_pfields(n, **resolved) else: self._pt.set_mfields(n, **resolved)
[docs] def set_pfields(self, node, include_rests=False, **kwargs) -> None: """ Set parameter field values for target node(s). Parameters ---------- node : int or list/tuple/set of int Target node(s). Single node: value evaluated once, set on that node, PT inheritance cascades. List of nodes: value evaluated once per node. include_rests : bool, default=False When True, rest nodes are included during callable/Pattern distribution. **kwargs Parameter field names and values. Value types: - Scalar: set directly on target node(s) (includes tuples, lists, or any non-callable/non-Pattern value) - Callable: evaluated once per target node (0-arg or 1-arg with DistributionContext) - Pattern: next() called once per target node """ targets = self._coerce_node_targets(node) distributable_fields = {k: v for k, v in kwargs.items() if callable(v) or isinstance(v, Pattern)} static_fields = {k: v for k, v in kwargs.items() if k not in distributable_fields} for n in targets: if static_fields: self._pt.set_pfields(n, **static_fields) if distributable_fields: self._distribute_to_targets(targets, distributable_fields, include_rests, setter='pfields')
[docs] def set_mfields(self, node, include_rests=False, **kwargs) -> None: """ Set meta field values for target node(s). Parameters ---------- node : int or list/tuple/set of int Target node(s). Same scoping rules as set_pfields. include_rests : bool, default=False When True, rest nodes are included during callable/Pattern distribution. **kwargs Meta field names and values. Value types: - Scalar: set directly on target node(s) - Callable: evaluated once per target node (0-arg or 1-arg with DistributionContext) - Pattern: next() called once per target node """ targets = self._coerce_node_targets(node) distributable_fields = {k: v for k, v in kwargs.items() if callable(v) or isinstance(v, Pattern)} static_fields = {k: v for k, v in kwargs.items() if k not in distributable_fields} for n in targets: if static_fields: self._pt.set_mfields(n, **static_fields) if distributable_fields: self._distribute_to_targets(targets, distributable_fields, include_rests, setter='mfields')
def _bake_envelope(self, selected, envelope, pfields_list, endpoint): self._ensure_timing_cache() sounding = [n for n in selected if self.nodes[n].get('proportion', 1) >= 0] if not sounding: warnings.warn( "apply_envelope: selection resolves to no sounding leaves; envelope not applied", RuntimeWarning, stacklevel=3 ) return if len(sounding) == 1 and not endpoint: warnings.warn( "apply_envelope: endpoint=False with a single sounding leaf " "collapses envelope duration to 0; falling back to endpoint=True", RuntimeWarning, stacklevel=3 ) endpoint = True start_time = min(self.nodes[n]['real_onset'] for n in sounding) if endpoint: end_time = max(self.nodes[n]['real_onset'] + abs(self.nodes[n]['real_duration']) for n in sounding) else: end_time = max(self.nodes[n]['real_onset'] for n in sounding) duration = end_time - start_time raw_total = sum(envelope.times) scaled_envelope = Envelope( values=envelope.values, times=envelope.times, curve=envelope._curve, time_scale=duration / raw_total if raw_total > 0 else 1.0 ) for node in sounding: event_time = self.nodes[node]['real_onset'] relative_time = max(0, min(event_time - start_time, scaled_envelope.total_time)) try: env_value = scaled_envelope.at_time(relative_time) except ValueError: env_value = scaled_envelope.values[0] if relative_time <= 0 else scaled_envelope.values[-1] self._pt.set_pfields(node, **{pfield: env_value for pfield in pfields_list}) def _resolve_control_envelope_leaves(self, desc): anchor = desc["anchor_node"] if anchor not in self._rt: return [] leaf_subset = desc["leaf_subset"] if leaf_subset is None: candidates = list(self._rt.subtree_leaves(anchor)) else: current_leaves = set(self._rt.leaf_nodes) candidates = [n for n in leaf_subset if n in current_leaves] return [n for n in candidates if self._rt[n].get('proportion', 1) >= 0] @staticmethod def _leaf_subset_contains(leaf_subset, value): return value in leaf_subset @staticmethod def _leaf_subset_intersects(leaf_subset, other): other_set = other if isinstance(other, (set, frozenset)) else set(other) return any(n in other_set for n in leaf_subset) @staticmethod def _leaf_subset_subtract(leaf_subset, other): other_set = other if isinstance(other, (set, frozenset)) else set(other) return tuple(n for n in leaf_subset if n not in other_set) @staticmethod def _leaf_subset_union(leaf_subset, other): seen = set(leaf_subset) extras = tuple(n for n in other if n not in seen) return tuple(leaf_subset) + extras def _resolve_control_envelope_time_span(self, desc, sounding=None): if sounding is None: sounding = self._resolve_control_envelope_leaves(desc) if not sounding: return (0.0, 0.0) self._ensure_timing_cache() start = min(self.nodes[n]['real_onset'] for n in sounding) if desc["endpoint"]: end = max(self.nodes[n]['real_onset'] + abs(self.nodes[n]['real_duration']) for n in sounding) else: end = max(self.nodes[n]['real_onset'] for n in sounding) return (start, end) def _check_envelope_overlap(self, new_pfields, new_leaves): new_pf_set = set(new_pfields) new_leaf_set = set(new_leaves) for desc in self._control_envelopes.values(): shared_pf = new_pf_set.intersection(desc["pfields"]) if not shared_pf: continue existing_leaves = set(self._resolve_control_envelope_leaves(desc)) shared_leaves = new_leaf_set.intersection(existing_leaves) if shared_leaves: raise ValueError( "Overlapping control envelopes on the same pfield " f"(pfields={sorted(shared_pf)}, " f"shared_leaves={sorted(shared_leaves)}, " f"existing_pfields={sorted(desc['pfields'])})" ) def _rebake_control_envelope(self, desc): sounding = self._resolve_control_envelope_leaves(desc) if sounding: self._bake_envelope(sounding, desc["envelope"], desc["pfields"], desc["endpoint"]) def _record_control_envelope(self, selected, envelope, pfields_list, endpoint): self._ensure_timing_cache() sounding = [n for n in selected if self._rt[n].get('proportion', 1) >= 0] if not sounding: warnings.warn( "apply_envelope: selection resolves to no sounding leaves; envelope not applied", RuntimeWarning, stacklevel=3 ) return anchor_node = selected[0] for n in selected[1:]: anchor_node = self._rt.lowest_common_ancestor(anchor_node, n) all_anchor_leaves = set(self._rt.subtree_leaves(anchor_node)) leaf_subset = None if set(selected) == all_anchor_leaves else tuple(selected) self._check_envelope_overlap(pfields_list, sounding) self._bake_envelope(sounding, envelope, pfields_list, endpoint) env_id = self._next_envelope_id self._next_envelope_id += 1 self._control_envelopes[env_id] = { "envelope": envelope, "pfields": list(pfields_list), "endpoint": endpoint, "anchor_node": anchor_node, "leaf_subset": leaf_subset, } return env_id
[docs] def resolved_control_envelopes(self): self._ensure_timing_cache() result = [] for desc in self._control_envelopes.values(): leaves = self._resolve_control_envelope_leaves(desc) if not leaves: continue start, end = self._resolve_control_envelope_time_span(desc, leaves) result.append({ "envelope": desc["envelope"], "pfields": desc["pfields"], "target_nodes": list(leaves), "time_span": (start, end), }) return result
[docs] def remove_envelope(self, env_id: int) -> None: """ Remove a previously-applied control envelope by handle. The baked pfield values written by this envelope are unset so that each affected leaf falls back to its inherited (parent/instrument) default. Only control-mode envelopes allocate handles; bake-mode envelopes are one-shot writes with no state to remove. Parameters ---------- env_id : int The identifier returned by ``apply_envelope(..., control=True)``. Raises ------ KeyError If ``env_id`` is not a live envelope handle on this UC. """ if env_id not in self._control_envelopes: raise KeyError(f"No control envelope with id {env_id}") desc = self._control_envelopes.pop(env_id) leaves = self._resolve_control_envelope_leaves(desc) for leaf in leaves: if leaf not in self._pt: continue node_data = self._pt.items(leaf) for pfield in desc["pfields"]: node_data.pop(pfield, None) self._pt.replace_node_data(leaf, node_data)
[docs] def apply_envelope(self, envelope: Envelope, pfields: Union[str, list], node: Union[int, list, tuple, set], offset: int = 0, take: Union[int, None] = None, scope: Literal["span", "per_node"] = "span", control: bool = False, endpoint: bool = True) -> Union[int, list[int]]: """ Apply an envelope to a contiguous leaf span within this UC. Parameters ---------- envelope : Envelope Envelope specification to apply. pfields : Union[str, list] Target parameter field(s). Overlap is allowed across different fields but rejected for overlapping spans on the same field. node : int | list | tuple | set Node selector. A single node resolves to subtree leaves. An iterable can be treated either as one combined span (``scope="span"``) or as independent per-node applications (``scope="per_node"``). offset : int, default=0 Leaf offset into the resolved contiguous selection. take : int, optional Number of leaves to include from ``offset``. If omitted, uses all leaves from ``offset`` to the end of the resolved selection. scope : {"span", "per_node"}, default="span" How the node selection is interpreted. ``"span"`` treats all resolved leaves as one contiguous group. ``"per_node"`` gives each node in the iterable its own independent envelope. control : bool, default=False When ``True``, values are still baked into the ParameterTree (for inspection) but a control-envelope descriptor is also recorded for runtime bus-based automation via a ``__klEnvCtrl`` control synth. endpoint : bool, default=True If True, envelope span is onset-to-end of the selected sounding leaves. If False, span is onset-to-onset. Returns ------- int | list[int] Envelope identifier, or list of identifiers when ``scope="per_node"``. In per-node scope, the return value is always a list. Raises ------ ValueError If selection is invalid/non-contiguous, offset/take overflows bounds, a same-pfield overlap is detected, or scope is invalid. """ pfields_list = pfields if isinstance(pfields, list) else [pfields] apply_fn = self._record_control_envelope if control else self._bake_envelope if scope == "span": selected = self._resolve_leaf_selection(node=node) selected = self._apply_offset_take(selected, offset=offset, take=take) return apply_fn(selected, envelope, pfields_list, endpoint) elif scope == "per_node": groups = self._resolve_per_node_leaf_groups(node) results = [] for group in groups: selected = self._apply_offset_take(group, offset=offset, take=take) results.append(apply_fn(selected, envelope, pfields_list, endpoint)) return results else: raise ValueError(f"Unknown scope: {scope}")
def _partition_non_rest_segments(self, leaves, rest_set): segments = [] current = [] for leaf in leaves: if leaf in rest_set: if len(current) >= 2: segments.append(tuple(current)) current = [] else: current.append(leaf) if len(current) >= 2: segments.append(tuple(current)) return segments def _validate_slur_segment(self, segment, reserved_sets=None): proposed_set = set(segment) for spec in self._slur_specs.values(): if proposed_set.intersection(spec['leaf_set']): raise ValueError("Slurs cannot overlap") if reserved_sets: for reserved_set in reserved_sets: if proposed_set.intersection(reserved_set): raise ValueError("Slurs cannot overlap within requested per-node applications") return proposed_set def _validate_slur_selection(self, selected, reserved_sets=None): if len(selected) < 2: raise ValueError("Slur requires at least two leaves") rest_set = {n for n in selected if self._rt[n].get('proportion', 1) < 0} segments = self._partition_non_rest_segments(selected, rest_set) if not segments: raise ValueError("Slur selection has no segment with at least two sounding leaves") proposed_set = set(selected) proposed_range = self._selection_index_range(selected) for spec in self._slur_specs.values(): if proposed_set.intersection(spec['leaf_set']): raise ValueError("Slurs cannot overlap") if reserved_sets: for reserved_set in reserved_sets: if proposed_set.intersection(reserved_set): raise ValueError("Slurs cannot overlap within requested per-node applications") return proposed_set, proposed_range def _register_slur(self, selected): proposed_set, proposed_range = self._validate_slur_selection(selected) slur_id = self._next_slur_id self._next_slur_id += 1 self._slur_specs[slur_id] = { 'leaf_nodes': selected, 'leaf_set': proposed_set, 'index_range': proposed_range } return slur_id
[docs] def apply_slur(self, node: Union[int, list, tuple, set], offset: int = 0, take: Union[int, None] = None, mode: Literal["span", "per_node"] = "span") -> Union[int, list[int]]: """ Apply a slur to a contiguous leaf span within this UC. Parameters ---------- node : int | list | tuple | set Node selector. A single node resolves to subtree leaves. An iterable can be treated either as one combined span (`mode=\"span\"`) or as independent per-node applications (`mode=\"per_node\"`). offset : int, default=0 Leaf offset into the resolved contiguous selection. take : int, optional Number of leaves to include from `offset`. If omitted, uses all leaves from `offset` to the end of the resolved selection. mode : {"span", "per_node"}, default="span" Selection interpretation mode. Returns ------- int | list[int] Slur identifier, or list of identifiers when `mode=\"per_node\"` applies. In per-node mode, the return value is always a list. Raises ------ ValueError If selection is invalid/non-contiguous, includes rests, overflows offset/take bounds, resolves to fewer than two leaves, overlaps an existing slur, or mode is invalid. """ if mode == "span": selected = self._resolve_leaf_selection(node=node) selected = self._apply_offset_take(selected, offset=offset, take=take) if len(selected) < 2: raise ValueError("Slur requires at least two leaves") rest_set = {n for n in selected if self._rt[n].get('proportion', 1) < 0} segments = self._partition_non_rest_segments(selected, rest_set) slur_ids = [] reserved_sets = [] for segment in segments: self._validate_slur_segment(segment, reserved_sets) slur_id = self._register_slur(segment) reserved_sets.append(set(segment)) slur_ids.append(slur_id) return slur_ids[0] if len(slur_ids) == 1 else slur_ids if mode == "per_node": groups = self._resolve_per_node_leaf_groups(node) slur_ids = [] reserved_sets = [] for group in groups: selected = self._apply_offset_take(group, offset=offset, take=take) rest_set = {n for n in selected if self._rt[n].get('proportion', 1) < 0} segments = self._partition_non_rest_segments(selected, rest_set) for segment in segments: self._validate_slur_segment(segment, reserved_sets) slur_id = self._register_slur(segment) reserved_sets.append(set(segment)) slur_ids.append(slur_id) return slur_ids raise ValueError(f"Unknown mode: {mode}")
def _split_slurs_for_rests(self, nodes_to_rest: set[int]): for slur_id, spec in list(self._slur_specs.items()): if not spec['leaf_set'].intersection(nodes_to_rest): continue leaves = list(spec['leaf_nodes']) segments = self._partition_non_rest_segments(leaves, nodes_to_rest) del self._slur_specs[slur_id] for segment in segments: self._register_slur(segment) def _invalidate_slurs_for_removed_nodes(self, removed_set): for slur_id, spec in list(self._slur_specs.items()): if not spec['leaf_set'].intersection(removed_set): continue remaining = [n for n in spec['leaf_nodes'] if n not in removed_set] del self._slur_specs[slur_id] if len(remaining) >= 2: rest_set = {n for n in remaining if self._rt[n].get('proportion', 1) < 0} segments = self._partition_non_rest_segments(remaining, rest_set) for segment in segments: self._register_slur(segment) def _heal_slurs_after_subdivide(self, old_leaf, new_leaves): for slur_id, spec in list(self._slur_specs.items()): if old_leaf not in spec['leaf_set']: continue old_nodes = list(spec['leaf_nodes']) idx = old_nodes.index(old_leaf) new_nodes = old_nodes[:idx] + list(new_leaves) + old_nodes[idx + 1:] del self._slur_specs[slur_id] rest_set = {n for n in new_nodes if self._rt[n].get('proportion', 1) < 0} segments = self._partition_non_rest_segments(new_nodes, rest_set) for segment in segments: self._register_slur(segment) def _heal_envelopes_after_subdivide(self, old_leaf, new_leaves): for desc in self._control_envelopes.values(): needs_rebake = False if desc["leaf_subset"] is not None and old_leaf in desc["leaf_subset"]: without_old = self._leaf_subset_subtract(desc["leaf_subset"], {old_leaf}) desc["leaf_subset"] = self._leaf_subset_union(without_old, new_leaves) needs_rebake = True elif desc["leaf_subset"] is None: ancestor_set = set(self._rt.descendants(desc["anchor_node"])) | {desc["anchor_node"]} if old_leaf in ancestor_set: needs_rebake = True if needs_rebake: self._rebake_control_envelope(desc) def _filter_envelopes_for_rests(self, affected_leaves): for env_id, desc in list(self._control_envelopes.items()): touched = False if desc["leaf_subset"] is not None: if self._leaf_subset_intersects(desc["leaf_subset"], affected_leaves): desc["leaf_subset"] = self._leaf_subset_subtract( desc["leaf_subset"], affected_leaves ) touched = True else: anchor_leaves = set(self._rt.subtree_leaves(desc["anchor_node"])) if anchor_leaves.intersection(affected_leaves): touched = True if not touched: continue if not self._resolve_control_envelope_leaves(desc): warnings.warn( "Control envelope removed: all target leaves are now rests", RuntimeWarning, stacklevel=3 ) del self._control_envelopes[env_id] else: self._rebake_control_envelope(desc) def _invalidate_envelopes_for_removed_nodes(self, removed_set): for env_id, desc in list(self._control_envelopes.items()): if desc["anchor_node"] in removed_set: warnings.warn( "Control envelope removed: anchor node was destroyed", RuntimeWarning, stacklevel=3 ) del self._control_envelopes[env_id] continue if (desc["leaf_subset"] is not None and self._leaf_subset_intersects(desc["leaf_subset"], removed_set)): desc["leaf_subset"] = self._leaf_subset_subtract( desc["leaf_subset"], removed_set ) if not self._resolve_control_envelope_leaves(desc): warnings.warn( "Control envelope removed: all target leaves were destroyed", RuntimeWarning, stacklevel=3 ) del self._control_envelopes[env_id] else: self._rebake_control_envelope(desc)
[docs] def make_rest(self, node) -> None: """ Make one or more nodes (and their subtrees) rests, splitting intersecting slurs and filtering control envelopes. When multiple nodes are passed, the affected-leaf set is collected across all of them before slur splitting and envelope filtering, so slurs/envelopes that touch the combined set are healed exactly once. Parameters ---------- node : int or iterable of int The node ID (or iterable of node IDs) to convert to rests. """ nodes = self._coerce_node_targets(node) affected: set = set() for n in nodes: affected.add(n) affected.update(self._rt.descendants(n)) affected_leaves = {n for n in affected if n in self._rt.leaf_nodes} self._split_slurs_for_rests(affected_leaves) super().make_rest(nodes) self._filter_envelopes_for_rests(affected_leaves)
[docs] def subdivide(self, node: int, S) -> None: """ Subdivide a leaf node with structure (D, S), syncing PT, cascading values, and healing slurs/control envelopes. Parameters ---------- node : int The leaf node to subdivide. S : tuple Valid subdivisions tuple (integers or nested (D, S) tuples). Raises ------ ValueError If the node is not found or is not a leaf. """ parent_data = self._pt[node].active_items() pfields = {k: v for k, v in parent_data.items() if k in self._pt._meta['pfields']} mfields = {k: v for k, v in parent_data.items() if k in self._pt._meta['mfields']} self._rt.subdivide(node, S) self._invalidate_timing_cache() new_children = list(self._rt.successors(node)) self._sync_pt_after_rt_subdivide(node, new_children, pfields, mfields) new_leaves = list(self._rt.subtree_leaves(node)) self._heal_slurs_after_subdivide(node, new_leaves) self._heal_envelopes_after_subdivide(node, new_leaves)
[docs] def add_child(self, parent, **attr): if 'label' in attr and 'proportion' not in attr: attr = dict(attr) attr['proportion'] = attr.pop('label') new_rt_node = self._rt.add_child(parent, **attr) self._pt.add_child(parent) return new_rt_node
[docs] def prune(self, node): removed_set = {node} self._invalidate_slurs_for_removed_nodes(removed_set) self._invalidate_envelopes_for_removed_nodes(removed_set) self._rt.prune(node) self._pt.prune(node)
[docs] def remove_subtree(self, node): removed_set = {node} | set(self._rt.descendants(node)) self._invalidate_slurs_for_removed_nodes(removed_set) self._invalidate_envelopes_for_removed_nodes(removed_set) self._rt.remove_subtree(node) self._pt.remove_subtree(node)
[docs] def set_instrument(self, node, instrument) -> None: """ Set an instrument for target node(s). Parameters ---------- node : int or list/tuple of int Target node(s). Single node: instrument set on that node, inherits to descendants. List of nodes: instrument evaluated once per node. instrument : Instrument, str, int, Pattern, or callable - Instrument: set directly on node with pfield defaults. If the instrument carries an ``_ensemble_family`` tag (i.e. it was accessed through an Ensemble family view), the ``group`` mfield is automatically set to the family name. - str: raw synth reference (defName for SC, tonejs_class for Tone.js) - int: raw program number (MIDI) - Pattern: next() called once per target node - Callable: evaluated once per target node (0-arg or 1-arg with DistributionContext) """ targets = self._coerce_node_targets(node) if isinstance(instrument, (str, int)): for n in targets: self._pt.set_instrument(n, instrument) elif isinstance(instrument, (Instrument, Effect)): family = getattr(instrument, '_ensemble_family', None) for n in targets: self._pt.set_instrument(n, instrument) if family is not None: self._pt.set_mfields(n, group=family) elif callable(instrument) or isinstance(instrument, Pattern): total = len(targets) for i, n in enumerate(targets): if isinstance(instrument, Pattern): inst = next(instrument) else: ctx = _build_pfield_context( self, n, i, total, is_rest=self._rt[n].get('proportion', 1) < 0 ) arity = _callable_arity(instrument) inst = instrument(ctx) if arity >= 1 else instrument() if inst is not None: self._pt.set_instrument(n, inst) family = getattr(inst, '_ensemble_family', None) if family is not None: self._pt.set_mfields(n, group=family)
[docs] def set(self, node, inst=None, mfields=None, pfields=None, include_rests=False): """ Convenience method to set instrument, meta fields, and parameter fields in one call. Parameters ---------- node : int or list/tuple of int Target node(s). inst : Instrument, Pattern, callable, or None, optional Instrument to assign. mfields : dict or None, optional Meta field names and values to set. pfields : dict or None, optional Parameter field names and values to set. include_rests : bool, optional When True, rest nodes are included during callable/Pattern distribution (default is False). """ if inst is not None: self.set_instrument(node, inst) if mfields is not None: self.set_mfields(node, include_rests=include_rests, **mfields) if pfields is not None: self.set_pfields(node, include_rests=include_rests, **pfields)
[docs] def sparsify(self, probability, node=None): """ Randomly convert sounding leaves to rests. Extends the base ``TemporalUnit.sparsify`` to accept a callable probability that receives a ``DistributionContext`` for each candidate leaf, enabling parameter-aware rest decisions. Parameters ---------- probability : float or callable If float, the fixed probability (0--1) of each leaf becoming a rest. If callable, receives a ``DistributionContext`` and returns True to rest the leaf. node : int or iterable of int, optional Restrict sparsification to this node's subtree leaves. If None, all leaves are candidates. """ if not callable(probability): super().sparsify(probability, node) return import numpy as _np if node is None: targets = list(self._rt.leaf_nodes) else: seen = set() targets = [] for n in self._coerce_node_targets(node): for leaf in self._rt.subtree_leaves(n): if leaf not in seen: seen.add(leaf) targets.append(leaf) targets = [n for n in targets if self._rt[n].get('proportion', 1) >= 0] total = len(targets) for i, leaf in enumerate(targets): ctx = _build_pfield_context(self, leaf, i, total, is_rest=False) if probability(ctx): self.make_rest(leaf)
[docs] def get_instrument(self, node: int): """Resolved instrument for node (nearest ancestor with instrument).""" return self._pt.get_instrument(node)
[docs] def get_pfield(self, node: int, key: str, default=None): """Parameter field value for node (PT only, no instrument fallback).""" value = self._pt.get_pfield(node, key) return default if value is None else value
[docs] def get_mfield(self, node: int, key: str, default=None): """Meta field value for node.""" value = self._pt.get_mfield(node, key) return default if value is None else value
[docs] def clear_parameters(self, node: int = None) -> None: """ Clear parameter values and intersecting overlays. Parameters ---------- node : int, optional Node ID to clear. If None, clears all nodes and all UC overlays. """ if node is None: self._slur_specs.clear() self._control_envelopes.clear() else: affected_nodes = {node} | set(self._rt.descendants(node)) affected_leaves = {n for n in affected_nodes if n in self._rt.leaf_nodes} self._split_slurs_for_rests(affected_leaves) self._invalidate_envelopes_for_removed_nodes(affected_nodes) self._pt.clear(node)
[docs] def get_event_parameters(self, idx: int) -> dict: """ Get all parameter values for a specific event by index. Parameters ---------- idx : int Parametron index Returns ------- dict Dictionary of parameter field names and values """ e = self[idx] return {'pfields': e.pfields, 'mfields': e.mfields}
[docs] def from_subtree(self, node: int) -> 'CompositionalUnit': """ Create a new CompositionalUnit from a subtree of this one. Preserves PT values and instrument assignments for mapped nodes. Preserves envelopes/slurs that are fully contained in the subtree leaf set; overlays crossing subtree boundaries are discarded. Parameters ---------- node : int The root node of the subtree to extract Returns ------- CompositionalUnit A new CompositionalUnit containing the subtree """ rt_subtree = self._rt.subtree(node, renumber=True) new_cu = self.__class__.from_rt(rt_subtree, beat=self.beat, bpm=self.bpm, pfields=None) original_subtree_nodes = [node] + list(self._rt.descendants(node)) old_to_new_mapping = self._rt.map_parallel_nodes( new_cu._rt, self_root=node, other_root=new_cu._rt.root, ) for old_node, new_node in old_to_new_mapping.items(): old_proportion = self._rt[old_node].get('proportion') if old_proportion is not None and old_proportion < 0: new_cu.make_rest(new_node) self._copy_pt_node_data(new_cu, old_to_new_mapping) subtree_node_set = set(original_subtree_nodes) governing_instrument_node = self._pt._resolve_governing_instrument_node(node) if (governing_instrument_node is not None and governing_instrument_node not in subtree_node_set and governing_instrument_node in self._pt._node_instruments): new_cu._pt.set_instrument( new_cu._pt.root, self._pt._node_instruments[governing_instrument_node] ) self._copy_pt_instruments(new_cu, old_to_new_mapping) old_leaf_set = set(self._rt.subtree_leaves(node)) for slur_id, slur_spec in self._slur_specs.items(): slur_leaf_set = set(slur_spec['leaf_nodes']) if slur_leaf_set and slur_leaf_set.issubset(old_leaf_set): mapped = [] for old_leaf in slur_spec['leaf_nodes']: if old_leaf in old_to_new_mapping: mapped.append(old_to_new_mapping[old_leaf]) if mapped: new_cu.apply_slur(node=mapped) subtree_node_set = set(original_subtree_nodes) for desc in self._control_envelopes.values(): if desc["anchor_node"] not in subtree_node_set: continue new_anchor = old_to_new_mapping[desc["anchor_node"]] new_leaf_subset = None if desc["leaf_subset"] is not None: mapped_leaves = tuple( old_to_new_mapping[n] for n in desc["leaf_subset"] if n in old_to_new_mapping ) if not mapped_leaves: continue new_leaf_subset = mapped_leaves new_env_id = new_cu._next_envelope_id new_cu._next_envelope_id += 1 new_cu._control_envelopes[new_env_id] = { "envelope": desc["envelope"], "pfields": list(desc["pfields"]), "endpoint": desc["endpoint"], "anchor_node": new_anchor, "leaf_subset": new_leaf_subset, } return new_cu
[docs] def copy(self): """ Create a deep copy of this CompositionalUnit. The copy preserves the original time signature (``tempus``), span, pfield / mfield data, instruments, envelopes, slurs, and internal placement (``_offset``) so that containers (``TemporalUnitSequence``, ``TemporalBlock``) and :class:`~klotho.thetos.composition.score.Score` can rebuild their layouts cleanly. Returns ------- CompositionalUnit A new CompositionalUnit with identical structure, parameters, instruments, envelopes, and slurs. """ c = self.__class__( span = self.span, tempus = self.tempus, prolatio = self.prolationis, beat = self.beat, bpm = self.bpm, ) old_to_new_mapping = self._rt.map_parallel_nodes( c._rt, self_root=self._rt.root, other_root=c._rt.root, ) for old_node, new_node in old_to_new_mapping.items(): old_proportion = self._rt[old_node].get('proportion') if old_proportion is not None and old_proportion < 0: if c._rt[new_node].get('proportion', 1) >= 0: c.make_rest(new_node) self._copy_pt_node_data(c, old_to_new_mapping) self._copy_pt_instruments(c, old_to_new_mapping) for slur_id, spec in self._slur_specs.items(): mapped_leaf_nodes = tuple( old_to_new_mapping[n] for n in spec['leaf_nodes'] if n in old_to_new_mapping ) if not mapped_leaf_nodes: continue c._slur_specs[slur_id] = { 'leaf_nodes': mapped_leaf_nodes, 'leaf_set': set(mapped_leaf_nodes), 'index_range': tuple(c._selection_index_range(mapped_leaf_nodes)), } c._next_slur_id = self._next_slur_id for env_id, desc in self._control_envelopes.items(): if desc["anchor_node"] not in old_to_new_mapping: continue mapped_leaf_subset = ( tuple( old_to_new_mapping[n] for n in desc["leaf_subset"] if n in old_to_new_mapping ) if desc["leaf_subset"] is not None else None ) c._control_envelopes[env_id] = { "envelope": desc["envelope"], "pfields": list(desc["pfields"]), "endpoint": desc["endpoint"], "anchor_node": old_to_new_mapping[desc["anchor_node"]], "leaf_subset": mapped_leaf_subset, } c._next_envelope_id = self._next_envelope_id c._offset = self._offset c._invalidate_timing_cache() return c