from gwf import Workflow, AnonymousTarget
import os
import os.path as op
import pandas as pd
from pprintpp import pprint as pp
#######################################
#
# `gwf` workflow for the full pipeline from downloading raw reads
# to compartmentalization analysis. We are using `bwa mem` for mapping.
#
# How to run:
# conda activate hic
# gwf -f master_workflow.py run     # submit/resubmit targets
# gwf -f master_workflow.py status  # check progress
#
# Workflow:
# 1a bwa_index : Index the reference genome with `bwa index`
# 1b sam_index : Index the fasta again with `samtools faidx`
# 1c download_reads : Download raw reads from SRA
# `sra-downloader` with SRR IDs (paired-end reads)
# 2 bwa_map : Map reads (PE) to the reference with `bwa mem`
# 3 pair_sort : Parse the paired-end alignments into .pairs with `pairtools parse`,
# then sort with `pairtools sort`
# 4 dedup : Deduplicate the sorted pairs with `pairtools dedup`
# 5 make_pairs_cool : Filter the deduplicated pairs with `pairtools select` (MAPQ >= 30),
# then create a .cool file with `cooler cload pairs`
# 6 merge_zoomify : Merge each tissue's coolers with `cooler merge`,
# then build a multi-resolution .mcool with `cooler zoomify`
#
#######################################
# Create a workflow object
gwf = Workflow(defaults={'nodes': 1, 'queue':"normal", 'account':"hic-spermatogenesis"})
#############################################
############### Templates ###################
#############################################
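# Each template below returns an AnonymousTarget(inputs, outputs, options, spec);
# targets are registered with gwf.target_from_template(name, template(...)), and gwf
# (re)runs a target only when an output is missing or older than an input.
# A minimal sketch of the pattern, commented out and using hypothetical names:
#
#   def gzip_file(path):
#       """Compress a file with gzip, keeping the original."""
#       return AnonymousTarget(inputs=[path],
#                              outputs=[f"{path}.gz"],
#                              options={'cores': 1, 'memory': "1g", 'walltime': "00:10:00"},
#                              spec=f"gzip -k {path}")
#
#   gwf.target_from_template("gzip_example", gzip_file("some_file.txt"))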
def download_reads(sra_downloader, srr_id, read_dir):
"""Download raw reads from SRA"""
inputs = [sra_downloader]
outputs = [f"{read_dir}/{srr_id}_1.fastq.gz",
f"{read_dir}/{srr_id}_2.fastq.gz",
f"{read_dir}/dl_{srr_id}.done"]
    options = {'cores': 16, 'memory': "4g", 'walltime': "06:00:00"}  # cores matches --cores in the spec
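    # The trailing `touch` in the spec creates a sentinel file; gwf checks it
    # (together with the two fastq.gz outputs) to decide whether the download finished.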
spec = f"""
singularity run {sra_downloader} {srr_id} --save-dir {read_dir} --cores 16 && \
touch {read_dir}/dl_{srr_id}.done
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def bwa_index(ref_genome):
"""Template for indexing the reference genome with bwa"""
inputs = [ref_genome]
outputs = [f"{ref_genome}.amb",
f"{ref_genome}.ann",
f"{ref_genome}.bwt",
f"{ref_genome}.pac",
f"{ref_genome}.sa"]
options = {'cores':1, 'memory': "5g", 'walltime':"02:00:00"}
spec = f'''
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
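    # -p keeps the index prefix identical to the fasta path; -a bwtsw selects the
    # BWT-SW indexing algorithm, suitable for large genomes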
bwa index -p {ref_genome} -a bwtsw {ref_genome}
'''
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def sam_index(ref_genome):
"""Creating a fasta index. `bwa mem` also needs a fasta index generated by samtools"""
inputs = [ref_genome]
outputs = [f"{ref_genome}.fai"]
options = {'cores':1, 'memory':"5g", 'walltime':"00:20:00"}
spec=f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
samtools faidx {ref_genome}
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def bwa_map(ref_genome, mate_1, mate_2, download_ok, out_bam):
"""
Template for mapping reads to a reference genome using `bwa` and `samtools`.
NB! Here, we map the mates together, as bwa states it is no problem for Hi-C reads.
"""
threads = 32
inputs = [f"{ref_genome}.amb",
f"{ref_genome}.ann",
f"{ref_genome}.bwt",
f"{ref_genome}.pac",
f"{ref_genome}.sa",
f"{ref_genome}.fai",
mate_1, mate_2,
download_ok]
outputs = [out_bam]
options = {'cores':threads, 'memory': "32g", 'walltime':"24:00:00"}
spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
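    # -S and -P skip mate rescue and mate pairing, so the two mates are aligned
    # independently, as recommended for Hi-C data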
    bwa mem -t {threads} -SP {ref_genome} {mate_1} {mate_2} | samtools view -b -o {out_bam} -
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# Updated version uses --walks-policy 5unique instead of mask
def pair_sort_alignments(chromsizes, bam_merged, sorted_pairs):
"""Convert the paired-end alignments to .pairs with `pairtools parse`, then sort"""
    inputs = [bam_merged, chromsizes]
outputs = [f"{bam_merged}_parsed.stats",
sorted_pairs]
options = {'cores':32, 'memory':"16g", 'walltime':"08:00:00"}
spec=f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
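    # --walks-policy 5unique reports the 5'-most unique alignment of each multi-fragment walk;
    # --add-columns mapq keeps per-side MAPQ so pairs can be filtered downstream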
pairtools parse \
-c {chromsizes} \
--drop-sam --drop-seq \
--output-stats {bam_merged}_parsed.stats \
--add-columns mapq \
--assembly rheMac10 \
--walks-policy 5unique \
{bam_merged} | \
pairtools sort -o {sorted_pairs}
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
def dedup(sorted_pairs, chromsizes):
"""Deduplicate the sorted pairs with `pairtools dedup`"""
pairs_prefix = sorted_pairs.split(".sorted")[0]
inputs = [sorted_pairs]
outputs = [f"{pairs_prefix}.nodups.pairs.gz",
f"{pairs_prefix}.unmapped.pairs.gz",
f"{pairs_prefix}.dups.pairs.gz",
f"{pairs_prefix}.dedup.stats",
f"{pairs_prefix}.dedup.done"]
options = {'cores':12, 'memory': "16g", 'walltime': "06:00:00"}
spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
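    # --max-mismatch 3: pairs whose both sides map within 3 bp of another pair count as duplicates;
    # --chrom-subset restricts output to the chromosomes listed in the filtered chromsizes file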
pairtools dedup \
--max-mismatch 3 \
--mark-dups \
--chunksize 100000 \
--chrom-subset {chromsizes} \
--output {pairs_prefix}.nodups.pairs.gz \
--output-unmapped {pairs_prefix}.unmapped.pairs.gz \
--output-stats {pairs_prefix}.dedup.stats \
--output-dups {pairs_prefix}.dups.pairs.gz \
{sorted_pairs} && \
touch {pairs_prefix}.dedup.done
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# Updated version uses pairtools select to filter out low-quality reads (MAPQ < 30)
def make_pairs_cool(chromsizes, pairs, cool_out):
"""Create coolers from pairs with `cooler cload pairs`"""
    inputs = [pairs, chromsizes]
outputs = [cool_out]
options = {'cores':1, 'memory':"4g", 'walltime':"03:00:00"}
spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
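    # Keep only pairs with MAPQ >= 30 on both sides, then aggregate into 10 kb bins;
    # -c1/-p1/-c2/-p2 are the 1-based .pairs columns for chrom1/pos1/chrom2/pos2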
pairtools select "(mapq1>=30) and (mapq2>=30)" {pairs} | \
cooler cload pairs \
-c1 2 -p1 3 -c2 4 -p2 5 \
--assembly rheMac10 \
--chunksize 50000 \
{chromsizes}:10000 \
- \
{cool_out}
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# T6 template: merge each tissue's per-run coolers and zoomify into a multi-resolution .mcool
def merge_zoomify_coolers(cooler_list, merged, mcool):
"""Merge a given list of coolers with `cooler merge`"""
    inputs = cooler_list
outputs = [merged, mcool]
options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
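    # Merge the per-run coolers (-c sets the merge buffer size in pixel-table rows),
    # then build a single multi-resolution .mcool with zoomify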
    cooler merge -c 50000000 {merged} {' '.join(cooler_list)} && \
cooler zoomify --nproc 8 \
--resolutions 10000,50000,100000,500000 \
-o {mcool} \
{merged}
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# DRAFT: Not implemented as a target yet
def balance_cooler_default(cool_in, cool_out):
"""Balance a cooler with `cooler balance`"""
mcool = cool_in.split("::")[0]
inputs = [mcool]
outputs = [cool_out]
options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
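    # cooler balance performs iterative correction and writes the weights into the
    # cooler's bins table; the sentinel file records completion for gwf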
    cooler balance -p 8 {cool_in} && \
touch {cool_out}
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
# DRAFT: Not implemented as a target yet
def balance_cooler_cis(cool_in, cool_out):
"""Balance a cooler with `cooler balance`"""
mcool = cool_in.split("::")[0]
inputs = [mcool]
outputs = [cool_out]
options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
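    # --cis-only balances each chromosome separately (intra-chromosomal contacts only);
    # the weights are stored under the column name given by --name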
    cooler balance -p 8 --cis-only --name cis_weights {cool_in} && \
touch {cool_out}
"""
return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
################################################
# Set up the folder structure for the workflow #
################################################
# Define our working dir
dirname = op.dirname(__file__)
# Define reference genome directory (relative to project/script dir)
ref_dir = "data/links/ucsc_ref/"
ref_genome = op.join(dirname, ref_dir, "rheMac10.fa")
chromsizes = op.join(dirname, ref_dir, "misc/rheMac10.chrom.sizes")
filtered_chromsizes = op.join(dirname, ref_dir, "misc/rheMac10.filtered.chrom.sizes")
# Locate the reference genome
if not op.exists(ref_genome):
    raise FileNotFoundError(f"Reference genome not found: {ref_genome}")
# Define the reads directory
reads_dir = op.join(dirname, "../../../data/macaque_raw/downloaded/")
# Tissue types ('source_name') define the read subdirectories (lowercased/underscored in the loop below)
sra_runtable = pd.read_csv(op.join(dirname, "data/SraRunTable.txt"))
reads_subdirs = set(sra_runtable["source_name"])
# Make a dict mapping the tissue type ['source_name'] to the SRR IDs ['Run']
tissue_dict = {tissue: sra_runtable[sra_runtable["source_name"]==tissue]["Run"].tolist() for tissue in reads_subdirs}
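# e.g. {'sperm cells': ['SRR0000001', ...], 'fibroblasts': [...]} (hypothetical values for illustration)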
# Define the output dirs for files
bam_dir = op.join(dirname,"steps/bwa/PE/bamfiles")
pair_dir = op.join(dirname, "steps/bwa/PE/pairs")
cool_dir = op.join(dirname, "steps/bwa/PE/cool")
# Updated version writes the new .pairs and .cool files under 'recPE' ("recommended PE"):
# --walks-policy 5unique and a MAPQ >= 30 filter.
rec_pair_dir = op.join(dirname, "steps/bwa/recPE/pairs")
rec_cool_dir = op.join(dirname, "steps/bwa/recPE/cool")
pair_dir = rec_pair_dir
cool_dir = rec_cool_dir
#############################################
############### Targets #####################
#############################################
T1a = gwf.target_from_template(f"bwa_index_{os.path.basename(ref_genome).split('.')[0]}",
bwa_index(ref_genome=ref_genome))
T1b = gwf.target_from_template(f"sam_index_{os.path.basename(ref_genome).split('.')[0]}",
sam_index(ref_genome=ref_genome))
T5out = {} # initialize a dict to store the cool files (T5)
# T1c target for each ID in the SRA runtable
for srr_id in sra_runtable["Run"]:
    # Look up the tissue type for this ID and derive the subdirectory for its reads
    sub_dir = None
    for cell_type, ids in tissue_dict.items():
        if srr_id in ids:
            sub_dir = cell_type.lower().replace(" ", "_")
            break
    if sub_dir is None:
        raise ValueError(f"ID {srr_id} not found in any tissue type")
fastq_dir = op.join(reads_dir, sub_dir)
# Create subdirs if they don't exist
    os.makedirs(fastq_dir, exist_ok=True)
# T1c: Download SRA ID
    T1c = gwf.target_from_template(f"dl_{srr_id}",
                                   download_reads(
                                       sra_downloader="../../../data/macaque_raw/sra-downloader_latest.sif",
                                       srr_id=srr_id,
                                       read_dir=fastq_dir
                                   ))
# T2: Map the reads to the reference genome
    bam_wdir = op.join(bam_dir, sub_dir)
    out_bam = op.join(bam_wdir, f"{srr_id}.PE.bam")
    os.makedirs(bam_wdir, exist_ok=True)
    T2 = gwf.target_from_template(f"bwa_map_{srr_id}",
                                  bwa_map(ref_genome=ref_genome,
                                          mate_1=T1c.outputs[0], mate_2=T1c.outputs[1],
                                          download_ok=T1c.outputs[2],
                                          out_bam=out_bam))
# T3: pair and sort the alignments
    pair_subdir = op.join(pair_dir, sub_dir)
    sorted_pairs = op.join(pair_subdir, f"{srr_id}.sorted.pairs.gz")
    os.makedirs(pair_subdir, exist_ok=True)
    T3 = gwf.target_from_template(f"parse_{srr_id}",
                                  pair_sort_alignments(chromsizes=chromsizes,
                                                       bam_merged=T2.outputs[0],
                                                       sorted_pairs=sorted_pairs))
# T4: Deduplicate the sorted pairs
# Dedup dir is the same as pair dir
# NB we are using the reduced chromsizes file (only 'real' chromosomes)
    T4 = gwf.target_from_template(f"dedup_{srr_id}",
                                  dedup(sorted_pairs=T3.outputs[1],
                                        chromsizes=filtered_chromsizes))
#pp(T4.outputs)
# T5: Create a .cool file from the deduplicated pairs
cool_subdir = op.join(cool_dir, sub_dir)
    os.makedirs(cool_subdir, exist_ok=True)
T4outpairs = [x for x in T4.outputs if x.endswith(".pairs.gz")]
    T5out.setdefault(sub_dir, [])
    for T4out in T4outpairs:
        # Only make a cool file of the 'nodups' pairs
        # (drop this filter to also coolify the dups/unmapped outputs)
        if 'nodups' not in op.basename(T4out):
            continue
        cool_name = f"{srr_id}.{op.basename(T4out).split('.')[1]}"
        cool_file = op.join(cool_subdir, f"{cool_name}.10000.cool")
        T5 = gwf.target_from_template(f"coolify_{cool_name}",
                                      make_pairs_cool(filtered_chromsizes, pairs=T4out, cool_out=cool_file))
T5out[sub_dir].append(T5.outputs[0])
#pp(T5out)
# T6: merge each tissue's per-run coolers and zoomify
for subdir, cool_list in T5out.items():
    print(f"merging {subdir} coolers:")
T6 = gwf.target_from_template(f"merge_{subdir}",
merge_zoomify_coolers(cooler_list=cool_list,
merged=op.join(cool_dir, subdir, f"{subdir}.merged.cool"),
mcool=op.join(cool_dir, subdir, f"{subdir}.merged.mcool")))