Python MongoDB 合表

时间:2022-05-16 03:58:03

一、原始表结构

1、imsi表

MongoDB Enterprise > db.trs_action_dzwl_zm.findOne()
{
        "_id" : {
                "imsi" : "460029380018855",
                "start_time" : "2019-03-13 15:37:07"
        },
        "site_address" : "织里-大港路与G318交叉口",
        "xnetbar_wacode" : "EG-MIX-WL-4C-006",
        "imei" : "000000052052052",
        "device_longitude" : "120.275424",
        "device_latitude" : "30.838656",
        "tmsi" : "1552462627",
        "rssi" : "140",
        "band" : "40",
        "plmn" : "46000",
        "tel_number" : "1595028",
        "device_name" : "织里-大港路与G318交叉口-4G",
        "vendor_name" : "南京森根",
        "province" : "江苏省",
        "city" : "盐城市"
}

2、car表

MongoDB Enterprise > db.trs_action_car_info.findOne()
{
        "_id" : {
                "license_number" : "苏A39NX7",
                "start_time" : "2019-05-16 23:03:13"
        },
        "site_address" : "湖织大道-香圩桥东侧",
        "site_location_id" : "",
        "unlawful_act" : "",
        "driving_direct" : "其它",
        "lane_id" : "001",
        "netbar_wacode" : "904",
        "license_color" : "002",
        "photo_cnt" : "",
        "monitor_type" : "卡口式监控",
        "photo_path" : "/pic?did=12ffaa00-78a3-1037-921c-54c4150760be&bid=486472&pid=4294966623&ptime=1558018994",
        "speed" : "0",
        "stat" : "0",
        "vehicle_brand1" : "0",
        "vehicle_brand2" : "0",
        "car_length" : "",
        "car_color" : "其它颜色",
        "shade" : "000",
        "car_type" : "轿车",
        "license_type" : "92式民用车",
        "vehicle_feature_path" : "",
        "device_name" : "湖织大道-香圩桥东侧",
        "monitor_direct" : "未知",
        "lane" : "001",
        "device_longitude" : "120.308512",
        "device_latitude" : "30.881026",
        "site_name" : "湖织大道-香圩桥东侧",
        "road_segment_direct" : "未知",
        "site_longitude" : "120.308512",
        "site_latitude" : "30.881026"
}

3、face表

MongoDB Enterprise > db.trs_action_face_info.findOne()
{
        "_id" : {
                "pid" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
                "start_time" : "2019-06-13 12:32:59"
        },
        "site_address" : "融泰宾馆",
        "img_mode" : "",
        "obj_img_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42",
        "quality_score" : "0.883593",
        "netbar_wacode" : "33052802001310942740",
        "device_name" : "融泰宾馆",
        "device_longitude" : "120.262211",
        "device_latitude" : "30.841749",
        "age" : "",
        "gender" : "1",
        "race" : "",
        "beard" : "",
        "eye_open" : "",
        "eye_glass" : "",
        "sun_glass" : "1",
        "mask" : "",
        "mouth_open" : "",
        "smile" : "1",
        "similarity" : "0.97059",
        "image_id" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
        "bkg_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42"
}

4、MAC表

二、合表后collectionsitetime结构

要求:将imsi、car、face、MAC(MAC暂时不合)四张表,将表中一些关键字段提取出来

1)以站点

2)以两分钟为间隔

3)一个document中,两分钟内最多只存200个关键数据

MongoDB Enterprise > db.collecsites.findOne()
{
        "_id" : ObjectId("5e159ef831d840f9482b2adc"),
        "timeline" : "2019-03-13 15:34:00",
        "site" : "织里-大港路与G318交叉口",
        "face" : [ ],
        "lpn" : [ ],
        "mac" : [ ],
        "nsamples" : 200,
        "imsi" : [
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460078995442766"
                },
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460006254007976"
                }
        ]
}

三、开发脚本

1、使用到python模块

from multiprocessing import Pool(进程池)

from pymongo import MongoClient(python连接mongodb驱动)

import pandas as pd(将一段时间划分为多个时间段,本例子以2分钟一个时间段)

2、脚本

1)连接mongodb的脚本

