Side inputs corrupt data in a Dataflow pipeline

Time: 2023-01-07 15:34:52

I have a Dataflow pipeline (SDK 2.1.0, Apache Beam 2.2.0) which simply reads RDF (in N-Triples, so it's just text files) from GCS, transforms it somehow, and writes it back to GCS, but to a different bucket. In this pipeline I employ side inputs, which are three single files (one file per side input), and use them in a ParDo.


To work with RDF in Java I use Apache Jena, so each file is read into an instance of the Model class. Since Dataflow doesn't have a Coder for it, I developed one myself (RDFModelCoder, see below). It works fine in a number of other pipelines I created.


The problem with this particular pipeline is that when I add the side inputs, the execution fails with an exception indicating a corruption of the data, i.e. some garbage is added. Once I remove the side inputs, the pipeline finishes successfully.


The exception (it's thrown from RDFModelCoder, see below):


Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
    at org.apache.jena.atlas.io.IO.exception(IO.java:233)
    at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
    at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
    at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
    at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
    at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
    at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
    at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
    at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
    at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
    at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
    at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
    at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
    at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
    at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
    at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
    at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)

And here you can see the garbage (at the end):


<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������

The pipeline:


val one = p.apply(TextIO.read().from(config.getString("source.one")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val two = p.apply(TextIO.read().from(config.getString("source.two")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val three = p.apply(TextIO.read().from(config.getString("source.three")))
             .apply(Combine.globally(SingleValue()))
             .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val sideInput = PCollectionList.of(one).and(two).and(three)
                .apply(Flatten.pCollections())
                .apply(View.asList())

p.apply(RDFIO.Read
                  .from(options.getSource())
                  .withSuffix(RDFLanguages.strLangNTriples))
 .apply(ParDo.of(SparqlConstructETL(config, sideInput))
                        .withSideInputs(sideInput))
 .apply(RDFIO.Write
                  .to(options.getDestination())
                  .withSuffix(RDFLanguages.NTRIPLES))

And just to provide the whole picture, here are the implementations of the SingleValue and ConvertToRDFModel transforms:


class SingleValue : SerializableFunction<Iterable<String>, String> {
    override fun apply(input: Iterable<String>?): String {
        if (input != null) {
            return input.joinToString(separator = " ")
        }
        return ""
    }
}

class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
    private val lang: String = outputLang.name

    @ProcessElement
    fun processElement(c: ProcessContext?) {
        if (c != null) {
            val model = ModelFactory.createDefaultModel()
            model.read(StringReader(c.element()), null, lang)
            c.output(model)
        }
    }
}

The implementation of RDFModelCoder:


class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
                    private val encodeLang: String = RDFLanguages.strLangNTriples)
    : AtomicCoder<Model>() {

    private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)

    override fun decode(inStream: InputStream): Model {
        val bytes = StreamUtils.getBytes(inStream)
        val model = ModelFactory.createDefaultModel()

        model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here

        return model
    }

    override fun encode(value: Model, outStream: OutputStream?) {
        value.write(outStream, encodeLang, null)
    }

}

I checked the side input files multiple times; they're fine and are UTF-8 encoded.


2 Answers

#1 (3 votes)

Most likely the error is in the implementation of RDFModelCoder. When implementing encode/decode, one has to remember that the provided InputStream and OutputStream are not exclusively owned by the current instance being encoded/decoded. For example, there might be more data in the InputStream after the encoded form of your current Model. When using StreamUtils.getBytes(inStream) you are grabbing both the data of the current encoded Model and anything else that was in the stream.


Generally, when writing a new Coder it's a good idea to only combine existing Coders rather than hand-parse the stream: that is less error-prone. I would suggest converting the model to/from byte[] and using ByteArrayCoder.of() to encode/decode it.

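A minimal sketch of that suggestion in Kotlin (the class name DelegatingRDFModelCoder and the exact layout are mine, not from the answer): serialize the Model to N-Triples bytes and delegate to ByteArrayCoder, which length-prefixes them, so decode reads exactly one element instead of draining the stream.

import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import org.apache.beam.sdk.coders.AtomicCoder
import org.apache.beam.sdk.coders.ByteArrayCoder
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.RDFLanguages

// Illustrative coder that composes ByteArrayCoder instead of reading the raw stream.
class DelegatingRDFModelCoder(
        private val lang: String = RDFLanguages.strLangNTriples) : AtomicCoder<Model>() {

    private val bytesCoder = ByteArrayCoder.of()

    override fun encode(value: Model, outStream: OutputStream) {
        val buffer = ByteArrayOutputStream()
        value.write(buffer, lang, null)
        // ByteArrayCoder writes a length prefix, so this element's bytes stay
        // separate from whatever else is in the stream.
        bytesCoder.encode(buffer.toByteArray(), outStream)
    }

    override fun decode(inStream: InputStream): Model {
        // Read exactly one length-prefixed element instead of draining the stream.
        val bytes = bytesCoder.decode(inStream)
        val model = ModelFactory.createDefaultModel()
        model.read(ByteArrayInputStream(bytes), null, lang)
        return model
    }
}

That length prefix is what keeps one element's bytes from bleeding into the next when the runner packs several encoded values into a single stream, as it does when materialising side inputs, which is exactly the kind of corruption shown above.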

#2 (1 vote)

Apache Jena provides the Elephas IO modules, which have Hadoop IO support. Since Beam supports Hadoop InputFormat IO, you should be able to use that to read in your NTriples files.


This will likely be far more efficient, since the NTriples support in Elephas is able to parallelise the IO and avoids caching the entire model in memory (in fact it won't use Model at all):


Configuration myHadoopConfiguration = new Configuration(false);

// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
                               NTriplesInputFormat.class, InputFormat.class);
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class);
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class);
// Set any other Hadoop config you might need

// Read data only with Hadoop configuration.
p.apply("read",
         HadoopInputFormatIO.<LongWritable, TripleWritable>read()
        .withConfiguration(myHadoopConfiguration);

Of course this may require you to refactor your overall pipeline somewhat.
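As a rough illustration of that refactor, here is a hypothetical Kotlin DoFn (ExtractTriple is an illustrative name, not part of Elephas or Beam) that unwraps the KV<LongWritable, TripleWritable> elements produced by the read into plain Jena Triples; you would still need to provide a Coder for Triple (or map straight to strings) before writing out.

import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV
import org.apache.hadoop.io.LongWritable
import org.apache.jena.graph.Triple
import org.apache.jena.hadoop.rdf.types.TripleWritable

// Hypothetical helper: turns the KV<LongWritable, TripleWritable> elements
// emitted by the Hadoop read into plain Jena Triple objects, so no Model
// has to be materialised downstream.
class ExtractTriple : DoFn<KV<LongWritable, TripleWritable>, Triple>() {
    @ProcessElement
    fun processElement(c: ProcessContext) {
        c.output(c.element().value.get())
    }
}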
