I'm trying to figure out how to write a streaming job that relies on some 3rd-party modules and a look-up table to work, as follows:
# custom.py
# This is the 3rd-party or user-defined Python module.
# There are some module-level variables
# and some functions that rely on the module-level variables to work.
VAR_A = ...
VAR_B = ...

# load external data files to initialize VAR_A and VAR_B
def init_var(external_file):
    with open(external_file, 'r') as f:
        for l in f:
            VAR_A.append(l)
            VAR_B.check(l)
    ....

# relies on VAR_A and VAR_B to work
def process(x):
    if x in VAR_A:
        ...
    if VAR_B.check(x):
        ...
The streaming driver is as follows. Basically, for each RDD I want to apply custom's process function via handle; however, the process function relies on some look-up variables to work, i.e. VAR_A and VAR_B, so do I have to explicitly broadcast these look-up vars in the Spark context?
# driver.py
import custom

def handle(x):
    ...
    custom = shared.value
    return custom.process(x)

if __name__ == '__main__':
    sc = SparkContext(appName='porn_score_on_name')
    ssc = StreamingContext(sc, 2)
    custom.init_var('/path/to/external_file')
    # since each task node will use custom, I try to make it a shared one
    # HOWEVER, this won't work, since a module cannot be pickled
    shared = sc.broadcast(custom)
    # get stream data
    data = ...
    processed = data.map(handle)
    # further processing
    ...
    ssc.start()
    ssc.awaitTermination()
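To see why sc.broadcast(custom) fails while broadcasting the individual look-up variables would not: broadcast values must be picklable, and module objects are not, whereas plain containers like lists and sets are. A minimal stdlib-only sketch (no Spark required):

```python
import pickle
import types

# A module object cannot be pickled, which is why sc.broadcast(custom) blows up.
mod = types.ModuleType('custom')
try:
    pickle.dumps(mod)
    picklable = True
except (TypeError, pickle.PicklingError):
    picklable = False

# Plain containers (what a look-up table really is) pickle fine,
# which is why broadcasting VAR_A / VAR_B directly would work.
lookup = {'word1', 'word3', 'word5'}
restored = pickle.loads(pickle.dumps(lookup))
```

So the fix is to broadcast the data inside the module, not the module itself.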
I wonder how to make this work, given that I have to use the 3rd-party module?
UPDATE
Suppose the real-time streaming input is lines of words, e.g.
word1 word2
word3
word5 word7 word1
...
and I want to find the lines containing words from a precomputed vocabulary V.
So I have this idea: write a streaming job to process the incoming data, which means I have multiple executors running in parallel to consume the data, and for each executor, the precomputed vocabulary V should be available at all times. Now the problem is: how do I make that happen?
Here is my initial take: I make a zip package containing the vocabulary and my custom code, e.g. pack.zip, and then submit this pack.zip via spark-submit, so that it is available on every executor machine. Then I should do something to make each executor load the vocabulary from pack.zip into an in-memory look-up table, so that each executor has access to the vocabulary and can correctly handle the real-time streaming data once the driver starts running.
However, it turns out that while the above idea works, each executor loads the vocabulary again and again per batch, which is not acceptable. So here is my second take: I should load the vocabulary in the driver (so this happens on the local machine, not on the executors), and then broadcast the vocabulary look-up table to all the executors to do the job.
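The filtering step in that second take can be sketched in plain Python (names are illustrative, no Spark required): load the vocabulary once, keep it as a set for O(1) membership tests, and apply a per-line predicate like the one Spark would ship to executors, where the set would be read from the broadcast handle's .value:

```python
# Build the look-up table once, as the driver would do before broadcasting.
vocab = frozenset(['word1', 'word5', 'word7'])  # stands in for the precomputed vocabulary V

def keep_line(line, vocab):
    # True if any word in the line appears in the vocabulary;
    # in the Spark job, vocab would come from broadcast_var.value.
    return any(w in vocab for w in line.split())

lines = ['word1 word2', 'word3', 'word5 word7 word1']
matched = [l for l in lines if keep_line(l, vocab)]
```

This way the file is read exactly once, in the driver, instead of once per batch per executor.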
1 Answer
#1
Your example doesn't really seem to be a streaming problem, but just how to load a global variable...
I wouldn't try to broadcast a whole module, just the variables that are required individually.
For example, you should be able to use a broadcast variable like so. (code untested)
# One of the first things you do
# (a set gives O(1) lookups and avoids the trailing '\n' that readlines() keeps)
vocab = sc.broadcast(set(open('vocab.txt').read().split()))  # broadcast to all executors

def vocab_filter(line):
    words = line.split()
    return [w for w in words if w in vocab.value]

ssc = StreamingContext(sc, 1)  # Some streaming context
lines = ssc.socketTextStream("localhost", 9999)  # Some stream

# remove extraneous words from the lines and flatten all words in the stream
lines_filtered = lines.flatMap(vocab_filter)
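The flatMap step above can be mimicked in plain Python on sample data to see what comes out (no Spark required; vocab_value stands in for what vocab.value would hold on an executor):

```python
from itertools import chain

vocab_value = {'word1', 'word5', 'word7'}  # what vocab.value would hold

def vocab_filter(line):
    return [w for w in line.split() if w in vocab_value]

stream = ['word1 word2', 'word3', 'word5 word7 word1']
# flatMap applies the function per line and flattens the per-line lists
flattened = list(chain.from_iterable(vocab_filter(l) for l in stream))
```

Note the result is a flat stream of in-vocabulary words, not whole lines; if you want the matching lines instead, use filter with a predicate rather than flatMap.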