Reading multiple files from S3 in parallel (Spark, Java)

Date: 2021-11-06 23:10:58

I saw a few discussions on this but couldn't quite understand the right solution: I want to load a couple hundred files from S3 into an RDD. Here is how I'm doing it now:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

The ReadFromS3Function does the actual reading using the AmazonS3 client:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();
    }

I kind of "translated" this from answers I saw for the same question in Scala. I think it's also possible to pass the entire list of paths to sc.textFile(...), but I'm not sure which is the best-practice way.

3 Answers

#1


5  

The underlying problem is that listing objects in S3 is really slow, and the way it is made to look like a directory tree kills performance whenever something does a treewalk (as wildcard pattern matching of paths does).

The code in the post does the all-children listing, which delivers far better performance; it's essentially what ships with Hadoop 2.8+ as the s3a listFiles(path, recursive) call, see HADOOP-13208.

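For illustration, a minimal Java sketch of that flat recursive listing through the Hadoop FileSystem API (assuming Hadoop 2.8+ with the s3a connector on the classpath; the bucket and prefix names are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.util.LinkedList;
import java.util.List;

// One flat, recursive listing call -- no per-directory treewalk.
Path root = new Path("s3a://my-bucket/my-prefix/");               // placeholder bucket/prefix
FileSystem fs = root.getFileSystem(new Configuration());
RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);  // true = recursive
List<String> paths = new LinkedList<>();
while (it.hasNext()) {
    paths.add(it.next().getPath().toString());                    // already full s3a:// URIs
}

The resulting strings can then be joined with commas and handed straight to sc.textFile, as below.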

After getting that listing, you've got strings of object paths which you can map to s3a/s3n URLs for Spark to handle as text-file inputs, and to which you can then apply work:

val files = keys.map(key => s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)

And as requested, here's the Java code used:

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
// repeat while objectListing truncated 
JavaRDD<String> events = sc.textFile(String.join(",", keys));

Note that I switched s3n to s3a because, provided you have the hadoop-aws and amazon-sdk JARs on your classpath, the s3a connector is the one you should be using. It's better, and it's the one which gets maintained and tested against Spark workloads by people (me). See The history of Hadoop's S3 connectors.

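As a rough sketch of the wiring (not part of the original answer): with the hadoop-aws and AWS SDK JARs on the classpath, s3a credentials can be passed through Spark's Hadoop configuration; the key values here are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("s3a-read");
// Properties prefixed with "spark.hadoop." are forwarded to the Hadoop configuration.
conf.set("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY>");   // placeholder
conf.set("spark.hadoop.fs.s3a.secret.key", "<SECRET_KEY>");   // placeholder
JavaSparkContext sc = new JavaSparkContext(conf);

In practice an IAM role or the default credential provider chain is preferable to embedding keys in the job configuration.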

#2


2  

You may use sc.textFile to read multiple files.

You can pass multiple file URLs as its argument.

You can specify whole directories, use wildcards, and even pass a comma-separated list of directories and wildcards.

Ex:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

Reference: from this answer.

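The same pattern works for S3 paths from Java; a small sketch with placeholder bucket and prefix names:

// Comma-separated list mixing a directory wildcard and a filename glob.
JavaRDD<String> lines = sc.textFile(
        "s3a://my-bucket/logs/2021/*,"               // placeholder prefix with wildcard
      + "s3a://my-bucket/other/part-00[0-5]*");      // placeholder filename glob

Keep in mind the caveat from answer #1: wildcard expansion over S3 triggers a treewalk, so an explicit comma-separated list of keys is usually faster.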

#3


0  

I guess if you parallelize the reads, AWS will be accessed from the executors, which should definitely improve performance:

import scala.collection.JavaConverters._

val bucketName = "xxx"   // placeholder
val s3 = new AmazonS3Client(new BasicAWSCredentials("awsAccessKeyId", "awsSecretKey"))
val keys = s3.listObjects(bucketName).getObjectSummaries.asScala.map(_.getKey).toList
val df = sc.parallelize(keys)
  .flatMap { key => Source.fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream).getLines }
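
This is essentially the same idea as the code in the question: parallelize the key list and read on the executors. A possible Java variant (an assumption on my part, with placeholder names) builds one client per partition via mapPartitions, so no client needs to be serialized from the driver:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

JavaRDD<String> lines = sc.parallelize(keys).mapPartitions(keyIter -> {
    // One client per partition, created on the executor (S3 clients are not serializable).
    AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();  // assumes the default credential chain
    List<String> out = new ArrayList<>();
    while (keyIter.hasNext()) {
        String key = keyIter.next();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                s3Client.getObject("my-bucket", key).getObjectContent()))) {  // placeholder bucket
            String line;
            while ((line = reader.readLine()) != null) {
                out.add(line);
            }
        }
    }
    return out.iterator();
});

Like the ReadFromS3Function in the question, this buffers each partition's lines in memory before returning them.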
