Using task queues to parse large XML data

Time: 2021-02-23 17:02:20

I'm trying to go through a 5 GB XML file of products for a website and eventually add the data to a datastore. I'm just playing around with queues for now. My idea was to create a task that reads through the file line by line, collects every 50 products, and sends them to another queue to be processed (eventually into the datastore). I'm testing this on a much smaller XML file.

My problem is within OpenFileQueue: it adds a task even when the condition "if ((self.count % 50) == 0):" should not have been met. The test file I'm using has around 170 products, and when I run the code as it is now and call /gcs, I end up with about 86 queued tasks. Any ideas on what might be going on? I'd also welcome ideas on better ways to read through this file, since the way I'm doing it now feels like a bad hack.

import webapp2
import os
import datetime
import time
from lxml import etree
import sys
import codecs
import time
import gc
import logging


from google.appengine.ext import db
from google.appengine.api import search
import cloudstorage as gcs
from google.appengine.api import taskqueue

my_default_retry_params = gcs.RetryParams(initial_delay=0.2,
                                      max_delay=5.0,
                                      backoff_factor=2,
                                      max_retry_period=15)
gcs.set_default_retry_params(my_default_retry_params)

logging.getLogger().setLevel(logging.DEBUG)


class GoogleCloudStorage(webapp2.RequestHandler):

    def get(self):
        bucket = '/newegg-catalog'
        self.response.headers['Content-Type'] = 'text/plain'
        self.tmp_filenames_to_clean_up = []    
        filename = bucket + '/ndd.xml'     
        taskqueue.add(url='/openfile', params={'filename': filename})
        self.redirect('/')



class AddFileParts(webapp2.RequestHandler):
     def post(self):
         data = self.request.get('data')
         logging.debug('PROCESSING %s', data)

class OpenFileQueue(webapp2.RequestHandler):
    def __init__(self, request, response):
        self.initialize(request, response)
        self.Plist = []
        self.masterList = []
        self.count = 0

    def post(self):
        filename = self.request.get('filename')
        logging.debug('Opening file %s', filename)
        gcs_file = gcs.open(filename)

        while True:
            line = gcs_file.readline()
            self.Plist.append(line)
            if line.strip()=="</product>":
                self.masterList.append(self.Plist)
                self.Plist = []
                self.count+=1

            if ((self.count % 50) == 0):
                logging.debug('Starting queue of items up to %s with 50 items', self.count)
                taskqueue.add(url='/adddata', params={'data': self.masterList})
                self.masterList = []
            if line.strip()=="</catalog>":
                break
        gcs_file.close()

app = webapp2.WSGIApplication([('/adddata',AddFileParts),
                                ('/openfile', OpenFileQueue),
                                ('/gcs', GoogleCloudStorage)],
                                debug=True)

1 solution

#1


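When a line matches "</product>", the product is appended to self.masterList and self.count is incremented (eventually reaching 50). But the check runs on every line, so if the next line is not "</product>", the count is still 50 and another task is added to the queue. The same thing happens at the very start, while self.count is still 0, because 0 % 50 == 0.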
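To make this concrete, here is a minimal sketch that replays the loop from the question on a synthetic catalog and counts how often the modulo check fires. It is only an illustration: count_enqueues and make_catalog are hypothetical helpers, not part of the original code, and no App Engine calls are made.

```python
def count_enqueues(lines, batch_size=50):
    """Replay the question's loop and count how often the modulo check fires.

    Hypothetical helper: it counts instead of calling taskqueue.add().
    """
    count = 0
    enqueues = 0
    for line in lines:
        if line.strip() == "</product>":
            count += 1
        # Original check: true on EVERY line while count is 0, 50, 100, ...
        if count % batch_size == 0:
            enqueues += 1
        if line.strip() == "</catalog>":
            break
    return enqueues


def make_catalog(n_products, fields_per_product=3):
    """Build a synthetic catalog as a list of lines (made-up structure)."""
    lines = ["<catalog>"]
    for _ in range(n_products):
        lines.append("<product>")
        for j in range(fields_per_product):
            lines.append("<field%d>value</field%d>" % (j, j))
        lines.append("</product>")
    lines.append("</catalog>")
    return lines


if __name__ == "__main__":
    # 120 products should give two full batches of 50, yet the check
    # fires once for every line read while the count sits on a multiple of 50.
    print(count_enqueues(make_catalog(120)))
```

The number of extra tasks depends on how many lines each product spans, which is consistent with a file of roughly 170 products producing far more tasks than expected.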

Instead, use the length of self.masterList because it is reset after being added to the queue:

if len(self.masterList) >= 50:
    logging.debug('Starting queue of items up to %s with 50 items', len(self.masterList))
    taskqueue.add(url='/adddata', params={'data': self.masterList})
    self.masterList = []

and remove all references to self.count.
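One thing the length check alone does not handle is the final partial batch: with about 170 products, the last 20 would never be sent. A minimal sketch of the batching logic with a final flush might look like the following. The generator batch_products is a hypothetical helper, not something from the original code, and it assumes, as the question does, that "</product>" and "</catalog>" each sit on their own line.

```python
def batch_products(lines, batch_size=50):
    """Yield lists of at most batch_size products.

    Each product is a list of raw lines ending with "</product>".
    As in the question's loop, any header lines that come before the
    first product end up in the first product's list.
    """
    product = []
    batch = []
    for line in lines:
        stripped = line.strip()
        if stripped == "</catalog>":
            break
        product.append(line)
        if stripped == "</product>":
            batch.append(product)
            product = []
            # Checked only right after a product completes, so each
            # full batch is emitted exactly once.
            if len(batch) >= batch_size:
                yield batch
                batch = []
    # Flush the leftover products that did not fill a whole batch.
    if batch:
        yield batch
```

In the handler, each yielded batch would be passed to a single taskqueue.add() call in place of the inline check. Because the loop ends when the input is exhausted, it also stops cleanly if the file has no closing "</catalog>" line.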
