Source code for klotho.utils.playback.midi_player

from mido import Message, MidiFile, MidiTrack, MetaMessage
from IPython.display import Audio
import os
import tempfile
import urllib.request
import subprocess
import sys
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
from collections import deque
import math

# Optional imports for different environments
try:
    from midi2audio import FluidSynth
    HAS_FLUIDSYNTH = True
except ImportError:
    HAS_FLUIDSYNTH = False
from klotho.chronos.rhythm_trees.rhythm_tree import RhythmTree
from klotho.chronos.temporal_units.temporal import TemporalUnit, TemporalUnitSequence, TemporalBlock
from klotho.thetos.composition.compositional import CompositionalUnit
from klotho.thetos.instruments.midi import MidiInstrument
from klotho.tonos.pitch.pitch_collections import PitchCollectionBase
from klotho.tonos.pitch.pitch import Pitch
from klotho.tonos.scales.scale import Scale
from klotho.tonos.chords.chord import Chord, Voicing, ChordSequence
from klotho.utils.playback._converter_base import lower_event_ir_to_voice_events

DEFAULT_DRUM_NOTE = 77
PERCUSSION_CHANNEL = 9
DEFAULT_VELOCITY = 120
TICKS_PER_BEAT = 480

# Multi-Port MIDI Configuration
# The new system supports up to 256 channels across 16 ports (16 channels per port)
# Channels are dynamically allocated per voice with proper bank/program management
# Drum channels use Bank 128, melodic channels use Bank 0
# Channel 9 (10 in 1-based) is no longer globally reserved - it can be used for melodic or drum sounds
# This ensures EXACT microtonal tuning with proper voice independence

SOUNDFONT_URL = "https://ftp.osuosl.org/pub/musescore/soundfont/MuseScore_General/MuseScore_General.sf3"
SOUNDFONT_PATH = os.path.expanduser("~/.fluidsynth/default_sound_font.sf3")

def _is_colab():
    """Check if we're running in Google Colab."""
    try:
        import google.colab
        return True
    except ImportError:
        return False

def _ensure_soundfont():
    """Download and install a SoundFont if none exists."""
    sf_dir = os.path.dirname(SOUNDFONT_PATH)
    if not os.path.exists(sf_dir):
        os.makedirs(sf_dir)
    
    if not os.path.exists(SOUNDFONT_PATH):
        print("Downloading SoundFont for MIDI playback (one-time setup)...")
        try:
            urllib.request.urlretrieve(SOUNDFONT_URL, SOUNDFONT_PATH)
            print("SoundFont installed successfully!")
        except Exception as e:
            print(f"Could not download SoundFont: {e}")
            return None
    
    return SOUNDFONT_PATH



