python: Hack Adodb

Topic: 
 

เกร็ด

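  • ปกติเราไม่สามารถสร้าง database จากคำสั่ง connection ได้ เช่น ถ้าเว้นชื่อ database ว่างไว้ PostgreSQL จะพยายามเชื่อมต่อไปยัง database ที่ชื่อเดียวกับ user แทน จึงเกิด error ดังนี้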
    >>> conn=adodb.NewADOConnection('postgres')
    >>> conn.Connect('host','user','password','')
    Traceback (most recent call last):
      File "<stdin>", line 1, in ?
      File "/usr/lib/python2.4/site-packages/adodb/adodb.py", line 199, in Connect
        self._connect(host,user,password,database)
      File "/usr/lib/python2.4/site-packages/adodb/adodb_postgres.py", line 46, in _connect
        self._conn = psycopg.connect(dsn)
    psycopg.OperationalError: FATAL:  database "user" does not exist

    แต่เราสามารถทำโดยอ้อมได้ โดยมีข้อแม้ว่าต้องมี database อยู่ก่อนแล้วอย่างน้อยหนึ่งตัว กล่าวคือให้เชื่อมต่อเข้าไปยัง database ที่มีอยู่ก่อน (สำหรับ PostgreSQL ตัวที่มีอยู่แน่นอนคือ database ชื่อ postgres) แล้วจึงค่อยสร้าง database ใหม่ด้วยคำสั่ง SQL ผ่าน Execute

    >>> conn=adodb.NewADOConnection('postgres')
    >>> conn.Connect('host','user','password','postgres')
    True
    >>> cur=conn.Execute('CREATE DATABASE %s OWNER %s' % ('new_db','user'))
    >>> conn.Connect('host','user','password','new_db')
    True

โค้ดของโปรแกรม adodb.py
ยกโค้ดมาดูทั้งยวง โดยตัดทอนเหลือเฉพาะชื่อคลาสและฟังก์ชัน (เนื้อหาภายในฟังก์ชันถูกละไว้)
########################################################################
# Vers 2.01 5 May 2006, (c)2004-2006 John Lim (jlim#natsoft.com.my) All Rights Reserved
# Released under a BSD-style license. See LICENSE.txt.
# Download: http://adodb.sourceforge.net/#pydownload
########################################################################

# Module metadata strings.
# NOTE(review): the address here ends in "natsoft.com" while the header
# comment of this listing uses "natsoft.com.my" -- confirm which is intended.
__author__ = "John Lim (jlim#natsoft.com)"
__credits__ = "(c) 2004-2006 John Lim"

import exceptions,sys,re
from datetime import datetime

# Probe for the True/False names: merely evaluating them raises NameError on
# an interpreter that does not define them, in which case they are bound to
# the integers 1 and 0 below.
try:
    True, False
except NameError:
    # Maintain compatibility with Python 2.2
    True, False = 1, 0

# MapTypes: native column type name (upper case) -> one-letter type code.
# Judging by the names in each group the codes are:
#   C character, X long text, B binary, D date, T time/timestamp,
#   L boolean, R counter/serial, I integer, N numeric.
# The table is filled from (code, names) groups in a fixed order, so the key
# order is deterministic on order-preserving dicts.
MapTypes = {}
for _code, _names in (
    ('C', ('VARCHAR', 'VARCHAR2', 'CHAR', 'C', 'STRING', 'NCHAR',
           'NVARCHAR', 'VARYING', 'BPCHAR', 'CHARACTER',
           'INTERVAL')),                       # INTERVAL: Postgres
    ('X', ('LONGCHAR', 'TEXT', 'NTEXT', 'M', 'X', 'CLOB', 'NCLOB',
           'LVARCHAR')),
    ('B', ('BLOB', 'IMAGE', 'BINARY', 'VARBINARY', 'LONGBINARY', 'B')),
    ('D', ('YEAR',                             # YEAR: mysql
           'DATE', 'D')),
    ('T', ('TIME', 'TIMESTAMP', 'DATETIME', 'TIMESTAMPTZ', 'T')),
    ('L', ('BOOL', 'BOOLEAN', 'BIT', 'L')),
    ('R', ('COUNTER', 'R',
           'SERIAL',                           # SERIAL: ifx
           'INT IDENTITY')),
    ('I', ('INT', 'INTEGER', 'INTEGER UNSIGNED', 'SHORT', 'TINYINT',
           'SMALLINT', 'I')),
    ('N', ('LONG',      # interbase is numeric, oci8 is blob
           'BIGINT',    # this is bigger than PHP 32-bit integers
           'DECIMAL', 'DEC', 'REAL', 'DOUBLE', 'DOUBLE PRECISION',
           'SMALLFLOAT', 'FLOAT', 'NUMBER', 'NUM', 'NUMERIC', 'MONEY')),
    # informix 9.2
    ('I', ('SQLINT', 'SQLSERIAL', 'SQLSMINT')),
    ('N', ('SQLSMFLOAT', 'SQLFLOAT', 'SQLMONEY', 'SQLDECIMAL')),
    ('D', ('SQLDATE',)),
    ('C', ('SQLVCHAR', 'SQLCHAR')),
    ('T', ('SQLDTIME',)),
    ('N', ('SQLINTERVAL',)),
    ('B', ('SQLBYTES',)),
    ('X', ('SQLTEXT',)),
):
    for _name in _names:
        MapTypes[_name] = _code
# Drop the loop temporaries so the module namespace only gains MapTypes.
del _code, _names, _name

# Outline of adodb_iter: this listing keeps only the signatures, the method
# bodies are omitted.
# The class declares __iter__ and next, the two methods of the Python 2
# iterator protocol; presumably it drives "for row in cursor" loops --
# confirm against the full adodb.py.
class adodb_iter:
    def __iter__(self):
    def next(self):

# Signature only (body omitted in this listing).
# The interactive session earlier in this document calls it as
# NewADOConnection('postgres') and then calls Connect() on the result;
# modulename presumably selects the driver module -- confirm against the
# full adodb.py.
def NewADOConnection(modulename):
# Signature only (body omitted in this listing).
# Takes the same single parameter as NewADOConnection; presumably an alias
# for it -- confirm against the full adodb.py.
def ADONewConnection(modulename):

# Outline of ADOConnection: the class-level defaults are shown in full, but
# the listing keeps only the method signatures (bodies omitted).
class ADOConnection:
    databaseType = None
    dataProvider = 'native'
    # Connection settings; Connect() accepts arguments with the same names.
    host = None
    user = None
    password = None
    database = None
    # A backslash followed by a single quote; presumably what qstr() puts in
    # place of an embedded quote -- confirm in the full source.
    replaceQuote = "\\'"
    useExceptions = True
    debug = None
    getLOBs = True
    hasRowCount = True
    metaColSQL = 'Invalid'
    # strftime-style patterns for dates and timestamps.
    fmtDate = '%Y-%m-%d'
    fmtTimeStamp = '%Y-%m-%d %H:%M:%S'
    
    _errormsg = ''
    _errno = 0
    _conn = None
    _autocommit = True
    # NOTE(review): defaults to True at class level, i.e. before any Connect()
    # call -- confirm how IsConnected() uses it.
    _connected = True
    
    def __init__(self):
        pass            
    
    def Connect(self,host=None,user=None,password=None,database=None):
    def IsConnected(self):
    def DriverInfo(self):
    def ErrorMsg(self):
    def ErrorNo(self):
    def qstr(self,s):
    def quote(self,s):
    def addq(self,s):
    def Conn(self):
    def _query(self,sql,params=None,_cursor=None):
    def SelectLimit(self,sql,limit,offset=-1,params=None):
    def Execute(self,sql,params=None):
    def UpdateBlob(self,table,field,blob,where,blobtype='BLOB'):
    def UpdateBlobFile(self,table,field,filepath,where,blobtype='BLOB'):
    def UpdateClob(self,table,field,blob,where):
    def GetRows(self,sql,params=None):
    def GetArray(self,sql,params=None):
    def GetAll(self,sql,params=None):
    # NOTE(review): GetRow is listed twice with an identical signature. If
    # both definitions exist in the real class, the second one replaces the
    # first -- check whether one of them was meant to have another name.
    def GetRow(self,sql,params=None):
    def GetRow(self,sql,params=None):
    def GetOne(self,sql,params=None):
    def GetCol(self, sql, params=None):
    def GetAssoc(self, sql, params=None):
    def GetDict(self, sql, params=None):
    def BeginTrans(self):
    def CommitTrans(self):
    def RollbackTrans(self):
    def Close(self):
    def DBDate(self,d):
    def DBTimeStamp(self,d):
    def Date(self,s):
    def TimeStamp(self,s):
    def MetaType(self, dbtype):
    def MetaColumns(self, table):

        
# Outline of ADOCursor: the class-level defaults are shown in full, but the
# listing keeps only the method signatures (bodies omitted).
# The test code in this file walks a cursor with
# "while not rs.EOF: ... rs.fields ... rs.MoveNext()".
class ADOCursor:
    _cursor = None
    fields = None
    EOF = False
    _rowcount = 0
    _isselect = False
    _insertid = 0
    _conn = None
    
    def __init__(self,rs,conn,norowcount=False):
    def __iter__(self):
    def RecordCount(self):
    def MoveNext(self):
    def FetchRow(self):

    # FetchField returns a tuple of the form
    # (name, type_code, display_size, internal_size, precision, scale, null_ok).
    # Note: databases could return the name in upper or lower case.
    def FetchField(self,row):
    def Affected_Rows(self):
    def Insert_ID(self):
    def Cursor(self):
    def GetRowAssoc(self,upper=1):
    def Close(self):

#===========================================================
#                  UNIT TESTING
#===========================================================

def _Test_Eq(testid, correct, testval, errmsg=''):
    if correct == testval:
        print "Passed Test: "+testid
    else:
        print ""
        print "********* Failed Test: "+testid
        print "********************** "+str(errmsg)
        print "********************** expected="+str(correct)
        print "**********************   actual="+str(testval)

def Test_Blob(db, src='c:/lensserver.gif', dest='c:/testpy1.gif'):
    """Round-trip a file through the ``photos.photo`` column of *db*.

    The file *src* is stored with ``db.UpdateBlobFile`` into the row
    ``id=1`` of table ``photos``, read back with ``db.GetOne`` and written to
    *dest*. The rows of ``photos`` are then printed.

    db   -- connection object providing UpdateBlobFile, GetOne, Execute and
            GetAll, plus the ``debug`` and ``getLOBs`` attributes.
    src  -- path of the file to upload (defaults to the original hard-coded
            path).
    dest -- path the downloaded blob is written to; an existing file is
            removed first (defaults to the original hard-coded path).

    ``db.debug`` and ``db.getLOBs`` are forced to True while the test runs
    and are put back to their previous values afterwards, even when one of
    the database calls raises. Exceptions from the database calls propagate.
    """
    import os

    # Best-effort removal of a stale output file; a missing file is expected.
    try:
        os.unlink(dest)
    except OSError:
        pass

    saved = db.debug
    saveb = db.getLOBs
    db.debug = True
    db.getLOBs = True

    # Plain try/finally (no "with") keeps the code valid on the old Python 2
    # interpreters the rest of this file still caters for.
    try:
        db.UpdateBlobFile('photos','photo',src,'id=1')
        data = db.GetOne('select photo from photos where id=1')
        f = open(dest,'wb')
        try:
            f.write(data)
        finally:
            # Close the file even if the write fails.
            f.close()

        rs = db.Execute('select * from photos')
        while not rs.EOF:
            # One-tuple on the right so a tuple-valued rs.fields is formatted
            # as a single value.
            print('Fields= %s' % (rs.fields,))
            rs.MoveNext()

        print("=======================")

        rows = db.GetAll('select * from photos where id<=1')
        print(rows)
    finally:
        db.getLOBs = saveb
        db.debug = saved

def Test(db,debug=False):
    """Run the smoke tests against *db*, reporting on stdout.

    db    -- a connected connection object. The queries expect a table
             ``adoxyz`` with at least the columns id, firstname, lastname
             and created.
    debug -- when true, additionally print the result of ``FetchField(1)``.

    Individual checks report through ``_Test_Eq``. The iteration, GetAssoc/
    GetCol, MetaColumns and rollback sections catch ``Exception`` and print
    it, so one failing section does not stop the run; errors from the other
    sections propagate. KeyboardInterrupt and SystemExit are never swallowed
    on interpreters where they do not derive from Exception.

    Side effects: sets ``db.useExceptions`` to True, deletes and re-inserts
    the row with id 997, and inserts then rolls back a row with id 1997.
    ``db.debug`` is switched on part-way through and restored to its previous
    value before returning.
    """
    # Prints are written as print(<one expression>) so the same source gives
    # the same output on Python 2 and Python 3; "%s" % (value,) reproduces
    # the "label value" form of the two-argument print statement.
    saved_debug = db.debug
    try:
        db.DriverInfo()

        # Date/TimeStamp parsing checks, deliberately switched off.
        if False:
            d = db.Date('2004-03-21')
            print('2004-03-21= %s' % (d,))

            d = db.TimeStamp('2004-03-22 12:50:51')
            print('2004-03-22 12:50:51= %s' % (d,))

            print("DBTimeStamp= %s" % (db.DBTimeStamp(d),))

        db.useExceptions = True # use adodb error handling
        try:
            sql = 'select * from xadoxyz where 0 < id and id < 3'
            rs = db.Execute(sql)
            _Test_Eq('Bad SQL',None, rs, sql)
        except Exception:
            print("And you should see an error message indicating bad table was defined: ")
            print("err= %s" % (db.ErrorMsg(),))

        print("-----")
        rs = db.Execute('select * from ADOXYZ where 0 < id and id < 3 order by id')
        while not rs.EOF:
            print(rs.fields)
            rs.MoveNext()

        print("You should see 2 rows of data here:")
        rs = db.Execute('select * from adoxyz where 0 < id and id < 3 order by id')
        print("rows= %s" % (rs.RecordCount(),))
        while not rs.EOF:
            print(rs.GetRowAssoc())
            rs.MoveNext()

        print("-----")
        rs = db.Execute('select id,firstname from adoxyz where 0 < id and id < 3 order by id')
        _Test_Eq("Test FetchField",'FIRSTNAME',rs.FetchField(1)[0].upper())
        if debug:
            print(rs.FetchField(1))
        cnt = 0
        while True:
            arr = rs.FetchRow()
            # Identity test: a row object may define its own __eq__.
            if arr is None:
                break
            cnt += 1
            _Test_Eq('Execute 2.0',cnt,arr[0])

        _Test_Eq('Execute 2.1',2,cnt)
        if rs.RecordCount() == -1:
            print("*** RecordCount not supported: -1")
        else:
            _Test_Eq('Execute 2.1 RecordCount',2,rs.RecordCount())

        rs = db.Execute("delete from adoxyz where id=997")
        cnt = rs.Affected_Rows()
        _Test_Eq('Affected_Rows',1,cnt)

        ok = db.Execute("insert into adoxyz (id, firstname,lastname) values (997,'python','snake')")
        if not ok:
            _Test_Eq('DELETE/INSERT','inserted row','failed insert')

        row = db.GetRow("select id,firstname from adoxyz where id=997")
        _Test_Eq('GetRow',str(997)+' '+'python',str(int(row[0]))+' '+row[1].rstrip(),row)

        row = db.GetOne("select id,firstname from adoxyz where id=997")
        _Test_Eq('GetOne',997,row)

        rs = db.SelectLimit("select id,firstname from adoxyz",3)
        cnt = 0

        try:
            for row in rs:
                cnt += 1
            _Test_Eq('SelectLimit',3,cnt)
        except Exception:
            print("Failed Iteration")
            print(sys.exc_info()[1])

        d = db.GetOne('select created from adoxyz where id=1')
        d2 = db.TimeStamp(d)
        _Test_Eq('DBDate',str(d)[:19],str(d2))

        # Two quoting styles are accepted: backslash-escaped and doubled quote.
        quoted = db.qstr("\\show'boat")
        if quoted != "'\\\\show\\'boat'" and quoted != "'\\show''boat'":
            _Test_Eq('qstr',"qstr(\\show'boat)", quoted)
        else:
            _Test_Eq('qstr','1','1')

        try:
            db.debug=True
            print("Testing GetAssoc")
            arr = db.GetAssoc('select firstname,lastname from adoxyz')
            print(arr)
            print("Testing GetCol")
            arr = db.GetCol('select firstname from adoxyz')
            print(arr)
        except Exception:
            print(sys.exc_info()[1])

        try:
            print("MetaColumns:")
            rows = db.MetaColumns('adoxyz')
            print(rows)
        except Exception:
            print("Failed MetaColumns")
            print(sys.exc_info()[1])

        try:
            db.BeginTrans()
            db.Execute("insert into adoxyz (id, firstname,lastname) values (1997,'python','snake')")
            db.RollbackTrans()
            val = db.GetOne('select * from adoxyz where id=1997')
            _Test_Eq('Rollback Test',None,val)
        except Exception:
            print("Failed Rollback Test")
            print(sys.exc_info()[1])
    finally:
        # Do not leave debug output switched on for the caller's connection.
        db.debug = saved_debug