Last active
June 11, 2019 20:15
-
-
Save moui72/cd2969bd8475e6e707f4a99c954f2679 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import walk | |
from os.path import join, basename | |
from pydub import AudioSegment | |
from json import dump, dumps | |
import pydub.scipy_effects | |
import numpy | |
import csv | |
import time | |
def keyByVal(haystack, needle):
    """Return the first key in ``haystack`` whose value equals ``needle``.

    Keyword arguments
    haystack -- a dict to search
    needle -- the value to look for (compared with ``==``)

    Returns None when no value matches.
    """
    # .items() is available on both Python 2 and Python 3 dicts, whereas
    # .iteritems() only exists on Python 2.
    for key, val in haystack.items():
        if needle == val:
            return key
    return None
def chunk(seq, sample_rate):
    """ Split a sliceable sequence into consecutive pieces

    Keyword arguments
    seq -- the sequence to split (called with an AudioSegment in this file)
    sample_rate -- width of each piece, in units of seq's index
                   (ms when seq is an AudioSegment)

    The last piece is shorter when len(seq) is not a multiple of
    sample_rate.
    """
    pieces = []
    for start in range(0, len(seq), sample_rate):
        stop = start + sample_rate
        pieces.append(seq[start:stop])
    return pieces
def segment(chunks, threshold, sample_rate):
    """ Classify and group a series of chunks of audio as silence or noise

    Keyword arguments
    chunks -- a list of dBFS levels, one per chunk
    threshold -- dBFS threshold at or below which a chunk counts as silence
    sample_rate -- the width, in ms, of each chunk
                   (needed for calculating timestamps)

    Returns a list of dicts with keys "start" and "stop" (ms offsets) and
    "below" (True for silence, False for noise). A change of state is only
    accepted once it has persisted for more than 250 ms (silence) or more
    than 1000 ms (noise), measured between chunk start times. The final
    segment's "stop" is the start time of the last chunk. Returns an empty
    list when ``chunks`` is empty or no run lasts long enough.
    """
    # Minimum duration (ms) a run must exceed before it is accepted,
    # keyed by whether the run is below the threshold.
    min_len = {True: 250, False: 1000}
    # Sentinel first entry so output[-1] is always valid; stripped on return.
    output = [{"start": -1, "stop": 0, "below": None}]
    candidate = None
    last_index = None
    for t, frame_decibel in enumerate(chunks):
        last_index = t
        frame_now = t * sample_rate
        # Coerce to a plain bool so the comparisons below do not depend on
        # object identity (numpy comparisons return numpy.bool_).
        frame_below = bool(frame_decibel <= threshold)
        if candidate is not None:
            if frame_below != candidate["below"]:
                # The run was interrupted before reaching its minimum length.
                candidate = None
            elif frame_now - candidate["start"] > min_len[candidate["below"]]:
                # The run lasted long enough: close the previous segment
                # where this one began and promote the candidate.
                output[-1]["stop"] = candidate["start"]
                output.append(candidate)
                candidate = None
        elif frame_below != output[-1]["below"]:
            # State differs from the current segment: start tracking a
            # possible new segment from this chunk.
            candidate = {
                "start": frame_now,
                "stop": None,
                "below": frame_below
            }
    if last_index is None:
        # No chunks at all: nothing to segment.
        return []
    output[-1]["stop"] = last_index * sample_rate
    return output[1:]
def get_silences(filename, sample_rate):
    """ For a given file, apply a band-pass filter and then search for the
    lowest silence threshold that splits the file into at least 3 segments,
    the first of which is silence.

    Thresholds are tried in 0.1 dB steps, from 5 dB above the quietest chunk
    to 5 dB below the loudest chunk. The first threshold that succeeds wins.

    Keyword arguments
    filename -- path to wav file
    sample_rate -- width in ms of chunks

    Side effects: stores the elapsed search time in the global ``times`` and
    the winning threshold in the global ``thresholds`` (both keyed by the
    file's basename), appends the basename to the global ``failures`` when
    no threshold works, and prints a one-line status for the file.

    Returns the successful segmentation (list of silence/noise segment
    dicts as produced by ``segment``), or None when the search fails.
    """
    global failures, times, thresholds, main_start_time
    start_time = time.time()
    name = basename(filename)

    # NOTE(review): an earlier comment here described omitting the first and
    # last 150 ms (mouse clicks at the very start and end) and adding them
    # back as presumed silence; no such trimming is performed in this
    # function -- confirm whether it is still wanted.
    audio = AudioSegment.from_wav(filename)
    audio = audio.band_pass_filter(150, 3100)

    # divide audio into sample_rate sized chunks and get a list of the
    # dBFS levels for each chunk
    decibels = [frame.dBFS for frame in chunk(audio, sample_rate)]

    # The search range runs between the quietest and loudest chunk. Chunks
    # with a non-finite level (-inf for digital silence) are ignored when
    # picking the bounds so numpy.arange always receives finite endpoints;
    # they are still classified by segment().
    finite = [level for level in decibels if numpy.isfinite(level)]

    # winner will capture the first successful segmentation
    winner = []
    if finite:
        for m in numpy.arange(min(finite) + 5, max(finite) - 5, step=0.1):
            segments = segment(decibels, m, sample_rate)
            if len(segments) < 3:
                # not enough segments :(
                continue
            if not segments[0]["below"]:
                # first segment is noise :(
                continue
            winner = segments
            thresholds[name] = m
            break

    end_time = time.time()
    times[name] = end_time - start_time
    restr = "{} {:<16.16} ({:.2f} -- {})".format(
        "+" if winner else "-",
        name,
        times[name] * 1000,
        time.strftime("%Mm %Ss", time.gmtime(end_time - main_start_time))
    )
    # print() call form works on both Python 2 and 3 and matches the rest
    # of the file.
    print(restr)
    if not winner:
        failures.append(name)
        return None
    return winner
