Следующий поток: как дополнительно распараллелить процесс, прочитав файл в списке и собрав выходные данные для обработкиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Следующий поток: как дополнительно распараллелить процесс, прочитав файл в списке и собрав выходные данные для обработки

Сообщение Anonymous »


Kindly bear with me! Looking an elegant way to get the following requirment works. Appreciate your help and time!. I'm stuck with the process to fully parallelize which has set of input files per chromosomes in the list (chr1..chr22) to process per chromosome.

inputs_list

chromosomes: - chr_no: chr1 bcf: s3://path/to/vcf/somename.chr1.bcf gmap: s3://path/to/gmap/somename.chr1.b38.gmap.gz region_common: s3://path/to/common/chr1_20cM_maf001.txt region_rare: s3://path/to/common/chr1_4cM_maf001.txt - chr_no: chr2 bcf: s3://path/to/vcf/somename.chr2.bcf gmap: s3://path/to/gmap/somename.chr2.b38.gmap.gz region_common: s3://path/to/common/chr2_20cM_maf001.txt region_rare: s3://path/to/common/chr2_4cM_maf001.txt - chr_no: chr3 ... inputs = new YamlSlurper().parse(file(params.inputs_list))

channel chromosomes_input

Channel .fromList(inputs['chromosomes']) .set { chromosomes } Channel chromosomes.branch { rec -> def bcf_file = rec.bcf ? file( rec.bcf ) : null output: rec.chr_no && bcf_file def bcf_idx = file( "${rec.bcf}.csi" ) def gmap_file = rec.gmap ? file( rec.gmap ) : null def region_common_file = rec.region_common ? file( rec.region_common ) : null def region_rare_file = rec.region_rare ? file( rec.region_rare ) : null return tuple( rec.chr_no, bcf_file, bcf_idx, gmap_file, region_common, region_rare ) } .set { chromosomes_inputs } phasecommon( chromosomes_inputs ) 1. I want to further parallelize the process phasecommon like introducing another channel by reading one of the input file region_common in the tuple of each chromosomes by the values of (row[1], row[0], row[2]) in each rows per job together with other input files of each chromosomes (bcf_file, bcf_idx, gmap_file) from chromosomes_input. Like..

region_common: s3://path/to/common/chr1_20cM_maf001.txt

0 chr1 chr1:1-10799907 ... 1 chr1 chr1:9406920-21409591 ... 2 chr1 chr1:19927406-37996621 ... ...

Channel // .fromPath(params.region_common) .splitCsv(header: false, sep: '\t') .map { row -> tuple(row[1], row[0], row[2]) } .set { region_common_list } And looking to get the following tuple as input to the process phasecommon.

tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap)

process phasecommon { tag "${chr_no}_${chunk_no}_${chunk_region}" input: //tuple val(chr_no), val(bcf), val(csi), path(gmap), path(region_common), path(region_rare) ## instead... tuple val(chr_no), val(chunk_no), val(chunk_region), path(bcf), path(csi), path(gmap) output: tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf*") //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf //tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf.csi"), emit:common_idx script: """ mkdir -p ${chr_no} OUT=10k.filtered.${chr_no}.chunk_${chunk_no}.common.bcf common \ --input ${bcf} \ --map ${gmap} \ --output ${chr_no}/\${OUT} \ --thread ${task.cpus} \ --filter-maf 0.001 \ --region ${chunk_region} && bcftools index -f ${chr_no}/\${OUT} --threads ${task.cpus} """ } 2. Only when all the phasecommon row based jobs per chromosomes completed, all the output of it need to be collected per chromosomes and pass it to another process ligatecommon per chromosome.

Channel //.fromList( chr1..chr22 ) .of(chr1..ch22) .set {chromosomes_list} //ligatecommon( phasecommon.out.common_out.collect() ) //chromosomes_inputs.combine(phasecommon.out,by:0).set{ligatecommon} chromosomes_list.combine(phasecommon.out,by:0).set{ligatecommon} --
process ligatecommon { tag "${chr_no}" input: tuple val(chr_no), path(bcf), path(csi) output: tuple val(chr_no), path("${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf"), emit:common_bcf script: """ ls -1v ${chr_no}/10k.filtered.${chr_no}.chunk_*.common.bcf > ${chr_no}/${chr_no}_phase_common.txt ligate \ --input ${chr_no}/${chr_no}_phase_common.txt \ --output ${chr_no}/10k.filtered.${chr_no}.common_ligate.bcf \ --thread ${task.cpus} \ --index """ }

Источник: https://stackoverflow.com/questions/780 ... n-the-list
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»