Last active
December 27, 2022 02:40
-
-
Save fftlxyz/eda1a7f3365f3962ef02402f748b5bf0 to your computer and use it in GitHub Desktop.
噪音检测、录制
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
效果: | |
如果我家环境比较安静,在楼上发出噪音的时候,比如“跳”、"走路比较重"的时候可以捕获到,如果我家本身比较嘈杂,会录到比较多的无用的数据,我暂时还没有比较好的方法来处理。 | |
基本实现: | |
1. pyaudio 录音 | |
在mac上搞的,通过float32录出来的信号值为大于1, 乘32767和还原出来的幅值,在float信号小于1和16位是一样的(测出来的) | |
使用float是为了探测更多的db变化。 | |
3. 噪音分析 | |
频率中心FC的变化 是一个很好的判断标准,就我的这个场景而言,常常是最大幅值变化不明显,但是fc发生了变化。 | |
频率中心的概念,我是参考的这里: <<基于支持向量机的音频分类与分割>> 白亮 老松杨 陈剑赟 吴玲达, https://www.jsjkx.com/CN/Y2005/V32/I4/87 | |
4. wave保存 | |
转为了16位的格式保存,float32格式有问题。。。 | |
其他参考资料: | |
1. 分贝检测程序,超过75dB马上警报 https://rotonnn.github.io/2019/12/01/ddd-md/ | |
2. LittleSleeper: A baby sleep monitor using a Raspberry Pi and Python, https://aicbt.com/raspberry-pi-sleep-monitor/ | |
3. 音频属性相关:声道、采样率、采样位数、样本格式、比特率 , https://www.cnblogs.com/yongdaimi/p/10722355.html |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import pyaudio | |
import numpy as np | |
import wave | |
import sys | |
import math | |
CHUNK = 1024 | |
CHANNELS = 1 | |
RATE = 44100 | |
RECORD_SECONDS = 5 | |
FORMAT = pyaudio.paFloat32 | |
NOISY_LIMIT = 3000 | |
def get_signal_features(datas): | |
noisy_sampled = 0 | |
for amp in datas: | |
if abs(amp) > NOISY_LIMIT: | |
noisy_sampled = noisy_sampled + 1 | |
n = len(datas) | |
fftfreq = (np.fft.fftfreq(n)) * RATE | |
ffted = np.fft.fft(datas) | |
ffted_n = ffted / n | |
fc = np.sum(np.abs(fftfreq) * np.power(np.abs(ffted_n), 2)) / np.sum(np.power(np.abs(ffted_n), 2)) | |
max_amf = np.max(np.abs(datas)) | |
return fc, max_amf, noisy_sampled | |
def check_is_noisy(fc, max_amf, noisy_sampled, last_fc, last_max_amf, last_noisy_sampled): | |
if 80 < fc < 200 and max_amf > 2000: | |
return True, "fc and max_amf" | |
if noisy_sampled > 200: | |
return True, "noisy_sampled over 200" | |
if max_amf > last_max_amf and get_diff_percent(last_fc, fc) > 0.5: | |
return True, "fc diff 0.5" | |
if max_amf > last_max_amf \ | |
and noisy_sampled > last_noisy_sampled and get_diff_percent(last_noisy_sampled, noisy_sampled) > 0.5: | |
return True, "noisy_sampled diff 0.5" | |
return False, "" | |
def get_diff_percent(a, b): | |
min_value = a | |
max_value = b | |
if b < a: | |
min_value = b | |
max_value = a | |
if min_value == 0: | |
return 1 | |
return (max_value - min_value) / min_value | |
if __name__ == '__main__': | |
p = pyaudio.PyAudio() | |
stream = p.open(format=FORMAT, | |
channels=CHANNELS, | |
rate=RATE, | |
input=True, | |
output=True, | |
frames_per_buffer=CHUNK * 10) | |
last_save_ts = 0 | |
last_fc, last_max_amf, last_noisy_sampled = -1, -1, -1 | |
while True: | |
datas = [] | |
ts = time.time() | |
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): | |
data = stream.read(CHUNK) | |
# datas.extend(np.frombuffer(data), np.int16) | |
# data_array_f = array('f', data) | |
# datas.extend(np.asarray(data_array_f) * 32767) | |
datas.extend(np.frombuffer(data, np.float32) * 32767) | |
fc, max_amf, noisy_sampled = get_signal_features(datas) | |
if math.isnan(fc) or max_amf < 1: | |
continue | |
noisy_cond, reason = check_is_noisy(fc, max_amf, noisy_sampled, last_fc, last_max_amf, last_noisy_sampled) | |
record_flow_cond = ts - last_save_ts > 60 | |
if record_flow_cond and noisy_cond: | |
ts_str = time.strftime("%Y%m%d-%H%M%S", time.localtime(ts)) | |
wf = wave.open("noisy-{}.wav".format(ts_str), 'wb') | |
wf.setnchannels(CHANNELS) | |
wf.setsampwidth(2) | |
wf.setframerate(RATE) | |
wf.writeframes(np.asarray(datas).astype(np.int16).tobytes()) | |
wf.close() | |
last_save_ts = ts | |
ts_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts)) | |
print("{} 频率中心fc:{:.2f} 最大声音:{:.2f}db({:.2f}) noisy_sampled:{} file_saved:{} record_cond:{}({})". | |
format(ts_str, fc, 20 * np.log10(max_amf), max_amf, noisy_sampled, record_flow_cond and noisy_cond, | |
noisy_cond, reason)) | |
last_fc, last_max_amf, last_noisy_sampled = fc, max_amf, noisy_sampled | |
stream.stop_stream() | |
stream.close() | |
p.terminate() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment