Created
May 8, 2024 21:46
-
-
Save jaggzh/2d33c02a03d324bf7cb2b19c92493397 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# datagen.py | |
import numpy as np | |
import librosa | |
def load_audio_chunk(audio_path, start_frame, dur_s, sr):
    """Read one mono excerpt of an audio file.

    Args:
        audio_path: File handed to ``librosa.load``.
        start_frame: Sample index (at rate ``sr``) where the excerpt begins.
        dur_s: Length of the excerpt in seconds.
        sr: Sample rate requested from ``librosa.load``.

    Returns:
        The audio array produced by ``librosa.load``; the sample rate that
        call also returns is discarded.
    """
    # librosa expects the offset in seconds, so convert the sample index.
    offset_s = start_frame / sr
    samples, _rate = librosa.load(
        audio_path,
        sr=sr,
        mono=True,
        offset=offset_s,
        duration=dur_s,
    )
    return samples
def preprocess_audio(audio_data):
    """Peak-normalize audio so that its largest absolute sample becomes 1.

    Args:
        audio_data: NumPy array of audio samples.

    Returns:
        ``audio_data`` divided by its peak absolute value, i.e. scaled into
        the range [-1, 1]. Input that is empty, or whose peak is not greater
        than zero (all-zero audio), is returned unchanged.
    """
    # np.max raises ValueError on a zero-size array, so an empty clip
    # (e.g. a read past the end of the file) must be handled up front.
    if np.size(audio_data) == 0:
        return audio_data

    peak = np.max(np.abs(audio_data))
    if peak > 0:
        return audio_data / peak
    # Nothing to scale by; avoid dividing by zero.
    return audio_data
def _stack_batch(clips, noise_masks, voice_masks):
    """Stack accumulated windows and their mask flags into one batch of arrays."""
    xd = np.array(clips)
    xn = np.array(noise_masks).reshape(-1, 1)
    xv = np.array(voice_masks).reshape(-1, 1)
    return xd, xn, xv


def gen_audio_clips(audio_path, offset_s=0, dur_s=None, verbose=0, *, sr, chunklen_s,
                    noise_floor_perc, noise_frac, train_frames, train_skip, batch_count):
    """Yield batches of fixed-length audio windows with noise/voice flags.

    The file is read in chunks of ``chunklen_s`` seconds. Each chunk is
    peak-normalized, a noise floor is estimated from it, and the chunk is cut
    into windows of ``train_frames`` samples taken every ``train_skip``
    samples. A window whose peak amplitude is at or below the noise floor is
    flagged as noise, otherwise as voice.

    Args:
        audio_path: Audio file to read.
        offset_s: Currently unused; kept for interface compatibility.
        dur_s: Currently unused; kept for interface compatibility.
        verbose: When greater than 0, print noise-floor diagnostics per chunk.
        sr: Sample rate used for loading and for sample/second conversions.
        chunklen_s: Length in seconds of each chunk read from the file.
        noise_floor_perc: Percentile (0-100) into the sorted per-0.2 s peak
            amplitudes that selects the reference amplitude.
        noise_frac: Factor applied to the reference amplitude to obtain the
            noise floor.
        train_frames: Number of samples in each training window.
        train_skip: Hop, in samples, between consecutive window starts.
        batch_count: Number of windows per yielded batch.

    Yields:
        Tuples ``(xd, xn, xv)``: ``xd`` has shape
        ``(n, 1, train_frames, 1)``; ``xn`` and ``xv`` have shape ``(n, 1)``
        and hold 1.0/0.0 noise and voice flags. ``n`` equals ``batch_count``
        except possibly for the final, partial batch.
    """
    total_samples = librosa.get_duration(path=audio_path, sr=sr) * sr
    start_frame = 0
    silence_eval_s = 0.2  # Length of each sub-window used to evaluate the noise floor
    silence_eval_samples = int(sr * silence_eval_s)

    # Bound before the loop so the final flush below is safe even when the
    # file is too short for the loop to run at all.
    batch_data, noise_masks, voice_masks = [], [], []

    while start_frame + train_frames < total_samples:
        audio_chunk = load_audio_chunk(audio_path, start_frame, chunklen_s, sr)
        if len(audio_chunk) == 0:
            # Nothing could be read; a noise floor cannot be estimated.
            break
        audio_chunk = preprocess_audio(audio_chunk)

        # Noise floor: take the peak of every 0.2 s sub-window, sort them and
        # pick the requested percentile as the reference amplitude.
        max_amplitudes = sorted(
            np.max(np.abs(audio_chunk[j:j + silence_eval_samples]))
            for j in range(0, len(audio_chunk), silence_eval_samples)
        )
        noise_floor_index = int(len(max_amplitudes) * noise_floor_perc / 100)
        # A percentile of 100 would index one past the end; clamp into range.
        noise_floor_index = max(0, min(noise_floor_index, len(max_amplitudes) - 1))
        noise_floor = max_amplitudes[noise_floor_index] * noise_frac

        if verbose > 0:
            print('')
            print(f"Noise frac perc: {float(noise_floor_perc):.1}")
            print(f" Noise frac: {noise_frac:.3}")
            print(f"Amplitudes : {max_amplitudes}")
            print(f"Noise floor idx: {noise_floor_index}")
            print(f"Noise floor : {noise_floor}")

        # NOTE(review): a partial batch left over from the previous chunk is
        # discarded here; only the last chunk's remainder is flushed below.
        # Kept as in the original - confirm this is intended.
        batch_data, noise_masks, voice_masks = [], [], []

        # "+ 1" so the last window that still fits entirely in the chunk is
        # included (a chunk of exactly train_frames samples yields one window).
        for i in range(0, len(audio_chunk) - train_frames + 1, train_skip):
            batch = audio_chunk[i:i + train_frames].reshape(1, train_frames, 1)

            # A window is "noise" when even its loudest sample stays at or
            # below the noise floor; the two flags are mutually exclusive.
            is_noise = np.max(np.abs(batch)) <= noise_floor
            batch_data.append(batch)
            noise_masks.append(1.0 if is_noise else 0.0)
            voice_masks.append(0.0 if is_noise else 1.0)

            # Emit a batch as soon as enough windows have accumulated.
            if len(batch_data) == batch_count:
                yield _stack_batch(batch_data, noise_masks, voice_masks)
                batch_data, noise_masks, voice_masks = [], [], []

        # NOTE(review): the chunk start advances by train_frames - train_skip
        # samples rather than by the chunk length, so consecutive chunks
        # overlap, and with train_skip >= train_frames the start never moves
        # forward. Left unchanged - confirm the intended stride.
        start_frame += train_frames - train_skip

    # Flush whatever is left as a final, smaller batch.
    if batch_data:
        yield _stack_batch(batch_data, noise_masks, voice_masks)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# model.py | |
from keras.models import Model | |
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Add, Multiply | |
def _sigmoid_branch(features):
    """Build one conv/pool/upsample branch ending in a 1-channel sigmoid map."""
    branch = Conv2D(8, (1, 3), activation='relu', padding='same')(features)
    branch = MaxPooling2D((1, 2), padding='same')(branch)
    # Two upsamplings undo this pooling and the shared encoder's pooling.
    # NOTE(review): the width is presumably restored exactly only when the
    # input width is a multiple of 4 - confirm against Settings.train_frames.
    branch = UpSampling2D((1, 2))(branch)
    branch = UpSampling2D((1, 2))(branch)
    return Conv2D(1, (1, 3), activation='sigmoid', padding='same')(branch)


