master_workflow

Author

Søren Jørgensen

Published

January 30, 2025

from gwf import *
import os
import os.path as op
import pandas as pd
from pprintpp import pprint as pp

#######################################
#
# `gwf` workflow for the full pipeline from downloading raw reads 
# to compartmentalization analysis. We are using `bwa mem` for mapping. 
# 
# How to run:
# conda activate hic
# gwf -f master_workflow.py status
# 
# Workflow:
#   1a  bwa_index        : Index the reference genome with `bwa index`
#   1b  sam_index        : Index the fasta again with `samtools faidx`
#   1c  download_reads   : Download raw reads from SRA
#                         `sra-downloader` with SRR IDs (paired-end reads)
#   2   bwa_map          : Map reads (PE) to the reference with `bwa mem`
#   3   pair_sort        : Pair the merged alignments from mate-pair sequencing with `pairtools parse`,
#                         then sort with `pairtools sort`
#   4   dedup            : Deduplicate the sorted pairs with `pairtools dedup`
#   5   make_pairs_cool  : Create a .cool file from the deduplicated pairs with `cooler cload pairs`
#
#######################################

# Create a workflow object
gwf = Workflow(defaults={'nodes': 1, 'queue':"normal", 'account':"hic-spermatogenesis"})

#############################################
############### Templates ###################
#############################################

def download_reads(sra_downloader, srr_id, read_dir):
    """Download raw reads from SRA"""
    inputs  = [sra_downloader]
    outputs = [f"{read_dir}/{srr_id}_1.fastq.gz",
               f"{read_dir}/{srr_id}_2.fastq.gz",
               f"{read_dir}/dl_{srr_id}.done"]
    options = {'cores':32, 'memory': "4g", 'walltime':"06:00:00"}
    spec = f"""
singularity run {sra_downloader} {srr_id} --save-dir {read_dir} --cores 16 && \
touch {read_dir}/dl_{srr_id}.done
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def bwa_index(ref_genome):
    """Template for indexing the reference genome with bwa"""
    inputs  = [ref_genome]
    outputs = [f"{ref_genome}.amb", 
               f"{ref_genome}.ann", 
               f"{ref_genome}.bwt", 
               f"{ref_genome}.pac", 
               f"{ref_genome}.sa"]
    options = {'cores':1, 'memory': "5g", 'walltime':"02:00:00"}
    spec = f'''
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
bwa index -p {ref_genome} -a bwtsw {ref_genome}
'''
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def sam_index(ref_genome):
    """Creating a fasta index. `bwa mem` also needs a fasta index generated by samtools"""
    inputs = [ref_genome]
    outputs = [f"{ref_genome}.fai"]
    options = {'cores':1, 'memory':"5g", 'walltime':"00:20:00"}
    spec=f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
samtools faidx {ref_genome}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def bwa_map(ref_genome, mate_1, mate_2, download_ok, out_bam):
    """
    Template for mapping reads to a reference genome using `bwa` and `samtools`. 
    NB! Here, we map the mates together, as bwa states it is no problem for Hi-C reads. 
    """
    threads = 32
    inputs = [f"{ref_genome}.amb", 
              f"{ref_genome}.ann", 
              f"{ref_genome}.bwt", 
              f"{ref_genome}.pac", 
              f"{ref_genome}.sa", 
              f"{ref_genome}.fai",
              mate_1, mate_2,
              download_ok]
    outputs = [out_bam]
    options = {'cores':threads, 'memory': "32g", 'walltime':"24:00:00"}
    spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
bwa mem -t {threads} -SP {ref_genome} {mate_1} {mate_2} > {out_bam}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

# Updated version uses --walks-policy 5unique in stead of mask
def pair_sort_alignments(chromsizes, bam_merged, sorted_pairs):
    """Convert the paired-end alignments to .pairs with `pairtools parse`, then sort"""
    inputs = [bam_merged]
    outputs = [f"{bam_merged}_parsed.stats", 
               sorted_pairs]
    options = {'cores':32, 'memory':"16g", 'walltime':"08:00:00"}
    spec=f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
pairtools parse \
    -c {chromsizes} \
    --drop-sam --drop-seq \
    --output-stats {bam_merged}_parsed.stats \
    --add-columns mapq \
    --assembly rheMac10 \
    --walks-policy 5unique \
    {bam_merged} | \
pairtools sort -o {sorted_pairs} 
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def dedup(sorted_pairs, chromsizes):
    """Deduplicate the sorted pairs with `pairtools dedup`"""
    pairs_prefix = sorted_pairs.split(".sorted")[0]
    inputs = [sorted_pairs]
    outputs = [f"{pairs_prefix}.nodups.pairs.gz",
               f"{pairs_prefix}.unmapped.pairs.gz",
               f"{pairs_prefix}.dups.pairs.gz",
               f"{pairs_prefix}.dedup.stats",
               f"{pairs_prefix}.dedup.done"]
    options = {'cores':12, 'memory': "16g", 'walltime': "06:00:00"}
    spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
pairtools dedup \
    --max-mismatch 3 \
    --mark-dups \
    --chunksize 100000 \
    --chrom-subset {chromsizes} \
    --output {pairs_prefix}.nodups.pairs.gz \
    --output-unmapped {pairs_prefix}.unmapped.pairs.gz \
    --output-stats {pairs_prefix}.dedup.stats \
    --output-dups {pairs_prefix}.dups.pairs.gz \
    {sorted_pairs} && \
touch {pairs_prefix}.dedup.done
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

# Updated version uses pairtools select to filter out low quality reads
def make_pairs_cool(chromsizes, pairs, cool_out):
    """Create coolers from pairs with `cooler cload pairs`"""
    base = os.path.basename(pairs).split(".pairs.gz")[0]
    inputs = [pairs]
    outputs = [cool_out]
    options = {'cores':1, 'memory':"4g", 'walltime':"03:00:00"}
    spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
pairtools select "(mapq1>=30) and (mapq2>=30)" {pairs} | \
cooler cload pairs \
    -c1 2 -p1 3 -c2 4 -p2 5 \
    --assembly rheMac10 \
    --chunksize 50000 \
    {chromsizes}:10000 \
    - \
    {cool_out}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
    
# DRAFT: Not implemented as a target yet
def merge_zoomify_coolers(cooler_list, merged, mcool):
    """Merge a given list of coolers with `cooler merge`"""
    inputs = [cooler_list]
    outputs = [merged, mcool]
    options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
    spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
cooler merge -c 50000000 {merged} {cooler_list} && \
cooler zoomify --nproc 8 \
    --resolutions 10000,50000,100000,500000 \
    -o {mcool} \
    {merged}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

# DRAFT: Not implemented as a target yet
def balance_cooler_default(cool_in, cool_out):
    """Balance a cooler with `cooler balance`"""
    mcool = cool_in.split("::")[0]
    inputs = [mcool]
    outputs = [cool_out]
    options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
    spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
cooler balance -p 32 {cool_in} && \
touch {cool_out}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

# DRAFT: Not implemented as a target yet
def balance_cooler_cis(cool_in, cool_out):
    """Balance a cooler with `cooler balance`"""
    mcool = cool_in.split("::")[0]
    inputs = [mcool]
    outputs = [cool_out]
    options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
    spec = f"""
