from gwf import Workflow
import sys, os, re
from collections import defaultdict
from pathlib import Path
import pandas as pd
from gwf import Workflow, AnonymousTarget
from gwf.workflow import collect
# Workflow object; all targets bill to the 'ari-intern' slurm account by default.
gwf = Workflow(defaults={'account': 'ari-intern'})
# directories
out_dir = '/home/ari/ari-intern/people/ari/ariadna-intern/steps'           # step outputs (note: /home path)
data_big = '/faststorage/project/ari-intern/data'                          # shared project data
script_dir = '/faststorage/project/ari-intern/people/ari/ariadna-intern/scripts'  # helper python scripts
data_dir = '/faststorage/project/ari-intern/people/ari/ariadna-intern/steps/1000genome'  # 1000 genomes inputs
# function that modifies file path
def modify_path(p, parent=None, base=None, suffix=None):
    """Return path *p* with its directory, basename and/or suffix replaced.

    Parameters
    ----------
    p : str
        Path to modify.
    parent : str, optional
        Replacement directory.
    base : str, optional
        Replacement basename (extension excluded); '' drops the basename.
    suffix : str or (old, new) tuple, optional
        If a string, it replaces the extension (include the leading dot).
        If a 2-tuple, the literal trailing `old` in the rebuilt path is
        substituted by `new`; exactly one substitution must occur.

    Returns
    -------
    str
        The modified path.
    """
    par, name = os.path.split(p)
    name_no_suffix, suf = os.path.splitext(name)
    if type(suffix) is str:
        suf = suffix
    if parent is not None:
        par = parent
    if base is not None:
        name_no_suffix = base
    new_path = os.path.join(par, name_no_suffix + suf)
    if type(suffix) is tuple:
        assert len(suffix) == 2
        # Bug fix: escape the old suffix so regex metacharacters (notably the
        # leading '.') are matched literally rather than as wildcards.
        new_path, nsubs = re.subn(r'{}$'.format(re.escape(suffix[0])), suffix[1], new_path)
        assert nsubs == 1, nsubs
    return new_path
# function to combine 2 (or more) target outputs as an input
def combine(*args, only=None):
    """Zip several gwf.map output lists into one list of merged dicts.

    Each positional argument is a list of output dicts (one dict per
    mapped target); all lists must be the same length.  Element j of the
    result is the union of the j-th dict of every argument.

    Parameters
    ----------
    *args : lists of dicts, all of equal length.
    only : collection of keys, optional
        When given, only these keys are copied into the merged dicts.

    Returns
    -------
    list of dicts, same length as each input list.
    """
    assert all(len(args[0]) == len(args[i]) for i in range(len(args)))
    combined = []
    for j in range(len(args[0])):
        output_group = {}
        for i in range(len(args)):
            if only:
                # Bug fix: the original indexed args[j] (a list) instead of
                # args[i][j] (a dict), raising AttributeError on .items().
                output_group.update({k: v for k, v in args[i][j].items() if k in only})
            else:
                output_group.update(args[i][j])
        combined.append(output_group)
    return combined
# map of recombination rate across the X chromosome made by DECODE genetics
def decode_genetic_maps(decode_hg38_sexavg_per_gen, genetic_map_chrX):
    """Extract a chrX-only genetic map from the DECODE sex-averaged map.

    Drops the header, keeps chrX rows and columns 2, 4, 5, and prepends
    the header line expected downstream (pos COMBINED_rate Genetic_Map).
    """
    spec = f'''
    cat {decode_hg38_sexavg_per_gen} | tail -n +2 | grep chrX | cut -f 2,4,5 | (echo pos COMBINED_rate Genetic_Map ; cat - ; ) > {genetic_map_chrX}
    '''
    return AnonymousTarget(
        inputs=[decode_hg38_sexavg_per_gen],
        outputs=[genetic_map_chrX],
        options={'memory': '1g', 'walltime': '00:10:00'},
        spec=spec,
    )
# input: DECODE hg38 sex-averaged recombination map; output: chrX-only map.
decode_hg38_sexavg_per_gen = f'{data_big}/decode_hg38_sexavg_per_gen.tsv'
genetic_map_chrX = f'{out_dir}/genetic_map_chrX.tsv'
gwf.target_from_template(f'decode_genetic_maps',
                         decode_genetic_maps(decode_hg38_sexavg_per_gen, genetic_map_chrX))
# turn diploid females (XX) into two individual haplotypes (haploid individuals) like males
def female_haploid(haploid_vcf, chrX_filtered_eagle2_phased, phased_haplotypes):
    """Split phased diploid female genotypes into separate haploid samples.

    Runs the haploid_vcf.py script over the phased chrX VCF and gzips the
    resulting VCF stream into `phased_haplotypes`.
    """
    spec = f'''
    python {haploid_vcf} {chrX_filtered_eagle2_phased} | gzip > {phased_haplotypes}
    '''
    return AnonymousTarget(
        inputs=[haploid_vcf, chrX_filtered_eagle2_phased],
        outputs=[phased_haplotypes],
        options={'memory': '10g', 'walltime': '01:20:00'},
        spec=spec,
    )
# conversion script plus the phased chrX VCF it consumes; emits the haploid VCF.
haploid_vcf = f'{script_dir}/haploid_vcf.py'
chrX_filtered_eagle2_phased = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000genome/CCDG_14151_B01_GRM_WGS_2020-08-05_chrX.filtered.eagle2-phased.v2.vcf.gz'
phased_haplotypes = f'{out_dir}/1000g_phased_haplotypes.vcf.gz'
gwf.target_from_template(f'female_haploid',
                         female_haploid(haploid_vcf, chrX_filtered_eagle2_phased, phased_haplotypes))
# construct files with haplotype IDs
def haplotype_id(phased_haplotypes, phased_haplotypes_id):
    """Write one haplotype/sample ID per line using `bcftools query -l`."""
    spec = f'''
    # conda install -c bioconda bcftools
    # conda install openssl ## to install libcrypto.so.1.0.0 library
    bcftools query -l {phased_haplotypes} > {phased_haplotypes_id}
    sleep 5
    '''
    return AnonymousTarget(
        inputs=[phased_haplotypes],
        outputs=[phased_haplotypes_id],
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# list the sample IDs contained in the phased haploid VCF.
phased_haplotypes = f'{out_dir}/1000g_phased_haplotypes.vcf.gz'
phased_haplotypes_id = f'{out_dir}/1000g_phased_haplotypes_ids.txt'
gwf.target_from_template(f'haplotype_id', haplotype_id(phased_haplotypes, phased_haplotypes_id))
# construct populations labels mapping each haplotype to a population
# (group haplotypes according to the population to which the individuals carrying those haplotypes belong)
def pop_labels(make_poplabels, phased_haplotypes_id, high_coverage_seq_index, related_high_coverage_seq_index, phased_haplotypes_poplabels):
    """Build the poplabels file mapping each haplotype ID to its population.

    NOTE(review): this name is redefined further down the file; this version
    is only used by the `pop_labels` target created right below it.
    """
    spec = f'''
    python {make_poplabels} {phased_haplotypes_id} {high_coverage_seq_index} {related_high_coverage_seq_index} > {phased_haplotypes_poplabels}
    '''
    return AnonymousTarget(
        inputs=[make_poplabels, phased_haplotypes_id, high_coverage_seq_index, related_high_coverage_seq_index],
        outputs=[phased_haplotypes_poplabels],
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# combine the ID list with the 2504-unrelated and 698-related sequence indices
# to produce one poplabels line per haplotype.
make_poplabels = f'{script_dir}/make_poplabels.py'
phased_haplotypes_id = f'{out_dir}/1000g_phased_haplotypes_ids.txt'
high_coverage_seq_index = f'{data_dir}/seq_index/1000G_2504_high_coverage.sequence.index'
related_high_coverage_seq_index = f'{data_dir}/seq_index/1000G_698_related_high_coverage.sequence.index'
phased_haplotypes_poplabels = f'{out_dir}/1000g_phased_haplotypes_poplabels.txt'
gwf.target_from_template(f'pop_labels',
                         pop_labels(make_poplabels, phased_haplotypes_id, high_coverage_seq_index, related_high_coverage_seq_index, phased_haplotypes_poplabels))
# Define the function to convert VCF to haps/sample format
def convert_vcf(RelateFileFormats, phased_haplotypes_haps, phased_haplotypes_sample, phased_haplotypes, phased_haplotypes_poplabels):
    """Convert the phased VCF into Relate's haps/sample file pair.

    NOTE(review): stripping 'vcf.gz' leaves a trailing '.' on the -i
    prefix (e.g. '.../1000g_phased_haplotypes.'); confirm RelateFileFormats
    accepts that prefix as intended.
    """
    input_prefix = phased_haplotypes.replace('vcf.gz', '')
    # touch the outputs so their mtimes are newer than the inputs for gwf.
    spec = f'''
    {RelateFileFormats} --mode ConvertFromVcf --haps {phased_haplotypes_haps} --sample {phased_haplotypes_sample} -i {input_prefix} --poplabels {phased_haplotypes_poplabels}
    sleep 20
    touch {phased_haplotypes_haps}
    touch {phased_haplotypes_sample}
    '''
    return AnonymousTarget(
        inputs=[RelateFileFormats, phased_haplotypes_poplabels, phased_haplotypes],
        outputs=[phased_haplotypes_haps, phased_haplotypes_sample],
        options={'memory': '10g', 'walltime': '01:00:00'},
        spec=spec,
    )
# Define the file paths for input and output
RelateFileFormats = '/faststorage/project/ari-intern/people/ari/relate/bin/RelateFileFormats'
phased_haplotypes_poplabels = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000g_phased_haplotypes_poplabels.txt'
phased_haplotypes = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000g_phased_haplotypes.vcf.gz'
phased_haplotypes_haps = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000g_phased_haplotypes.haps'
phased_haplotypes_sample = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000g_phased_haplotypes.sample'
# NOTE(review): these hard-coded paths rebind the module-level names assigned
# above from f-strings; both spellings resolve to the same files — verify.
# Creating the target using the function
convert_vcf_target = convert_vcf(RelateFileFormats, phased_haplotypes_haps, phased_haplotypes_sample, phased_haplotypes, phased_haplotypes_poplabels)
# Adding the target to the workflow
gwf.target_from_template(f'convert_vcf', convert_vcf_target)
## start with specific population ##
# exclude related individuals to avoid biases arising from shared genetic material
def exclude_related(path, population):
    """List sample IDs of related individuals for later exclusion.

    Extracts column 10 from the related-samples sequence index, skipping
    '#' header lines, into `<out_dir>/<population>/excluded/`.
    """
    excluded_dir = f'{out_dir}/{population}/excluded'
    excluded_path = modify_path(path, parent=excluded_dir, suffix='_related.txt')
    spec = f'''
    mkdir -p {excluded_dir}
    grep -v '#' {path} | cut -f 10 > {excluded_path}
    '''
    return AnonymousTarget(
        inputs={'path': path},
        outputs={'path': excluded_path},
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# find IDs of haplotypes from all other populations so we can exclude them
def ids_other_ppl(path, population):
    """Collect haplotype IDs belonging to every population except `population`.

    Keeps column 1 (the ID) of poplabels lines that do not mention the
    population code.
    """
    excluded_dir = f'{out_dir}/{population}/excluded'
    excluded_path = modify_path(path, parent=excluded_dir, suffix='_non_ppl.txt')
    spec = f'''
    mkdir -p {excluded_dir}
    grep -v {population} {path} | cut -f 1 -d ' ' > {excluded_path}
    '''
    return AnonymousTarget(
        inputs={'path': path},
        outputs={'path': excluded_path},
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# combine excluded files: both related and non population individuals
def combine_files(path, related=None):
    """Concatenate the non-population and related ID lists, sorted and deduplicated.

    NOTE(review): in the loop below `related` is passed a gwf output *dict*
    ({'path': ...}), whose repr is interpolated into the shell command —
    confirm the generated spec is valid (likely should be related['path']).
    """
    combined_path = modify_path(path, base='', suffix='excluded_combined.txt')
    target_dir = modify_path(combined_path, base='', suffix='')
    spec = f'''
    mkdir -p {target_dir}
    cat {path} {related} | sort | uniq > {combined_path}
    '''
    return AnonymousTarget(
        inputs={'path': path, 'related': related},
        outputs={'path': combined_path},
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# construct a list of excluded individuals
def excluded_list(path, haplotype_id=None):
    """Select haplotype IDs that match any pattern in the combined exclude file."""
    list_path = modify_path(path, base='', suffix='excluded_list.txt')
    target_dir = modify_path(list_path, base='', suffix='')
    spec = f'''
    mkdir -p {target_dir}
    grep -f {path} {haplotype_id} > {list_path}
    '''
    return AnonymousTarget(
        inputs={'path': path, 'haplotype_id': haplotype_id},
        outputs={'exclude_list': list_path},
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# construct a list of only individuals from the population of interest
def pop_labels(exclude_list, poplabels=None):
    """Keep only the poplabels rows for the population of interest.

    NOTE(review): redefines the earlier `pop_labels` template (the first
    version has already been consumed by its target above), and reads the
    loop-global `population` instead of taking it as a parameter.
    """
    included_dir = f'{out_dir}/{population}/included'
    included_path = os.path.join(included_dir, 'included_pop_labels.txt')
    spec = f'''
    mkdir -p {included_dir}
    grep -v -f {exclude_list} {poplabels} > {included_path}
    '''
    return AnonymousTarget(
        inputs={'exclude_list': exclude_list, 'poplabels': poplabels},
        outputs={'pop_label_list': included_path},
        options={'memory': '10g', 'walltime': '00:60:00'},
        spec=spec,
    )
# prepare input files for RELATE
def prepare_files(exclude_list, haps=None, sample=None, ancestor=None, mask=None, poplabels=None):
    """Run Relate's PrepareInputFiles.sh to filter/polarise the haps/sample pair.

    Reads the loop-global `population` for the output directory.
    NOTE(review): PrepareInputFiles.sh gzips its outputs; downstream steps read
    .haps.gz/.sample.gz, but the declared outputs here lack the .gz suffix —
    confirm gwf's file tracking still behaves as intended.
    """
    steps_dir = '/home/ari/ari-intern/people/ari/ariadna-intern/steps'
    relate_dir = f'{steps_dir}/{population}/relate'
    prefix = os.path.join(relate_dir, '1000g_ppl_phased_haplotypes')
    spec = f'''
    mkdir -p {relate_dir}
    /home/ari/ari-intern/people/ari/relate/scripts/PrepareInputFiles/PrepareInputFiles.sh --haps {haps} --sample {sample} --ancestor {ancestor} --mask {mask} --remove_ids {exclude_list} --poplabels {poplabels} -o {prefix}
    sleep 20
    '''
    # outputs: .haps, .sample, .dist (if --mask specified), .poplabels (if remove_ids & poplabels specified), .annot (if poplabels specified)
    return AnonymousTarget(
        inputs={'haps': haps, 'sample': sample, 'ancestor': ancestor, 'mask': mask, 'poplabels': poplabels, 'exclude_list': exclude_list},
        outputs={'haps': prefix + '.haps', 'sample': prefix + '.sample', 'dist': prefix + '.dist', 'poplabels': prefix + '.poplabels', 'annot': prefix + '.annot'},
        options={'memory': '20g', 'walltime': '10:00:00'},
        spec=spec,
    )
# compute sfs to make sure singletons are not missing (sanity check)
# zcat 1000g_LWK_phased_haplotypes.haps.gz | cut -d ' ' -f 4- | tr -d -c '1\n' | awk '{ print length; }' | sort -n | uniq -c
# run the inference of tree sequences using RELATE
def relate(genetic_map, sample_relate=None, haps_relate=None, annot_relate=None, dist_relate=None):
    """Run Relate (--mode All) to infer .anc/.mut tree files for one population.

    Reads the loop-global `population` for the output directory.
    """
    run_dir = f'/home/ari/ari-intern/people/ari/ariadna-intern/steps/{population}/relate/run_relate'
    prefix = '1000g_ppl_phased_haplotypes'
    out_path = os.path.join(run_dir, prefix)
    # Relate creates a temporary folder named after -o and aborts if it already
    # exists, hence the rm -rf before each run.
    spec = f'''
    mkdir -p {run_dir}
    cd {run_dir}
    rm -rf {prefix}
    /home/ari/ari-intern/people/ari/relate/bin/Relate --mode All -m 1.25e-8 -N 20000 --sample {sample_relate} --haps {haps_relate} --map {genetic_map} --annot {annot_relate} --dist {dist_relate} --memory 20 -o {prefix}
    sleep 90
    '''
    return AnonymousTarget(
        inputs={'sample_relate': sample_relate, 'haps_relate': haps_relate, 'annot_relate': annot_relate, 'dist_relate': dist_relate},
        outputs={'anc': out_path + '.anc', 'mut': out_path + '.mut'},
        options={'memory': '24g', 'walltime': '10:00:00'},
        spec=spec,
    )
# estimate historical population size trajectory from initially inferred tree sequences
# setting --threshold 0. This is so that the branch lengths in all trees are updated for the estimated population size history.
def estimate_ppl_size(anc_size=None, mut_size=None, poplabels_size=None):
    """Run Relate's EstimatePopulationSize.sh on the inferred trees.

    Reads the loop-global `population` for the working directory.

    Inputs: inferred .anc/.mut files and a .poplabels file.
    Outputs: two versions of coalescence rates/population sizes:
      .coal           -- coalescence/cross-coalescence rates, all samples as one population
      .pairwise.coal/.bin -- per-sample-pair coalescence rates (+ binary file)
    """
    output_dir = f'/home/ari/ari-intern/people/ari/ariadna-intern/steps/{population}/relate/run_relate'
    file_name_input = '1000g_ppl_phased_haplotypes'
    file_name_output = '1000g_ppl_phased_haplotypes_demog'
    output_path = os.path.join(output_dir, file_name_output)
    inputs = {'anc_size': anc_size, 'mut_size': mut_size, 'poplabels_size': poplabels_size}
    outputs = {'coal': output_path + '.coal', 'pairwise_coal': output_path + '.pairwise.coal', 'pairwise_bin': output_path + '.pairwise.bin'}
    options = {'memory': '8g', 'walltime': '08:00:00'}
    # Bug fix: the command previously passed both '--threshold 0' and a
    # duplicate misspelled '--threshhold 0'; the misspelled flag is removed.
    spec = f'''
    mkdir -p {output_dir}
    cd {output_dir}
    rm -rf {file_name_output}
    /home/ari/ari-intern/people/ari/relate/scripts/EstimatePopulationSize/EstimatePopulationSize.sh -m 1.25e-8 -N 20000 -i {file_name_input} --poplabels {poplabels_size} -o {file_name_output} --threshold 0 --num_iter 5 --years_per_gen 29 --threads 14
    sleep 20
    '''
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# detect selection using RELATEs builtin statistic
def detect_selection(anc_selection=None, mut_selection=None, poplabels_selection=None):
    """Run Relate's DetectSelection.sh on the re-estimated (demog) trees.

    Reads the loop-global `population` for the working directory.

    Outputs:
      .freq -- frequency of the derived allele at generations genN .. gen1
      .lin  -- number of lineages in the tree at those generations, plus the
               count when the mutation had frequency 2
      .sele -- log10 p-value for selection evidence at those generations (set
               to 1 when the mutation had frequency <= 1 at a generation)
    """
    output_dir = f'/home/ari/ari-intern/people/ari/ariadna-intern/steps/{population}/relate/run_relate'
    file_name_input = '1000g_ppl_phased_haplotypes_demog'
    file_name_output = '1000g_ppl_phased_haplotypes_selection'
    output_path = os.path.join(output_dir, file_name_output)
    inputs = {'anc_selection': anc_selection, 'mut_selection': mut_selection, 'poplabels_selection': poplabels_selection}
    outputs = {'freq_selection': output_path + '.freq', 'lin_selection': output_path + '.lin', 'sele_selection': output_path + '.sele'}
    options = {'memory': '20g', 'walltime': '10:00:00'}
    # Bug fix: the spec previously interpolated the module-level global
    # `poplabels_size` (left over from the loop) instead of this function's
    # `poplabels_selection` parameter; use the parameter.
    spec = f'''
    mkdir -p {output_dir}
    cd {output_dir}
    rm -rf {file_name_output}
    /home/ari/ari-intern/people/ari/relate/scripts/DetectSelection/DetectSelection.sh -i {file_name_input} -m 1.25e-8 --poplabels {poplabels_selection} -o {file_name_output}
    sleep 80
    '''
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# convert to tree sequence file format (tskit)
# this function converts anc/mut files inferred by Relate into the tree sequence file format used by tskit. In the current
# implementation, each tree is stored with new nodes in the tree sequence file format, leading to no compression. In addition,
# information about how long branches persist, and how many mutations map to a branch are lost by this conversion.
def tree_seq(anc_convert=None, mut_convert=None):
    """Convert Relate's .anc/.mut pair into a tskit .trees file.

    Reads the loop-global `population` for the working directory.
    NOTE(review): `rm -rf` targets the extensionless prefix, which is the same
    string as the input prefix — confirm it only removes leftover temp data and
    never the .anc/.mut inputs.
    """
    run_dir = f'/home/ari/ari-intern/people/ari/ariadna-intern/steps/{population}/relate/run_relate'
    prefix_in = '1000g_ppl_phased_haplotypes'
    prefix_out = '1000g_ppl_phased_haplotypes'
    out_path = os.path.join(run_dir, prefix_out)
    spec = f'''
    mkdir -p {run_dir}
    cd {run_dir}
    rm -rf {prefix_out}
    /home/ari/ari-intern/people/ari/relate/bin/RelateFileFormats --mode ConvertToTreeSequence -i {prefix_in} -o {prefix_out}
    '''
    return AnonymousTarget(
        inputs={'anc_convert': anc_convert, 'mut_convert': mut_convert},
        outputs={'trees_convert': out_path + '.trees'},
        options={'memory': '8g', 'walltime': '04:00:00'},
        spec=spec,
    )
#populations = ['GWD', 'ESN', 'MSL', 'YRI', 'LWK'] # african ancestry --> DONE
populations = ['KHV']
#populations = ['GBR', 'FIN', 'IBS',# 'TSI'] + PUR (puerto ricans)# european ancestry --> DONE
#populations = ['CDX', 'CHB', 'CHS', #'JPT', 'KHV'] # east asian ancestry
#populations = ['LWK', 'GWD', 'ESN', 'MSL', 'YRI', 'GBR', 'FIN', 'IBS', 'TSI', 'CDX', 'CHB', 'CHS', 'JPT', 'KHV']
# append a unique identifier to each target name to ensure they are distinct (name = ...{population})
# NOTE: the template functions below read the loop variable `population` as a
# global at call time, so the whole per-population pipeline depends on this loop.
for population in populations:
    # exclude related individuals (698-related sequence index -> ID list)
    input_related = [(f'{data_dir}/seq_index/1000G_698_related_high_coverage.sequence.index', population)]
    related_target = gwf.map(exclude_related, input_related, name=f"exclude_related_{population}")
    # NOTE(review): outputs[0] is an output *dict* ({'path': ...}), not a path
    # string; it is interpolated into the shell command in combine_files —
    # confirm the generated spec is valid (likely should be ['path']).
    related = related_target.outputs[0] # list
    # get ids for other populations (everything not matching this population code)
    input_other_ppl = [(f'{out_dir}/1000g_phased_haplotypes_poplabels.txt', population)]
    other_ppl_target = gwf.map(ids_other_ppl, input_other_ppl, name=f"ids_other_ppl_{population}")
    # combine related and other-population ID lists into one exclude file
    combine_target = gwf.map(combine_files, other_ppl_target.outputs, extra = {'related':related}, name=f"combine_files_{population}")
    # grep the full haplotype-ID list for the excluded entries
    haplotype_ids = f'{out_dir}/1000g_phased_haplotypes_ids.txt'
    exclude_list_target = gwf.map(excluded_list, combine_target.outputs, extra = {'haplotype_id':haplotype_ids}, name=f"excluded_list_{population}")
    # poplabels restricted to the population of interest (second pop_labels definition)
    poplabels = f'{out_dir}/1000g_phased_haplotypes_poplabels.txt'
    pop_labels_target = gwf.map(pop_labels, exclude_list_target.outputs, extra = {'poplabels':poplabels}, name=f"pop_labels_{population}")
    # RELATE DIRECTORY !
    relate_dir = f'/home/ari/ari-intern/people/ari/ariadna-intern/steps/{population}/relate' # relate directory
    # PREPARE INPUT — filter the genome-wide haps/sample down to this population
    haps = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000g_phased_haplotypes.haps'
    sample = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/1000g_phased_haplotypes.sample'
    ancestor = f'{data_dir}/homo_sapiens_ancestor_GRCh38/homo_sapiens_ancestor_X.fa'
    mask = f'{data_dir}/20160622.chrX.mask.fasta'
    poplabels = f'{out_dir}/1000g_phased_haplotypes_poplabels.txt'
    prepare_target = gwf.map(prepare_files, exclude_list_target.outputs,
                             extra = {'haps': haps, 'sample': sample, 'ancestor': ancestor, 'mask':mask, 'poplabels':poplabels}, name=f"prepare_files_{population}")
    # RUN RELATE — note dependencies flow via hard-coded file paths (.haps.gz etc.),
    # not via the previous target's declared outputs.
    sample_relate = f'{relate_dir}/1000g_ppl_phased_haplotypes.sample.gz'
    haps_relate = f'{relate_dir}/1000g_ppl_phased_haplotypes.haps.gz'
    genetic_map = '/home/ari/ari-intern/people/ari/ariadna-intern/steps/genetic_map_chrX.tsv'
    annot_relate = f'{relate_dir}/1000g_ppl_phased_haplotypes.annot'
    dist_relate = f'{relate_dir}/1000g_ppl_phased_haplotypes.dist.gz'
    run_relate_target = gwf.map(relate, [genetic_map], extra = {'haps_relate': haps_relate, 'sample_relate': sample_relate, 'annot_relate': annot_relate, 'dist_relate': dist_relate}, name=f"relate_{population}")
    # ESTIMATE POPULATION SIZES from the inferred .anc/.mut
    anc_size = f'{relate_dir}/run_relate/1000g_ppl_phased_haplotypes.anc'
    mut_size = f'{relate_dir}/run_relate/1000g_ppl_phased_haplotypes.mut'
    poplabels_size = f'{relate_dir}/1000g_ppl_phased_haplotypes.poplabels'
    ppl_size_target = gwf.map(estimate_ppl_size, [anc_size], extra = {'mut_size': mut_size, 'poplabels_size': poplabels_size}, name=f"estimate_ppl_size_{population}")
    # DETECT SELECTION on the demography-corrected trees
    anc_selection = f'{relate_dir}/run_relate/1000g_ppl_phased_haplotypes_demog.anc.gz'
    mut_selection = f'{relate_dir}/run_relate/1000g_ppl_phased_haplotypes_demog.mut.gz'
    poplabels_selection = f'{relate_dir}/1000g_ppl_phased_haplotypes.poplabels'
    detect_selection_target = gwf.map(detect_selection, [anc_selection], extra = {'mut_selection': mut_selection, 'poplabels_selection': poplabels_selection}, name=f"detect_selection_{population}")
    # CONVERT TO TREE SEQUENCE FILE FORMAT (tskit)
    anc_convert = f'{relate_dir}/run_relate/1000g_ppl_phased_haplotypes.anc'
    mut_convert = f'{relate_dir}/run_relate/1000g_ppl_phased_haplotypes.mut'
    tree_seq_target = gwf.map(tree_seq, [anc_convert], extra = {'mut_convert': mut_convert}, name=f"tree_convert_{population}")