Skip to content

Instantly share code, notes, and snippets.

@cpmpercussion
Last active January 14, 2021 08:42
Show Gist options
  • Save cpmpercussion/1505b74b573d106df6b820b960951567 to your computer and use it in GitHub Desktop.
Save cpmpercussion/1505b74b573d106df6b820b960951567 to your computer and use it in GitHub Desktop.
A script to parse lots of MIDI files into a simple melody-only 16th-note-only NumPy array format. Generates worker processes to use all available CPU power.
from music21 import converter, instrument, note, chord, stream, midi
import numpy as np
import pandas as pd
import os
import time
import h5py
import argparse
import multiprocessing
# Command-line interface: which directory to scan and how many workers to
# start. Parsed at import time so the values are available as module globals.
parser = argparse.ArgumentParser()
parser.add_argument(
    '-d', '--directory',
    dest='directory',  # read back below as `args.directory`
    action='store',  # store the value that follows the flag
    help='The directory to search for MIDI files.',
)
parser.add_argument(
    '-n', '--nthreads',
    dest='nthreads',  # read back below as `args.nthreads`
    action='store',
    type=int,
    default=10,
    help="Number of threads to start.",
)
args = parser.parse_args()
midi_directory, nthreads = args.directory, args.nthreads
# Event codes of the melody encoding. Values 0-127 need no constant: they
# mean "note on at that MIDI pitch". The two special events sit just above.
MELODY_NOTE_OFF = 128  # stop playing all previous notes
MELODY_NO_EVENT = 129  # no change from previous event

# Raise NumPy's summarisation threshold so long arrays are printed in full.
np.set_printoptions(threshold=10e6)
def streamToNoteArray(stream):
    """
    Convert a Music21 stream into a Melody-RNN style NumPy array.

    Time is quantised to semiquavers: one array slot per 0.25 quarter
    lengths. Each slot holds one of:
        0-127 - note on at specified pitch
        128   - note off
        129   - no event

    Only one note may start per slot. A chord is reduced to its highest
    pitch, and when several elements start on the same slot the one with
    the highest pitch is kept.

    Args:
        stream: a Music21 stream. Its ``flat`` view is iterated and only
            ``note.Note`` and ``chord.Chord`` elements are used.

    Returns:
        A one-dimensional ``np.int16`` array whose length is the latest
        note-off slot plus one. An empty ``np.int16`` array is returned
        when the stream contains no notes or chords.
    """
    # Part one: collect [onset, duration, pitch] for every note, measured
    # in semiquavers. Durations are clamped to at least one slot so that a
    # note-off never lands on the note's own onset.
    events = []
    for element in stream.flat:
        if isinstance(element, note.Note):
            pitch = element.pitch.midi
        elif isinstance(element, chord.Chord):
            pitch = max(p.midi for p in element.pitches)
        else:
            continue
        onset = np.round(element.offset / 0.25)
        duration = max(np.round(element.quarterLength / 0.25), 1)
        events.append([onset, duration, pitch])

    if not events:
        # Nothing playable: indexing the empty array below would fail.
        return np.zeros(0, dtype=np.int16)

    # `np.int` was removed from NumPy; the builtin `int` is its replacement.
    np_events = np.array(events, dtype=int)
    onsets = np_events[:, 0]
    df = pd.DataFrame({
        'on': onsets,
        'off': onsets + np_events[:, 1],
        'pitch': np_events[:, 2],
    })
    # Highest pitch first within each onset, then keep one row per onset.
    df = df.sort_values(['on', 'pitch'], ascending=[True, False])
    df = df.drop_duplicates(subset=['on'])

    # Part two: render the events into a slot array that defaults to
    # "no event".
    output = np.full(df['off'].max() + 1, MELODY_NO_EVENT, dtype=np.int16)
    # Note-offs are written before note-ons so that a note starting on the
    # slot where another one ends wins that slot.
    output[df['off'].values] = MELODY_NOTE_OFF
    output[df['on'].values] = df['pitch'].values
    return output
