Nextflow: how to further parallelize an input list from a branched channel by the values in the rows of the input
Kindly bear with me! I'm looking for an elegant way to get the following requirement to work, and I appreciate your help and time! I'm stuck trying to fully parallelize a process that takes a set of input files per chromosome, for each chromosome in the list (chr1..chr22).
inputs_list
chromosomes:
  - chr_no: chr1
    bcf: s3://path/to/vcf/somename.chr1.bcf
    gmap: s3://path/to/gmap/somename.chr1.b38.gmap.gz
    region_common: s3://path/to/common/chr1_20cM_maf001.txt
    region_rare: s3://path/to/common/chr1_4cM_maf001.txt
  - chr_no: chr2
    bcf: s3://path/to/vcf/somename.chr2.bcf
    gmap: s3://path/to/gmap/somename.chr2.b38.gmap.gz
    region_common: s3://path/to/common/chr2_20cM_maf001.txt
    region_rare: s3://path/to/common/chr2_4cM_maf001.txt
  - chr_no: chr3
  ...

import groovy.yaml.YamlSlurper

inputs = new YamlSlurper().parse(file(params.inputs_list))
channel chromosomes_input
Channel
    .fromList(inputs['chromosomes'])
    .set { chromosomes }

chromosomes.branch { rec ->
    def bcf_file = rec.bcf ? file( rec.bcf ) : null
    output: rec.chr_no && bcf_file
    def bcf_idx = file( "${rec.bcf}.csi" )
    def gmap_file = rec.gmap ? file( rec.gmap ) : null
    def region_common_file = rec.region_common ? file( rec.region_common ) : null
    def region_rare_file = rec.region_rare ? file( rec.region_rare ) : null
    return tuple( rec.chr_no, bcf_file, bcf_idx, gmap_file, region_common_file, region_rare_file )
}
.set { chromosomes_inputs }

phasecommon( chromosomes_inputs )

1. I want to further parallelize the process phasecommon by introducing another channel that reads the region_common input file from each chromosome's tuple and creates one job per row, using the values (row[1], row[0], row[2]) of that row together with all the input files of the chromosome (bcf_file, bcf_idx, gmap_file) from chromosomes_input. Like this:
region_common: s3://path/to/common/chr1_20cM_maf001.txt
0    chr1    chr1:1-10799907          ...
1    chr1    chr1:9406920-21409591    ...
2    chr1    chr1:19927406-37996621   ...
...
Channel
    // .fromPath(params.region_common)
    .splitCsv(header: false, sep: '\t')
    .map { row -> tuple(row[1], row[0], row[2]) }
    .set { region_common_list }

And I'd like to pass the following tuple to the process phasecommon:
tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)
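One possible way to build that tuple (a sketch, not tested against real data) is to fan out each chromosome record with `flatMap`, reading its `region_common` file with Nextflow's `Path.splitCsv()` method. It assumes the `output` branch of `chromosomes_inputs` emits `(chr_no, bcf, csi, gmap, region_common, region_rare)` as in the question, and that each region file is tab-separated with the chunk number, chromosome and region in its first three columns. `phasecommon_inputs` is a hypothetical channel name. The chromosome is wrapped in `groupKey()` with the number of chunks; this only matters later, when the per-chunk results are collected per chromosome.

```groovy
// Sketch: one phasecommon job per row of each chromosome's region_common file.
// Assumes chromosomes_inputs.output emits
// (chr_no, bcf, csi, gmap, region_common, region_rare) as in the question.
chromosomes_inputs.output
    .flatMap { chr_no, bcf, csi, gmap, region_common, region_rare ->
        // Path.splitCsv() reads the whole file and returns a list of rows (lists of strings)
        def rows = region_common.splitCsv(header: false, sep: '\t')
        // groupKey() records how many chunks this chromosome has, so a later
        // groupTuple() can release the chromosome as soon as all chunks are done
        def key = groupKey(chr_no, rows.size())
        rows.collect { row ->
            // row[0] = chunk_no, row[1] = chromosome (expected to equal chr_no), row[2] = region
            tuple(key, row[0], row[2], bcf, csi, gmap)
        }
    }
    .set { phasecommon_inputs }

// Each emitted item matches:
// tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)
phasecommon( phasecommon_inputs )
```

Taking the chromosome from the YAML record instead of row[1] keeps the key identical to the chromosome name the BCF belongs to. An alternative is the channel operator `splitCsv(elem: 4, sep: '\t')`, which splits the element at index 4 of each tuple and emits one tuple per row; the `flatMap` version is shown here because it also knows the row count that `groupKey` needs.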
process phasecommon {
    tag "${chr_no}_${chunk_no}_${chunk_region}"

    input:
    //tuple val(chr_no), val(bcf), val(csi), path(gmap), path(region_common), path(region_rare) ## instead...
    tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)

    output:
    tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf*")
    //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf
    //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf.csi"), emit:common_idx

    script:
    """
    mkdir -p ${chr_no}
    OUT=10k.filtered.${chr_no}.chunk_${chunk_no}.common.bcf
    common \
        --input ${bcf} \
        --map ${gmap} \
        --output ${chr_no}/\${OUT} \
        --thread ${task.cpus} \
        --filter-maf 0.001 \
        --region ${chunk_region} && bcftools index -f ${chr_no}/\${OUT} --threads ${task.cpus}
    """
}

2. Only when all of the row-based phasecommon jobs for a chromosome have completed should their outputs be collected per chromosome and passed to another process, ligatecommon, which runs once per chromosome.
Channel
    //.fromList( chr1..chr22 )
    .fromList( (1..22).collect { 'chr' + it } )
    .set { chromosomes_list }

//ligatecommon( phasecommon.out.common_out.collect() )
//chromosomes_inputs.combine(phasecommon.out,by:0).set{ligatecommon}
chromosomes_list.combine(phasecommon.out,by:0).set{ligatecommon}
process ligatecommon {
    tag "${chr_no}"

    input:
    tuple val(chr_no), path(bcf), path(csi)

    output:
    tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.common_ligate.bcf"), emit:common_bcf

    script:
    """
    mkdir -p ${chr_no}
    # staged chunk files are in the task directory, not in the chromosome subdirectory
    ls -1v 10k.filtered.${chr_no}.chunk_*.common.bcf > ${chr_no}/${chr_no}_phase_common.txt
    ligate \
        --input ${chr_no}/${chr_no}_phase_common.txt \
        --output ${chr_no}/10k.filtered.${chr_no}.common_ligate.bcf \
        --thread ${task.cpus} \
        --index
    """
}
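For the second step, a sketch assuming phasecommon receives one `(chr_no, chunk_no, chunk_region, bcf, csi, gmap)` tuple per region row: let phasecommon emit the chunk BCF and its index as separate elements (the emit name `common` and the channel names `phasecommon_inputs` and `ligate_inputs` are hypothetical), then group by chromosome with `groupTuple`. The grouped tuple `(chr_no, [bcf, ...], [csi, ...])` matches the `tuple val(chr_no), path(bcf), path(csi)` input of ligatecommon. `combine(..., by: 0)` pairs items as they arrive and does not wait for the remaining chunks of a chromosome, which is why grouping is used instead; it also removes the need for a hard-coded chromosome list.

```groovy
// Sketch: only phasecommon's output declaration changes; its tag, input
// and script stay as in the question (omitted here).
process phasecommon {
    // ... tag, input and script as in the question ...

    output:
    tuple val(chr_no),
          path("${chr_no}/10k.filtered.${chr_no}.chunk_${chunk_no}.common.bcf"),
          path("${chr_no}/10k.filtered.${chr_no}.chunk_${chunk_no}.common.bcf.csi"),
          emit: common
}

workflow {
    // phasecommon_inputs (hypothetical): one
    // (chr_no, chunk_no, chunk_region, bcf, csi, gmap) tuple per region row
    phasecommon( phasecommon_inputs )

    // Collect every chunk of a chromosome into one tuple:
    // (chr_no, [chunk bcf, ...], [chunk csi, ...])
    phasecommon.out.common
        .groupTuple()
        .set { ligate_inputs }

    // One ligatecommon task per chromosome
    ligatecommon( ligate_inputs )
}
```

If `chr_no` was created as `groupKey(chr_no, n_chunks)` when the region rows were read, `groupTuple` releases each chromosome as soon as its `n_chunks` results have arrived; with a plain string key it emits the groups only after every phasecommon task has finished. Inputs are staged into the task directory under their file names, so ligatecommon has to list the chunk files from its working directory rather than from `${chr_no}/`.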
Source: https://stackoverflow.com/questions/780 ... annel-by-v