|
import cmd
import datetime
import functools
import json
import os
import random
import re
import readline
import zipfile

import boto3
import yaml
from pydub import AudioSegment

from sv.project import SVProject
from sv.sampler import SVBanks, SVBank
from sv.utils import is_online
from sv.utils.banks.s3 import init_s3_banks
from sv.utils.export import export_wav
from sv.utils.naming import random_name
from sv.utils.slicing import slice_audio_segment_custom

from machines import SVMachinesPatch
from parse import parse_line
|
|
|
def load_yaml(attr):
    """Load and parse ``<attr>.yaml`` from the working directory.

    Uses a context manager so the file handle is closed promptly
    (the original left it to the garbage collector).
    """
    with open(f"{attr}.yaml") as f:
        return yaml.safe_load(f.read())
|
|
|
# Static configuration, loaded once at import time from sibling YAML files.
Env = load_yaml("env")            # tunable defaults read below: bpm, nticks, density, temperature, npatches
Machines = load_yaml("machines")  # machine declarations; entries with a "tag" act as sequencers
Modules = load_yaml("modules")    # module graph handed to SVProject.render_project
Terms = load_yaml("terms")        # tag mapping used when spawning the sample pool

AppName = "sv-euclid-beats"       # shown in the CLI banner

HistorySize = 100                 # max readline history entries persisted between sessions
|
|
|
""" |
|
- https://stackoverflow.com/questions/7331462/check-if-a-string-is-a-possible-abbrevation-for-a-name |
|
""" |
|
|
|
def is_abbrev(abbrev, text):
    """Return True if *abbrev* is a plausible abbreviation of *text*.

    Case-insensitive. Each word of *text* must be "entered": the next
    abbreviation character either jumps to the following word or consumes
    another character within the current word.
    """
    abbrev = abbrev.lower()
    text = text.lower()
    # Guard clauses: empty abbreviation matches anything; a non-empty
    # abbreviation cannot match empty text; first characters must agree.
    if not abbrev:
        return True
    if not text:
        return False
    if abbrev[0] != text[0]:
        return False
    words = text.split()
    # Option 1: treat the rest of the first word as fully abbreviated and
    # continue matching against the remaining words.
    if is_abbrev(abbrev[1:], " ".join(words[1:])):
        return True
    # Option 2: consume one more character somewhere inside the first word.
    return any(is_abbrev(abbrev[1:], text[pos + 1:])
               for pos in range(len(words[0])))
|
|
|
def timestamp():
    """Return the current UTC time as ``YYYY-MM-DD-HH-MM-SS``.

    Uses the timezone-aware ``now(timezone.utc)`` instead of the
    deprecated ``utcnow()``; the formatted string is identical.
    """
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")
|
|
|
def render_patches(prefix):
    """Decorator factory: persist the patches produced by the wrapped command.

    The wrapped method's return value becomes ``self.patches``; a fresh
    project/file name is generated (``<timestamp>-<prefix>-<project_name>``)
    and both the JSON and .sunvox renderings are written out.

    The wrapper deliberately returns None: ``cmd.Cmd`` stops its loop when a
    command returns a truthy value.
    """
    def decorator(fn):
        # functools.wraps keeps the do_* name/docstring visible to cmd's
        # introspection (help, completion); a bare wrapper would hide them
        @functools.wraps(fn)
        def wrapped(self, *args, **kwargs):
            self.project_name = random_name()
            self.file_name = "%s-%s-%s" % (timestamp(),
                                           prefix,
                                           self.project_name)
            print ("INFO: %s" % self.file_name)
            self.patches = fn(self, *args, **kwargs)
            self.dump_json()
            self.dump_sunvox()
        return wrapped
    return decorator
|
|
|
def assert_project(fn):
    """Decorator: guard commands that require a generated/loaded project.

    Raises RuntimeError when ``self.patches`` is falsy (None or empty).
    ``functools.wraps`` preserves the wrapped command's name/docstring for
    cmd's help system.
    """
    @functools.wraps(fn)
    def wrapped(self, *args, **kwargs):
        if not self.patches:
            raise RuntimeError("no patches found")
        return fn(self, *args, **kwargs)
    return wrapped
|
|
|
class SVEnvironment(dict):
    """Dict of tunable parameters whose keys can be addressed by abbreviation."""

    def __init__(self, item = None):
        # None sentinel instead of a mutable default argument;
        # dict.__init__ copies the mapping either way
        dict.__init__(self, item if item is not None else {})

    def lookup(self, abbrev):
        """Resolve *abbrev* to exactly one key.

        Raises RuntimeError when there is no match or more than one match
        (never returns a falsy value).
        """
        matches = [key for key in self
                   if is_abbrev(abbrev, key)]
        if not matches:
            raise RuntimeError("%s not found" % abbrev)
        elif len(matches) > 1:
            raise RuntimeError("multiple key matches for %s" % abbrev)
        return matches.pop()
|
|
|
class SVBaseCli(cmd.Cmd):
    """Base interactive shell.

    Owns the output directory layout (tmp/{json,sunvox,stems}), the tunable
    environment, JSON / .sunvox project dumps, and persistent readline
    history. Subclasses supply banks/pool/mapping plus the patch commands.
    """

    prompt = ">>> "

    def __init__(self,
                 s3,
                 bucket_name,
                 env,
                 modules,
                 history_size = HistorySize):
        cmd.Cmd.__init__(self)
        # s3 client / bucket are stored for use by subclasses and commands
        self.s3 = s3
        self.bucket_name = bucket_name
        self.out_dir = "tmp"
        self.init_sub_dirs()
        self.modules = modules
        # wrap env so parameters can be addressed by abbreviation (lookup)
        self.env = SVEnvironment(env)
        # populated later by render_patches-decorated commands or do_load_project
        self.patches = None
        self.project_name = None
        self.file_name = None
        self.history_file = os.path.expanduser("%s/.clihistory" % self.out_dir)
        self.history_size = history_size

    def init_sub_dirs(self, sub_dirs = ["json", "sunvox", "stems"]):
        # Create any missing output sub-directories. The mutable default
        # argument is never mutated here, so it is harmless.
        for sub_dir in sub_dirs:
            path = "%s/%s" % (self.out_dir, sub_dir)
            if not os.path.exists(path):
                os.makedirs(path)

    def preloop(self):
        # cmd.Cmd hook: restore readline history before the loop starts
        if os.path.exists(self.history_file):
            readline.read_history_file(self.history_file)

    def dump_json(self):
        """Write the current patches to tmp/json/<file_name>.json."""
        file_name = "%s/json/%s.json" % (self.out_dir,
                                         self.file_name)
        struct = {"patches": self.patches}
        with open(file_name, 'w') as f:
            f.write(json.dumps(struct,
                               indent = 2))

    def dump_sunvox(self):
        """Render the patches with the current env settings and write the
        resulting project to tmp/sunvox/<file_name>.sunvox.

        NOTE(review): self.banks is defined by the SVBankCli subclass, so
        this method only works on subclass instances.
        """
        file_name = "%s/sunvox/%s.sunvox" % (self.out_dir,
                                             self.file_name)
        with open(file_name, 'wb') as f:
            rendered_patches = [patch.render(n_ticks = self.env["nticks"],
                                             density = self.env["density"],
                                             temperature = self.env["temperature"])
                                for patch in self.patches]
            project = SVProject().render_project(patches = rendered_patches,
                                                 modules = self.modules,
                                                 banks = self.banks,
                                                 bpm = self.env["bpm"])
            project.write_to(f)

    @parse_line()
    def do_show_params(self):
        """Print every env parameter in key order."""
        for key in sorted(self.env.keys()):
            print ("%s: %s" % (key, self.env[key]))

    @parse_line(config = [{"name": "frag",
                           "type": "str"},
                          {"name": "value",
                           "type": "number"}])
    def do_set_param(self, frag, value):
        """Set an env parameter addressed by abbreviation."""
        key = self.env.lookup(frag)
        # NOTE(review): SVEnvironment.lookup raises RuntimeError instead of
        # returning a falsy value, so the WARNING branch looks unreachable
        if key:
            self.env[key] = value
            print ("INFO: %s=%s" % (key, self.env[key]))
        else:
            print ("WARNING: %s not found" % frag)

    @parse_line()
    def do_list_projects(self):
        """List saved projects (json file names without their extension)."""
        for file_name in sorted(os.listdir(self.out_dir + "/json")):
            print (file_name.split(".")[0])

    @parse_line(config = [{"name": "stem",
                           "type": "str"}])
    def do_load_project(self, stem):
        """Load the single saved project whose json file name contains *stem*.

        Zero or multiple matches only print a warning and leave state alone.
        """
        matches = [file_name
                   for file_name in sorted(os.listdir(self.out_dir + "/json"))
                   if stem in file_name]
        if matches == []:
            print ("WARNING: no matches")
        elif len(matches) == 1:
            self.file_name = matches.pop().split(".")[0]
            # file_name is <timestamp>-<prefix>-<project_name> (render_patches);
            # NOTE(review): taking the last two dash-separated fragments assumes
            # random_name() yields two dash-joined words — confirm
            self.project_name = "-".join(self.file_name.split("-")[-2:])
            print ("INFO: %s" % self.project_name)
            abspath = "%s/json/%s.json" % (self.out_dir, self.file_name)
            struct = json.loads(open(abspath).read())
            self.patches = [SVMachinesPatch(**patch)
                            for patch in struct["patches"]]
        else:
            print ("WARNING: multiple matches")

    @parse_line()
    def do_clean_projects(self, sub_dirs = ["json", "sunvox", "stems"]):
        """Delete and recreate the output sub-directories."""
        for sub_dir in sub_dirs:
            # shells out to rm -rf with internally-constructed paths only
            os.system("rm -rf %s/%s" % (self.out_dir, sub_dir))
        self.init_sub_dirs()

    def do_exit(self, _):
        """Alias for quit."""
        return self.do_quit(None)

    def do_quit(self, _):
        """Exit the loop (cmd stops when a command returns a truthy value)."""
        print ("INFO: exiting")
        return True

    def postloop(self):
        # cmd.Cmd hook: persist (truncated) readline history on exit
        readline.set_history_length(self.history_size)
        readline.write_history_file(self.history_file)
|
|
|
class SVBankCli(SVBaseCli):
    """Shell layer adding sample banks, a sample pool and the tag mapping."""

    def __init__(self,
                 banks,
                 pool,
                 mapping,
                 *args,
                 **kwargs):
        SVBaseCli.__init__(self, *args, **kwargs)
        self.banks = banks
        self.pool = pool
        self.mapping = mapping

    @parse_line()
    def do_show_tags(self):
        """Dump the pool's tag inventory as YAML."""
        dumped = yaml.safe_dump(self.pool.tags,
                                default_flow_style = False)
        print (dumped)

    @parse_line()
    def do_show_mapping(self):
        """Print each mapping entry in key order."""
        for key, value in sorted(self.mapping.items()):
            print ("%s: %s" % (key, value))

    @parse_line()
    def do_randomise_mapping(self):
        """Assign a random pool tag to every mapping key, echoing each pick."""
        tag_names = list(self.pool.tags.keys())
        for key in sorted(self.mapping):
            choice = random.choice(tag_names)
            self.mapping[key] = choice
            print ("%s: %s" % (key, choice))

    @parse_line(config = [{"name": "key",
                           "type": "str"},
                          {"name": "value",
                           "type": "str"}])
    def do_set_mapping(self, key, value):
        """Point an existing mapping key at an existing pool tag."""
        if key not in self.mapping:
            raise RuntimeError("%s not found in mapping" % key)
        valid_tags = list(self.pool.tags.keys())
        if value not in valid_tags:
            raise RuntimeError("%s not found in tags" % value)
        self.mapping[key] = value
|
|
|
class SVCli(SVBankCli):
    """Top-level CLI: random patch generation, mutation and stem export."""

    intro = f"Welcome to the {AppName} CLI ;)"

    def __init__(self,
                 machines,
                 *args,
                 **kwargs):
        self.machines = machines
        # initial mapping comes from each tagged machine's declared default
        mapping = {machine["tag"]: machine["default"]
                   for machine in machines
                   if "tag" in machine}
        SVBankCli.__init__(self, mapping = mapping, *args, **kwargs)

    @parse_line()
    @render_patches(prefix = "random")
    def do_randomise_patches(self):
        """Generate env["npatches"] fresh random patches (persisted by the
        render_patches decorator)."""
        patches = []
        for i in range(self.env["npatches"]):
            patch = SVMachinesPatch.randomise(machines = self.machines,
                                              pool = self.pool,
                                              mapping = self.mapping)
            patches.append(patch)
        return patches

    """
    mutating samples within patch creates too much noise
    """

    @parse_line(config =[{"name": "i",
                          "type": "int"}])
    @assert_project
    @render_patches(prefix = "mutation")
    def do_mutate_patch(self, i, attrs = "level|volume|trig|pattern".split("|")):
        """Clone patch i, re-seeding the listed machine attributes per clone.

        The original patch stays first; the remaining npatches-1 entries are
        clones with fresh random seeds for any of *attrs* present in each
        machine's seeds. (The attrs default list is never mutated.)
        """
        root = self.patches[i % len(self.patches)]
        patches = [root]
        # NOTE(review): this loop variable shadows the parameter i, which is
        # only safe because the parameter is not used after this point
        for i in range(self.env["npatches"]-1):
            patch = root.clone()
            for machine in patch["machines"]:
                for attr in attrs:
                    if attr in machine["seeds"]:
                        machine["seeds"][attr] = int(1e8*random.random())
            patches.append(patch)
        return patches

    """
    Expectation is that Transfer Tool ignores the name of the zipfile and just uses the paths therein contained, when creating the internal Digitakt folder structure
    """

    """
    This is all far far too complex but is really due to the hard coupling of three samplers and a sequencer together
    Should be much easier when each instrument is independent
    In particular having to mute stuff rather than just not render stuff feels like a terrible hard coded code smell
    """

    @parse_line()
    @assert_project
    def do_export_stems(self, fade = 3):
        """Render every patch once per sequencer (all others muted) and write
        the sliced stems as a zip under tmp/stems/.

        NOTE(review): the fade parameter is unused in this body — confirm
        whether it was meant to feed the slicer.
        """
        class Sequencers(dict):
            # tag -> module name, for the machines that declare a tag
            def __init__(self, sequencers):
                dict.__init__(self, {sequencer["tag"]:sequencer["name"]
                                     for sequencer in sequencers})
            def mutes(self, solo_key):
                # every sequencer except the soloed one gets muted
                return [self[key] for key in self
                        if key != solo_key]
        class Items(list):
            # ordered (patch_key, rendered_patch) pairs: one render of each
            # patch per soloed sequencer
            def __init__(self, sequencers, patches, env):
                list.__init__(self, [])
                for solo_key in sequencers:
                    mutes = sequencers.mutes(solo_key)
                    for i, patch in enumerate(patches):
                        index = "{:02}".format(i+1)
                        # patch_key = f"{solo_key}/{index}"
                        patch_key = f"{solo_key}-{index}"
                        rendered = patch.render(n_ticks = env["nticks"],
                                                density = env["density"],
                                                temperature = env["temperature"],
                                                mutes = mutes)
                        self.append((patch_key, rendered))
            @property
            def keys(self):
                return [item[0] for item in self]
            @property
            def values(self):
                return [item[1] for item in self]
        def init_zip_items(audio_io, items):
            # Each item owns an equal block of the rendered audio, split into
            # thirds; only the first two thirds are exported (as "cln" and
            # "drt" chunks), the final third is skipped.
            block_sz = int(len(audio_io) / len(items))
            chunk_sz = int(block_sz / 3) # wash, patch, break
            zip_items = []
            for i, patch_key in enumerate(items.keys):
                for chunk_type, chunk_offset in [("cln", 0),
                                                 ("drt", chunk_sz)]:
                    start_time = i * block_sz + chunk_offset
                    end_time = start_time + chunk_sz
                    # zip_path = f"{AppName}-{self.env['bpm']}/{self.project_name}/{chunk_type}/{patch_key}.wav"
                    zip_path = f"{chunk_type}-{patch_key}-{self.project_name}.wav"
                    zip_item = {"zip_path": zip_path,
                                "start_time": start_time,
                                "end_time": end_time}
                    zip_items.append(zip_item)
            return zip_items
        print ("INFO: generating patches")
        sequencers = Sequencers([machine for machine in self.machines
                                 if "tag" in machine]) # because only sequencers have tags
        items = Items(sequencers = sequencers,
                      patches = self.patches,
                      env = self.env)
        print ("INFO: rendering project")
        project = SVProject().render_project(patches = items.values,
                                             modules = self.modules,
                                             banks = self.banks,
                                             bpm = self.env["bpm"],
                                             wash = True,
                                             breaks = True)
        print ("INFO: generating wav and slicing stems")
        wav_io = export_wav(project=project)
        audio_io = AudioSegment.from_file(wav_io, format = "wav")
        zip_items = init_zip_items(audio_io = audio_io,
                                   items = items)
        zip_buffer = slice_audio_segment_custom(audio_io = audio_io,
                                                zip_items = zip_items)
        file_name = f"{self.out_dir}/stems/{self.file_name}.zip"
        with open(file_name, 'wb') as f:
            f.write(zip_buffer.getvalue())
|
|
|
def load_banks(cache_dir = "tmp/banks"):
    """Load every cached bank zipfile found directly under *cache_dir*."""
    return [SVBank.load_zipfile(f"{cache_dir}/{file_name}")
            for file_name in os.listdir(cache_dir)]
|
|
|
def save_banks(banks, cache_dir = "tmp/banks"):
    """Dump each bank as a zipfile into *cache_dir*.

    Uses makedirs(exist_ok=True) so nested cache paths are created in one
    go and repeated calls don't race on a separate existence check
    (os.mkdir would fail if the parent directory were missing).
    """
    os.makedirs(cache_dir, exist_ok = True)
    for bank in banks:
        bank.dump_zipfile(cache_dir)
|
|
|
if __name__ == "__main__": |
|
try: |
|
bucket_name = os.environ["ASSETS_BUCKET"] |
|
if bucket_name in ["", None]: |
|
raise RuntimeError("ASSETS_BUCKET does not exist") |
|
s3 = boto3.client("s3") |
|
if os.path.exists("tmp/banks"): |
|
print ("INFO: loading banks from local cache") |
|
banks = load_banks() |
|
elif is_online(): |
|
print ("INFO: loading banks from S3") |
|
banks = init_s3_banks(s3, bucket_name) |
|
print ("INFO: caching banks locally") |
|
save_banks(banks) |
|
else: |
|
raise RuntimeError("no cached banks and not online, sorry") |
|
pool, _ = SVBanks(banks).spawn_pool(tag_mapping = Terms) |
|
SVCli(s3 = s3, |
|
machines = Machines, |
|
bucket_name = bucket_name, |
|
env = Env, |
|
modules = Modules, |
|
banks = banks, |
|
pool = pool).cmdloop() |
|
except RuntimeError as error: |
|
print ("ERROR: %s" % str(error)) |