-
-
Save gorkemgoknar/367d36134c186b1939840fa5729a1c0a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example of how to synthesize speech using the Coqui Studio API.
Streams the download/playback of the audio.

Usage:
    $ COQUI_API_TOKEN="put your API token here" python coqui_api_stream.py --text "Hi there!"

To specify the voice to use, pass e.g. `--voice 98d4af7d-aca0-4a70-a26e-4ca59023a248`.
To save the audio to a file after playback, pass `--save_dest audio.wav`. Audio will be
written to the specified file path.
To use the V1 model instead of XTTS, pass `--model v1`.
""" | |
import argparse | |
import shutil | |
import subprocess | |
import requests | |
import os | |
from typing import Iterator | |
import nltk # we'll use this to split into sentences | |
nltk.download("punkt", quiet=True) | |
def is_installed(lib_name: str) -> bool:
    """Report whether an executable called ``lib_name`` can be located.

    The lookup is delegated to ``shutil.which``; the result is ``True`` when
    it finds a match and ``False`` when it returns ``None``.
    """
    return shutil.which(lib_name) is not None
def play(audio: bytes) -> None:
    """Play an in-memory audio clip with ``ffplay`` and block until it exits.

    Args:
        audio: Encoded audio bytes, written to ffplay's standard input.

    Raises:
        ValueError: If ``is_installed("ffplay")`` reports that ffplay is missing.
    """
    if not is_installed("ffplay"):
        message = (
            "ffplay from ffmpeg not found, necessary to play audio. "
            "On mac you can install it with 'brew install ffmpeg'. "
            "On linux and windows you can install it from https://ffmpeg.org/"
        )
        raise ValueError(message)
    # "-" is the input argument: ffplay reads the audio from stdin.
    args = ["ffplay", "-autoexit", "-", "-nodisp"]
    # ffplay's own output is never used, so discard it instead of buffering it
    # in pipes; `run` feeds `audio` to stdin and waits for the process to end.
    subprocess.run(
        args,
        input=audio,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def save(audio: bytes, filename: str) -> None:
    """Write the raw ``audio`` bytes to the file at ``filename``.

    The file is opened in binary write mode, so existing content is replaced.
    """
    with open(filename, mode="wb") as sink:
        sink.write(audio)
def stream(audio_stream: Iterator[bytes]) -> bytes:
    """Pipe audio chunks into ``mpv`` as they arrive and return them joined.

    Args:
        audio_stream: Iterator of audio chunks; ``None`` items are skipped.

    Returns:
        All chunks that were written to mpv, concatenated in order.

    Raises:
        ValueError: If ``is_installed("mpv")`` reports that mpv is missing.
    """
    if not is_installed("mpv"):
        message = (
            "mpv not found, necessary to stream audio. "
            "On mac you can install it with 'brew install mpv'. "
            "On linux and windows you can install it from https://mpv.io/"
        )
        raise ValueError(message)
    # "fd://0" tells mpv to read its input from stdin, which we feed below.
    mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
    mpv_process = subprocess.Popen(
        mpv_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Collect chunks in a list and join once; repeated `bytes +=` would copy
    # the whole buffer on every chunk.
    chunks = []
    try:
        for chunk in audio_stream:
            if chunk is not None:
                mpv_process.stdin.write(chunk)  # type: ignore
                # Flush per chunk so playback can start before the stream ends.
                mpv_process.stdin.flush()  # type: ignore
                chunks.append(chunk)
    finally:
        # Always close stdin and reap mpv, even if the iterator raised,
        # so the player is not left waiting for more input.
        try:
            if mpv_process.stdin:
                mpv_process.stdin.close()
        finally:
            mpv_process.wait()
    return b"".join(chunks)
# Read the API token once at import time and fail fast with a clear hint
# when the environment variable is missing.
try:
    COQUI_API_TOKEN = os.environ["COQUI_API_TOKEN"]
except KeyError:
    # `from None` hides the internal KeyError so the user only sees the hint.
    raise RuntimeError(
        "Set COQUI_API_TOKEN environment variable to your API key"
    ) from None
def tts(
    text: str, voice_id: str, model: str = "xtts", language: str = "en"
) -> Iterator[bytes]:
    """Request speech synthesis from the Coqui API and yield the audio in chunks.

    This is a generator, so the HTTP request is only sent once iteration starts.

    Args:
        text: Text to synthesize, sent as ``text`` in the JSON body.
        voice_id: Voice identifier, sent as ``voice_id`` in the JSON body.
        model: ``"xtts"`` selects the XTTS streaming endpoint; any other value
            selects the ``samples?format=wav`` endpoint.
        language: Language code, sent as ``language`` in the JSON body.

    Yields:
        Non-empty chunks of the response body, read with ``chunk_size=2048``.

    Raises:
        ValueError: If the API responds with a 4xx or 5xx status code.
    """
    if model == "xtts":
        url = "https://app.coqui.ai/api/v2/samples/xtts/stream/"
    else:
        url = "https://app.coqui.ai/api/v2/samples?format=wav"
    json_data = {
        "text": text,
        "voice_id": voice_id,
        "language": language,
        "speed": 1,
    }
    # stream=True defers downloading the body, so chunks are yielded as they
    # arrive rather than after the whole response has been buffered. The
    # context manager releases the connection when the generator finishes,
    # raises, or is abandoned by the consumer.
    with requests.post(
        url,
        json=json_data,
        headers={"Authorization": f"Bearer {COQUI_API_TOKEN}"},
        stream=True,
    ) as res:
        if 400 == res.status_code:
            message = f"ERROR:{res.status_code}:{res.reason}: {res.text}:"
            raise ValueError(message)
        elif 401 <= res.status_code < 500:
            print(res.reason)
            message = f"There was a error from response , please make sure you set correct COQUI_API_TOKEN or make sure Text is not too long, status code: {res.status_code}: {res.text}:"
            raise ValueError(message)
        if res.status_code >= 500:
            message = (
                f"There seems to be error from server side status code: {res.status_code}"
            )
            raise ValueError(message)
        for chunk in res.iter_content(chunk_size=2048):
            if chunk:
                yield chunk
if __name__ == "__main__":
    # Command-line entry point: synthesize either a literal --text or the
    # sentences of --text_file_name, play the audio, and optionally save it.
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--text", help="Text to synthesize")
    group.add_argument("--text_file_name", help="Text filename to synthesize")
    parser.add_argument("--language", help="Language , default 'en'", default="en")
    parser.add_argument(
        "--voice",
        help="ID of the voice to use for synthesis",
        default="98d4af7d-aca0-4a70-a26e-4ca59023a248",
    )
    parser.add_argument(
        "--save_dest", help="Optional path to save audio file to, after playback."
    )
    parser.add_argument(
        "--model",
        help="Which model to use for synthesis (V1 or XTTS), XTTS default",
        choices=["v1", "xtts"],
        default="xtts",
    )
    args = parser.parse_args()

    if args.text_file_name is not None:
        print("Processing text file:", args.text_file_name)
        with open(args.text_file_name, "r", encoding="utf8") as f:
            text = f.read()
        # Drop empty lines and trailing whitespace.
        text = "\n".join([ll.rstrip() for ll in text.splitlines() if ll.strip()])
        # Flatten to a single line, then split into a list of sentences.
        text = nltk.sent_tokenize(text.replace("\n", " ").strip())
    else:
        text = args.text

    voice_id = args.voice
    save_dest = args.save_dest

    if isinstance(text, list):
        # File mode: speak each sentence in turn, keeping the audio of every
        # request so it can be saved afterwards. Joining once avoids
        # re-copying the growing buffer on each sentence.
        pieces = []
        for line in text:
            print(line)
            pieces.append(stream(tts(line, voice_id, args.model, language=args.language)))
        audio = b"".join(pieces)
    else:
        audio = stream(tts(text, voice_id, args.model, language=args.language))

    if save_dest:
        save(audio, save_dest)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment