Код: Выделить всё
// This is essentially the final step in our pipeline, where we write
// one of the side outputs from the pipeline to a BigQuery table
results.get(matchedTag)
.apply("CountBackfill", Count.perElement())
.apply("ToReportRow", ParDo.of(new ToReportRow()))
// at this point, there is now a PCollection
.apply("WriteReport", BigQueryIO.writeTableRows()
.to(reportingDataset + ".AttributeBackfill_" + dayStr)
.withSchema(ReportSchema.get())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
/*
* Create a TableRow from a key/value pair
*/
public static class ToReportRow extends DoFn {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException {
KV row = c.element();
c.output(new TableRow()
.set(ReportSchema.ID, row.getKey())
.set(ReportSchema.COUNT, row.getValue()));
}
}
Исключение в потоке " main" java.lang.NoSuchMethodError:
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
в
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1426)
в
org.apache.beam.sdk.io. gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:989)
на org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:525) в
org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479) в
org.apache. луч.sdk.values.PCollection.apply(PCollection.java:297) в
com.prod.merge.DailyUniqueProfiles.buildPipeline(DUP.java:106)
на com.prod.merge.MergePipeline.main(MergePipeline.java:91)
Строка .apply("WriteReport", BigQueryIO.writeTableRows() — это строка 106 в DUP.java, так что я подозреваю, что эта строка как-то неверна.
Есть идеи, в чем может быть проблема?
Подробнее здесь: https://stackoverflow.com/questions/480 ... etablerows
Мобильная версия