XML.DOM
需求
有一个表,里面数据量比较大,每天一更新,其字段可以通过xml配置文件进行配置,即,可能每次建表的字段不一样。
上游跑时会根据配置从源文件中提取,到入库这一步需要根据配置进行建表。
解决
写了一个简单的xml,配置需要字段及类型
上游读取到对应的数据
入库这一步,先把原表删除,根据配置建新表
XML文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<?xml version="1.0" encoding="UTF-8"?>
<!-- 表名 ,数据库名 可灵活配置插入哪个库哪个表 -->
<table name="top_query" db_name="evaluting_sys">
    <!-- 非业务主键,自增长,可配名,其他 INTEGER UNSIGNED AUTO_INCREMENT -->
    <primary_key>
        <name>id</name>
    </primary_key>
    <!-- 字段开始 -->
    <field>
        <name>query</name>
        <type>varchar(200)</type>
        <is_index>false</is_index>
        <description>query</description>
    </field>
    <field>
        <name>pv</name>
        <type>integer</type>
        <is_index>false</is_index>
        <description>pv</description>
    </field>
    <field>
        <name>avg_money</name>
        <type>integer</type>
        <is_index>false</is_index>
        <description></description>
    </field>
    <!-- 字段配置结束 -->
</table>
|
处理脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
#!/usr/bin/python
# -*- coding:utf-8 -*-
#author: wklken
#desc: use to read db xml config.
#-----------------------
#2012-02-18 created
#----------------------
import sys,os
from xml.dom import minidom, Node
def _first_child_text(parent, tag_name):
    """Return the text of the first <tag_name> element found under *parent*."""
    return parent.getElementsByTagName(tag_name)[0].childNodes[0].nodeValue


def read_dbconfig_xml(xml_file_path):
    """Build the SQL statements described by a table-definition XML file.

    The first <table> element supplies the table name (``name`` attribute)
    and database name (``db_name`` attribute).  Its <primary_key>/<name>
    becomes an auto-increment integer primary key, and every <field>
    contributes one ``<name> <type>`` column, in document order.

    Args:
        xml_file_path: path of the XML configuration file.

    Returns:
        A dict with three entries, which is also printed:
        ``db_sql`` (create and select the database), ``table_sql`` (drop the
        previous table) and ``table_create_sql`` (create the new table).

    The process exits with status 1 when ``name`` or ``db_name`` is empty.
    """
    root = minidom.parse(xml_file_path)
    table = root.getElementsByTagName("table")[0]

    # Both attributes are mandatory; getAttribute() yields "" when absent.
    table_name = table.getAttribute("name")
    db_name = table.getAttribute("db_name")
    if not table_name or not db_name:
        print("Error:attribute is not define well! db_name=" + db_name + " ;table_name=" + table_name)
        sys.exit(1)

    content = {
        "db_sql": "create database if not exists `" + db_name + "`; use " + db_name + ";",
        # Valid MySQL form is "drop table if exists <name>".
        "table_sql": "drop table if exists " + table_name + ";",
    }

    # The surrogate primary key always comes first.
    primary_key = table.getElementsByTagName("primary_key")[0]
    columns = [_first_child_text(primary_key, "name") + " INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY"]

    # Ordinary columns; <is_index> and <description> are not used for the DDL.
    for field in table.getElementsByTagName("field"):
        columns.append(_first_child_text(field, "name") + " " + _first_child_text(field, "type"))

    # Joining avoids a dangling comma when the file declares no <field>.
    content["table_create_sql"] = "create table " + table_name + "(" + ",".join(columns) + ");"

    print(content)
    return content
if __name__ == "__main__":
    # The only argument is the path of the XML table definition.
    if len(sys.argv) < 2:
        sys.stderr.write("Usage: %s <db_config.xml>\n" % sys.argv[0])
        sys.exit(1)
    read_dbconfig_xml(sys.argv[1])
|
涉及方法
root = minidom.parse(xml_file_path) 获取dom对象
root.getElementsByTagName("table") 根据tag获取节点列表
table.getAttribute("name") 获取属性
primary_key.getElementsByTagName("name")[0].childNodes[0].nodeValue 获取子节点的值(例如 <name>id</name> 得到 id)
SAX
需求
读取xml数据文件,文件较大,需要实时处理插入到数据库
xml文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
<PERSONS>
    <person>
        <id>100000</id>
        <sex>男</sex>
        <address>北京,海淀区</address>
        <fansNum>437</fansNum>
        <summary>1989</summary>
        <wbNum>333</wbNum>
        <gzNum>242</gzNum>
        <blog>null</blog>
        <edu>大学</edu>
        <work></work>
        <renZh>1</renZh>
        <brithday>2月14日</brithday>
    </person>
</PERSONS>
|
处理
sax处理时并不会像dom一样可以以类似节点的维度进行读取,它只有 开始标签 内容 结束标签 之分
处理思想是:通过一个handler,对开始标签,内容,结束标签各有一个处理函数
代码及注解
person 处理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from xml.sax import handler,parseString
class PersonHandler(handler.ContentHandler):
    """SAX handler that inserts one database row per <person> element.

    Uses two module-level names: ``fields`` (the tag/column names, in
    column order) and ``in_sql`` (the INSERT template with one ``%s`` per
    field).

    Args:
        db_ops: object exposing ``insert(sql)``; it receives each finished
            INSERT statement.
    """

    def __init__(self, db_ops):
        handler.ContentHandler.__init__(self)
        # Database access object.
        self.db_ops = db_ops
        # Tag name -> text for the <person> currently being read.
        self.person = {}
        # Name of the most recently opened tag.
        self.current_tag = ""
        # Non-zero only between a start tag and the next end tag, so the
        # whitespace between elements is not recorded as a value.
        self.in_quote = 0

    def startElement(self, name, attr):
        """Remember the opened tag; a new <person> starts with an empty map."""
        if name == "person":
            self.person = {}
        self.current_tag = name
        self.in_quote = 1
        # Start from empty text so characters() can append chunk by chunk.
        self.person[name] = ""

    def endElement(self, name):
        """Insert the collected row when </person> is reached."""
        if name == "person":
            in_fields = tuple(self._quote(self.person.get(i, "")) for i in fields)
            statement = in_sql % in_fields
            print(statement)
            # Use the handler's own db object rather than a global.
            self.db_ops.insert(statement)
        self.in_quote = 0

    def characters(self, content):
        """Append text found inside the current tag to the person map."""
        # A parser may deliver one text node in several chunks, so the
        # pieces are concatenated instead of overwriting each other.
        if self.in_quote:
            self.person[self.current_tag] = self.person.get(self.current_tag, "") + content

    @staticmethod
    def _quote(value):
        """Wrap *value* in double quotes, backslash-escaping ``\\`` and ``"``."""
        # NOTE(review): assumes the server uses backslash escapes (MySQL's
        # default sql_mode). Parameterized statements would be safer, but
        # insert() is only known to accept a finished SQL string here.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
