PySpark - FileInputDStream: Error finding new files

Time: 2021-07-18 20:52:28

Hi, I'm new to PySpark and I'm trying out this example from the Spark GitHub repo, which counts words in new text files created in a given directory:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.textFileStream("hdfs:///home/my-logs/")
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda x: (x, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

And this is what I get: a warning message saying:

WARN FileInputDStream: Error finding new files

And I get empty results, even though I am adding files to this directory. :/

Any suggested solution for this? Thanks.

2 Answers

#1



I think you are referring to this example. Are you able to run it without modifying it? I see you are setting the directory to "hdfs:///" in the program. You can run the example like below.
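For comparison, the unmodified example does not hardcode a path; it takes the monitored directory from the command line. The relevant line looks roughly like this (a fragment of the script above, where sys.argv[1] is the directory argument):

# in the stock example, the watched directory comes from the command line
lines = ssc.textFileStream(sys.argv[1])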

For example, if Spark is at /opt/spark-2.0.2-bin-hadoop2.7, you can run hdfs_wordcount.py from the examples directory as shown below, passing /tmp as the directory argument to the program.

user1@user1:/opt/spark-2.0.2-bin-hadoop2.7$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py /tmp

Now, while this program is running, open another terminal and copy a file into the /tmp folder:

user1@user1:~$ cp test.txt /tmp

You will see the word counts in the first terminal.
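One thing worth noting: according to the Spark Streaming programming guide, textFileStream only picks up files that appear in the monitored directory after the stream has started, and files should be moved or renamed atomically into that directory; a file that is still being written in place can be missed.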

#2



Solved!


The issue was the build. I used to build it like this with Maven, following the README file on GitHub:

build/mvn -DskipTests clean package


I've now built it this way, following their documentation:

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

构建/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean包

Does anyone know what those params are?
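For what it's worth, those are standard Maven flags: -P activates a build profile and -D sets a build property, so -Pyarn and -Phadoop-2.4 enable Spark's YARN and Hadoop 2.4 build profiles, -Dhadoop.version=2.4.0 pins the exact Hadoop version to compile against, and -DskipTests skips running the tests.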
