Logo Search packages:      
Sourcecode: pythoncard version File versions  Download package

postgreBrowse.py

#!/usr/bin/python

"""
__version__="$Release $"
__date__="Sat Jun  7 13:39:12 BST 2003"
__author__="Jon Dyte <jon@totient.co.uk>"

Module Name: psyopg
Description: Plug in for PythonCard application dbBrowse to provide Psycopg specific functionality

Psycopg version of the mysqlBrowse class
"""

import psycopg
from psycopg import NUMBER, STRING, INTEGER, FLOAT, DATETIME
from psycopg import BOOLEAN, ROWID, LONGINTEGER

class browse:

    # Connection should be a dictionary with at least three keys, 'username',
    # 'password', 'database' - may need to be normalised for other RDBMS
    def __init__(self, connection):
        "Setup the database connection"
        self._system_tables = []
        try:
            # Not providing a db name is guaranteed to ruin our connection
            if not connection['database']:
                raise ValueError
            self._db = psycopg.connect( "user=%s password=%s dbname=%s" % (connection['username'],
                                                                           connection['password'],
                                                                           connection['database']) )
            self._cursor=self._db.cursor()
            # This one is used in getRow
            self._tableName=''
        except:
            self._cursor=None
            self._tableName=None

    def getTables(self):
        "Return a list of all of the non-system tables in <database>.\
        CAVEAT: actually gets all the tables not owned by user 'postgres'"
        stmt ="SELECT t.tablename FROM pg_tables t WHERE tableowner <> \'postgres\'"
        self._cursor.execute(stmt)
        # I'm using a list comprehension here instead of a for loop,
        # either will do but I think this is more concise (unlike this comment)
        return [ x[0] for x in self._cursor.fetchall() if x[0] not in self._system_tables ]

    def getColumns(self, tableName):
        "Get the definition of the columns in tableName"
        stmt = 'select * from %s where 1=0 ' % tableName
        try:
            self._cursor.execute(stmt)
        except:
            return ()
        desc = self._cursor.description
        r = []
        a = r.append
        ## shamelessly borrowed from the ZPsycopgDA for Zope
        for name, type, width, ds, p, scale, null_ok in desc:
            if type == NUMBER:
                if type == INTEGER:
                    type = INTEGER
                elif type == FLOAT:
                    type = FLOAT
                else: type = NUMBER
            elif type == BOOLEAN:
                type = BOOLEAN
            elif type == ROWID:
                type = ROWID
            elif type == DATETIME:
                type = DATETIME
            else:
                type = STRING
            a((name,type.name,0,0,0))
        return r

    def getQueryString(self, tableName):
        "Return a SQL statement which queries all of the columns in tableName"
        tableStructure=self.getColumns(tableName)
        # Construct and return the string
        return 'SELECT %s FROM %s' % (", ".join([column[0] for column in tableStructure]),
                                      tableName)
    
    def getRow(self, tableName):
        "Get a row from tableName"
        # When we upgrade to 2.2 this will be a great candidate for a
        # generator/iterator. In the meantime we use self._tableName to keep
        # track of what we are doing
        if tableName!=self._tableName:
            self._tableName=tableName
            self._cursor.execute(self.getQueryString(tableName))
        return self._cursor.fetchone()

    def getRows(self, tableName):
        "Get all of the rows from tableName"
        # When we upgrade to 2.2 this will be a great candidate for a
        # generator/iterator. In the meantime we use self._tableName to keep
        # track of what we are doing
        if tableName!=self._tableName:
            self._tableName=tableName
        self._cursor.execute(self.getQueryString(tableName))
        return self._cursor.fetchall()

if __name__ == '__main__':
    # We are in an interactive session so run our test routines
    # Connect to the database
    connection={ 'username':'<username>'
                ,'password':'<password>'
                ,'database':'<db name>'}
    dbHolder = browse(connection)
    # Return all of our table names into user_tables
    user_tables = dbHolder.getTables()

    # Print out the structure of each table and its first row
    print "--------------------------------------------------"
    for table in user_tables:
        print table
        print dbHolder.getQueryString(table)
        print dbHolder.getRow(table)
        print "--------------------------------------------------"

Generated by  Doxygen 1.6.0   Back to index