# Read command line options
def usage():
    """Print the command-line synopsis to stdout and terminate the process.

    Never returns: sys.exit(1) raises SystemExit with status 1.
    """
    print("Usage: mlab_mysql_import.py mlab_file.csv")
    print("Recursive import can be realised by running:")
    # The backslash is escaped explicitly: a bare "\;" is an invalid escape
    # sequence that newer interpreters warn about. The printed text is unchanged.
    print("find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \\;")
    sys.exit(1)
class MaxMind:
    """In-memory index of a maxmind Blocks table, used for fast locId lookups.

    Two tables are kept on the instance:
      startips -- sorted list of the start address of every range (ints);
                  binary-searched with bisect
      ipranges -- dict keyed by the start address, value is [endip, locId]
    """

    # initialise the object with the database table to instantiate with
    def __init__(self, maxmind_db_host, maxmind_db_user, maxmind_db_passwd, maxmind_db_name, maxmind_table_name):
        """Connect to MySQL and load `maxmind_table_name` into memory.

        On any error a message is printed and the process exits with status 1.
        """
        # Defined up front so lookup() is usable even when the table is empty.
        self.startips = []
        self.ipranges = {}
        global_start_time = datetime.now()
        # Pre-bound so the cleanup in `finally` cannot raise NameError (and
        # hide the real error) when connect() itself fails.
        maxmind_db = None
        maxmind_cursor = None
        try:
            # Connect to the mysql database
            maxmind_db = MySQLdb.connect(host = maxmind_db_host,
                                         user = maxmind_db_user,
                                         passwd = maxmind_db_passwd,
                                         db = maxmind_db_name)
            maxmind_cursor = maxmind_db.cursor()
            self.loadTable(maxmind_cursor, maxmind_table_name)
        except Exception as e:
            print('Aborting maxmind due to error: ' + str(e))
            sys.exit(1)
        finally:
            if maxmind_cursor is not None:
                maxmind_cursor.close()
            # Everything is in memory now; the connection is no longer needed.
            if maxmind_db is not None:
                maxmind_db.close()
        global_end_time = datetime.now()
        print('MaxMind: Read and indexed `' + maxmind_table_name + '` in ' + str(global_end_time - global_start_time) + ' seconds.')

    def loadTable(self, maxmind_cursor, maxmind_table_name):
        """Read every (startIpNum, endIpNum, locId) row and rebuild both tables.

        maxmind_cursor     -- an open DB-API cursor
        maxmind_table_name -- name of the Blocks table to read
        """
        # grab everything (whole table, 3,5 million items)
        # A backtick inside the identifier is doubled so it cannot end the quoting.
        sql = """SELECT startIpNum, endIpNum, locId FROM `{0}`""".format(maxmind_table_name.replace('`', '``'))
        maxmind_cursor.execute(sql)
        result = maxmind_cursor.fetchall()
        ipranges = {}
        for start, end, loc in result:
            ipranges[int(start)] = [int(end), int(loc)]
        self.ipranges = ipranges
        # Built from the dict keys so both tables use the same int values;
        # lookup() indexes ipranges with whatever the bisect over startips returns.
        self.startips = sorted(ipranges)

    def find_le(self, a, x):
        """Return the rightmost value in sorted list `a` that is <= x.

        Raises ValueError when every value in `a` is greater than x
        (or `a` is empty).
        """
        i = bisect.bisect_right(a, x)
        if i:
            return a[i-1]
        raise ValueError

    def lookup(self, ipnumber):
        """Return the locId of the range containing `ipnumber`, or -1 if none."""
        try:
            start = self.find_le(self.startips, ipnumber)
        except ValueError:
            # ipnumber lies below the first range (or no ranges are loaded)
            return -1
        (endip, loc) = self.ipranges[start]
        if ipnumber <= endip:
            return loc
        # ipnumber falls in the gap after the closest preceding range
        return -1
maxmind_test.py +# +# A test script for the maxmind class +# +# Initials: +# AX Axel Roest +# +# Version history +# 20120710 AR first version +# +# ToDO: +# + +import sys +import re +import os +import math +import bisect +from datetime import datetime +import MySQLdb +import random +from maxmind import MaxMind + +# test + +mm = MaxMind(db_host, db_user, db_passwd, db_name, db_filetable) +ip = 67276848 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +ip = 67277000 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +ip = 67277023 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +ip = 67277024 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +testamount = 100000 +start_time = datetime.now() +for i in range(testamount): + r = random.randint(33996344, 3741319167) + loc = mm.lookup(r) +end_time = datetime.now() +totaltime = end_time - start_time +timeperlookup = totaltime / testamount +print '=====================================\nLookup ' + str(testamount) + ' ips in ' + str(totaltime) + ' seconds = ' + str(timeperlookup) + ' seconds per lookup' \ No newline at end of file diff --git a/mlab_glasnost_update_all_locid.py b/mlab_glasnost_update_all_locid.py new file mode 100644 index 0000000..cea8d82 --- /dev/null +++ b/mlab_glasnost_update_all_locid.py @@ -0,0 +1,180 @@ +#!/usr/bin/python +# +# mlab_glasnost_update_all_locid.py +# +# Initials: +# AX Axel Roest +# +# Version history +# 20120706 AX first version +# +# test: + + +import sys +import re +import os +from optparse import OptionParser +from datetime import datetime +import dateutil.parser as dparser +from dateutil.relativedelta import relativedelta +from subprocess import call +import MySQLdb + +################################################################# +# # +# settings # +# # +################################################################# + +# PLEASE UPDATE THESE SETTINGS +db_host = "localhost" # your host, usually localhost +db_user = "root" # your username +db_passwd = 
# return True if the table exists in the database
def check_maxmind_exist(cur, table):
    """Return True if maxmind.`table` can be queried and holds at least one row.

    cur   -- an open DB-API cursor
    table -- name of a table in the `maxmind` database

    Returns False when the table is empty or when the query is rejected
    (e.g. the table does not exist); the rejection is reported on stderr.
    """
    # A backtick inside the identifier is doubled so it cannot end the quoting.
    sql = "select * FROM maxmind.`" + table.replace('`', '``') + "` LIMIT 1"
    try:
        cur.execute(sql)
    except MySQLdb.ProgrammingError as e:
        sys.stderr.write('Cannot read maxmind table `' + table + '`: ' + str(e) + '\n')
        return False
    # Only the presence of a row matters; the value of its first column
    # says nothing about whether the table exists.
    return cur.fetchone() is not None
def update_mlab_glasnost(cur,table):
    """Run the per-table locId update script for one maxmind Blocks table.

    cur   -- unused here; kept so existing callers keep working
    table -- Blocks_GeoLiteCity_YYYYMMDD table name; its date selects the
             start of the range, maxmind_dates supplies the end

    Problems are reported on stdout and the function returns normally, so the
    caller's loop continues with the next table. extract_datestring() still
    raises for a table name that does not contain a date.
    """
    start_datum = extract_datestring(table)
    end_datum = maxmind_dates.get(start_datum)
    if end_datum is None:
        # The table is not part of the known maxmind table set.
        print("An error has occured: no date range known for `" + table + "`")
        return
    print('updating `' + table + '` between ' + start_datum + ' - ' + end_datum)
    try:
        status = call(["/usr/bin/python", updatescript, table, db_tables['glasnost'], start_datum, end_datum ])
    except Exception as e:
        print("An error has occured: " +str(e))
        return
    # call() only raises when the child cannot be started; a failing child is
    # signalled through its exit status.
    if status != 0:
        print("An error has occured: " + updatescript + " exited with status " + str(status))
db_name + '\n') + +# array with tables to loop over, in case we don't get a table argument +maxmind_all_tables = get_maxmind_tableset(cur) + +# contains hash with key = start_date, value = enddate (= startdate of next table, except for the last one) +maxmind_dates = get_maxmind_dates(maxmind_all_tables) + +if len(args) == 0: + print "Iterating over ALL maxmind tables" + for table in maxmind_all_tables: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) +else: + print "Iterating over all arguments" + for table in args: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) + +cur.close() +global_end_time = datetime.now() + +print '=====================================\nAll Glasnost updates Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time) diff --git a/mlab_glasnost_update_locid.py b/mlab_glasnost_update_locid.py new file mode 100644 index 0000000..b500f28 --- /dev/null +++ b/mlab_glasnost_update_locid.py @@ -0,0 +1,179 @@ +#!/usr/bin/python +# +# mlab_glasnost_update_locid.py +# +# Initials: +# PH Pascal Haakmat +# AR Axel Roest +# +# Version history +# 20120706 PH first version based on mlab_maxmind_processed.py +# 20120707 AR tested on glasnost_test database, update query changed, added location fields update +# +# Note: make sure the glasnost table has separate indexes on longip +# and date columns, i.e.: +# KEY `longip` (`longip`), +# KEY `date` (`date`) +# as part of the schema definition. Do not index locId - this will +# slow things down because MySQL needs to maintain the index for the field +# as we update it. +# +# Example invocation: +# $ python mlab_glasnost_update_locid.py Blocks_GeoLiteCity glasnost 19000101 30000101 +# 2012-07-06 04:28:08 Starting geoIP lookup for date range 19000101-30000101 +# 2012-07-06 04:28:08 Total maxmind rows: 3786204 +# 2012-07-06 04:28:08 Processing in chunks of 100000 records (38 chunk(s)) +# 2012-07-06 04:28:08 Processing chunk #1 (0-99999) +# ... 
def log(str):
    """Print `str` to stdout prefixed with the current timestamp, then flush."""
    # NOTE: the parameter name shadows the builtin; it is kept unchanged so
    # existing callers are unaffected.
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " " + str)
    sys.stdout.flush()

def _quote_identifier(name):
    """Return `name` wrapped in backticks, with embedded backticks doubled."""
    return '`' + name.replace('`', '``') + '`'

def update_glasnost(glasnost_cursor,glasnost_table_name,start_date,end_date,start_ip,end_ip,loc_id,maxmind_table_name):
    """Set locId and maxmind_table_name on the rows matching one IP range.

    Rows are matched on `longip` BETWEEN start_ip AND end_ip and on `date`
    BETWEEN start_date AND end_date. Returns the cursor's rowcount, or 0 when
    the database reports an error (which is logged, not raised).
    """
    updated = 0
    # Values are passed as parameters so the driver does the quoting; only the
    # table identifier has to be placed in the statement text.
    sql = ("UPDATE {0} SET `locId`=%s , `maxmind_table_name` = %s "
           "WHERE `longip` BETWEEN %s AND %s AND `date` BETWEEN %s AND %s").format(_quote_identifier(glasnost_table_name))
    try:
        glasnost_cursor.execute(sql, (loc_id, maxmind_table_name, start_ip, end_ip, start_date, end_date))
        updated = glasnost_cursor.rowcount
    except MySQLdb.Error as e:
        log("Error updating glasnost: {0}".format(e))
    return updated

def update_glasnost_with_location(glasnost_cursor,glasnost_table_name,location_table_name):
    """Copy the location columns from maxmind.`location_table_name` into
    mlab.`glasnost_table_name` for all rows with an equal locId.

    Returns the cursor's rowcount, or 0 when the database reports an error
    (which is logged, not raised).
    """
    updated = 0
    # Report the table that is actually joined below.
    print(glasnost_table_name + " - with - " + location_table_name)
    # NOTE(review): the join is on locId alone, so rows tagged by a different
    # maxmind table are rewritten as well - confirm this is intended.
    sql = """UPDATE mlab.{0} L, maxmind.{1} M SET L.country_code=M.country, L.region=M.region, L.city=M.city, L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, L.metroCode=M.metroCode, L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`""".format(_quote_identifier(glasnost_table_name), _quote_identifier(location_table_name))
    try:
        glasnost_cursor.execute(sql)
        updated = glasnost_cursor.rowcount
    except MySQLdb.Error as e:
        log("Error updating glasnost: {0}".format(e))
    return updated

def process(maxmind_cursor,maxmind_table_name,glasnost_cursor,glasnost_table_name,glasnost_start_date,glasnost_end_date,chunk_size=100000):
    """Apply every IP range of a maxmind Blocks table to the glasnost table.

    The Blocks table is read in chunks of `chunk_size` rows; each row
    (startIpNum, endIpNum, locId) is applied with update_glasnost(), after
    which the remaining location columns are filled in from the matching
    Location table.

    Returns the summed row counts reported by update_glasnost().
    """
    location_table_name = maxmind_table_name.replace("Blocks", "Location")
    blocks_table = _quote_identifier(maxmind_table_name)

    maxmind_cursor.execute("""SELECT COUNT(*) FROM {0}""".format(blocks_table))
    maxmind_total = int(maxmind_cursor.fetchone()[0])

    log('Total maxmind rows: '+ str(maxmind_total))

    # integer ceiling division
    chunk_count = (maxmind_total + chunk_size - 1) // chunk_size

    log('Processing in chunks of '+ str(chunk_size) + ' records (' + str(chunk_count) + ' chunk(s))')

    glasnost_changes = 0

    for chunk_num, offset in enumerate(range(0, maxmind_total, chunk_size), 1):
        count = min(chunk_size, maxmind_total - offset)

        log('Processing chunk #' + str(chunk_num) + ' (' + str(offset) + '-' + str(offset - 1 + count) + ')')

        # ORDER BY makes the pages disjoint and complete; without it the row
        # order of successive LIMIT/OFFSET queries is not guaranteed.
        # Columns are named so the code does not depend on their position.
        sql = """SELECT startIpNum, endIpNum, locId FROM {0} ORDER BY startIpNum LIMIT {1} OFFSET {2}""".format(blocks_table, count, offset)
        maxmind_cursor.execute(sql)

        for start_ip, end_ip, loc_id in maxmind_cursor.fetchall():
            glasnost_changes += update_glasnost(glasnost_cursor,glasnost_table_name,glasnost_start_date,glasnost_end_date,start_ip,end_ip,loc_id, location_table_name)

    # update the rest of the fields from the maxmind location table
    fieldupdates = update_glasnost_with_location(glasnost_cursor,glasnost_table_name,location_table_name)
    log('Total glasnost changes: ' + str(glasnost_changes) + ' [location updates: ' + str(fieldupdates) + ']')

    return glasnost_changes
# Blocks_GeoLiteCity_20090601
def extract_datestring(string):
    """Return the YYYYMMDD part of a Blocks_GeoLiteCity_YYYYMMDD table name.

    Raises Exception when `string` does not have that form.
    """
    date_match = re.match(r'Blocks_GeoLiteCity_(\d{4}\d{2}\d{2})$', string)
    if not date_match:
        # One message string, so str(exception) is readable.
        raise Exception('Error in argument "' + string + '" does not contain a valid date.')
    return date_match.group(1)

def extract_date(string):
    """Return a datetime built from the trailing YYYYMMDD digits of `string`.

    Raises Exception when `string` does not end in eight digits.
    """
    date_match = re.match(r'.*(\d{4})(\d{2})(\d{2})$', string)
    if not date_match:
        raise Exception('Error in argument "' + string + '" does not contain a valid date.')
    return datetime(int(date_match.group(1)), int(date_match.group(2)), int(date_match.group(3)))

def get_maxmind_dates(rows):
    """Map the date of each maxmind table to the date at which it stops applying.

    rows -- list of Blocks_GeoLiteCity_YYYYMMDD table names

    Each table's end date is the start date of the next table; the last table
    ends six months after its own date. Returns a dict of YYYYMMDD strings,
    or an empty dict when `rows` is empty.
    """
    # Sorted, so the pairing below does not depend on the order of `rows`.
    dates = sorted(extract_datestring(table) for table in rows)
    if not dates:
        return {}
    # pair every date with its successor
    datehash = dict(zip(dates, dates[1:]))
    # last one is 6 months in the future
    date = dates[-1]
    lastdate = extract_date(date)
    print('date=' + date + " = " + str(lastdate))
    futuredate = lastdate + relativedelta(months = +6)
    datehash[date] = futuredate.strftime('%Y%m%d')
    return datehash
#################################################################
#                                                               #
#           start of initialisation                             #
#           Read command line options                           #
#                                                               #
#################################################################

parser = OptionParser()
# verbose defaults to True so that -q (store_false) actually changes it
parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=True, help="don't print status messages to stdout")
(options, args) = parser.parse_args()

# create file if necessary, as open by itself doesn't cut it
# (the with-block closes the file, so the line reaches disk)
with open(logDir + processLog, 'a') as f:
    f.write("\nNew mlab_maxmind_processed job on " + str(datetime.now()))

#################################################################
#                                                               #
#           start of main program                               #
#                                                               #
#################################################################
global_start_time = datetime.now()

try:
    # Connect to the mysql database
    db = MySQLdb.connect(host = db_host,
                         user = db_user,
                         passwd = db_passwd,
                         db = db_name)
    cur = db.cursor()
except MySQLdb.Error as e:
    # Nothing below can work without a cursor, so stop here.
    sys.stderr.write('Error, cannot connect to database ' + db_name + ': ' + str(e) + '\n')
    sys.exit(1)

# array with tables to loop over, in case we don't get a table argument
maxmind_all_tables = get_maxmind_tableset(cur)

# contains hash with key = start_date, value = enddate (= startdate of next table, except for the last one)
maxmind_dates = get_maxmind_dates(maxmind_all_tables)

if len(args) == 0:
    print("Iterating over ALL maxmind tables")
    tables = maxmind_all_tables
else:
    print("Iterating over all arguments")
    tables = args

for table in tables:
    if (check_maxmind_exist(cur,table)):
        update_mlab_glasnost(cur,table)
        # MySQLdb does not autocommit; keep each finished table's updates
        db.commit()

cur.close()
db.close()
global_end_time = datetime.now()

# report the number of tables actually iterated, not the number of arguments
print('=====================================\nAll Done. ' + str(len(tables)) + ' file(s) in ' + str(global_end_time - global_start_time))
import re import os @@ -6,93 +38,138 @@ from datetime import datetime import dateutil.parser as dparser import MySQLdb +import shutil +from maxmind import MaxMind +import socket, struct + +################################################################# +# # +# settings # +# # +################################################################# # PLEASE UPDATE THESE SETTINGS db_host = "localhost" # your host, usually localhost db_user = "root" # your username -db_passwd = "rootpassword" # your password +db_passwd = "" # your password db_name = "mlab" # name of the database -db_tables = {"glasnost": "glasnost", "ndt": "ndt"} # a mapping from testname to tablename +db_tables = {"glasnost": "glasnost", "ndt": "ndt_test"} # a mapping from testname to tablename db_filetable = 'files' -# Read command line options -def usage(): - print "Usage: mlab_mysql_import.py mlab_file.csv" - print "Recursive import can be realised by running:" - print "find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \;" - sys.exit(1) +# directories +baseDir = '/DATA/mlab/' +#baseDir = '/home/axel/mlab/' +scratchDir = baseDir + 'scratch/' +workDir = baseDir + 'work/' +archiveDir = baseDir + 'archive/' +errorDir = baseDir + 'error/' +logDir = baseDir + 'logs/' +cleanDir = baseDir + 'clean/' -parser = OptionParser() -parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") -(options, args) = parser.parse_args() -if len(args) == 0: - usage() +#files +errorLog = "mlab_mysql_import_error.log" +processLog = "mlab_mysql_import_processed_files.log" -filename = args[0] -try: - f = open(filename, 'r') -except IOError as e: - print 'Could not open file ', filename -# Extract the basename of the filename, as the path is not of interest after this point -filename = os.path.basename(filename) +# default tables +maxmind_table = 'Blocks_GeoLiteCity_Last' +ndt_import = 'ndt_import' 
#################################################################
#                                                               #
#           functions                                           #
#                                                               #
#################################################################

# Candidate dotted-quad pattern; compiled once because extract_ip runs for
# every input line.
_IP_CANDIDATE_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')


# Convert an IP string to long
#   ip      : dotted-quad IPv4 string, e.g. '10.0.0.1'
#   returns : the address as an unsigned 32 bit integer (network byte order)
#   raises  : socket.error when ip is not accepted by socket.inet_aton
def ip2long(ip):
    return struct.unpack("!L", socket.inet_aton(ip))[0]


# Convert an unsigned 32 bit integer back to a dotted-quad IPv4 string.
def long2ip(l):
    return socket.inet_ntoa(struct.pack('!L', l))


# Print the command line synopsis and terminate with exit status 1.
def usage():
    print("Usage: mlab_mysql_import.py [ -m maxmind_Blocks_Tablename ] mlab_file1.csv [mlab_files.csv ...]")
    print("Default: maxmind_Blocks_Tablename = `Blocks_GeoLiteCity_Last`")
    sys.exit(1)


# This routine extracts the destination server of the mlab file.
# It assumes that the filename has the form like 20100210T000000Z-mlab3-dfw01-ndt-0000.tgz.csv
#   filename : basename of the mlab csv file
#   returns  : the '-' separated fields between the first and the last one,
#              joined with '.', e.g. 'mlab3.dfw01.ndt'
#   raises   : Exception when the name has fewer than two '-' separators
def extract_destination(filename):
    # Split the filename and perform some tests if it conforms to our standard
    f_split = filename.split('-')
    if len(f_split) < 3:
        # One message string, so str(exception) is readable instead of a tuple of fragments.
        raise Exception("The specified filename (" + filename + ") should contain at least two '-' characters that delimit the data, destination and the suffix.")

    # A wrong suffix only produces a warning; the file is still processed.
    if not f_split[-1].endswith('.tgz.csv'):
        print("The specified filename (" + filename + ") should end with '.tgz.csv'.")
    return '.'.join(f_split[1:-1])


# Returns the datetime contained in string.
#   string  : one line of the csv file, expected to hold 'YYYY/MM/DD' and 'HH:MM:SS'
#   returns : the value produced by dparser.parse for '<date> <time>'
#   raises  : Exception when the date or the time pattern is missing,
#             ValueError when the matched text cannot be parsed
def extract_datetime(string):
    # Extract the date
    date_match = re.search(r'\d{4}/\d{2}/\d{2}', string)
    if not date_match:
        raise Exception('Error in import: line "' + string + '" does not contain a valid date.')
    # Extract the time
    time_match = re.search(r'\d{2}:\d{2}:\d{2}', string)
    if not time_match:
        raise Exception('Error in import: line "' + string + '" does not contain a valid time.')

    try:
        return dparser.parse(date_match.group(0) + ' ' + time_match.group(0), fuzzy=True)
    except ValueError:
        # Call form of raise: same exception as before, valid on Python 2 and 3.
        raise ValueError('Error in import: line "' + string + '" does not contain a valid date and time.')


# Returns the first valid ip address contained in string.
# return with empty string when we encounter cputime, or no ip number
# Candidates with an octet above 255 are skipped, because ip2long()
# (socket.inet_aton) would raise on them later.
def extract_ip(string):
    if 'cputime' in string:
        return ''
    # Extract the ip number
    for candidate in _IP_CANDIDATE_RE.finditer(string):
        address = candidate.group(0)
        if all(int(octet) <= 255 for octet in address.split('.')):
            return address
    # no usable address: the caller skips this line
    return ''


# Test if the entry already exists in the database
#   cur           : open database cursor
#   file_id       : id of the source file in the files table
#   db_table      : name of the table to search (trusted, comes from the settings)
#   test_datetime : datetime of the test
#   destination   : destination server string
#   source_ip     : dotted-quad source address
#   returns       : True when at least one matching row exists, else False
def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip):
    # The values originate from the imported file, so they are handed to the
    # driver as parameters instead of being pasted into the statement.
    # Only the table name, which cannot be a parameter, is concatenated.
    sql = ("SELECT COUNT(*) FROM " + db_table +
           " WHERE date = %s AND destination = %s AND source = %s AND file_id = %s")
    cur.execute(sql, (test_datetime.isoformat(), destination, source_ip, file_id))
    return cur.fetchone()[0] >= 1

# Insert a connection to the database without testing.
+def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + longip = ip2long(source_ip) + # locid = 0 + locid = mm.lookup(longip) # lookup location id from ip number + columns = ', '.join(['date', 'destination', 'source', 'file_id', 'longip', 'locId']) + values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id), str(longip), str(locid)]) + '"' + sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " + # print sql + cur.execute(sql) + +# Insert a test connection to the database, if it not already exists def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Insert a test connection to the database if it not already exists ''' # Check if the entry exists already sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) cur.execute(sql) - # If not, then isert it + # If not, then insert it if cur.fetchone()[0] < 1: print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' + blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) - columns = ', '.join(['date', 'destination', 'source', 'file_id']) - values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' - sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " - cur.execute(sql) - +# Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. def get_file_id(cur, filename): - ''' Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. 
''' sql = "SELECT id FROM " + db_filetable + " WHERE filename ='" + filename + "'" cur.execute(sql) id = cur.fetchone() @@ -103,40 +180,189 @@ def get_file_id(cur, filename): return get_file_id(cur, filename) return id[0] -# Connect to the mysql database -db = MySQLdb.connect(host = db_host, - user = db_user, - passwd = db_passwd, - db = db_name) -cur = db.cursor() - -# Find the destination server by investigating the filename -destination = extract_destination(filename) -print 'Found destination: ', destination - -# Get the filename id from the files table -file_id = get_file_id(cur, filename) -db.commit() - -# Find the testsuite by investigating the filename -try: - test = [test for test in db_tables.keys() if test in filename][0] -except IndexError: - print 'The filename ' + filename + ' does not contain a valid testname.' - sys.exit(1) -print "Found test suite " + test - -# Read the file line by line and import it into the database -for line in f: - line = line.strip() - source_ip = extract_ip(line) - test_datetime = extract_datetime(line) - insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) - -# Commit and finish up -print 'Writing changes to database' -db.commit() - -# disconnect from server -db.close() -print 'Done' +# do deduplucation of connection strings +def dedup(file_id, table, test_datetime, destination, source_ip): + key = str(file_id) + table + str(test_datetime) + destination + source_ip + if key in deduplookup: + return False + else: + deduplookup[key] = True + return True + +# for the temp table, look up all the locations with the locId +def lookup_locations(cur, destination): + location_table_name = maxmind_table.replace("Blocks", "Location") + # sql = 'UPDATE mlab.`' + destination + '` L, maxmind.`' + location_table_name + '` M SET L.country_code = M.country, L.region=M.region, L.city=M.city, L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, L.metroCode=M.metroCode, 
L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`' + sql = 'UPDATE mlab.`ndt_import` L, maxmind.`' + location_table_name + '` M SET L.country_code = M.country, L.region=M.region, L.city=M.city, L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, L.metroCode=M.metroCode, L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`' + updated = cur.execute(sql) + # update country from country_code later? + return updated + +# clear the temp table +def clear_temp_table(cur): + sql = 'truncate table `' + ndt_import + '`' + cur.execute(sql) + +# move the temp table to the real on (either ndt_test or ndt) +def move_temp_table(cur, destination): + sql = 'INSERT INTO `' + destination + '` (`created_at`, `date`, `destination`, `source`, `file_id`, `country_code`, `longip`, `locId`, `country`, `region`, `city`, `postalCode`, `latitude`, `longitude`, `metroCode`, `areaCode`) SELECT * FROM `' + ndt_import + '`' + updated = cur.execute(sql) + return updated + +# returns True on error, False on correct processing +def process_file(f, filename): + start_time = datetime.now() + failure = True + try: + # Connect to the mysql database + db = MySQLdb.connect(host = db_host, + user = db_user, + passwd = db_passwd, + db = db_name) + cur = db.cursor() + clear_temp_table(cur) + + # Find the destination server by investigating the filename + destination = extract_destination(filename) + print 'Destination: ', destination, + + # Get the filename id from the files table + file_id = get_file_id(cur, filename) + db.commit() + + # Find the testsuite (glasnost or ndt) by investigating the filename + try: + test = [test for test in db_tables.keys() if test in filename][0] + except IndexError: + sys.stderr.write('The filename ' + filename + ' does not contain a valid testname.') + return 1 + # print "Found test suite " + test + + # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes + # But falls back to less than half a second when indexing is 
turned on on the db + filetest=True + # Read the file line by line and import it into the database + for line in f: + line = line.strip() + source_ip = extract_ip(line) + if ('' == source_ip): + continue # skip empty lines instead of error reporting them + test_datetime = extract_datetime(line) + if (filetest): + if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): + # this file has already been read: ABORT WITH ERROR + raise Exception('File entry already exist in db; the file has already been read: ' + filename) + filetest=False + # test if we have already done it in this or last filetest + if (dedup(file_id, db_tables[test], test_datetime, destination, source_ip)): + # blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + blunt_insert_dbentry(cur, file_id, ndt_import, test_datetime, destination, source_ip) + end_time = datetime.now() + print 'File done in ' + str(end_time - start_time) + lookup_locations(cur, destination) + move_temp_table(cur, db_tables[test]) + failure = False + except Exception as inst: + sys.stderr.write('Exception: '+str(inst.args) + '\n') + with open(logDir + errorLog, 'a') as f: + f.write(pathname + '\n') + f.write('Exception: '+str(inst.args) + '\n') + print + except IOError as e: + sys.stderr.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') + with open(logDir + errorLog, 'a') as f: + f.write(pathname + '\n') + f.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') + print + finally: + # Commit and finish up + sys.stderr.flush() + # db.commit() + # disconnect from server + db.close() + + return failure + +# get the test date from the archive filename +def extract_archive_date(filename): + m = re.match('^(\d{4})(\d{2})(\d{2})', filename) + return (m.group(1),m.group(2)) + +# test if archive directory exist, and create it if necessary +def create_archive_dir(ym): + if (not os.path.exists(ym)): + os.makedirs(ym) + return ym + +# 
# move processed file to archive folder
#   pathname : path of a successfully imported file
# The file is moved to archiveDir/<year>/<month> (year and month are taken
# from the date prefix of its basename) and the original path is appended to
# the process log.
def move_archive(pathname):
    fname = os.path.basename(pathname)
    (year, month) = extract_archive_date(fname)
    aDir = create_archive_dir(archiveDir + year + '/' + month)
    shutil.move(pathname, aDir)
    with open(logDir + processLog, 'a') as f:
        f.write(pathname + '\n')


#################################################################
#                                                               #
#           start of initialisation                             #
#           Read command line options                           #
#                                                               #
#################################################################

parser = OptionParser()
# store_false needs a True default, otherwise -q can never change the value.
parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=True, help="don't print status messages to stdout")
parser.add_option("-m", "--maxmind", dest="maxmind_table", default='', help="optional maxmind_table, if omitted we use 'Last'")
(options, args) = parser.parse_args()
if options.maxmind_table != '':
    maxmind_table = options.maxmind_table

if len(args) == 0:
    usage()

# create file if necessary, as open by itself doesn't cut it
# The with-statement closes the log; the former bare `f.close` was only an
# attribute reference and never closed anything.
with open(logDir + processLog, 'a') as f:
    f.write("\nNew batchjob on " + str(datetime.now()))

# deduplookup is a hash we use for de-duplication of input lines
# maybe it is necessary to purge parts of it during the duration of the import
# but then we have to carefully monitor tests that appear in multiple files
# OR store the last test in a separate global (dirty? yeah, I know)
deduplookup = {}

#################################################################
#                                                               #
#           start of main program                               #
#                                                               #
#################################################################
global_start_time = datetime.now()

# get instance of maxmind table
print("using " + maxmind_table)

mm = MaxMind(db_host, db_user, db_passwd, "maxmind", maxmind_table)

# NOTE(review): whether a MaxMind instance can ever be falsy is not visible
# here; the check is kept, but it no longer references an undefined `e`.
if not mm:
    sys.stderr.write('maxmind table does not exist: ' + maxmind_table + '\n')
    sys.exit(1)

# Iterate over ALL filenames
for pathname in args:
    try:
        with open(pathname, 'r') as f:
            # Extract the basename of the filename, as the path is not of interest after this point
            filename = os.path.basename(pathname)
            # No newline: process_file continues printing on the same line.
            sys.stdout.write("processing file " + filename + " ")
            if process_file(f, filename):
                # process_file returns True on error
                shutil.move(pathname, errorDir)
            else:
                move_archive(pathname)
        # file is automatically closed if needed
    except IOError as e:
        print('Could not open file ' + pathname + '\nError: ' + str(e.args))

global_end_time = datetime.now()

print('=====================================\nAll Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time))