[ [email protected] python3]# cat mongodbclient.py
#coding=utf-8 from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime class Database(object):
    def __init__(self, address, port, database):
        self.conn = MongoClient(host=address, port=port)
        self.db = self.conn[database]     def get_state(self):
        return self.conn is not None and self.db is not None     def insert_one(self, collection, data):
        if self.get_state():
            ret = self.db[collection].insert_one(data)
            return ret.inserted_id
        else:
            return ""     def insert_many(self, collection, data):
        if self.get_state():
            ret = self.db[collection].insert_many(data)
            return ret.inserted_id
        else:
            return ""     def update(self, collection, data):
        # data format:
        # {key:[old_data,new_data]}
        data_filter = {}
        data_revised = {}
        for key in data.keys():
            data_filter[key] = data[key][0]
            data_revised[key] = data[key][1]
        if self.get_state():
            return self.db[collection].update_many(data_filter, {"$set": data_revised}).modified_count
        return 0     def updateOne(self, collection, data_filter,data_revised):
        if self.get_state():
            return self.db[collection].update(data_filter,data_revised,True)
        return 0
    def find(self, col, condition, column=None):
        if self.get_state():
            if column is None:
                return self.db[col].find(condition)
            else:
                return self.db[col].find(condition, column)
        else:
            return None     def aggregate(self, col, condition):
        if self.get_state():
            options = {‘allowDiskUse‘:True}
            result=self.db[col].aggregate(condition,**options)
            return result
        else:
            return None
    def delete(self, col, condition):
        if self.get_state():
            return self.db[col].delete_many(filter=condition).deleted_count
        return 0     def close_connect(self):
        self.conn.close()
        #return ‘mongo连接已关闭‘ 2)对mongodb中collection做实际操作的脚本 [ [email protected] python3]# cat collection_curd.py
#coding:utf-8 from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd def max_number(num1,num2,num3):   ##获取最大值
    max_num=max(num1,num2,num3)
    return max_num def site_cursor_to_list(myresult,colum):  ##将mongodb输出的cursor转换为python的list
    sitelist=[]
    for i in myresult:
        sitelist.append(i[colum])
    return sitelist def list_Duplicate_removal(inlist):   ##去除重复值
    outlist=list(set(inlist))
    return outlist def get_time_interval(str_start_time,str_end_time):  ##以2分钟为单位,将输入的时间范围切分
    time_interval=pd.date_range(str_start_time, str_end_time,freq=‘2 Min‘)
    return time_interval def get_site(collection_name,str_start_time,str_end_time):  ##获取2分钟内imsi/face/lpn/mac的站点名称
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time}})
    db.close_connect()
    return site_cursor_to_list(myresult,"site_address") def get_site_data(collection_name,str_start_time,str_end_time,site,colums):  ##根据条件:2分钟的起止时间、站点名、集合名、字段名,获取所需数据
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time},"site_address":site},colums)
    db.close_connect()
    return myresult
def sitetime_insert(collection_name,site,str_start_time,imsi_sitetime,face_sitetime,car_sitetime,mac_sitetime):  ##将数据插入集合
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    db.insert_one(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,"imsi":imsi_sitetime,"face":face_sitetime,"lpn":car_sitetime,"mac":mac_sitetime})
    db.close_connect() def sitetime_updateOne(collection_name,site,str_start_time,key,value):   ##将数据更新到集合中
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    db.updateOne(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,key:[]},{"$set":{key:value}})
    db.close_connect() #def sit_colse():
#    db.close_connect()   3)操作脚本 [ [email protected] python3]# cat collection_insert.py
#coding:utf-8 from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
import collection_curd as curd
from multiprocessing import Pool #update_exec(imsi_outlen_flo,"collecsites",‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
    if type_outlen_flo <=1.0:
        curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist)
    else:
        for x in range(type_outlen_int 1):
            if x==type_outlen_int:
                curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:type_max_len])
                #print(typelist)
            else:
                curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:(x 1)*200])
                #print(typelist) def data_exec(nums,time_interval):
    #start = time.time()
    #print("start_time : ",start)
    #time_interval=curd.get_time_interval(‘20190310‘,‘20191230‘)
    #for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
    #print("start : ",nums)
    str_start_time = datetime.datetime.strftime(time_interval[nums],‘%Y-%m-%d %H:%M:%S‘)  ##时间切片,每个切片的开始时间
    str_end_time = datetime.datetime.strftime(time_interval[nums 1],‘%Y-%m-%d %H:%M:%S‘)  ##时间切片,每个切片的结束时间
    #print(str_start_time,‘   ‘,str_end_time)
    #print("########################")
    #time.sleep(5)
    #exit()
    #sitelist=[]
    myresult_imsi_sit=curd.get_site("trs_action_dzwl_zm",str_start_time,str_end_time) ##获取2分钟内imsi的站点名称,并将站点名带入下面的循环
    myresult_car_sit=curd.get_site("trs_action_car_info",str_start_time,str_end_time) ##获取2分钟内car的站点名称,并将站点名带入下面的循环
    myresult_face_sit=curd.get_site("trs_action_face_info",str_start_time,str_end_time) ##获取2分钟内face的站点名称,并将站点名带入下面的循环
    myresult=myresult_imsi_sit myresult_car_sit myresult_face_sit
    #print(myresult)
    myresult=curd.list_Duplicate_removal(myresult) ##获取去重后的所有站点
    #print(myresult)
    #exit()
    if not myresult:
        pass
    else:
        for i in range(len(myresult)):
            site=myresult[i]
            #print(site)
            my_imsi_site_data=curd.get_site_data("trs_action_dzwl_zm",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据imsi
            my_car_site_data=curd.get_site_data("trs_action_car_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据car
            my_face_site_data=curd.get_site_data("trs_action_face_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据face             imsidata=curd.site_cursor_to_list(my_imsi_site_data,"_id")
            cardata=curd.site_cursor_to_list(my_car_site_data,"_id")
            facedata=curd.site_cursor_to_list(my_face_site_data,"_id")
            #print(imsidata)             imsi_outlen_int=len(imsidata)/200
            imsi_outlen_flo=len(imsidata)/200.0
            car_outlen_int=len(cardata)/200
            face_outlen_int=len(facedata)/200
            car_outlen_flo=len(cardata)/200.0
            face_outlen_flo=len(facedata)/200.0             car_max_len=len(cardata)
            face_max_len=len(facedata)
            imsi_max_len=len(imsidata)
            #print("car_max_len:",car_outlen_int," ","face_max_len:",face_outlen_int," ","imsi_max_len:",imsi_outlen_int)
            max_mod_200=max(imsi_outlen_int,car_outlen_int,face_outlen_int) 1
            #print(max_mod_200)             if imsi_outlen_flo>imsi_outlen_int or car_outlen_flo>car_outlen_int or face_outlen_flo>face_outlen_int:
                for i in range(max_mod_200):
                    curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
            else:
                for i in range(max_mod_200-1):
                    curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
            update_exec(imsi_outlen_flo,"collecsites",site,str_start_time,‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
            update_exec(car_outlen_flo,"collecsites",site,str_start_time,‘lpn‘,cardata,car_outlen_int,car_max_len)
            update_exec(face_outlen_flo,"collecsites",site,str_start_time,‘face‘,facedata,face_outlen_int,face_max_len)
            #print(site)
            #exit()
            #curd.sit_colse
    #def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):     #end = time.time()
    #print("end_time : ",end)
    #print(‘ALL Insert Task runs %s(ms).‘ % ((end - start)*1000))
if __name__ == ‘__main__‘:
    start = time.time()
    p=Pool(30)
    #print("start_time : ",start)
    time_interval=curd.get_time_interval(‘20190310‘,‘20191230‘)
    for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
        #print(i)
        #res=p.apply_async(data_exec,args=(i,))
        result=p.apply_async(data_exec, args=(i,time_interval))
    p.close()
    p.join()
    end = time.time()
    print("end_time : ",end)
    print(‘ALL Insert Task runs %s(ms).‘ % ((end - start)*1000))  

四、开发中遇到的问题

1、如何将一段时间按照两分钟进行划分 2、实例化mongodb连接后,在脚本运行中,连接如何close 3、进程线程池的使用