ubuntu上跑python连接pg,报错 ImportError: No module named psycopg2

时间:2022-11-30 11:42:30

ubuntu上跑python连接pg,报错  ImportError: No module named psycopg2





root@pgproxy1:~# python /home/zxw/PGWriterTest_m.py 

Traceback (most recent call last):

  File "/home/zxw/PGWriterTest_m.py", line 4, in <module>

    import psycopg2

ImportError: No module named psycopg2





例如以下安装:

1

root@pgproxy1:~# apt-cache search psycopg2

python-psycopg2 - Python module for PostgreSQL

python-psycopg2-dbg - Python module for PostgreSQL (debug extension)

python-psycopg2-doc - Python module for PostgreSQL (documentation package)

python3-psycopg2 - Python 3 module for PostgreSQL

python3-psycopg2-dbg - Python 3 module for PostgreSQL (debug extension)





2

root@pgproxy1:~# python -V

Python 2.7.3





3

root@pgproxy1:~# apt-get install -y python-psycopg2 python-psycopg2-doc python-psycopg2-dbg

Reading package lists... Done

Building dependency tree       

Reading state information... Done

...

Processing triggers for libc-bin ...

ldconfig deferred processing now taking place

root@pgproxy1:~#





4

脚本例如以下:

# --encoding:utf-8--

import time

import threading

import psycopg2

import Queue

import datetime

该接口相关能够參考其官网:

http://initd.org/psycopg/

class PGWriterTest(threading.Thread):

    """ 初始化 """

    def __init__(self,connstr):

        self.conn = psycopg2.connect(connstr)

        self.cursor = self.conn.cursor()

        

        self.dbnum  = 4

        self.connArray = []

        self.cursorArray = []

        

        

        for i in range(0,self.dbnum):

            #DB port

            dbidstr = '%02d' % (9900 + i)

            if (i == 0  or i == 2 ):

                dstDB = 'host=ip1 user=dbusername password=pwd dbname=dbname port='+ dbidstr

            else:

                dstDB = 'host=ip2 user=dbusername password=pwd dbname=dbname port='+ dbidstr

            print 'connect ' + dstDB

           

            dstConn =  psycopg2.connect(dstDB)

            self.connArray.append(dstConn)

            dstCursor = dstConn.cursor()

            self.cursorArray.append(dstCursor)

        

        # 执行父类的构造函数

        threading.Thread.__init__(self)





    """ 数据写入数据库 """ 

    def read(self,t_id):

        sql = 'SELECT * from get_cont('+str(t_id)+')' 

        self.cursor.execute(sql)

        datalist = []

        for row in self.cursor.fetchall():

            datalist.append(row)

        return datalist    

    

    """ 数据写入数据库 """ 

    def save(self,t_id,data):

        sql = 'SELECT write_cont((%s,character(255) %s,%d,NOW()::timestamp))'

        params = []

        params.append(t_id)

        params.append(data[1])

        params.append(data[2])

        params.append(data[3])





        #print params

        #取db_ins_id

        db_ins_id = 0

        db_ins_id = t_id % self.dbnum

        try:

            print str(datetime.datetime.now()) + " DB " + str(db_ins_id) + " " +  str( t_id ) + " before insert"

            insert_sql = 'SELECT write_cont((%s,character(255) %s,%d,NOW()::timestamp))'

            self.cursorArray[db_ins_id].execute(insert_sql,params)

            self.cursorArray[db_ins_id].execute("COMMIT")

            print str(datetime.datetime.now()) + " DB " + str(db_ins_id) + " " +  str( t_id ) + " inserted"

            return True





        except Exception,ex:

            print("save error:%s" % str(ex))

            print("save t_id:%s\t" % str(t_id))

            print("error cont:%s" % str(params))

            #self.log.write("error param:%s" % str(params))

            #self.log.write("error t_id:%s" % data.get('t_id',''))

            self.cursorArray[db_ins_id].execute("ROLLBACK")

            return False





            

            #測试读取

            """

            try:

                searchsql = 'select t_id from get_cont('+ str(t_id) + ')'

                self.cursorArray[db_ins_id].execute(searchsql)

                data = self.cursorArray[db_ins_id].fetchone()

                if (data == None  or int(data[0]) != t_id ):

                    print ' insert error for ' +str(db_ins_id) + searchsql

            except Exception,searchex:

                print str(searchex) + ' for  ' + searchsql

            """     

        return True

if __name__ == "__main__":

    pgTest = PGWriterTest('host=ip user=DBUser password=pwd dbname=DBNAme port=9999')

    start_time =  str(datetime.datetime.now())

    for i in range(1,10):

        t_id = 1000 + i;

        print str(datetime.datetime.now()) + " getting "

        rows = pgTest.read(t_id)

        

        print str(datetime.datetime.now()) + " gotten "

        #print rows[0]

        pgTest.save(t_id,rows[0])

    print start_time

print  str(datetime.datetime.now())

-----------------

转载请著明出处:

blog.csdn.net/beiigang