import itertools
from typing import Union
import ms3
import music21 as m21
import pandas as pd
from music21.stream.base import Measure
from pandas import DataFrame
import musif.extract.constants as C
from musif.cache import isinstance
from musif.extract.constants import PLAYTHROUGH
from musif.logs import pwarn
from musif.musicxml.tempo import get_number_of_beats
file_names = []
repeat_bracket = False
[docs]
def process_musescore_file(file_path: str, expand_repeats: bool = False) -> DataFrame:
"""
Given a mscx file name, parses the file using ms3 library and returns a dataframe containing all harmonic information.
Adds Playthrough column that contains number of every measure in the cronological order
Parameters
----------
file_path: str
Path to mscx file
expand_repeats: bool
Directory path to musescore file
Returns
-------
harmonic_analysis: str
Dataframe containing harmonic information
"""
msc3_score = ms3.score.Score(file_path, logger_cfg={"level": "ERROR"})
harmonic_analysis = msc3_score.mscx.expanded()
harmonic_analysis.reset_index(inplace=True)
if expand_repeats:
harmonic_analysis = msc3_score.mscx.expanded(unfold=True)
harmonic_analysis.reset_index(inplace=True)
# unfolded_mc=msc3_score.mscx.measures().set_index("mc").next
# mn = next2sequence(unfolded_mc)
# mn = ms3.utils.next2sequence(unfolded_mc)
# mn = pd.Series(mn, name="mc_playthrough")
# harmonic_analysis = ms3.utils.unfold_repeats(harmonic_analysis, mn)
harmonic_analysis.rename(columns={"mc_playthrough": PLAYTHROUGH}, inplace=True)
else:
harmonic_analysis = msc3_score.mscx.expanded()
harmonic_analysis.reset_index(inplace=True)
if harmonic_analysis.mn[0] == 0:
harmonic_analysis[PLAYTHROUGH] = harmonic_analysis["mc"]
else:
harmonic_analysis[PLAYTHROUGH] = harmonic_analysis["mn"]
_include_beats_column(harmonic_analysis)
return harmonic_analysis
[docs]
def expand_score_repetitions(score, repeat_elements: list):
"""
Given a music21 Score object and a list containing repetition elements, expands the score object and
places all measures in their correspondent cronological order
Parameters
----------
score: music21 Score
Score object parsed by music21
expand_repeats: list
List containing all repetition elements
Returns
-------
final_score: music21 Score
Score object with expanded repetitions
"""
score = _expand_repeat_bars(score)
final_score = m21.stream.Score()
final_score.metadata = score.metadata
if len(repeat_elements) > 0:
for part in score.parts:
p = _expand_part(part, repeat_elements)
final_score.insert(0, p)
else:
final_score = score
return final_score
def _measure_ranges(
instrument_measures: int,
init: int,
end: int,
iteration: int = None,
offset: int = None,
twocompasses_flag: bool = False,
remove_repetition_marks_flag: bool = False,
) -> list:
measures = []
o = offset
last_offset = (
0.0 if int(init) - 6 < 0 else instrument_measures[int(init) - 6].offset
)
init_index, end_index = _find_init_and_end_indexes(instrument_measures, init, end)
for i in range(init_index, end_index + 1):
if (
not i < 0
and i < len(instrument_measures)
and instrument_measures[i].measureNumber >= int(init)
and instrument_measures[i].measureNumber <= int(end)
):
if not twocompasses_flag:
compass = instrument_measures[i].quarterLength
m = m21.stream.Measure(number=instrument_measures[i].measureNumber)
if remove_repetition_marks_flag:
m.elements = [
e
for e in instrument_measures[i].elements
if not isinstance(e, m21.repeat.RepeatMark)
]
else:
m.elements = instrument_measures[i].elements
m.quarterLength = compass
if offset is None:
m.offset = last_offset
else:
m.offset = o
o += compass
measures.append(m)
if (
instrument_measures[i].measureNumber != 0.0
and instrument_measures[i].offset != 0
):
last_offset = instrument_measures[i].offset + compass
twocompasses_flag = False
if iteration == 2:
last_measure = instrument_measures[i + 1]
last_measure.offset = measures[-1].offset
measures = measures[:-1] + [last_measure]
return measures
def _find_init_and_end_indexes(
instrument_measures: list, init: int, end: int
) -> Union[int, int]:
init_index = instrument_measures.index(
[m for m in instrument_measures if m.measureNumber == init][0]
)
end_compass = [m for m in instrument_measures if m.measureNumber == end]
end_index = (
instrument_measures.index(end_compass[0])
if len(end_compass) > 0
else len(instrument_measures) - 1
)
return init_index, end_index
def _get_instrument_elements(part):
measures = []
for elem in part:
if isinstance(elem, m21.stream.Measure):
measures.append(elem)
# Change the note offsets to avoid problems due to the slurs
last_offset = 0
last_duration = 0
for note in elem:
if (
isinstance(note, m21.note.Note)
or isinstance(note, m21.note.Rest)
or isinstance(note, m21.chord.Chord)
):
note.offset = last_offset + last_duration
last_offset = note.offset
last_duration = note.duration.quarterLength
return measures
def get_repetition_elements(score, v=True):
global repeat_bracket
repeat_bracket = False
repeat_elements = set()
for instruments in score.parts:
instr_repeat_elements = []
for elem in instruments.elements:
if isinstance(elem, m21.stream.Measure):
for e in elem:
if isinstance(e, m21.repeat.RepeatMark) and not isinstance(
e, m21.bar.Repeat
):
measure = e.measureNumber
if elem.numberSuffix in ["X1", "X2"]: # Exception
measure += 1
instr_repeat_elements.append((measure, e.name))
elif isinstance(elem, m21.spanner.RepeatBracket):
repeat_bracket = True
string_e = str(elem)
index = string_e.find("music21.stream.Measure")
string_e = string_e[index:].replace("music21.stream.Measure ", "")
measure = (
string_e.split(" ")[0].strip().replace("X1", "").replace("X2", "")
)
instr_repeat_elements.append(
(int(measure), "repeat bracket" + elem.number)
)
repeat_elements.update(instr_repeat_elements)
repeat_elements = sorted(list(repeat_elements), key=lambda tup: tup[0])
if v:
print("The repeat elements found in this score are: " + str(repeat_elements))
return repeat_elements
def _include_beats_column(harmonic_analysis: DataFrame) -> None:
harmonic_analysis["beats"] = 0
for index, measure in enumerate(harmonic_analysis[PLAYTHROUGH].values):
if measure <= 1:
beat = int(
measure
+ float(harmonic_analysis.mc_onset[index])
* get_number_of_beats(harmonic_analysis.timesig[index])
)
else:
time_sig = get_number_of_beats(harmonic_analysis.timesig[index - 1])
beat = int(
(measure - 1) * time_sig
+ 1
+ harmonic_analysis.mc_onset[index]
* get_number_of_beats(harmonic_analysis.timesig[index])
)
harmonic_analysis.at[index, "beats"] = beat
def _get_beat_position(
beats_timesignature: int, number_of_beats: int, pos: int
) -> float:
if number_of_beats == beats_timesignature:
return pos
else:
return (pos / beats_timesignature) + 1
def _expand_repeat_bars(score):
final_score = m21.stream.Score()
final_score.metadata = score.metadata
exist_repetition_bars = False
# find repeat bars and expand
for instr in score.parts:
part_measures = _get_instrument_elements(
instr.elements
) # returns the measures with repetitions
last_measure = part_measures[-1].measureNumber
part_measures_expanded = []
startsin0 = part_measures[0].measureNumber == 0 # Everything should be -1
repetition_bars = []
# Find all repetitions in that instrument
for elem in instr.elements:
if isinstance(elem, m21.stream.Measure):
exist_repetition_bars = _examine_measure(repetition_bars, elem)
elif isinstance(elem, m21.spanner.RepeatBracket):
_examine_repeat_bracket(instr, repetition_bars, elem)
repetition_bars = sorted(list(repetition_bars), key=lambda tup: tup[0])
start = 0 if startsin0 else 1
_append_repetitions(
final_score,
exist_repetition_bars,
instr,
part_measures,
last_measure,
part_measures_expanded,
repetition_bars,
start,
)
return final_score if exist_repetition_bars else score
def _append_repetitions(
final_score,
exist_repetition_bars,
instr,
part_measures,
last_measure,
part_measures_expanded,
repetition_bars,
start,
):
if exist_repetition_bars:
p = m21.stream.Part()
p.id = instr.id
p.partName = instr.partName
for repetition_bar in repetition_bars:
measure = _measure_ranges(
part_measures, repetition_bar[0], repetition_bar[0]
)[0].quarterLength
if repetition_bar[1] == "start":
start = _add_start(
part_measures,
part_measures_expanded,
start,
repetition_bar,
measure,
)
elif repetition_bar[1] == "end":
start = _add_end(
part_measures,
part_measures_expanded,
repetition_bars,
start,
repetition_bar,
measure,
)
if start < last_measure:
measure = _measure_ranges(part_measures, start, start + 1)[0].quarterLength
offset = part_measures_expanded[-1][-1].offset
part_measures_expanded.append(
_measure_ranges(
part_measures, start, last_measure + 1, offset=offset + measure
)
)
p.elements = list(itertools.chain(*tuple(part_measures_expanded)))
final_score.insert(0, p)
def _add_end(
part_measures,
part_measures_expanded,
repetition_bars,
start,
repetition_bar,
measure,
):
if len(part_measures_expanded) > 0:
offset = part_measures_expanded[-1][-1].offset
else:
offset = 0
casilla_1 = (
True
if any(re[1] == "1" and re[0] <= repetition_bar[0] for re in repetition_bars)
else False
)
casilla_2 = None
if casilla_1:
casilla_2 = [
re for re in repetition_bars if re[1] == "2" and re[0] > repetition_bar[0]
]
casilla_2 = None if len(casilla_2) == 0 else casilla_2[0]
part_measures_expanded.append(
_measure_ranges(
part_measures,
init=start,
end=repetition_bar[0],
offset=offset + measure,
remove_repetition_marks_flag=True,
)
) # This should erase the repetition marks
if casilla_2 != None:
part_measures_expanded.append(
_measure_ranges(
part_measures,
start,
casilla_2[0],
iteration=2,
offset=part_measures_expanded[-1][-1].offset + measure,
)
)
start = casilla_2[0] + 1
else:
part_measures_expanded.append(
_measure_ranges(
part_measures,
init=start,
end=repetition_bar[0],
offset=part_measures_expanded[-1][-1].offset + measure,
)
)
start = repetition_bar[0] + 1
return start
def _add_start(part_measures, part_measures_expanded, start, repetition_bar, measure):
if len(part_measures_expanded) > 0:
offset = part_measures_expanded[-1][-1].offset
else:
offset = 0
start_measures = _measure_ranges(
part_measures, start, repetition_bar[0] - 1, offset=offset + measure
)
if len(start_measures) > 0:
part_measures_expanded.append(start_measures)
start = repetition_bar[0]
return start
def _examine_repeat_bracket(instr, repetition_bars, elem):
string_e = str(elem)
index = string_e.find("music21.stream.Measure")
measure = string_e[index:].replace("music21.stream.Measure", "")[1:3].strip()
repetition_bars.append((int(measure), elem.number))
index = instr.elements.index(elem)
elem.elements = instr.elements[:index] + instr.elements[index + 1 :]
def _examine_measure(repetition_bars, elem):
for e in elem:
if isinstance(e, m21.bar.Repeat):
exist_repetition_bars = True
if e.direction == "start":
repetition_bars.append((e.measureNumber, "start"))
elif e.direction == "end":
repetition_bars.append((e.measureNumber, "end"))
index = elem.elements.index(e)
elem.elements = elem.elements[:index] + elem.elements[index + 1 :]
return exist_repetition_bars
def _get_measures_list(part_measures: list, repeat_elements: list):
measures_list = []
startsin0 = part_measures[0].measureNumber == 0 # Everything should be -1
there_is_fine = False
there_is_segno = False
# 1. find the fine and segno
if any([r[1] == "fine" for r in repeat_elements]):
f = [x[0] for x in repeat_elements if x[1] == "fine"][0]
there_is_fine = True
if any([r[1] == "segno" for r in repeat_elements]):
s = [x[0] for x in repeat_elements if x[1] == "segno"][0]
there_is_segno = True
# Having all the repetition elements, get the measures
if there_is_segno:
before_segno = _measure_ranges(part_measures, 1 if not startsin0 else 0, s - 1)
measures_list.append(
before_segno
) # S -1 OR S-> when segno in compass 1, s, else s-1?
dc_time_signature = [
y
for x in before_segno
for y in x.elements
if isinstance(y, m21.meter.TimeSignature)
]
elif there_is_fine:
measures_list.append(
_measure_ranges(
part_measures,
1 if not startsin0 else 0,
f - 1,
iteration=1 if repeat_bracket else None,
)
)
else:
measures_list.append(
_measure_ranges(
part_measures,
1 if not startsin0 else 0,
len(part_measures),
iteration=1 if repeat_bracket else None,
)
)
for repeat in repeat_elements:
repeat_measure = _measure_ranges(part_measures, repeat[0], repeat[0])
compass = repeat_measure[0].quarterLength
if repeat[1] == "segno":
offset = measures_list[-1][-1].offset
segno_part = _measure_ranges(
part_measures,
s,
f - 1 if there_is_fine else len(part_measures),
iteration=1 if repeat_bracket else None,
offset=offset + compass,
remove_repetition_marks_flag=True,
)
measures_list.append(segno_part) # Segno to Fine
elif repeat[1] == "fine":
twoCompasses = False
"""if len(repeat_measure) > 0:
twoCompasses = True"""
offset = measures_list[-1][-1].offset
fine_part = _measure_ranges(
part_measures,
f,
len(part_measures),
offset=offset + compass,
twocompasses_flag=twoCompasses,
remove_repetition_marks_flag=True,
)
measures_list.append(fine_part) # Fine to end
elif repeat[1] == "al segno" or repeat[1] == "dal segno":
offset = measures_list[-1][-1].offset
# segnos' compass time signature
segno_time_measure = [
x
for x in segno_part[0].elements
if isinstance(x, m21.meter.TimeSignature)
]
segno_time_measure = (
segno_time_measure
if len(segno_time_measure) != 0
else dc_time_signature[-1]
)
alsegno_list = _measure_ranges(
part_measures,
s,
f - 1 if there_is_fine else len(part_measures),
iteration=2 if repeat_bracket else None,
offset=offset + compass,
remove_repetition_marks_flag=True,
)
if not any(
isinstance(x, m21.meter.TimeSignature) for x in alsegno_list[0].elements
):
# we reset the time signature that was on the dacapo
alsegno_list[0].elements = tuple(
[segno_time_measure] + list(alsegno_list[0].elements)
)
measures_list.append(alsegno_list) # Segno to fine
elif repeat[1] == "da capo":
offset = measures_list[-1][-1].offset
if startsin0 and there_is_fine and not repeat_bracket:
f += 1
dacapo_list = _measure_ranges(
part_measures,
0 if startsin0 else 1,
f - 1 if there_is_fine else len(part_measures),
iteration=2 if repeat_bracket else None,
offset=offset + compass,
remove_repetition_marks_flag=True,
)
measures_list.append(dacapo_list)
return measures_list
def _expand_part(part, repeat_elements: list):
part_measures = _get_instrument_elements(
part.elements
) # returns the measures with repetitions
p = m21.stream.Part()
p.id = part.id
p.partName = part.partName
part_measures_expanded = _get_measures_list(
part_measures, repeat_elements
) # returns the measures expanded
part_measures_expanded = list(itertools.chain(*part_measures_expanded))
# Assign a new continuous measure number to every measure
measure_number = 0 if part_measures_expanded[0].measureNumber == 0 else 1
for i, e in enumerate(part_measures_expanded):
m = m21.stream.Measure(number=measure_number)
m.elements = e.elements
m.offset = e.offset
m.quarterLength = e.quarterLength
part_measures_expanded[i] = m
measure_number += 1
p.elements = part_measures_expanded
return p
def _calculate_total_number_of_beats(time_signatures: list) -> int:
"""
Given a list of time signatures, sums the beats of each time signature
"""
# periods = _get_timesignature_periods(time_signatures)
return sum(get_number_of_beats(ts) for ts in time_signatures)
def cast_mixed_dtypes(col):
if "mixed" in pd.api.types.infer_dtype(col):
notna = col.notna()
newtype = col[notna].map(type).mode()[0]
if issubclass(newtype, float):
# convert fractions like '1/3' to float
col[notna] = col[notna].apply(pd.eval)
col = col.convert_dtypes()
elif issubclass(newtype, int):
# convert to string
col = col.astype("string")
return col