def process_file_worker(q, counter, file_lock):
    """
    Worker loop: convert queued MIDI files and append them to the HDF5 store.

    Paths are taken from ``q`` until a ``None`` sentinel arrives. Each file
    is parsed with Music21, its first part is converted with
    ``streamToNoteArray`` and the result is written to ``midi_arrays.h5`` as
    dataset ``midi<N>``, where ``N`` is the shared counter after
    incrementing.

    A file that cannot be parsed, converted or saved is reported on stdout
    and skipped. Every item taken from the queue, including the sentinel,
    is acknowledged with ``q.task_done()`` so that a producer blocked in
    ``q.join()`` is always released.

    Args:
        q: joinable queue of MIDI file paths, ended by ``None``.
        counter: shared integer value used to number the datasets.
        file_lock: lock that serialises access to the HDF5 file and the
            counter across workers.
    """
    while True:
        midi_file = q.get()
        try:
            if midi_file is None:
                break
            start = time.time()
            try:
                s = converter.parse(midi_file)
            except Exception as e:
                print("exception while parsing midi")
                print(e)
                continue
            try:
                # Just extract the first voice.
                melody_array = streamToNoteArray(s.parts[0])
                # The context manager closes the file even if the write
                # fails; the lock also protects the counter increment.
                with file_lock, h5py.File('midi_arrays.h5', 'a') as h5f_file:
                    counter.value += 1
                    h5f_file.create_dataset('midi'+str(counter.value), data=melody_array)
                    print("{}: Converted: {} it took {:0.4f}".format(counter.value, midi_file, time.time() - start))
            except Exception as e:
                # One bad file must not kill the worker.
                print("exception while converting or saving midi")
                print(e)
        finally:
            # Runs on break, continue and normal completion alike.
            q.task_done()
def main():
    """
    Find every MIDI file under ``midi_directory`` and convert them in parallel.

    Walks the directory tree, starts ``nthreads`` worker processes running
    ``process_file_worker``, feeds them the discovered paths through a
    joinable queue and waits until every path has been acknowledged. The
    workers are then stopped with one ``None`` sentinel each and joined.

    Raises:
        SystemExit: if no directory was given on the command line.
        ValueError: if ``nthreads`` is less than 1.
    """
    if midi_directory is None:
        # os.walk(None) would fail with an unhelpful TypeError.
        raise SystemExit("No MIDI directory given; use -d/--directory.")
    print("Going to search:", midi_directory)
    start = time.time()
    midi_files = []
    for root, _dirs, files in os.walk(midi_directory):
        for filename in files:
            # Match on the extension only, ignoring case, so names such as
            # "x.mid.bak" are skipped and "X.MID" is picked up.
            if filename.lower().endswith(('.mid', '.midi')):
                midi_files.append(os.path.join(root, filename))
    print("Found", len(midi_files), "midi files.")
    print("Search took", time.time() - start)

    if nthreads < 1:
        raise ValueError("Number of processes must be at least 1")

    file_lock = multiprocessing.Lock()  # Protects the file from concurrent access
    q = multiprocessing.JoinableQueue()
    counter = multiprocessing.Value('i', 0)

    # Keep explicit references to the workers so they can be shut down and
    # joined, instead of relying on an unreferenced Pool.
    workers = [
        multiprocessing.Process(
            target=process_file_worker,
            args=(q, counter, file_lock),
        )
        for _ in range(nthreads)
    ]
    for worker in workers:
        # Daemonic so a stuck worker cannot keep the script alive on exit.
        worker.daemon = True
        worker.start()

    for midi_file in midi_files:
        q.put(midi_file)
    q.join()  # block until tasks are done.

    # Sentinels go in only after the join, so the join never waits on them.
    for _ in workers:
        q.put(None)
    for worker in workers:
        worker.join()
    print("Done.")
# Script entry point: run the conversion only when this file is executed
# directly, not when the module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment