Nextflow: how to further parallelize a process by reading a file in the list and collecting the outputs for downstream processing
Please bear with me! I'm looking for an elegant way to meet the following requirement, and I appreciate your help and time. I'm stuck trying to fully parallelize a process that takes a set of input files per chromosome, listed for chr1..chr22, and processes them per chromosome.
inputs_list
    chromosomes:
      - chr_no: chr1
        bcf: s3://path/to/vcf/somename.chr1.bcf
        gmap: s3://path/to/gmap/somename.chr1.b38.gmap.gz
        region_common: s3://path/to/common/chr1_20cM_maf001.txt
        region_rare: s3://path/to/common/chr1_4cM_maf001.txt
      - chr_no: chr2
        bcf: s3://path/to/vcf/somename.chr2.bcf
        gmap: s3://path/to/gmap/somename.chr2.b38.gmap.gz
        region_common: s3://path/to/common/chr2_20cM_maf001.txt
        region_rare: s3://path/to/common/chr2_4cM_maf001.txt
      - chr_no: chr3
        ...

    inputs = new YamlSlurper().parse(file(params.inputs_list))
channel chromosomes_input
    Channel
        .fromList(inputs['chromosomes'])
        .set { chromosomes }

Channel

    chromosomes.branch { rec ->
        def bcf_file = rec.bcf ? file( rec.bcf ) : null

        output: rec.chr_no && bcf_file
            def bcf_idx = file( "${rec.bcf}.csi" )
            def gmap_file = rec.gmap ? file( rec.gmap ) : null
            def region_common_file = rec.region_common ? file( rec.region_common ) : null
            def region_rare_file = rec.region_rare ? file( rec.region_rare ) : null

            // use the *_file variables defined above (the bare names were undefined)
            return tuple( rec.chr_no, bcf_file, bcf_idx, gmap_file, region_common_file, region_rare_file )
    }
    .set { chromosomes_inputs }

    phasecommon( chromosomes_inputs )

1. I want to further parallelize the phasecommon process by introducing another channel that reads one of the input files, region_common, from each chromosome's tuple and creates one job per row from the values (row[1], row[0], row[2]), together with the other input files of that chromosome (bcf_file, bcf_idx, gmap_file) from chromosomes_input. For example:
region_common: s3://path/to/common/chr1_20cM_maf001.txt
    0 chr1 chr1:1-10799907 ...
    1 chr1 chr1:9406920-21409591 ...
    2 chr1 chr1:19927406-37996621 ...
    ...
    Channel
        // .fromPath(params.region_common)
        .splitCsv(header: false, sep: '\t')
        .map { row -> tuple(row[1], row[0], row[2]) }
        .set { region_common_list }

I'm looking to get the following tuple as input to the phasecommon process:
tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)
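One possible way to build that tuple is sketched below. It is only a sketch under assumptions, not a tested solution: `per_chromosome` is a hypothetical name for a channel that emits `[chr_no, bcf, bcf_idx, gmap, region_common, region_rare]` (for the `branch` above that would presumably be `chromosomes_inputs.output`), the region file is assumed to be tab-separated with the chunk number, chromosome and region in its first three columns, and `phasecommon_inputs` is likewise a made-up name. The idea is to use `flatMap` so that each row of the region file becomes its own channel item, carrying the chromosome's other files along with it.

```groovy
// Sketch only: fan out one phasecommon job per row of region_common.
// `per_chromosome` and `phasecommon_inputs` are hypothetical names.
per_chromosome
    .flatMap { rec ->
        def (chr_no, bcf, csi, gmap, region_common, region_rare) = rec

        // Read the (small) region file and emit one tuple per non-empty row
        region_common.readLines()
            .findAll { line -> line.trim() }
            .collect { line ->
                def row = line.split('\t')
                // row[0] = chunk number, row[1] = chromosome, row[2] = region
                tuple( row[1], row[0], row[2], bcf, csi, gmap )
            }
    }
    .set { phasecommon_inputs }

phasecommon( phasecommon_inputs )
```

Because every emitted item already contains the BCF, its index and the genetic map, no separate join between a region channel and the chromosome channel should be needed.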
    process phasecommon {
        tag "${chr_no}_${chunk_no}_${chunk_region}"

        input:
        //tuple val(chr_no), val(bcf), val(csi), path(gmap), path(region_common), path(region_rare) ## instead...
        tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)

        output:
        tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf*")
        //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf
        //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf.csi"), emit:common_idx

        script:
        """
        mkdir -p ${chr_no}
        OUT=10k.filtered.${chr_no}.chunk_${chunk_no}.common.bcf
        common \
            --input ${bcf} \
            --map ${gmap} \
            --output ${chr_no}/\${OUT} \
            --thread ${task.cpus} \
            --filter-maf 0.001 \
            --region ${chunk_region} && bcftools index -f ${chr_no}/\${OUT} --threads ${task.cpus}
        """
    }

2. Only once all the row-based phasecommon jobs for a chromosome have completed should their outputs be collected per chromosome and passed to another process, ligatecommon, which runs once per chromosome.
    Channel
        //.fromList( chr1..chr22 )
        .of(chr1..chr22)
        .set {chromosomes_list}

    //ligatecommon( phasecommon.out.common_out.collect() )
    //chromosomes_inputs.combine(phasecommon.out,by:0).set{ligatecommon}
    chromosomes_list.combine(phasecommon.out,by:0).set{ligatecommon}

--
    process ligatecommon {
        tag "${chr_no}"

        input:
        tuple val(chr_no), path(bcf), path(csi)

        output:
        tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf

        script:
        """
        ls -1v ${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf > ${chr_no}/${chr_no}_phase_common.txt
        ligate \
            --input ${chr_no}/${chr_no}_phase_common.txt \
            --output ${chr_no}/10k.filtered.${chr_no}.common_ligate.bcf \
            --thread ${task.cpus} \
            --index
        """
    }
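For the second requirement, one option that may fit is the `groupTuple` operator, which groups items on their first element (here `chr_no`) and, when no group size is given, emits the groups only after the upstream channel has completed. The sketch below assumes phasecommon keeps its single unnamed output of `[chr_no, [bcf, csi]]` per chunk; `ligatecommon_inputs` is a hypothetical name.

```groovy
// Sketch only: gather all chunk outputs of one chromosome into one item.
phasecommon.out
    .groupTuple()                      // groups on element 0 (chr_no) by default
    .map { chr_no, chunks ->
        // chunks is a list of [bcf, csi] pairs, one pair per region row
        def files = chunks.flatten()
        def bcfs  = files.findAll { it.name.endsWith('.bcf') }
        def csis  = files.findAll { it.name.endsWith('.csi') }
        tuple( chr_no, bcfs, csis )
    }
    .set { ligatecommon_inputs }

ligatecommon( ligatecommon_inputs )
```

Note that `path` inputs are staged into the task's working directory rather than into a `${chr_no}/` subdirectory, so the `ls` pattern and the `mkdir` step in ligatecommon would probably need adjusting to match.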
Source: https://stackoverflow.com/questions/780 ... n-the-list