Source code for thebeat.utils

# Copyright (C) 2022-2023  Jelle van der Werff
#
# This file is part of thebeat.
#
# thebeat is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# thebeat is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with thebeat.  If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

import numpy as np
import thebeat.core
from typing import Optional, Union
import warnings
from thebeat._warnings import phases_t_at_zero
import pandas as pd

try:
    import abjad
except ImportError:
    abjad = None


[docs]def get_ioi_df(sequences: Union[thebeat.core.Sequence, list[thebeat.core.Sequence], np.ndarray[thebeat.core.Sequence]], additional_functions: Optional[list[callable]] = None): """ This function exports a Pandas :class:`pandas.DataFrame` with information about the provided :py:class:`thebeat.core.Sequence` objects in `tidy data <https://cran.r-project.org/web/packages/tidyr/vignettes/tidy-data.html>`_ format. The DataFrame always has the columns: * ``Sequence_index``: The index of the Sequence object in the list of Sequences. * ``IOI_i``: The index of the IOI in the Sequence. * ``IOI``: The IOI. Additionally it has a column ``Sequence_name`` if at least one of the provided Sequence objects has a name. Moreover, one can provide a list of functions that will be applied to each sequence's IOIs. The results will be added as additional columns in the output DataFrame. See under 'Examples' for an illustration. Parameters ---------- sequences The Sequence object(s) to be exported. additional_functions A list of functions that will be applied to the IOIs for each individual sequence, and the results of which will be added as additional columns. Returns ------- pd.DataFrame A Pandas DataFrame with information about the provided Sequence objects in tidy data format. Examples -------- >>> rng = np.random.default_rng(123) >>> seqs = [thebeat.core.Sequence.generate_random_normal(n_events=10, mu=500, sigma=25, rng=rng) for _ in range(10)] >>> df = get_ioi_df(seqs) >>> print(df.head()) sequence_i ioi_i ioi 0 0 0 475.271966 1 0 1 490.805334 2 0 2 532.198132 3 0 3 504.849360 4 0 4 523.005772 >>> import numpy as np >>> df = get_ioi_df(seqs, additional_functions=[np.mean, np.std]) >>> print(df.head()) sequence_i mean std ioi_i ioi 0 0 503.364499 17.923263 0 475.271966 1 0 503.364499 17.923263 1 490.805334 2 0 503.364499 17.923263 2 532.198132 3 0 503.364499 17.923263 3 504.849360 4 0 503.364499 17.923263 4 523.005772 """ # Checks if not all(isinstance(sequence, thebeat.core.Sequence) for sequence in sequences): raise TypeError("The provided sequences must be Sequence objects.") if additional_functions is not None and not all(callable(f) for f in additional_functions): raise TypeError("The functions in additional_functions must be callable.") # Create output dictionary output_df = None # Loop over sequences to fill output_dict with the columns that we always have for i, sequence in enumerate(sequences): # Start with the sequence index and the sequence name if it exists sequence_dict = {'sequence_i': i, 'sequence_name': sequence.name if sequence.name else np.nan} # If functions were provided, add those columns if additional_functions is not None: for f in additional_functions: # todo consider what sorts of error handling to do here sequence_dict[f.__name__] = f(sequence.iois) # Add the IOI index and the IOI itself sequence_dict['ioi_i'] = np.arange(len(sequence.iois)) sequence_dict['ioi'] = sequence.iois # Concatenate the new DataFrame to the output DataFrame # If this is the first iteration, we need to create the output DataFrame first if output_df is None: output_df = pd.DataFrame(sequence_dict) else: output_df = pd.concat([output_df, pd.DataFrame(sequence_dict)], ignore_index=True) # Check if all names are None, if so, drop the column if output_df['sequence_name'].isnull().all(): output_df.drop('sequence_name', axis=1, inplace=True) return output_df
[docs]def get_major_scale(tonic: str, octave: int) -> list: """Get the major scale for a given tonic and octave. Returns a list of :class:`abjad.pitch.NamedPitch` objects. Note ---- This function requires abjad to be installed. Parameters ---------- tonic The tonic of the scale, e.g. 'G'. octave The octave of the scale, e.g. 4. Returns ------- pitches A list of :class:`abjad.pitch.NamedPitch` objects. """ if abjad is None: raise ImportError("This function requires the abjad package. Install, for instance by typing " "'pip install abjad' into your terminal.") intervals = "M2 M2 m2 M2 M2 M2 m2".split() intervals = [abjad.NamedInterval(interval) for interval in intervals] pitches = [] pitch = abjad.NamedPitch(tonic, octave=octave) pitches.append(pitch) for interval in intervals: pitch = pitch + interval pitches.append(pitch) return pitches
[docs]def get_phase_differences(test_sequence: thebeat.core.Sequence, reference_sequence: Union[thebeat.core.Sequence, float], circular_unit="degrees"): # todo Verify this method of calculating phase differences. I'm not quite sure about using the period of the # test_sequence instead of the reference_sequence """Get the phase differences for ``test_sequence`` compared to ``reference_sequence``. If the second argument is a number, ``test_sequence`` will be compared with an isochronous sequence with a constant inter-onset interval (IOI) of that number and the same length as the test sequence. Caution ------- The phase differences are calculated for each onset of ``test_sequence`` compared to the onset with the same index of ``reference_sequence``. Missing values are discarded. In addition, if the first onset of the test sequence is at t = 0, that phase difference is also discarded. Parameters ---------- test_sequence The sequence to be compared with the reference sequence. Can either be a single Sequence or a list or array of Sequences. reference_sequence The reference sequence. Can be a Sequence object, a list or array of Sequence objects, or a number. In the latter case, the reference sequence will be an isochronous sequence with a constant IOI of that number and the same length as ``sequence_1``. circular_unit The unit of the circular unit. Can be "degrees" or "radians". """ # Input validation if not isinstance(test_sequence, thebeat.core.Sequence): raise TypeError("Please provide a Sequence object as the left argument.") elif isinstance(reference_sequence, (int, float)): reference_sequence = thebeat.core.Sequence.generate_isochronous(n_events=len(test_sequence.onsets), ioi=reference_sequence) elif isinstance(reference_sequence, thebeat.core.Sequence): pass else: raise TypeError("Please provide a Sequence object as the left-hand argument, and a Sequence object or a " "number as the right-hand argument.") # Get onsets once test_onsets = test_sequence.onsets ref_onsets = reference_sequence.onsets # If the first onset is at t=0, raise warning and remove it if test_sequence._first_onset == 0: warnings.warn(phases_t_at_zero) test_onsets = test_onsets[1:] ref_onsets = ref_onsets[1:] start_at_tzero = False else: start_at_tzero = True # Check length sameness if not len(test_onsets) == len(ref_onsets): raise ValueError("This function only works if the number of events in the two sequences are equal. For " "missing data, insert np.nan values in the sequence for the missing data.") # Output array phase_diffs = np.array([]) # Calculate phase differences for i, test_onset in enumerate(test_onsets): # For the first event, we use the period of the IOI that follows the event, but only if it was the # first onset if i == 0 and start_at_tzero is True: period_next = test_sequence.iois[0] period_prev = period_next # For the last event, we use the period of the IOI that precedes the event elif i == len(test_onsets) - 1: period_prev = test_sequence.iois[i - 1] period_next = period_prev # For all other events, we need both the previous and the next IOI else: period_prev = test_sequence.iois[i - 1] period_next = test_sequence.iois[i] if test_onset > ref_onsets[i]: phase_diff = (test_onset - ref_onsets[i]) / period_next elif test_onset < ref_onsets[i]: phase_diff = (test_onset - ref_onsets[i]) / period_prev elif test_onset == ref_onsets[i]: phase_diff = 0.0 elif np.isnan(test_onset) or np.isnan(ref_onsets[i]): phase_diff = np.nan warnings.warn(thebeat._warnings.missing_values) else: raise ValueError("Something went wrong during the calculation of the phase differences." "Please check your data.") phase_diffs = np.append(phase_diffs, phase_diff) # Convert to degrees phase_diff_degrees = (phase_diffs * 360) % 360 # Return if circular_unit == "degrees": return phase_diff_degrees elif circular_unit == "radians": return np.deg2rad(phase_diff_degrees) else: raise ValueError("Please provide a valid circular unit. Either 'degrees' or 'radians'.")
[docs]def get_interval_ratios_from_dyads(sequence: Union[np.array, thebeat.core.Sequence, list]): r""" Return sequential interval ratios, calculated as: :math:`\textrm{ratio}_k = \frac{\textrm{IOI}_k}{\textrm{IOI}_k + \textrm{IOI}_{k+1}}`. Note that for *n* IOIs this property returns *n*-1 ratios. Parameters ---------- sequence The sequence from which to calculate the interval ratios. Can be a Sequence object, or a list or array of IOIs. Notes ----- The used method is based on the methodology from :cite:t:`roeskeCategoricalRhythmsAre2020`. """ if isinstance(sequence, thebeat.core.Sequence): sequence = sequence.iois return sequence[:-1] / (sequence[1:] + sequence[:-1])
[docs]def concatenate_sequences(sequences: np.typing.ArrayLike, name: Optional[str] = None): """Concatenate an array or list of :py:class:`~thebeat.core.Sequence` objects. Note ---- Only works for Sequence objects where all but the last provided object has an ``end_with_interval=True`` flag. Parameters ---------- sequences The to-be-concatenated objects. name Optionally, you can give the returned Sequence object a name. Returns ------- object The concatenated Sequence """ if not all(isinstance(obj, thebeat.core.Sequence) for obj in sequences): raise TypeError("Please pass only Sequence objects.") if not all(obj.end_with_interval for obj in sequences[:-1]): raise ValueError("All passed Sequence objects except for the final one need to end with an interval." "Otherwise we miss an interval between the onset of the " "final event in a Sequence and the onset of the first event in the next sequence.") if not all(obj.onsets[0] == 0.0 for obj in sequences): raise ValueError("Please only pass sequences that have their first event at onset 0.0") # Whether the sequence ends with an interval depends only on the final object passed end_with_interval = sequences[-1].end_with_interval # concatenate iois and create new Sequence iois = np.concatenate([obj.iois for obj in sequences]) return thebeat.core.Sequence(iois, end_with_interval=end_with_interval, name=name)
[docs]def concatenate_soundsequences(sound_sequences: np.typing.ArrayLike, name: Optional[str] = None): """Concatenate an array or list of :py:class:`~thebeat.core.SoundSequence` objects. Note ---- Only works for SoundSequence objects where all but the last provided object has an ``end_with_interval=True`` flag. Parameters ---------- sound_sequences The to-be-concatenated objects. name Optionally, you can give the returned SoundSequence object a name. Returns ------- object The concatenated SoundSequence """ if not all(isinstance(obj, thebeat.core.SoundSequence) for obj in sound_sequences): raise TypeError("Please pass only SoundSequence objects.") if not all(obj.end_with_interval for obj in sound_sequences[:-1]): raise ValueError("All passed SoundSequence objects except for the final one need to end with an interval." "Otherwise we miss an interval between the onset of the " "final event in a Sequence and the onset of the first event in the next sequence.") # Whether the sequence ends with an interval depends only on the final object passed end_with_interval = sound_sequences[-1].end_with_interval # concatenate iois and create new Sequence iois = np.concatenate([obj.iois for obj in sound_sequences]) seq = thebeat.core.Sequence(iois, end_with_interval=end_with_interval) # concatenate sounds all_sounds = [sound_obj for obj in sound_sequences for sound_obj in obj.sound_objects] return thebeat.core.SoundSequence(sound=all_sounds, sequence=seq, name=name)
[docs]def concatenate_soundstimuli(sound_stimuli: Union[np.ndarray, list], name: Optional[str] = None): """Concatenate an array or list of :py:class:`~thebeat.core.SoundStimulus` objects. Parameters ---------- sound_stimuli The to-be-concatenated objects. name Optionally, you can give the returned SoundStimulus object a name. Returns ------- object The concatenated SoundStimulus """ if not all(isinstance(obj, thebeat.core.SoundStimulus) for obj in sound_stimuli): raise TypeError("Please pass only SoundStimulus objects.") thebeat.helpers.check_sound_properties_sameness(sound_stimuli) samples = np.concatenate([obj.samples for obj in sound_stimuli]) fs = sound_stimuli[0].fs return thebeat.core.SoundStimulus(samples, fs, name=name)
[docs]def merge_soundstimuli(sound_stimuli: np.typing.ArrayLike[thebeat.SoundStimulus], name: Optional[str] = None): """Merge an array or list of :py:class:`~thebeat.core.SoundStimulus` objects. The sound samples for each of the objects will be overlaid on top of each other. Parameters ---------- sound_stimuli The to-be-merged objects. name Optionally, you can give the returned SoundStimulus object a name. Returns ------- object The merged SoundStimulus """ if not all(isinstance(obj, thebeat.core.SoundStimulus) for obj in sound_stimuli): raise TypeError("Can only overlay another SoundStimulus object on this SoundStimulus object.") # Check sameness of number of channels etc. thebeat.helpers.check_sound_properties_sameness(sound_stimuli) # Overlay sounds samples = thebeat.helpers.overlay_samples([obj.samples for obj in sound_stimuli]) return thebeat.core.SoundStimulus(samples=samples, fs=sound_stimuli[0].fs, name=name)
[docs]def merge_sequences(sequences: np.typing.ArrayLike[thebeat.core.Sequence], name: Optional[str] = None): """Merge an array or list of :py:class:`~thebeat.core.Sequence` objects. The the event onsets in each of the objects will be overlaid on top of each other. Parameters ---------- sequences The to-be-merged objects. name Optionally, you can give the returned Sequence object a name. Returns ------- object The merged Sequence """ # check if only Sequence objects were passed if not all(isinstance(obj, thebeat.core.Sequence) for obj in sequences): raise TypeError("Please pass only Sequence objects.") # concatenate onsets and sort onsets = np.concatenate([obj.onsets for obj in sequences]) onsets.sort() # Check for duplicates if np.any(onsets[1:] == onsets[:-1]): raise ValueError("The merged Sequence object would contain duplicate onsets.") return thebeat.core.Sequence.from_onsets(onsets, name=name)
[docs]def merge_soundsequences(sound_sequences: list[thebeat.core.SoundSequence], name: Optional[str] = None): """Merge a list or array of :py:class:`~thebeat.core.SoundSequence` objects. The event onsets in each of the objects will be overlaid on top of each other, after which the sounds Parameters ---------- sound_sequences The to-be-merged objects. name Optionally, you can give the returned SoundSequence object a name. Returns ------- object The merged SoundSequence """ # check if only SoundSequence objects were passed if not all(isinstance(obj, thebeat.core.SoundSequence) for obj in sound_sequences): raise TypeError("Please pass only SoundSequence objects.") # Get all onsets and sounds all_onsets = np.concatenate([obj.onsets for obj in sound_sequences]) all_sounds = [sound_obj for obj in sound_sequences for sound_obj in obj.sound_objects] # Sort sounds in same order as onsets sounds_sorted = [all_sounds[i] for i in np.argsort(all_onsets)] # Sort onsets onsets and create new Sequence onsets_sorted = np.sort(all_onsets) seq = thebeat.Sequence.from_onsets(onsets_sorted) return thebeat.core.SoundSequence(sound=sounds_sorted, sequence=seq, name=name)