Running a Google Dataflow pipeline from a Google App Engine app?

Time: 2021-07-31 15:35:59

I am creating a Dataflow job using DataflowPipelineRunner. I tried the following scenarios:

  1. Without specifying any machineType
  2. With the g1-small machine type
  3. With n1-highmem-2 (see the sketch after this list for how the machine type was set)
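
For reference, the machine type was set on the pipeline options roughly like this (a minimal sketch assuming the pre-Beam Dataflow SDK option names; the project id and bucket are placeholders):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject("my-project");                      // placeholder project id
options.setStagingLocation("gs://my-bucket/staging");  // placeholder staging bucket
// Scenario 1 leaves the machine type unset; scenarios 2 and 3 set it explicitly.
options.setWorkerMachineType("n1-highmem-2");
Pipeline p = Pipeline.create(options);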

In all the above scenarios, the input is a very small file (KB size) from GCS and the output is a BigQuery table.

I got an out-of-memory error in all the scenarios.

The size of my compiled code is 94 MB. I am only trying the word count example, and it does not read any input (it fails before the job starts). Please help me understand why I am getting this error.

Note: I am using App Engine to start the job.

Note: The same code works with beta version 0.4.150414.

EDIT 1

As per the suggestions in the answer, I tried the following:

  1. Switched from automatic scaling to basic scaling.
  2. Used machine type B2, which provides 256MB of memory.

After this configuration, the Java heap memory problem is solved. But it then tries to upload a jar of more than 10 MB to the staging location, and that fails.

It logs the following exception:

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

I tried directly uploading the jar file appengine-api-1.0-sdk-1.9.20.jar, but it still tries to upload this jar: appengine-api-L4wtoWwoElWmstI1Ia93cg.jar, and I don't know what jar that is. Any idea what this jar is would be appreciated.

Please help me to fix this issue.

3 solutions

#1


4  

The short answer is that if you use AppEngine on a Managed VM you will not encounter the AppEngine sandbox limits (OOM when using an F1 or B1 instance class, execution time limit issues, whitelisted JRE classes). If you really want to run within the App Engine sandbox, then your use of the Dataflow SDK must conform to the limits of the AppEngine sandbox. Below I explain common issues and what people have done to conform to the AppEngine sandbox limits.

The Dataflow SDK requires an AppEngine instance class which has enough memory to execute the user's application to construct the pipeline, stage any resources, and send the job description to the Dataflow service. Typically we have seen that users need an instance class with more than 128MB of memory to avoid OOM errors.

Constructing a pipeline and submitting it to the Dataflow service generally takes less than a couple of seconds if the required resources for your application are already staged. Uploading your JARs and any other resources to GCS can take longer than 60 seconds. This can be solved manually by pre-staging your JARs to GCS beforehand (the Dataflow SDK will skip staging them again if it detects they are already there) or by using a task queue to get a 10 minute limit (note that for large applications, 10 minutes may not be enough to stage all your resources).

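A minimal sketch of the task queue approach (assuming a hypothetical servlet mapped at /tasks/launch-pipeline does the actual pipeline construction and submission; requests dispatched from a task queue get the longer 10 minute deadline):

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

// In the frontend request handler, enqueue the launch instead of submitting the
// pipeline directly; the task handler then builds the pipeline and calls run().
Queue queue = QueueFactory.getDefaultQueue();
queue.add(TaskOptions.Builder
    .withUrl("/tasks/launch-pipeline")   // hypothetical servlet that runs the pipeline
    .method(TaskOptions.Method.POST));
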
Finally, within the AppEngine sandbox environment, you and all your dependencies are limited to using only whitelisted classes within the JRE or you'll get an exception like:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

EDIT 1

We compute a hash of the contents of the jars on the classpath and upload them to GCS with a modified filename. AppEngine runs a sandboxed environment with its own JARs; appengine-api-L4wtoWwoElWmstI1Ia93cg.jar refers to appengine-api.jar, which is a jar that the sandboxed environment adds. You can see from our PackageUtil#getUniqueContentName(...) that we just append -$HASH before .jar.

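For illustration only (this is not the actual PackageUtil code), the renaming amounts to something like the following, where the hash is whatever content hash the SDK computes:

// "appengine-api.jar" + hash "L4wtoWwoElWmstI1Ia93cg"
//     -> "appengine-api-L4wtoWwoElWmstI1Ia93cg.jar"
static String uniqueContentName(String baseName, String contentHash) {
  int dot = baseName.lastIndexOf('.');
  if (dot < 0) {
    return baseName + "-" + contentHash;
  }
  return baseName.substring(0, dot) + "-" + contentHash + baseName.substring(dot);
}
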
We are working out why you are seeing the RequestPayloadTooLarge exception, and it is currently recommended that you set the filesToStage option and filter out the jars not required to execute your Dataflow job to get around the issue that you face. You can see how we build the files to stage with DataflowPipelineRunner#detectClassPathResourcesToStage(...).

#2


1  

I had the same problem with the 10MB limit. What I did was filter out the JAR files bigger than that limit (instead of specific files), and then set the remaining files in the DataflowPipelineOptions with setFilesToStage.

So I just copied the method detectClassPathResourcesToStage from the Dataflow SDK and changed it slightly:

private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB

protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
    if (!(classLoader instanceof URLClassLoader)) {
        String message = String.format("Unable to use ClassLoader to detect classpath elements. "
                + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
        throw new IllegalArgumentException(message);
    }

    List<String> files = new ArrayList<>();
    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
        try {
            File file = new File(url.toURI());
            if (file.length() < FILE_BYTES_THRESHOLD) {
                files.add(file.getAbsolutePath());
            }
        } catch (IllegalArgumentException | URISyntaxException e) {
            String message = String.format("Unable to convert url (%s) to file.", url);
            throw new IllegalArgumentException(message, e);
        }
    }
    return files;
}

And then when I'm creating the DataflowPipelineOptions:

DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));

#3


1  

Here's a version of Helder's 10MB-filtering solution that will adapt to the default file-staging behavior of DataflowPipelineOptions even if it changes in a future version of the SDK.

Instead of duplicating the logic, it passes a throwaway copy of the DataflowPipelineOptions to DataflowPipelineRunner to see which files it would have staged, then removes any that are too big.

Note that this code assumes that you've defined a custom PipelineOptions class named MyOptions, along with a java.util.logging.Logger field named logger.

// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;

/**
 * Returns the list of .jar/etc files to stage based on the
 * Options, filtering out any files that are too large for
 * DataflowPipelineRunner.
 *
 * <p>If this accidentally filters out a necessary file, it should
 * be obvious when the pipeline fails with a runtime link error.
 */
private static ImmutableList<String> getFilesToStage(MyOptions options) {
  // Construct a throw-away runner with a copy of the Options to see
  // which files it would have wanted to stage. This could be an
  // explicitly-specified list of files from the MyOptions param, or
  // the default list of files determined by DataflowPipelineRunner.
  List<String> baseFiles;
  {
    DataflowPipelineOptions tmpOptions =
        options.cloneAs(DataflowPipelineOptions.class);
    // Ignore the result; we only care about how fromOptions()
    // modifies its parameter.
    DataflowPipelineRunner.fromOptions(tmpOptions);
    baseFiles = tmpOptions.getFilesToStage();
    // Some value should have been set.
    Preconditions.checkNotNull(baseFiles);
  }
  // Filter out any files that are too large to stage.
  ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
  for (String file : baseFiles) {
    long size = new File(file).length();
    if (size < MAX_STAGED_FILE_SIZE_BYTES) {
      filteredFiles.add(file);
    } else {
      logger.info("Not staging large file " + file + ": length " + size
          + " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
    }
  }
  return filteredFiles.build();
}

/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
    throws IOException, InterruptedException {
  // DataflowPipelineRunner can't stage large files;
  // remove any from the list.
  DataflowPipelineOptions dpOpts =
      options.as(DataflowPipelineOptions.class);
  dpOpts.setFilesToStage(getFilesToStage(options));

  // Run the pipeline as usual using "options".
  // ...
}
