Spark Streaming provides a sliding window function to get the RDDs for the last k intervals. However, I want to try using the slice function instead, so that I can query the RDDs for a time range before the current time.
from datetime import datetime, timedelta

delta = timedelta(seconds=30)
datates = datamap.slice(datetime.now() - delta, datetime.now())
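(For comparison, a minimal sketch of the built-in sliding window mentioned above; the 30-second window length is an assumption for illustration, not from my actual code:)

# Sliding window covering the last 30 seconds, recomputed on every batch interval
windowed = datamap.window(30)
windowed.pprint()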
And I get this error when executing the code:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/home/hduser/spark-1.5.0/<ipython-input-1364-f8d325e33d4c> in <module>()
----> 1 datates = datamap.slice(datetime.now()-delta,datetime.now())
/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in slice(self, begin, end)
411 `begin`, `end` could be datetime.datetime() or unix_timestamp
412 """
--> 413 jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
414 return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
415
/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in _jdstream(self)
629
630 jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
--> 631 dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
632 self._jdstream_val = dstream.asJavaDStream()
633 return self._jdstream_val
/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
699 answer = self._gateway_client.send_command(command)
700 return_value = get_return_value(answer, self._gateway_client, None,
--> 701 self._fqn)
702
703 for temp_arg in temp_args:
/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.python.PythonTransformedDStream.
: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
at org.apache.spark.streaming.api.python.PythonDStream.<init>(PythonDStream.scala:172)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.<init>(PythonDStream.scala:189)
at sun.reflect.GeneratedConstructorAccessor80.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
How can I solve this error? Thank you!
1 solution
#1
0
Based on the error message,

"Adding new inputs, transformations, and output operations after stopping a context is not supported"

it looks like ssc.stop() was called instead of ssc.awaitTermination(). Please provide more information about how the Spark StreamingContext (ssc) is set up in your program.
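For reference, a minimal sketch of a StreamingContext lifecycle that keeps the context alive so slice() can be used; the socket source, host/port, app name, and 60-second remember duration are placeholder assumptions, not details from the question:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="SliceExample")  # hypothetical app name
ssc = StreamingContext(sc, 1)              # 1-second batch interval
ssc.remember(60)                           # keep the last 60 s of RDDs so slice() can find them

# Placeholder input; replace with the actual source used in the question
lines = ssc.socketTextStream("localhost", 9999)
datamap = lines.map(lambda x: (x, 1))
datamap.pprint()

ssc.start()
# Keep the context running; calling ssc.stop() here would make any later
# slice() call fail with the IllegalStateException shown above.
ssc.awaitTermination()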