Why is my PCollection (SCollection) size so large compared to the BigQuery table input size?

Time: 2021-10-26 15:35:54

The image above shows the table schema for a BigQuery table that is the input to an Apache Beam Dataflow job running on Spotify's Scio. If you aren't familiar with Scio, it's a Scala wrapper around the Apache Beam Java SDK; in particular, an SCollection wraps a PCollection. My input table on BigQuery disk is 136 GB, but the Dataflow UI reports my SCollection's size as 504.91 GB. Why is my PCollection (SCollection) so much larger than the BigQuery table input?

I understand that BigQuery is likely much better at data compression and representation, but a more than 3x increase in size seems quite high. To be very clear, I'm using Scio's type-safe BigQuery case class representation (let's call it Clazz), so my SCollection is of type SCollection[Clazz] rather than SCollection[TableRow]. TableRow is the native representation in the Beam Java SDK. Any tips on how to keep the memory allocation down? Is it related to a particular column type in my input: bytes, strings, records, floats, etc.?

1 Answer

#1 (4 votes)

This is likely due to the TableRow format, which contains the string names of the columns in every row; those repeated names add to the size.

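To make that overhead concrete: a TableRow is essentially a JSON-style map, so every row carries its column names as string keys. Here is a small illustrative sketch (the column names are hypothetical, not taken from the question's schema):

  import com.google.api.services.bigquery.model.TableRow;

  public class TableRowOverhead {
    public static void main(String[] args) {
      // A TableRow stores its column names as string keys in every row,
      // unlike BigQuery's columnar storage, which keeps each name once per
      // table. Wide schemas with long column names inflate the in-flight size.
      TableRow row = new TableRow()
          .set("user_id", 12345L)                        // hypothetical fields
          .set("page_url", "https://example.com/landing")
          .set("event_timestamp", "2021-10-26T15:35:54Z");

      // GenericJson#toString prints the row as JSON, column names included.
      System.out.println(row);
    }
  }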

Consider using the following to create a PCollection of objects instead of TableRows. This lets you read directly into an object that matches the schema, which should reduce the data size somewhat.

  /**
   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
   * each row of the table or query result, parsed from the BigQuery AVRO format using the specified
   * function.
   *
   * <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a
   * {@link GenericRecord} representing the row, indexed by column name. Here is a
   * sample parse function that parses click events from a table.
   *
   * <pre>{@code
   * class ClickEvent { long userId; String url; ... }
   *
   * p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {
   *   public ClickEvent apply(SchemaAndRecord record) {
   *     GenericRecord r = record.getRecord();
   *     return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));
   *   }
   * }).from("..."));
   * }</pre>
   */
  public static <T> TypedRead<T> read(
      SerializableFunction<SchemaAndRecord, T> parseFn) {

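For reference, here is a minimal end-to-end sketch of wiring such a parse function into a pipeline. The table reference, the ClickEvent class, and the field names are placeholders for your own schema, and SerializableCoder is just one reasonable coder choice. Note that Avro hands strings back as org.apache.avro.util.Utf8, so calling toString() is safer than the plain String cast shown in the javadoc above.

  import java.io.Serializable;

  import org.apache.avro.generic.GenericRecord;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.SerializableCoder;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.PCollection;

  public class TypedBigQueryRead {

    // Hypothetical row type; replace the fields with ones matching your schema.
    static class ClickEvent implements Serializable {
      final long userId;
      final String url;

      ClickEvent(long userId, String url) {
        this.userId = userId;
        this.url = url;
      }
    }

    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

      // Parse each Avro GenericRecord directly into a ClickEvent,
      // bypassing the TableRow representation entirely.
      PCollection<ClickEvent> events =
          p.apply(
              BigQueryIO
                  .read(
                      (SerializableFunction<SchemaAndRecord, ClickEvent>)
                          input -> {
                            GenericRecord r = input.getRecord();
                            return new ClickEvent(
                                (Long) r.get("userId"),
                                r.get("url").toString()); // Utf8 -> String
                          })
                  .from("my-project:my_dataset.my_table") // placeholder table
                  .withCoder(SerializableCoder.of(ClickEvent.class)));

      p.run().waitUntilFinish();
    }
  }

If you want to stay in Scio, ScioContext's customInput can wrap a transform like this, so the result comes back as an SCollection of your case class rather than of TableRow.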