def create_model(input_shape):
    """Build the two-branch (noise / voice) convolutional model.

    A shared convolutional encoder feeds two structurally identical branches.
    Each branch output is multiplied by its mask input and the two masked
    outputs are summed into the single model output.

    Args:
        input_shape: Shape of one audio input sample, without the batch axis
            (the training script passes ``(1, train_frames, 1)``).

    Returns:
        An uncompiled Keras ``Model`` with inputs ``audio_input``,
        ``noise_mask_input`` and ``voice_mask_input`` and one combined output.
    """
    # Main audio input
    audio_input = Input(shape=input_shape, name="audio_input")

    # Per-sample scalar masks that gate the two branches
    noise_mask_input = Input(shape=(1,), name="noise_mask_input")
    voice_mask_input = Input(shape=(1,), name="voice_mask_input")

    # Shared encoder layer
    x = Conv2D(16, (1, 3), activation='relu', padding='same')(audio_input)
    x = MaxPooling2D((1, 2), padding='same')(x)

    # Noise branch first, then voice branch: keeps the layer creation order
    # (and therefore Keras' auto-generated layer names) as before.
    noise_output = _sigmoid_branch(x)
    voice_output = _sigmoid_branch(x)

    # Apply masks
    masked_noise_output = Multiply()([noise_output, noise_mask_input])
    masked_voice_output = Multiply()([voice_output, voice_mask_input])

    # Combine masked outputs; only this combined tensor is exposed as output.
    combined_output = Add()([masked_noise_output, masked_voice_output])

    model = Model(inputs=[audio_input, noise_mask_input, voice_mask_input],
                  outputs=[combined_output])
    # summary() prints the table itself and returns None, so wrapping it in
    # print() would emit a stray "None" line.
    model.summary()
    return model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# train.py | |
