From 99ed3857abbaf374c92082674013f0c731ff73a1 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Fri, 29 Jun 2012 13:28:12 +0200 Subject: [PATCH 01/12] Bringing over changes from other branch 1000x speed improvement, by not testing every line in the db, added timing and todo list --- mlab_mysql_import.py | 74 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index df8234b..c84111a 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -1,4 +1,23 @@ #!/usr/bin/python +# +# Initials: SF Simon Funke +# RB Ruben Bloemgarten +# AX Axel Roest +# +# Version history +# 2012xxxx SF first version +# 20120628 AX removed testing for every line, added timing code +# +# test: +# cd /DATA +# python scripts/mlab/mlab_mysql_import2.py mlab/clean/glasnost/20090128T000000Z-batch-batch-glasnost-0002.tgz.csv +# +# ToDO: loop over all arguments in sys.argv[0] +# deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry +# move files naar done directory +# move error files naar error directory +# + import sys import re import os @@ -22,12 +41,20 @@ def usage(): print "find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \;" sys.exit(1) + +################################################################# +# # +# start of initialisation # +# # +################################################################# + parser = OptionParser() parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") (options, args) = parser.parse_args() if len(args) == 0: usage() +# We might want to iterate over ALL filenames! filename = args[0] try: f = open(filename, 'r') @@ -76,8 +103,26 @@ def extract_ip(string): sys.exit(1) return match.group(0) +def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + ''' Test if the entry already exists in the database ''' + # Check if the entry exists already + sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) + cur.execute(sql) + + if cur.fetchone()[0] < 1: + return False + else: + return True + +def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + ''' Insert a connection to the database without testing ''' + columns = ', '.join(['date', 'destination', 'source', 'file_id']) + values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' + sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " + cur.execute(sql) + def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Insert a test connection to the database if it not already exists ''' + ''' Insert a test connection to the database, if it not already exists ''' # Check if the entry exists already sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) cur.execute(sql) @@ -85,11 +130,7 @@ def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip # If not, then isert it if cur.fetchone()[0] < 1: print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' - - columns = ', '.join(['date', 'destination', 'source', 'file_id']) - values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' - sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " - cur.execute(sql) + blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) def get_file_id(cur, filename): ''' Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. ''' @@ -103,6 +144,13 @@ def get_file_id(cur, filename): return get_file_id(cur, filename) return id[0] +################################################################# +# # +# start of main program # +# # +################################################################# + +start_time = datetime.now() # Connect to the mysql database db = MySQLdb.connect(host = db_host, user = db_user, @@ -127,11 +175,19 @@ def get_file_id(cur, filename): print "Found test suite " + test # Read the file line by line and import it into the database +filetest=True for line in f: line = line.strip() source_ip = extract_ip(line) test_datetime = extract_datetime(line) - insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + if (filetest): + if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): + # this file has already been read: ABORT WITH ERROR + sys.stderr.write('The file has already been read: ' + filename) + sys.stderr.flush() + sys.exit('file entry already exist in db') + filetest=False + blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) # Commit and finish up print 'Writing changes to database' @@ -139,4 +195,6 @@ def get_file_id(cur, filename): # disconnect from server db.close() -print 'Done' +end_time = datetime.now() + +print 'Done in ' + str(end_time - start_time) From 20bc2059dceff947e9a2d7b351148634cf825c74 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Fri, 29 Jun 2012 21:50:38 +0200 Subject: [PATCH 02/12] handle multiple files, with error handling --- mlab_mysql_import.py | 247 ++++++++++++++++++++++++++++--------------- 1 file changed, 163 insertions(+), 84 deletions(-) diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index c84111a..e1c3780 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -1,22 +1,23 @@ #!/usr/bin/python # -# Initials: SF Simon Funke -# RB Ruben Bloemgarten -# AX Axel Roest +# Initials: SF Simon Funke +# RB Ruben Bloemgarten +# AX Axel Roest # # Version history -# 2012xxxx SF first version -# 20120628 AX removed testing for every line, added timing code +# 2012xxxx SF first version +# 20120628 AX removed testing for every line, added timing code, +# 20120629 AX added loop over all arguments, exception handling, restructured code # # test: # cd /DATA # python scripts/mlab/mlab_mysql_import2.py mlab/clean/glasnost/20090128T000000Z-batch-batch-glasnost-0002.tgz.csv # # ToDO: loop over all arguments in sys.argv[0] -# deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry -# move files naar done directory -# move error files naar error directory -# +# deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry +# move files naar done directory +# move error files naar error directory +# import sys import re @@ -25,6 +26,13 @@ from datetime import datetime import dateutil.parser as dparser import MySQLdb +import shutil + +################################################################# +# # +# settings # +# # +################################################################# # PLEASE UPDATE THESE SETTINGS db_host = "localhost" # your host, usually localhost @@ -34,34 +42,30 @@ db_tables = {"glasnost": "glasnost", "ndt": "ndt"} # a mapping from testname to tablename db_filetable = 'files' -# Read command line options -def usage(): - print "Usage: mlab_mysql_import.py mlab_file.csv" - print "Recursive import can be realised by running:" - print "find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \;" - sys.exit(1) +# directories +baseDir = '/DATA/mlab/' +scratchDir = baseDir + 'scratch/' +workDir = baseDir + 'work/' +archiveDir = baseDir + 'archive/' +errorDir = baseDir + 'error/' +logDir = baseDir + 'logs/' +cleanDir = baseDir + 'clean/' +#files +errorLog = "error.log" +processLog = "processed_files.log" ################################################################# # # -# start of initialisation # +# functions # # # ################################################################# -parser = OptionParser() -parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") -(options, args) = parser.parse_args() -if len(args) == 0: - usage() - -# We might want to iterate over ALL filenames! -filename = args[0] -try: - f = open(filename, 'r') -except IOError as e: - print 'Could not open file ', filename -# Extract the basename of the filename, as the path is not of interest after this point -filename = os.path.basename(filename) +def usage(): + print "Usage: mlab_mysql_import.py mlab_file.csv" + print "Recursive import can be realised by running:" + print "find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \;" + sys.exit(1) def extract_destination(filename): ''' This routine extracts the destination server of the mlab file. @@ -69,8 +73,8 @@ def extract_destination(filename): # Split the filename and perform some tests if it conforms to our standard f_split = filename.split('-') if len(f_split) < 3: - print "The specified filename (", filename, ") should contain at least two '-' characters that delimit the data, destination and the suffix." - sys.exit(1) + raise Exception("The specified filename (", filename, ") should contain at least two '-' characters that delimit the data, destination and the suffix.") + if '.tgz.csv' not in f_split[-1]: print "The specified filename (", filename, ") should end with '.tgz.csv'." @@ -81,13 +85,11 @@ def extract_datetime(string): # Extract the date date_match = re.search(r'\d{4}/\d{2}/\d{2}', string) if not date_match: - print 'Error im import: line "', string, '" does not contain a valid date.' - sys.exit(1) + raise Exception('Error im import: line "', string, '" does not contain a valid date.') # Extract the time time_match = re.search(r'\d{2}:\d{2}:\d{2}', string) if not time_match: - print 'Error im import: line "', string, '" does not contain a valid time.' - sys.exit(1) + raise Exception('Error im import: line "', string, '" does not contain a valid time.') try: return dparser.parse(date_match.group(0) + ' ' + time_match.group(0), fuzzy=True) @@ -99,8 +101,7 @@ def extract_ip(string): # Extract the date match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', string) if not match: - print 'Error im import: line "', string, '" does not contain a valid ip address.' - sys.exit(1) + raise Exception ('Error im import: line "', string, '" does not contain a valid ip address.') return match.group(0) def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): @@ -144,57 +145,135 @@ def get_file_id(cur, filename): return get_file_id(cur, filename) return id[0] +# returns True on error, False on correct processing +def process_file(f, filename): + start_time = datetime.now() + failure = True + try: + # Connect to the mysql database + db = MySQLdb.connect(host = db_host, + user = db_user, + passwd = db_passwd, + db = db_name) + cur = db.cursor() + + # Find the destination server by investigating the filename + destination = extract_destination(filename) + print 'Destination: ', destination, + + # Get the filename id from the files table + file_id = get_file_id(cur, filename) + db.commit() + + # Find the testsuite by investigating the filename + try: + test = [test for test in db_tables.keys() if test in filename][0] + except IndexError: + sys.stderr.write('The filename ' + filename + ' does not contain a valid testname.') + return 1 + # print "Found test suite " + test + + # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes + filetest=False + # Read the file line by line and import it into the database + for line in f: + line = line.strip() + source_ip = extract_ip(line) + test_datetime = extract_datetime(line) + if (filetest): + if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): + # this file has already been read: ABORT WITH ERROR + raise Exception('File entry already exist in db; the file has already been read: ' + filename) + filetest=False + blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + end_time = datetime.now() + print 'File done in ' + str(end_time - start_time) + failure = False + except Exception as inst: + sys.stderr.write('Exception: '+str(inst.args) + '\n') + with open(logDir + errorLog, 'a') as f: + f.write(pathname + '\n') + f.write('Exception: '+str(inst.args) + '\n') + print + except IOError as e: + sys.stderr.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') + with open(logDir + errorLog, 'a') as f: + f.write(pathname + '\n') + f.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') + print +# except: +# sys.stderr.write('Process error ' + '\n') + finally: + # Commit and finish up + sys.stderr.flush() + # db.commit() + # disconnect from server + db.close() + + return failure + +def extract_archive_date(filename): + m = re.match('^(\d{4})(\d{2})(\d{2})', filename) + return (m.group(1),m.group(2)) + +# test if archive directory exist, and create it if necessary +def create_archive_dir(ym): + if (not os.path.exists(ym)): + os.makedirs(ym) + return ym + +def move_archive(pathname): + fname = os.path.basename(pathname) + (year,month) = extract_archive_date(fname) + aDir = create_archive_dir(archiveDir + year +'/'+ month) + shutil.move(pathname,aDir) + with open(logDir + processLog, 'a') as f: + f.write(pathname + '\n') + + + + ################################################################# # # -# start of main program # +# start of initialisation # +# Read command line options # # # ################################################################# -start_time = datetime.now() -# Connect to the mysql database -db = MySQLdb.connect(host = db_host, - user = db_user, - passwd = db_passwd, - db = db_name) -cur = db.cursor() - -# Find the destination server by investigating the filename -destination = extract_destination(filename) -print 'Found destination: ', destination - -# Get the filename id from the files table -file_id = get_file_id(cur, filename) -db.commit() - -# Find the testsuite by investigating the filename -try: - test = [test for test in db_tables.keys() if test in filename][0] -except IndexError: - print 'The filename ' + filename + ' does not contain a valid testname.' - sys.exit(1) -print "Found test suite " + test - -# Read the file line by line and import it into the database -filetest=True -for line in f: - line = line.strip() - source_ip = extract_ip(line) - test_datetime = extract_datetime(line) - if (filetest): - if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): - # this file has already been read: ABORT WITH ERROR - sys.stderr.write('The file has already been read: ' + filename) - sys.stderr.flush() - sys.exit('file entry already exist in db') - filetest=False - blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) +parser = OptionParser() +parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") +(options, args) = parser.parse_args() +if len(args) == 0: + usage() + +# create file if necessary, as open by itself doesn't cut it +f = open(logDir + processLog, 'a') +f.write("\nNew batchjob on " + str(datetime.now())) +f.close + + +################################################################# +# # +# start of main program # +# # +################################################################# +global_start_time = datetime.now() -# Commit and finish up -print 'Writing changes to database' -db.commit() +# Iterate over ALL filenames +for pathname in args: + try: + with open(pathname, 'r') as f: + # Extract the basename of the filename, as the path is not of interest after this point + filename = os.path.basename(pathname) + print "processing file " + filename, + if (process_file(f, filename)): + shutil.move(pathname,errorDir) + else: + move_archive(pathname) + # file is automatically closed if needed + except IOError as e: + print 'Could not open file ' + pathname + '\nError: ' + str(e.args) -# disconnect from server -db.close() -end_time = datetime.now() +global_end_time = datetime.now() -print 'Done in ' + str(end_time - start_time) +print '=====================================\nAll Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time) From 5f53ad19b32311cf4f6ebc36cba7928f4fe39371 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Fri, 29 Jun 2012 22:17:33 +0200 Subject: [PATCH 03/12] Added de-duplication of the input data, so the database does not get corrupted with multiple identical values --- mlab_mysql_import.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index e1c3780..e062e3a 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -7,17 +7,17 @@ # Version history # 2012xxxx SF first version # 20120628 AX removed testing for every line, added timing code, -# 20120629 AX added loop over all arguments, exception handling, restructured code +# 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder # # test: # cd /DATA # python scripts/mlab/mlab_mysql_import2.py mlab/clean/glasnost/20090128T000000Z-batch-batch-glasnost-0002.tgz.csv # -# ToDO: loop over all arguments in sys.argv[0] -# deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry -# move files naar done directory -# move error files naar error directory -# +# ToDO: v loop over all arguments in sys.argv[0] +# v deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry +# v move files naar archive directory +# v move error files naar error directory +# v log process and errors import sys import re @@ -62,9 +62,7 @@ ################################################################# def usage(): - print "Usage: mlab_mysql_import.py mlab_file.csv" - print "Recursive import can be realised by running:" - print "find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \;" + print "Usage: mlab_mysql_import3.py mlab_file1.csv [mlab_files.csv ...]" sys.exit(1) def extract_destination(filename): @@ -145,6 +143,14 @@ def get_file_id(cur, filename): return get_file_id(cur, filename) return id[0] +def dedup(file_id, table, test_datetime, destination, source_ip): + key = str(file_id) + table + str(test_datetime) + destination + source_ip + if key in deduplookup: + return False + else: + deduplookup[key] = True + return True + # returns True on error, False on correct processing def process_file(f, filename): start_time = datetime.now() @@ -185,7 +191,9 @@ def process_file(f, filename): # this file has already been read: ABORT WITH ERROR raise Exception('File entry already exist in db; the file has already been read: ' + filename) filetest=False - blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + # test if we have already done it in this or last filetest + if (dedup(file_id, db_tables[test], test_datetime, destination, source_ip)): + blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) end_time = datetime.now() print 'File done in ' + str(end_time - start_time) failure = False @@ -231,8 +239,6 @@ def move_archive(pathname): f.write(pathname + '\n') - - ################################################################# # # # start of initialisation # @@ -251,6 +257,11 @@ def move_archive(pathname): f.write("\nNew batchjob on " + str(datetime.now())) f.close +# deduplookup is a hash we use for de-duplication of input lines +# maybe it is necessary to purge parts of it during the duration of the import +# but then we have to carefully monitor tests that appear in multiple files +# OR store the last test in a separate global (dirty? yeah, I know) +deduplookup = {} ################################################################# # # From cfae83386c42d4fdc9fe5fae1bbb18a1bd333969 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Fri, 29 Jun 2012 22:58:48 +0200 Subject: [PATCH 04/12] Added filetests again --- mlab_mysql_import.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index e062e3a..52fdf5d 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -180,7 +180,8 @@ def process_file(f, filename): # print "Found test suite " + test # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes - filetest=False + # But falls back to less than half a second when indexing is turned on on the db + filetest=True # Read the file line by line and import it into the database for line in f: line = line.strip() From 992b94df7ec852ba97e229540f09a4d3585d30b9 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Sat, 30 Jun 2012 00:52:22 +0200 Subject: [PATCH 05/12] Added first version of mlab_maxmind_processed --- mlab_maxmind_processed.py | 214 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100755 mlab_maxmind_processed.py diff --git a/mlab_maxmind_processed.py b/mlab_maxmind_processed.py new file mode 100755 index 0000000..cf350d1 --- /dev/null +++ b/mlab_maxmind_processed.py @@ -0,0 +1,214 @@ +#!/usr/bin/python +# +# mlab_maxmind_processed.py +# +# Initials: +# RB Ruben Bloemgarten +# AX Axel Roest +# +# Version history +# 20120629 AX first version, doesn't work yet +# +# test: +# cd /DATA +# python scripts/mlab/mlab_mysql_import2.py mlab/clean/glasnost/20090128T000000Z-batch-batch-glasnost-0002.tgz.csv +# +# ToDO: v loop over all arguments in sys.argv[0] +# v deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry +# v move files naar archive directory +# v move error files naar error directory +# v log process and errors + +import sys +import re +import os +from optparse import OptionParser +from datetime import datetime +import dateutil.parser as dparser +from dateutil.relativedelta import relativedelta +import MySQLdb + +################################################################# +# # +# settings # +# # +################################################################# + +# PLEASE UPDATE THESE SETTINGS +db_host = "localhost" # your host, usually localhost +db_user = "root" # your username +db_passwd = "rootpassword" # your password +db_name = "mlab" # name of the database +db_tables = {"glasnost": "glasnost", "ndt": "ndt"} # a mapping from testname to tablename +db_filetable = 'files' + +# directories +baseDir = '/DATA/mlab/' +scratchDir = baseDir + 'scratch/' +workDir = baseDir + 'work/' +archiveDir = baseDir + 'archive/' +errorDir = baseDir + 'error/' +logDir = baseDir + 'logs/' +cleanDir = baseDir + 'clean/' + +#files +errorLog = "error.log" +processLog = "mlab_maxmind_processed.log" + +################################################################# +# # +# functions # +# # +################################################################# + +def usage(): + print "Usage: mlab_maxmind_processed.py maxmind_table" + sys.exit(1) + +# Blocks_GeoLiteCity_20090601 +def extract_datestring(string): + ''' Returns the datetime contained in string ''' + # Extract the date + date_match = re.match('Blocks_GeoLiteCity_(\d{4}\d{2}\d{2})$', string) + if not date_match: + raise Exception('Error in argument "', string, '" does not contain a valid date.') + return date_match.group(1) + +def extract_date(string): + ''' Returns the datetime contained in string ''' + # Extract the date + date_match = re.match('.*(\d{4})(\d{2})(\d{2})$', string) + if not date_match: + raise Exception('Error in argument "', string, '" does not contain a valid date.') + date = datetime(int(date_match.group(1)),int(date_match.group(2)),int(date_match.group(3))) + return date + +def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + ''' Test if the entry already exists in the database ''' + # Check if the entry exists already + sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) + cur.execute(sql) + + if cur.fetchone()[0] < 1: + return False + else: + return True + +def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + ''' Insert a connection to the database without testing ''' + columns = ', '.join(['date', 'destination', 'source', 'file_id']) + values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' + sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " + cur.execute(sql) + +def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + ''' Insert a test connection to the database, if it not already exists ''' + # Check if the entry exists already + sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) + cur.execute(sql) + + # If not, then isert it + if cur.fetchone()[0] < 1: + print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' + blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) + +# return True if the table exists in the database +def check_maxmind_exist(cur, table): + sql = "select * FROM maxmind.`" + table + "` LIMIT 1" + cur.execute(sql) + if cur.fetchone()[0] < 1: + return False + else: + return True + +def get_maxmind_dates(cur): + datehash = {} + sql = "SHOW TABLES FROM `maxmind`" + cur.execute(sql) + rows = cur.fetchall() + rows2 = [] + # filter rows + for item in rows: + m = re.search('Blocks_GeoLiteCity_(\d+)$', item[0]) + if (m): + rows2.append(m.group(0)) + + # print rows2 + + skipfirst = True + for table in rows2: + date = extract_datestring(table) + if (skipfirst): + skipfirst = False + else: + datehash[olddate] = date + olddate = date + # last one is 6 months in the future + lastdate = extract_date(date) + print 'date=' + date + " = " + str(lastdate) + # futuredate = lastdate + datetime.timedelta(365 * 6/12) + futuredate = lastdate + relativedelta(months = +6) + datehash[olddate] = futuredate.strftime('%Y%m%d') + return datehash + +def update_mlab_glasnost(cur,table): + start_datum = extract_datestring(table) + end_datum = maxmind_dates[start_datum] + print 'updating between' + start_datum + ' AND ' + end_datum + try: + sql = 'UPDATE mlab.glasnost SET locID = M.`locId` FROM mlab.glasnost L , maxmind.' + table + ' M WHERE L.`source` BETWEEN M.`startnumip` AND M.`endnumip` AND L.`date` BETWEEN "' + start_datum + '" AND "' + end_datum + '" AND L.`locId` = 0' + print sql + cur.execute(sql) + except MySQLdb.Error, e: + print "An error has been passed. %s" %e + +################################################################# +# # +# start of initialisation # +# Read command line options # +# # +################################################################# + +parser = OptionParser() +parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") +(options, args) = parser.parse_args() +if len(args) == 0: + usage() + +# create file if necessary, as open by itself doesn't cut it +f = open(logDir + processLog, 'a') +f.write("\nNew mlab_maxmind_processed job on " + str(datetime.now())) +f.close + +################################################################# +# # +# start of main program # +# # +################################################################# +global_start_time = datetime.now() + +try: + # Connect to the mysql database + db = MySQLdb.connect(host = db_host, + user = db_user, + passwd = db_passwd, + db = db_name) + cur = db.cursor() + +except: + sys.stderr.write('Error, cannot connect to database' + db_name + '\n') + +# contains hash with key = start_date, value = enddate (= startdate of next table, except for the last one) +maxmind_dates = get_maxmind_dates(cur) +print maxmind_dates + +# sys.exit(1) +# Iterate over ALL filenames +for table in args: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) + +cur.close() +global_end_time = datetime.now() + +print '=====================================\nAll Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time) From a8e6df5fcf965a235a8ce7d450efcf3ee52a665f Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Sun, 1 Jul 2012 13:03:40 +0200 Subject: [PATCH 06/12] Cleaned up code in mlab_maxmind_processed.py, fixed locId bug. --- mlab_maxmind_processed.py | 60 +++++++++++---------------------------- 1 file changed, 16 insertions(+), 44 deletions(-) diff --git a/mlab_maxmind_processed.py b/mlab_maxmind_processed.py index cf350d1..4b40a03 100755 --- a/mlab_maxmind_processed.py +++ b/mlab_maxmind_processed.py @@ -7,7 +7,8 @@ # AX Axel Roest # # Version history -# 20120629 AX first version, doesn't work yet +# 20120629 AX first version +# 20120701 AX cleanup of unused methods, Ruben fixed the locID bug # # test: # cd /DATA @@ -18,6 +19,7 @@ # v move files naar archive directory # v move error files naar error directory # v log process and errors +# todo: loop through all maxmind tables and update full mlab set import sys import re @@ -44,12 +46,7 @@ # directories baseDir = '/DATA/mlab/' -scratchDir = baseDir + 'scratch/' -workDir = baseDir + 'work/' -archiveDir = baseDir + 'archive/' -errorDir = baseDir + 'error/' logDir = baseDir + 'logs/' -cleanDir = baseDir + 'clean/' #files errorLog = "error.log" @@ -83,35 +80,6 @@ def extract_date(string): date = datetime(int(date_match.group(1)),int(date_match.group(2)),int(date_match.group(3))) return date -def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Test if the entry already exists in the database ''' - # Check if the entry exists already - sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) - cur.execute(sql) - - if cur.fetchone()[0] < 1: - return False - else: - return True - -def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Insert a connection to the database without testing ''' - columns = ', '.join(['date', 'destination', 'source', 'file_id']) - values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' - sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " - cur.execute(sql) - -def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Insert a test connection to the database, if it not already exists ''' - # Check if the entry exists already - sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) - cur.execute(sql) - - # If not, then isert it - if cur.fetchone()[0] < 1: - print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' - blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) - # return True if the table exists in the database def check_maxmind_exist(cur, table): sql = "select * FROM maxmind.`" + table + "` LIMIT 1" @@ -121,22 +89,25 @@ def check_maxmind_exist(cur, table): else: return True -def get_maxmind_dates(cur): - datehash = {} +def get_maxmind_tableset(cur): sql = "SHOW TABLES FROM `maxmind`" cur.execute(sql) - rows = cur.fetchall() - rows2 = [] + allrows = cur.fetchall() + rows = [] # filter rows - for item in rows: + for item in allrows: m = re.search('Blocks_GeoLiteCity_(\d+)$', item[0]) if (m): - rows2.append(m.group(0)) + rows.append(m.group(0)) + print rows + return rows - # print rows2 +def get_maxmind_dates(cur): + datehash = {} + rows = get_maxmind_tableset(cur) skipfirst = True - for table in rows2: + for table in rows: date = extract_datestring(table) if (skipfirst): skipfirst = False @@ -156,7 +127,7 @@ def update_mlab_glasnost(cur,table): end_datum = maxmind_dates[start_datum] print 'updating between' + start_datum + ' AND ' + end_datum try: - sql = 'UPDATE mlab.glasnost SET locID = M.`locId` FROM mlab.glasnost L , maxmind.' + table + ' M WHERE L.`source` BETWEEN M.`startnumip` AND M.`endnumip` AND L.`date` BETWEEN "' + start_datum + '" AND "' + end_datum + '" AND L.`locId` = 0' + sql = 'UPDATE mlab.glasnost SET locId = M.`locId` FROM mlab.glasnost L , maxmind.' + table + ' M WHERE L.`source` BETWEEN M.`startnumip` AND M.`endnumip` AND L.`date` BETWEEN "' + start_datum + '" AND "' + end_datum + '" AND L.`locId` = 0' print sql cur.execute(sql) except MySQLdb.Error, e: @@ -199,6 +170,7 @@ def update_mlab_glasnost(cur,table): sys.stderr.write('Error, cannot connect to database' + db_name + '\n') # contains hash with key = start_date, value = enddate (= startdate of next table, except for the last one) +# maxmind_all_tables = get_maxmind_tableset(cur) maxmind_dates = get_maxmind_dates(cur) print maxmind_dates From ceea05c3cf7d11e7ec9cbe8098278c8983e4aefd Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Fri, 6 Jul 2012 18:24:43 +0200 Subject: [PATCH 07/12] Added new scripts mlab_glasnost_update_all_locid.py and mlab_glasnost_update_locid.py from Pascal --- mlab_glasnost_update_all_locid.py | 180 ++++++++++++++++++++++++++++++ mlab_glasnost_update_locid.py | 179 +++++++++++++++++++++++++++++ 2 files changed, 359 insertions(+) create mode 100644 mlab_glasnost_update_all_locid.py create mode 100644 mlab_glasnost_update_locid.py diff --git a/mlab_glasnost_update_all_locid.py b/mlab_glasnost_update_all_locid.py new file mode 100644 index 0000000..cea8d82 --- /dev/null +++ b/mlab_glasnost_update_all_locid.py @@ -0,0 +1,180 @@ +#!/usr/bin/python +# +# mlab_glasnost_update_all_locid.py +# +# Initials: +# AX Axel Roest +# +# Version history +# 20120706 AX first version +# +# test: + + +import sys +import re +import os +from optparse import OptionParser +from datetime import datetime +import dateutil.parser as dparser +from dateutil.relativedelta import relativedelta +from subprocess import call +import MySQLdb + +################################################################# +# # +# settings # +# # +################################################################# + +# PLEASE UPDATE THESE SETTINGS +db_host = "localhost" # your host, usually localhost +db_user = "root" # your username +db_passwd = "rootpassword" # your password +db_name = "mlab" # name of the database +db_tables = {"glasnost": "glasnost", "ndt": "ndt"} # a mapping from testname to tablename +db_filetable = 'files' + +# directories +baseDir = '/DATA/mlab/' +logDir = baseDir + 'logs/' + +#files +updatescript= "mlab_glasnost_update_locid.py" +errorLog = "error.log" +processLog = "mlab_maxmind_processed.log" + + +################################################################# +# # +# functions # +# # +################################################################# + +# Blocks_GeoLiteCity_20090601 +def extract_datestring(string): + ''' Returns the datetime contained in string ''' + # Extract the date + date_match = re.match('Blocks_GeoLiteCity_(\d{4}\d{2}\d{2})$', string) + if not date_match: + raise Exception('Error in argument "', string, '" does not contain a valid date.') + fulldate = date_match.group(1) + "000000" + return fulldate + +def extract_date(string): + ''' Returns the datetime contained in string ''' + # Extract the date + date_match = re.match('.*(\d{4})(\d{2})(\d{2})000000$', string) + if not date_match: + raise Exception('Error in argument "', string, '" does not contain a valid date.') + date = datetime(int(date_match.group(1)),int(date_match.group(2)),int(date_match.group(3))) + return date + +# return True if the table exists in the database +def check_maxmind_exist(cur, table): + sql = "select * FROM maxmind.`" + table + "` LIMIT 1" + cur.execute(sql) + if cur.fetchone()[0] < 1: + return False + else: + return True + +# return a list of Blocks_GeoLiteCity_ tables, for looking up the locIds +def get_maxmind_tableset(cur): + sql = "SHOW TABLES FROM `maxmind`" + cur.execute(sql) + allrows = cur.fetchall() + rows = [] + # filter rows + for item in allrows: + m = re.search('Blocks_GeoLiteCity_(\d+)$', item[0]) + if (m): + rows.append(m.group(0)) + return rows + +def get_maxmind_dates(rows): + datehash = {} + # we skip storing the first entry, as we need the date of the second entry first to store the range + skipfirst = True + for table in rows: + date = extract_datestring(table) + if (skipfirst): + skipfirst = False + else: + datehash[olddate] = date + olddate = date + # the skipping is set straight by storing the last entry outside of the loop + # last one is 6 months in the future + lastdate = extract_date(date) + futuredate = lastdate + relativedelta(months = +6) + datehash[olddate] = futuredate.strftime('%Y%m%d') + '000000' + return datehash + +def update_mlab_glasnost(cur,table): + start_datum = extract_datestring(table) + end_datum = maxmind_dates[start_datum] + print 'updating `' + table + '` between ' + start_datum + ' - ' + end_datum + try: + # print ["/usr/bin/python", updatescript, table, "glasnost", start_datum, end_datum ] + call(["/usr/bin/python", updatescript, table, db_tables['glasnost'], start_datum, end_datum ]) + except Exception as e: + print "An error has occured: " +str(e) + + +################################################################# +# # +# start of initialisation # +# Read command line options # +# # +################################################################# + +parser = OptionParser() +parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") +(options, args) = parser.parse_args() +# check for -h argument here + +# create file if necessary, as open by itself doesn't cut it +f = open(logDir + processLog, 'a') +f.write("\nNew mlab_glasnost_update_all job on " + str(datetime.now())) +f.close + + +################################################################# +# # +# start of main program # +# # +################################################################# +global_start_time = datetime.now() + +try: + # Connect to the mysql database + db = MySQLdb.connect(host = db_host, + user = db_user, + passwd = db_passwd, + db = db_name) + cur = db.cursor() + +except: + sys.stderr.write('Error, cannot connect to database' + db_name + '\n') + +# array with tables to loop over, in case we don't get a table argument +maxmind_all_tables = get_maxmind_tableset(cur) + +# contains hash with key = start_date, value = enddate (= startdate of next table, except for the last one) +maxmind_dates = get_maxmind_dates(maxmind_all_tables) + +if len(args) == 0: + print "Iterating over ALL maxmind tables" + for table in maxmind_all_tables: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) +else: + print "Iterating over all arguments" + for table in args: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) + +cur.close() +global_end_time = datetime.now() + +print '=====================================\nAll Glasnost updates Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time) diff --git a/mlab_glasnost_update_locid.py b/mlab_glasnost_update_locid.py new file mode 100644 index 0000000..b500f28 --- /dev/null +++ b/mlab_glasnost_update_locid.py @@ -0,0 +1,179 @@ +#!/usr/bin/python +# +# mlab_glasnost_update_locid.py +# +# Initials: +# PH Pascal Haakmat +# AR Axel Roest +# +# Version history +# 20120706 PH first version based on mlab_maxmind_processed.py +# 20120707 AR tested on glasnost_test database, update query changed, added location fields update +# +# Note: make sure the glasnost table has separate indexes on longip +# and date columns, i.e.: +# KEY `longip` (`longip`), +# KEY `date` (`date`) +# as part of the schema definition. Do not index locId - this will +# slow things down because MySQL needs to maintain the index for the field +# as we update it. +# +# Example invocation: +# $ python mlab_glasnost_update_locid.py Blocks_GeoLiteCity glasnost 19000101 30000101 +# 2012-07-06 04:28:08 Starting geoIP lookup for date range 19000101-30000101 +# 2012-07-06 04:28:08 Total maxmind rows: 3786204 +# 2012-07-06 04:28:08 Processing in chunks of 100000 records (38 chunk(s)) +# 2012-07-06 04:28:08 Processing chunk #1 (0-99999) +# ... +# 2012-07-06 04:39:58 Processing chunk #38 (3700000-3786203) +# 2012-07-06 04:40:16 Total glasnost changes: 935005 +# 2012-07-06 04:40:16 Finished in 0:12:07.507977 + +import sys +import re +import os +import math +from datetime import datetime +import dateutil.parser as dparser +from dateutil.relativedelta import relativedelta +import MySQLdb + +################################################################# +# # +# settings # +# # +################################################################# + +# Defaults +maxmind_db_host = "localhost" # your host, usually localhost +maxmind_db_user = "dbuser" # your username +maxmind_db_passwd = "password" # your password +maxmind_db_name = "chokepoint_mlab" # name of the database +maxmind_table_name = 'Blocks_GeoLiteCity' + +glasnost_db_host = maxmind_db_host +glasnost_db_user = maxmind_db_user +glasnost_db_passwd = maxmind_db_passwd +glasnost_db_name = "mlab" +glasnost_table_name = 'glasnost_test' + + +################################################################# +# # +# the meat # +# # +################################################################# + +def log(str): + print datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " " + str + sys.stdout.flush() + +def update_glasnost(glasnost_cursor,glasnost_table_name,start_date,end_date,start_ip,end_ip,loc_id,maxmind_table_name): + updated = 0 + try: +# sql = """UPDATE `{0}` SET `locId`={1} WHERE `locId`=0 AND `longip` BETWEEN {2} AND {3} AND `date` BETWEEN {4} AND {5}""".format(glasnost_table_name, loc_id, start_ip, end_ip, start_date, end_date) + sql = """UPDATE `{0}` SET `locId`={1} , `maxmind_table_name` = '{2}' WHERE `longip` BETWEEN {3} AND {4} AND `date` BETWEEN {5} AND {6}""".format(glasnost_table_name, loc_id, maxmind_table_name, start_ip, end_ip, start_date, end_date) + glasnost_cursor.execute(sql) + updated = glasnost_cursor.rowcount + except MySQLdb.Error, e: + log("Error updating glasnost: {0}".format(e)) + return updated + +def update_glasnost_with_location(glasnost_cursor,glasnost_table_name,location_table_name): + updated = 0 + print glasnost_table_name + " - with - " + maxmind_table_name + try: + sql = """UPDATE mlab.`{0}` L, maxmind.`{1}` M SET L.country_code=M.country, L.region=M.region, L.city=M.city, L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, L.metroCode=M.metroCode, L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`""".format(glasnost_table_name, location_table_name) + glasnost_cursor.execute(sql) + updated = glasnost_cursor.rowcount + except MySQLdb.Error, e: + log("Error updating glasnost: {0}".format(e)) + return updated + +def process(maxmind_cursor,maxmind_table_name,glasnost_cursor,glasnost_table_name,glasnost_start_date,glasnost_end_date): + location_table_name = maxmind_table_name.replace("Blocks", "Location") + sql = """SELECT COUNT(*) FROM `{0}`""".format(maxmind_table_name) + maxmind_cursor.execute(sql) + result = maxmind_cursor.fetchone() + + maxmind_total = result[0] + + log('Total maxmind rows: '+ str(maxmind_total)) + + chunk_size = 100000 + chunk_count = int(math.ceil(float(maxmind_total) / float(chunk_size))) + + log('Processing in chunks of '+ str(chunk_size) + ' records (' + str(chunk_count) + ' chunk(s))') + + glasnost_changes = 0 + chunk_num = 1 + + for offset in range(0, maxmind_total, chunk_size): + count = min(chunk_size, maxmind_total - offset) + + log('Processing chunk #' + str(chunk_num) + ' (' + str(offset) + '-' + str(offset - 1 + count) + ')') + + sql = """SELECT * FROM `{0}` LIMIT {1} OFFSET {2}""".format(maxmind_table_name, count, offset) + maxmind_cursor.execute(sql) + result = maxmind_cursor.fetchall() + + for row in result: + glasnost_changes = glasnost_changes + update_glasnost(glasnost_cursor,glasnost_table_name,glasnost_start_date,glasnost_end_date,row[0],row[1],row[2], location_table_name) + + chunk_num = chunk_num + 1 + + # update the rest of the fields from the maxmind location table + fieldupdates = update_glasnost_with_location(glasnost_cursor,glasnost_table_name,location_table_name) + log('Total glasnost changes: ' + str(glasnost_changes) + ' [location updates: ' + str(fieldupdates) + ']') + + return glasnost_changes + +################################################################# +# # +# start of initialisation # +# Read command line options # +# # +################################################################# + +if(len(sys.argv) != 5): + print 'usage: {0} maxmind_table_name glasnost_table_name start_date end_date'.format(os.path.basename(sys.argv[0])) + print 'start_date and end_date should be of the format YYYYMMDDHHMMSS' + exit(1) + +maxmind_table_name = sys.argv[1] +glasnost_table_name = sys.argv[2] +start_date = sys.argv[3] +end_date = sys.argv[4] + +################################################################# +# # +# start of main program # +# # +################################################################# +global_start_time = datetime.now() + +log('Starting geoIP lookup for date range {0}-{1}'.format(start_date, end_date)) + +try: + # Connect to the mysql database + maxmind_db = MySQLdb.connect(host = maxmind_db_host, + user = maxmind_db_user, + passwd = maxmind_db_passwd, + db = maxmind_db_name) + maxmind_cursor = maxmind_db.cursor() + glasnost_db = MySQLdb.connect(host = glasnost_db_host, + user = glasnost_db_user, + passwd = glasnost_db_passwd, + db = glasnost_db_name) + glasnost_cursor = glasnost_db.cursor() + process(maxmind_cursor,maxmind_table_name,glasnost_cursor,glasnost_table_name,start_date,end_date) +except Exception as e: + log('Aborting due to error: ' + str(e)) + exit(1) +finally: + maxmind_cursor.close() + glasnost_cursor.close() + +global_end_time = datetime.now() + +log('Finished in ' + str(global_end_time - global_start_time)) From 2007af1e1850aa94f3cac2bfa47f615224b9e02e Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Fri, 6 Jul 2012 18:28:05 +0200 Subject: [PATCH 08/12] mlab_maxmind_processed: iterate over all tables, fixed (?) sql update query --- mlab_maxmind_processed.py | 43 ++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/mlab_maxmind_processed.py b/mlab_maxmind_processed.py index 4b40a03..224fea1 100755 --- a/mlab_maxmind_processed.py +++ b/mlab_maxmind_processed.py @@ -7,8 +7,8 @@ # AX Axel Roest # # Version history -# 20120629 AX first version -# 20120701 AX cleanup of unused methods, Ruben fixed the locID bug +# 20120629 AX first version +# 20120701 AX cleanup of unused methods, Ruben fixed the locID bug, iterate over all maxmind tables if no argument given, fixed sql update query # # test: # cd /DATA @@ -19,7 +19,7 @@ # v move files naar archive directory # v move error files naar error directory # v log process and errors -# todo: loop through all maxmind tables and update full mlab set +# todo: loop through all maxmind tables and update full mlab set import sys import re @@ -89,6 +89,7 @@ def check_maxmind_exist(cur, table): else: return True +# return a list of Blocks_GeoLiteCity_ tables, for looking up the locIds def get_maxmind_tableset(cur): sql = "SHOW TABLES FROM `maxmind`" cur.execute(sql) @@ -99,13 +100,11 @@ def get_maxmind_tableset(cur): m = re.search('Blocks_GeoLiteCity_(\d+)$', item[0]) if (m): rows.append(m.group(0)) - print rows return rows -def get_maxmind_dates(cur): +def get_maxmind_dates(rows): datehash = {} - rows = get_maxmind_tableset(cur) - + # we skip storing the first entry, as we need the date of the second entry first to store the range skipfirst = True for table in rows: date = extract_datestring(table) @@ -114,6 +113,7 @@ def get_maxmind_dates(cur): else: datehash[olddate] = date olddate = date + # the skipping is set straight by storing the last entry outside of the loop # last one is 6 months in the future lastdate = extract_date(date) print 'date=' + date + " = " + str(lastdate) @@ -127,7 +127,7 @@ def update_mlab_glasnost(cur,table): end_datum = maxmind_dates[start_datum] print 'updating between' + start_datum + ' AND ' + end_datum try: - sql = 'UPDATE mlab.glasnost SET locId = M.`locId` FROM mlab.glasnost L , maxmind.' + table + ' M WHERE L.`source` BETWEEN M.`startnumip` AND M.`endnumip` AND L.`date` BETWEEN "' + start_datum + '" AND "' + end_datum + '" AND L.`locId` = 0' + sql = 'UPDATE mlab.glasnost L, maxmind.' + table + ' M SET L.locId = M.locId WHERE L.longip BETWEEN M.startIpNum AND M.endIpNum AND L.date BETWEEN "' + start_datum + '" AND "' + end_datum + '" AND L.locId = 0' print sql cur.execute(sql) except MySQLdb.Error, e: @@ -143,8 +143,7 @@ def update_mlab_glasnost(cur,table): parser = OptionParser() parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") (options, args) = parser.parse_args() -if len(args) == 0: - usage() +# check for -h argument here # create file if necessary, as open by itself doesn't cut it f = open(logDir + processLog, 'a') @@ -169,16 +168,22 @@ def update_mlab_glasnost(cur,table): except: sys.stderr.write('Error, cannot connect to database' + db_name + '\n') +# array with tables to loop over, in case we don't get a table argument +maxmind_all_tables = get_maxmind_tableset(cur) + # contains hash with key = start_date, value = enddate (= startdate of next table, except for the last one) -# maxmind_all_tables = get_maxmind_tableset(cur) -maxmind_dates = get_maxmind_dates(cur) -print maxmind_dates - -# sys.exit(1) -# Iterate over ALL filenames -for table in args: - if (check_maxmind_exist(cur,table)): - update_mlab_glasnost(cur,table) +maxmind_dates = get_maxmind_dates(maxmind_all_tables) + +if len(args) == 0: + print "Iterating over ALL maxmind tables" + for table in maxmind_all_tables: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) +else: + print "Iterating over all arguments" + for table in args: + if (check_maxmind_exist(cur,table)): + update_mlab_glasnost(cur,table) cur.close() global_end_time = datetime.now() From 8c329ac01ba6e08a428b3c6c0d776413b1346443 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Mon, 9 Jul 2012 00:43:03 +0200 Subject: [PATCH 09/12] skip lines in mlab_mysql_import when cputime is in the line, or no ip number can be found. --- argtest.py | 62 ++++++++++++++++++++++++++++++++++++++++++++ mlab_mysql_import.py | 11 +++++++- 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 argtest.py diff --git a/argtest.py b/argtest.py new file mode 100644 index 0000000..576f7b1 --- /dev/null +++ b/argtest.py @@ -0,0 +1,62 @@ +#!/usr/bin/python +# argument test + +import sys +import re +import os +import shutil +from optparse import OptionParser + +# Read command line options +def usage(): + print "Usage: mlab_mysql_import.py mlab_file.csv" + print "Recursive import can be realised by running:" + print "find . -iname '*.tgz.csv' -exec ./mlab_mysql_import.py {} \;" + sys.exit(1) + + +################################################################# +# # +# start of main program # +# # +################################################################# + +parser = OptionParser() +parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") +(options, args) = parser.parse_args() +if len(args) == 0: + usage() + +print "sys.argv[0] = " + sys.argv[0] +print "args[0] = " + args[0] + +# We might want to iterate over ALL filenames! +filename = args[0] + +try: + f = open(filename, 'r') +except IOError as e: + print 'Could not open file ', filename +# Extract the basename of the filename, as the path is not of interest after this point +filename = os.path.basename(filename) + +def extract_archive_date(filename): + m = re.match('^(\d{4})(\d{2})(\d{2})', filename) + return (m.group(1),m.group(2)) + +# test if archive directory exist, and create it if necessary +def create_archive_dir(ym): + if (not os.path.exists(ym)): + os.makedirs(ym) + +fname = "20120212T000000Z-mlab1-ham01-glasnost-0000.tgz.csv" +(year,month) = extract_archive_date(fname) +create_archive_dir(year +'/'+ month) + + +sys.exit(1) +for file in args: + print "filename=" + file + shutil.move(file,"dest") + + diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index 52fdf5d..32b68c9 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -8,6 +8,7 @@ # 2012xxxx SF first version # 20120628 AX removed testing for every line, added timing code, # 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder +# 20120708 AX skip empty ip lines instead or error message # # test: # cd /DATA @@ -18,6 +19,7 @@ # v move files naar archive directory # v move error files naar error directory # v log process and errors +# v skip empty ip lines instead or error message import sys import re @@ -94,12 +96,17 @@ def extract_datetime(string): except ValueError: raise ValueError, 'Error im import: line "' + string + '" does not contain a valid date and time.' +# return with empty string when we encounter cputime, or no ip number def extract_ip(string): + if re.search('cputime', string): + return '' ''' Returns the first valid ip address contained in string ''' # Extract the date match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', string) if not match: - raise Exception ('Error im import: line "', string, '" does not contain a valid ip address.') + # ignore file + return '' + # raise Exception ('Error im import: line "', string, '" does not contain a valid ip address.') return match.group(0) def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): @@ -186,6 +193,8 @@ def process_file(f, filename): for line in f: line = line.strip() source_ip = extract_ip(line) + if ('' == source_ip): + continue # skip empty lines instead of error reporting them test_datetime = extract_datetime(line) if (filetest): if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): From eb6716d1574574e7da99840a5d4a9ecb080d15e3 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Tue, 10 Jul 2012 01:21:30 +0200 Subject: [PATCH 10/12] Cleaned up comments, Ruben added some comments, added todos --- mlab_mysql_import.py | 47 ++++--- mlab_mysql_import4.py | 307 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 338 insertions(+), 16 deletions(-) create mode 100755 mlab_mysql_import4.py diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index 32b68c9..c286b26 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -9,6 +9,7 @@ # 20120628 AX removed testing for every line, added timing code, # 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder # 20120708 AX skip empty ip lines instead or error message +# 20120708 RB cleaning some names and spelling, also we don't want processed_files.log to clobber the downloaders processed_files.log. So we should use overly descriptive names # # test: # cd /DATA @@ -20,6 +21,15 @@ # v move error files naar error directory # v log process and errors # v skip empty ip lines instead or error message +# +# Get the date from the filename, and look up the correct maxmind database +# then, insert the locId directly with the line in the mlab/{glasnost,ndt} database, preventing slow future updates +# on the other hand, all these updates might be extremely slow: TEST +# +# todo : refactor all the utility functions in a separate file +# todo : refactor all the passwords in a separate file (which is NOT in the repo, AND is in the .gitignore list + + import sys import re @@ -39,7 +49,7 @@ # PLEASE UPDATE THESE SETTINGS db_host = "localhost" # your host, usually localhost db_user = "root" # your username -db_passwd = "rootpassword" # your password +db_passwd = "" # your password db_name = "mlab" # name of the database db_tables = {"glasnost": "glasnost", "ndt": "ndt"} # a mapping from testname to tablename db_filetable = 'files' @@ -54,8 +64,8 @@ cleanDir = baseDir + 'clean/' #files -errorLog = "error.log" -processLog = "processed_files.log" +errorLog = "mlab_mysql_import_error.log" +processLog = "mlab_mysql_import_processed_files.log" ################################################################# # # @@ -67,9 +77,10 @@ def usage(): print "Usage: mlab_mysql_import3.py mlab_file1.csv [mlab_files.csv ...]" sys.exit(1) +# This routine extracts the destination server of the mlab file. +# It assumes that the filename has the form like 20100210T000000Z-mlab3-dfw01-ndt-0000.tgz.csv +# def extract_destination(filename): - ''' This routine extracts the destination server of the mlab file. - It assumes that the filename has the form like 20100210T000000Z-mlab3-dfw01-ndt-0000.tgz.csv ''' # Split the filename and perform some tests if it conforms to our standard f_split = filename.split('-') if len(f_split) < 3: @@ -80,37 +91,37 @@ def extract_destination(filename): return '.'.join(filename.split('-')[1:-1]) +# Returns the datetime contained in string. def extract_datetime(string): - ''' Returns the datetime contained in string ''' # Extract the date date_match = re.search(r'\d{4}/\d{2}/\d{2}', string) if not date_match: - raise Exception('Error im import: line "', string, '" does not contain a valid date.') + raise Exception('Error in import: line "', string, '" does not contain a valid date.') # Extract the time time_match = re.search(r'\d{2}:\d{2}:\d{2}', string) if not time_match: - raise Exception('Error im import: line "', string, '" does not contain a valid time.') + raise Exception('Error in import: line "', string, '" does not contain a valid time.') try: return dparser.parse(date_match.group(0) + ' ' + time_match.group(0), fuzzy=True) except ValueError: - raise ValueError, 'Error im import: line "' + string + '" does not contain a valid date and time.' + raise ValueError, 'Error in import: line "' + string + '" does not contain a valid date and time.' +# Returns the first valid ip address contained in string. # return with empty string when we encounter cputime, or no ip number def extract_ip(string): if re.search('cputime', string): return '' - ''' Returns the first valid ip address contained in string ''' # Extract the date match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', string) if not match: # ignore file return '' - # raise Exception ('Error im import: line "', string, '" does not contain a valid ip address.') + # raise Exception ('Error in import: line "', string, '" does not contain a valid ip address.') return match.group(0) +# Test if the entry already exists in the database def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Test if the entry already exists in the database ''' # Check if the entry exists already sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) cur.execute(sql) @@ -120,26 +131,26 @@ def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip else: return True +# Insert a connection to the database without testing. def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Insert a connection to the database without testing ''' columns = ', '.join(['date', 'destination', 'source', 'file_id']) values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " cur.execute(sql) +# Insert a test connection to the database, if it not already exists def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - ''' Insert a test connection to the database, if it not already exists ''' # Check if the entry exists already sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) cur.execute(sql) - # If not, then isert it + # If not, then insert it if cur.fetchone()[0] < 1: print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) +# Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. def get_file_id(cur, filename): - ''' Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. ''' sql = "SELECT id FROM " + db_filetable + " WHERE filename ='" + filename + "'" cur.execute(sql) id = cur.fetchone() @@ -150,6 +161,7 @@ def get_file_id(cur, filename): return get_file_id(cur, filename) return id[0] +# do deduplucation of connection strings def dedup(file_id, table, test_datetime, destination, source_ip): key = str(file_id) + table + str(test_datetime) + destination + source_ip if key in deduplookup: @@ -219,6 +231,7 @@ def process_file(f, filename): f.write(pathname + '\n') f.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') print +# This bit should probably be cleaned up. # except: # sys.stderr.write('Process error ' + '\n') finally: @@ -230,6 +243,7 @@ def process_file(f, filename): return failure +# get the test date from the archive filename def extract_archive_date(filename): m = re.match('^(\d{4})(\d{2})(\d{2})', filename) return (m.group(1),m.group(2)) @@ -240,6 +254,7 @@ def create_archive_dir(ym): os.makedirs(ym) return ym +# move processed file to archive folder def move_archive(pathname): fname = os.path.basename(pathname) (year,month) = extract_archive_date(fname) diff --git a/mlab_mysql_import4.py b/mlab_mysql_import4.py new file mode 100755 index 0000000..37c0dff --- /dev/null +++ b/mlab_mysql_import4.py @@ -0,0 +1,307 @@ +#!/usr/bin/python +# +# Initials: SF Simon Funke +# RB Ruben Bloemgarten +# AX Axel Roest +# +# Version history +# 2012xxxx SF first version +# 20120628 AX removed testing for every line, added timing code, +# 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder +# 20120708 AX skip empty ip lines instead or error message +# 20120708 RB cleaning some names and spelling, also we don't want processed_files.log to clobber the downloaders processed_files.log. So we should use overly descriptive names +# +# test: +# cd /DATA +# python scripts/mlab/mlab_mysql_import2.py mlab/clean/glasnost/20090128T000000Z-batch-batch-glasnost-0002.tgz.csv +# +# ToDO: v loop over all arguments in sys.argv[0] +# v deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry +# v move files naar archive directory +# v move error files naar error directory +# v log process and errors +# v skip empty ip lines instead or error message + +import sys +import re +import os +from optparse import OptionParser +from datetime import datetime +import dateutil.parser as dparser +import MySQLdb +import shutil + +################################################################# +# # +# settings # +# # +################################################################# + +# PLEASE UPDATE THESE SETTINGS +db_host = "localhost" # your host, usually localhost +db_user = "root" # your username +db_passwd = "" # your password +db_name = "mlab" # name of the database +db_tables = {"glasnost": "glasnost", "ndt": "ndt_test"} # a mapping from testname to tablename +db_filetable = 'files' + +# directories +baseDir = '/DATA/mlab/' +#baseDir = '/home/axel/mlab/' +scratchDir = baseDir + 'scratch/' +workDir = baseDir + 'work/' +archiveDir = baseDir + 'archive/' +errorDir = baseDir + 'error/' +logDir = baseDir + 'logs/' +cleanDir = baseDir + 'clean/' + +#files +errorLog = "mlab_mysql_import_error.log" +processLog = "mlab_mysql_import_processed_files.log" + +################################################################# +# # +# functions # +# # +################################################################# + +def usage(): + print "Usage: mlab_mysql_import3.py mlab_file1.csv [mlab_files.csv ...]" + sys.exit(1) + +# This routine extracts the destination server of the mlab file. +# It assumes that the filename has the form like 20100210T000000Z-mlab3-dfw01-ndt-0000.tgz.csv +# +def extract_destination(filename): + # Split the filename and perform some tests if it conforms to our standard + f_split = filename.split('-') + if len(f_split) < 3: + raise Exception("The specified filename (", filename, ") should contain at least two '-' characters that delimit the data, destination and the suffix.") + + if '.tgz.csv' not in f_split[-1]: + print "The specified filename (", filename, ") should end with '.tgz.csv'." + + return '.'.join(filename.split('-')[1:-1]) + +# Returns the datetime contained in string. +def extract_datetime(string): + # Extract the date + date_match = re.search(r'\d{4}/\d{2}/\d{2}', string) + if not date_match: + raise Exception('Error in import: line "', string, '" does not contain a valid date.') + # Extract the time + time_match = re.search(r'\d{2}:\d{2}:\d{2}', string) + if not time_match: + raise Exception('Error in import: line "', string, '" does not contain a valid time.') + + try: + return dparser.parse(date_match.group(0) + ' ' + time_match.group(0), fuzzy=True) + except ValueError: + raise ValueError, 'Error in import: line "' + string + '" does not contain a valid date and time.' + +# Returns the first valid ip address contained in string. +# return with empty string when we encounter cputime, or no ip number +def extract_ip(string): + if re.search('cputime', string): + return '' + # Extract the date + match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', string) + if not match: + # ignore file + return '' + # raise Exception ('Error in import: line "', string, '" does not contain a valid ip address.') + return match.group(0) + +# Test if the entry already exists in the database +def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + # Check if the entry exists already + sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) + cur.execute(sql) + + if cur.fetchone()[0] < 1: + return False + else: + return True + +# Insert a connection to the database without testing. +def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + columns = ', '.join(['date', 'destination', 'source', 'file_id']) + values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' + sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " + cur.execute(sql) + +# Insert a test connection to the database, if it not already exists +def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): + # Check if the entry exists already + sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) + cur.execute(sql) + + # If not, then insert it + if cur.fetchone()[0] < 1: + print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' + blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) + +# Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. +def get_file_id(cur, filename): + sql = "SELECT id FROM " + db_filetable + " WHERE filename ='" + filename + "'" + cur.execute(sql) + id = cur.fetchone() + # If the entry does not exist, we add it in + if not id: + sql = "INSERT INTO " + db_filetable + " (filename) VALUES('" + filename + "')" + cur.execute(sql) + return get_file_id(cur, filename) + return id[0] + +# do deduplucation of connection strings +def dedup(file_id, table, test_datetime, destination, source_ip): + key = str(file_id) + table + str(test_datetime) + destination + source_ip + if key in deduplookup: + return False + else: + deduplookup[key] = True + return True + +# returns True on error, False on correct processing +def process_file(f, filename): + start_time = datetime.now() + failure = True + try: + # Connect to the mysql database + db = MySQLdb.connect(host = db_host, + user = db_user, + passwd = db_passwd, + db = db_name) + cur = db.cursor() + + # Find the destination server by investigating the filename + destination = extract_destination(filename) + print 'Destination: ', destination, + + # Get the filename id from the files table + file_id = get_file_id(cur, filename) + db.commit() + + # Find the testsuite by investigating the filename + try: + test = [test for test in db_tables.keys() if test in filename][0] + except IndexError: + sys.stderr.write('The filename ' + filename + ' does not contain a valid testname.') + return 1 + # print "Found test suite " + test + + # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes + # But falls back to less than half a second when indexing is turned on on the db + filetest=True + # Read the file line by line and import it into the database + for line in f: + line = line.strip() + source_ip = extract_ip(line) + if ('' == source_ip): + continue # skip empty lines instead of error reporting them + test_datetime = extract_datetime(line) + if (filetest): + if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): + # this file has already been read: ABORT WITH ERROR + raise Exception('File entry already exist in db; the file has already been read: ' + filename) + filetest=False + # test if we have already done it in this or last filetest + if (dedup(file_id, db_tables[test], test_datetime, destination, source_ip)): + blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + end_time = datetime.now() + print 'File done in ' + str(end_time - start_time) + failure = False + except Exception as inst: + sys.stderr.write('Exception: '+str(inst.args) + '\n') + with open(logDir + errorLog, 'a') as f: + f.write(pathname + '\n') + f.write('Exception: '+str(inst.args) + '\n') + print + except IOError as e: + sys.stderr.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') + with open(logDir + errorLog, 'a') as f: + f.write(pathname + '\n') + f.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') + print +# This bit should probably be cleaned up. +# except: +# sys.stderr.write('Process error ' + '\n') + finally: + # Commit and finish up + sys.stderr.flush() + # db.commit() + # disconnect from server + db.close() + + return failure + +# get the test date from the archive filename +def extract_archive_date(filename): + m = re.match('^(\d{4})(\d{2})(\d{2})', filename) + return (m.group(1),m.group(2)) + +# test if archive directory exist, and create it if necessary +def create_archive_dir(ym): + if (not os.path.exists(ym)): + os.makedirs(ym) + return ym + +# move processed file to archive folder +def move_archive(pathname): + fname = os.path.basename(pathname) + (year,month) = extract_archive_date(fname) + aDir = create_archive_dir(archiveDir + year +'/'+ month) + shutil.move(pathname,aDir) + with open(logDir + processLog, 'a') as f: + f.write(pathname + '\n') + + +################################################################# +# # +# start of initialisation # +# Read command line options # +# # +################################################################# + +parser = OptionParser() +parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") +(options, args) = parser.parse_args() +if len(args) == 0: + usage() + +# create file if necessary, as open by itself doesn't cut it +f = open(logDir + processLog, 'a') +f.write("\nNew batchjob on " + str(datetime.now())) +f.close + +# deduplookup is a hash we use for de-duplication of input lines +# maybe it is necessary to purge parts of it during the duration of the import +# but then we have to carefully monitor tests that appear in multiple files +# OR store the last test in a separate global (dirty? yeah, I know) +deduplookup = {} + +################################################################# +# # +# start of main program # +# # +################################################################# +global_start_time = datetime.now() + +# Iterate over ALL filenames +for pathname in args: + try: + with open(pathname, 'r') as f: + # Extract the basename of the filename, as the path is not of interest after this point + filename = os.path.basename(pathname) + print "processing file " + filename, + if (process_file(f, filename)): + shutil.move(pathname,errorDir) + else: + move_archive(pathname) + # file is automatically closed if needed + except IOError as e: + print 'Could not open file ' + pathname + '\nError: ' + str(e.args) + +global_end_time = datetime.now() + +print '=====================================\nAll Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time) From 91682aecbc38ef4e7d9665ed59086c1bec5e15f7 Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Tue, 10 Jul 2012 20:28:53 +0200 Subject: [PATCH 11/12] added maxmind.py and updated mlab_mysql_import to take advantage of it. maxmind.py indexes one of the maxmind tables, to greatly speed up the locId lookup process. --- maxmind.py | 100 ++++++++++++++ mlab_mysql_import.py | 97 ++++++++++--- mlab_mysql_import4.py | 307 ------------------------------------------ 3 files changed, 175 insertions(+), 329 deletions(-) create mode 100644 maxmind.py delete mode 100755 mlab_mysql_import4.py diff --git a/maxmind.py b/maxmind.py new file mode 100644 index 0000000..9a5698f --- /dev/null +++ b/maxmind.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +# +# maxmind.py +# +# A class to represent a full maxmind database as a python hash, to make locId lookups much faster (hopefully) +# +# Algorithm: +# set up two tables: +# startips = list of all the startips, we binary search through this using bisect +# ipranges = hash of all the ranges, where the key is the startip, and the endip is the value +# +# binary search through startips to find the closest startip of the target ip +# get the endip of the range from the hash, and compare +# return locId to caller +# +# Initials: +# AX Axel Roest +# RB Ruben Bloemgarten +# +# Version history +# 20120710 AR first version +# +# ToDO: +# + +import sys +import re +import os +import math +import bisect +from datetime import datetime +import MySQLdb + +################################################################# +# # +# settings # +# # +################################################################# + +# Defaults + +################################################################# +# # +# the meat # +# # +################################################################# + +class MaxMind: + # two tables: + # startips = [] # list of all the startips, we binary search through this using bisect + # ipranges = {} # hash of all the ranges, where the key is the startip, and the endip is the value + + # initialise the object with the database table to instantiate with + def __init__(self, maxmind_db_host, maxmind_db_user, maxmind_db_passwd, maxmind_db_name, maxmind_table_name): + global_start_time = datetime.now() + try: + # Connect to the mysql database + maxmind_db = MySQLdb.connect(host = maxmind_db_host, + user = maxmind_db_user, + passwd = maxmind_db_passwd, + db = maxmind_db_name) + maxmind_cursor = maxmind_db.cursor() + MaxMind.loadTable(self, maxmind_cursor, maxmind_table_name) + except Exception as e: + print('Aborting maxmind due to error: ' + str(e)) + exit(1) + finally: + maxmind_cursor.close() + global_end_time = datetime.now() + print 'MaxMind: Read and indexed `' + maxmind_table_name + '` in ' + str(global_end_time - global_start_time) + ' seconds.' + + def loadTable(self, maxmind_cursor, maxmind_table_name): + # grab everything (whole table, 3,5 million items) + sql = """SELECT startIpNum, endIpNum, locId FROM `{0}`""".format(maxmind_table_name) + maxmind_cursor.execute(sql) + result = maxmind_cursor.fetchall() + if result: + # fromkeys + self.startips = [x[0] for x in result] + self.startips.sort() + self.ipranges = {int(start) : [int(e),int(l)] for start,e,l in result} + + def find_le(self, a, x): + # Find rightmost value less than or equal to x + i = bisect.bisect_right(a, x) + if i: + return a[i-1] + raise ValueError + + def lookup(self, ipnumber): + # print "looking up: " + str(ipnumber) + i = self.find_le(self.startips, ipnumber) + # print "i="+str(i) + (endip, loc) = self.ipranges[i] + # print endip, loc + if ipnumber <= endip: + return loc + else: + return -1 + diff --git a/mlab_mysql_import.py b/mlab_mysql_import.py index c286b26..3644578 100755 --- a/mlab_mysql_import.py +++ b/mlab_mysql_import.py @@ -10,6 +10,7 @@ # 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder # 20120708 AX skip empty ip lines instead or error message # 20120708 RB cleaning some names and spelling, also we don't want processed_files.log to clobber the downloaders processed_files.log. So we should use overly descriptive names +# 20120710 AX added locId lookup and added longip to insert query # # test: # cd /DATA @@ -21,15 +22,14 @@ # v move error files naar error directory # v log process and errors # v skip empty ip lines instead or error message +# v added locId lookup and added longip to insert query # # Get the date from the filename, and look up the correct maxmind database # then, insert the locId directly with the line in the mlab/{glasnost,ndt} database, preventing slow future updates # on the other hand, all these updates might be extremely slow: TEST # -# todo : refactor all the utility functions in a separate file -# todo : refactor all the passwords in a separate file (which is NOT in the repo, AND is in the .gitignore list - - +# todo : refactor all the utility functions in a separate file +# todo : refactor all the passwords in a separate file (which is NOT in the repo, AND is in the .gitignore list import sys import re @@ -39,6 +39,8 @@ import dateutil.parser as dparser import MySQLdb import shutil +from maxmind import MaxMind +import socket, struct ################################################################# # # @@ -51,11 +53,12 @@ db_user = "root" # your username db_passwd = "" # your password db_name = "mlab" # name of the database -db_tables = {"glasnost": "glasnost", "ndt": "ndt"} # a mapping from testname to tablename +db_tables = {"glasnost": "glasnost", "ndt": "ndt_test"} # a mapping from testname to tablename db_filetable = 'files' # directories baseDir = '/DATA/mlab/' +#baseDir = '/home/axel/mlab/' scratchDir = baseDir + 'scratch/' workDir = baseDir + 'work/' archiveDir = baseDir + 'archive/' @@ -67,14 +70,26 @@ errorLog = "mlab_mysql_import_error.log" processLog = "mlab_mysql_import_processed_files.log" +# default tables +maxmind_table = 'Blocks_GeoLiteCity_Last' +ndt_import = 'ndt_import' ################################################################# # # # functions # # # ################################################################# +# Convert an IP string to long +def ip2long(ip): + packedIP = socket.inet_aton(ip) + return struct.unpack("!L", packedIP)[0] + +def long2ip(l): + return socket.inet_ntoa(struct.pack('!L', l)) + def usage(): - print "Usage: mlab_mysql_import3.py mlab_file1.csv [mlab_files.csv ...]" + print "Usage: mlab_mysql_import.py [ -m maxmind_Blocks_Tablename ] mlab_file1.csv [mlab_files.csv ...]" + print "Default: maxmind_Blocks_Tablename = `Blocks_GeoLiteCity_Last`" sys.exit(1) # This routine extracts the destination server of the mlab file. @@ -133,9 +148,13 @@ def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip # Insert a connection to the database without testing. def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - columns = ', '.join(['date', 'destination', 'source', 'file_id']) - values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' + longip = ip2long(source_ip) + # locid = 0 + locid = mm.lookup(longip) # lookup location id from ip number + columns = ', '.join(['date', 'destination', 'source', 'file_id', 'longip', 'locId']) + values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id), str(longip), str(locid)]) + '"' sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " + # print sql cur.execute(sql) # Insert a test connection to the database, if it not already exists @@ -170,6 +189,26 @@ def dedup(file_id, table, test_datetime, destination, source_ip): deduplookup[key] = True return True +# for the temp table, look up all the locations with the locId +def lookup_locations(cur, destination): + location_table_name = maxmind_table.replace("Blocks", "Location") + # sql = 'UPDATE mlab.`' + destination + '` L, maxmind.`' + location_table_name + '` M SET L.country_code = M.country, L.region=M.region, L.city=M.city, L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, L.metroCode=M.metroCode, L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`' + sql = 'UPDATE mlab.`ndt_import` L, maxmind.`' + location_table_name + '` M SET L.country_code = M.country, L.region=M.region, L.city=M.city, L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, L.metroCode=M.metroCode, L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`' + updated = cur.execute(sql) + # update country from country_code later? + return updated + +# clear the temp table +def clear_temp_table(cur): + sql = 'truncate table `' + ndt_import + '`' + cur.execute(sql) + +# move the temp table to the real on (either ndt_test or ndt) +def move_temp_table(cur, destination): + sql = 'INSERT INTO `' + destination + '` (`created_at`, `date`, `destination`, `source`, `file_id`, `country_code`, `longip`, `locId`, `country`, `region`, `city`, `postalCode`, `latitude`, `longitude`, `metroCode`, `areaCode`) SELECT * FROM `' + ndt_import + '`' + updated = cur.execute(sql) + return updated + # returns True on error, False on correct processing def process_file(f, filename): start_time = datetime.now() @@ -181,6 +220,7 @@ def process_file(f, filename): passwd = db_passwd, db = db_name) cur = db.cursor() + clear_temp_table(cur) # Find the destination server by investigating the filename destination = extract_destination(filename) @@ -190,7 +230,7 @@ def process_file(f, filename): file_id = get_file_id(cur, filename) db.commit() - # Find the testsuite by investigating the filename + # Find the testsuite (glasnost or ndt) by investigating the filename try: test = [test for test in db_tables.keys() if test in filename][0] except IndexError: @@ -198,7 +238,7 @@ def process_file(f, filename): return 1 # print "Found test suite " + test - # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes + # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes # But falls back to less than half a second when indexing is turned on on the db filetest=True # Read the file line by line and import it into the database @@ -215,9 +255,12 @@ def process_file(f, filename): filetest=False # test if we have already done it in this or last filetest if (dedup(file_id, db_tables[test], test_datetime, destination, source_ip)): - blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + # blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) + blunt_insert_dbentry(cur, file_id, ndt_import, test_datetime, destination, source_ip) end_time = datetime.now() print 'File done in ' + str(end_time - start_time) + lookup_locations(cur, destination) + move_temp_table(cur, db_tables[test]) failure = False except Exception as inst: sys.stderr.write('Exception: '+str(inst.args) + '\n') @@ -231,9 +274,6 @@ def process_file(f, filename): f.write(pathname + '\n') f.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') print -# This bit should probably be cleaned up. -# except: -# sys.stderr.write('Process error ' + '\n') finally: # Commit and finish up sys.stderr.flush() @@ -273,7 +313,11 @@ def move_archive(pathname): parser = OptionParser() parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") +parser.add_option("-m", "--maxmind", dest="maxmind_table", default='', help="optional maxmind_table, if omitted we use 'Last'") (options, args) = parser.parse_args() +if options.maxmind_table != '': + maxmind_table = options.maxmind_table + if len(args) == 0: usage() @@ -295,17 +339,26 @@ def move_archive(pathname): ################################################################# global_start_time = datetime.now() +# get instance of maxmind table +print "using " + maxmind_table + +mm = MaxMind(db_host, db_user, db_passwd, "maxmind",maxmind_table) + +if not mm: + sys.stderr.write('maxmind table does not exist: ' + maxmind_table + ' (' + str(e.args) + ')\n') + exit(1) + # Iterate over ALL filenames for pathname in args: - try: - with open(pathname, 'r') as f: - # Extract the basename of the filename, as the path is not of interest after this point - filename = os.path.basename(pathname) + try: + with open(pathname, 'r') as f: + # Extract the basename of the filename, as the path is not of interest after this point + filename = os.path.basename(pathname) print "processing file " + filename, - if (process_file(f, filename)): - shutil.move(pathname,errorDir) - else: - move_archive(pathname) + if (process_file(f, filename)): + shutil.move(pathname,errorDir) + else: + move_archive(pathname) # file is automatically closed if needed except IOError as e: print 'Could not open file ' + pathname + '\nError: ' + str(e.args) diff --git a/mlab_mysql_import4.py b/mlab_mysql_import4.py deleted file mode 100755 index 37c0dff..0000000 --- a/mlab_mysql_import4.py +++ /dev/null @@ -1,307 +0,0 @@ -#!/usr/bin/python -# -# Initials: SF Simon Funke -# RB Ruben Bloemgarten -# AX Axel Roest -# -# Version history -# 2012xxxx SF first version -# 20120628 AX removed testing for every line, added timing code, -# 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder -# 20120708 AX skip empty ip lines instead or error message -# 20120708 RB cleaning some names and spelling, also we don't want processed_files.log to clobber the downloaders processed_files.log. So we should use overly descriptive names -# -# test: -# cd /DATA -# python scripts/mlab/mlab_mysql_import2.py mlab/clean/glasnost/20090128T000000Z-batch-batch-glasnost-0002.tgz.csv -# -# ToDO: v loop over all arguments in sys.argv[0] -# v deduplication toevoegen (put in hash, test on hash, clear hash for each file, but keep last entry -# v move files naar archive directory -# v move error files naar error directory -# v log process and errors -# v skip empty ip lines instead or error message - -import sys -import re -import os -from optparse import OptionParser -from datetime import datetime -import dateutil.parser as dparser -import MySQLdb -import shutil - -################################################################# -# # -# settings # -# # -################################################################# - -# PLEASE UPDATE THESE SETTINGS -db_host = "localhost" # your host, usually localhost -db_user = "root" # your username -db_passwd = "" # your password -db_name = "mlab" # name of the database -db_tables = {"glasnost": "glasnost", "ndt": "ndt_test"} # a mapping from testname to tablename -db_filetable = 'files' - -# directories -baseDir = '/DATA/mlab/' -#baseDir = '/home/axel/mlab/' -scratchDir = baseDir + 'scratch/' -workDir = baseDir + 'work/' -archiveDir = baseDir + 'archive/' -errorDir = baseDir + 'error/' -logDir = baseDir + 'logs/' -cleanDir = baseDir + 'clean/' - -#files -errorLog = "mlab_mysql_import_error.log" -processLog = "mlab_mysql_import_processed_files.log" - -################################################################# -# # -# functions # -# # -################################################################# - -def usage(): - print "Usage: mlab_mysql_import3.py mlab_file1.csv [mlab_files.csv ...]" - sys.exit(1) - -# This routine extracts the destination server of the mlab file. -# It assumes that the filename has the form like 20100210T000000Z-mlab3-dfw01-ndt-0000.tgz.csv -# -def extract_destination(filename): - # Split the filename and perform some tests if it conforms to our standard - f_split = filename.split('-') - if len(f_split) < 3: - raise Exception("The specified filename (", filename, ") should contain at least two '-' characters that delimit the data, destination and the suffix.") - - if '.tgz.csv' not in f_split[-1]: - print "The specified filename (", filename, ") should end with '.tgz.csv'." - - return '.'.join(filename.split('-')[1:-1]) - -# Returns the datetime contained in string. -def extract_datetime(string): - # Extract the date - date_match = re.search(r'\d{4}/\d{2}/\d{2}', string) - if not date_match: - raise Exception('Error in import: line "', string, '" does not contain a valid date.') - # Extract the time - time_match = re.search(r'\d{2}:\d{2}:\d{2}', string) - if not time_match: - raise Exception('Error in import: line "', string, '" does not contain a valid time.') - - try: - return dparser.parse(date_match.group(0) + ' ' + time_match.group(0), fuzzy=True) - except ValueError: - raise ValueError, 'Error in import: line "' + string + '" does not contain a valid date and time.' - -# Returns the first valid ip address contained in string. -# return with empty string when we encounter cputime, or no ip number -def extract_ip(string): - if re.search('cputime', string): - return '' - # Extract the date - match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', string) - if not match: - # ignore file - return '' - # raise Exception ('Error in import: line "', string, '" does not contain a valid ip address.') - return match.group(0) - -# Test if the entry already exists in the database -def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - # Check if the entry exists already - sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) - cur.execute(sql) - - if cur.fetchone()[0] < 1: - return False - else: - return True - -# Insert a connection to the database without testing. -def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - columns = ', '.join(['date', 'destination', 'source', 'file_id']) - values = '"' + '", "'.join([test_datetime.isoformat(), destination, source_ip, str(file_id)]) + '"' - sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + values + ") " - cur.execute(sql) - -# Insert a test connection to the database, if it not already exists -def insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip): - # Check if the entry exists already - sql = "SELECT COUNT(*) FROM " + db_table + " WHERE date = '" + test_datetime.isoformat() + "' AND destination = '" + destination + "' AND source = '" + source_ip + "' AND file_id = " + str(file_id) - cur.execute(sql) - - # If not, then insert it - if cur.fetchone()[0] < 1: - print 'Found new test performed on the', test_datetime, 'from ' + destination + ' -> ' + source_ip + '.' - blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip) - -# Returns the id of a filename in the filename table. Creates a new row if the filename does not exist. -def get_file_id(cur, filename): - sql = "SELECT id FROM " + db_filetable + " WHERE filename ='" + filename + "'" - cur.execute(sql) - id = cur.fetchone() - # If the entry does not exist, we add it in - if not id: - sql = "INSERT INTO " + db_filetable + " (filename) VALUES('" + filename + "')" - cur.execute(sql) - return get_file_id(cur, filename) - return id[0] - -# do deduplucation of connection strings -def dedup(file_id, table, test_datetime, destination, source_ip): - key = str(file_id) + table + str(test_datetime) + destination + source_ip - if key in deduplookup: - return False - else: - deduplookup[key] = True - return True - -# returns True on error, False on correct processing -def process_file(f, filename): - start_time = datetime.now() - failure = True - try: - # Connect to the mysql database - db = MySQLdb.connect(host = db_host, - user = db_user, - passwd = db_passwd, - db = db_name) - cur = db.cursor() - - # Find the destination server by investigating the filename - destination = extract_destination(filename) - print 'Destination: ', destination, - - # Get the filename id from the files table - file_id = get_file_id(cur, filename) - db.commit() - - # Find the testsuite by investigating the filename - try: - test = [test for test in db_tables.keys() if test in filename][0] - except IndexError: - sys.stderr.write('The filename ' + filename + ' does not contain a valid testname.') - return 1 - # print "Found test suite " + test - - # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes - # But falls back to less than half a second when indexing is turned on on the db - filetest=True - # Read the file line by line and import it into the database - for line in f: - line = line.strip() - source_ip = extract_ip(line) - if ('' == source_ip): - continue # skip empty lines instead of error reporting them - test_datetime = extract_datetime(line) - if (filetest): - if (exists_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)): - # this file has already been read: ABORT WITH ERROR - raise Exception('File entry already exist in db; the file has already been read: ' + filename) - filetest=False - # test if we have already done it in this or last filetest - if (dedup(file_id, db_tables[test], test_datetime, destination, source_ip)): - blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip) - end_time = datetime.now() - print 'File done in ' + str(end_time - start_time) - failure = False - except Exception as inst: - sys.stderr.write('Exception: '+str(inst.args) + '\n') - with open(logDir + errorLog, 'a') as f: - f.write(pathname + '\n') - f.write('Exception: '+str(inst.args) + '\n') - print - except IOError as e: - sys.stderr.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') - with open(logDir + errorLog, 'a') as f: - f.write(pathname + '\n') - f.write('Error handling file ' + filename + ' (' + str(e.args) + ')\n') - print -# This bit should probably be cleaned up. -# except: -# sys.stderr.write('Process error ' + '\n') - finally: - # Commit and finish up - sys.stderr.flush() - # db.commit() - # disconnect from server - db.close() - - return failure - -# get the test date from the archive filename -def extract_archive_date(filename): - m = re.match('^(\d{4})(\d{2})(\d{2})', filename) - return (m.group(1),m.group(2)) - -# test if archive directory exist, and create it if necessary -def create_archive_dir(ym): - if (not os.path.exists(ym)): - os.makedirs(ym) - return ym - -# move processed file to archive folder -def move_archive(pathname): - fname = os.path.basename(pathname) - (year,month) = extract_archive_date(fname) - aDir = create_archive_dir(archiveDir + year +'/'+ month) - shutil.move(pathname,aDir) - with open(logDir + processLog, 'a') as f: - f.write(pathname + '\n') - - -################################################################# -# # -# start of initialisation # -# Read command line options # -# # -################################################################# - -parser = OptionParser() -parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=False, help="don't print status messages to stdout") -(options, args) = parser.parse_args() -if len(args) == 0: - usage() - -# create file if necessary, as open by itself doesn't cut it -f = open(logDir + processLog, 'a') -f.write("\nNew batchjob on " + str(datetime.now())) -f.close - -# deduplookup is a hash we use for de-duplication of input lines -# maybe it is necessary to purge parts of it during the duration of the import -# but then we have to carefully monitor tests that appear in multiple files -# OR store the last test in a separate global (dirty? yeah, I know) -deduplookup = {} - -################################################################# -# # -# start of main program # -# # -################################################################# -global_start_time = datetime.now() - -# Iterate over ALL filenames -for pathname in args: - try: - with open(pathname, 'r') as f: - # Extract the basename of the filename, as the path is not of interest after this point - filename = os.path.basename(pathname) - print "processing file " + filename, - if (process_file(f, filename)): - shutil.move(pathname,errorDir) - else: - move_archive(pathname) - # file is automatically closed if needed - except IOError as e: - print 'Could not open file ' + pathname + '\nError: ' + str(e.args) - -global_end_time = datetime.now() - -print '=====================================\nAll Done. ' + str(len(args)) + ' file(s) in ' + str(global_end_time - global_start_time) From 42f831de8320e46c079aad8f9803c9a9e43ce02b Mon Sep 17 00:00:00 2001 From: Axel Roest Date: Tue, 10 Jul 2012 20:48:12 +0200 Subject: [PATCH 12/12] added maxmind_test.py, to test the class --- maxmind_test.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 maxmind_test.py diff --git a/maxmind_test.py b/maxmind_test.py new file mode 100644 index 0000000..125add8 --- /dev/null +++ b/maxmind_test.py @@ -0,0 +1,53 @@ +#!/usr/bin/python +# +# maxmind_test.py +# +# A test script for the maxmind class +# +# Initials: +# AX Axel Roest +# +# Version history +# 20120710 AR first version +# +# ToDO: +# + +import sys +import re +import os +import math +import bisect +from datetime import datetime +import MySQLdb +import random +from maxmind import MaxMind + +# test + +mm = MaxMind(db_host, db_user, db_passwd, db_name, db_filetable) +ip = 67276848 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +ip = 67277000 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +ip = 67277023 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +ip = 67277024 +loc = mm.lookup(ip) +print str(ip) + ' : ' + str(loc) + +testamount = 100000 +start_time = datetime.now() +for i in range(testamount): + r = random.randint(33996344, 3741319167) + loc = mm.lookup(r) +end_time = datetime.now() +totaltime = end_time - start_time +timeperlookup = totaltime / testamount +print '=====================================\nLookup ' + str(testamount) + ' ips in ' + str(totaltime) + ' seconds = ' + str(timeperlookup) + ' seconds per lookup' \ No newline at end of file