Jump to content

WikiXRay parser

From Meta, a Wikimedia project coordination wiki

Cut & paste the following code in a text file, and save it as dump_sax.py. Don't forget to give your file executable privileges.

It uses a tweak to speed up the parsing process: the text events are filtered so that the characters method is called only once the complete text block has been buffered. You can find further details of this recipe in the Python Cookbook (I retrieved mine from the second edition of the printed version, published by O'Reilly; they explicitly give permission to use the code for this purpose).

#############################################
#      WikiXRay: Quantitative Analysis of Wikipedia language versions                       
#############################################
#                  http://wikixray.berlios.de                                              
#############################################
# Copyright (c) 2006-7 Universidad Rey Juan Carlos (Madrid, Spain)     
#############################################
# This program is free software. You can redistribute it and/or modify    
# it under the terms of the GNU General Public License as published by 
# the Free Software Foundation; either version 2 or later of the GPL.     
#############################################
# Author: Jose Felipe Ortega Soto                                                             

import sys,os,codecs, datetime, random
import dbaccess
from xml.sax import saxutils,make_parser
from xml.sax.handler import feature_namespaces, ContentHandler
from xml.sax.saxutils import XMLFilterBase, XMLGenerator
from optparse import OptionParser

class text_normalize_filter(XMLFilterBase):
    """
    SAX filter that merges contiguous text events into a single one.

    Text chunks reported by the upstream parser are buffered and handed to
    the downstream handler as one characters() call when the buffer is
    flushed through _complete_text_node(). That hopefully speeds up the
    parsing process a lot, especially when reading revisions with long text.

    Recipe by Uche Ogbuji, James Kew and Peter Cogolo
    Retrieved from "Python Cookbook, 2nd ed., by Alex Martelli, Anna Martelli
    Ravenscroft, and David Ascher (O'Reilly Media, 2005) 0-596-00797-3"
    """

    def __init__(self, upstream, downstream):
        """Wrap the `upstream` reader and forward merged text to `downstream`.

        `downstream` only needs to provide the handler methods that end up
        being called on it (characters() for the code in this class).
        """
        XMLFilterBase.__init__(self, upstream)
        self._downstream=downstream
        # Pending text chunks, joined and forwarded on the next flush
        self._accumulator=[]

    def _complete_text_node(self):
        """Forward the buffered text (if any) as one characters() call."""
        if self._accumulator:
            self._downstream.characters(''.join(self._accumulator))
            self._accumulator=[]

    def characters(self, text):
        """Buffer a chunk of character data instead of forwarding it."""
        self._accumulator.append(text)

    def ignorableWhiteSpace(self, ws):
        """Buffer ignorable whitespace together with the character data."""
        # The parameter is `ws`; appending an undefined name here raised
        # NameError whenever this method was invoked.
        self._accumulator.append(ws)

    # xml.sax delivers this event as `ignorableWhitespace` (lowercase "s").
    # The historical spelling above is kept for existing callers and the
    # spelling used by the SAX interface is bound to the same code.
    ignorableWhitespace = ignorableWhiteSpace
def _wrap_complete(method_name):
    """Install a flushing forwarder called `method_name` on the filter class.

    The generated method first flushes any buffered text through
    _complete_text_node() and then calls the method of the same name on the
    downstream handler with the arguments it received.
    """
    def flush_then_forward(self, *args, **kwargs):
        self._complete_text_node()
        target = getattr(self._downstream, method_name)
        target(*args, **kwargs)
    # Make the generated function carry the name of the event it handles
    flush_then_forward.__name__ = method_name
    setattr(text_normalize_filter, method_name, flush_then_forward)
# Events that must flush the buffered text before being passed downstream
for n in ('startElement', 'endElement', 'endDocument'):
    _wrap_complete(n)

class wikiHandler(ContentHandler):
    """Parse an XML file generated by Wikipedia Export page into SQL data
    suitable to be imported by MySQL.

    Rows for the revision, text and page tables are accumulated into
    extended INSERT statements. A statement is emitted when adding one more
    row would exceed options.imaxsize (KB) or options.imaxrows; pending
    revision/text statements are also emitted at the end of every page and
    the pending page statement at the end of the document.

    The output goes to exactly one destination, checked in this order:
    options.fileout (append to SQL files), options.streamout (UTF-8 SQL on
    standard output), options.monitor (sent through dbaccess).
    """
    def __init__(self, options):
        """Store `options` and reset the parsing state.

        In monitor mode (monitor set, fileout and streamout unset) a
        connection is requested from dbaccess and kept in self.acceso.
        """
        self.fileErrPath="./errors.log"; self.options=options
        if self.options.monitor and not self.options.fileout and not self.options.streamout:
            self.acceso = dbaccess.get_Connection(self.options.machine, self.options.port,
                self.options.user, self.options.passwd, self.options.database)
        self.nspace_dict={}; self.codens=''; self.page_dict={}; self.rev_dict = {}
        self.stack=[]; self.current_text = ''; self.current_elem=None
        # Output files are opened and closed inside _emit(); these attributes
        # are only initialised so that the attribute set stays as before.
        self.revfile=None; self.pagefile=None; self.textfile=None
        self.page_num = 0; self.rev_num=0; self.last_page_len=0; self.rev_count=0
        self.prior_rev_id='NULL'; self.isRedirect='0'; self.isStub='0'; self.isMinor='0'
        self.revinsert=''; self.pageinsert=''; self.textinsert=''
        self.revinsertrows=0; self.revinsertsize=0; self.pageinsertrows=0
        self.pageinsertsize=0; self.textinsertrows=0; self.textinsertsize=0
        self.start=datetime.datetime.now(); self.timeCheck=None; self.timeDelta=None

    ##################################################
    ## SAX events
    ##################################################
    def startElement(self, name, attrs):
        """Record the opening of a tag.

        page, revision and contributor are pushed on a stack so that the
        parent of a later tag can be looked up (for instance, to tell apart
        page id, rev id and contributor id, all of them named "id").
        """
        if name in ('page', 'revision', 'contributor'):
            self.stack.append(name)
        elif name=='namespace':
            self.codens=attrs.get('key')
        elif name=='minor':
            self.isMinor='1'
        self.current_text=''
        self.current_elem=name

    def endElement(self, name):
        """Process the content collected for the tag that has just closed."""
        if name=='namespace':
            ##Map the namespace name to its numeric key
            self.nspace_dict[self.current_text]=self.codens
        elif name=='id':
            self._end_id(name)
        elif name=='ip':
            ##Anonymous contributor: user id 0, the IP is used as user name
            self.rev_dict['rev_user']='0'
            self.rev_dict['username']=self.current_text
        elif name=='timestamp':
            ##Adequate formatting of timestamps
            self.rev_dict['timestamp']=self.current_text.replace('Z','').replace('T',' ')
        elif name=='contributor':
            ##Pop contributor tag from the stack
            self.stack.pop()
        elif name=='revision':
            self._end_revision()
        elif name=='page':
            self._end_page()
        else:
            ##General tag processing: store the text under the tag name in
            ##the dictionary of the enclosing revision or page
            if self.stack and self.stack[-1] in ('revision', 'contributor'):
                self.rev_dict[self.current_elem]=self.current_text
            elif self.stack and self.stack[-1]=='page':
                self.page_dict[self.current_elem]=self.current_text
        self.current_elem=None

    def characters(self, ch):
        """Append `ch` to the text of the tag currently open, if any."""
        if self.current_elem is not None:
            self.current_text = self.current_text + ch

    def endDocument(self):
        """Emit the pending page INSERT, close the DB connection (monitor
        mode) and print the final report to standard error."""
        self._flush('page', self.options.pagefile)
        ########IF WE USE MONITOR MODE, CLOSE DB CONNECTION
        if self.options.monitor and not self.options.fileout and not self.options.streamout:
            dbaccess.close_Connection(self.acceso[1])
        elapsed=self._elapsed()
        sys.stderr.write("\n\n")
        sys.stderr.write("File successfully parsed...\n")
        sys.stderr.write("page %d (%f pags./sec.), revision %d (%f revs./sec.)\n" % (
            self.page_num, self._rate(self.page_num, elapsed),
            self.rev_num, self._rate(self.rev_num, elapsed)))

    ##################################################
    ## Private helpers
    ##################################################
    def _escape(self, value):
        """Backslash-escape backslashes, single and double quotes."""
        return value.replace("\\","\\\\").replace("'","\\'").replace('"', '\\"')

    def _elapsed(self):
        """Update timeCheck/timeDelta and return the seconds since start."""
        self.timeCheck=datetime.datetime.now()
        self.timeDelta=self.timeCheck-self.start
        delta=self.timeDelta
        # timedelta.seconds alone drops whole days and is 0 on short runs
        return delta.days*86400.0+delta.seconds+delta.microseconds/1e6

    def _rate(self, count, elapsed):
        """Return items per second, or 0.0 when no time has elapsed."""
        if elapsed>0:
            return float(count)/elapsed
        return 0.0

    def _emit(self, statement, path):
        """Send one finished INSERT statement to the configured output.

        `path` is the SQL file to append to; it is used in fileout mode only.
        """
        if self.options.fileout:
            sqlfile = codecs.open(path,'a','utf_8')
            try:
                sqlfile.write(statement+";\n")
            finally:
                sqlfile.close()
        elif self.options.streamout:
            # DON'T WRITE SQL TO FILES, GENERATE ENCODED SQL STREAM FOR MYSQL
            sys.stdout.write((statement+";\n").encode('utf-8'))
        elif self.options.monitor:
            query=statement.encode('utf-8')
            # NOTE(review): retries forever while the query keeps failing;
            # a permanent error (e.g. a rejected statement) never ends.
            while 1:
                try:
                    dbaccess.raw_query_SQL(self.acceso[1], query)
                except Exception:
                    sys.stdout.write("%s\n" % sys.exc_info()[1])
                else:
                    break

    def _queue_row(self, prefix, table, row, path):
        """Add `row` to the extended INSERT kept in self.<prefix>insert.

        The first row always starts a statement. Later rows are appended
        while the size and row limits allow it; otherwise the current
        statement is emitted and a new one is started with `row`.
        """
        statement=getattr(self, prefix+'insert')
        rows=getattr(self, prefix+'insertrows')
        size=getattr(self, prefix+'insertsize')
        if rows==0:
            #Always allow at least one row in extended inserts
            statement="INSERT INTO "+table+" VALUES"+row
            rows=1
        elif (size+2*len(row)<=self.options.imaxsize*1024) and \
                (rows+1<=self.options.imaxrows):
            statement+=","+row
            rows+=1
        else:
            #We must finish and write current insert and begin a new one
            self._emit(statement, path)
            statement="INSERT INTO "+table+" VALUES"+row
            rows=1
        setattr(self, prefix+'insert', statement)
        setattr(self, prefix+'insertrows', rows)
        #Conservative approach: assuming 2 bytes per UTF-8 character
        setattr(self, prefix+'insertsize', 2*len(statement))

    def _flush(self, prefix, path):
        """Emit the pending statement for `prefix` (if it has rows) and
        reset its counters."""
        if getattr(self, prefix+'insertrows')>0:
            self._emit(getattr(self, prefix+'insert'), path)
        setattr(self, prefix+'insert', '')
        setattr(self, prefix+'insertrows', 0)
        setattr(self, prefix+'insertsize', 0)

    def _log_flawed(self, error):
        """Append the current rev_dict/page_dict and `error` to the error
        file named after the database."""
        # options.database may be unset when writing to files or to a stream
        printfile = codecs.open("error_"+str(self.options.database or ''),'a','utf_8')
        try:
            printfile.write("Offending rev_dict was = \n")
            printfile.write(str(self.rev_dict))
            printfile.write("\n")
            printfile.write("Offending page_dict was = \n")
            printfile.write(str(self.page_dict))
            printfile.write("\n")
            printfile.write("====================================================\n")
            printfile.write(str(error)+"\n")
            printfile.write("====================================================\n\n")
        finally:
            printfile.close()

    def _end_id(self, name):
        """Store an <id> value according to the tag that encloses it."""
        if self.stack:
            parent=self.stack[-1]
        else:
            parent=None
        if parent=='contributor':
            ##Detecting contributor's attributes inside a revision
            self.rev_dict['rev_user']=self.current_text
        elif parent=='revision':
            self.rev_dict[name]=self.current_text
        elif parent=='page':
            self.page_dict[name]=self.current_text
        else:
            # Append, so that earlier messages are not wiped out
            errfile=open(self.fileErrPath,'a')
            try:
                errfile.write("Unsupported parent tag for '"+name+"': "+str(parent)+"\n")
            finally:
                errfile.close()

    def _build_revision_row(self):
        """Return the VALUES tuple for the revision table.

        Values order: (rev_id, rev_page, [[rev_text_id=rev_id]], rev_comment,
        rev_user, rev_user_text, rev_timestamp, rev_is_minor).
        Raises KeyError when a required field has not been collected.
        """
        row="("+self.rev_dict['id']+","+self.page_dict['id']+","+self.rev_dict['id']
        if 'comment' in self.rev_dict:
            row+=',"'+self._escape(self.rev_dict['comment'])+'"'
        else:
            row+=",''"
        row+=","+self.rev_dict['rev_user']+',"'+self._escape(self.rev_dict['username'])+\
            '","'+self.rev_dict['timestamp']+'",'+self.isMinor+")"
        return row

    def _end_revision(self):
        """Queue the revision and text rows of the revision just closed."""
        self.rev_count+=1
        try:
            text=self.rev_dict['text']
            revrow=self._build_revision_row()
        except KeyError:
            # In case that any field is missing or flawed, skip this revision
            # and log it. The per-revision state must still be unwound, or
            # the tag stack is out of step for the rest of the dump.
            self._log_flawed(sys.exc_info()[1])
            self.rev_dict.clear()
            self.stack.pop()
            self.isMinor='0'
            return
        ##Store whether this is a redirect or stub page or not
        if text[0:9].upper()=='#REDIRECT':
            self.isRedirect='1'
        else:
            self.isRedirect='0'
        ##Same 2-bytes-per-character estimate as used for the page length
        if 2*len(text)<=self.options.stubth:
            self.isStub='1'
        else:
            self.isStub='0'
        self._queue_row('rev', 'revision', revrow, self.options.revfile)
        ##Template for each row of table text: (old_id, old_text, old_flags)
        textrow="("+self.rev_dict['id']+',"'+self._escape(text)+'","utf8")'
        self._queue_row('text', 'text', textrow, self.options.textfile)
        ##Store this rev_id to recall it when processing the following revision, if it exists
        self.prior_rev_id=self.rev_dict['id']
        ##Store this rev_len to recall it for the current page_len, in case this is the last revision for that page
        self.last_page_len=2*len(text)
        self.rev_dict.clear()
        self.stack.pop()
        self.isMinor='0'
        self.rev_num+=1
        if self.options.verbose and self.options.log is None and self.rev_num % 1000 == 0:
            # Display status report
            elapsed=self._elapsed()
            sys.stderr.write("page %d (%f pags. per sec.), revision %d (%f revs. per sec.)\n" % (
                self.page_num, self._rate(self.page_num, elapsed),
                self.rev_num, self._rate(self.rev_num, elapsed)))
        # TODO: Print report status to log file when options.log is set

    def _build_page_row(self):
        """Return the VALUES tuple for the page table.

        Values order: (page_id, page_namespace, page_title, page_restrictions,
        page_counter[[unused]], page_is_redirect, page_is_new, page_random,
        page_touched[[default to '']], page_latest, page_len).
        Raises KeyError when the page id or title has not been collected.
        """
        title=self.page_dict['title']
        ##Recovering namespace for this page. Only a title that contains a
        ##colon carries a namespace prefix; a plain title that happens to
        ##equal a namespace name belongs to the main namespace.
        nsname=title.split(':')[0]
        if ':' in title and nsname in self.nspace_dict:
            self.page_dict['namespace']=self.nspace_dict[nsname]
        else:
            self.page_dict['namespace']='0'
        row="("+self.page_dict['id']+","+self.page_dict['namespace']+',"'+\
            self._escape(title)+'"'
        if 'restrictions' in self.page_dict:
            row+=',"'+self._escape(self.page_dict['restrictions'])+'"'
        else:
            row+=",''"
        row+=","+'0'+","+self.isRedirect+","
        # NOTE(review): this sets page_is_new to 1 for pages with MORE than
        # one revision, which looks inverted - confirm against the schema.
        if self.rev_count>1:
            row+="1,"
        else:
            row+="0,"
        row+=str(random.random())+","+"''"+","+self.prior_rev_id+","+str(self.last_page_len)
        row+=")"
        return row

    def _end_page(self):
        """Emit the pending revision/text INSERTs and queue the page row."""
        #We must write the last revinsert and textinsert before finishing this page
        self._flush('rev', self.options.revfile)
        self._flush('text', self.options.textfile)
        try:
            pagerow=self._build_page_row()
        except KeyError:
            # Page without id or title: log it and skip its row
            self._log_flawed(sys.exc_info()[1])
        else:
            self._queue_row('page', 'page', pagerow, self.options.pagefile)
            self.page_num += 1
        ##Clear temp variables for the next page
        self.page_dict.clear()
        self.prior_rev_id='NULL'
        self.last_page_len=0
        self.rev_count=0
        self.isRedirect='0'
        self.isStub='0'
        self.stack.pop()

##Main zone
if __name__ == '__main__':
    # Command line entry point: read the XML dump from standard input and
    # produce SQL according to the selected output mode.
    usage = "usage: %prog [options]"
    parserc = OptionParser(usage)
    parserc.add_option("-t","--stubth", dest="stubth", type="int", metavar="STUBTH", default=256,
    help="Max. size in bytes to consider an article as stub [default: %default]")
    parserc.add_option("--pagefile", dest="pagefile", default="page.sql", metavar="FILE",
    help="Name of the SQL file created for the page table [default: %default]")
    parserc.add_option("--revfile", dest="revfile", default="revision.sql", metavar="FILE",
    help="Name of the SQL file created for the revision table [default: %default]")
    parserc.add_option("--textfile", dest="textfile", default="text.sql", metavar="FILE",
    help="Name of the SQL file created for the text table [default: %default]")
    parserc.add_option("--skipnamespaces", dest="skipns", metavar="NAMESPACES",
    help="List of namespaces whose content will be ignored [comma separated values, without "
    "blanks; e.g. --skipnamespaces=name1,name2,name3]")
    parserc.add_option("-i","--inject", dest="inject", metavar="STRING",
    help="Optional string to inject at the very start of articles' text; string "
    "must be provided within quotes (e.g. --inject='my string') or double quotes")
    parserc.add_option("-f","--fileout", dest="fileout", action="store_true", default=False,
    help="Create SQL files from parsed XML dump")
    parserc.add_option("-s","--streamout", dest="streamout", action="store_true", default=False,
    help="Generate an output SQL stream suitable for a direct import into MySQL database")
    parserc.add_option("-m", "--monitor", dest="monitor", action="store_true", default=True,
    help="Insert SQL code directly into MySQL database [default]")
    parserc.add_option("-u", "--user", dest="user", metavar="MySQL_USER",
    help="Username to connect to MySQL database")
    parserc.add_option("-p", "--passwd", dest="passwd", metavar="MySQL_PASSWORD",
    help="Password for MySQL user to access the database")
    parserc.add_option("-d", "--database", dest="database", metavar="DBNAME",
    help="Name of the MySQL database")
    parserc.add_option("--port", dest="port", metavar="MySQL_SERVER_PORT", default=3306, type="int",
    help="Listening port of MySQL server")
    parserc.add_option("--machine", dest="machine", metavar="SERVER_NAME", default="localhost",
    help="Name of MySQL server")
    parserc.add_option("-v", "--verbose", action="store_true", dest="verbose", default=True,
    help="Display standard status reports about the parsing process [default]")
    parserc.add_option("-q", "--quiet", action="store_false", dest="verbose",
    help="Do not display any status reports")
    parserc.add_option("-l","--log", dest="log", metavar="LOGFILE",
    help="Store status reports in a log file; do not display them")
    parserc.add_option("--insertmaxsize", dest="imaxsize", metavar="MAXSIZE", type="int",
    default=156, help="Max size in KB of the MySQL extended inserts [default: %default] "
    "[max: 256]")
    parserc.add_option("--insertmaxnum", dest="imaxrows", metavar="MAXROWS", type="int",
    default=50000, help="Max number of individual rows allowed in the MySQL extended "
    "inserts [default: %default][max: 250000]")

    (options, args) = parserc.parse_args()
    # Validate option combinations before touching the input
    if not options.verbose and options.log is not None:
        parserc.error("Error! Illegal combination: options -q and --log are mutually exclusive")
    # Monitor mode is what remains when neither -f nor -s was requested
    monitor_mode = options.monitor and not options.fileout and not options.streamout
    if monitor_mode and (options.user is None or options.passwd is None or options.database is None):
        parserc.error("Error! You must provide user, password and database name to execute monitor mode")
    if options.imaxsize>256 or options.imaxsize<=0:
        parserc.error("Error! Illegal value: optional param --insertmaxsize must be between 1 and 256")
    if options.imaxrows>250000 or options.imaxrows<=0:
        parserc.error("Error! Illegal value: optional param --insertmaxnum must be between 1 and 250000")
    # Adapt stdout to Unicode UTF-8
    sys.stdout=codecs.EncodedFile(sys.stdout,'utf-8')
    # Create a parser
    parser = make_parser()

    # Tell the parser we are not interested in XML namespaces
    parser.setFeature(feature_namespaces, 0)

    # Create the downstream_handler using our class
    wh = wikiHandler(options)

    # Create the filter based on our parser and content handler
    filter_handler = text_normalize_filter(parser, wh)
    # Parse the XML dump
    filter_handler.parse(sys.stdin)