import os | |
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
import logging | |
logging.getLogger('tensorflow').setLevel(logging.ERROR) | |
import tensorflow as tf | |
from stg import Settings | |
from datagen import gen_audio_clips | |
from utils import save_model | |
from model import create_model | |
from keras.optimizers import Adam | |
import numpy as np | |
from keras.utils import Sequence | |
class AudioDataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches produced by ``gen_audio_clips``.

    Batches are pulled sequentially from an underlying clip generator; the
    ``index`` passed to ``__getitem__`` is not used for random access. The
    clip generator is recreated at the end of every epoch.
    """

    def __init__(self, *, audio_path, batch_size, steps_per_epoch,
                 epochs, audio_grabber_batch_size):
        """Store the configuration and open the first clip generator.

        Args:
            audio_path: Audio file the clips are cut from.
            batch_size: Stored on the instance; not used by this class itself.
            steps_per_epoch: Value reported by ``__len__``.
            epochs: Stored on the instance; not used by this class itself.
            audio_grabber_batch_size: Number of clips per batch requested
                from ``gen_audio_clips``.
        """
        # Let the Keras base class set up its own state.
        super().__init__()
        self.audio_path = audio_path
        self.batch_size = batch_size
        self.steps_per_epoch = steps_per_epoch
        self.epochs = epochs
        self.audio_grabber_batch_size = audio_grabber_batch_size
        self.generator = self.create_generator()

    def create_generator(self):
        """Return a fresh ``gen_audio_clips`` generator configured from ``Settings``."""
        return gen_audio_clips(self.audio_path,
                               offset_s=0,
                               dur_s=Settings.lookahead_secs,
                               sr=Settings.sr,
                               chunklen_s=Settings.lookahead_secs,
                               noise_floor_perc=Settings.noise_floor_perc,
                               noise_frac=Settings.noise_frac,
                               train_frames=Settings.train_frames,
                               train_skip=Settings.train_skip,
                               batch_count=self.audio_grabber_batch_size)

    def __len__(self):
        """Return the number of batches per epoch."""
        return self.steps_per_epoch

    def _next_batch(self):
        """Pull the next raw batch, restarting the clip generator once if it ran dry."""
        try:
            return next(self.generator)
        except StopIteration:
            # The audio was used up before the epoch ended; start over.
            self.generator = self.create_generator()
        try:
            return next(self.generator)
        except StopIteration:
            raise RuntimeError(
                f"No training clips could be generated from {self.audio_path!r}"
            ) from None

    @staticmethod
    def _random_item():
        """Return random data shaped like one training item (debugging aid)."""
        batch_size = 1
        train_frames = Settings.train_frames
        item_shape = (batch_size, 1, train_frames, 1)
        # Draw order (audio, noise mask, voice mask, target) is kept stable.
        audio_data = np.random.random_sample(item_shape).astype(np.float32)
        noise_mask = np.random.random_sample((batch_size, 1)).astype(np.float32)
        voice_mask = np.random.random_sample((batch_size, 1)).astype(np.float32)
        output_data = np.random.random_sample(item_shape).astype(np.float32)
        return (audio_data, noise_mask, voice_mask), output_data

    def __getitem__real(self, index):  # reserved for constant getitem test
        """Debug variant: print shapes and return the first clip of the next batch, unbatched."""
        bb = next(self.generator)
        aa = bb[0][0]
        a2 = bb[1][0][0]
        a3 = bb[2][0][0]
        for i in range(0, 3): print(bb[i].shape)
        for i in aa, a2, a3: print(i.shape)
        return (aa, a2, a3), aa  # Return as ((inputs)[0], (output))

    def __getitem__fail(self, index):
        """Debug variant kept from earlier experiments: random data in the item layout."""
        return self._random_item()

    def __getitem__fail2(self, index):
        """Debug variant kept from earlier experiments: random data in the item layout."""
        return self._random_item()

    def __getitem__(self, index):
        """Return the next training item.

        Args:
            index: Batch index requested by the caller; ignored because
                batches are produced sequentially by the clip generator.

        Returns:
            ``((audio, noise_mask, voice_mask), target)`` as float32 arrays.
            ``audio`` is the array yielded by ``gen_audio_clips`` and
            ``target`` is that same audio array; the masks are the
            per-clip noise and voice flags.
        """
        clips, noise_mask, voice_mask = self._next_batch()
        clips = clips.astype(np.float32)
        inputs = (clips,
                  noise_mask.astype(np.float32),
                  voice_mask.astype(np.float32))
        # The model is trained to reproduce its own input audio.
        return inputs, clips

    def on_epoch_end(self):
        """Restart the clip generator so the next epoch begins at the start of the audio."""
        self.generator = self.create_generator()
# Create a TensorFlow dataset from the generator
def get_dataset(generator):
    """Wrap a batch ``Sequence`` in a ``tf.data.Dataset``.

    Args:
        generator: Object supporting ``len()``, integer indexing and
            ``on_epoch_end()`` (e.g. ``AudioDataGenerator``). Each item must
            be ``((audio, noise_mask, voice_mask), target)``.

    Returns:
        A dataset that yields one item per index of ``generator``. Every new
        iteration over the dataset performs a fresh pass over ``generator``.
    """
    output_signature = (
        (
            tf.TensorSpec(shape=(None, 1, Settings.train_frames, 1), dtype=tf.float32),  # Audio data
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),  # Noise mask
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),  # Voice mask
        ),
        tf.TensorSpec(shape=(None, 1, Settings.train_frames, 1), dtype=tf.float32),  # Target output
    )

    def iterate_batches():
        # from_generator needs a callable that returns an iterable of
        # elements. Handing it __getitem__ directly would make TensorFlow
        # iterate over the pieces of a single returned tuple instead.
        for index in range(len(generator)):
            yield generator[index]
        # Nothing else calls this hook when the Sequence is consumed
        # through tf.data, so signal the end of the pass here.
        generator.on_epoch_end()

    return tf.data.Dataset.from_generator(iterate_batches,
                                          output_signature=output_signature)
def train_model(audio_path, epochs=50, batch_size=10, steps_per_epoch=100):
    """Build, train and save the model on clips cut from one audio file.

    Args:
        audio_path: Audio file used as training data.
        epochs: Number of training epochs.
        batch_size: Forwarded to ``AudioDataGenerator``.
        steps_per_epoch: Number of batches drawn per epoch.
    """
    model = create_model(input_shape=(1, Settings.train_frames, 1))
    model.compile(optimizer=Adam(), loss='mse')

    generator = AudioDataGenerator(
        audio_path=audio_path,
        batch_size=batch_size,
        steps_per_epoch=steps_per_epoch,
        epochs=epochs,
        # NOTE(review): clips are requested one at a time here, independent
        # of batch_size - confirm this is intended.
        audio_grabber_batch_size=1,
    )
    dataset = get_dataset(generator)

    # With steps_per_epoch set, fit() keeps drawing from the same dataset
    # across epochs; a finite dataset would run out after the first epoch,
    # so repeat it indefinitely and let steps_per_epoch bound each epoch.
    model.fit(dataset.repeat(), epochs=epochs, steps_per_epoch=steps_per_epoch)

    save_model(model, 'gen/model_final.keras')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment