I needed to read data from some CSV files and load it, as is, into a single MySQL database table. I wrote two very simple classes: one reads the data in dictionary format and the other sends it to the MySQL database.
I used two Python modules:
1. csv
2. MySQLdb
We can divide this task between two classes: one handles reading the CSV file and the other handles the database.
Here is the first class:
import csv
class myReader:
    """Read a CSV file one row at a time, each row as a dictionary.

    Rows are produced by ``csv.DictReader``, so they are keyed by the column
    names found on the first line of the file.  Fields are comma separated
    and whitespace directly after a delimiter is ignored.
    """

    def __init__(self, csvfile):
        """Open the file at path *csvfile* and wrap it in a ``DictReader``."""
        # Local import keeps the class self-contained.
        import sys

        # The csv module expects a binary file on Python 2, but a text file
        # opened with newline="" on Python 3.
        if sys.version_info[0] >= 3:
            self._file = open(csvfile, "r", newline="")
        else:
            self._file = open(csvfile, "rb")
        self.reader = csv.DictReader(
            self._file, delimiter=",", skipinitialspace=True
        )

    def getRow(self):
        """Return the next row as a dict, or ``None`` once the file is exhausted.

        Only end-of-file is turned into ``None``; I/O and CSV parsing errors
        propagate so that a damaged file is not mistaken for a short one.
        """
        try:
            return next(self.reader)
        except StopIteration:
            return None

    def close(self):
        """Close the underlying file; safe to call more than once."""
        handle = getattr(self, "_file", None)
        if handle is not None:
            handle.close()
            self._file = None

    def __del__(self):
        # Release the file handle even if the caller never called close().
        self.close()
        print("Destorying myReader object")
After reading a row from the CSV file, we can pass it directly to our table class, which builds the SQL statement from the columns present in the row data; sometimes a row may have fewer columns than the table.
import MySQLdb
class csvTable:
    """Insert dictionary rows into a single MySQL table.

    *db* is an open DB-API connection (e.g. from ``MySQLdb.connect``).
    *table* is the target table name; it defaults to ``csvDataTable``.
    """

    def __init__(self, db, table="csvDataTable"):
        self.db = db
        self.table = table

    def _quote(self, name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Backticks are doubled so the name cannot terminate the quoting, and
        ``%`` is doubled because the driver applies %-formatting to the
        statement when parameters are supplied.
        """
        return "`" + name.replace("`", "``").replace("%", "%%") + "`"

    def additem(self, item):
        """Insert one row; *item* maps column names to values.

        Only the columns present in *item* are listed in the statement, so a
        row may carry fewer columns than the table.  Entries whose key is
        empty or ``None`` (unnamed CSV columns, or surplus fields that
        ``csv.DictReader`` stores under ``None``) are skipped.  Values are
        passed as query parameters and never spliced into the SQL text.
        """
        columns = [key for key in item if key]
        values = tuple(item[key] for key in columns)
        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
            self._quote(self.table),
            ", ".join([self._quote(col) for col in columns]),
            ", ".join(["%s"] * len(columns)),
        )
        cursor = self.db.cursor()
        try:
            cursor.execute(sql, values)
        finally:
            # Close the cursor even when the insert fails.
            cursor.close()

    def savedata(self):
        """Commit the pending inserts to the database."""
        self.db.commit()

    def __del__(self):
        print("Destroying the csvTable object")
Now we need to write a test function that uses both classes.
def processcsvfile(csvfile, host="localhost", user="mysqluser",
                   passwd="password", db="mydb"):
    """Load every row of the CSV file *csvfile* into the database table.

    Rows are read with ``myReader`` and inserted with ``csvTable``.  A row
    that fails to insert is reported and skipped, and the remaining rows are
    still processed.  The inserts are committed once, after the last row.
    The connection parameters default to the values used in the article.
    """
    # Local import: the error report below needs sys.
    import sys

    print("inserting data of " + csvfile)
    conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db)
    try:
        rd = myReader(csvfile)
        ct = csvTable(conn)
        processed = 0
        rw = rd.getRow()
        while rw:
            try:
                ct.additem(rw)
            except Exception:
                # Best effort: report the bad row and carry on with the rest.
                print("Unexpected error: " + str(sys.exc_info()))
            rw = rd.getRow()
            processed += 1
        print("Number of recored processed :" + str(processed))
        ct.savedata()
    finally:
        # Always release the connection, even if reading or committing fails.
        conn.close()
I used two Python modules:
1. csv
2. MySQLdb
We can divide this task between two classes: one handles reading the CSV file and the other handles the database.
Here is the first class:
import csv
class myReader:
    """Read a CSV file one row at a time, each row as a dictionary.

    Rows are produced by ``csv.DictReader``, so they are keyed by the column
    names found on the first line of the file.  Fields are comma separated
    and whitespace directly after a delimiter is ignored.
    """

    def __init__(self, csvfile):
        """Open the file at path *csvfile* and wrap it in a ``DictReader``."""
        # Local import keeps the class self-contained.
        import sys

        # The csv module expects a binary file on Python 2, but a text file
        # opened with newline="" on Python 3.
        if sys.version_info[0] >= 3:
            self._file = open(csvfile, "r", newline="")
        else:
            self._file = open(csvfile, "rb")
        self.reader = csv.DictReader(
            self._file, delimiter=",", skipinitialspace=True
        )

    def getRow(self):
        """Return the next row as a dict, or ``None`` once the file is exhausted.

        Only end-of-file is turned into ``None``; I/O and CSV parsing errors
        propagate so that a damaged file is not mistaken for a short one.
        """
        try:
            return next(self.reader)
        except StopIteration:
            return None

    def close(self):
        """Close the underlying file; safe to call more than once."""
        handle = getattr(self, "_file", None)
        if handle is not None:
            handle.close()
            self._file = None

    def __del__(self):
        # Release the file handle even if the caller never called close().
        self.close()
        print("Destorying myReader object")
After reading a row from the CSV file, we can pass it directly to our table class, which builds the SQL statement from the columns present in the row data; sometimes a row may have fewer columns than the table.
import MySQLdb
class csvTable:
    """Insert dictionary rows into a single MySQL table.

    *db* is an open DB-API connection (e.g. from ``MySQLdb.connect``).
    *table* is the target table name; it defaults to ``csvDataTable``.
    """

    def __init__(self, db, table="csvDataTable"):
        self.db = db
        self.table = table

    def _quote(self, name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Backticks are doubled so the name cannot terminate the quoting, and
        ``%`` is doubled because the driver applies %-formatting to the
        statement when parameters are supplied.
        """
        return "`" + name.replace("`", "``").replace("%", "%%") + "`"

    def additem(self, item):
        """Insert one row; *item* maps column names to values.

        Only the columns present in *item* are listed in the statement, so a
        row may carry fewer columns than the table.  Entries whose key is
        empty or ``None`` (unnamed CSV columns, or surplus fields that
        ``csv.DictReader`` stores under ``None``) are skipped.  Values are
        passed as query parameters and never spliced into the SQL text.
        """
        columns = [key for key in item if key]
        values = tuple(item[key] for key in columns)
        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
            self._quote(self.table),
            ", ".join([self._quote(col) for col in columns]),
            ", ".join(["%s"] * len(columns)),
        )
        cursor = self.db.cursor()
        try:
            cursor.execute(sql, values)
        finally:
            # Close the cursor even when the insert fails.
            cursor.close()

    def savedata(self):
        """Commit the pending inserts to the database."""
        self.db.commit()

    def __del__(self):
        print("Destroying the csvTable object")
Now we need to write a test function that uses both classes.
def processcsvfile(csvfile, host="localhost", user="mysqluser",
                   passwd="password", db="mydb"):
    """Load every row of the CSV file *csvfile* into the database table.

    Rows are read with ``myReader`` and inserted with ``csvTable``.  A row
    that fails to insert is reported and skipped, and the remaining rows are
    still processed.  The inserts are committed once, after the last row.
    The connection parameters default to the values used in the article.
    """
    # Local import: the error report below needs sys.
    import sys

    print("inserting data of " + csvfile)
    conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db)
    try:
        rd = myReader(csvfile)
        ct = csvTable(conn)
        processed = 0
        rw = rd.getRow()
        while rw:
            try:
                ct.additem(rw)
            except Exception:
                # Best effort: report the bad row and carry on with the rest.
                print("Unexpected error: " + str(sys.exc_info()))
            rw = rd.getRow()
            processed += 1
        print("Number of recored processed :" + str(processed))
        ct.savedata()
    finally:
        # Always release the connection, even if reading or committing fails.
        conn.close()
No comments:
Post a Comment
would you like it. :)