[docs] def play_midi(obj, dur=None, arp=False, prgm=0, max_channels=128, max_polyphony=None, soundfont_path=None, bend_sensitivity_semitones=12, debug=False, **kwargs): """ Play a musical object as MIDI audio in Jupyter/Colab notebooks. Automatically detects the environment and uses appropriate MIDI synthesis: - Google Colab: Uses FluidSynth CLI with multi-channel support - Local Jupyter: Uses FluidSynth if available - Fallback: Returns MIDI file for download Parameters ---------- obj : RhythmTree, TemporalUnit, CompositionalUnit, TemporalUnitSequence, TemporalBlock, PitchCollection, Scale, Chord, Voicing, or ChordSequence The musical object to play. Different object types have different playback behaviors: - RhythmTree/TemporalUnit: Rhythmic playback with default pitch - PitchCollection: Sequential pitch playback - Scale: Ascending then descending playback - Chord/Voicing: Block chord or arpeggiated playback - ChordSequence: Sequential playback of chords dur : float, optional Duration in seconds. Defaults depend on object type: - PitchCollection/Scale: 0.5 seconds per note - Chord/Voicing: 3.0 seconds total (or per note if arpeggiated) - ChordSequence: 3.0 seconds per chord arp : bool, optional For chords only: if True, arpeggiate the chord (default False) prgm : int, optional MIDI program number (0-127) for instrument sound (default 0 = Acoustic Grand Piano) max_channels : int, optional Maximum number of MIDI channels to allocate across all ports (default 128) Supports up to 256 channels across 16 ports max_polyphony : int, optional Maximum polyphony for synthesis (default equals max_channels) soundfont_path : str, optional Path to custom soundfont file (uses system default if None) bend_sensitivity_semitones : int, optional Pitch bend range in semitones (default 12, supports 12 or 24) debug : bool, optional Enable debug logging for channel allocation (default False) **kwargs Additional arguments passed to MIDI creation functions Returns ------- IPython.display.Audio or IPython.display.FileLink Audio widget for playback in Jupyter notebooks, or file link if audio synthesis is unavailable Notes ----- For Google Colab, FluidSynth is automatically installed if needed. For local environments, install FluidSynth for best results. The new backend supports true independence for up to 256 simultaneous voices with proper microtonal pitch bend and dynamic drum channel allocation. """ # Reset any global state to ensure clean slate for each call _reset_microtonal_counter() match obj: case TemporalUnitSequence() | TemporalBlock(): midi_file = _create_midi_from_collection(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case CompositionalUnit(): midi_file = _create_midi_from_compositional_unit(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case TemporalUnit(): midi_file = _create_midi_from_temporal_unit(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case RhythmTree(): temporal_unit = TemporalUnit.from_rt(obj) midi_file = _create_midi_from_temporal_unit(temporal_unit, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case ChordSequence(): midi_file = _create_midi_from_chord_sequence(obj, dur=dur or 3.0, arp=arp, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case PitchCollectionBase(): if isinstance(obj, Scale): midi_file = _create_midi_from_scale(obj, dur=dur or 0.5, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) elif isinstance(obj, (Chord, Voicing)): midi_file = _create_midi_from_chord(obj, dur=dur or 3.0, arp=arp, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) else: midi_file = _create_midi_from_pitch_collection(obj, dur=dur or 0.5, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case _: raise TypeError(f"Unsupported object type: {type(obj)}. Supported types: RhythmTree, TemporalUnit, CompositionalUnit, TemporalUnitSequence, TemporalBlock, PitchCollection, Scale, Chord, Voicing, ChordSequence.") return _midi_to_audio(midi_file, soundfont_path=soundfont_path, max_polyphony=max_polyphony)
[docs] def create_midi(obj, dur=None, arp=False, prgm=0, max_channels=128, max_polyphony=None, bend_sensitivity_semitones=12, debug=False, **kwargs): """ Create a MIDI file from a musical object without audio synthesis. This function creates the exact same MIDI file as play_midi() but returns the MidiFile object directly instead of converting to audio. Parameters ---------- obj : RhythmTree, TemporalUnit, CompositionalUnit, TemporalUnitSequence, TemporalBlock, PitchCollection, Scale, Chord, Voicing, or ChordSequence The musical object to convert to MIDI. Same as play_midi(). dur : float, optional Duration in seconds. Same as play_midi(). arp : bool, optional For chords only: if True, arpeggiate the chord. Same as play_midi(). prgm : int, optional MIDI program number (0-127) for instrument sound. Same as play_midi(). max_channels : int, optional Maximum number of MIDI channels to allocate. Same as play_midi(). max_polyphony : int, optional Unused in MIDI creation, kept for API compatibility. bend_sensitivity_semitones : int, optional Pitch bend range in semitones. Same as play_midi(). debug : bool, optional Enable debug logging. Same as play_midi(). **kwargs Additional arguments. Same as play_midi(). Returns ------- MidiFile The MIDI file object that can be saved with midi_file.save('filename.mid') Examples -------- >>> from klotho.chronos.rhythm_trees.rhythm_tree import RhythmTree >>> rt = RhythmTree([1, [1, 1], 1]) >>> midi_file = create_midi(rt) >>> midi_file.save('my_rhythm.mid') """ # Reset any global state to ensure clean slate for each call _reset_microtonal_counter() # Use the exact same logic as play_midi() for MIDI creation match obj: case TemporalUnitSequence() | TemporalBlock(): midi_file = _create_midi_from_collection(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case CompositionalUnit(): midi_file = _create_midi_from_compositional_unit(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case TemporalUnit(): midi_file = _create_midi_from_temporal_unit(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case RhythmTree(): temporal_unit = TemporalUnit.from_rt(obj) midi_file = _create_midi_from_temporal_unit(temporal_unit, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case ChordSequence(): midi_file = _create_midi_from_chord_sequence(obj, dur=dur or 3.0, arp=arp, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case PitchCollectionBase(): if isinstance(obj, Scale): midi_file = _create_midi_from_scale(obj, dur=dur or 0.5, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) elif isinstance(obj, (Chord, Voicing)): midi_file = _create_midi_from_chord(obj, dur=dur or 3.0, arp=arp, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) else: midi_file = _create_midi_from_pitch_collection(obj, dur=dur or 0.5, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case _: raise TypeError(f"Unsupported object type: {type(obj)}. Supported types: RhythmTree, TemporalUnit, CompositionalUnit, TemporalUnitSequence, TemporalBlock, PitchCollection, Scale, Chord, Voicing, ChordSequence.") return midi_file
def _create_midi_from_temporal_unit(temporal_unit, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a TemporalUnit using absolute timing.""" # PRD: Use absolute timing only - ignore BPM/tempo from temporal objects # Always use 4/4 at 120 BPM for MIDI file, rely on absolute start/duration times bpm = 120 # For TemporalUnit: Use SINGLE drum channel for all notes (ORIGINAL BEHAVIOR) # TemporalUnit represents rhythmic patterns - all notes should use drum channel 9 # Same drum note, same velocity, same channel - exactly as original writer = MultiPortMidiWriter(max_voices=1) # Only need 1 voice for single channel allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) # Add global meta events - always 4/4 at 120 BPM per PRD writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) # Allocate ONE drum channel for the entire TemporalUnit (ORIGINAL BEHAVIOR) try: port, channel = allocator.allocate_voice("temporal_drum_channel", is_drum=True, program=1) if debug: print(f"[DEBUG] TemporalUnit: Using single drum channel - port={port}, channel={channel}") except RuntimeError as e: if debug: print(f"[DEBUG] Failed to allocate drum channel: {e}") # Fallback: use channel 9 directly port, channel = 0, 9 note_events = [] # For standalone playback: subtract offset to start from beginning (DO NOT MUTATE OBJECT) min_start_time = min(chronon.start for chronon in temporal_unit if not chronon.is_rest) if any(not chronon.is_rest for chronon in temporal_unit) else 0 for chronon in temporal_unit: if not chronon.is_rest: # Use absolute timing from chronon, subtract offset for standalone playback start_time = chronon.start - min_start_time duration = abs(chronon.duration) # All notes use the SAME drum channel (ORIGINAL BEHAVIOR RESTORED) note_events.append({ 'voice_id': "temporal_drum_channel", 'port': port, 'channel': channel, # Same channel for all notes - ORIGINAL BEHAVIOR 'start_time': start_time, 'duration': duration, 'midi_note': DEFAULT_DRUM_NOTE, # Same drum note - ORIGINAL BEHAVIOR 'velocity': DEFAULT_VELOCITY, # Same velocity - ORIGINAL BEHAVIOR 'program': 1, # GM Standard Drum Kit 'is_drum': True, 'pitch_bend': None }) if debug: print(f"[DEBUG] TemporalUnit: {len(note_events)} notes, all using channel {channel}") # Generate MIDI events with special handling for same-channel overlapping notes _generate_temporal_unit_events(note_events, writer, allocator, bpm, debug) # Ensure MIDI file duration matches the TemporalUnit's total duration writer.finalize(bpm) return writer.get_midi_file() def _create_midi_from_compositional_unit(compositional_unit, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a CompositionalUnit using absolute timing.""" # Check if all instruments are drums OR no instruments (assume rhythmic) - use simple collection approach all_drums = True has_explicit_instruments = False has_melodic_instruments = False for event in compositional_unit: if not event.is_rest: instrument = compositional_unit.get_instrument(event._node_id) if isinstance(instrument, MidiInstrument): has_explicit_instruments = True if not instrument.is_Drum: has_melodic_instruments = True all_drums = False break # If no explicit instrument, assume rhythmic content (drums) # Always use multi-port approach for voice independence and 256-channel support if debug: instrument_types = [] for event in compositional_unit: if not event.is_rest: instrument = compositional_unit.get_instrument(event._node_id) if isinstance(instrument, MidiInstrument): instrument_types.append("drum" if instrument.is_Drum else "melodic") else: instrument_types.append("unknown") print(f"[DEBUG] CompositionalUnit: {len(instrument_types)} events, types={set(instrument_types)}, using multi-port approach") # PRD: Use absolute timing only - ignore BPM/tempo from temporal objects # Always use 4/4 at 120 BPM for MIDI file, rely on absolute start/duration times bpm = 120 # For CompositionalUnit: Use FULL implementation capabilities for voice independence # Each note gets its own channel for microtonal support and voice independence num_notes = 0 for event in compositional_unit: if not event.is_rest: num_notes += len(lower_event_ir_to_voice_events(event)) # Calculate how many drum and melodic notes we have num_drums = 0 num_melodic = 0 for event in compositional_unit: if not event.is_rest: instrument = compositional_unit.get_instrument(event._node_id) voice_count = len(lower_event_ir_to_voice_events(event)) if isinstance(instrument, MidiInstrument) and instrument.is_Drum: num_drums += voice_count else: num_melodic += voice_count # Calculate minimum ports needed: # - Drums: need ceil(num_drums / 1) ports (1 drum channel per port) # - Melodic: need ceil(num_melodic / 15) ports (15 melodic channels per port) ports_for_drums = num_drums # Each port provides 1 drum channel ports_for_melodic = math.ceil(num_melodic / 15) if num_melodic > 0 else 0 min_ports_needed = max(ports_for_drums, ports_for_melodic, 1) max_concurrent = min(num_notes, max_channels) # Create multi-port writer with unlimited ports for voice independence writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) if debug: print(f"[DEBUG] CompositionalUnit: {num_notes} notes, using multi-port approach") # Add global meta events - always 4/4 at 120 BPM per PRD writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) # Collect all note events with per-note channel allocation note_events = [] voice_counter = 0 # For standalone playback: subtract offset to start from beginning (DO NOT MUTATE OBJECT) min_start_time = min(event.start for event in compositional_unit if not event.is_rest) if any(not event.is_rest for event in compositional_unit) else 0 for event in compositional_unit: if not event.is_rest: instrument = compositional_unit.get_instrument(event._node_id) expanded_voices = lower_event_ir_to_voice_events(event) for expanded_voice in expanded_voices: if isinstance(instrument, MidiInstrument): is_drum = instrument.is_Drum program = 1 if is_drum else instrument.prgm note_param = expanded_voice["pfields"].get('note', instrument['note']) velocity = expanded_voice["pfields"].get('velocity', instrument['velocity']) elif isinstance(instrument, int): is_drum = False program = instrument note_param = expanded_voice["pfields"].get('note', 60) velocity = expanded_voice["pfields"].get('velocity', DEFAULT_VELOCITY) else: is_drum = expanded_voice["pfields"].get('is_drum', event.get_pfield('is_drum', False)) default_program = 1 if is_drum else 0 program = expanded_voice["pfields"].get('program', event.get_pfield('program', default_program)) note_param = expanded_voice["pfields"].get('note', DEFAULT_DRUM_NOTE if is_drum else 60) velocity = expanded_voice["pfields"].get('velocity', event.get_pfield('velocity', DEFAULT_VELOCITY)) voice_id = f"voice_{voice_counter}" voice_counter += 1 start_time = expanded_voice["start"] - min_start_time duration = expanded_voice["duration"] try: if debug: print(f"[DEBUG] Allocating voice_id={voice_id}, is_drum={is_drum}, program={program}") if isinstance(instrument, MidiInstrument): print(f"[DEBUG] -> Instrument: {instrument.name}, is_Drum={instrument.is_Drum}, prgm={instrument.prgm}") port, channel = allocator.allocate_voice(voice_id, is_drum, program) if debug: print(f"[DEBUG] Allocated port={port}, channel={channel}") except RuntimeError: continue if is_drum: midi_note = int(note_param) if note_param else DEFAULT_DRUM_NOTE pitch_bend = None elif isinstance(note_param, Pitch): midi_note, pitch_bend = _calculate_base_note_and_bend(note_param, allocator.bend_sensitivity_semitones) elif isinstance(note_param, float) and note_param != int(note_param): pitch = Pitch.from_midi(note_param) midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, allocator.bend_sensitivity_semitones) else: midi_note = int(note_param) if note_param else 60 pitch_bend = None note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': start_time, 'duration': duration, 'midi_note': midi_note, 'velocity': velocity, 'program': program, 'is_drum': is_drum, 'pitch_bend': pitch_bend }) # Generate MIDI events using new allocator system _generate_multi_port_events(note_events, writer, allocator, bpm, debug) # Finalize and return writer.finalize(bpm) return writer.get_midi_file() def _create_midi_from_collection(collection, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a TemporalUnitSequence or TemporalBlock using multi-port approach for voice independence.""" # PRD: Use absolute timing only - ignore BPM/tempo from temporal objects # Always use 4/4 at 120 BPM for MIDI file, rely on absolute start/duration times bpm = 120 # Estimate max concurrent voices needed across all units max_concurrent = _estimate_max_concurrent_voices(collection) max_concurrent = min(max_concurrent, max_channels) # Create writer with enough capacity for voice independence # Use multi-port approach to provide unlimited channel support per your spec writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) # Add global meta events - always 4/4 at 120 BPM per PRD writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) if debug: print(f"[DEBUG] Collection: {max_concurrent} voices, {writer.num_ports} ports, using multi-port for voice independence") # Collect all note events from all units in the collection note_events = [] voice_counter = 0 # For TemporalUnitSequence: units are sequential # For TemporalBlock: units are parallel (multiple rows) if isinstance(collection, TemporalUnitSequence): # Sequential units - each unit has its own offset already built in for unit in collection: voice_counter = _collect_note_events_from_unit(unit, note_events, 0.0, voice_counter, allocator, debug) elif isinstance(collection, TemporalBlock): # Parallel units - all rows start at the same time (time 0) for row in collection: # Recursively handle ANY temporal object (including nested BT/UTS) voice_counter = _collect_note_events_from_unit(row, note_events, 0.0, voice_counter, allocator, debug) # Sort note events by time for proper sequential playback note_events.sort(key=lambda event: event['start_time']) # Generate MIDI events using new allocator system for voice independence if debug: print(f"[DEBUG] Collection: Generating MIDI for {len(note_events)} note events") _generate_multi_port_events(note_events, writer, allocator, bpm, debug) # Finalize and return writer.finalize(bpm) return writer.get_midi_file() def _collect_note_events_from_unit(unit, note_events, time_offset, voice_counter, allocator, debug=False): """Helper function to collect note events from a single temporal unit with multi-port allocation.""" from klotho.chronos.temporal_units.temporal import TemporalUnit from klotho.thetos.composition.compositional import CompositionalUnit if debug: print(f"[DEBUG] Collecting from {type(unit)}, voice_counter={voice_counter}") print(f"[DEBUG] isinstance(unit, CompositionalUnit): {isinstance(unit, CompositionalUnit)}") print(f"[DEBUG] isinstance(unit, TemporalUnit): {isinstance(unit, TemporalUnit)}") if isinstance(unit, CompositionalUnit): # CompositionalUnit with parameters - each note gets own channel if debug: print(f"[DEBUG] Processing as CompositionalUnit: {type(unit)}") for event in unit: if not event.is_rest: instrument = unit.get_instrument(event._node_id) expanded_voices = lower_event_ir_to_voice_events(event) if debug: print(f"[DEBUG] Collection event node {event._node_id}: instrument={getattr(instrument, 'name', instrument)}, is_Drum={getattr(instrument, 'is_Drum', 'N/A')}") for expanded_voice in expanded_voices: if isinstance(instrument, MidiInstrument): is_drum = instrument.is_Drum program = 1 if is_drum else instrument.prgm note_param = expanded_voice["pfields"].get('note', instrument['note']) velocity = expanded_voice["pfields"].get('velocity', instrument['velocity']) elif isinstance(instrument, int): is_drum = False program = instrument note_param = expanded_voice["pfields"].get('note', 60) velocity = expanded_voice["pfields"].get('velocity', DEFAULT_VELOCITY) else: is_drum = expanded_voice["pfields"].get('is_drum', event.get_pfield('is_drum', False)) default_program = 1 if is_drum else 0 program = expanded_voice["pfields"].get('program', event.get_pfield('program', default_program)) note_param = expanded_voice["pfields"].get('note', DEFAULT_DRUM_NOTE if is_drum else 60) velocity = expanded_voice["pfields"].get('velocity', event.get_pfield('velocity', DEFAULT_VELOCITY)) voice_id = f"collection_voice_{voice_counter}" voice_counter += 1 start_time = expanded_voice["start"] duration = expanded_voice["duration"] try: port, channel = allocator.allocate_voice(voice_id, is_drum, program) except RuntimeError: continue if is_drum: midi_note = int(note_param) if note_param else DEFAULT_DRUM_NOTE pitch_bend = None elif isinstance(note_param, Pitch): midi_note, pitch_bend = _calculate_base_note_and_bend(note_param, allocator.bend_sensitivity_semitones) elif isinstance(note_param, float) and note_param != int(note_param): pitch = Pitch.from_midi(note_param) midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, allocator.bend_sensitivity_semitones) else: midi_note = int(note_param) if note_param else 60 pitch_bend = None note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': start_time, 'duration': duration, 'midi_note': midi_note, 'velocity': velocity, 'program': program, 'is_drum': is_drum, 'pitch_bend': pitch_bend }) elif isinstance(unit, TemporalUnit): # TemporalUnit: use single drum channel (optimization) if debug: print(f"[DEBUG] Processing as TemporalUnit: {type(unit)}") try: port, channel = allocator.allocate_voice("temporal_drum_shared", is_drum=True, program=1) except RuntimeError: port, channel = 0, 9 # Fallback for chronon in unit: if not chronon.is_rest: voice_id = "temporal_drum_shared" # Reuse same voice for TemporalUnit start_time = chronon.start # Events already have absolute timing duration = abs(chronon.duration) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, # Same channel for all TemporalUnit notes 'start_time': start_time, 'duration': duration, 'midi_note': DEFAULT_DRUM_NOTE, 'velocity': DEFAULT_VELOCITY, 'program': 1, 'is_drum': True, 'pitch_bend': None }) elif isinstance(unit, TemporalUnitSequence): # Recursive case: TemporalUnitSequence containing other temporal objects for sub_unit in unit: voice_counter = _collect_note_events_from_unit(sub_unit, note_events, time_offset, voice_counter, allocator, debug) elif isinstance(unit, TemporalBlock): # Recursive case: TemporalBlock containing other temporal objects for row in unit: voice_counter = _collect_note_events_from_unit(row, note_events, time_offset, voice_counter, allocator, debug) else: if debug: print(f"[DEBUG] Warning: Unknown unit type {type(unit)}, skipping") return voice_counter def _collect_events_from_unit_with_offset(unit, all_events, time_offset=0.0): """Helper function to collect events from a single temporal unit with time offset.""" # Handle different types of temporal objects from klotho.chronos.temporal_units.temporal import TemporalUnit, TemporalUnitSequence, TemporalBlock from klotho.thetos.composition.compositional import CompositionalUnit if isinstance(unit, CompositionalUnit): # CompositionalUnit: iterate over events (which are Chronon objects with is_rest) unit_copy = unit.copy() for event in unit_copy: if not event.is_rest: # Get instrument information from the parameter tree instrument = unit_copy.get_instrument(event._node_id) if isinstance(instrument, MidiInstrument): is_drum = instrument.is_Drum program = 1 if is_drum else instrument.prgm note_param = event.get_pfield('note', instrument['note']) velocity = event.get_pfield('velocity', instrument['velocity']) elif isinstance(instrument, int): is_drum = False program = instrument note_param = event.get_pfield('note', 60) velocity = event.get_pfield('velocity', DEFAULT_VELOCITY) else: is_drum = event.get_pfield('is_drum', False) default_program = 1 if is_drum else 0 program = event.get_pfield('program', default_program) note_param = event.get_pfield('note', DEFAULT_DRUM_NOTE if is_drum else 60) velocity = event.get_pfield('velocity', DEFAULT_VELOCITY) start_time = event.start # Events already have absolute timing duration = abs(event.duration) # Handle microtonal MIDI float values directly like CompositionalUnit if is_drum: # Drums always go to percussion channel channel = PERCUSSION_CHANNEL midi_note = int(note_param) if note_param else DEFAULT_DRUM_NOTE # Add note events - no pitch bend for drums all_events.append((start_time, 'note_on', channel, midi_note, velocity, program)) all_events.append((start_time + duration, 'note_off', channel, midi_note, 0, program)) elif isinstance(note_param, Pitch): # Use the same logic as pitch collections channel, midi_note, pitch_bend = _get_microtonal_channel_and_note(note_param) all_events.append((start_time, 'pitch_bend', channel, pitch_bend)) # Add note events all_events.append((start_time, 'note_on', channel, midi_note, velocity, program)) all_events.append((start_time + duration, 'note_off', channel, midi_note, 0, program)) elif isinstance(note_param, float) and note_param != int(note_param): # Microtonal MIDI float - convert to Pitch and use same logic pitch = Pitch.from_midi(note_param) channel, midi_note, pitch_bend = _get_microtonal_channel_and_note(pitch) all_events.append((start_time, 'pitch_bend', channel, pitch_bend)) # Add note events all_events.append((start_time, 'note_on', channel, midi_note, velocity, program)) all_events.append((start_time + duration, 'note_off', channel, midi_note, 0, program)) else: # Simple integer MIDI note - use channel 0 channel = 0 midi_note = int(note_param) if note_param else 60 # Add note events all_events.append((start_time, 'note_on', channel, midi_note, velocity, program)) all_events.append((start_time + duration, 'note_off', channel, midi_note, 0, program)) elif isinstance(unit, TemporalUnit): # TemporalUnit: iterate over chronons (which have is_rest) unit_copy = unit.copy() for chronon in unit_copy: if not chronon.is_rest: start_time = chronon.start # Events already have absolute timing duration = abs(chronon.duration) # TemporalUnit should use drum sounds, not piano (program 1, not 0) all_events.append((start_time, 'note_on', PERCUSSION_CHANNEL, DEFAULT_DRUM_NOTE, DEFAULT_VELOCITY, 1)) all_events.append((start_time + duration, 'note_off', PERCUSSION_CHANNEL, DEFAULT_DRUM_NOTE, 0, 1)) elif isinstance(unit, (TemporalUnitSequence, TemporalBlock)): # Recursive case: these contain other temporal objects if isinstance(unit, TemporalUnitSequence): # Sequential: process each unit in order for sub_unit in unit: _collect_events_from_unit_with_offset(sub_unit, all_events, time_offset) elif isinstance(unit, TemporalBlock): # Parallel: all units start at the same time for sub_unit in unit: _collect_events_from_unit_with_offset(sub_unit, all_events, time_offset) else: print(f"Warning: Unknown unit type {type(unit)}, skipping") def _midi_to_audio(midi_file, soundfont_path=None, max_polyphony=None): """Convert MIDI file to audio for playback.""" with tempfile.NamedTemporaryFile(suffix='.mid', delete=False) as midi_temp: midi_file.save(midi_temp.name) midi_path = midi_temp.name with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as audio_temp: audio_path = audio_temp.name try: # Always try Colab method first if we're in Colab if _is_colab(): # print("Detected Google Colab environment, using timidity...") return _midi_to_audio_colab(midi_path, audio_path) # Try FluidSynth for local environments if HAS_FLUIDSYNTH: # print("Using FluidSynth for MIDI synthesis...") try: return _midi_to_audio_fluidsynth(midi_path, audio_path) except Exception as e: print(f"FluidSynth failed ({e}), trying fallback...") return _midi_to_audio_fallback(midi_path) else: print("No MIDI synthesis available, using fallback...") return _midi_to_audio_fallback(midi_path) finally: try: os.unlink(midi_path) if os.path.exists(audio_path): os.unlink(audio_path) except OSError: pass def _midi_to_audio_colab(midi_path, audio_path): """Convert MIDI to audio in Google Colab using FluidSynth CLI (PRD requirement).""" try: # Analyze MIDI file to determine channel requirements midi_file = MidiFile(midi_path) max_channels = _get_max_channels_from_midi(midi_file) # Round up to nearest multiple of 16 for port allocation channels_needed = ((max_channels - 1) // 16 + 1) * 16 channels_needed = max(16, min(channels_needed, 256)) # Cap at 256 channels # Calculate polyphony (estimate based on concurrent notes) estimated_polyphony = max(channels_needed * 2, 256) # First try to install FluidSynth if not available try: subprocess.run(['which', 'fluidsynth'], check=True, capture_output=True) except subprocess.CalledProcessError: print("Installing FluidSynth for multi-channel MIDI rendering...") subprocess.run([ 'apt-get', 'update', '-qq' ], check=True, capture_output=True) subprocess.run([ 'apt-get', 'install', '-y', '-qq', 'fluidsynth', 'fluid-soundfont-gm' ], check=True, capture_output=True) # Use FluidSynth CLI with multi-channel support (PRD requirement) fluidsynth_cmd = [ 'fluidsynth', '-ni', # No interactive mode '-o', f'synth.midi-channels={channels_needed}', '-o', f'synth.polyphony={estimated_polyphony}', '/usr/share/sounds/sf2/FluidR3_GM.sf2', # Default Colab soundfont midi_path, '-F', audio_path, '-r', '44100' ] subprocess.run(fluidsynth_cmd, check=True, capture_output=True) audio_widget = Audio(audio_path, autoplay=False) return audio_widget except (subprocess.CalledProcessError, FileNotFoundError) as e: print(f"FluidSynth installation or execution failed: {e}") print("PRD requires FluidSynth CLI for Colab. Install with: !apt install fluidsynth fluid-soundfont-gm") return _midi_to_audio_fallback(midi_path) def _midi_to_audio_fluidsynth(midi_path, audio_path): """Convert MIDI to audio using FluidSynth CLI with multi-channel support.""" try: # Analyze MIDI file to determine channel requirements (same as Colab) midi_file = MidiFile(midi_path) max_channels = _get_max_channels_from_midi(midi_file) # For multi-port MIDI files, we need to count ALL channels across ALL ports # Each port contributes 16 channels, so total = num_ports * 16 num_ports = len([track for track in midi_file.tracks if any(msg.type == 'midi_port' for msg in track)]) if num_ports > 0: channels_needed = num_ports * 16 # Each port has 16 channels else: # Fallback for single-port files channels_needed = ((max_channels - 1) // 16 + 1) * 16 channels_needed = max(16, min(channels_needed, 256)) # Cap at 256 channels # Calculate polyphony (estimate based on concurrent notes) estimated_polyphony = max(channels_needed * 2, 256) # Try FluidSynth CLI first (same approach as Colab) soundfont = _ensure_soundfont() if not soundfont: soundfont = "/System/Library/Compositions/VoiceOver/Compact.sf2" # macOS fallback fluidsynth_cmd = [ 'fluidsynth', '-ni', # No interactive mode '-o', f'synth.midi-channels={channels_needed}', '-o', f'synth.polyphony={estimated_polyphony}', soundfont, midi_path, '-F', audio_path, '-r', '44100' ] # Try CLI version first try: subprocess.run(fluidsynth_cmd, check=True, capture_output=True) audio_widget = Audio(audio_path, autoplay=False) return audio_widget except (subprocess.CalledProcessError, FileNotFoundError): # Fallback to pyfluidsynth if CLI fails pass except Exception: # If anything fails, fall back to original method pass # Fallback: Original pyfluidsynth method (limited channels) soundfont = _ensure_soundfont() # Create FluidSynth instance if soundfont and os.path.exists(soundfont): fs = FluidSynth(sound_font=soundfont) else: fs = FluidSynth() # Suppress output by redirecting to devnull at subprocess level with open(os.devnull, 'w') as devnull: old_stdout = os.dup(1) old_stderr = os.dup(2) os.dup2(devnull.fileno(), 1) os.dup2(devnull.fileno(), 2) try: fs.midi_to_audio(midi_path, audio_path) finally: os.dup2(old_stdout, 1) os.dup2(old_stderr, 2) os.close(old_stdout) os.close(old_stderr) audio_widget = Audio(audio_path, autoplay=False) return audio_widget def _midi_to_audio_fallback(midi_path): """Fallback method that returns the MIDI file directly.""" print("Audio synthesis not available. Returning MIDI file for download.") print(f"MIDI file available at: {midi_path}") # Return an Audio widget that points to the MIDI file # This won't play in most browsers, but at least won't crash try: # Try to return as a download link if possible from IPython.display import FileLink return FileLink(midi_path) except ImportError: # Fallback to basic Audio widget return Audio(midi_path, autoplay=False) # Legacy functions removed - now using MultiPortMidiWriter + ChannelAllocator system def _get_max_channels_from_midi(midi_file): """ Analyze a MIDI file to determine the maximum number of channels used. Parameters ---------- midi_file : MidiFile The MIDI file to analyze Returns ------- int Maximum channel number used + 1 (since channels are 0-indexed) """ max_channel = 0 for track in midi_file.tracks: for msg in track: if hasattr(msg, 'channel'): max_channel = max(max_channel, msg.channel) return max_channel + 1 # Convert from 0-indexed to count
[docs] class MultiPortMidiWriter: """ Multi-port MIDI file writer that supports up to 256 channels across 16 ports. Creates Type 1 MIDI files with: - Track 0: Global meta events (tempo, time signature) - Track 1-N: One track per port with midi_port meta message """
[docs] def __init__(self, max_voices=128, ticks_per_beat=TICKS_PER_BEAT): self.max_voices = max_voices self.ticks_per_beat = ticks_per_beat self.num_ports = math.ceil(max_voices / 16) # Remove 16-port limit for voice independence - MIDI spec allows more ports # Each port provides 16 channels, so we can have many ports for voice independence # Create MIDI file structure self.midi_file = MidiFile(type=1, ticks_per_beat=ticks_per_beat) # Track 0: Global meta events only self.meta_track = MidiTrack() self.midi_file.tracks.append(self.meta_track) # Port tracks: One track per port self.port_tracks = [] for port in range(self.num_ports): track = MidiTrack() # Add midi_port meta message at time 0 track.append(MetaMessage('midi_port', port=port, time=0)) self.port_tracks.append(track) self.midi_file.tracks.append(track) # Event buckets: Store events per port before final timing calculation self.port_events = [[] for _ in range(self.num_ports)]
[docs] def add_meta_event(self, meta_message): """Add a meta event to Track 0.""" self.meta_track.append(meta_message)
[docs] def add_event(self, port, event_time, message): """Add a MIDI event to the specified port's event bucket.""" if port >= self.num_ports: raise ValueError(f"Port {port} exceeds maximum ports {self.num_ports}") self.port_events[port].append((event_time, message))
[docs] def finalize(self, bpm=120): """ Finalize the MIDI file by sorting events and calculating delta times. Parameters ---------- bpm : float Beats per minute for timing calculations """ beat_duration = 60.0 / bpm # Process each port's events for port, events in enumerate(self.port_events): if not events: continue track = self.port_tracks[port] # Sort events by time, then by message type priority # Proper MIDI setup order: Bank Select -> Program Change -> Pitch Bend -> Note On message_priority = { 'control_change': 0, # Bank Select (CC0) and RPN messages 'program_change': 1, 'pitchwheel': 2, # Pitch bend after program setup 'note_on': 3, 'note_off': 4 } events.sort(key=lambda x: (x[0], message_priority.get(x[1].type, 5))) # Calculate delta times and add to track current_time = 0.0 for event_time, message in events: delta_time = event_time - current_time delta_ticks = int(delta_time / beat_duration * self.ticks_per_beat) # Clone message with delta time msg_copy = message.copy(time=delta_ticks) track.append(msg_copy) current_time = event_time
[docs] def get_midi_file(self): """Return the completed MIDI file.""" return self.midi_file
[docs] class ChannelAllocator: """ Per-note channel allocator that manages (port, channel) assignment for voices. Maintains separate pools for melodic and drum channels per port. Ensures no channel conflicts during concurrent note playback. """
[docs] def __init__(self, num_ports, bend_sensitivity_semitones=12): self.num_ports = num_ports self.bend_sensitivity_semitones = bend_sensitivity_semitones # For better synthesizer compatibility, use single-port approach with channel cycling # Channel 9 is reserved for drums in GM spec, many synthesizers hard-code this if num_ports == 1: # Single port: traditional 16-channel approach melodic_channels = [ch for ch in range(16) if ch != 9] # 0-8, 10-15 self.free_melodic = [deque(melodic_channels)] self.free_drum = [deque([9])] # Channel 9 for drums else: # Multi-port: each port gets full channel range melodic_channels = [ch for ch in range(16) if ch != 9] # 0-8, 10-15 self.free_melodic = [deque(melodic_channels) for _ in range(num_ports)] self.free_drum = [deque([9]) for _ in range(num_ports)] # Start with channel 9 available for drums # Active voice tracking: voice_id -> (port, channel, is_drum) self.active_voices = {} # Channel state per port: [port][channel] -> {bank, program, bend_sens} self.channel_state = [[{} for _ in range(16)] for _ in range(num_ports)] # Round-robin allocation tracking self.next_port = 0 self.next_melodic_channel = 0 # Global melodic channel counter self.next_drum_channel = 0 # Global drum channel counter
[docs] def allocate_voice(self, voice_id, is_drum=False, program=0): """ Allocate a (port, channel) for a new voice. Parameters ---------- voice_id : hashable Unique identifier for this voice is_drum : bool Whether this voice uses drum sounds program : int MIDI program number Returns ------- tuple (port, channel) allocation """ # Use round-robin allocation to cycle through ALL available channels if is_drum: # For drums: Use channel 9 on each port in round-robin fashion # Total drum channels available: num_ports (one channel 9 per port) port = self.next_drum_channel % self.num_ports channel = 9 self.next_drum_channel += 1 # Clear channel state for this allocation self.channel_state[port][channel] = {} else: # For melodic: Use round-robin across ALL melodic channels on ALL ports # Total melodic channels available: num_ports * 15 (channels 0-8, 10-15 per port) melodic_channels_per_port = 15 total_melodic_channels = self.num_ports * melodic_channels_per_port # Calculate which port and channel to use global_channel_index = self.next_melodic_channel % total_melodic_channels port = global_channel_index // melodic_channels_per_port # Map to actual channel number (skip channel 9) local_channel_index = global_channel_index % melodic_channels_per_port if local_channel_index >= 9: channel = local_channel_index + 1 # Skip channel 9: 0-8, then 10-15 else: channel = local_channel_index # Use channels 0-8 self.next_melodic_channel += 1 # Clear channel state for this allocation self.channel_state[port][channel] = {} # Record allocation self.active_voices[voice_id] = (port, channel, is_drum) return port, channel
[docs] def release_voice(self, voice_id): """ Release a voice and return its channel to the appropriate pool. Parameters ---------- voice_id : hashable The voice identifier to release """ if voice_id not in self.active_voices: return port, channel, is_drum = self.active_voices.pop(voice_id) # Return channel to appropriate pool (following PRD) if is_drum: # Return to drum pool self.free_drum[port].append(channel) else: # Return to melodic pool self.free_melodic[port].append(channel)
[docs] def get_channel_state(self, port, channel): """Get the current state of a channel.""" return self.channel_state[port][channel]
[docs] def set_channel_state(self, port, channel, **kwargs): """Update the state of a channel.""" self.channel_state[port][channel].update(kwargs)
[docs] def get_available_channels(self): """Get count of available channels for debugging.""" total_melodic = sum(len(pool) for pool in self.free_melodic) total_drum = sum(len(pool) for pool in self.free_drum) return total_melodic, total_drum
def _estimate_max_concurrent_voices(obj): """ Estimate the maximum number of concurrent voices needed for an object. Recursively counts all note events in nested temporal structures. Parameters ---------- obj : musical object The object to analyze Returns ------- int Estimated maximum concurrent voices """ from klotho.chronos.temporal_units.temporal import TemporalUnit, TemporalUnitSequence, TemporalBlock from klotho.thetos.composition.compositional import CompositionalUnit if isinstance(obj, (TemporalUnitSequence, TemporalBlock)): # Recursively count all events in nested structures total_events = 0 for unit in obj: total_events += _estimate_max_concurrent_voices(unit) return min(total_events, 256) # Cap at 256 voices elif isinstance(obj, (TemporalUnit, CompositionalUnit)): # Count non-rest events in the unit return len([event for event in obj if not event.is_rest]) elif hasattr(obj, '__len__'): # For other collections (scales, chords, etc.) return min(len(obj), 256) # Cap at 256 voices else: # For single objects, assume 32 voices as reasonable default (2 ports) return 32 def _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones): """ Calculate optimal base MIDI note and pitch bend for microtonal pitch. Parameters ---------- pitch : Pitch The target pitch bend_sensitivity_semitones : int Maximum bend range in semitones Returns ------- tuple (base_midi_note, pitch_bend_value) """ target_midi = pitch.midi # For standard 12-TET notes, no bend needed if abs(target_midi - round(target_midi)) < 0.001: return int(round(target_midi)), 8192 # Center pitch bend # Choose base note to minimize bend amount nearest_midi = round(target_midi) cents_offset = (target_midi - nearest_midi) * 100.0 # If bend exceeds sensitivity, choose different base note if abs(cents_offset) > bend_sensitivity_semitones * 100: if cents_offset > 0: nearest_midi = int(target_midi) + 1 else: nearest_midi = int(target_midi) cents_offset = (target_midi - nearest_midi) * 100.0 # Convert cents to pitch bend value max_bend_cents = bend_sensitivity_semitones * 100 pitch_bend_value = int(8192 + (cents_offset / max_bend_cents) * 8192) pitch_bend_value = max(0, min(16383, pitch_bend_value)) return nearest_midi, pitch_bend_value def _generate_temporal_unit_events(note_events, writer, allocator, bpm, debug=False): """ Generate MIDI events for TemporalUnit with special handling for same-channel overlapping notes. This function handles the case where multiple notes use the same channel and note number, ensuring that overlapping notes don't create timing conflicts by properly managing note_on/note_off sequences. Parameters ---------- note_events : list List of note event dictionaries writer : MultiPortMidiWriter The multi-port MIDI writer allocator : ChannelAllocator The channel allocator bpm : float Beats per minute debug : bool Enable debug output """ if not note_events: return # For TemporalUnit: Simple approach - just use the original multi-port function # but ensure note_off messages for adjacent notes don't conflict by slightly adjusting timing # Adjust note durations to avoid exact timing conflicts adjusted_events = [] for i, note_event in enumerate(note_events): adjusted_event = note_event.copy() # If this note would end exactly when the next note starts, shorten it slightly if i < len(note_events) - 1: next_event = note_events[i + 1] current_end = note_event['start_time'] + note_event['duration'] next_start = next_event['start_time'] if abs(current_end - next_start) < 0.001: # They're adjacent (within 1ms) # Shorten current note by 1ms to avoid conflict adjusted_event['duration'] = max(0.001, note_event['duration'] - 0.001) if debug: print(f"[DEBUG] Shortened note {i} duration from {note_event['duration']:.3f}s to {adjusted_event['duration']:.3f}s to avoid conflict") adjusted_events.append(adjusted_event) # Use the original multi-port function with adjusted events _generate_multi_port_events(adjusted_events, writer, allocator, bpm, debug) def _generate_multi_port_events(note_events, writer, allocator, bpm, debug=False): """ Generate MIDI events using the multi-port writer and channel allocator. IMPORTANT: Every note gets its own channel for true voice independence, sustain capability, and unique tuning. Voices are released when notes end. Parameters ---------- note_events : list List of note event dictionaries writer : MultiPortMidiWriter The multi-port MIDI writer allocator : ChannelAllocator The channel allocator bpm : float Beats per minute """ # Generate all MIDI events - each note gets its own dedicated channel for note_event in note_events: voice_id = note_event['voice_id'] port = note_event['port'] channel = note_event['channel'] start_time = note_event['start_time'] duration = note_event['duration'] midi_note = note_event['midi_note'] velocity = note_event['velocity'] program = note_event['program'] is_drum = note_event['is_drum'] pitch_bend = note_event['pitch_bend'] # Always set up channel for new voice allocation (ensures clean state) # Send setup messages at note time but rely on MIDI writer's event sorting for proper order # Handle bank/program setup based on instrument type if is_drum: # For drums: Use Bank 0 (GM standard) + Program Change # In GM, drums use channel 9 with standard programs, no special bank needed if debug: print(f"[DEBUG] Setting up DRUM channel {channel}: Bank 0, Program {program}") writer.add_event(port, start_time, Message('control_change', channel=channel, control=0, value=0)) # Bank Select MSB writer.add_event(port, start_time, Message('program_change', channel=channel, program=program)) allocator.set_channel_state(port, channel, program=program, bank=0) else: # For melodic: Always send Bank Select 0 + Program Change if debug: print(f"[DEBUG] Setting up MELODIC channel {channel}: Bank 0, Program {program}") writer.add_event(port, start_time, Message('control_change', channel=channel, control=0, value=0)) # Bank Select MSB writer.add_event(port, start_time, Message('program_change', channel=channel, program=program)) allocator.set_channel_state(port, channel, program=program, bank=0) # Always set pitch bend sensitivity for new voice _send_rpn_pitch_bend_sensitivity(writer, port, channel, start_time, allocator.bend_sensitivity_semitones) allocator.set_channel_state(port, channel, bend_sens=allocator.bend_sensitivity_semitones) # Always reset pitch bend to center for new voice writer.add_event(port, start_time, Message('pitchwheel', channel=channel, pitch=0)) # Center pitch bend # Send pitch bend if needed (same time as note, sorted by message priority) if pitch_bend is not None: pitch_value = pitch_bend - 8192 # Convert to MIDI pitchwheel range writer.add_event(port, start_time, Message('pitchwheel', channel=channel, pitch=pitch_value)) # Send note on if debug: print(f"[DEBUG] Note ON: port={port}, ch={channel}, note={midi_note}, time={start_time:.3f}, dur={duration:.3f}") writer.add_event(port, start_time, Message('note_on', channel=channel, note=midi_note, velocity=velocity)) # Send note off writer.add_event(port, start_time + duration, Message('note_off', channel=channel, note=midi_note, velocity=0)) # Implement proper voice lifetime tracking as per PRD requirements # Track when notes end and release channels back to the pool # This prevents channel exhaustion and enables true 256-channel support # Create a list of (end_time, voice_id) pairs for voice release voice_releases = [] for note_event in note_events: end_time = note_event['start_time'] + note_event['duration'] voice_releases.append((end_time, note_event['voice_id'])) # Sort voice releases by time voice_releases.sort(key=lambda x: x[0]) # Add voice release events to the MIDI writer for end_time, voice_id in voice_releases: # Release the voice back to the allocator pool allocator.release_voice(voice_id) def _send_rpn_pitch_bend_sensitivity(writer, port, channel, time, semitones): """ Send RPN 0 (Pitch Bend Sensitivity) message sequence. Parameters ---------- writer : MultiPortMidiWriter The MIDI writer port : int MIDI port number channel : int MIDI channel number time : float Event time semitones : int Pitch bend sensitivity in semitones """ # RPN 0 message sequence writer.add_event(port, time, Message('control_change', channel=channel, control=101, value=0)) # RPN MSB writer.add_event(port, time, Message('control_change', channel=channel, control=100, value=0)) # RPN LSB writer.add_event(port, time, Message('control_change', channel=channel, control=6, value=semitones)) # Data Entry MSB writer.add_event(port, time, Message('control_change', channel=channel, control=38, value=0)) # Data Entry LSB writer.add_event(port, time, Message('control_change', channel=channel, control=101, value=127)) # Deselect RPN MSB writer.add_event(port, time, Message('control_change', channel=channel, control=100, value=127)) # Deselect RPN LSB # Test functions for MIDI backend validation def _test_channel_scale(): """ Test channel scale: 34 melodic + 4 drum voices across 3 ports. Returns ------- bool True if test passes """ try: # Create allocator for 38 voices (should use 3 ports) allocator = ChannelAllocator(num_ports=3) writer = MultiPortMidiWriter(max_voices=38) allocated_voices = [] # Allocate 34 melodic voices for i in range(34): voice_id = f"melodic_{i}" try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=0) allocated_voices.append((voice_id, port, channel, False)) except RuntimeError: print(f"Failed to allocate melodic voice {i}") return False # Allocate 4 drum voices for i in range(4): voice_id = f"drum_{i}" try: port, channel = allocator.allocate_voice(voice_id, is_drum=True, program=0) allocated_voices.append((voice_id, port, channel, True)) except RuntimeError: print(f"Failed to allocate drum voice {i}") return False # Verify allocation spans 3 ports ports_used = set(voice[1] for voice in allocated_voices) if len(ports_used) != 3: print(f"Expected 3 ports, got {len(ports_used)}") return False # Verify at least one melodic voice on channel 9 (reclaimed from drums) melodic_on_ch9 = any(voice[1:3] == (port, 9) and not voice[3] for voice in allocated_voices for port in range(3)) print(f"✅ Channel scale test: {len(allocated_voices)} voices across {len(ports_used)} ports") print(f"✅ Channel 9 reclamation: {'Yes' if melodic_on_ch9 else 'No'}") return True except Exception as e: print(f"Channel scale test failed: {e}") return False def _test_bend_sanity(): """ Test pitch bend sanity: Small bends with proper event ordering. Returns ------- bool True if test passes """ try: from klotho.tonos.pitch.pitch import Pitch # Create a microtonal pitch (C4 + 50 cents) pitch = Pitch.from_midi(60.5) # Test base note and bend calculation base_note, pitch_bend = _calculate_base_note_and_bend(pitch, 12) # Verify bend is within reasonable range if not (0 <= pitch_bend <= 16383): print(f"Pitch bend out of range: {pitch_bend}") return False # Verify base note is reasonable if not (0 <= base_note <= 127): print(f"Base note out of range: {base_note}") return False print(f"✅ Bend sanity test: C4+50¢ → base={base_note}, bend={pitch_bend}") return True except Exception as e: print(f"Bend sanity test failed: {e}") return False def _test_drum_reclamation(): """ Test drum reclamation: Melodic on channel 10, multiple drum channels. Returns ------- bool True if test passes """ try: allocator = ChannelAllocator(num_ports=1) # Allocate melodic voice on channel 9 (10 in 1-based) melodic_voice = "melodic_ch9" port, channel = allocator.allocate_voice(melodic_voice, is_drum=False, program=1) if channel != 9: # Try to get channel 9 specifically by allocating other channels first temp_voices = [] for i in range(9): temp_id = f"temp_{i}" temp_port, temp_channel = allocator.allocate_voice(temp_id, is_drum=False, program=0) temp_voices.append(temp_id) if temp_channel == 9: break # Now allocate on channel 9 port, channel = allocator.allocate_voice(melodic_voice, is_drum=False, program=1) # Clean up temp voices for temp_id in temp_voices: allocator.release_voice(temp_id) # Allocate multiple drum voices drum1 = "drum_1" drum2 = "drum_2" port1, ch1 = allocator.allocate_voice(drum1, is_drum=True, program=0) port2, ch2 = allocator.allocate_voice(drum2, is_drum=True, program=8) print(f"✅ Drum reclamation test: Melodic on ch{channel}, Drums on ch{ch1},ch{ch2}") return True except Exception as e: print(f"Drum reclamation test failed: {e}") return False
[docs] def debug_voice_allocation(): """Debug voice allocation to understand piano fallback issue.""" print("=== DEBUGGING VOICE ALLOCATION ===") # Test with more voices than channels to see what happens allocator = ChannelAllocator(num_ports=1) # Only 1 port = 16 channels allocated_voices = [] # Try to allocate 20 drum voices (more than 16 channels) print("\nAllocating 20 drum voices:") for i in range(20): voice_id = f"drum_{i}" try: port, channel = allocator.allocate_voice(voice_id, is_drum=True, program=1) allocated_voices.append((voice_id, port, channel, True)) print(f" Voice {i+1}: port={port}, channel={channel}") except RuntimeError as e: print(f" Voice {i+1}: FAILED - {e}") break print(f"\nAllocated {len(allocated_voices)} voices before failure") # Test with multiple ports print("\n=== TESTING WITH MULTIPLE PORTS ===") allocator2 = ChannelAllocator(num_ports=3) # 3 ports = 48 channels allocated_voices2 = [] for i in range(50): # Try more than 48 voice_id = f"voice_{i}" try: port, channel = allocator2.allocate_voice(voice_id, is_drum=(i % 5 == 0), program=1) allocated_voices2.append((voice_id, port, channel)) if i < 10 or i % 10 == 0: print(f" Voice {i+1}: port={port}, channel={channel}") except RuntimeError as e: print(f" Voice {i+1}: FAILED - {e}") break print(f"\nAllocated {len(allocated_voices2)} voices with 3 ports")
[docs] def test_midi_backend(): """ Run all MIDI backend tests. Returns ------- bool True if all tests pass """ print("Running MIDI Backend Overhaul Tests...") tests = [ ("Channel Scale Test", _test_channel_scale), ("Bend Sanity Test", _test_bend_sanity), ("Drum Reclamation Test", _test_drum_reclamation) ] results = [] for test_name, test_func in tests: print(f"\n{test_name}:") result = test_func() results.append(result) print(f"{'✅ PASSED' if result else '❌ FAILED'}") all_passed = all(results) print(f"\n{'🎉 ALL TESTS PASSED' if all_passed else '❌ SOME TESTS FAILED'}") return all_passed
[docs] def debug_play_midi(obj, debug=True, **kwargs): """ Debug version of play_midi that shows detailed channel allocation and MIDI events. """ print("=== DEBUG MIDI PLAYBACK ===") return play_midi(obj, debug=debug, **kwargs)
[docs] def compare_midi_files(obj, **kwargs): """ Compare MIDI files generated by play_midi() vs create_midi() to debug differences. This function creates MIDI files using both methods and compares their contents to help identify any discrepancies. Parameters ---------- obj : musical object The object to test with both functions **kwargs Arguments passed to both functions Returns ------- dict Comparison results with detailed information """ import tempfile import os print("=== COMPARING MIDI FILES ===") # Create MIDI using create_midi() print("Creating MIDI using create_midi()...") midi_from_create = create_midi(obj, **kwargs) # Create MIDI using play_midi() by intercepting the file before audio conversion print("Creating MIDI using play_midi() internal logic...") # Reset global state _reset_microtonal_counter() # Use same logic as play_midi() to create MIDI dur = kwargs.get('dur') arp = kwargs.get('arp', False) prgm = kwargs.get('prgm', 0) max_channels = kwargs.get('max_channels', 128) bend_sensitivity_semitones = kwargs.get('bend_sensitivity_semitones', 12) debug = kwargs.get('debug', False) match obj: case TemporalUnitSequence() | TemporalBlock(): midi_from_play = _create_midi_from_collection(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case CompositionalUnit(): midi_from_play = _create_midi_from_compositional_unit(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case TemporalUnit(): midi_from_play = _create_midi_from_temporal_unit(obj, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case RhythmTree(): temporal_unit = TemporalUnit.from_rt(obj) midi_from_play = _create_midi_from_temporal_unit(temporal_unit, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case ChordSequence(): midi_from_play = _create_midi_from_chord_sequence(obj, dur=dur or 3.0, arp=arp, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case PitchCollectionBase(): if isinstance(obj, Scale): midi_from_play = _create_midi_from_scale(obj, dur=dur or 0.5, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) elif isinstance(obj, (Chord, Voicing)): midi_from_play = _create_midi_from_chord(obj, dur=dur or 3.0, arp=arp, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) else: midi_from_play = _create_midi_from_pitch_collection(obj, dur=dur or 0.5, prgm=prgm, max_channels=max_channels, bend_sensitivity_semitones=bend_sensitivity_semitones, debug=debug) case _: raise TypeError(f"Unsupported object type: {type(obj)}") # Save both files temporarily for comparison with tempfile.NamedTemporaryFile(suffix='_create.mid', delete=False) as f1: create_path = f1.name midi_from_create.save(create_path) with tempfile.NamedTemporaryFile(suffix='_play.mid', delete=False) as f2: play_path = f2.name midi_from_play.save(play_path) # Compare file sizes create_size = os.path.getsize(create_path) play_size = os.path.getsize(play_path) # Compare track counts create_tracks = len(midi_from_create.tracks) play_tracks = len(midi_from_play.tracks) # Compare total ticks create_ticks = midi_from_create.ticks_per_beat play_ticks = midi_from_play.ticks_per_beat # Count messages in each file create_msg_count = sum(len(track) for track in midi_from_create.tracks) play_msg_count = sum(len(track) for track in midi_from_play.tracks) # Cleanup try: os.unlink(create_path) os.unlink(play_path) except OSError: pass results = { 'files_identical': create_size == play_size, 'create_midi_size': create_size, 'play_midi_size': play_size, 'create_tracks': create_tracks, 'play_tracks': play_tracks, 'create_ticks_per_beat': create_ticks, 'play_ticks_per_beat': play_ticks, 'create_message_count': create_msg_count, 'play_message_count': play_msg_count, 'create_midi_file': midi_from_create, 'play_midi_file': midi_from_play } print(f"Files identical: {results['files_identical']}") print(f"File sizes - create_midi: {create_size}, play_midi: {play_size}") print(f"Track counts - create_midi: {create_tracks}, play_midi: {play_tracks}") print(f"Ticks per beat - create_midi: {create_ticks}, play_midi: {play_ticks}") print(f"Message counts - create_midi: {create_msg_count}, play_midi: {play_msg_count}") if not results['files_identical']: print("⚠️ FILES ARE DIFFERENT!") else: print("✅ Files are identical") return results
[docs] def debug_temporal_unit_chronons(temporal_unit): """ Debug function to examine chronon timing values in a TemporalUnit. This helps identify why some notes might be very short in MIDI files. Parameters ---------- temporal_unit : TemporalUnit The temporal unit to examine Returns ------- dict Information about each chronon """ print("=== DEBUGGING TEMPORAL UNIT CHRONONS ===") chronon_info = [] min_start_time = min(chronon.start for chronon in temporal_unit if not chronon.is_rest) if any(not chronon.is_rest for chronon in temporal_unit) else 0 print(f"min_start_time: {min_start_time}") print(f"Total chronons: {len(temporal_unit)}") for i, chronon in enumerate(temporal_unit): info = { 'index': i, 'is_rest': chronon.is_rest, 'raw_start': chronon.start, 'raw_duration': chronon.duration, 'abs_duration': abs(chronon.duration), 'adjusted_start': chronon.start - min_start_time if not chronon.is_rest else None, 'end_time': chronon.start + chronon.duration if not chronon.is_rest else None, 'proportion': getattr(chronon, 'proportion', 'N/A') } chronon_info.append(info) status = "REST" if chronon.is_rest else "NOTE" if not chronon.is_rest: print(f"Chronon {i} ({status}): start={chronon.start:.6f}, duration={chronon.duration:.6f}, abs_duration={abs(chronon.duration):.6f}") print(f" -> adjusted_start={info['adjusted_start']:.6f}, end_time={info['end_time']:.6f}") if abs(chronon.duration) < 0.01: # Very short note print(f" ⚠️ WARNING: Very short duration ({abs(chronon.duration):.6f} seconds)") else: print(f"Chronon {i} ({status}): start={chronon.start:.6f}, duration={chronon.duration:.6f}") # Check for overlapping notes non_rest_chronons = [info for info in chronon_info if not info['is_rest']] for i in range(len(non_rest_chronons) - 1): current = non_rest_chronons[i] next_chronon = non_rest_chronons[i + 1] current_end = current['raw_start'] + abs(current['raw_duration']) next_start = next_chronon['raw_start'] if current_end > next_start: overlap = current_end - next_start print(f"⚠️ OVERLAP detected between chronon {current['index']} and {next_chronon['index']}: {overlap:.6f} seconds") elif current_end < next_start: gap = next_start - current_end print(f"ℹ️ GAP between chronon {current['index']} and {next_chronon['index']}: {gap:.6f} seconds") return chronon_info
def _ensure_midi_duration(track, target_duration_seconds, bpm): """ Ensure MIDI track duration matches target duration by adding silent padding if needed. This handles cases where the temporal structure ends with rests, ensuring the MIDI file doesn't cut off prematurely and allows reverb/sustain to play out. Parameters ---------- track : MidiTrack The MIDI track to extend target_duration_seconds : float The target duration in seconds bpm : float Beats per minute for timing calculations """ if not track: return # Calculate current track duration current_time_ticks = 0 for msg in track: current_time_ticks += msg.time # Convert to seconds beat_duration = 60.0 / bpm current_duration_seconds = current_time_ticks * beat_duration / TICKS_PER_BEAT # If we need more duration, add a silent padding message if target_duration_seconds > current_duration_seconds: missing_duration = target_duration_seconds - current_duration_seconds missing_ticks = int(missing_duration / beat_duration * TICKS_PER_BEAT) # Add a silent message (could be a control change or just a dummy note off) # Using a note off with velocity 0 on a silent channel as padding from mido import Message track.append(Message('note_off', channel=15, # Use channel 15 for silent padding note=127, # High note that won't interfere velocity=0, time=missing_ticks)) def _create_midi_from_free_pitch_collection(collection, dur=0.5, prgm=0, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a PitchCollection (sequential playback) using absolute timing.""" bpm = 120 max_concurrent = len(collection) if hasattr(collection, '__len__') else 16 max_concurrent = min(max_concurrent, max_channels) writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) if debug: print(f"[DEBUG] PitchCollection: {max_concurrent} voices, {writer.num_ports} ports") note_events = [] current_time = 0.0 voice_counter = 0 for pitch in collection: voice_id = f"free_pitch_voice_{voice_counter}" voice_counter += 1 try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur _generate_multi_port_events(note_events, writer, allocator, bpm, debug) writer.finalize(bpm) return writer.get_midi_file() def _create_midi_from_pitch_collection(collection, dur=0.5, prgm=0, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a PitchCollection (sequential playback) using absolute timing.""" # PRD: Use absolute timing only - always 4/4 at 120 BPM bpm = 120 # Estimate voices needed max_concurrent = len(collection) if hasattr(collection, '__len__') else 16 max_concurrent = min(max_concurrent, max_channels) # Create multi-port writer and allocator writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) # Add global meta events - always 4/4 at 120 BPM per PRD writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) if debug: print(f"[DEBUG] PitchCollection: {max_concurrent} voices, {writer.num_ports} ports") if hasattr(collection, 'is_instanced') and collection.is_instanced: instanced = collection elif hasattr(collection, 'is_relative') and not collection.is_relative: instanced = collection else: from klotho.tonos.pitch.pitch import Pitch instanced = collection.root(Pitch("C4")) note_events = [] current_time = 0.0 voice_counter = 0 for i in range(len(instanced)): pitch = instanced[i] voice_id = f"pitch_voice_{voice_counter}" voice_counter += 1 # Allocate voice (always melodic for pitch collections) try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue # Calculate base note and pitch bend midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur # Generate MIDI events using new allocator system _generate_multi_port_events(note_events, writer, allocator, bpm, debug) # Finalize and return writer.finalize(bpm) return writer.get_midi_file() def _create_midi_from_scale(scale, dur=0.5, prgm=0, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a Scale (ascending then descending) using absolute timing.""" # PRD: Use absolute timing only - always 4/4 at 120 BPM bpm = 120 # Estimate voices needed (scale length * 2 for ascending + descending) max_concurrent = len(scale) * 2 if hasattr(scale, '__len__') else 16 max_concurrent = min(max_concurrent, max_channels) # Create multi-port writer and allocator writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) # Add global meta events - always 4/4 at 120 BPM per PRD writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) if debug: print(f"[DEBUG] Scale: {max_concurrent} voices, {writer.num_ports} ports") if hasattr(scale, 'is_instanced') and scale.is_instanced: instanced = scale elif hasattr(scale, 'is_relative') and not scale.is_relative: instanced = scale else: from klotho.tonos.pitch.pitch import Pitch instanced = scale.root(Pitch("C4")) # For scales: allocate channels per note for ascending (microtones), reuse for descending note_events = [] current_time = 0.0 # Track channel assignments for reuse during descending pitch_to_channel = {} # Maps pitch to (port, channel, voice_id) # Play ascending (including the equave at index len(instanced)) for i in range(len(instanced) + 1): pitch = instanced[i] pitch_key = str(pitch) # Use string representation as key # Allocate new channel for each unique pitch (microtonal support) voice_id = f"scale_voice_asc_{i}" try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) # Remember this channel for potential reuse pitch_to_channel[pitch_key] = (port, channel, voice_id) except RuntimeError: continue # Calculate base note and pitch bend midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur # Play descending (reuse channels from ascending for same pitches) for i in range(len(instanced) - 1, -1, -1): pitch = instanced[i] pitch_key = str(pitch) # Reuse channel if we have it, otherwise allocate new if pitch_key in pitch_to_channel: port, channel, voice_id = pitch_to_channel[pitch_key] voice_id = f"scale_voice_desc_{i}_reuse" # New voice ID for descending else: # Shouldn't happen, but allocate new if needed voice_id = f"scale_voice_desc_{i}" try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue # Calculate base note and pitch bend midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, # Reused channel for same pitch 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur # Generate MIDI events using new allocator system _generate_multi_port_events(note_events, writer, allocator, bpm, debug) # Finalize and return writer.finalize(bpm) return writer.get_midi_file() def _create_midi_from_chord(chord, dur=3.0, arp=False, prgm=0, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a Chord (block chord or arpeggiated) using absolute timing.""" # PRD: Use absolute timing only - always 4/4 at 120 BPM bpm = 120 # Estimate voices needed max_concurrent = len(chord) if hasattr(chord, '__len__') else 16 max_concurrent = min(max_concurrent, max_channels) # Create multi-port writer and allocator writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) # Add global meta events - always 4/4 at 120 BPM per PRD writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) if debug: print(f"[DEBUG] Chord: {max_concurrent} voices, {writer.num_ports} ports") if hasattr(chord, 'is_instanced') and chord.is_instanced: instanced = chord elif hasattr(chord, 'is_relative') and not chord.is_relative: instanced = chord else: from klotho.tonos.pitch.pitch import Pitch instanced = chord.root(Pitch("C4")) note_events = [] voice_counter = 0 if arp: # Arpeggiated: each note gets dur duration current_time = 0.0 for i in range(len(instanced)): pitch = instanced[i] voice_id = f"chord_voice_{voice_counter}" voice_counter += 1 # Allocate voice (always melodic for chords) try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue # Calculate base note and pitch bend midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur else: # Block chord: all notes start at once, last for dur for i in range(len(instanced)): pitch = instanced[i] voice_id = f"chord_voice_{voice_counter}" voice_counter += 1 # Allocate voice (always melodic for chords) try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue # Calculate base note and pitch bend midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': 0.0, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) # Generate MIDI events using new allocator system _generate_multi_port_events(note_events, writer, allocator, bpm, debug) # Finalize and return writer.finalize(bpm) return writer.get_midi_file() def _create_midi_from_chord_sequence(chord_sequence, dur=3.0, arp=False, prgm=0, max_channels=128, bend_sensitivity_semitones=12, debug=False): """Create a MIDI file from a ChordSequence (sequential chord playback) using absolute timing.""" bpm = 120 if not chord_sequence.chords: writer = MultiPortMidiWriter(max_voices=1) writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) writer.finalize(bpm) return writer.get_midi_file() total_notes = sum(len(chord) for chord in chord_sequence.chords if hasattr(chord, '__len__')) max_concurrent = min(total_notes, max_channels) writer = MultiPortMidiWriter(max_voices=max_concurrent) allocator = ChannelAllocator(writer.num_ports, bend_sensitivity_semitones=bend_sensitivity_semitones) writer.add_meta_event(MetaMessage('set_tempo', tempo=int(60_000_000 / bpm), time=0)) if debug: print(f"[DEBUG] ChordSequence: {len(chord_sequence.chords)} chords, {max_concurrent} voices, {writer.num_ports} ports") note_events = [] current_time = 0.0 voice_counter = 0 for chord_idx, chord in enumerate(chord_sequence.chords): if hasattr(chord, 'is_instanced') and chord.is_instanced: pitches = chord elif hasattr(chord, 'is_relative') and not chord.is_relative: pitches = chord else: from klotho.tonos.pitch.pitch import Pitch pitches = chord.root(Pitch("C4")) if arp: chord_start_time = current_time for i in range(len(pitches)): pitch = pitches[i] voice_id = f"chord_seq_voice_{voice_counter}" voice_counter += 1 try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur else: for i in range(len(pitches)): pitch = pitches[i] voice_id = f"chord_seq_voice_{voice_counter}" voice_counter += 1 try: port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=prgm) except RuntimeError: continue midi_note, pitch_bend = _calculate_base_note_and_bend(pitch, bend_sensitivity_semitones) note_events.append({ 'voice_id': voice_id, 'port': port, 'channel': channel, 'start_time': current_time, 'duration': dur, 'midi_note': midi_note, 'velocity': DEFAULT_VELOCITY, 'program': prgm, 'is_drum': False, 'pitch_bend': pitch_bend }) current_time += dur _generate_multi_port_events(note_events, writer, allocator, bpm, debug) writer.finalize(bpm) return writer.get_midi_file() def _get_microtonal_channel_and_note(pitch, channel_assignments=None): """ Assign exact microtonal pitches to dedicated channels with precise pitch bend. Each unique microtonal pitch gets its own channel for precise tuning. Channels 0-8, 10-15 are available (skip 9 for percussion). Returns: tuple: (channel, midi_note, pitch_bend_value) """ target_midi = pitch.midi # For standard 12-TET notes (no decimal part), always use channel 0 if abs(target_midi - round(target_midi)) < 0.001: return 0, int(round(target_midi)), 8192 # For microtonal notes, calculate exact pitch bend nearest_midi = round(target_midi) cents_offset = (target_midi - nearest_midi) * 100.0 # Convert cents to pitch bend value (±200 cents = ±4096 pitch bend units) pitch_bend_value = int(8192 + (cents_offset / 200.0) * 4096) pitch_bend_value = max(0, min(16383, pitch_bend_value)) # Global counter for channel assignment if not hasattr(_get_microtonal_channel_and_note, '_channel_counter'): _get_microtonal_channel_and_note._channel_counter = 1 # Start at 1 (0 is for 12-TET) # Available channels: 1-8, 10-15 (skip 9 for percussion) available_channels = list(range(1, 9)) + list(range(10, 16)) # 14 channels total # Assign next available channel and increment counter channel_index = (_get_microtonal_channel_and_note._channel_counter - 1) % len(available_channels) channel = available_channels[channel_index] _get_microtonal_channel_and_note._channel_counter += 1 return channel, nearest_midi, pitch_bend_value def _reset_microtonal_counter(): """Reset the global channel counter for new MIDI files.""" if hasattr(_get_microtonal_channel_and_note, '_channel_counter'): _get_microtonal_channel_and_note._channel_counter = 1 def _events_to_midi_messages(events, track, bpm): """Convert time-based events to MIDI messages with proper timing.""" # Sort by time, then by event type priority (pitch_bend before note_on/note_off) event_priority = {"pitch_bend": 0, "note_on": 1, "note_off": 2} events.sort(key=lambda x: (x[0], event_priority.get(x[1], 3))) current_time = 0.0 beat_duration = 60.0 / bpm # Track program changes per channel current_programs = {} for event in events: event_time, event_type = event[0], event[1] delta_time = event_time - current_time delta_ticks = int(delta_time / beat_duration * TICKS_PER_BEAT) if event_type == 'pitch_bend': channel, pitch_bend_value = event[2], event[3] # MIDI pitchwheel expects values in range -8192 to 8191, not 0 to 16383 pitch_value = pitch_bend_value - 8192 track.append(Message('pitchwheel', channel=channel, pitch=pitch_value, time=delta_ticks)) elif event_type == 'note_on': channel, note, velocity, program = event[2], event[3], event[4], event[5] # Add program change if needed (channels 0-11 are for pitched instruments) # Note: We use channels 0-11 for the 144-TET grid, all pitched instruments if current_programs.get(channel) != program: track.append(Message('program_change', channel=channel, program=program, time=delta_ticks)) current_programs[channel] = program delta_ticks = 0 # Reset delta_ticks since we used it for program change track.append(Message('note_on', channel=channel, note=note, velocity=velocity, time=delta_ticks)) elif event_type == 'note_off': channel, note, velocity, program = event[2], event[3], event[4], event[5] track.append(Message('note_off', channel=channel, note=note, velocity=velocity, time=delta_ticks)) current_time = event_time
[docs] def test_channel_allocation_system(): """ Test the channel allocation system to verify it meets the requirements. Requirements: 1. Exhaust ALL channels on a port before moving to the next port 2. Support up to 256 channels (16 ports × 16 channels) 3. Melodic instruments skip channel 9 on any port 4. Drum instruments use ONLY channel 9 on any port 5. Only loop back after exhausting all 256 channels """ print("=== TESTING CHANNEL ALLOCATION SYSTEM ===") # Test 1: Single port allocation print("\n1. Testing single port allocation:") allocator = ChannelAllocator(num_ports=1) # Allocate all 15 melodic channels melodic_allocations = [] for i in range(15): voice_id = f"melodic_{i}" port, channel = allocator.allocate_voice(voice_id, is_drum=False, program=0) melodic_allocations.append((port, channel)) print(f" Melodic {i}: port={port}, channel={channel}") # Verify no channel 9 was used for melodic melodic_channels = [ch for _, ch in melodic_allocations] if 9 in melodic_channels: print(" ❌ ERROR: Channel 9 used for melodic voice!") return False else: print(" ✅ Channel 9 correctly skipped for melodic voices") # Allocate drum channel drum_voice_id = "drum_1" drum_port, drum_channel = allocator.allocate_voice(drum_voice_id, is_drum=True, program=1) print(f" Drum: port={drum_port}, channel={drum_channel}") if drum_channel != 9: print(" ❌ ERROR: Drum not allocated to channel 9!") return False else: print(" ✅ Drum correctly allocated to channel 9") # Test 2: Multi-port allocation with exhaustion print("\n2. Testing multi-port allocation with exhaustion:") allocator2 = ChannelAllocator(num_ports=3) # 3 ports = 48 channels total # Allocate all melodic channels across ports melodic_allocations2 = [] for i in range(45): # 3 ports × 15 melodic channels = 45 voice_id = f"melodic_{i}" port, channel = allocator2.allocate_voice(voice_id, is_drum=False, program=0) melodic_allocations2.append((port, channel)) if i < 10 or i % 15 == 0: print(f" Melodic {i}: port={port}, channel={channel}") # Verify port exhaustion pattern ports_used = set(port for port, _ in melodic_allocations2) print(f" Ports used: {sorted(ports_used)}") # Count channels per port port_counts = {} for port, channel in melodic_allocations2: port_counts[port] = port_counts.get(port, 0) + 1 print(f" Channels per port: {port_counts}") # Verify each port has 15 channels before moving to next expected_counts = {0: 15, 1: 15, 2: 15} if port_counts != expected_counts: print(f" ❌ ERROR: Expected {expected_counts}, got {port_counts}") return False else: print(" ✅ Port exhaustion working correctly") # Test 3: Drum allocation across ports print("\n3. Testing drum allocation across ports:") drum_allocations = [] for i in range(3): # 3 drum channels (one per port) voice_id = f"drum_{i}" port, channel = allocator2.allocate_voice(voice_id, is_drum=True, program=1) drum_allocations.append((port, channel)) print(f" Drum {i}: port={port}, channel={channel}") # Verify all drums use channel 9 drum_channels = [ch for _, ch in drum_allocations] if not all(ch == 9 for ch in drum_channels): print(" ❌ ERROR: Not all drums allocated to channel 9!") return False else: print(" ✅ All drums correctly allocated to channel 9") # Test 4: Mixed allocation (melodic and drum) print("\n4. Testing mixed allocation:") allocator3 = ChannelAllocator(num_ports=2) # Allocate some melodic voices first for i in range(10): voice_id = f"mixed_melodic_{i}" port, channel = allocator3.allocate_voice(voice_id, is_drum=False, program=0) print(f" Mixed melodic {i}: port={port}, channel={channel}") # Then allocate some drum voices for i in range(2): voice_id = f"mixed_drum_{i}" port, channel = allocator3.allocate_voice(voice_id, is_drum=True, program=1) print(f" Mixed drum {i}: port={port}, channel={channel}") # Check available channels available_melodic, available_drum = allocator3.get_available_channels() print(f" Available channels: melodic={available_melodic}, drum={available_drum}") print("\n✅ All channel allocation tests passed!") return True
[docs] def debug_current_allocation(): """Debug the current channel allocation state.""" print("=== CURRENT CHANNEL ALLOCATION DEBUG ===") # Create a test allocator allocator = ChannelAllocator(num_ports=2) print(f"Initial state:") melodic, drum = allocator.get_available_channels() print(f" Available melodic: {melodic}") print(f" Available drum: {drum}") # Allocate some voices print(f"\nAllocating voices:") for i in range(10): voice_id = f"test_{i}" is_drum = (i % 3 == 0) # Every 3rd voice is drum port, channel = allocator.allocate_voice(voice_id, is_drum=is_drum, program=0) print(f" Voice {i} ({'drum' if is_drum else 'melodic'}): port={port}, channel={channel}") print(f"\nAfter allocation:") melodic, drum = allocator.get_available_channels() print(f" Available melodic: {melodic}") print(f" Available drum: {drum}") print(f"\nCurrent port positions:") print(f" Melodic port: {allocator.current_melodic_port}") print(f" Drum port: {allocator.current_drum_port}")