Created
December 11, 2022 17:02
-
-
Save Pendrokar/e9eed003eae551f38223de54e9f9011f to your computer and use it in GitHub Desktop.
Reads the latest log file of Infinity: Battlescape and uses xVASynth as TTS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import hashlib
import json
import logging
import os
import queue
import random
import sys
import time
import urllib
import winsound
from os.path import exists

import requests
# Find the latest logfile: the most recently created regular file in the
# working directory. Directories are left out because they can never be
# opened as a log.
paths = [x for x in os.listdir(".") if os.path.isfile(x)]
if not paths:
    raise SystemExit('No logfile found in the current directory')
# Ties on creation time are broken by name, highest name first
logfilepath = max(paths, key=lambda x: (os.path.getctime(x), x))

# xVASynth output directory, spelled with forward slashes and with backslashes
outputPath = 'C:/Program Files (x86)/Steam/steamapps/common/xVASynth/resources/app/output/masseffect/'
outputPathTwo = 'C:\\Program Files (x86)\\Steam\\steamapps\\common\\xVASynth\\resources\\app\\output\\masseffect\\'

# clear var
paths = []

print('Watching logfile: '+ logfilepath)

# TODO: async playback and audio generation
# Paths of audio files waiting to be played
playbackQueue = queue.Queue()

# Endpoint of the local xVASynth server that synthesizes a voice line
url = 'http://localhost:8008/synthesize'
def set_tali_synth_voice():
    """Ask the local xVASynth server to load the Mass Effect Tali voice model.

    Prints a message and exits the process with status 1 when the request to
    the server fails, matching the handling in ``set_edi_synth_voice``.
    """
    print('Loading Mass Effect Tali model')
    post_url = "http://localhost:8008/loadModel"
    try:
        requests.post(post_url, '{"outputs":"None","model":"C:/Program Files (x86)/Steam/steamapps/common/xVASynth/resources/app/models/masseffect/me_tali","modelType":"FastPitch1.1","version":"1.4","model_speakers":2,"base_lang":"en","pluginsContext":"{}"}')
    except requests.exceptions.RequestException:
        print('Failed to receive a response')
        sys.exit(1)
def set_edi_synth_voice():
    """Ask the local xVASynth server to load the Mass Effect EDI voice model.

    Prints a message and exits the process with a non-zero status when the
    request to the server fails.
    """
    print('Loading Mass Effect EDI model')
    post_url = "http://localhost:8008/loadModel"
    try:
        requests.post(post_url, '{"outputs":"None","model":"C:/Program Files (x86)/Steam/steamapps/common/xVASynth/resources/app/models/masseffect/me_edi","modelType":"FastPitch1.1","version":"2.0","base_lang":"en","pluginsContext":"{}"}')
    except requests.exceptions.RequestException:
        # Only request failures are handled here; Ctrl-C and programming
        # errors are no longer swallowed by a bare except.
        print('Failed to receive a response')
        # A failure must not report success to the calling shell
        sys.exit(1)
# Alternative voice model, currently disabled:
# set_tali_synth_voice()
# Load the EDI voice model at startup, before the watcher is created
set_edi_synth_voice()
print('Model loaded')
class LogDictation(object):
    """Watch a log file and voice its new notification lines through xVASynth.

    Each call to :meth:`ook` compares the file's modification time with the
    one seen last. When it differs, the lines that follow the previously
    remembered last line are parsed into voice lines. Voice lines that already
    have an audio file are queued on the module-level ``playbackQueue``; the
    others are first synthesized by the local xVASynth server.
    """

    # Voice lines longer than this many characters are not voiced
    MAX_VOICE_LINE_LENGTH = 200

    # Substrings marking a raw log line as too repetitive to be voiced
    _REPETITIVE_MARKERS = (
        'credits from team effort pool',
        'CREDITS:',
        'FRIENDLY FIRE',
        'XP',
        'OBJECTIVE SCORE:',
    )

    # Substrings marking a text notification as too repetitive to be voiced
    _REPETITIVE_NOTIFICATIONS = (
        'Battle will start in',
        'Victory odds',
    )

    # (abbreviation, spoken form) pairs, applied in this order. 'FADM' must
    # come before 'ADM': replacing 'ADM' first would mangle 'FADM'.
    _RANKS = (
        ('RCT', 'Recruit'),
        ('PVT', 'Private'),
        ('PV1', 'Private First Class'),
        # more important ranks, extra pause
        ('GRD', 'Guardian,'),
        ('PO3', 'Petty Officer 3rd Class,'),
        ('PO2', 'Petty Officer 2nd Class,'),
        ('PO1', 'Petty Officer 1st Class,'),
        ('CPO', 'Chief Petty Officer,'),
        ('SPO', 'Senior Chief Petty Officer,'),
        ('MP0', 'Master Chief Petty Officer,'),
        ('ENS', 'Ensign,'),
        ('LTJ', 'Lieutenant Junior,'),
        ('LTN', 'Lieutenant,'),
        ('LTC', 'Lieutenant Commander,'),
        # most important ranks, exclamation
        ('CMD', 'Commander!'),
        ('CPT', 'Captain!'),
        ('CDR', 'Commodore!'),
        ('FADM', 'Fleet Admiral!'),
        ('RADM', 'Rear Admiral!'),
        ('VADM', 'Vice Admiral!'),
        ('ADM', 'Admiral!'),
    )

    # last line of the log file as of the previous parse
    lastLine = ''

    # lastVoiceLine - last voice line that was sent to the TTS server
    lastVoiceLine = ''

    # text of the last line that went through parseLine without being skipped
    lastMissingLine = ''

    def __init__(self, filename=None):
        """Create a watcher for *filename*.

        When *filename* is omitted, the module-level ``logfilepath`` is
        watched.
        """
        self._cached_stamp = 0
        self.filename = logfilepath if filename is None else filename

    async def ook(self):
        """Voice the lines added to the log file since the previous call.

        Does nothing when the file's modification time is unchanged. On the
        very first change nothing is voiced either: there is no remembered
        last line yet, so the call only records where the log currently ends.
        """
        stamp = os.stat(self.filename).st_mtime
        if stamp == self._cached_stamp:
            return
        self._cached_stamp = stamp
        # File has changed, so do something...
        print('File has changed')

        # Every line is parsed exactly once. parseLine keeps state and picks
        # random wording, so a second parse could yield a different (or
        # empty) voice line for the same log line.
        voiceLines = self._collectVoiceLines(self._readMissedLines())

        # add already generated audio lines to playback queue
        for voiceLine in voiceLines:
            self.addGenerated(voiceLine)
        # generate the remaining audio lines, then add them to playback queue
        for voiceLine in voiceLines:
            self.tts(voiceLine)

    def _readMissedLines(self):
        """Return the log lines that follow the remembered last line.

        Also remembers the file's current last line for the next call.
        """
        hasPassedLastLine = False
        missedLines = []
        veryLastLine = ''
        with open(self.filename, mode="r", encoding="utf-8") as logfile:
            for line in logfile:
                veryLastLine = line
                if self.lastLine != '' and line == self.lastLine:
                    # now start the missing lines, so skip this one
                    hasPassedLastLine = True
                    continue
                if hasPassedLastLine:
                    missedLines.append(line)
        self.lastLine = veryLastLine
        return missedLines

    def _collectVoiceLines(self, lines):
        """Parse *lines* and return the voice lines that should be spoken."""
        voiceLines = []
        for line in lines:
            voiceLine = self.parseLine(line)
            if voiceLine == '':
                continue
            print(line)
            # skip if too long; TODO: separate conjoined lines into multiple files
            if len(voiceLine) > self.MAX_VOICE_LINE_LENGTH:
                print('Voice line too long. Skipping...')
                # only this line is skipped; the lines after it are still voiced
                continue
            voiceLines.append(voiceLine)
        return voiceLines

    def parseLine(self, line):
        """Turn one raw log line into the text to speak.

        Returns ``''`` when the line should stay silent: it is repetitive, an
        empty notification, a repeat of the previously parsed line, or not a
        notification at all. Markers are matched anywhere in the line,
        including at its very start.
        """
        if any(marker in line for marker in self._REPETITIVE_MARKERS):
            print('Repetitive line')
            return ''

        isTextNotification = False

        if 'Localized' in line:
            # find text within line: keep what is between "('" and "')"
            line = line[line.find('(\'') + 2:line.find('\')')]
            isTextNotification = True

        if 'Text notification' in line:
            # check for empty text: Battlescape bug
            # TODO: make it not be the reason for blocking speech
            if ": ''" in line:
                print('Empty notification')
                return ''
            # TODO: make unrepetitive
            if any(marker in line for marker in self._REPETITIVE_NOTIFICATIONS):
                print('Repetitive line')
                return ''
            isTextNotification = True
            # keep what follows the first quote, minus the final character
            line = line[line.find('\'') + 1:-1]

        if self.lastMissingLine == line:
            # skip same voice line
            print('Skip same voiceline')
            return ''
        # stored on the instance so that the comparison above can ever match
        self.lastMissingLine = line

        # both replacements are chained on the same string
        voiceLine = line.replace('Hauler HAULER', 'Hauler')
        voiceLine = voiceLine.replace(': ', ', ')

        # TODO: apply angry emotion (speedup)
        if 'Griefing !' in voiceLine:
            return 'Griefer!'

        if 'Bounty on' in voiceLine:
            return ''

        if 'YOU KILLED' in voiceLine:
            # the target's name is dropped on purpose; only the kill is voiced
            return 'Target' + self.getDeathLine()

        if 'KILL ASSIST' in voiceLine:
            return ''

        # Subsystems:
        # self
        if 'Critical hit!' in voiceLine:
            voiceLine = voiceLine.replace('Critical hit!', '')
            return voiceLine.replace('destroyed', 'damaged')

        # target
        if 'Critical hit on target' in voiceLine:
            voiceLine = voiceLine.replace('Critical hit on target', '')
            # drop the first remaining character
            return 'Target\'s ' + voiceLine[1:]

        if isTextNotification:
            return voiceLine
        return ''

    @staticmethod
    def _audioFileName(voiceLine):
        """Return the ``<md5>.wav`` file name used for *voiceLine*."""
        digest = hashlib.md5('EDI:'.encode('utf-8') + voiceLine.encode('utf-8')).hexdigest()
        return digest + '.wav'

    def addGenerated(self, voiceLine):
        """Queue the audio file of *voiceLine* for playback if it exists."""
        mdFilePath = outputPathTwo + self._audioFileName(voiceLine)
        # if audio file exists, add it to the playback queue
        if exists(mdFilePath):
            playbackQueue.put(mdFilePath)
            print('Queued existing: '+ voiceLine)

    def tts(self, voiceLine):
        """Synthesize *voiceLine* through xVASynth and queue it for playback.

        Does nothing when the audio file already exists. A failed request is
        reported on stdout and nothing is queued.
        """
        mdFilePath = outputPath + self._audioFileName(voiceLine)
        # already generated: nothing to synthesize, nothing queued here
        if exists(mdFilePath):
            return
        print('Converting voiceline... (Web Request): '+ voiceLine)
        # json.dumps escapes quotes, backslashes and control characters in
        # the voice line, which plain string concatenation does not.
        payload = json.dumps({
            "sequence": voiceLine,
            "pitch": [],
            "duration": [],
            "energy": [],
            "pace": 1,
            "modelType": "FastPitch1.1",
            "outfile": mdFilePath,
            "pluginsContext": "[]",
            "vocoder": "",
        })
        try:
            requests.post(url, payload)
        except requests.exceptions.RequestException:
            print('Failed to receive a response from server ')
            return
        self.lastVoiceLine = voiceLine
        playbackQueue.put(mdFilePath)
        print('Queued: '+ voiceLine)

    # remove rank abbreviations
    def removeRankHuman(self, line):
        """Return *line* with every known rank abbreviation removed."""
        for abbreviation, _spoken in self._RANKS:
            line = line.replace(abbreviation, '')
        return line

    # Make rank abbreviations human readable
    def convertRankHuman(self, line):
        """Return *line* with rank abbreviations replaced by spoken forms."""
        for abbreviation, spoken in self._RANKS:
            line = line.replace(abbreviation, spoken)
        return line

    # get a random deathline
    def getDeathLine(self):
        """Return a randomly chosen phrase saying that a target is gone."""
        return random.choice([
            ' is destroyed',
            ' is annihilated',
            ' is eradicated',
            ' is vaporized',
            ' is obliterated',
            ' is gone',
            ' is toast',
            ' is disintegrated',
            ' is now dust',
        ])
# Watcher instance polled by main(); it follows the logfile selected at startup
logdictation = LogDictation()
# Define a function that plays the audio files in the queue
async def play_audio():
    """Play every audio file currently waiting in ``playbackQueue``.

    Files are played one after another with ``winsound.PlaySound``, which
    blocks until the sound has finished. A file that cannot be played is
    reported and skipped so the remaining queue is still played.
    """
    # Iterate over the file paths in the queue
    while not playbackQueue.empty():
        filePath = playbackQueue.get()
        print('Playing file from queue.')
        # Play the audio file
        try:
            winsound.PlaySound(filePath, winsound.SND_FILENAME)
        except RuntimeError:
            print('Could not play')
        # The sleep has to be awaited to take effect; it pauses between
        # files and lets other tasks run in the meantime.
        # NOTE(review): assumed to belong inside the loop - confirm.
        await asyncio.sleep(1)
async def main():
    """Poll the log file and play queued audio, once per second, forever."""
    while True:
        # Awaited so the loop really pauses; an un-awaited sleep() returns
        # immediately and turns this into a busy loop.
        await asyncio.sleep(1)
        # Run playback and log parsing concurrently. Playback is listed
        # first so that it is scheduled first.
        await asyncio.gather(
            play_audio(),
            logdictation.ook(),
        )
# Run the main function; asyncio.run() creates the event loop and closes it
# again when main() ends or the script is interrupted
asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Demo - https://www.youtube.com/watch?v=qffHyhdwPfU