import copy
from klotho.tonos import Pitch
from klotho.tonos.pitch.pitch_collections import PitchCollectionBase
from klotho.tonos.chords.chord import Chord, Voicing, ChordSequence
from klotho.tonos.scales.scale import Scale
from klotho.tonos.systems.harmonic_trees import Spectrum, HarmonicTree
from klotho.chronos.rhythm_trees.rhythm_tree import RhythmTree
from klotho.chronos.temporal_units.temporal import TemporalUnit, TemporalUnitSequence, TemporalBlock
from klotho.thetos.composition.compositional import CompositionalUnit
from klotho.thetos.instruments.tone import ToneInstrument
from klotho.utils.playback._amplitude import single_voice_amplitude, compute_voice_amplitudes
from klotho.utils.playback._converter_base import (
DEFAULT_NOTE_DURATION, DEFAULT_CHORD_DURATION,
DEFAULT_SPECTRUM_DURATION, DEFAULT_DRUM_FREQ,
KNOWN_KWARGS,
_get_addressed_collection, _merge_pfields,
scale_pitch_sequence, extract_convert_kwargs, lower_event_ir_to_voice_events, iter_group_sequence,
)
def _payload(events, instruments=None):
return {"events": events, "instruments": instruments or {}}
def _deep_merge(base, override):
merged = copy.deepcopy(base)
for key, value in override.items():
if isinstance(value, dict) and isinstance(merged.get(key), dict):
merged[key] = _deep_merge(merged[key], value)
else:
merged[key] = value
return merged
def _normalize_event_pfields(pfields):
if 'vel' not in pfields and 'amp' in pfields:
pfields['vel'] = pfields['amp']
if 'amp' not in pfields and 'vel' in pfields:
pfields['amp'] = pfields['vel']
if 'freq' not in pfields:
pfields['freq'] = 440.0
return pfields
def _build_seq_events(pitches, start, instrument, amp=None, per_voice_dur=None,
total_dur=None, pause=0.0, extra_pfields=None):
events = []
n = len(pitches)
if n == 0:
return events
if total_dur is not None:
voice_dur = total_dur / n
elif per_voice_dur is not None:
voice_dur = per_voice_dur
else:
voice_dur = DEFAULT_NOTE_DURATION
cursor = start
for i, pitch in enumerate(pitches):
pf = {
"freq": pitch.freq,
"vel": single_voice_amplitude(pitch.freq, amp),
}
pf = _merge_pfields(pf, extra_pfields)
events.append({
"start": cursor,
"duration": voice_dur,
"instrument": instrument,
"pfields": pf,
})
cursor += voice_dur + max(0.0, pause)
return events
def _build_chord_events(pitches, start, dur, strum, instrument, amp=None,
dur_factor=1.0, extra_pfields=None):
events = []
num_notes = len(pitches)
if num_notes == 0:
return events
freqs = [p.freq for p in pitches]
voice_amps = compute_voice_amplitudes(freqs, amp)
strum = max(0, min(1, strum))
for i, pitch in enumerate(pitches):
start_offset = (strum * dur * i) / num_notes if num_notes > 1 else 0
pf = {
"freq": pitch.freq,
"vel": voice_amps[i],
}
pf = _merge_pfields(pf, extra_pfields)
events.append({
"start": start + start_offset,
"duration": (dur * dur_factor) - start_offset,
"instrument": instrument,
"pfields": pf,
})
return events
[docs]
def compositional_unit_to_events(obj, extra_pfields=None, animation=False):
events = []
instruments = {}
inst_id_map = {}
leaf_nodes = obj._rt.leaf_nodes if animation else None
node_to_step = ({nid: idx for idx, nid in enumerate(leaf_nodes)}
if animation else None)
time_offset = 0
if animation:
time_offset = min(ev.start for ev in obj if not ev.is_rest) if any(not ev.is_rest for ev in obj) else 0
for event in obj:
step_idx = node_to_step.get(event.node_id, None) if animation else None
start_time = event.start - time_offset if animation else event.start
if event.is_rest:
if animation:
events.append({
"start": start_time,
"duration": abs(event.duration),
"instrument": "__rest__",
"pfields": {},
"_stepIndex": step_idx,
})
continue
instrument = obj.get_instrument(event.node_id)
if instrument is None:
instrument = ToneInstrument('Synth', 'Synth')
elif isinstance(instrument, str):
instrument = ToneInstrument(instrument, instrument)
elif not isinstance(instrument, ToneInstrument):
if animation:
events.append({
"start": start_time,
"duration": abs(event.duration),
"instrument": "__rest__",
"pfields": {},
"_stepIndex": step_idx,
})
continue
inst_identity = id(instrument)
if inst_identity not in inst_id_map:
key = instrument.name
if key in instruments:
existing = instruments[key]
if existing['tonejs_class'] != instrument.tonejs_class or existing['preset'] != instrument.pfields:
raise ValueError(
f"Instrument name '{key}' is used by two different instruments. "
f"Use distinct names."
)
else:
instruments[key] = {
'tonejs_class': instrument.tonejs_class,
'preset': instrument.pfields
}
inst_id_map[inst_identity] = key
routing_key = inst_id_map[inst_identity]
expanded_voices = lower_event_ir_to_voice_events(event, step_index=step_idx)
for expanded_voice in expanded_voices:
pfields = {k: v for k, v in expanded_voice["pfields"].items()
if k != 'group'}
pfields = _normalize_event_pfields(pfields)
default_pfields = instruments[routing_key].get('preset', {})
effective_pfields = _deep_merge(default_pfields, pfields)
effective_pfields = _merge_pfields(effective_pfields, extra_pfields)
voice_start = expanded_voice["start"] - time_offset if animation else expanded_voice["start"]
ev_data = {
"start": voice_start,
"duration": expanded_voice["duration"],
"instrument": routing_key,
"pfields": effective_pfields,
"_polyGroupId": expanded_voice["poly_group_id"],
"_logicalStepId": expanded_voice["logical_step_id"],
"_polyVoiceIndex": expanded_voice["poly_voice_index"],
"_polyVoiceCount": expanded_voice["poly_voice_count"],
"_polyLeader": expanded_voice["poly_is_leader"],
"_animate": expanded_voice["animate"],
}
if animation:
ev_data["_stepIndex"] = step_idx
events.append(ev_data)
events.sort(key=lambda ev: ev["start"])
return _payload(events, instruments)
[docs]
def compositional_unit_to_animation_events(obj, extra_pfields=None):
return compositional_unit_to_events(obj, extra_pfields=extra_pfields, animation=True)
[docs]
def pitch_to_events(pitch, duration=None, amp=None, extra_pfields=None):
dur = duration if duration is not None else 1.0
pf = {
"freq": pitch.freq,
"vel": single_voice_amplitude(pitch.freq, amp),
}
pf = _merge_pfields(pf, extra_pfields)
return _payload([{
"start": 0.0,
"duration": dur,
"instrument": "synth",
"pfields": pf,
}])
[docs]
def pitch_collection_to_events(obj, duration=None, mode="seq", arp=False, strum=0, direction='u',
amp=None, pause=0.0, extra_pfields=None):
addressed = _get_addressed_collection(obj)
pitches = [addressed[i] for i in range(len(addressed))]
if mode == "chord":
pitches = sorted(pitches, key=lambda p: p.freq)
if direction.lower() == 'd':
pitches = list(reversed(pitches))
if arp:
dur = duration if duration is not None else DEFAULT_CHORD_DURATION
return _payload(_build_seq_events(pitches, 0, "synth", amp=amp,
total_dur=dur, pause=0.0, extra_pfields=extra_pfields))
else:
dur = duration if duration is not None else DEFAULT_CHORD_DURATION
return _payload(_build_chord_events(pitches, 0, dur, strum, "synth",
amp=amp, extra_pfields=extra_pfields))
else:
dur = duration if duration is not None else DEFAULT_NOTE_DURATION
return _payload(_build_seq_events(pitches, 0, "synth", amp=amp,
per_voice_dur=dur, pause=pause, extra_pfields=extra_pfields))
[docs]
def scale_to_events(obj, duration=None, equaves=1, amp=None, pause=0.0, extra_pfields=None):
dur = duration if duration is not None else DEFAULT_NOTE_DURATION
all_pitches = scale_pitch_sequence(obj, equaves)
return _payload(_build_seq_events(all_pitches, 0, "synth", amp=amp,
per_voice_dur=dur, pause=pause, extra_pfields=extra_pfields))
[docs]
def chord_to_events(obj, duration=None, arp=False, strum=0, direction='u',
amp=None, extra_pfields=None):
addressed = _get_addressed_collection(obj)
pitches = [addressed[i] for i in range(len(addressed))]
if direction.lower() == 'd':
pitches = list(reversed(pitches))
if arp:
dur = duration if duration is not None else DEFAULT_CHORD_DURATION
return _payload(_build_seq_events(pitches, 0, "synth", amp=amp,
total_dur=dur, pause=0.0, extra_pfields=extra_pfields))
else:
dur = duration if duration is not None else DEFAULT_CHORD_DURATION
return _payload(_build_chord_events(pitches, 0, dur, strum, "synth",
amp=amp, extra_pfields=extra_pfields))
[docs]
def chord_sequence_to_events(obj, duration=None, arp=False, strum=0, direction='u',
amp=None, pause=0.25, extra_pfields=None):
events = []
dur = duration if duration is not None else DEFAULT_CHORD_DURATION
groups = []
for chord in obj:
addressed = _get_addressed_collection(chord)
groups.append([addressed[i] for i in range(len(addressed))])
group_voice_amps = [
compute_voice_amplitudes([p.freq for p in group], amp)
for group in groups
]
if arp:
for _, _, start, voice_dur, pitch in iter_group_sequence(groups, dur, arp=True, direction=direction, pause=pause):
pf = {
"freq": pitch.freq,
"vel": single_voice_amplitude(pitch.freq, amp),
}
pf = _merge_pfields(pf, extra_pfields)
events.append({
"start": start,
"duration": voice_dur,
"instrument": "synth",
"pfields": pf,
})
else:
for gi, vi, start, voice_dur, pitch in iter_group_sequence(groups, dur, arp=False, strum=strum, direction=direction, pause=pause):
pf = {
"freq": pitch.freq,
"vel": group_voice_amps[gi][vi],
}
pf = _merge_pfields(pf, extra_pfields)
events.append({
"start": start,
"duration": voice_dur,
"instrument": "synth",
"pfields": pf,
})
return _payload(events)
[docs]
def spectrum_to_events(obj, duration=None, arp=False, strum=0, direction='u',
amp=None, extra_pfields=None):
pitches = [row['pitch'] for _, row in obj.data.iterrows()]
if direction.lower() == 'd':
pitches = list(reversed(pitches))
target = amp if amp is not None else 0.4
if arp:
dur = duration if duration is not None else DEFAULT_SPECTRUM_DURATION
return _payload(_build_seq_events(pitches, 0, "sine", amp=target,
total_dur=dur, pause=0.0, extra_pfields=extra_pfields))
else:
dur = duration if duration is not None else DEFAULT_SPECTRUM_DURATION
return _payload(_build_chord_events(pitches, 0, dur, strum, "sine",
amp=target, extra_pfields=extra_pfields))
[docs]
def temporal_unit_to_events(obj, use_absolute_time=False, amp=None, extra_pfields=None,
animation=False):
events = []
target = amp if amp is not None else 0.85
leaf_nodes = obj._rt.leaf_nodes if animation else None
node_to_step = ({nid: idx for idx, nid in enumerate(leaf_nodes)}
if animation else None)
if use_absolute_time:
time_offset = 0
else:
time_offset = min(chronon.start for chronon in obj if not chronon.is_rest) if any(not chronon.is_rest for chronon in obj) else 0
for chronon in obj:
start_time = chronon.start - time_offset
duration = abs(chronon.duration)
step_idx = node_to_step.get(chronon.node_id, None) if animation else None
if chronon.is_rest:
if animation:
events.append({
"start": start_time,
"duration": duration,
"instrument": "__rest__",
"pfields": {},
"_stepIndex": step_idx,
})
continue
pf = {
"freq": DEFAULT_DRUM_FREQ,
"vel": target,
}
pf = _merge_pfields(pf, extra_pfields)
ev_data = {
"start": start_time,
"duration": duration,
"instrument": "membrane",
"pfields": pf,
}
if animation:
ev_data["_stepIndex"] = step_idx
events.append(ev_data)
if animation:
events.sort(key=lambda ev: ev["start"])
return _payload(events)
[docs]
def temporal_unit_to_animation_events(obj, use_absolute_time=False, amp=None, extra_pfields=None):
return temporal_unit_to_events(obj, use_absolute_time=use_absolute_time, amp=amp,
extra_pfields=extra_pfields, animation=True)
[docs]
def rhythm_tree_to_events(obj, beat=None, bpm=None, amp=None, extra_pfields=None):
temporal_unit = TemporalUnit.from_rt(obj, beat=beat, bpm=bpm)
return temporal_unit_to_events(temporal_unit, use_absolute_time=False,
amp=amp, extra_pfields=extra_pfields)
[docs]
def rhythm_tree_to_animation_events(obj, beat=None, bpm=None, amp=None, extra_pfields=None):
temporal_unit = TemporalUnit.from_rt(obj, beat=beat, bpm=bpm)
return temporal_unit_to_animation_events(temporal_unit, use_absolute_time=False,
amp=amp, extra_pfields=extra_pfields)
def _merge_sub_payload(target_events, target_instruments, sub_payload):
target_events.extend(sub_payload["events"])
for key, val in sub_payload["instruments"].items():
if key in target_instruments:
existing = target_instruments[key]
if existing['tonejs_class'] != val['tonejs_class'] or existing['preset'] != val['preset']:
raise ValueError(
f"Instrument name '{key}' is used by two different instruments. "
f"Use distinct names."
)
else:
target_instruments[key] = val
def _shift_payload_events_to_zero(payload):
events = payload["events"]
if not events:
return payload
min_start = min(ev.get("start", 0.0) for ev in events)
if min_start == 0.0:
return payload
for ev in events:
ev["start"] = ev.get("start", 0.0) - min_start
return payload
[docs]
def temporal_sequence_to_events(obj, extra_pfields=None, rebase_to_zero=True):
events = []
instruments = {}
for unit in obj:
if isinstance(unit, CompositionalUnit):
_merge_sub_payload(events, instruments, compositional_unit_to_events(unit, extra_pfields=None))
elif isinstance(unit, TemporalUnit):
_merge_sub_payload(events, instruments, temporal_unit_to_events(unit, use_absolute_time=True, extra_pfields=extra_pfields))
elif isinstance(unit, TemporalUnitSequence):
_merge_sub_payload(events, instruments, temporal_sequence_to_events(unit, extra_pfields=extra_pfields, rebase_to_zero=False))
elif isinstance(unit, TemporalBlock):
_merge_sub_payload(events, instruments, temporal_block_to_events(unit, extra_pfields=extra_pfields, rebase_to_zero=False))
events.sort(key=lambda ev: ev["start"])
payload = _payload(events, instruments)
if rebase_to_zero:
_shift_payload_events_to_zero(payload)
return payload
[docs]
def temporal_block_to_events(obj, extra_pfields=None, rebase_to_zero=True):
events = []
instruments = {}
for row in obj:
if isinstance(row, CompositionalUnit):
_merge_sub_payload(events, instruments, compositional_unit_to_events(row, extra_pfields=None))
elif isinstance(row, TemporalUnit):
_merge_sub_payload(events, instruments, temporal_unit_to_events(row, use_absolute_time=True, extra_pfields=extra_pfields))
elif isinstance(row, TemporalUnitSequence):
_merge_sub_payload(events, instruments, temporal_sequence_to_events(row, extra_pfields=extra_pfields, rebase_to_zero=False))
elif isinstance(row, TemporalBlock):
_merge_sub_payload(events, instruments, temporal_block_to_events(row, extra_pfields=extra_pfields, rebase_to_zero=False))
events.sort(key=lambda ev: ev["start"])
payload = _payload(events, instruments)
if rebase_to_zero:
_shift_payload_events_to_zero(payload)
return payload
[docs]
def convert_to_events(obj, **kwargs):
kw = extract_convert_kwargs(kwargs)
duration = kw['duration']
arp = kw['arp']
mode = kw['mode']
strum = kw['strum']
direction = kw['direction']
equaves = kw['equaves']
beat = kw['beat']
bpm = kw['bpm']
amp = kw['amp']
pause = kw['pause']
extra_pfields = kw['extra_pfields']
if isinstance(obj, Pitch):
return pitch_to_events(obj, duration=duration, amp=amp, extra_pfields=extra_pfields)
if isinstance(obj, Spectrum):
return spectrum_to_events(obj, duration=duration, arp=arp, strum=strum, direction=direction,
amp=amp, extra_pfields=extra_pfields)
if isinstance(obj, HarmonicTree):
spectrum = Spectrum(Pitch("C4"), list(obj.partials) if hasattr(obj, 'partials') else [1, 2, 3, 4, 5])
return spectrum_to_events(spectrum, duration=duration, arp=arp, strum=strum, direction=direction,
amp=amp, extra_pfields=extra_pfields)
if isinstance(obj, RhythmTree):
return rhythm_tree_to_events(obj, beat=beat, bpm=bpm, amp=amp, extra_pfields=extra_pfields)
if isinstance(obj, TemporalUnitSequence):
return temporal_sequence_to_events(obj, extra_pfields=extra_pfields)
if isinstance(obj, TemporalBlock):
return temporal_block_to_events(obj, extra_pfields=extra_pfields)
if isinstance(obj, CompositionalUnit):
return compositional_unit_to_events(obj, extra_pfields=None)
if isinstance(obj, TemporalUnit):
return temporal_unit_to_events(obj, amp=amp, extra_pfields=extra_pfields)
if isinstance(obj, ChordSequence):
return chord_sequence_to_events(obj, duration=duration, arp=arp, strum=strum, direction=direction,
amp=amp, pause=(0.25 if pause is None else pause), extra_pfields=extra_pfields)
if isinstance(obj, Scale):
return scale_to_events(obj, duration=duration, equaves=equaves, amp=amp,
pause=(0.0 if pause is None else pause), extra_pfields=extra_pfields)
if isinstance(obj, (Chord, Voicing)):
return chord_to_events(obj, duration=duration, arp=arp, strum=strum, direction=direction,
amp=amp, extra_pfields=extra_pfields)
if isinstance(obj, PitchCollectionBase):
effective_mode = mode if mode else "sequential"
if effective_mode == "chord":
return pitch_collection_to_events(obj, duration=duration, mode="chord", arp=arp, strum=strum,
direction=direction, amp=amp,
pause=0.0,
extra_pfields=extra_pfields)
return pitch_collection_to_events(obj, duration=duration, mode="sequential",
amp=amp, pause=(0.0 if pause is None else pause),
extra_pfields=extra_pfields)
raise TypeError(f"Unsupported object type: {type(obj)}")