def one_file_per_row(file, value):
    """Build the csv row for one recording (single-file-per-row output)

    Keyword arguments
    file -- path or name of the .wav file being processed
    value -- list of segment dicts from a successful search+segmentation,
             or a falsy value (None / empty list) when the search failed

    Returns a list: [basename, leading silence length, trailing silence
    length, stop time of the last segment, "F" in name, "Q" in name,
    "Y" in name]. Leading is the length of the first silent segment;
    trailing is the length of the last silent segment and is only reported
    when there are at least two silent segments. Unavailable numeric fields
    are the string "NA".
    """
    name = basename(file)
    leading = "NA"
    trailing = "NA"
    total = "NA"
    if value:
        quiet = [seg for seg in value if seg["below"]]
        if quiet:
            first = quiet[0]
            leading = first["stop"] - first["start"]
        if len(quiet) > 1:
            last = quiet[-1]
            trailing = last["stop"] - last["start"]
        total = value[-1]["stop"]
    flags = ["F" in name, "Q" in name, "Y" in name]
    return [name, leading, trailing, total] + flags
if __name__ == "__main__":
    # Script entry point: collect the experimental .wav recordings, run the
    # silence search on each, then write a JSON report and a CSV with one
    # row per recording.
    main_start_time = time.time()

    # get files in all directories in a directory
    audio_path = "/Users/tyler/Downloads/recordings"
    # Sub-directories pruned from the walk:
    #   "prac" -- practice items
    #   "108"  -- a glitch
    #   "0", "1000" -- test data
    skipped_dirs = ("prac", "108", "0", "1000")
    audio_files = []
    for root, dirs, files in walk(audio_path):
        for skipped in skipped_dirs:
            if skipped in dirs:
                # removing in place stops walk() from descending into it
                dirs.remove(skipped)
        for name in files:
            if "wav" in name and "P" not in name and "108" not in name:
                # only sound files of Experimental items (E for EXP)
                audio_files.append(join(root, name))
    print("built dir paths in " + str(round(time.time() - main_start_time, 2)))

    # Module-level accumulators filled in by get_silences().
    failures = []
    times = {}
    thresholds = {}
    silences_in_files = {basename(a): get_silences(a, 50) for a in audio_files}

    # Materialise the dict views: numpy.mean cannot consume a Python 3
    # dict view, and the lists let us guard the empty case explicitly.
    time_values = list(times.values())
    threshold_values = list(thresholds.values())

    # Either collection may be empty (no recordings found, or every search
    # failed); report None instead of crashing in max()/min().
    maxtime = max(time_values) if time_values else None
    mintime = min(time_values) if time_values else None
    maxm = max(threshold_values) if threshold_values else None
    minm = min(threshold_values) if threshold_values else None

    print("\n{:=^80}\n".format(" Done! "))
    main_stop_time = time.time()
    report = {
        "average search": (
            round(numpy.mean(time_values), 2) * 1000 if time_values else None
        ),
        "longest search": {
            "time (ms)": maxtime * 1000 if maxtime is not None else None,
            "recording": keyByVal(times, maxtime)
        },
        "shortest search": {
            "time (ms)": mintime * 1000 if mintime is not None else None,
            "recording": keyByVal(times, mintime)
        },
        "total time": (main_stop_time - main_start_time),
        "average threshold": (
            round(numpy.mean(threshold_values), 2)
            if threshold_values else None
        ),
        "min threshold": {
            "threshold": minm,
            "recording": keyByVal(thresholds, minm)
        },
        "max threshold": {
            "threshold": maxm,
            "recording": keyByVal(thresholds, maxm)
        },
        "fails": len(failures),
        "failures": failures,
        "thresholds": thresholds,
        "times": times
    }

    # Console summary leaves out the bulky per-file entries; the JSON file
    # written below contains everything.
    summary = {
        k: v for k, v in report.items()
        if k not in ("failures", "times", "thresholds")
    }
    print(dumps(summary, indent=2))

    dts = time.strftime("%Y%m%d-%H%M%S")
    with open('report-'+dts+'.json', 'w') as outjson:
        dump(report, outjson, indent=2)
    with open('timings-one_rec_per_row-'+dts+'.csv', 'w') as outcsv:
        writer = csv.writer(outcsv)
        writer.writerow([
            "Filename",
            "Leading",
            "Trailing",
            "RecLength",
            "isFiller",
            "isQ",
            "isGP"
        ])
        for afile in sorted(silences_in_files):
            writer.writerow(
                one_file_per_row(afile, silences_in_files[afile])
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment