"""
master_workflow

gwf workflow for the full pipeline, from downloading raw reads to
compartmentalization analysis. We use bwa mem for mapping.

How to run:
    conda activate hic
    gwf -f master_workflow.py status

Workflow:
    1a bwa_index:       Index the reference genome with bwa index
    1b sam_index:       Index the fasta again with samtools faidx
    1c download_reads:  Download raw reads (paired-end, by SRR ID) from SRA with sra-downloader
    2  bwa_map:         Map reads (PE) to the reference with bwa mem
    3  pair_sort:       Parse the merged alignments with pairtools parse, then sort with pairtools sort
    4  dedup:           Deduplicate the sorted pairs with pairtools dedup
    5  make_pairs_cool: Create a .cool file from the deduplicated pairs with cooler cload pairs
"""
from gwf import *
import os
import os.path as op
import subprocess

import pandas as pd
from cooler.fileops import list_coolers
from pprintpp import pprint as pp

# Create a workflow object
gwf = Workflow(defaults={'nodes': 1, 'queue': 'normal', 'account': 'hic-spermatogenesis'})
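# For reference, the usual gwf invocations against this file (run from the
# project root with the hic environment active); the target name in the last
# line is a hypothetical example, not a real ID from the runtable:
#
#   gwf -f master_workflow.py run         # submit all outdated targets
#   gwf -f master_workflow.py status      # show target states
#   gwf -f master_workflow.py logs dl_SRR0000001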
#################################################
# Templates
#################################################
def download_index_reference(name, refdir):
    """Download the reference genome from NCBI, and index it for bwa"""
    out_dir = op.join(refdir, name)
    inputs = []
    outputs = [f"{out_dir}/index/bwa/{name}.fa",
               f"{out_dir}/index/bwa/{name}.fa.amb",
               f"{out_dir}/index/bwa/{name}.fa.ann",
               f"{out_dir}/index/bwa/{name}.fa.bwt",
               f"{out_dir}/index/bwa/{name}.fa.pac",
               f"{out_dir}/index/bwa/{name}.fa.sa",
               f"{out_dir}/assembly_report.txt",
               f"{out_dir}/{name}.fa.sizes",
               f"{out_dir}/{name}.fa.fai",
               f"{out_dir}/{name}.gaps.bed",
               ]
    options = {'memory': '16g', 'walltime': '01:00:00', 'cores': 8}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    genomepy plugin enable bwa && \
    genomepy install {name} --provider ncbi --genomes_dir {refdir} --threads 8
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
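# Note: the {name}.fa.sizes file declared above is a plain two-column,
# tab-separated chromsizes table (sequence name, length) as written by
# genomepy; the lengths below are only illustrative:
#
#   chr1    223616942
#   chr2    196197964
#
# Both pairtools parse -c and cooler cload pairs further down consume this format.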
def download_reads(sra_downloader, srr_id, read_dir):
    """Download raw reads from SRA"""
    inputs = [sra_downloader]
    outputs = [f"{read_dir}/{srr_id}_1.fastq.gz",
               f"{read_dir}/{srr_id}_2.fastq.gz",
               f"{read_dir}/dl_{srr_id}.done"]
    options = {'cores': 16, 'memory': '4g', 'walltime': '06:00:00'}
    spec = f"""
    singularity run {sra_downloader} {srr_id} --save-dir {read_dir} --cores 16 && \
    touch {read_dir}/dl_{srr_id}.done
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
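# The dl_{srr_id}.done sentinel is only touched after the container exits
# successfully, so downstream targets can depend on a finished download
# rather than on partially written fastq files.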
def bwa_index(ref_genome):
    """Template for indexing the reference genome with bwa"""
    inputs = [ref_genome]
    outputs = [f"{ref_genome}.amb",
               f"{ref_genome}.ann",
               f"{ref_genome}.bwt",
               f"{ref_genome}.pac",
               f"{ref_genome}.sa"]
    options = {'cores': 1, 'memory': '8g', 'walltime': '02:00:00'}
    spec = f'''
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    bwa index -p {ref_genome} -a bwtsw {ref_genome}
    '''
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def sam_index(ref_genome):
    """Create a fasta index; bwa mem also needs a fasta index generated by samtools"""
    inputs = [ref_genome]
    outputs = [f"{ref_genome}.fai"]
    options = {'cores': 1, 'memory': '1g', 'walltime': '00:20:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    samtools faidx {ref_genome}
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def bwa_map(ref_genome, mate_1, mate_2, download_ok, out_bam):
    """
    Template for mapping reads to a reference genome using bwa mem.
    NB! Here we map the mates together, as bwa states this is no problem
    for Hi-C reads.
    """
    threads = 32
    inputs = [f"{ref_genome}.amb",
              f"{ref_genome}.ann",
              f"{ref_genome}.bwt",
              f"{ref_genome}.pac",
              f"{ref_genome}.sa",
              #f"{ref_genome}.fai",
              mate_1, mate_2, download_ok]
    outputs = [out_bam]
    options = {'cores': threads, 'memory': '32g', 'walltime': '24:00:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    bwa mem -t {threads} -SP {ref_genome} {mate_1} {mate_2} > {out_bam}
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
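# On the bwa flags above: -S skips mate rescue and -P skips read pairing, so
# each mate is aligned independently. For Hi-C this is what we want, since the
# two mates come from loci that can be arbitrarily far apart, and
# insert-size-based pairing heuristics would mislead the aligner.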
# Updated version uses --walks-policy 5unique instead of mask
def pair_sort_alignments(chromsizes, bam_merged, sorted_pairs):
    """Convert the paired-end alignments to .pairs with pairtools parse, then sort"""
    inputs = [bam_merged]
    outputs = [f"{bam_merged}_parsed.stats", sorted_pairs]
    options = {'cores': 32, 'memory': '16g', 'walltime': '10:00:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    pairtools parse \
        -c {chromsizes} \
        --drop-sam --drop-seq \
        --output-stats {bam_merged}_parsed.stats \
        --add-columns mapq \
        --assembly 'T2T-MMU8v1' \
        --walks-policy 5unique \
        {bam_merged} | \
    pairtools sort -o {sorted_pairs}
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
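# With --add-columns mapq, each line of the resulting .pairs file carries the
# standard columns plus the two mapping qualities, i.e. (1-based):
#
#   readID chrom1 pos1 chrom2 pos2 strand1 strand2 pair_type mapq1 mapq2
#      1      2     3     4     5     6       7        8       9     10
#
# This is why make_pairs_cool below uses -c1 2 -p1 3 -c2 4 -p2 5, and why
# pairtools select can filter on mapq1/mapq2.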
def dedup(sorted_pairs, chromsizes):
    """Deduplicate the sorted pairs with pairtools dedup"""
    pairs_prefix = sorted_pairs.split(".sorted")[0]
    inputs = [sorted_pairs]
    outputs = [f"{pairs_prefix}.nodups.pairs.gz",
               f"{pairs_prefix}.unmapped.pairs.gz",
               f"{pairs_prefix}.dups.pairs.gz",
               f"{pairs_prefix}.dedup.stats",
               f"{pairs_prefix}.dedup.done"]
    options = {'cores': 12, 'memory': '4g', 'walltime': '03:30:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    pairtools dedup \
        --max-mismatch 3 \
        --mark-dups \
        --chunksize 100000 \
        --chrom-subset {chromsizes} \
        --output {pairs_prefix}.nodups.pairs.gz \
        --output-unmapped {pairs_prefix}.unmapped.pairs.gz \
        --output-stats {pairs_prefix}.dedup.stats \
        --output-dups {pairs_prefix}.dups.pairs.gz \
        {sorted_pairs} && \
    touch {pairs_prefix}.dedup.done
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
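# --max-mismatch 3 treats two pairs as duplicates when both sides map within
# 3 bp of each other, and --mark-dups labels them as pair type "DD" in the
# dups output. Only the .nodups.pairs.gz output is used downstream.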
# Updated version uses pairtools select to filter out low-quality reads
def make_pairs_cool(chromsizes, pairs, cool_out):
    """Create coolers from pairs with cooler cload pairs"""
    inputs = [pairs]
    outputs = [cool_out]
    options = {'cores': 1, 'memory': '8g', 'walltime': '06:30:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    pairtools select "(mapq1>=30) and (mapq2>=30)" {pairs} | \
    cooler cload pairs \
        -c1 2 -p1 3 -c2 4 -p2 5 \
        --assembly 'T2T-MMU8v1' \
        --chunksize 200000 \
        {chromsizes}:1000 \
        - \
        {cool_out}
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
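# In the cooler cload pairs call above, {chromsizes}:1000 is cooler's shorthand
# for "bin the given chromsizes table at 1000 bp", "-" reads the filtered pairs
# from stdin, and {cool_out} is the resulting single-resolution .cool file.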
def merge_zoomify_balance(cooler_list, merged, mcool):
    """Merge a given list of coolers with cooler merge, then zoomify the merged cooler with cooler zoomify"""
    cooler_list_unix_str = " ".join(cooler_list)
    inputs = [cooler_list]
    outputs = [merged, mcool]
    options = {'cores': 16, 'memory': '32g', 'walltime': '08:00:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    cooler merge -c 50000000 {merged} {cooler_list_unix_str} && \
    cooler zoomify --nproc 16 \
        --resolutions 1000,5000,10000,50000,100000 \
        --balance \
        --balance-args '--nproc 16' \
        -o {mcool} \
        {merged}
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
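# cooler zoomify --balance aggregates the 1 kb base matrix to each requested
# resolution and runs matrix balancing (ICE) per resolution, storing the
# weights inside the .mcool; --balance-args '--nproc 16' is passed through to
# cooler balance.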
# DRAFT: Not implemented as a target yet
def balance_cooler_cis(cool_in, cool_out):
    """Balance a cooler with cooler balance"""
    mcool = cool_in.split("::")[0]
    inputs = [mcool]
    outputs = [cool_out]
    options = {'cores': 8, 'memory': '32g', 'walltime': '03:00:00'}
    spec = f"""
    source $(conda info --base)/etc/profile.d/conda.sh
    conda activate hic
    cooler balance -p 8 --cis-only --name cis_weights {cool_in} && \
    touch {cool_out}
    """
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
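# Rationale for the draft above: --cis-only computes balancing weights from
# intra-chromosomal contacts only, and --name cis_weights stores them in a
# separate column, leaving the genome-wide weights written by zoomify
# untouched. Cis-only weights are often preferred for per-chromosome analyses
# such as compartment calling.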
#################################################
# Set up the folder structure for the workflow
#################################################

# Define our working dir
main_dir = op.dirname(__file__)
msc_dir = op.join(main_dir, "../hic-spermatogenesis/")

# Define the reads directory (download reads here)
reads_dir = op.join("steps/macaque_raw/downloaded/")

# Check for the sra-downloader image, else pull it from Docker Hub
sra_downloader = "steps/sra-downloader.sif"
if not op.exists(sra_downloader):
    command = ["singularity", "pull", sra_downloader, "docker://wwydmanski/sra-downloader"]
    try:
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(result.stdout)
        print(result.stderr)
    except subprocess.CalledProcessError as e:
        print(e.stdout)
        print(e.stderr)
# Define subdirs for tissue type (names are lowercased and whitespace replaced
# with underscores when the subdirectories are created below)
sra_runtable = pd.read_csv("data/SraRunTable.txt")
reads_subdirs = set(sra_runtable["source_name"])

# Make a dict mapping the tissue type ['source_name'] to the SRR IDs ['Run']
tissue_dict = {tissue: sra_runtable[sra_runtable["source_name"] == tissue]["Run"].tolist()
               for tissue in reads_subdirs}
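# A small sanity check one could add here (it assumes every run has a
# source_name filled in, so the tissues partition the runtable): the
# per-tissue lists should jointly cover every SRR ID.
assert sum(len(ids) for ids in tissue_dict.values()) == len(sra_runtable)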
# Define the output dirs for files
bam_dir = op.join(main_dir, "steps/bamfiles")
pair_dir = op.join(main_dir, "steps/pairs")
cool_dir = op.join(main_dir, "steps/cool")
#################################################
# Targets
#################################################
# Download the reference genome
ref_name = "T2T-MMU8v1.0"
ref_dir = op.join(main_dir, "steps/ref")

T0 = gwf.target_from_template(
    "install_T2T_MMU8V1",
    download_index_reference(ref_name, ref_dir)
)

ref_genome = T0.outputs[0]
ref_dir = op.join(ref_dir, ref_name)  # Update ref_dir
chromsizes = op.join(ref_dir, f"{ref_name}.fa.sizes")
# Index the reference genome
T1a = gwf.target_from_template(f"bwa_index_{os.path.basename(ref_genome).split('.')[0]}",
                               bwa_index(ref_genome=ref_genome))
T1b = gwf.target_from_template(f"sam_index_{os.path.basename(ref_genome).split('.')[0]}",
                               sam_index(ref_genome=ref_genome))
# T5out is a dict storing the cool files for each tissue type, to be merged at the end
T5out = {}
# T1c: one target for each ID in the SRA runtable
for id in sra_runtable["Run"]:
    # Get the tissue type for the ID, and assign a subdirectory for the reads
sub_dir = None
for cell_type, ids in tissue_dict.items():
if id in ids:
sub_dir = cell_type.lower().replace(" ", "_")
break
if sub_dir is None:
raise ValueError(f"ID {id} not found in any tissue type")
fastq_dir = op.join(reads_dir, sub_dir)
# Create subdirs if they don't exist
if not op.exists(fastq_dir):
os.makedirs(fastq_dir)
# T1c: Download SRA ID
T1c = gwf.target_from_template(f"dl_{id}",
download_reads(
sra_downloader = sra_downloader,
srr_id = id,
read_dir = fastq_dir
))
# T2: Map the reads to the reference genome
bam_wdir = op.join(bam_dir, sub_dir)
out_bam = op.join(bam_wdir, f"{id}.bam")
if not op.exists(bam_wdir):
os.makedirs(bam_wdir)
T2 = gwf.target_from_template(f"bwa_map_{id}",
bwa_map(ref_genome=ref_genome,
mate_1=T1c.outputs[0], mate_2=T1c.outputs[1],
download_ok=T1c.outputs[2],
out_bam=out_bam))
# T3: pair and sort the alignments
pair_subdir = op.join(pair_dir, sub_dir)
sorted_pairs = op.join(pair_subdir, f"{id}.sorted.pairs.gz")
if not op.exists(pair_subdir):
os.makedirs(pair_subdir)
T3 = gwf.target_from_template(f"parse_{id}",
pair_sort_alignments(chromsizes=chromsizes,
bam_merged=T2.outputs[0],
sorted_pairs=sorted_pairs))
# T4: Deduplicate the sorted pairs
# Dedup dir is the same as pair dir
# NB we are using the reduced chromsizes file (only 'real' chromosomes)
T4 = gwf.target_from_template(f"dedup_{id}",
dedup(sorted_pairs=T3.outputs[1],
chromsizes=chromsizes))
# T5: Create a .cool file from the deduplicated pairs
cool_subdir = op.join(cool_dir, sub_dir)
if not op.exists(cool_subdir):
os.makedirs(cool_subdir)
T4outpairs = [x for x in T4.outputs if x.endswith(".pairs.gz")]
if sub_dir not in T5out:
T5out[sub_dir] = []
for T4out in T4outpairs:
        # Remove this filter if we ever want coolers for the dups/unmapped outputs;
        # for now, only make a cool file of the 'nodups' pairs
if 'nodups' not in T4out:
continue
cool_name = f"{id}"
cool_file = os.path.join(cool_subdir, f"{cool_name}.1000.cool")
T5 = gwf.target_from_template(f"coolify_{cool_name}",
make_pairs_cool(chromsizes, pairs=T4out, cool_out=cool_file))
T5out[sub_dir].append(T5.outputs[0])
# Merge the coolers for each tissue type (group)
T6out = {}
for subdir, cool_list in T5out.items():
    T6out[subdir] = gwf.target_from_template(
        f"merge_zoom_balance_{subdir}",
        merge_zoomify_balance(cooler_list=cool_list,
                              merged=op.join(cool_dir, subdir, f"{subdir}.merged.cool"),
                              mcool=op.join(cool_dir, subdir, f"{subdir}.merged.mcool")))
    pp(T6out[subdir].inputs[0])