Google Dataflow seems to drop the 1000th record

Time: 2023-01-25 15:34:00

I have set up a small test using Google Dataflow (apache-beam). The use case for the experiment is to take a (csv) file and write a selected column to a (txt) file.

The code for the experiment is listed below:

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class EmitColDoFn(beam.DoFn):
    first = True
    header = ""
    def __init__(self, i):
        super(EmitColDoFn, self).__init__()
        self.line_count =  Metrics.counter(self.__class__, 'lines')
        self.i = i

    def process(self, element):
        if self.first:
            self.header = element
            self.first = False
        else:
            self.line_count.inc()
            cols = re.split(',', element)
            return (cols[self.i],)

def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='/users/sms/python_beam/data/MOCK_DATA (4).csv',
#                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default="/users/sms/python_beam/data/",
#                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(known_args.input)

    column = (lines
            | 'email col' >> (beam.ParDo(EmitColDoFn(3)))
            | "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col"))

    result = p.run()
    result.wait_until_finish()

    if (not hasattr(result, 'has_job')  # direct runner
        or result.has_job):  # not just a template creation
        lines_filter = MetricsFilter().with_name('lines')
        query_result = result.metrics().query(lines_filter)
        if query_result['counters']:
            lines_counter = query_result['counters'][0]

        print "Lines committed", lines_counter.committed
run()

The last few lines of sample 1 are shown below:

990,Corabel,Feldbau,cfeldbaurh@deliciousdays.com,Female,84.102.162.190,DJ
991,Kiley,Rottcher,krottcherri@stanford.edu,Male,91.97.155.28,CA
992,Glenda,Clist,gclistrj@state.gov,Female,24.98.253.127,UA
993,Ingunna,Maher,imaherrk@army.mil,Female,159.31.127.19,PL
994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL

Running this produces the expected output of:

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 996

Process finished with exit code 0

Now for the strange results. In the next run, the number of lines is increased to 1000.

994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL
997,Shannen,Gaisford,sgaisfordr7@rediff.com,Female,167.255.222.92,RU
998,Lorianna,Slyne,lslyner8@cbc.ca,Female,54.169.60.13,CN
999,Franklin,Yaakov,fyaakovr9@latimes.com,Male,122.1.92.236,CN
1000,Wilhelmine,Cariss,wcarissra@creativecommons.org,Female,237.48.113.255,PL

But this time the output is:

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 999

Process finished with exit code 0

Inspection of the output file shows that the last line was NOT processed.

bdutnallrm@xrea.com
jcaddanrn@jalbum.net
sgaisfordr7@rediff.com
lslyner8@cbc.ca
fyaakovr9@latimes.com

Any ideas what is going on here?

1 Answer

#1 (score: 5)

'EmitColDoFn' skips the first line, assuming there is one instance of it for each file. When you have more than 1000 lines, the DirectRunner creates two bundles: 1000 lines in the first one, and 1 line in the second. In a Beam application, the input may be split into multiple bundles for processing in parallel; there is no correlation between the number of files and the number of bundles. The same application can process terabytes of data spread across many files.
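
To make the bundle boundary visible, you can use start_bundle(), a standard beam.DoFn hook that the runner calls once per bundle. This is a hypothetical illustration, not code from the original post; given the two-bundle split described above, it should log twice for the 1001-line input:

import logging

import apache_beam as beam

class BundleLogger(beam.DoFn):
    def start_bundle(self):
        # Runs once per bundle. Any "first element" flag stored on the
        # DoFn instance is effectively per bundle, not per file, which is
        # why one data row per extra bundle gets swallowed as a "header".
        logging.warning("start_bundle called")

    def process(self, element):
        yield element  # pass elements through unchanged

Dropping '| beam.ParDo(BundleLogger())' into the pipeline right after the 'read' step should show one log line per bundle, matching the split described above.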

ReadFromText has an option 'skip_header_lines', which you can set to 1 in order to skip the header line in each of your input files.

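Putting both points together, here is a minimal sketch of the corrected pipeline. 'skip_header_lines' is the actual ReadFromText parameter; the simplified DoFn and the placeholder paths are illustrative, not the original poster's exact code:

import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

class EmitColDoFn(beam.DoFn):
    """Emits one chosen column per CSV row; no header bookkeeping needed."""
    def __init__(self, i):
        super(EmitColDoFn, self).__init__()
        self.i = i

    def process(self, element):
        cols = re.split(',', element)
        yield cols[self.i]

def run():
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'read' >> ReadFromText('input.csv', skip_header_lines=1)  # placeholder path; header skipped once per file
     | 'email col' >> beam.ParDo(EmitColDoFn(3))
     | 'col file' >> WriteToText('output', '.txt'))  # placeholder output prefix
    p.run().wait_until_finish()

run()

With the header consumed by the source, the DoFn keeps no per-instance state, so bundle boundaries can no longer swallow data rows.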