У меня есть один большой файл, а также файл с содержимым заголовка. Я получу один входной номер total parts=(может быть 100, 20 или 200 аргумент времени выполнения).
Используя их, я хочу разделить свой большой файл на общее количество частей, чтобы каждый файл имел доступно содержимое заголовка.
Приведенный ниже код почти сгенерирован Chatgpt, но не может правильно вывести данные. Может ли кто-нибудь объяснить какой-либо справочный документ или исправление?
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.io.FileWriter;
import java.nio.file.Path;
public class MySampleFile {
public interface Options extends PipelineOptions {
ValueProvider getHeaderFilePath();
void setHeaderFilePath(ValueProvider value);
ValueProvider getLargeFilePath();
void setLargeFilePath(ValueProvider value);
ValueProvider getTotalFileParts();
void setTotalFileParts(ValueProvider value);
ValueProvider getOutputPath();
void setOutputPath(ValueProvider value);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Step 1: Read the header file into a side input
PCollectionView headerView = pipeline
.apply("ReadHeaderFile", TextIO.read().from(options.getHeaderFilePath()))
.apply("HeaderAsList", View.asList());
// Step 2: Read the large file
PCollection largeFileLines = pipeline
.apply("ReadLargeFile", TextIO.read().from(options.getLargeFilePath()));
PCollectionView largeFileSizeView = largeFileLines
.apply("CalculateFileSize", ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
String line = c.element();
c.output((long) line.getBytes().length);
}
}))
.apply("SumFileSize", Sum.longsGlobally().asSingletonView());
// Step 3: Split the large file content into parts and add header
PCollection partitionedContent = largeFileLines
.apply("PartitionFile", ParDo.of(new PartitionFileFn(options.getTotalFileParts(), headerView, largeFileSizeView))
.withSideInputs(headerView, largeFileSizeView));
// Print all the data in the PCollection to System.out.println
partitionedContent.apply("PrintPartitions", ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
KV partition = c.element();
System.out.println("Partition Index: " + partition.getKey());
for (String line : partition.getValue()) {
System.out.println(line);
}
}
}));
//Step 4: Write each partitioned file to GCS
partitionedContent
.apply("FormatAndWritePartitions", ParDo.of(new WritePartitionFn(options.getTotalFileParts(), options.getOutputPath(), headerView))
.withSideInputs(headerView));
pipeline.run().waitUntilFinish();
}
static class PartitionFileFn extends DoFn {
private final ValueProvider totalFileParts;
private final PCollectionView headerView;
private final PCollectionView largeFileSizeView;
private List currentPartition = new ArrayList();
private int currentPartitionSize = 0;
private int partitionIndex = 0;
private long fileSizeInBytes = 0 ;
public PartitionFileFn(ValueProvider totalFileParts, PCollectionView headerView, PCollectionView largeFileSizeView) {
this.totalFileParts = totalFileParts;
this.headerView = headerView;
this.largeFileSizeView = largeFileSizeView ;
}
@ProcessElement
public void processElement(ProcessContext c) {
String line = c.element();
int lineSize = line.getBytes().length;
fileSizeInBytes = c.sideInput(largeFileSizeView).longValue() ;
float partitionSizeInBytes = (float)fileSizeInBytes / totalFileParts.get() ;
if (currentPartitionSize + lineSize > partitionSizeInBytes) {
// Emit the current partition and reset
c.output(KV.of(partitionIndex++, new ArrayList(currentPartition)));
currentPartition.clear();
currentPartitionSize = 0;
}
currentPartition.add(line);
currentPartitionSize += lineSize;
}
// Emit any remaining partition at the end of the bundle
@FinishBundle
public void finishBundle(FinishBundleContext context) {
if (!currentPartition.isEmpty()) {
context.output(KV.of(partitionIndex++, new ArrayList(currentPartition)), Instant.now(), GlobalWindow.INSTANCE);
}
}
}
static class WritePartitionFn extends DoFn {
private final ValueProvider outputPath;
private final PCollectionView headerView;
public WritePartitionFn(ValueProvider totalParts,ValueProvider outputPath, PCollectionView headerView) {
this.outputPath = outputPath;
this.headerView = headerView;
}
@ProcessElement
public void processElement(ProcessContext context) {
KV partition = context.element();
int partitionIndex = partition.getKey();
List content = partition.getValue();
List header = context.sideInput(headerView);
String outputFileName = outputPath.get() + "/partition-" + partitionIndex + ".txt";
Path outputPath = Paths.get(outputFileName);
try {
synchronized (this) {
boolean isNewFile = !Files.exists(outputPath) || Files.size(outputPath) == 0;
try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputPath.toString(), true))) {
// Write the header only if the file is new or empty
if (isNewFile) {
for (String headerLine : header) {
writer.write(headerLine);
writer.newLine();
}
}
// Write the content
for (String line : content) {
writer.write(line);
writer.newLine();
}
}
}
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + outputFileName, e);
}
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/791 ... -into-many
Apache Beam DataflowRunner Разделение файлов на множество ⇐ JAVA
Программисты JAVA общаются здесь
-
Anonymous
1730538734
Anonymous
У меня есть один большой файл, а также файл с содержимым заголовка. Я получу один входной номер total parts=(может быть 100, 20 или 200 аргумент времени выполнения).
Используя их, я хочу разделить свой большой файл на общее количество частей, чтобы каждый файл имел доступно содержимое заголовка.
Приведенный ниже код почти сгенерирован Chatgpt, но не может правильно вывести данные. Может ли кто-нибудь объяснить какой-либо справочный документ или исправление?
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.io.FileWriter;
import java.nio.file.Path;
public class MySampleFile {
public interface Options extends PipelineOptions {
ValueProvider getHeaderFilePath();
void setHeaderFilePath(ValueProvider value);
ValueProvider getLargeFilePath();
void setLargeFilePath(ValueProvider value);
ValueProvider getTotalFileParts();
void setTotalFileParts(ValueProvider value);
ValueProvider getOutputPath();
void setOutputPath(ValueProvider value);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Step 1: Read the header file into a side input
PCollectionView headerView = pipeline
.apply("ReadHeaderFile", TextIO.read().from(options.getHeaderFilePath()))
.apply("HeaderAsList", View.asList());
// Step 2: Read the large file
PCollection largeFileLines = pipeline
.apply("ReadLargeFile", TextIO.read().from(options.getLargeFilePath()));
PCollectionView largeFileSizeView = largeFileLines
.apply("CalculateFileSize", ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
String line = c.element();
c.output((long) line.getBytes().length);
}
}))
.apply("SumFileSize", Sum.longsGlobally().asSingletonView());
// Step 3: Split the large file content into parts and add header
PCollection partitionedContent = largeFileLines
.apply("PartitionFile", ParDo.of(new PartitionFileFn(options.getTotalFileParts(), headerView, largeFileSizeView))
.withSideInputs(headerView, largeFileSizeView));
// Print all the data in the PCollection to System.out.println
partitionedContent.apply("PrintPartitions", ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
KV partition = c.element();
System.out.println("Partition Index: " + partition.getKey());
for (String line : partition.getValue()) {
System.out.println(line);
}
}
}));
//Step 4: Write each partitioned file to GCS
partitionedContent
.apply("FormatAndWritePartitions", ParDo.of(new WritePartitionFn(options.getTotalFileParts(), options.getOutputPath(), headerView))
.withSideInputs(headerView));
pipeline.run().waitUntilFinish();
}
static class PartitionFileFn extends DoFn {
private final ValueProvider totalFileParts;
private final PCollectionView headerView;
private final PCollectionView largeFileSizeView;
private List currentPartition = new ArrayList();
private int currentPartitionSize = 0;
private int partitionIndex = 0;
private long fileSizeInBytes = 0 ;
public PartitionFileFn(ValueProvider totalFileParts, PCollectionView headerView, PCollectionView largeFileSizeView) {
this.totalFileParts = totalFileParts;
this.headerView = headerView;
this.largeFileSizeView = largeFileSizeView ;
}
@ProcessElement
public void processElement(ProcessContext c) {
String line = c.element();
int lineSize = line.getBytes().length;
fileSizeInBytes = c.sideInput(largeFileSizeView).longValue() ;
float partitionSizeInBytes = (float)fileSizeInBytes / totalFileParts.get() ;
if (currentPartitionSize + lineSize > partitionSizeInBytes) {
// Emit the current partition and reset
c.output(KV.of(partitionIndex++, new ArrayList(currentPartition)));
currentPartition.clear();
currentPartitionSize = 0;
}
currentPartition.add(line);
currentPartitionSize += lineSize;
}
// Emit any remaining partition at the end of the bundle
@FinishBundle
public void finishBundle(FinishBundleContext context) {
if (!currentPartition.isEmpty()) {
context.output(KV.of(partitionIndex++, new ArrayList(currentPartition)), Instant.now(), GlobalWindow.INSTANCE);
}
}
}
static class WritePartitionFn extends DoFn {
private final ValueProvider outputPath;
private final PCollectionView headerView;
public WritePartitionFn(ValueProvider totalParts,ValueProvider outputPath, PCollectionView headerView) {
this.outputPath = outputPath;
this.headerView = headerView;
}
@ProcessElement
public void processElement(ProcessContext context) {
KV partition = context.element();
int partitionIndex = partition.getKey();
List content = partition.getValue();
List header = context.sideInput(headerView);
String outputFileName = outputPath.get() + "/partition-" + partitionIndex + ".txt";
Path outputPath = Paths.get(outputFileName);
try {
synchronized (this) {
boolean isNewFile = !Files.exists(outputPath) || Files.size(outputPath) == 0;
try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputPath.toString(), true))) {
// Write the header only if the file is new or empty
if (isNewFile) {
for (String headerLine : header) {
writer.write(headerLine);
writer.newLine();
}
}
// Write the content
for (String line : content) {
writer.write(line);
writer.newLine();
}
}
}
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + outputFileName, e);
}
}
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79150318/apache-beam-dataflowrunner-split-files-into-many[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия