"""Transcribe audio files with OpenAI Whisper and emit .txt and .srt output.

Files larger than the API upload limit are split into fixed-size chunks
with pydub and transcribed chunk by chunk.
"""

import logging
import os
import time
from typing import Any, Dict, List

import openai
import pysrt
from pydub import AudioSegment

# Set up directories
transcriptions_path = "audio_transcription"
audio_path = "audio"

# Audio formats supported by the Whisper API
supported_formats = {".m4a", ".mp3", ".webm", ".mp4", ".mpga", ".wav", ".mpeg"}

# Chunk size for splitting large audio files (in milliseconds)
chunk_size_ms = 10 * 60 * 1000


def transcribe_part(part: AudioSegment, audio_format: str) -> Dict[str, Any]:
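    """Transcribe a single audio chunk with the Whisper API.

    The chunk is written to a temporary file because the API expects a
    file object with a recognizable audio extension.
    """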
    # Export the chunk to a temporary file so it can be uploaded
    part.export(f"temp.{audio_format}", format=audio_format)
    try:
        with open(f"temp.{audio_format}", "rb") as audio_file:
            response = openai.Audio.transcribe(
                "whisper-1",
                audio_file,
                response_format="verbose_json",
                language="de",
            )
        return response.to_dict()
    finally:
        # Remove the temporary file even if the request fails
        os.remove(f"temp.{audio_format}")


def transcribe_audio(
    audio_file_path: str, max_size: int = 25 * 1024 * 1024
) -> List[Dict[str, Any]]:
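    """Transcribe an audio file, splitting it into chunks if it exceeds
    max_size (the Whisper API rejects uploads larger than 25 MB).
    """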
    # File size in bytes
    audio_size = os.path.getsize(audio_file_path)
    # Audio format, derived from the file extension
    audio_format = os.path.splitext(audio_file_path)[1][1:]
    # Store the API responses
    responses = []
    # Use pydub to split audio larger than max_size into chunks
    if audio_size > max_size:
        song = AudioSegment.from_file(audio_file_path, format=audio_format)
        for i in range(0, len(song), chunk_size_ms):
            audio_part = song[i : i + chunk_size_ms]
            responses.append(transcribe_part(audio_part, audio_format=audio_format))
    else:
        with open(audio_file_path, "rb") as audio_file:
            response = openai.Audio.transcribe(
                "whisper-1",
                audio_file,
                response_format="verbose_json",
                language="de",
            )
        responses.append(response.to_dict())
    return responses


def generate_srt(responses: List[Dict[str, Any]], output_file: str):
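    """Write the transcription segments to an SRT subtitle file.

    Segment timestamps are relative to their own chunk, so a running
    offset (the total duration of all previous chunks) is added to each.
    """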
    subs = pysrt.SubRipFile()
    index = 1
    # Keep track of the total duration of previous chunks
    offset = 0
    for response in responses:
        for segment in response["segments"]:
            start_ms = int((segment["start"] + offset) * 1000)
            end_ms = int((segment["end"] + offset) * 1000)
            start_time = pysrt.SubRipTime(milliseconds=start_ms)
            end_time = pysrt.SubRipTime(milliseconds=end_ms)
            subs.append(pysrt.SubRipItem(index, start_time, end_time, segment["text"]))
            index += 1
        # Update the offset with the duration of the current chunk
        offset += response["duration"]
    subs.save(output_file, encoding="utf-8")


def main():
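    """Transcribe every supported audio file in audio_path, writing a
    plain-text transcript and an SRT file per input.
    """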
    # Set up logger
    logging.basicConfig(filename="whisper.log", filemode="w", level=logging.DEBUG)

    # Make sure the output directory exists before writing transcripts
    os.makedirs(transcriptions_path, exist_ok=True)

    # Run Whisper on each audio file
    for file_name in os.listdir(audio_path):
        if os.path.splitext(file_name)[1] not in supported_formats:
            continue

        audio_file_path = f"{audio_path}/{file_name}"
        # Strip the extension with splitext so 4- and 5-character
        # extensions (.mp3, .webm, ...) are both handled correctly
        out_file_path = f"{transcriptions_path}/{os.path.splitext(file_name)[0]}"

        if os.path.exists(out_file_path + ".txt"):
            print(f"Skipping file: {audio_file_path}")
            logging.info(f"Skipping file: {audio_file_path}")
            continue

        print(f"Processing file: {audio_file_path}")
        logging.info(f"Processing file: {audio_file_path}")
        start_time = time.time()

        responses = transcribe_audio(audio_file_path)

        if not responses:
            print(f"No transcription for file: {audio_file_path}")
            logging.info(f"No transcription for file: {audio_file_path}")
        else:
            with open(out_file_path + ".txt", "w", encoding="utf-8") as f:
                f.write(" ".join([response["text"] for response in responses]))
            generate_srt(responses, out_file_path + ".srt")

        end_time = time.time()
        time_diff = end_time - start_time
        print(f"Time taken: {time_diff:.2f} seconds")
        logging.info(f"File processed: {audio_file_path}")
        logging.info(f"Time taken: {time_diff:.2f} seconds")


if __name__ == "__main__":
    main()