|
加上入库的完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
#!/usr/bin/python
# -*- coding:utf-8 -*-
#parse_person.py
#version : 0.1
#author : wukunliang@163.com
#desc : parse person.xml and out sql
import sys,os
import MySQLdb
# Python 2 only: reload sys so that setdefaultencoding() can be called, then
# make UTF-8 the default codec for implicit str <-> unicode conversions.
# NOTE(review): `reload` is not a builtin on Python 3, where this pair of
# lines would have to be removed.
reload (sys)
sys.setdefaultencoding( 'utf-8' )
# INSERT template for one person row: one %s placeholder per entry of
# `fields`, in the same order. Kept on logical lines via implicit string
# concatenation so the literal is never broken by a raw newline.
in_sql = (
    "insert into person(id,sex,address,fansNum,summary,wbNum,gzNum,blog,edu,work,renZh,brithday) "
    "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
)
# XML tag names to read from each <person>; they double as the column order.
# ("brithday" is the spelling used by both the XML tags and the table.)
fields = ("id", "sex", "address", "fansNum", "summary", "wbNum", "gzNum", "blog", "edu", "work", "renZh", "brithday")
#数据库方法
class Db_Connect:
    """Small wrapper around a single MySQLdb connection.

    Args:
        db_host: database server host.
        user: login name.
        pwd: login password.
        db_name: database to select.
        charset: connection character set (default ``"utf8"``).
        use_unicode: passed through to ``MySQLdb.Connection``.
    """

    def __init__(self, db_host, user, pwd, db_name, charset="utf8", use_unicode=True):
        print("init begin")
        # Echo the connection settings, but never the password itself.
        print("%s %s %s %s %s %s" % (db_host, user, "******", db_name, charset, use_unicode))
        self.conn = MySQLdb.Connection(db_host, user, pwd, db_name, charset=charset, use_unicode=use_unicode)
        print("init end")

    def insert(self, sql, params=None):
        """Execute one statement and return what ``cursor.execute`` returns.

        Args:
            sql: the statement to run.
            params: optional sequence/dict of values; when given, it is
                handed to the driver so the driver does the quoting.

        Returns:
            The result of ``cursor.execute``, or None when the driver
            raised ``MySQLdb.Warning`` (which is reported on stdout).
        """
        cursor = self.conn.cursor()
        try:
            if params is None:
                return cursor.execute(sql)
            return cursor.execute(sql, params)
        except MySQLdb.Warning as e:
            print("Error: execute sql ' %s ' failed: %s" % (sql, e))
            return None
        finally:
            # Release the cursor whether or not the statement succeeded.
            cursor.close()

    def close(self):
        """Commit pending work, then close the connection."""
        # DB-API connections start with autocommit off, so without this
        # commit the inserted rows can be discarded on close.
        try:
            self.conn.commit()
        finally:
            self.conn.close()
#person 处理类
from xml.sax import handler,parseString
class PersonHandler(handler.ContentHandler):
    """SAX handler that inserts one database row per <person> element.

    Uses two module-level names: ``fields`` (the tag/column names, in
    column order) and ``in_sql`` (the INSERT template with one ``%s`` per
    field).

    Args:
        db_ops: object exposing ``insert(sql)``; it receives each finished
            INSERT statement.
    """

    def __init__(self, db_ops):
        handler.ContentHandler.__init__(self)
        # Database access object.
        self.db_ops = db_ops
        # Tag name -> text for the <person> currently being read.
        self.person = {}
        # Name of the most recently opened tag.
        self.current_tag = ""
        # Non-zero only between a start tag and the next end tag, so the
        # whitespace between elements is not recorded as a value.
        self.in_quote = 0

    def startElement(self, name, attr):
        """Remember the opened tag; a new <person> starts with an empty map."""
        if name == "person":
            self.person = {}
        self.current_tag = name
        self.in_quote = 1
        # Start from empty text so characters() can append chunk by chunk.
        self.person[name] = ""

    def endElement(self, name):
        """Insert the collected row when </person> is reached."""
        if name == "person":
            in_fields = tuple(self._quote(self.person.get(i, "")) for i in fields)
            statement = in_sql % in_fields
            print(statement)
            # Use the handler's own db object rather than a global.
            self.db_ops.insert(statement)
        self.in_quote = 0

    def characters(self, content):
        """Append text found inside the current tag to the person map."""
        # A parser may deliver one text node in several chunks, so the
        # pieces are concatenated instead of overwriting each other.
        if self.in_quote:
            self.person[self.current_tag] = self.person.get(self.current_tag, "") + content

    @staticmethod
    def _quote(value):
        """Wrap *value* in double quotes, backslash-escaping ``\\`` and ``"``."""
        # NOTE(review): assumes the server uses backslash escapes (MySQL's
        # default sql_mode). Passing the raw values as statement parameters
        # would be safer if the db object supports it.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
if __name__ == "__main__":
    # Read raw bytes so the explicit transcoding works on Python 2 and 3.
    # The source file is GBK; if it is already UTF-8, drop decode/encode.
    with open("./person.xml", "rb") as f:
        xml_bytes = f.read().decode("gbk").encode("utf-8")

    db_ops = Db_Connect("127.0.0.1", "root", "root", "test")
    try:
        parseString(xml_bytes, PersonHandler(db_ops))
    finally:
        # Close the connection even when parsing or an insert fails.
        db_ops.close()
|
平时拿python来分析数据,写工具脚本,还有hadoop streaming,但是用的面和深度实在欠缺,只能说道行还浅,需要多多实践