Следующий поток: как дополнительно распараллелить входной список из разветвленного канала по значениям в строках входногPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Следующий поток: как дополнительно распараллелить входной список из разветвленного канала по значениям в строках входног

Сообщение Anonymous »


Kindly bear with me! Looking an elegant way to get the following requirment to work. Appreciate your help and time!. I'm stuck with the process to fully parallelize which has set of input files per chromosomes in the list (chr1..chr22) to process per chromosome.

inputs_list

chromosomes: - chr_no: chr1 bcf: s3://path/to/vcf/somename.chr1.bcf gmap: s3://path/to/gmap/somename.chr1.b38.gmap.gz region_common: s3://path/to/common/chr1_20cM_maf001.txt region_rare: s3://path/to/common/chr1_4cM_maf001.txt - chr_no: chr2 bcf: s3://path/to/vcf/somename.chr2.bcf gmap: s3://path/to/gmap/somename.chr2.b38.gmap.gz region_common: s3://path/to/common/chr2_20cM_maf001.txt region_rare: s3://path/to/common/chr2_4cM_maf001.txt - chr_no: chr3 ... inputs = new YamlSlurper().parse(file(params.inputs_list))

channel chromosomes_input

Channel .fromList(inputs['chromosomes']) .set { chromosomes } Channel chromosomes.branch { rec -> def bcf_file = rec.bcf ? file( rec.bcf ) : null output: rec.chr_no && bcf_file def bcf_idx = file( "${rec.bcf}.csi" ) def gmap_file = rec.gmap ? file( rec.gmap ) : null def region_common_file = rec.region_common ? file( rec.region_common ) : null def region_rare_file = rec.region_rare ? file( rec.region_rare ) : null return tuple( rec.chr_no, bcf_file, bcf_idx, gmap_file, region_common, region_rare ) } .set { chromosomes_inputs } phasecommon( chromosomes_inputs ) 1. I want to further parallelize the process phasecommon like introducing another channel by reading the input file region_common in the tuple of each chromosomes by the values (row[1], row[0], row[2]) in each rows per job together with all input files of each chromosomes (bcf_file, bcf_idx, gmap_file) from chromosomes_input. Like..

region_common: s3://path/to/common/chr1_20cM_maf001.txt

0 chr1 chr1:1-10799907 ... 1 chr1 chr1:9406920-21409591 ... 2 chr1 chr1:19927406-37996621 ... ...

Channel // .fromPath(params.region_common) .splitCsv(header: false, sep: '\t') .map { row -> tuple(row[1], row[0], row[2]) } .set { region_common_list } And looking to get the following tuple to the process phasecommon.

tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)

process phasecommon { tag "${chr_no}_${chunk_no}_${chunk_region}" input: //tuple val(chr_no), val(bcf), val(csi), path(gmap), path(region_common), path(region_rare) ## instead... tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap) output: tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf*") //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf.csi"), emit:common_idx script: """ mkdir -p ${chr_no} OUT=10k.filtered.${chr_no}.chunk_${chunk_no}.common.bcf common \ --input ${bcf} \ --map ${gmap} \ --output ${chr_no}/\${OUT} \ --thread ${task.cpus} \ --filter-maf 0.001 \ --region ${chunk_region} && bcftools index -f ${chr_no}/\${OUT} --threads ${task.cpus} """ } 2. Only when all the phasecommon row based jobs per chromosomes completed, all the output of it need to be collected per chromosomes and pass it to another process ligatecommon per chromosome.

Channel //.fromList( chr1..chr22 ) .of(chr1..ch22) .set {chromosomes_list} //ligatecommon( phasecommon.out.common_out.collect() ) //chromosomes_inputs.combine(phasecommon.out,by:0).set{ligatecommon} chromosomes_list.combine(phasecommon.out,by:0).set{ligatecommon} --
process ligatecommon { tag "${chr_no}" input: tuple val(chr_no), path(bcf), path(csi) output: tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf script: """ ls -1v ${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf > ${chr_no}/${chr_no}_phase_common.txt ligate \ --input ${chr_no}/${chr_no}_phase_common.txt \ --output ${chr_no}/10k.filtered.${chr_no}.common_ligate.bcf \ --thread ${task.cpus} \ --index """ }

Источник: https://stackoverflow.com/questions/780 ... annel-by-v
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»