My Snakemake workflow takes forever to evaluate the DAG. I suspect this is because it has to execute a checkpoint while building the DAG.
So I thought of having the checkpoint be an input to a rule (though not the all rule, I think).
However, I'm not quite sure what the correct way to structure my workflow is here. As I understand it, to trigger a checkpoint from a rule, I need that rule's output to be requested by all; however, my output is a dynamic set of files (hence the need for a checkpoint in the first place). I could declare a directory instead, but then Snakemake would not know how to resume an interrupted workflow when the directory is partially populated, since it does not evaluate the directory's contents.
I also want to keep the rules as encapsulated as possible so that Snakemake parallelizes as much as it can.
I'd appreciate any advice here. Thanks.
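For reference, the pattern I'm building on is the canonical checkpoint/aggregation example from the Snakemake documentation (a minimal sketch; the rule names, paths, and the split_into_clusters command are hypothetical):

checkpoint split_clusters:
    output:
        directory("clusters/{genome}")
    shell:
        "mkdir -p {output} && split_into_clusters {wildcards.genome} {output}"

def clustered_files(wildcards):
    # .get() defers evaluation of this input function until the checkpoint
    # has run; the DAG is then re-evaluated with the directory contents known.
    out_dir = checkpoints.split_clusters.get(genome=wildcards.genome).output[0]
    return expand(out_dir + "/{i}.txt",
                  i=glob_wildcards(out_dir + "/{i}.txt").i)

rule aggregate:
    input:
        clustered_files
    output:
        "aggregated/{genome}.txt"
    shell:
        "cat {input} > {output}"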
###############################################################################
# SNAKEMAKE FILE
###############################################################################
# CONFIG AND GLOBALS
################################################################################
configfile: "config.yaml"
# config.yaml is expected to provide the keys listed in required_keys below
import os
import csv
import glob
from itertools import combinations
from snakemake.shell import shell
from pathlib import Path
# Ensure all required keys exist in config
required_keys = ["genome_dir", "barcode_map", "filter_threshold", "threads", "coverage", "output_dir", "bam_dir"]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise KeyError(f"Missing required keys in config.yaml: {missing_keys}")
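# For illustration, a config.yaml satisfying the check above might look like
# this (all values are hypothetical):
#
# genome_dir: genomes
# barcode_map: barcode_map.tsv
# filter_threshold: 0.8
# threads: 8
# coverage: 10
# output_dir: results
# bam_dir: bams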
# Parse the barcode map for sample -> treatment
barcodes = []
treatments = []
with open(config["barcode_map"], "r") as f:
reader = csv.reader(f, delimiter="\t")
    next(reader, None)  # always skips the first row, assumed to be a header
for row in reader:
if len(row) >= 2:
barcodes.append(row[0])
treatments.append(row[1])
SAMPLES = barcodes
TREATMENTS_BY_SAMPLE = dict(zip(barcodes, treatments))
ALL_TREATMENTS = sorted(set(treatments))
# Gather all .fna references
GENOME_FILES = sorted(glob.glob(os.path.join(config["genome_dir"], "*.fna")))
GENOMES = [Path(f).stem for f in GENOME_FILES]
# Make them paths
config["genome_dir"] = Path(config["genome_dir"])
config["output_dir"] = Path(config["output_dir"])
config["bam_dir"] = Path(config["bam_dir"])
################################################################################
# HELPER FUNCTIONS
################################################################################
def is_metagenome(genome_name):
"""Decide if a genome is a 'metagenome' by name."""
return "metagenome" in genome_name.lower()
def get_genome_fasta(genome_name):
"""Return the full path of the .fna for a given genome name."""
return config["genome_dir"] / f"{genome_name}.fna"
def get_outdir(genome_name):
"""Return the output directory for a given genome."""
return config["output_dir"] / genome_name
def get_bam(sample, genome_name):
"""Return path to the BAM for a sample/genome."""
return config["bam_dir"] / genome_name / f"{sample}.bam"
def get_bed(sample, genome_name):
"""Return path to the raw (uncompressed) BED output of pileup."""
return get_outdir(genome_name) / f"{sample}.bed"
def get_bed_gz(sample, genome_name):
"""Return path to the compressed bed file."""
return get_bed(sample, genome_name).with_suffix(".bed.gz")
def get_summary(sample, genome_name):
"""Return path to the summary TSV for a given sample/genome."""
return get_outdir(genome_name) / f"summary_{sample}.tsv"
def get_sample_probs_dir(sample, genome_name):
"""Return path to the sample-probs directory."""
return get_outdir(genome_name) / f"sample_probs_{sample}"
def get_merged_bed(genome_name):
"""Return path to the uncompressed merged bed of all samples for a genome."""
return get_outdir(genome_name) / "all_samples.bed"
def get_merged_bed_gz(genome_name):
"""Return path to the compressed merged bed of all samples for a genome."""
return get_merged_bed(genome_name).with_suffix(".bed.gz")
def get_genome_sizes(genome_name):
"""Chrom sizes file for merging."""
return get_outdir(genome_name) / "sizes.tsv"
def get_motif_tsv(genome_name, contig=None):
    """Motif TSV output from modkit find-motifs for a single genome."""
    if contig:
        return get_motifs_dir(genome_name) / f"{config['coverage']}_{contig}_motifs.tsv"
    elif not is_metagenome(genome_name):
        return get_motifs_dir(genome_name) / f"{config['coverage']}_motifs.tsv"
    else:
        raise ValueError("Cannot get motif TSV for a metagenome without a contig")
def get_motifs_dir(genome_name):
"""Return the directory for all motifs in a single genome pipeline."""
return get_outdir(genome_name) / "motifs"
def get_motif_dir(genome_name, motif, contig=None):
"""Directory for a specific motif in a single genome pipeline."""
if contig:
return get_outdir(genome_name) / contig / motif
else:
return get_motifs_dir(genome_name) / motif
def get_location_bed(genome_name, motif, contig=None):
"""Return path to the motif location bed (contig or full)."""
return get_motif_dir(genome_name, motif, contig) / "location.bed"
def get_motif_bed(sample, genome_name, motif, contig=None):
"""Return path to the motif-based pileup bed for a sample."""
return get_motif_dir(genome_name, motif, contig) / f"{sample}.bed"
def get_dmr_dir(genome_name, motif, contig=None):
"""Return directory for DMR output."""
return get_motif_dir(genome_name, motif, contig) / "dmr"
def get_dmr_file(genome_name, motif, treatmentA, treatmentB, contig=None):
"""Return DMR bed file for a given pair of treatments."""
return get_dmr_dir(genome_name, motif, contig) / f"{treatmentA}_vs_{treatmentB}.bed"
def get_fai(genome_name):
"""Return .fai index path for a given genome."""
return get_genome_fasta(genome_name).with_suffix(".fna.fai")
def get_contigs(genome_name):
    """Return the list of contig names for a genome.

    Note: this opens the .fai during parsing/DAG evaluation, so it assumes
    the index already exists; index_genome is not triggered on its behalf.
    """
    fai_path = get_fai(genome_name)
    contigs = []
    with open(fai_path) as f:
for line in f:
fields = line.strip().split("\t")
if fields:
contigs.append(fields[0])
return contigs
def get_motif_paths(genome_name):
"""Return all motif files for a given genome."""
if is_metagenome(genome_name):
return [get_motif_tsv(genome_name, contig=contig) for contig in get_contigs(genome_name)]
else:
return [get_motif_tsv(genome_name)]
def unpack_motifs(genome_name, contig=None):
"""
Given the motif TSV from find_motifs, parse the lines to discover (mod_code, motif, modpos).
Return them for expansion.
"""
    # Trigger the checkpoint; .get() defers DAG evaluation until it has run
    if contig:
        chk = checkpoints.find_motifs_contig.get(genome_name=genome_name, contig=contig)
    else:
        chk = checkpoints.find_motifs_single.get(genome_name=genome_name)
    tsv = chk.output.out_file
motifs = []
with open(tsv) as f:
reader = csv.reader(f, delimiter="\t")
next(reader, None) # skip header
for row in reader:
if len(row) >= 3:
mod_code, motif, modpos = row[0], row[1], row[2]
motifs.append((mod_code, motif, modpos))
return motifs
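# unpack_motifs() is called once per expansion in the helpers below, and every
# call re-triggers checkpoints.<name>.get() and re-parses the TSV, which can
# dominate DAG-evaluation time. A memoized wrapper is one way to cut that cost
# (a sketch; assumes a motif TSV never changes once its checkpoint has run):
from functools import lru_cache

@lru_cache(maxsize=None)
def unpack_motifs_cached(genome_name, contig=None):
    """Cached drop-in for unpack_motifs(); returns a tuple so it is hashable.
    lru_cache does not cache the exception Snakemake raises while a checkpoint
    is still pending, so only the first successful result gets stored."""
    return tuple(unpack_motifs(genome_name, contig=contig))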
def get_location_beds(genome_name):
"""Returns paths of all motif location beds associated with this genome"""
res = []
if is_metagenome(genome_name):
for contig in get_contigs(genome_name):
for motif in unpack_motifs(genome_name, contig=contig):
res.append(get_location_bed(genome_name, motif[1], contig=contig))
else:
for motif in unpack_motifs(genome_name):
res.append(get_location_bed(genome_name, motif[1], contig=None))
return res
def get_motif_pileups(genome_name):
"""Returns paths of all motif pileups associated with this genome"""
res = []
if is_metagenome(genome_name):
for contig in get_contigs(genome_name):
for motif in unpack_motifs(genome_name, contig=contig):
for sample in SAMPLES:
res.append(get_motif_bed(sample, genome_name, motif[1], contig=contig))
else:
for motif in unpack_motifs(genome_name):
for sample in SAMPLES:
res.append(get_motif_bed(sample, genome_name, motif[1], contig=None))
return res
def get_dmr_paths(genome_name):
"""
For each pair of treatments, produce the DMR file name as a final target.
"""
# All pairwise combos of treatments
dmr_files = []
for (A, B) in combinations(ALL_TREATMENTS, 2):
if is_metagenome(genome_name):
for contig in get_contigs(genome_name):
for motif in unpack_motifs(genome_name, contig=contig):
dmr_files.append(get_dmr_file(genome_name, motif[1], A, B, contig=contig))
else:
for motif in unpack_motifs(genome_name):
dmr_files.append(get_dmr_file(genome_name, motif[1], A, B, contig=None))
return dmr_files
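# The rules below reference get_motif_mod_pos(), which is not defined above;
# here is a sketch consistent with the (mod_code, motif, modpos) tuples that
# unpack_motifs() returns:
def get_motif_mod_pos(genome_name, motif, contig=None):
    """Look up the modpos recorded for a motif in the find-motifs TSV."""
    for mod_code, m, modpos in unpack_motifs(genome_name, contig=contig):
        if m == motif:
            return modpos
    raise ValueError(f"Motif {motif!r} not found for {genome_name} (contig={contig})")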
################################################################################
# GENOME INDEX
################################################################################
rule index_genome:
"""
Create FASTA index if it doesn't exist.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name)
output:
config["genome_dir"] / "{genome_name}.fna.fai"
threads: config["threads"]
shell:
"samtools faidx {input.fasta}"
################################################################################
# RULES FOR PILEUP, SUMMARY, SAMPLE-PROBS, COMPRESS
################################################################################
rule modkit_pileup:
"""
Generate the BED file from each sample's BAM using modkit pileup.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
bed = config["output_dir"] / "{genome_name}" / "{sample}.bed"
threads: config["threads"]
params:
filter_threshold = config["filter_threshold"]
shell:
...rule code....
rule modkit_summary:
"""
Generate a summary TSV for each sample.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
tsv = config["output_dir"] / "{genome_name}" / "summary_{sample}.tsv"
threads: config["threads"]
params:
filter_threshold = config["filter_threshold"]
shell:
...rule code....
rule modkit_sample_probs:
"""
Generate sample-probs directory for each sample.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
directory = directory(config["output_dir"] / "{genome_name}" / "sample_probs_{sample}")
threads: config["threads"]
shell:
...rule code....
rule compress_and_index_bed:
"""
Compress (.gz) and tabix-index the raw bed files.
"""
input:
bed = lambda wc: get_bed(wc.sample, wc.genome_name)
output:
gz = config["output_dir"] / "{genome_name}" / "{sample}.bed.gz",
tbi = config["output_dir"] / "{genome_name}" / "{sample}.bed.gz.tbi"
threads: config["threads"]
shell:
...rule code....
################################################################################
# MERGE BED FILES
################################################################################
rule compute_genome_sizes:
"""
Compute chromosome sizes from the FASTA for merging coverage.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
fai = lambda wc: get_fai(wc.genome_name)
output:
sizes = config["output_dir"] / "{genome_name}" / "sizes.tsv"
run:
...rule code....
rule merge_beds:
"""
Merge all sample bed.gz files into a single all_samples.bed for each genome.
"""
input:
sizes = lambda wc: get_genome_sizes(wc.genome_name),
bedgz = lambda wc: [get_bed_gz(s, wc.genome_name) for s in SAMPLES],
output:
merged = config["output_dir"] / "{genome_name}" / "all_samples.bed",
merged_gz = config["output_dir"] / "{genome_name}" / "all_samples.bed.gz",
merged_tbi = config["output_dir"] / "{genome_name}" / "all_samples.bed.gz.tbi"
threads: config["threads"]
shell:
...rule code....
################################################################################
# FIND MOTIFS (CHECKPOINT)
################################################################################
checkpoint find_motifs_contig:
input:
merged_gz = lambda wc: get_merged_bed_gz(wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
out_file = config["output_dir"] / "{genome_name}" / "motifs" / f"{config['coverage']}_{{contig}}_motifs.tsv"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....
checkpoint find_motifs_single:
input:
merged_gz = lambda wc: get_merged_bed_gz(wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
out_file = config["output_dir"] / "{genome_name}" / "motifs" / f"{config['coverage']}_motifs.tsv"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....
################################################################################
# CREATE MOTIF LOCATION.BED
################################################################################
rule create_motif_location_bed_single:
"""
Create location.bed for a given motif (standard genomes).
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
bed = config["output_dir"] / "{genome_name}" / "{motif}" / "location.bed"
threads: config["threads"]
    params:
        motif = lambda wc: wc.motif,
        modpos = lambda wc: get_motif_mod_pos(wc.genome_name, wc.motif)  # params is not in scope inside these lambdas
run:
...rule code....
rule create_motif_location_bed_contig:
"""
Create location.bed for a given motif in a metagenome (for a specific contig).
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
output:
bed = config["output_dir"] / "{genome_name}" / "{contig}" / "{motif}" / "location.bed"
threads: config["threads"]
    params:
        motif = lambda wc: wc.motif,
        modpos = lambda wc: get_motif_mod_pos(wc.genome_name, wc.motif, contig=wc.contig)
run:
...rule code....
################################################################################
# MOTIF PILEUP PER SAMPLE
################################################################################
rule motif_pileup_single:
"""
For each motif, run modkit pileup with --motif for each sample,
optionally restricted to a specific contig if this is a metagenome.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=None)
output:
bed = config["output_dir"] / "{genome_name}" / "{motif}" / "{sample}.bed"
threads: config["threads"]
    params:
        motif = lambda wc: wc.motif,
        modpos = lambda wc: get_motif_mod_pos(wc.genome_name, wc.motif, contig=None),
        filter_threshold = config["filter_threshold"]
shell:
...rule code....
rule motif_pileup_contig:
"""
For each motif, run modkit pileup with --motif for each sample,
optionally restricted to a specific contig if this is a metagenome.
"""
input:
bam = lambda wc: get_bam(wc.sample, wc.genome_name),
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=wc.contig)
output:
bed = config["output_dir"] / "{genome_name}" / "{contig}" / "{motif}" / "{sample}.bed"
threads: config["threads"]
    params:
        motif = lambda wc: wc.motif,
        modpos = lambda wc: get_motif_mod_pos(wc.genome_name, wc.motif, contig=wc.contig),
        filter_threshold = config["filter_threshold"]
shell:
...rule code....
################################################################################
# DMR COMPARISONS
################################################################################
rule dmr_pair_single:
"""
Compare two treatments for DMR. Then filter results with bedtools intersect.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=None)
        # For modkit dmr pair we need the compressed per-sample beds from each
        # treatment; gather them with an input function (see the sketch after
        # this rule).
output:
dmr = config["output_dir"] / "{genome_name}" / "{motif}" / "dmr" / "{treatmentA}_vs_{treatmentB}.bed"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....
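# One way to realize the gathering described in the input comments above: a
# helper that collects the compressed per-sample beds for one treatment (a
# sketch; the name beds_for_treatment is hypothetical):
def beds_for_treatment(genome_name, treatment):
    """All compressed per-sample beds belonging to one treatment group."""
    return [get_bed_gz(s, genome_name)
            for s in SAMPLES if TREATMENTS_BY_SAMPLE[s] == treatment]
# It could then be wired into the rule's input, e.g.:
#     a = lambda wc: beds_for_treatment(wc.genome_name, wc.treatmentA),
#     b = lambda wc: beds_for_treatment(wc.genome_name, wc.treatmentB),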
rule dmr_pair_contig:
"""
Compare two treatments for DMR. Then filter results with bedtools intersect.
"""
input:
fasta = lambda wc: get_genome_fasta(wc.genome_name),
loc = lambda wc: get_location_bed(wc.genome_name, wc.motif, contig=wc.contig)
output:
dmr = config["output_dir"] / "{genome_name}" / "{contig}" / "{motif}" / "dmr" / "{treatmentA}_vs_{treatmentB}.bed"
threads: config["threads"]
params:
coverage = config["coverage"]
run:
...rule code....
################################################################################
# RULE: ALL
################################################################################
rule all:
"""
Final target: we want everything done for all genomes, all motifs, all pairs, etc.
Using Snakemake's checkpoint and dynamic expansions, our final 'all' rule will be
the presence of all possible DMR files. But we'll rely on the checkpoint logic
to discover motifs.
"""
input:
[get_fai(g) for g in GENOMES],
[get_summary(s, g) for s in SAMPLES for g in GENOMES],
[get_sample_probs_dir(s, g) for s in SAMPLES for g in GENOMES],
[get_merged_bed_gz(g) for g in GENOMES],
        # These could be collapsed to just get_dmr_paths, but providing the rest explicitly is more efficient
[path for genome in GENOMES for path in get_motif_paths(genome)],
# lambda wc: [get_location_beds(g) for g in GENOMES],
# lambda wc: [get_motif_pileups(g) for g in GENOMES],
        lambda wc: [p for g in GENOMES for p in get_dmr_paths(g)]
default_target: True
ruleorder: index_genome > modkit_pileup > modkit_summary > modkit_sample_probs > compress_and_index_bed > compute_genome_sizes > merge_beds > find_motifs_single > create_motif_location_bed_single > motif_pileup_single > dmr_pair_single > find_motifs_contig > create_motif_location_bed_contig > motif_pileup_contig > dmr_pair_contig
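# A sketch of the idea from the question text above (checkpoint expansion as
# input to a rule other than `all`): one aggregation rule per genome performs
# the checkpoint-driven expansion and emits a flag file, so `all` only has to
# request the flags. touch() is standard Snakemake; the rule and flag names
# are hypothetical.
rule aggregate_genome:
    input:
        lambda wc: get_dmr_paths(wc.genome_name)
    output:
        touch(config["output_dir"] / "{genome_name}" / "aggregate.done")
# rule all's last input could then become:
#     [config["output_dir"] / g / "aggregate.done" for g in GENOMES]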
More details here: https://stackoverflow.com/questions/79437869/correct-way-of-structuring-a-checkpoint-in-snakemake