#!/usr/bin/env python import pgdb import pg import sys import getopt import xml_logger import re import os # simply takes a string s as input and prints it if running verbosely def output(s, newline=True): if not Settings.is_quiet: if newline: print s else: print s, def abort(status): ''' exit according to status ''' exit(status) class Feature(object): def __init__(self, id=None, name=None, distance=None, city=None,\ state=None, zipcode=None): self.id = id self.name = name self.distance = distance self.city = city self.state = state self.zipcode = zipcode class Directory(object): def __init__(self): self.db = pgdb.connect(user="postgres", password="postgres", host="127.0.0.1", database="directory") c = self.db.cursor() c.execute("SET client_encoding TO 'LATIN1'") # key = feature type # val = Feature instance self.query_dict = {} def list_feature_types(self): output("This may take a while...") c = self.db.cursor() c.execute("SELECT DISTINCT feature_class FROM features") rows = c.fetchall() for row in rows: output(row[0]) Settings.feature_types.append(row[0]) if Settings.is_xml_output: filename = os.path.join(os.path.dirname(sys.argv[0]), "features_list.xml") xml_logger.log_feature_types(Settings.feature_types, filename) def get_state(self, feature_id, feature_name, abbreviation=True): c = self.db.cursor() query = """SELECT state_numeric FROM state_numeric WHERE feature_id='%s'""" % feature_id c.execute(query) rows = c.fetchall() state_numeric = "" try: state_numeric = rows[0][0] except IndexError: print 'Could not find numeric state info for "%s"' % feature_name return None query = """SELECT state_abbreviation FROM states WHERE state_code='%s'""" % state_numeric c.execute(query) rows = c.fetchall() state_abbreviation = "" try: state_abbreviation = rows[0][0] except IndexError: print 'Could not find state abbreviation for "%s"' % feature_name return None return state_abbreviation def guess_zip_and_city(self, feature_id, feature_name): c = self.db.cursor() query = """SELECT zip_code, city FROM (SELECT Z.zip_code, Z.city, st_distance(Z.longlat_point_meter, L.longlat_point_meter) FROM zip_codes Z, location L WHERE L.feature_id='%s' ORDER BY 3 LIMIT 1) AS subquery;""" % feature_id c.execute(query) rows = c.fetchall() zip = "" city = "" try: zip = rows[0][0] city = rows[0][1] except IndexError: print 'Could not find zip and/or city info for "%s"' % \ feature_name return (None, None) return (zip, city) def zip_code_query(self): for feature_type in Settings.feature_types: c = self.db.cursor() output("Querying %s. Please wait..." % feature_type) # grab the feature ids, names, and distances of the n closest # features of feature_type query = """SELECT L.feature_id, F.feature_name, st_distance(L.longlat_point_meter, Z.longlat_point_meter) FROM location L, zip_codes Z, features F WHERE Z.zip_code='%s' AND L.feature_id=F.feature_id AND UPPER(F.feature_class)='%s' ORDER BY 3 LIMIT 5;""" % \ (Settings.source, feature_type.upper()) c.execute(query) rows = c.fetchall() self.query_dict[feature_type] = [] for row in rows: f = Feature() f.id = row[0] f.name = row[1] f.distance = row[2] state = self.get_state(f.id, f.name) if state is not None: f.state = state else: return 1 f.zipcode, f.city = self.guess_zip_and_city(f.id, f.name) self.query_dict[feature_type].append(f) self.feature_print(feature_type) def feature_print(self, feature_type): # calculate the width of the header and print it n1 = (Settings.character_width - len(feature_type) - 2) / 2 header = "".join(("=" * n1, " ", feature_type.upper(), " ", "=" * n1)) if len(header) == Settings.character_width - 1: header = "".join((header, "=")) print header # finally print the goods for i,f in enumerate(self.query_dict[feature_type]): output("Rank:\t\t%i" % (i + 1)) output("Name:\t\t%s" % f.name) output("City:\t\t%s" % f.city) output("State:\t\t%s" % f.state) output("Zip:\t\t%s" % f.zipcode) output("Distance:\t%s" % f.distance) output("") if Settings.is_xml_output: filename = os.path.join(os.path.dirname(sys.argv[0]), Settings.xml_file_name) xml_logger.log_query_results(self.query_dict, filename) def feature_query(self): for feature_type in Settings.feature_types: c = self.db.cursor() output("Querying %s. Please wait..." % feature_type) # grab the feature_id of the source query = "SELECT feature_id FROM features\ WHERE UPPER(feature_name)='%s'" % Settings.source.upper() c.execute(query) rows = c.fetchall() # XXX: resolve duplicates instead of just taking the first source # found source_id = "" try: source_id = rows[0][0] except IndexError: print '"%s" does not exist in the database' % Settings.source return 1 c.execute(query) rows = c.fetchall() query = """SELECT A.feature_id, st_distance(A.longlat_point_meter, B.longlat_point_meter), F.feature_name FROM location A, location B, features F WHERE B.feature_id='%s' AND A.feature_id<>'%s' AND F.feature_id=A.feature_id and UPPER(F.feature_class)='%s' ORDER BY 2 LIMIT 5;""" % \ (source_id, source_id, feature_type.upper()) c.execute(query) rows = c.fetchall() # get the city, state, and zip for the feature_id(s) found above. # create a Feature instance and add it to the query dict self.query_dict[feature_type] = [] for row in rows: f = Feature() f.id = row[0] f.distance = row[1] f.name = row[2] state = self.get_state(f.id, f.name) if state is not None: f.state = state else: return 1 f.zipcode, f.city = self.guess_zip_and_city(f.id, f.name) self.query_dict[feature_type].append(f) self.feature_print(feature_type) def list_nearest_features(self): if Settings.is_zipcode_query: return self.zip_code_query() else: return self.feature_query() class Settings(object): is_quiet = False is_xml_output = False is_list_feature_types = False feature_types = [] source = "" xml_file_name = "" num_features = 5 character_width = 80 is_zipcode_query = False is_feature_types_query = False is_feature_query = False def __init__(self): self.has_source = False self.has_feature_types = False def argument_parser(self): opts = [] args = [] try: opts, args = getopt.getopt(sys.argv[1:],"hlqn:x:",["help","list-feature-types","quiet","num-features=","terminal-width=","xml="]) except getopt.GetoptError: self.help() sys.exit(1) for o,a in opts: if o in ("-q","--quiet"): Settings.is_quiet = True for o,a in opts: if o in ("-h","--help"): self.help() abort(0) if o in ("-x","--xml"): Settings.is_xml_output = True Settings.xml_file_name = a if o in ("--terminal-width"): try: Settings.character_width = int(a) except ValueError: output("You must specify an integer for -n or --num-features") abort(1) if o in ("-l","--list-feature-types"): Settings.is_feature_types_query = True if o in ("-n","--num-features"): try: Settings.num_features = int(a) except ValueError: output("You must specify an integer for -n or --num-features") abort(1) try: Settings.feature_types = args[0].split(",") self.has_feature_types = True except IndexError: pass try: Settings.source = args[1] self.has_source = True except IndexError: pass if self.has_feature_types or self.has_source: if not self.has_feature_types == self.has_source: output("You must specify both a feature type followed by a zip code, college, or\nuniversity") abort(1) if not Settings.is_feature_types_query: if not self.has_feature_types and not self.has_source: output("You must specify a feature type list") output("You must specify a zip code, college, or university") zip_code_regex = re.compile("\d\d\d\d\d") if len(Settings.source) == 5 and zip_code_regex.match(Settings.source): Settings.is_zipcode_query = True if self.has_feature_types and self.has_source: Settings.is_feature_query = True def help(self): output("Usage:") output(" Querying Features:") output(" directory.py [options] ") output("") output(" Where is a comma-delimited list of feature types.") output(" Feature types that consist of more than one word must be surrounded") output(" in quotes. One-word feature types may optionally be surrounded by") output(" quotes. College and universities must also be surrounded in quotes") output(" if they consist of more than one word. Letter casing (upper/lower") output(" is not important. Spaces should not be inserted between commas.") output(" If you wish to create a features list with all features, simply") output(" type the word \"all\" in place of a list of feature types") output(" List feature types:") output(" directory.py <-l|--list-feature-types>") output(" Queries are not case sensitive.") output("") output("Additional Options:") output(" -h | --help Print help information (this message).") output(" -q | --quiet Don't print anything.") output(" -x | --xml= Send XML output to .") output(" --terminal-width= Limit the number of characters printed per line to") output(" . Default is 80, which is standard.") output("") output("Examples:") output(" Get the 5 nearest schools, post offices, and airports to zip code 84078") output(' directory.py school,"post office",airport 84078') output(" Get the 5 nearest parks to Utah State University") output(" directory.py park \"Utah State University\"") output(" Get the five nearest features of each feature type to Utah State University") output(" directory.py all \"Utah State University\"") class Main(object): def __init__(self): try: self.d = Directory() except pg.InternalError, e: output("ERROR: COULD NOT CONNECT TO THE DATABASE") output(e) abort(2) def main(self): if Settings.is_feature_types_query: return self.d.list_feature_types() if Settings.is_feature_query: return self.d.list_nearest_features() if __name__ == '__main__': s = Settings() s.argument_parser() m = Main() sys.exit(m.main())