source $(conda info --base)/etc/profile.d/conda.sh
conda activate hic
cooler balance -p 32 --cis-only --name cis_weights {cool_in} && \
touch {cool_out}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)



################################################
# Set up the folder structure for the workflow #
################################################   
 
 # Define our working dir
dirname = op.dirname(__file__)

# Define reference genome directory (relative to project/script dir)
ref_dir = "data/links/ucsc_ref/"

ref_genome = op.join(dirname, ref_dir, "rheMac10.fa")
chromsizes = op.join(dirname, ref_dir, "misc/rheMac10.chrom.sizes")
filtered_chromsizes = op.join(dirname, ref_dir, "misc/rheMac10.filtered.chrom.sizes")


# Locate the reference genome
if not op.exists(ref_genome):
    raise FileNotFoundError(f"Reference genome not found: {ref_genome}")
else:
    #print(f"Reference genome found at: \n{ref_genome}")
    pass

# Define the reads directory
reads_dir = op.join(dirname, "../../../data/macaque_raw/downloaded/")

# Define subdirs for tissue type, strip_whitespace and make lowercase
sra_runtable = pd.read_csv("data/SraRunTable.txt")
reads_subdirs = set(sra_runtable["source_name"])

# Make a dict mapping the tissue type ['source_name'] to the SRR IDs ['Run']
tissue_dict = {tissue: sra_runtable[sra_runtable["source_name"]==tissue]["Run"].tolist() for tissue in reads_subdirs}

# Define the output dirs for files
bam_dir = op.join(dirname,"steps/bwa/PE/bamfiles")
pair_dir = op.join(dirname, "steps/bwa/PE/pairs")
cool_dir = op.join(dirname, "steps/bwa/PE/cool")


#### Updated version creates the new .pairs and .cool files under 'recPE' for recommended PE. --walks-policy 5unique, filter mapq >= 30
rec_pair_dir = op.join(dirname, "steps/bwa/recPE/pairs")
rec_cool_dir = op.join(dirname, "steps/bwa/recPE/cool")
pair_dir = rec_pair_dir
cool_dir = rec_cool_dir

#############################################
############### Targets #####################
#############################################

T1a = gwf.target_from_template(f"bwa_index_{os.path.basename(ref_genome).split('.')[0]}", 
                               bwa_index(ref_genome=ref_genome))
T1b = gwf.target_from_template(f"sam_index_{os.path.basename(ref_genome).split('.')[0]}", 
                               sam_index(ref_genome=ref_genome))


T5out = {} # initialize a dict to store the cool files (T5)

# T1c target for each ID in the SRA runtable
for id in sra_runtable["Run"]:
    
    # Do some stuff before downloading the reads
    # Get the tissue type for the ID, and assign a subdirectory for the reads
    sub_dir = None
    for cell_type, ids in tissue_dict.items():
        if id in ids:
            sub_dir = cell_type.lower().replace(" ", "_")
            break
    if sub_dir is None:
        raise ValueError(f"ID {id} not found in any tissue type")

    fastq_dir = op.join(reads_dir, sub_dir)

    # Create subdirs if they don't exist
    if not op.exists(fastq_dir):
        os.makedirs(fastq_dir)

    # T1c: Download SRA ID
    T1c = gwf.target_from_template(f"dl_{id}",
                                  download_reads(
                                        sra_downloader = "../../../data/macaque_raw/sra-downloader_latest.sif",
                                        srr_id = id,
                                        read_dir = fastq_dir
                                    ))
    
    # T2: Map the reads to the reference genome
    bam_wdir = op.join(bam_dir, sub_dir)
    out_bam = op.join(bam_wdir, f"{id}.PE.bam")

    if not op.exists(bam_wdir):
        os.makedirs(bam_wdir)

    T2 = gwf.target_from_template(f"bwa_map_{id}", 
                                  bwa_map(ref_genome=ref_genome, 
                                          mate_1=T1c.outputs[0], mate_2=T1c.outputs[1], 
                                          download_ok=T1c.outputs[2],
                                          out_bam=out_bam))
    
    # T3: pair and sort the alignments
    pair_subdir = op.join(pair_dir, sub_dir)
    sorted_pairs = op.join(pair_subdir, f"{id}.sorted.pairs.gz")

    if not op.exists(pair_subdir):
        os.makedirs(pair_subdir)

    T3 = gwf.target_from_template(f"parse_{id}",
                                    pair_sort_alignments(chromsizes=chromsizes, 
                                                         bam_merged=T2.outputs[0], 
                                                         sorted_pairs=sorted_pairs))
    
    # T4: Deduplicate the sorted pairs
    # Dedup dir is the same as pair dir
    # NB we are using the reduced chromsizes file (only 'real' chromosomes)
    T4 = gwf.target_from_template(f"dedup_{id}",
                                    dedup(sorted_pairs=T3.outputs[1],
                                            chromsizes=filtered_chromsizes))
    #pp(T4.outputs)

    # T5: Create a .cool file from the deduplicated pairs
    cool_subdir = op.join(cool_dir, sub_dir)

    if not op.exists(cool_subdir):
        os.makedirs(cool_subdir)

    T4outpairs = [x for x in T4.outputs if x.endswith(".pairs.gz")]
    if sub_dir not in T5out:
        T5out[sub_dir] = []

    for T4out in T4outpairs:
        # Filter to be removed if we want to
        # Only make a cool file of the 'nodups' pairs
        if 'nodups' not in T4out:
            continue

        cool_name = f"{id}.{T4out.split('.')[1]}"
        cool_file = os.path.join(cool_subdir, f"{cool_name}.10000.cool")

        T5 = gwf.target_from_template(f"coolify_{cool_name}",
                                        make_pairs_cool(filtered_chromsizes, pairs=T4out, cool_out=cool_file))
        T5out[sub_dir].append(T5.outputs[0])
        
#pp(T5out)

for subdir,cool_list in T5out.items():
    print(f'merging {subdir} coolers:')
    T6 = gwf.target_from_template(f"merge_{subdir}",
                                    merge_zoomify_coolers(cooler_list=cool_list, 
                                                          merged=op.join(cool_dir, subdir, f"{subdir}.merged.cool"), 
                                                          mcool=op.join(cool_dir, subdir, f"{subdir}.merged.mcool")))