Is stateful processing possible with the Apache Beam Python SDK?

Date: 2023-01-25 15:34:18

I have been following the "Timely (and Stateful) Processing with Apache Beam" article. Although it is comprehensive and well written, it does not explain how to achieve the same thing in Python. More specifically, it states:


State and timers are not yet supported in Beam's Python SDK.


It does not give a reason for this, though. Is there an inherent reason why this is not possible?


I want to implement a replay buffer / windowing system for a signal processing application, in which a sliding window (a historical frame buffer) of features of length W is constantly updated with the latest window.
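For illustration, the sliding buffer described here can be sketched in plain Python, independent of Beam. This is only a minimal sketch: the class name `SlidingFrameBuffer` and its `push` method are hypothetical, and the state lives in a single process rather than in runner-managed state.

```python
from collections import deque


class SlidingFrameBuffer:
    """Hypothetical helper: holds the most recent `size` feature values."""

    def __init__(self, size):
        # A bounded deque drops its oldest item automatically on append.
        self.frames = deque(maxlen=size)

    def push(self, value):
        """Add a value; return the window as a list once it is full, else None."""
        self.frames.append(value)
        if len(self.frames) == self.frames.maxlen:
            return list(self.frames)
        return None
```

Each call to `push` after the buffer has filled returns a window that overlaps the previous one by W - 1 values.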


In Java its implementation looks like the following:


static class FeatureFrameBuffer extends DoFn<KV<String, Double>, FeatureFrame> {

    Integer bufferSize;

    public FeatureFrameBuffer(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }

    @StateId("buffer")
    private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();

    @StateId("count")
    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

    @ProcessElement
    public void process(
                        ProcessContext context,
                        @StateId("buffer") BagState<KV<String, Double>> bufferState,
                        @StateId("count") ValueState<Integer> countState
                        ) {

        int count = firstNonNull(countState.read(), 0);
        count = count + 1;
        countState.write(count);
        bufferState.add(context.element());

        // Once bufferSize elements have accumulated, emit them as a
        // single feature frame and reset the state for this key.
        if (count >= bufferSize) {
            // createFeatureFrame is assumed to be defined elsewhere.
            FeatureFrame featureFrame = createFeatureFrame(bufferState.read());
            context.output(featureFrame);
            bufferState.clear();
            countState.clear();
        }
    }
}
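As a rough point of comparison, the buffering logic of the Java class can be mirrored in plain Python without any Beam state API. This is a sketch under stated assumptions, not a Beam implementation: the per-key dictionary stands in for Beam's per-key state, it lives in one process's memory (so it is neither distributed nor fault tolerant), and it buffers only the value of each pair rather than the whole key-value element.

```python
from collections import defaultdict


class FeatureFrameBuffer:
    """Plain-Python stand-in for the Java DoFn above (illustrative only)."""

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        # One list per key, playing the role of the per-key bag state.
        self._buffers = defaultdict(list)

    def process(self, element):
        """Buffer a (key, value) pair; return (key, values) when the buffer fills."""
        key, value = element
        buffered = self._buffers[key]
        buffered.append(value)
        if len(buffered) >= self.buffer_size:
            # Reset the state for this key, like clear() in the Java version.
            del self._buffers[key]
            return key, buffered
        return None
```

Like the Java code, this emits non-overlapping frames: the buffer is emptied each time a frame is produced.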

I was wondering whether the same could be achieved with the Python SDK before I start developing a custom implementation. Some advice on the matter would be great.


1 Answer

#1



As of today, Python SDK support for stateful processing is still an open issue; see https://issues.apache.org/jira/browse/BEAM-2687. It is blocked by the ticket "Implement Beam Python User State and Timer API", which is actively in progress.
