I am trying to implement the DSL API example from the Apache Beam documentation. I am using the newest version of the Apache Beam libraries (2.4.0).
The code I am running is the same as in the docs:
@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
public void dslTest() {
    RowType appType = RowSqlType
        .builder()
        .withIntegerField("appId")
        .withVarcharField("description")
        .withTimestampField("rowtime")
        .build();

    // Create a concrete row with that type.
    Row row = Row.withRowType(appType)
        .addValues(1, "Some cool app", new Date())
        .build();

    // Create a source PCollection containing only that row.
    PCollection<Row> testApps = PBegin
        .in(p)
        .apply(Create
            .of(row)
            .withCoder(appType.getRowCoder()));

    PCollection<Row> filteredNames = testApps.apply(
        BeamSql.query(
            "SELECT appId, description, rowtime "
                + "FROM PCOLLECTION "
                + "WHERE appId=1"));
}
This always fails with the following error:
java.lang.AssertionError
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:546)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:365)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:165)
at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateAndConvert(BeamQueryPlanner.java:156)
at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:144)
at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:73)
at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:47)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
at PipelineTest.dslTest(PipelineTest.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
What is the correct way to run this test? Or is this a bug, and the DSL APIs are not working?
1 Answer
It looks like a known issue: there is an assertion inside the planner that Beam fails to satisfy (see this JIRA issue). A fix for it is not ready at the moment.
The current workaround is to disable assertions. How you do that depends on your build system:
- If you're using Gradle, add something like this to build.gradle:

    test {
        jvmArgs "-da"
    }
- If you're using Maven, add something like this to pom.xml:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <configuration>
        <argLine>-da</argLine>
      </configuration>
    </plugin>
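To confirm that the -da flag actually reaches the test JVM, it can help to check the assertion status from code. The following is a minimal sketch, not part of Beam or JUnit; the class name AssertionCheck and the method assertionsEnabled are hypothetical names chosen for illustration. It relies on the fact that the expression in a Java assert statement is evaluated only when assertions are enabled, which is also why the planner's failing assert surfaces as a bare java.lang.AssertionError when tests run with assertions on.

```java
// Hypothetical helper, not part of Beam or JUnit: reports whether the
// current JVM runs this class with assertions enabled.
public class AssertionCheck {

    // Standard idiom: the assignment inside `assert` is evaluated only
    // when assertions are enabled (-ea); with -da it is skipped entirely.
    static boolean assertionsEnabled() {
        boolean enabled = false;
        assert enabled = true;
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
    }
}
```

Running this with java -ea AssertionCheck should report true, and with java -da AssertionCheck it should report false. Calling assertionsEnabled() from a test in the same module is one way to see whether the build configuration above took effect.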