Last active
April 18, 2018 23:11
-
-
Save cjw85/8c3d2421926ca703ae43441b8cd97207 to your computer and use it in GitHub Desktop.
Simulate basecalls via simulated squiggles. This is not a serious attempt to correctly model the error profiles of basecalls.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requirements: pip install numpy pysam scrappie matplotlib
#
# usage: simulate_calls.py [-h] [--mu MU] [--sigma SIGMA] [--noise NOISE]
#                          [--threads THREADS]
#                          fasta ncalls
#
# Simulate basecalls with scrappy.
#
# positional arguments:
#   fasta              Source sequence file.
#   ncalls             Number of basecalls to produce.
#
# optional arguments:
#   -h, --help         show this help message and exit
#   --mu MU            mean fragment length.
#   --sigma SIGMA      stdv fragment length.
#   --noise NOISE      Additional Gaussian noise on signal.
#   --threads THREADS  number of worker threads.
import argparse | |
from concurrent.futures import ThreadPoolExecutor | |
import functools | |
import itertools | |
import logging | |
import numpy as np | |
from pysam import FastaFile | |
import scrappy | |
from matplotlib import pyplot as plt | |
# Complement lookup for nucleotide symbols, in upper and lower case. The two
# strings are aligned position by position: each symbol in the first maps to
# the symbol at the same index in the second. Symbols that are not keys here
# (for example the gap character '-') are passed through unchanged by
# str.translate.
comp = dict(zip('ATCGXNatcgxn', 'TAGCXNtagcxn'))

# Translation table for use with str.translate.
comp_trans = str.maketrans(''.join(comp), ''.join(comp.values()))
def reverse_complement(seq):
    """Return the reverse complement of a sequence.

    :param seq: input sequence string.

    :returns: reverse-complemented string. Symbols without an entry in the
        complement table are kept as they are.
    """
    # Reversing and complementing are independent per-character operations,
    # so the order in which they are applied does not matter.
    reversed_seq = seq[::-1]
    return reversed_seq.translate(comp_trans)
def shotgun_library(fasta_file, mu, sigma, direction=(1,-1)):
    """Generate random fragment sequences from the records of a fasta file.

    A source sequence is chosen for each fragment with probability
    proportional to its length. Fragment lengths are drawn from a lognormal
    distribution parameterised from the requested mean and standard
    deviation.

    :param fasta_file: path of the source fasta file, opened with
        `pysam.FastaFile`.
    :param mu: mean fragment length, must be positive.
    :param sigma: stdv of fragment length.
    :param direction: tuple representing the allowed directions of output
        sequences with respect to the input sequence: `1` yields the
        sequence as stored, `-1` yields its reverse complement.

    :yields: tuples `(fragment, reference name, start, end, strand)`, where
        `fragment` is `sequence[start:end]` of the named reference
        (reverse-complemented when the strand is '-'), and strand is '+' or
        '-'. `start` and `end` always lie within the reference.

    :raises ValueError: if `mu` is not positive or the file holds no sequence.

    .. note:: Could be made more efficient using buffers for random samples
        and handling cases separately.
    """
    if mu <= 0:
        raise ValueError('mu must be positive, got {}.'.format(mu))

    # FastaFile.fetch is proper slow, just read everything. The handle is
    # only needed while loading, so it is closed before sampling starts.
    with FastaFile(fasta_file) as fasta:
        refs = fasta.references
        seq_lens = [fasta.get_reference_length(x) for x in refs]
        sequences = {k: fasta.fetch(k) for k in refs}

    total_len = sum(seq_lens)
    if total_len == 0:
        raise ValueError('No sequence found in {}.'.format(fasta_file))
    seq_probs = [x / total_len for x in seq_lens]

    def random_buffer(probs, size=10000):
        """Yield indices into `probs`, sampled in batches of `size`."""
        while True:
            buf = []
            # The multinomial draw fixes how often each index occurs in the
            # batch; the shuffle then randomises the order of occurrence.
            for x, n in enumerate(np.random.multinomial(size, probs)):
                buf.extend([x] * n)
            np.random.shuffle(buf)
            yield from buf

    seq_chooser = random_buffer(seq_probs)

    # parameters for lognormal
    mean = np.log(mu / np.sqrt(1 + sigma**2 / mu**2))
    stdv = np.sqrt(np.log(1 + sigma**2 / mu**2))

    while True:
        # choose a seq based on length
        seq_i = next(seq_chooser)
        seq = sequences[refs[seq_i]]
        seq_len = seq_lens[seq_i]
        start = np.random.randint(0, seq_len)
        frag_length = int(np.random.lognormal(mean, stdv))
        move = np.random.choice(direction)
        # Clamp to both ends of the reference so that the reported
        # coordinates describe exactly the fragment that is returned.
        end = min(seq_len, max(0, start + move * frag_length))
        start, end = sorted([start, end])
        if end - start < 2:
            # Expand a bit to ensure we grab at least one base.
            start = max(0, start - 1)
            end = min(seq_len, end + 1)
        frag_seq = seq[start:end]
        if move == -1:
            frag_seq = reverse_complement(frag_seq)
        yield frag_seq, refs[seq_i], start, end, '+' if move == 1 else '-'
def worker(args, noise=None):
    """Simulate a squiggle for one fragment and basecall it with scrappy.

    :param args: tuple `(sequence, reference name, start, end, strand)`.
    :param noise: scale of additional Gaussian noise added to the simulated
        signal; when `None` no extra noise is added.

    :returns: string holding a fasta-style header line followed by the
        basecall.
    """
    model = 'rgrgr_r94'
    fragment, ref_name, frag_start, frag_end, frag_strand = args

    squiggle = scrappy.sequence_to_squiggle(fragment, rescale=True)
    levels = squiggle.data(as_numpy=True, sloika=False)

    # A Laplace distribution with scale b has standard deviation b*sqrt(2),
    # so scaling by 1/sqrt(2) gives samples whose spread equals `stdv`.
    laplace_factor = 1 / np.sqrt(2)
    segments = []
    for mean, stdv, dwell in levels:
        segments.append(
            np.random.laplace(mean, laplace_factor * stdv, int(dwell)))
    signal = np.concatenate(segments)

    if noise is not None:
        signal += np.random.normal(scale=noise, size=len(signal))

    raw = scrappy.RawTable(signal)
    raw.scale()
    posterior = scrappy.calc_post(raw, model, log=True)
    call, score, _ = scrappy.decode_post(posterior, model)

    template = '>call_{}:{}-{}({}) seq_len={} call_len={} score={}\n{}'
    return template.format(
        ref_name, frag_start, frag_end, frag_strand,
        len(fragment), len(call), score, call)
def main():
    """Parse the command line and print simulated basecalls to stdout."""
    logging.basicConfig(
        format='[%(asctime)s - %(name)s] %(message)s',
        datefmt='%H:%M:%S',
        level=logging.INFO,
    )

    parser = argparse.ArgumentParser(description='Simulate basecalls with scrappy.')
    parser.add_argument('fasta', help='Source sequence file.')
    parser.add_argument('ncalls', type=int, help='Number of basecalls to produce.')
    # Optional arguments: (flag, type, default, help text).
    optional_arguments = (
        ('--mu', float, 8000, 'mean fragment length.'),
        ('--sigma', float, 1000, 'stdv fragment length.'),
        ('--noise', float, 0.06, 'Additional Gaussian noise on signal.'),
        ('--threads', int, None, 'number of worker threads.'),
    )
    for flag, kind, default, text in optional_arguments:
        parser.add_argument(flag, type=kind, default=default, help=text)
    options = parser.parse_args()

    # The fragment generator is endless; islice stops it after `ncalls` items.
    fragments = shotgun_library(
        options.fasta, options.mu, options.sigma, direction=(1,-1))
    regions = itertools.islice(fragments, options.ncalls)

    basecall = functools.partial(worker, noise=options.noise)
    with ThreadPoolExecutor(max_workers=options.threads) as executor:
        for record in executor.map(basecall, regions):
            print(record)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment