master_workflow

Author

Søren Jørgensen

Published

November 13, 2025

from gwf import Workflow, AnonymousTarget
import os
import subprocess
import os.path as op
import pandas as pd
from cooler.fileops import list_coolers
from pprintpp import pprint as pp

#######################################
#
# `gwf` workflow for the full pipeline from downloading raw reads 
# to compartmentalization analysis. We are using `bwa mem` for mapping. 
# 
# How to run:
# conda activate hic
# gwf -f master_workflow.py run      # submit the targets
# gwf -f master_workflow.py status   # check their progress
# 
# Workflow:
#   1a  bwa_index        : Index the reference genome with `bwa index`
#   1b  sam_index        : Index the fasta again with `samtools faidx`
#   1c  download_reads   : Download raw reads from SRA
#                         `sra-downloader` with SRR IDs (paired-end reads)
#   2   bwa_map          : Map reads (PE) to the reference with `bwa mem`
#   3   pair_sort        : Parse the paired-end alignments into .pairs with `pairtools parse`,
#                         then sort with `pairtools sort`
#   4   dedup            : Deduplicate the sorted pairs with `pairtools dedup`
#   5   make_pairs_cool  : Create a .cool file from the deduplicated pairs with `cooler cload pairs`
#
#######################################

# Create a workflow object
gwf = Workflow(defaults={'nodes': 1, 'queue':"normal", 'account':"hic-spermatogenesis"})

#############################################
############### Templates ###################
#############################################

def download_reference(ref_dir, name, url):
    """Download the reference genome"""
    inputs = []
    outputs = [op.join(ref_dir, f"{name}.chrom.sizes"),
               op.join(ref_dir, f"{name}.fa.gz"),
               op.join(ref_dir, f"{name}.fa"),
               op.join(ref_dir, f"{name}.filtered.chrom.sizes"),
               op.join(ref_dir, "download_reference.log")]
    options = {'cores':1, 'memory':"1g", 'walltime':"00:30:00"}
    spec = f"""
wget -P {ref_dir} --timestamping '{url}README.txt' && \
wget -P {ref_dir} --timestamping '{url}{name}.fa.gz' && \
wget -P {ref_dir} --timestamping '{url}{name}.chrom.sizes' && \
gunzip -k {outputs[1]} && \
grep -v -E '[_]|Un' {outputs[0]} > {outputs[3]} && \
ls -lh {ref_dir} > {outputs[4]}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
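
# Illustration only (not used by the workflow): a Python mirror of the
# `grep -v -E '[_]|Un'` filter above. It keeps primary chromosomes and drops
# unplaced (chrUn_*) and alt/random (*_*) scaffold lines from the chromsizes file.
def _is_primary_chrom(chromsizes_line):
    """Mirror of the grep: reject any line containing '_' or 'Un'."""
    return "_" not in chromsizes_line and "Un" not in chromsizes_line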


def download_reads(sra_downloader, srr_id, read_dir):
    """Download raw reads from SRA"""
    inputs  = [sra_downloader]
    outputs = [f"{read_dir}/{srr_id}_1.fastq.gz",
               f"{read_dir}/{srr_id}_2.fastq.gz",
               f"{read_dir}/dl_{srr_id}.done"]
    options = {'cores':16, 'memory': "4g", 'walltime':"06:00:00"}
    spec = f"""
singularity run {sra_downloader} {srr_id} --save-dir {read_dir} --cores 16 && \
touch {read_dir}/dl_{srr_id}.done
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def bwa_index(ref_genome):
    """Template for indexing the reference genome with bwa"""
    inputs  = [ref_genome]
    outputs = [f"{ref_genome}.amb", 
               f"{ref_genome}.ann", 
               f"{ref_genome}.bwt", 
               f"{ref_genome}.pac", 
               f"{ref_genome}.sa"]
    options = {'cores':1, 'memory': "8g", 'walltime':"02:00:00"}
    spec = f'''
pixi run bwa index -p {ref_genome} -a bwtsw {ref_genome}
'''
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def sam_index(ref_genome):
    """Creating a fasta index. `bwa mem` also needs a fasta index generated by samtools"""
    inputs = [ref_genome]
    outputs = [f"{ref_genome}.fai"]
    options = {'cores':1, 'memory':"8g", 'walltime':"03:00:00"}
    spec=f"""
pixi run samtools faidx {ref_genome}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

def bwa_map(ref_genome, mate_1, mate_2, download_ok, out_bam):
    """
    Template for mapping reads to a reference genome using `bwa` and `samtools`. 
    NB! Here, we map the mates together, as bwa states it is no problem for Hi-C reads. 
    """
    threads = 32
    inputs = [f"{ref_genome}.amb", 
              f"{ref_genome}.ann", 
              f"{ref_genome}.bwt", 
              f"{ref_genome}.pac", 
              f"{ref_genome}.sa", 
              f"{ref_genome}.fai",
              mate_1, mate_2,
              download_ok]
    outputs = [out_bam]
    options = {'cores':threads, 'memory': "32g", 'walltime':"12:00:00"}
    spec = f"""
pixi run bwa mem -t {threads} -SP {ref_genome} {mate_1} {mate_2} > {out_bam}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
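
# Note on `-SP`: `-S` skips mate rescue and `-P` skips read pairing, so each
# mate is aligned independently. This is the usual setting for Hi-C, where the
# two mates come from distant loci and must not be "rescued" into proper pairs.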

# Updated version uses --walks-policy 5unique instead of mask
def pair_sort_alignments(chromsizes, bam_merged, sorted_pairs):
    """Convert the paired-end alignments to .pairs with `pairtools parse`, then sort"""
    inputs = [bam_merged, chromsizes]
    outputs = [f"{bam_merged}_parsed.stats", 
               sorted_pairs]
    options = {'cores':32, 'memory':"16g", 'walltime':"06:00:00"}
    spec=f"""
pixi run pairtools parse \
    -c {chromsizes} \
    --drop-sam --drop-seq \
    --output-stats {bam_merged}_parsed.stats \
    --add-columns mapq \
    --assembly rheMac10 \
    --walks-policy 5unique \
    {bam_merged} | \
pairtools sort -o {sorted_pairs} 
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
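
# For orientation (per the .pairs format): the sorted output columns are
# readID, chrom1, pos1, chrom2, pos2, strand1, strand2, pair_type, plus
# mapq1/mapq2 added by `--add-columns mapq`. The MAPQ filter and the
# `cooler cload` column flags further down rely on this layout.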

def dedup(sorted_pairs, chromsizes):
    """Deduplicate the sorted pairs with `pairtools dedup`"""
    # Strip ".sorted.pairs.gz" so the outputs land next to the input file
    pairs_prefix = sorted_pairs.split(".sorted")[0]
    inputs = [sorted_pairs, chromsizes]
    outputs = [f"{pairs_prefix}.nodups.pairs.gz",
               f"{pairs_prefix}.unmapped.pairs.gz",
               f"{pairs_prefix}.dups.pairs.gz",
               f"{pairs_prefix}.dedup.stats",
               f"{pairs_prefix}.dedup.done"]
    options = {'cores':12, 'memory': "1g", 'walltime': "01:30:00"}
    spec = f"""
pixi run pairtools dedup \
    --max-mismatch 3 \
    --mark-dups \
    --chunksize 100000 \
    --chrom-subset {chromsizes} \
    --output {pairs_prefix}.nodups.pairs.gz \
    --output-unmapped {pairs_prefix}.unmapped.pairs.gz \
    --output-stats {pairs_prefix}.dedup.stats \
    --output-dups {pairs_prefix}.dups.pairs.gz \
    {sorted_pairs} && \
touch {pairs_prefix}.dedup.done
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

# Updated version uses `pairtools select` to filter out low-quality pairs (MAPQ < 30)
def make_pairs_cool(chromsizes, pairs, cool_out):
    """Create coolers from pairs with `cooler cload pairs`"""
    inputs = [pairs, chromsizes]
    outputs = [cool_out]
    options = {'cores':1, 'memory':"1g", 'walltime':"01:30:00"}
    spec = f"""
pixi run pairtools select "(mapq1>=30) and (mapq2>=30)" {pairs} | \
cooler cload pairs \
    -c1 2 -p1 3 -c2 4 -p2 5 \
    --assembly rheMac10 \
    --chunksize 50000 \
    {chromsizes}:10000 \
    - \
    {cool_out}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
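
# Notes on the cload call above: `{chromsizes}:10000` is cooler's shorthand for
# "bin this chromsizes file at 10 kb", `-` reads the MAPQ-filtered pairs from
# stdin, and `-c1/-p1/-c2/-p2` point at the chrom1/pos1/chrom2/pos2 columns of
# the .pairs layout listed earlier.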
    
def merge_zoomify_balance(cooler_list, merged, mcool):
    """Merge a list of coolers with `cooler merge`, then zoomify and balance"""
    cooler_list_unix_str = " ".join(cooler_list)
    inputs = cooler_list  # the target depends on every per-run cooler
    outputs = [merged, mcool]
    options = {'cores':16, 'memory':"32g", 'walltime':"02:00:00"}
    spec = f"""
pixi run cooler merge -c 50000000 {merged} {cooler_list_unix_str} && \
cooler zoomify --nproc 16 \
    --resolutions 10000,50000,100000,500000 \
    --balance \
    --balance-args '--nproc 16' \
    -o {mcool} \
    {merged}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)
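
# The merged cooler keeps the 10 kb base resolution, so every level passed to
# `--resolutions` above is a multiple of it; `--balance` then computes iterative
# correction weights at each level (parallelised via `--balance-args '--nproc 16'`).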

# DRAFT: Not implemented as a target yet
def balance_cooler_default(mcool):
    """Balance all coolers inside an mcool with `cooler balance`"""
    path = op.dirname(mcool)
    inputs = [mcool]
    outputs = [op.join(path, "balanced_mcool.done")]
    options = {'cores':8, 'memory':"16g", 'walltime':"01:00:00"}
    spec = f"""
pixi run bash -c 'for COOL in $(cooler ls {mcool}); do cooler balance -p 8 $COOL && echo "Balanced $COOL"; done' && \
touch {outputs[0]}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)

# DRAFT: Not implemented as a target yet
def balance_cooler_cis(cool_in, cool_out):
    """Balance a cooler with `cooler balance`"""
    mcool = cool_in.split("::")[0]
    inputs = [mcool]
    outputs = [cool_out]
    options = {'cores':8, 'memory':"32g", 'walltime':"03:00:00"}
    spec = f"""
pixi run cooler balance -p 8 --cis-only --name cis_weights {cool_in} && \
touch {cool_out}
"""
    return AnonymousTarget(inputs=inputs, outputs=outputs, options=options, spec=spec)


################################################
# Set up the folder structure for the workflow #
################################################

# Define our working dir
main_dir = op.dirname(__file__)

# Define reference genome directory (relative to project/script dir)
ref_name = "rheMac10"
ref_dir = f"steps/macaque_raw/{ref_name}_ucsc/"
ref_link = "https://hgdownload.soe.ucsc.edu/goldenPath/rheMac10/bigZips/"
os.makedirs(ref_dir, exist_ok=True)


# Define the reads directory (download reads here)
reads_dir = op.join(main_dir, "steps/macaque_raw/downloaded/")

# Check for the sra-downloader singularity image; pull it from Docker Hub if missing
sra_downloader = "steps/sra-downloader.sif"

if not op.exists(sra_downloader):
    command = ["singularity", "pull", sra_downloader, "docker://wwydmanski/sra-downloader"]
    try:
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(result.stdout)
        print(result.stderr)
    except subprocess.CalledProcessError as e:
        print(e.stdout)
        print(e.stderr)
        raise  # fail early: without the image, the download targets cannot run


# Collect the tissue types; each becomes a reads subdir (lower-cased, with
# whitespace replaced by underscores, further down)
sra_runtable = pd.read_csv("data/SraRunTable.txt")
reads_subdirs = set(sra_runtable["source_name"])

# Make a dict mapping the tissue type ['source_name'] to the SRR IDs ['Run']
tissue_dict = {tissue: sra_runtable[sra_runtable["source_name"]==tissue]["Run"].tolist() for tissue in reads_subdirs}
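# e.g. (hypothetical tissue names and run IDs):
#   tissue_dict == {"Sperm": ["SRR0000001", "SRR0000002"], "Fibroblast": ["SRR0000003"]}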

# # Define the output dirs for files
# bam_dir = op.join(main_dir,"steps/bwa/PE/bamfiles")
# pair_dir = op.join(main_dir, "steps/bwa/PE/pairs")
# cool_dir = op.join(main_dir, "steps/bwa/PE/cool")


#### Updated version creates the new .pairs and .cool files under 'recPE'
#### ("recommended PE" settings: --walks-policy 5unique, MAPQ >= 30 filter)
rec_bam_dir = op.join(main_dir,"steps/bwa/recPE/bamfiles")
rec_pair_dir = op.join(main_dir, "steps/bwa/recPE/pairs")
rec_cool_dir = op.join(main_dir, "steps/bwa/recPE/cool")
pair_dir = rec_pair_dir
cool_dir = rec_cool_dir
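
# Resulting layout (illustrative), one subdir per tissue:
#   steps/bwa/recPE/bamfiles/<tissue>/<SRR>.bam
#   steps/bwa/recPE/pairs/<tissue>/<SRR>.{sorted,nodups,dups,unmapped}.pairs.gz (+ stats)
#   steps/bwa/recPE/cool/<tissue>/<SRR>.nodups.10000.cool
#   steps/bwa/recPE/cool/<tissue>/<tissue>.merged.{cool,mcool}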

#############################################
############### Targets #####################
#############################################

# Download the reference genome
T0 = gwf.target_from_template("download_reference", download_reference(ref_dir, ref_name, ref_link))
chromsizes = T0.outputs[0]
ref_genome = T0.outputs[2]
filtered_chromsizes = T0.outputs[3]


# Index the reference genome
T1a = gwf.target_from_template(f"bwa_index_{os.path.basename(ref_genome).split('.')[0]}", 
                               bwa_index(ref_genome=ref_genome))
T1b = gwf.target_from_template(f"sam_index_{os.path.basename(ref_genome).split('.')[0]}", 
                               sam_index(ref_genome=ref_genome))


# T5out is a dict to store the cool files for each tissue type
# to be merged in the end
T5out = {}

# T1c target for each ID in the SRA runtable
for id in sra_runtable["Run"]:
    
    # Do some stuff before downloading the reads
    # Get the tissue type for the ID, and assign a subdirectory for the reads
    sub_dir = None
    for cell_type, ids in tissue_dict.items():
        if id in ids:
            sub_dir = cell_type.lower().replace(" ", "_")
            break
    if sub_dir is None:
        raise ValueError(f"ID {id} not found in any tissue type")

    fastq_dir = op.join(reads_dir, sub_dir)

    # Create the subdir if it doesn't exist
    os.makedirs(fastq_dir, exist_ok=True)

    # T1c: Download SRA ID
    T1c = gwf.target_from_template(f"dl_{id}",
                                  download_reads(
                                        sra_downloader = sra_downloader,
                                        srr_id = id,
                                        read_dir = fastq_dir
                                    ))
    
    # T2: Map the reads to the reference genome
    bam_wdir = op.join(rec_bam_dir, sub_dir)
    out_bam = op.join(bam_wdir, f"{id}.bam")

    os.makedirs(bam_wdir, exist_ok=True)

    T2 = gwf.target_from_template(f"bwa_map_{id}", 
                                  bwa_map(ref_genome=ref_genome, 
                                          mate_1=T1c.outputs[0], mate_2=T1c.outputs[1], 
                                          download_ok=T1c.outputs[2],
                                          out_bam=out_bam))
    
    # T3: pair and sort the alignments
    pair_subdir = op.join(pair_dir, sub_dir)
    sorted_pairs = op.join(pair_subdir, f"{id}.sorted.pairs.gz")

    os.makedirs(pair_subdir, exist_ok=True)

    T3 = gwf.target_from_template(f"parse_{id}",
                                    pair_sort_alignments(chromsizes=chromsizes, 
                                                         bam_merged=T2.outputs[0], 
                                                         sorted_pairs=sorted_pairs))
    
    # T4: Deduplicate the sorted pairs
    # Dedup dir is the same as pair dir
    # NB we are using the reduced chromsizes file (only 'real' chromosomes)
    T4 = gwf.target_from_template(f"dedup_{id}",
                                    dedup(sorted_pairs=T3.outputs[1],
                                            chromsizes=filtered_chromsizes))
    #pp(T4.outputs)

    # T5: Create a .cool file from the deduplicated pairs
    cool_subdir = op.join(cool_dir, sub_dir)

    os.makedirs(cool_subdir, exist_ok=True)

    T4outpairs = [x for x in T4.outputs if x.endswith(".pairs.gz")]
    if sub_dir not in T5out:
        T5out[sub_dir] = []

    for T4out in T4outpairs:
        # Only make a cool file of the 'nodups' pairs
        # (drop this filter to also cool the dups/unmapped outputs)
        if 'nodups' not in T4out:
            continue

        # e.g. "<SRR>.nodups"; split on the basename so dots in parent dirs can't break it
        cool_name = f"{id}.{op.basename(T4out).split('.')[1]}"
        cool_file = os.path.join(cool_subdir, f"{cool_name}.10000.cool")

        T5 = gwf.target_from_template(f"coolify_{cool_name}",
                                        make_pairs_cool(filtered_chromsizes, pairs=T4out, cool_out=cool_file))
        T5out[sub_dir].append(T5.outputs[0])
        

# Merge the coolers for each tissue type (group)
T6out = {}
for subdir,cool_list in T5out.items():
    T6out[subdir] = gwf.target_from_template(f"merge_zoom_balance_{subdir}",
                                    merge_zoomify_balance(cooler_list=cool_list, 
                                                          merged=op.join(cool_dir, subdir, f"{subdir}.merged.cool"), 
                                                          mcool=op.join(cool_dir, subdir, f"{subdir}.merged.mcool")))
#pp(T6out[subdir].inputs[0])
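
# Optional post-run sanity check (a sketch; run manually once the workflow has
# finished): `list_coolers` (imported above) reports the resolutions stored in
# each merged mcool.
# for subdir, t6 in T6out.items():
#     mcool_path = t6.outputs[1]
#     if op.exists(mcool_path):
#         pp({mcool_path: list_coolers(mcool_path)})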