Mirror of https://github.com/mozilla/pjs.git
Bug 126252: add the gnatsparse Python script to the contrib directory; author: Daniel Berlin <dan@dberlin.org>; a=justdave.
This commit is contained in:
Parent
c3fd60f7e1
Commit
9161ba942d
|
@ -12,6 +12,9 @@ This directory includes:
|
|||
mysqld-watcher.pl -- This script can be installed as a frequent cron
|
||||
job to clean up stalled/dead queries.
|
||||
|
||||
gnatsparse/ -- A Python script used to import a GNATS database
|
||||
into Bugzilla.
|
||||
|
||||
gnats2bz.pl -- A perl script to help import bugs from a GNATS
|
||||
database into a Bugzilla database. Contributed by
|
||||
Tom Schutter <tom@platte.com>
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
gnatsparse
|
||||
==========
|
||||
|
||||
Author: Daniel Berlin <dan@dberlin.org>
|
||||
|
||||
gnatsparse is a simple Python program that imports a GNATS database
into a Bugzilla system.  It started from the gnats2bz.pl Perl script,
but is essentially a rewrite.  Its parser is based on gnatsweb,
which gives a roughly tenfold speed improvement over the previous code.
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
* Chunks the audit trail into separate comments, with the correct From headers, timestamps, etc.
|
||||
|
||||
* Handles followup emails that are in the report, with the correct From headers,
timestamps, etc.
|
||||
|
||||
* Properly handles duplicates, adding the standard bugzilla duplicate message.
|
||||
|
||||
* Extracts and handles gnatsweb attachments, as well as uuencoded attachments
|
||||
appearing in either followup emails, the how-to-repeat field, etc. Replaces
|
||||
them with a message to look at the attachments list, and adds the standard
|
||||
"Created an attachment" message that bugzilla uses. Handling them includes
|
||||
giving them the right name and mime-type. "attachments" means multiple
|
||||
uuencoded things/gnatsweb attachments are handled properly.
|
||||
|
||||
* Handles reopened bug reports.
|
||||
|
||||
* Builds the cc list from the people who have commented on the report,
|
||||
and the reporter.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
It requires Python 2.2+; it won't work with 1.5.2 (Linux distributions
ship with 2.2+ these days, so that shouldn't be an issue).
|
||||
|
||||
Documentation
|
||||
-------------
|
||||
|
||||
Documentation can be found inside the scripts.  The source code is
self-documenting.
|
||||
|
|
@ -0,0 +1,804 @@
|
|||
# Optional Psyco JIT support.  Using Psyco makes the conversion about
# 25% faster, but there's a bug in psyco's handling of eval causing it
# to use unlimited memory with the magic file enabled, so the import is
# left commented out.
try:
    # import psyco
    # psyco.full()
    # from psyco.classes import *
    pass
except ImportError:
    # Psyco is purely an accelerator; run unaccelerated without it.
    # (Narrowed from a bare "except:", which would also have hidden
    # real errors.)
    pass
||||
import re
|
||||
import base64
|
||||
import cStringIO
|
||||
import specialuu
|
||||
import array
|
||||
import email.Utils
|
||||
import zlib
|
||||
import magic
|
||||
|
||||
# Magic-number based file-type detector, used to guess the MIME type of
# extracted attachments in write_bug().
# Comment out if you don't want magic detection
magicf = magic.MagicFile()

# Open our output file.  Every SQL statement the script generates is
# written here, to be fed to the Bugzilla database afterwards.
outfile = open("gnats2bz_data.sql", "w")
|
||||
|
||||
# List of GNATS fields
fieldnames = ("Number", "Category", "Synopsis", "Confidential", "Severity",
              "Priority", "Responsible", "State", "Quarter", "Keywords",
              "Date-Required", "Class", "Submitter-Id", "Arrival-Date",
              "Closed-Date", "Last-Modified", "Originator", "Release",
              "Organization", "Environment", "Description", "How-To-Repeat",
              "Fix", "Release-Note", "Audit-Trail", "Unformatted")

# Dictionary telling us which GNATS fields are multiline
multilinefields = {"Organization":1, "Environment":1, "Description":1,
                   "How-To-Repeat":1, "Fix":1, "Release-Note":1,
                   "Audit-Trail":1, "Unformatted":1}

# Mapping of GCC release strings to Bugzilla versions.  The GCC version
# string changes with every snapshot, so every release with e.g. "3.4"
# in the string is funnelled to version 3.4 for bug tracking purposes.
# The key is a regex to match, the value is the version it corresponds
# with.
releasetovermap = {r"3\.4":"3.4", r"3\.3":"3.3", r"3\.2\.2":"3.2.2",
                   r"3\.2\.1":"3.2.1", r"3\.2":"3.2", r"3\.1\.2":"3.1.2",
                   r"3\.1\.1":"3.1.1", r"3\.1":"3.1", r"3\.0\.4":"3.0.4",
                   r"3\.0\.3":"3.0.3", r"3\.0\.2":"3.0.2", r"3\.0\.1":"3.0.1",
                   r"3\.0":"3.0", r"2\.95\.4":"2.95.4", r"2\.95\.3":"2.95.3",
                   r"2\.95\.2":"2.95.2", r"2\.95\.1":"2.95.1",
                   r"2\.95":"2.95", r"2\.97":"2.97",
                   r"2\.96.*[rR][eE][dD].*[hH][aA][tT]":"2.96 (redhat)",
                   r"2\.96":"2.96"}

# These map the field name to the field id bugzilla assigns.  We need
# the id when doing bug activity.
fieldids = {"State":8, "Responsible":15}

# These are the keywords we use in gcc bug tracking.  They are transformed
# into bugzilla keywords.  The format here is <keyword>-><bugzilla keyword id>
keywordids = {"wrong-code":1, "ice-on-legal-code":2, "ice-on-illegal-code":3,
              "rejects-legal":4, "accepts-illegal":5, "pessimizes-code":6}

# Map from GNATS states to Bugzilla states.  Duplicates and reopened bugs
# are handled when parsing the audit trail, so no need for them here.
state_lookup = {"":"NEW", "open":"ASSIGNED", "analyzed":"ASSIGNED",
                "feedback":"WAITING", "closed":"CLOSED",
                "suspended":"SUSPENDED"}

# Table of versions that exist in the bugs, built up as we go along
versions_table = {}

# Delimiter gnatsweb uses for attachments
attachment_delimiter = "----gnatsweb-attachment----\n"

# Here starts the various regular expressions we use
# Matches an entire GNATS single line field
gnatfieldre = re.compile(r"""^([>\w\-]+)\s*:\s*(.*)\s*$""")

# Matches the name of a GNATS field
fieldnamere = re.compile(r"""^>(.*)$""")

# Matches the useless part of an envelope
uselessre = re.compile(r"""^(\S*?):\s*""", re.MULTILINE)

# Matches the filename in a content disposition
dispositionre = re.compile(r"""(\S+);\s*filename="([^"]+)\"""")

# Matches the last changed date in the entire text of a bug
# If you have other editable fields that get audit trail entries, modify this
# The field names are explicitly listed in order to speed up matching
lastdatere = re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-When: )(.+?)$""", re.MULTILINE)

# Matches the From line of an email or the first line of an audit trail entry
# We use this re to find the begin lines of all the audit trail entries
# The field names are explicitly listed in order to speed up matching
fromtore=re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-From-To: |From: )""", re.MULTILINE)

# These re's match the various parts of an audit trail entry
changedfromtore=re.compile(r"""^(\w+?)-Changed-From-To: (.+?)$""", re.MULTILINE)
changedbyre=re.compile(r"""^\w+?-Changed-By: (.+?)$""", re.MULTILINE)
changedwhenre=re.compile(r"""^\w+?-Changed-When: (.+?)$""", re.MULTILINE)
changedwhyre=re.compile(r"""^\w+?-Changed-Why:\s*(.*?)$""", re.MULTILINE)

# This re matches audit trail text saying that the current bug is a duplicate of another
duplicatere=re.compile(r"""(?:")?Dup(?:licate)?(?:d)?(?:")? of .*?(\d+)""", re.IGNORECASE | re.MULTILINE)

# Get the text of a From: line
fromre=re.compile(r"""^From: (.*?)$""", re.MULTILINE)

# Get the text of a Date: Line
datere=re.compile(r"""^Date: (.*?)$""", re.MULTILINE)

# Map of the responsible file to email addresses
responsible_map = {}
# List of records in the responsible file
responsible_list = []
# List of records in the categories file
categories_list = []
# List of pr's in the index
pr_list = []
# Map usernames to user ids
usermapping = {}
# Start with this user id; ids 0/1 are reserved for Bugzilla itself
userid_base = 2

# Name of gnats user
gnats_username = "gnats@gcc.gnu.org"
# Name of unassigned user
unassigned_username = "unassigned@gcc.gnu.org"

gnats_db_dir = "."
product = "gcc"
# Fix: GCC stands for "GNU Compiler Collection", not "Connection".
productdesc = "GNU Compiler Collection"
# Fix: the milestone URL had a "/" where the "." belongs.
milestoneurl = "http://gcc.gnu.org"
defaultmilestone = "3.4"
|
||||
|
||||
def write_non_bug_tables():
    """ Emit the non-bug tables (products, components, versions, profiles).

    All SQL goes to the module-level output file; every statement is
    terminated with a newline, exactly as the bug-table writer does.
    """
    emit = outfile.write

    # Every bug that is not UNCONFIRMED gets its everconfirmed flag set.
    emit("update bugs set everconfirmed=1 where bug_status != 'UNCONFIRMED';\n")

    # Bugs still on the placeholder "unassigned" account (userid 3) go
    # back to NEW.
    emit("update bugs set bug_status='NEW',assigned_to='NULL' where bug_status='ASSIGNED' AND assigned_to=3;\n")

    # The single product record.
    emit("\ninsert into products (\n")
    emit(" product, description, milestoneurl, disallownew,\n")
    emit(" defaultmilestone, votestoconfirm) values (\n")
    emit(" '%s', '%s', '%s', 0, '%s', 1);\n" % (product, productdesc,
                                                milestoneurl,
                                                defaultmilestone))

    # One component per GNATS category; field 0 is the name, field 1 the
    # description.
    for cat_record in categories_list:
        emit("\ninsert into components (\n")
        emit(" value, program, initialowner, initialqacontact,\n")
        emit(" description) values (\n")
        emit(" %s, %s, %s, '', %s);\n" % (SqlQuote(cat_record[0]),
                                          SqlQuote(product),
                                          SqlQuote("3"),
                                          SqlQuote(cat_record[1])))

    # Version rows collected while translating the individual PRs.
    for prod, version_list in versions_table.items():
        prodsql = SqlQuote(prod)
        for ver in version_list:
            emit("\ninsert into versions (value, program) \n")
            emit(" values (%s, %s);\n" % (SqlQuote(ver), prodsql))

    # One profiles row per distinct e-mail address seen during the run.
    for login, uid in usermapping.items():
        realname = map_username_to_realname(login)
        emit("\ninsert into profiles (\n")
        emit(" userid, login_name, password, cryptpassword, realname, groupset\n")
        emit(") values (\n")
        emit("%s,%s,'password',encrypt('password'), %s, 0);\n"
             % (uid, SqlQuote(login), SqlQuote(realname)))
    emit("update profiles set groupset=1 << 32 where login_name like '%\@gcc.gnu.org';\n")
|
||||
def unixdate2datetime(unixdate):
    """ Convert a unix (RFC-822 style) date string to a datetime value """
    # parsedate_tz yields a 10-tuple; only the first six entries
    # (year, month, day, hour, minute, second) are needed here.  As in
    # the original, an unparseable date raises.
    stamp = email.Utils.parsedate_tz(unixdate)
    return "%d-%02d-%02d %02d:%02d:%02d" % stamp[:6]
|
||||
|
||||
def unixdate2timestamp(unixdate):
    """ Convert a unix (RFC-822 style) date string to a timestamp value """
    # Same parse as unixdate2datetime, but formatted as a bare
    # YYYYMMDDhhmmss MySQL timestamp.
    stamp = email.Utils.parsedate_tz(unixdate)
    return "%d%02d%02d%02d%02d%02d" % stamp[:6]
|
||||
|
||||
def SqlQuote(value):
    """ Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled, backslashes are doubled and NUL
    bytes become the "\\0" escape, so the result can be pasted directly
    into the generated MySQL statements.  (The parameter was renamed
    from ``str``, which shadowed the builtin.)
    """
    return "'%s'" % value.replace("'", "''").replace("\\", "\\\\").replace("\0", "\\0")
|
||||
|
||||
def convert_gccver_to_ver(gccver):
    """ Given a gcc release string, convert it to a Bugzilla version.

    Fix: the patterns are now tried longest-first.  The original
    iterated releasetovermap.keys() in arbitrary dict-hash order, so a
    "3.2.2" release could be matched by the shorter r"3\.2" pattern
    first and be misfiled as version 3.2.
    """
    # Decorate-sort-undecorate keeps this 2.2-compatible (no sort key=).
    decorated = [(-len(k), k) for k in releasetovermap.keys()]
    decorated.sort()
    for _, pattern in decorated:
        # re.search already scans anywhere in the string, so the
        # original ".*%s.*" wrapping was redundant.
        if re.search(pattern, gccver) is not None:
            return releasetovermap[pattern]
    # Development snapshots look like "3.4 20030101 (experimental)".
    result = re.search(r""".*(\d\.\d) \d+ \(experimental\).*""", gccver)
    if result is not None:
        return result.group(1)
    return "unknown"
|
||||
|
||||
def load_index(fname):
    """ Load in the GNATS index file.

    Each record is a "|"-separated line; the first field (the PR
    number) is appended to the global pr_list.

    Fix: the file handle is now closed even if a record fails to
    process (try/finally); the file object is iterated directly, which
    is equivalent to the old xreadlines() call.
    """
    global pr_list
    ifp = open(fname)
    try:
        for record in ifp:
            fields = record.split("|")
            pr_list.append(fields[0])
    finally:
        ifp.close()
|
||||
|
||||
def load_categories(fname):
    """ Load in the GNATS categories file.

    "#" comment lines are skipped; every other ":"-separated record is
    appended (still split) to the global categories_list.

    Fix: the file handle is now closed on all paths (try/finally), and
    the comment test uses str.startswith instead of re.search("^#", ...).
    """
    global categories_list
    cfp = open(fname)
    try:
        for record in cfp:
            if record.startswith("#"):
                continue
            categories_list.append(record.split(":"))
    finally:
        cfp.close()
|
||||
|
||||
def map_username_to_realname(username):
    """ Given a username, find the real name """
    # Compare on the local part only (everything before the "@").
    local = re.sub("@.*", "", username)
    for record in responsible_list:
        # Field 0 is the short GNATS name, field 1 the real name, and
        # the optional field 2 a full e-mail address.
        if record[0] == local:
            return record[1]
        if len(record) > 2 and record[2] == username:
            return record[1]
    return ""
|
||||
|
||||
|
||||
def get_userid(responsible):
    """ Given an email address, get the user id.

    Unknown addresses are assigned the next free id (userid_base) and
    remembered in the global usermapping.  Returns -1 when no address
    was supplied.

    Fix: the sources.redhat.com -> gcc.gnu.org rewrite now uses a plain
    string replace; the old re.sub pattern had unescaped dots and would
    also rewrite e.g. "sourcesXredhatYcom".
    """
    global responsible_map
    global usermapping
    global userid_base
    if responsible is None:
        return -1
    responsible = responsible.lower()
    responsible = responsible.replace("sources.redhat.com", "gcc.gnu.org")
    # Expand a short GNATS login to its full e-mail address.
    if responsible in responsible_map:
        responsible = responsible_map[responsible]
    if responsible in usermapping:
        return usermapping[responsible]
    usermapping[responsible] = userid_base
    userid_base += 1
    return usermapping[responsible]
|
||||
|
||||
def load_responsible(fname):
    """ Load in the GNATS responsible file.

    Populates responsible_map (short name -> e-mail address, field 2)
    and responsible_list (raw ":"-split records), skipping "#" comments.

    Fix: the file handle is now closed on all paths (try/finally), the
    comment test uses str.startswith, and the line is split only once.
    """
    global responsible_map
    global responsible_list
    rfp = open(fname)
    try:
        for record in rfp:
            if record.startswith("#"):
                continue
            split_record = record.split(":")
            responsible_map[split_record[0]] = split_record[2].rstrip()
            responsible_list.append(split_record)
    finally:
        rfp.close()
|
||||
|
||||
def split_csl(csl):
    """ Split a comma-separated list into a list of strings.

    Surrounding whitespace is consumed by the split.  (The parameter
    was renamed from ``list``, which shadowed the builtin.)
    """
    return re.split(r"""\s*,\s*""", csl)
|
||||
|
||||
def fix_email_addrs(addrs):
    """ Perform various fixups and cleaning on an e-mail address """
    cleaned = []
    # Break the header on commas (split_csl inlined), then normalise
    # each address in turn.
    for raw in re.split(r"""\s*,\s*""", addrs):
        # Drop any "(comment)" part, then keep only the addr-spec from
        # inside <angle brackets> when one is present.
        candidate = re.sub(r"""\(.*\)""", "", raw)
        candidate = re.sub(r""".*<(.*)>.*""", "\\1", candidate)
        cleaned.append(candidate.strip())
    return ", ".join(cleaned)
|
||||
|
||||
class Bugzillabug(object):
    """ Class representing a bugzilla bug """
    def __init__(self, gbug):
        """ Initialize a bugzilla bug from a GNATS bug.

        gbug is a GNATSbug instance.  Construction drives the whole
        conversion: translation, audit-trail parsing and the final SQL
        emission all happen here.
        """
        self.bug_id = gbug.bug_id
        # (bug_id, who, when, text) tuples destined for longdescs.
        self.long_descs = []
        # Every converted bug gets gcc-bugs on its cc list.
        self.bug_ccs = [get_userid("gcc-bugs@gcc.gnu.org")]
        # (new, old, fieldid, who, when) tuples for bugs_activity.
        self.bug_activity = []
        self.attachments = gbug.attachments
        self.gnatsfields = gbug.fields
        # The Unformatted field is folded into the description only when
        # it was not already consumed as a gnatsweb attachment.
        self.need_unformatted = gbug.has_unformatted_attach == 0
        self.need_unformatted &= gbug.fields.has_key("Unformatted")
        self.translate_pr()
        self.update_versions()
        if self.fields.has_key("Audit-Trail"):
            self.parse_audit_trail()
        self.write_bug()
||||
|
||||
    def parse_fromto(type, string):
        """ Parses the from and to parts of a changed-from-to line.

        Returns an (old, new) pair; State values are mapped through
        state_lookup, Responsible values through responsible_map.
        """
        fromstr = ""
        tostr = ""

        # Some slightly messed up changed lines have unassigned-new,
        # instead of unassigned->new. So we make the > optional.
        result = re.search(r"""(.*)-(?:>?)(.*)""", string)

        # Only know how to handle parsing of State and Responsible
        # changed-from-to right now
        if type == "State":
            # NOTE(review): a state missing from state_lookup (or an
            # unparseable line, where result is None) raises here —
            # presumably the GNATS data is trusted to be well-formed.
            fromstr = state_lookup[result.group(1)]
            tostr = state_lookup[result.group(2)]
        elif type == "Responsible":
            if result.group(1) != "":
                fromstr = result.group(1)
            if result.group(2) != "":
                tostr = result.group(2)
            # Expand short GNATS logins to full e-mail addresses.
            if responsible_map.has_key(fromstr):
                fromstr = responsible_map[fromstr]
            if responsible_map.has_key(tostr):
                tostr = responsible_map[tostr]
        return (fromstr, tostr)
    # Pre-2.4 idiom: decorator syntax is not available.
    parse_fromto = staticmethod(parse_fromto)
|
||||
|
||||
    def parse_audit_trail(self):
        """ Parse a GNATS audit trail.

        Splits the Audit-Trail field into entries (each starts at a
        "*-Changed-From-To:" or "From:" line, per fromtore) and turns
        each entry into longdescs comments, cc additions, duplicate
        resolutions and bugs_activity rows.
        """
        trail = self.fields["Audit-Trail"]
        # Begin to split the audit trail into pieces
        result = fromtore.finditer(trail)
        starts = []
        ends = []
        pieces = []
        # Make a list of the pieces
        for x in result:
            pieces.append (x)
        # Find the start and end of each piece: an entry runs from its
        # own start marker to the start of the following entry; the last
        # one runs to the end of the trail.
        if len(pieces) > 0:
            for x in xrange(len(pieces)-1):
                starts.append(pieces[x].start())
                ends.append(pieces[x+1].start())
            starts.append(pieces[-1].start())
            ends.append(len(trail))
        pieces = []
        # Now make the list of actual text of the pieces
        for x in xrange(len(starts)):
            pieces.append(trail[starts[x]:ends[x]])
        # And parse the actual pieces
        for piece in pieces:
            result = changedfromtore.search(piece)
            # See what things we actually have inside this entry, and
            # handle them approriately
            if result is not None:
                type = result.group(1)
                changedfromto = result.group(2)
                # If the bug was reopened, mark it as such
                if changedfromto.find("closed->analyzed") != -1:
                    if self.fields["bug_status"] == "'NEW'":
                        self.fields["bug_status"] = "'REOPENED'"
                if type == "State" or type == "Responsible":
                    oldstate, newstate = self.parse_fromto (type, changedfromto)
                # NOTE(review): when a -Changed-By:/-Changed-When: line is
                # missing, changedby/changedwhen stay unbound (or carry the
                # previous entry's value) — assumes well-formed GNATS data.
                result = changedbyre.search(piece)
                if result is not None:
                    changedby = result.group(1)
                result = changedwhenre.search(piece)
                if result is not None:
                    changedwhen = result.group(1)
                    changedwhen = unixdate2datetime(changedwhen)
                    changedwhen = SqlQuote(changedwhen)
                # The "why" text runs from the -Changed-Why: payload to
                # the end of the entry.
                result = changedwhyre.search(piece)
                changedwhy = piece[result.start(1):]
                #changedwhy = changedwhy.lstrip()
                changedwhy = changedwhy.rstrip()
                changedby = get_userid(changedby)
                # Put us on the cc list if we aren't there already
                if changedby != self.fields["userid"] \
                   and changedby not in self.bug_ccs:
                    self.bug_ccs.append(changedby)
                # If it's a duplicate, mark it as such
                result = duplicatere.search(changedwhy)
                if result is not None:
                    newtext = "*** This bug has been marked as a duplicate of %s ***" % result.group(1)
                    newtext = SqlQuote(newtext)
                    self.long_descs.append((self.bug_id, changedby,
                                            changedwhen, newtext))
                    self.fields["bug_status"] = "'RESOLVED'"
                    self.fields["resolution"] = "'DUPLICATE'"
                    self.fields["userid"] = changedby
                else:
                    # Ordinary change: keep the raw entry as the comment.
                    newtext = "%s-Changed-From-To: %s\n%s-Changed-Why: %s\n" % (type, changedfromto, type, changedwhy)
                    newtext = SqlQuote(newtext)
                    self.long_descs.append((self.bug_id, changedby,
                                            changedwhen, newtext))
                    # State/Responsible changes also get a bugs_activity row.
                    if type == "State" or type == "Responsible":
                        newstate = SqlQuote("%s" % newstate)
                        oldstate = SqlQuote("%s" % oldstate)
                        fieldid = fieldids[type]
                        self.bug_activity.append((newstate, oldstate, fieldid, changedby, changedwhen))

            else:
                # It's an email
                result = fromre.search(piece)
                if result is None:
                    continue
                fromstr = result.group(1)
                fromstr = fix_email_addrs(fromstr)
                fromstr = get_userid(fromstr)
                result = datere.search(piece)
                if result is None:
                    continue
                datestr = result.group(1)
                datestr = SqlQuote(unixdate2timestamp(datestr))
                # The mail's author joins the cc list too.
                if fromstr != self.fields["userid"] \
                   and fromstr not in self.bug_ccs:
                    self.bug_ccs.append(fromstr)
                self.long_descs.append((self.bug_id, fromstr, datestr,
                                        SqlQuote(piece)))
||||
    def write_bug(self):
        """ Output a bug to the data file.

        Emits the bugs row plus its keywords, longdescs, attachments,
        bugs_activity and cc rows, using the values prepared by
        translate_pr() / parse_audit_trail().
        """
        fields = self.fields
        # Main bugs row.
        print >>outfile, "\ninsert into bugs("
        print >>outfile, " bug_id, assigned_to, bug_severity, priority, bug_status, creation_ts, delta_ts,"
        print >>outfile, " short_desc,"
        print >>outfile, " reporter, version,"
        print >>outfile, " product, component, resolution, target_milestone, qa_contact,"
        print >>outfile, " gccbuild, gcctarget, gcchost, keywords"
        print >>outfile, " ) values ("
        print >>outfile, "%s, %s, %s, %s, %s, %s, %s," % (self.bug_id, fields["userid"], fields["bug_severity"], fields["priority"], fields["bug_status"], fields["creation_ts"], fields["delta_ts"])
        print >>outfile, "%s," % (fields["short_desc"])
        print >>outfile, "%s, %s," % (fields["reporter"], fields["version"])
        print >>outfile, "%s, %s, %s, %s, 0," %(fields["product"], fields["component"], fields["resolution"], fields["target_milestone"])
        print >>outfile, "%s, %s, %s, %s" % (fields["gccbuild"], fields["gcctarget"], fields["gcchost"], fields["keywords"])
        print >>outfile, ");"
        # keywords stays integer 0 when the PR class had no bugzilla
        # keyword mapping (see translate_pr); only real keywords get a row.
        if self.fields["keywords"] != 0:
            print >>outfile, "\ninsert into keywords (bug_id, keywordid) values ("
            print >>outfile, " %s, %s);" % (self.bug_id, fields["keywordid"])
        # One longdescs row per collected comment.
        for id, who, when, text in self.long_descs:
            print >>outfile, "\ninsert into longdescs ("
            print >>outfile, " bug_id, who, bug_when, thetext) values("
            print >>outfile, " %s, %s, %s, %s);" % (id, who, when, text)
        for name, data, who in self.attachments:
            print >>outfile, "\ninsert into attachments ("
            print >>outfile, " bug_id, filename, description, mimetype, ispatch, submitter_id, thedata) values ("
            ftype = None
            # It's *magic*!  Preprocessed sources get fixed MIME types,
            # everything else goes through the magic-number detector.
            if name.endswith(".ii") == 1:
                ftype = "text/x-c++"
            elif name.endswith(".i") == 1:
                ftype = "text/x-c"
            else:
                ftype = magicf.detect(cStringIO.StringIO(data))
                if ftype is None:
                    ftype = "application/octet-stream"
            # Attachment data is zlib-compressed before quoting.
            print >>outfile, "%s,%s,%s, %s,0, %s,%s);" %(self.bug_id, SqlQuote(name), SqlQuote(name), SqlQuote (ftype), who, SqlQuote(zlib.compress(data)))
        for newstate, oldstate, fieldid, changedby, changedwhen in self.bug_activity:
            print >>outfile, "\ninsert into bugs_activity ("
            print >>outfile, " bug_id, who, bug_when, fieldid, added, removed) values ("
            print >>outfile, " %s, %s, %s, %s, %s, %s);" % (self.bug_id,
                                                            changedby,
                                                            changedwhen,
                                                            fieldid,
                                                            newstate,
                                                            oldstate)
        for cc in self.bug_ccs:
            print >>outfile, "\ninsert into cc(bug_id, who) values (%s, %s);" %(self.bug_id, cc)
|
||||
def update_versions(self):
|
||||
""" Update the versions table to account for the version on this bug """
|
||||
global versions_table
|
||||
if self.fields.has_key("Release") == 0 \
|
||||
or self.fields.has_key("Category") == 0:
|
||||
return
|
||||
curr_product = "gcc"
|
||||
curr_version = self.fields["Release"]
|
||||
if curr_version == "":
|
||||
return
|
||||
curr_version = convert_gccver_to_ver (curr_version)
|
||||
if versions_table.has_key(curr_product) == 0:
|
||||
versions_table[curr_product] = []
|
||||
for version in versions_table[curr_product]:
|
||||
if version == curr_version:
|
||||
return
|
||||
versions_table[curr_product].append(curr_version)
|
||||
    def translate_pr(self):
        """ Transform a GNATS PR into a Bugzilla bug.

        Fills self.fields with SQL-ready (mostly SqlQuote'd) values for
        write_bug(): severity, priority, status/resolution, timestamps,
        reporter, descriptions, version, component and milestone.
        """
        self.fields = self.gnatsfields
        # NOTE(review): str.find returns -1 (truthy) when "GCC" is absent
        # and 0 (falsy) when it is the prefix, so this wipes the
        # originator/organization unless the organization *starts with*
        # "GCC".  "!= -1" may have been intended — confirm against the
        # original conversion output before changing.
        if (self.fields.has_key("Organization") == 0) \
           or self.fields["Organization"].find("GCC"):
            self.fields["Originator"] = ""
            self.fields["Organization"] = ""
        # NOTE(review): no-op — lstrip() returns a new string and the
        # result is discarded here.
        self.fields["Organization"].lstrip()
        # Normalize missing/placeholder releases.
        if (self.fields.has_key("Release") == 0) \
           or self.fields["Release"] == "" \
           or self.fields["Release"].find("unknown-1.0") != -1:
            self.fields["Release"]="unknown"
        # Reduce the responsible entry to a plain @gcc.gnu.org address.
        if self.fields.has_key("Responsible"):
            result = re.search(r"""\w+""", self.fields["Responsible"])
            self.fields["Responsible"] = "%s%s" % (result.group(0), "@gcc.gnu.org")
        # Pull the host/target/build triple out of the free-form
        # Environment field.
        self.fields["gcchost"] = ""
        self.fields["gcctarget"] = ""
        self.fields["gccbuild"] = ""
        if self.fields.has_key("Environment"):
            result = re.search("^host: (.+?)$", self.fields["Environment"],
                               re.MULTILINE)
            if result is not None:
                self.fields["gcchost"] = result.group(1)
            result = re.search("^target: (.+?)$", self.fields["Environment"],
                               re.MULTILINE)
            if result is not None:
                self.fields["gcctarget"] = result.group(1)
            result = re.search("^build: (.+?)$", self.fields["Environment"],
                               re.MULTILINE)
            if result is not None:
                self.fields["gccbuild"] = result.group(1)
        self.fields["userid"] = get_userid(self.fields["Responsible"])
        # Severity: change-requests become enhancements; otherwise map
        # the GNATS severity, falling back to sniffing the synopsis.
        self.fields["bug_severity"] = "normal"
        if self.fields["Class"] == "change-request":
            self.fields["bug_severity"] = "enhancement"
        elif self.fields.has_key("Severity"):
            if self.fields["Severity"] == "critical":
                self.fields["bug_severity"] = "critical"
            elif self.fields["Severity"] == "serious":
                self.fields["bug_severity"] = "major"
        elif self.fields.has_key("Synopsis"):
            if re.search("crash|assert", self.fields["Synopsis"]):
                self.fields["bug_severity"] = "critical"
            elif re.search("wrong|error", self.fields["Synopsis"]):
                self.fields["bug_severity"] = "major"
        self.fields["bug_severity"] = SqlQuote(self.fields["bug_severity"])
        # Keywords: set only when the GNATS class maps to a bugzilla
        # keyword; otherwise left as integer 0 (write_bug tests for it).
        self.fields["keywords"] = 0
        if keywordids.has_key(self.fields["Class"]):
            self.fields["keywords"] = self.fields["Class"]
            self.fields["keywordid"] = keywordids[self.fields["Class"]]
            self.fields["keywords"] = SqlQuote(self.fields["keywords"])
        # Priority: squash GNATS severity x priority into P1-P5.
        self.fields["priority"] = "P1"
        if self.fields.has_key("Severity") and self.fields.has_key("Priority"):
            severity = self.fields["Severity"]
            priority = self.fields["Priority"]
            if severity == "critical":
                if priority == "high":
                    self.fields["priority"] = "P1"
                else:
                    self.fields["priority"] = "P2"
            elif severity == "serious":
                if priority == "low":
                    self.fields["priority"] = "P4"
                else:
                    self.fields["priority"] = "P3"
            else:
                if priority == "high":
                    self.fields["priority"] = "P4"
                else:
                    self.fields["priority"] = "P5"
        self.fields["priority"] = SqlQuote(self.fields["priority"])
        # Status + resolution from the GNATS state.  userid 3 is treated
        # as the "unassigned" placeholder account (see also
        # write_non_bug_tables).
        state = self.fields["State"]
        if (state == "open" or state == "analyzed") and self.fields["userid"] != 3:
            self.fields["bug_status"] = "ASSIGNED"
            self.fields["resolution"] = ""
        elif state == "feedback":
            self.fields["bug_status"] = "WAITING"
            self.fields["resolution"] = ""
        elif state == "closed":
            self.fields["bug_status"] = "CLOSED"
            # Closed PRs encode their resolution in the class.
            if self.fields.has_key("Class"):
                theclass = self.fields["Class"]
                if theclass.find("duplicate") != -1:
                    self.fields["resolution"]="DUPLICATE"
                elif theclass.find("mistaken") != -1:
                    self.fields["resolution"]="INVALID"
                else:
                    self.fields["resolution"]="FIXED"
            else:
                self.fields["resolution"]="FIXED"
        elif state == "suspended":
            self.fields["bug_status"] = "SUSPENDED"
            self.fields["resolution"] = ""
        elif state == "analyzed" and self.fields["userid"] == 3:
            # Analyzed but unassigned: back to NEW.
            self.fields["bug_status"] = "NEW"
            self.fields["resolution"] = ""
        else:
            self.fields["bug_status"] = "UNCONFIRMED"
            self.fields["resolution"] = ""
        self.fields["bug_status"] = SqlQuote(self.fields["bug_status"])
        self.fields["resolution"] = SqlQuote(self.fields["resolution"])
        self.fields["creation_ts"] = ""
        if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "":
            self.fields["creation_ts"] = unixdate2datetime(self.fields["Arrival-Date"])
        self.fields["creation_ts"] = SqlQuote(self.fields["creation_ts"])
        # delta_ts: date of the last audit-trail change, else the
        # arrival date.
        self.fields["delta_ts"] = ""
        if self.fields.has_key("Audit-Trail"):
            result = lastdatere.findall(self.fields["Audit-Trail"])
            result.reverse()
            if len(result) > 0:
                self.fields["delta_ts"] = unixdate2timestamp(result[0])
        if self.fields["delta_ts"] == "":
            if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "":
                self.fields["delta_ts"] = unixdate2timestamp(self.fields["Arrival-Date"])
        self.fields["delta_ts"] = SqlQuote(self.fields["delta_ts"])
        self.fields["short_desc"] = SqlQuote(self.fields["Synopsis"])
        # Reporter: Reply-To, else the first address in the mail header,
        # else the gnats pseudo-user.
        if self.fields.has_key("Reply-To") and self.fields["Reply-To"] != "":
            self.fields["reporter"] = get_userid(self.fields["Reply-To"])
        elif self.fields.has_key("Mail-Header"):
            result = re.search(r"""From .*?([\w.]+@[\w.]+)""", self.fields["Mail-Header"])
            if result:
                self.fields["reporter"] = get_userid(result.group(1))
            else:
                self.fields["reporter"] = get_userid(gnats_username)
        else:
            self.fields["reporter"] = get_userid(gnats_username)
        # Fold the remaining free-form fields into the opening comments.
        long_desc = self.fields["Description"]
        long_desc2 = ""
        for field in ["Release", "Environment", "How-To-Repeat"]:
            if self.fields.has_key(field) and self.fields[field] != "":
                long_desc += ("\n\n%s:\n" % field) + self.fields[field]
        if self.fields.has_key("Fix") and self.fields["Fix"] != "":
            long_desc2 = "Fix:\n" + self.fields["Fix"]
        if self.need_unformatted == 1 and self.fields["Unformatted"] != "":
            long_desc += "\n\nUnformatted:\n" + self.fields["Unformatted"]
        if long_desc != "":
            self.long_descs.append((self.bug_id, self.fields["reporter"],
                                    self.fields["creation_ts"],
                                    SqlQuote(long_desc)))
        if long_desc2 != "":
            self.long_descs.append((self.bug_id, self.fields["reporter"],
                                    self.fields["creation_ts"],
                                    SqlQuote(long_desc2)))
        for field in ["gcchost", "gccbuild", "gcctarget"]:
            self.fields[field] = SqlQuote(self.fields[field])
        self.fields["version"] = ""
        if self.fields["Release"] != "":
            self.fields["version"] = convert_gccver_to_ver (self.fields["Release"])
        self.fields["version"] = SqlQuote(self.fields["version"])
        self.fields["product"] = SqlQuote("gcc")
        self.fields["component"] = "invalid"
        if self.fields.has_key("Category"):
            self.fields["component"] = self.fields["Category"]
        self.fields["component"] = SqlQuote(self.fields["component"])
        # 3.4-era bugs get the 3.4 milestone, everything else "---".
        self.fields["target_milestone"] = "---"
        if self.fields["version"].find("3.4") != -1:
            self.fields["target_milestone"] = "3.4"
        self.fields["target_milestone"] = SqlQuote(self.fields["target_milestone"])
        # NOTE(review): userid 2 appears to be the first id handed out by
        # get_userid (userid_base starts at 2) and is stored as 'NULL'
        # here — confirm which pseudo-user id 2 actually is in a real run.
        if self.fields["userid"] == 2:
            self.fields["userid"] = "\'NULL\'"
||||
|
||||
class GNATSbug(object):
    """ Represents a single GNATS PR """
    def __init__(self, filename):
        """ Parse the PR stored in *filename* and extract attachments.

        Fix: the PR file handle was never closed; it is now closed as
        soon as the fields have been parsed (try/finally).
        """
        self.attachments = []
        self.has_unformatted_attach = 0
        fp = open(filename)
        try:
            self.fields = self.parse_pr(fp.xreadlines())
        finally:
            fp.close()
        self.bug_id = int(self.fields["Number"])
        # Extract attachments: gnatsweb-style ones from Unformatted
        # first, then uuencoded blocks in How-To-Repeat and Fix.
        if self.fields.has_key("Unformatted"):
            self.find_gnatsweb_attachments()
        if self.fields.has_key("How-To-Repeat"):
            self.find_regular_attachments("How-To-Repeat")
        if self.fields.has_key("Fix"):
            self.find_regular_attachments("Fix")
|
||||
|
||||
def get_attacher(fields):
|
||||
if fields.has_key("Reply-To") and fields["Reply-To"] != "":
|
||||
return get_userid(fields["Reply-To"])
|
||||
else:
|
||||
result = None
|
||||
if fields.has_key("Mail-Header"):
|
||||
result = re.search(r"""From .*?([\w.]+\@[\w.]+)""",
|
||||
fields["Mail-Header"])
|
||||
if result is not None:
|
||||
reporter = get_userid(result.group(1))
|
||||
else:
|
||||
reporter = get_userid(gnats_username)
|
||||
get_attacher = staticmethod(get_attacher)
|
||||
def find_regular_attachments(self, which):
|
||||
fields = self.fields
|
||||
while re.search("^begin [0-7]{3}", fields[which],
|
||||
re.DOTALL | re.MULTILINE):
|
||||
outfp = cStringIO.StringIO()
|
||||
infp = cStringIO.StringIO(fields[which])
|
||||
filename, start, end = specialuu.decode(infp, outfp, quiet=0)
|
||||
fields[which]=fields[which].replace(fields[which][start:end],
|
||||
"See attachments for %s\n" % filename)
|
||||
self.attachments.append((filename, outfp.getvalue(),
|
||||
self.get_attacher(fields)))
|
||||
|
||||
def decode_gnatsweb_attachment(self, attachment):
|
||||
result = re.split(r"""\n\n""", attachment, 1)
|
||||
if len(result) == 1:
|
||||
return -1
|
||||
envelope, body = result
|
||||
envelope = uselessre.split(envelope)
|
||||
envelope.pop(0)
|
||||
# Turn the list of key, value into a dict of key => value
|
||||
attachinfo = dict([(envelope[i], envelope[i+1]) for i in xrange(0,len(envelope),2)])
|
||||
for x in attachinfo.keys():
|
||||
attachinfo[x] = attachinfo[x].rstrip()
|
||||
if (attachinfo.has_key("Content-Type") == 0) or \
|
||||
(attachinfo.has_key("Content-Disposition") == 0):
|
||||
raise ValueError, "Unable to parse file attachment"
|
||||
result = dispositionre.search(attachinfo["Content-Disposition"])
|
||||
filename = result.group(2)
|
||||
filename = re.sub(".*/","", filename)
|
||||
filename = re.sub(".*\\\\","", filename)
|
||||
attachinfo["filename"]=filename
|
||||
result = re.search("""(\S+);.*""", attachinfo["Content-Type"])
|
||||
if result is not None:
|
||||
attachinfo["Content-Type"] = result.group(1)
|
||||
if attachinfo.has_key("Content-Transfer-Encoding"):
|
||||
if attachinfo["Content-Transfer-Encoding"] == "base64":
|
||||
attachinfo["data"] = base64.decodestring(body)
|
||||
else:
|
||||
attachinfo["data"]=body
|
||||
|
||||
return (attachinfo["filename"], attachinfo["data"],
|
||||
self.get_attacher(self.fields))
|
||||
|
||||
def find_gnatsweb_attachments(self):
|
||||
fields = self.fields
|
||||
attachments = re.split(attachment_delimiter, fields["Unformatted"])
|
||||
fields["Unformatted"] = attachments.pop(0)
|
||||
for attachment in attachments:
|
||||
result = self.decode_gnatsweb_attachment (attachment)
|
||||
if result != -1:
|
||||
self.attachments.append(result)
|
||||
self.has_unformatted_attach = 1
|
||||
def parse_pr(lines):
|
||||
#fields = {"envelope":[]}
|
||||
fields = {"envelope":array.array("c")}
|
||||
hdrmulti = "envelope"
|
||||
for line in lines:
|
||||
line = line.rstrip('\n')
|
||||
line += '\n'
|
||||
result = gnatfieldre.search(line)
|
||||
if result is None:
|
||||
if hdrmulti != "":
|
||||
if fields.has_key(hdrmulti):
|
||||
#fields[hdrmulti].append(line)
|
||||
fields[hdrmulti].fromstring(line)
|
||||
else:
|
||||
#fields[hdrmulti] = [line]
|
||||
fields[hdrmulti] = array.array("c", line)
|
||||
continue
|
||||
hdr, arg = result.groups()
|
||||
ghdr = "*not valid*"
|
||||
result = fieldnamere.search(hdr)
|
||||
if result != None:
|
||||
ghdr = result.groups()[0]
|
||||
if ghdr in fieldnames:
|
||||
if multilinefields.has_key(ghdr):
|
||||
hdrmulti = ghdr
|
||||
#fields[ghdr] = [""]
|
||||
fields[ghdr] = array.array("c")
|
||||
else:
|
||||
hdrmulti = ""
|
||||
#fields[ghdr] = [arg]
|
||||
fields[ghdr] = array.array("c", arg)
|
||||
elif hdrmulti != "":
|
||||
#fields[hdrmulti].append(line)
|
||||
fields[hdrmulti].fromstring(line)
|
||||
if hdrmulti == "envelope" and \
|
||||
(hdr == "Reply-To" or hdr == "From" \
|
||||
or hdr == "X-GNATS-Notify"):
|
||||
arg = fix_email_addrs(arg)
|
||||
#fields[hdr] = [arg]
|
||||
fields[hdr] = array.array("c", arg)
|
||||
if fields.has_key("Reply-To") and len(fields["Reply-To"]) > 0:
|
||||
fields["Reply-To"] = fields["Reply-To"]
|
||||
else:
|
||||
fields["Reply-To"] = fields["From"]
|
||||
if fields.has_key("From"):
|
||||
del fields["From"]
|
||||
if fields.has_key("X-GNATS-Notify") == 0:
|
||||
fields["X-GNATS-Notify"] = array.array("c")
|
||||
#fields["X-GNATS-Notify"] = ""
|
||||
for x in fields.keys():
|
||||
fields[x] = fields[x].tostring()
|
||||
#fields[x] = "".join(fields[x])
|
||||
for x in fields.keys():
|
||||
if multilinefields.has_key(x):
|
||||
fields[x] = fields[x].rstrip()
|
||||
|
||||
return fields
|
||||
parse_pr = staticmethod(parse_pr)
|
||||
# ---------------------------------------------------------------------------
# Main driver: load the GNATS administrative metadata, pre-create the two
# well-known pseudo-users, then convert every PR listed in pr_list.
# ---------------------------------------------------------------------------
load_index("%s/gnats-adm/index" % gnats_db_dir)
load_categories("%s/gnats-adm/categories" % gnats_db_dir)
load_responsible("%s/gnats-adm/responsible" % gnats_db_dir)
# Force the GNATS and "unassigned" accounts into the userid cache up front.
get_userid(gnats_username)
get_userid(unassigned_username)
for x in pr_list:
    print "Processing %s..." % x
    # Parse the GNATS PR, then emit it in Bugzilla form (Bugzillabug's
    # constructor does the writing as a side effect).
    a = GNATSbug ("%s/%s" % (gnats_db_dir, x))
    b = Bugzillabug(a)
write_non_bug_tables()
outfile.close()
|
|
@ -0,0 +1,712 @@
|
|||
# Found on a russian zope mailing list, and modified to fix bugs in parsing
|
||||
# the magic file and string making
|
||||
# -- Daniel Berlin <dberlin@dberlin.org>
|
||||
import sys, struct, time, re, exceptions, pprint, stat, os, pwd, grp
|
||||
|
||||
# Progress-meter switch: non-zero makes mew() draw marks on stderr.
_mew = 0

# _magic='/tmp/magic'
# _magic='/usr/share/magic.mime'
# Default magic database path; the ".mime" variant maps to MIME types.
_magic='/usr/share/magic.mime'
# Non-zero selects MIME-style output (mirrors file(1) -i).
mime = 1

# Adjust an 'ldate' value (local time) to a UTC-based epoch timestamp.
_ldate_adjust = lambda x: time.mktime( time.gmtime(x) )

BUFFER_SIZE = 1024 * 128 # 128K should be enough...
|
||||
|
||||
# Root of this module's exception hierarchy.
class MagicError(exceptions.Exception): pass
|
||||
|
||||
def _handle(fmt='@x',adj=None): return fmt, struct.calcsize(fmt), adj
|
||||
|
||||
# Maps a magic-file type name to a (struct format, size in bytes, adjust
# callable) triple.  Signed integer variants are deliberately read with
# unsigned formats (the signed originals are kept commented out), and
# 'string' has size 0, meaning "read until NUL / per-test length".
KnownTypes = {
    # 'byte':_handle('@b'),
    'byte':_handle('@B'),
    'ubyte':_handle('@B'),

    'string':('s',0,None),
    'pstring':_handle('p'),

    # 'short':_handle('@h'),
    # 'beshort':_handle('>h'),
    # 'leshort':_handle('<h'),
    'short':_handle('@H'),
    'beshort':_handle('>H'),
    'leshort':_handle('<H'),
    'ushort':_handle('@H'),
    'ubeshort':_handle('>H'),
    'uleshort':_handle('<H'),

    'long':_handle('@l'),
    'belong':_handle('>l'),
    'lelong':_handle('<l'),
    'ulong':_handle('@L'),
    'ubelong':_handle('>L'),
    'ulelong':_handle('<L'),

    # Dates are 32-bit timestamps; the 'ldate' family is adjusted from
    # local time to UTC via _ldate_adjust.
    'date':_handle('=l'),
    'bedate':_handle('>l'),
    'ledate':_handle('<l'),
    'ldate':_handle('=l',_ldate_adjust),
    'beldate':_handle('>l',_ldate_adjust),
    'leldate':_handle('<l',_ldate_adjust),
    }
|
||||
|
||||
# Running count of progress dots emitted so far.
_mew_cnt = 0

def mew(x):
    """Write a one-character progress mark to stderr when _mew is enabled.

    A '.' advances the dot counter (wrapping the line every 64 dots);
    any other character overwrites the previous mark via backspace.
    """
    global _mew_cnt
    if not _mew:
        return
    if x == '.':
        _mew_cnt += 1
        if _mew_cnt % 64 == 0:
            sys.stderr.write('\n')
        sys.stderr.write('.')
    else:
        sys.stderr.write('\b' + x)
||||
|
||||
def has_format(s):
    """Count printf-style conversion starts in *s*.

    Every '%' bumps the count, except that the second '%' of a "%%"
    pair cancels the first, so literal percent escapes net to zero.
    """
    count = 0
    prev = None
    for ch in s:
        if ch == '%':
            count = count - 1 if prev == '%' else count + 1
        prev = ch
    return count
|
||||
|
||||
def read_asciiz(file, size=None, pos=None):
    """Read an ASCIIZ string from *file*.

    Seeks to *pos* first when given.  With an explicit *size*, reads
    that many characters and truncates at the first NUL; otherwise
    reads one character at a time until EOF, NUL, or newline.
    """
    chars = []
    if pos:
        mew('s')
        file.seek(pos, 0)
    mew('z')
    if size is not None:
        chars = [file.read(size).split('\0')[0]]
    else:
        while True:
            ch = file.read(1)
            if not ch or ord(ch) == 0 or ch == '\n':
                break
            chars.append(ch)
    mew('Z')
    return ''.join(chars)
|
||||
|
||||
def a2i(v, base=0):
    """Convert numeric literal *v* to int, tolerating a trailing 'l'/'L' suffix.

    base=0 lets int() auto-detect 0x/0 prefixes, matching magic-file syntax.
    """
    if v.endswith(('l', 'L')):
        v = v[:-1]
    return int(v, base)
|
||||
|
||||
# Map from the character after a backslash to the character it denotes.
_cmap = {
    '\\' : '\\',
    '0' : '\0',
    }
# Probe every lowercase letter and record the ones Python accepts as a
# real escape sequence (n, t, r, ...).
# NOTE(review): on modern Pythons an invalid escape no longer raises
# ValueError (it only warns), so this may admit more letters than the
# original Python 2 run did — confirm before reusing elsewhere.
for c in range(ord('a'),ord('z')+1) :
    try : e = eval('"\\%c"' % chr(c))
    except ValueError : pass
    else : _cmap[chr(c)] = e
else:
    # for-else: runs once after the loop; drop the loop temporaries.
    del c
    del e
|
||||
|
||||
def make_string(s):
    """Interpret backslash escapes in *s* by evaluating it as a quoted literal.

    Double quotes inside *s* are escaped first so the constructed source
    stays a single well-formed string literal.
    """
    quoted = s.replace('"', '\\"')
    return eval('"%s"' % quoted)
|
||||
|
||||
# Raised for malformed or inconsistent magic tests.
class MagicTestError(MagicError): pass
|
||||
|
||||
class MagicTest:
    """One parsed line of a magic(5) database, plus its nested subtests.

    A test knows where to read (offset), what to read (type/struct
    format), what to compare against (test), and what to print on a
    match (message).  Subtests (deeper '>' levels) run only when their
    parent matched.
    """
    def __init__(self,offset,mtype,test,message,line=None,level=None):
        # line/level record where in the magic file this test came from.
        self.line, self.level = line, level
        self.mtype = mtype
        self.mtest = test
        self.subtests = []
        self.mask = None    # numeric mask/addend parsed from e.g. "long&0xff"
        self.smod = None    # string modifiers parsed from e.g. "string/bc"
        self.nmod = None    # which of '&+-' applies to self.mask
        self.offset, self.type, self.test, self.message = \
            offset,mtype,test,message
        if self.mtype == 'true' : return # XXX hack to enable level skips
        if test[-1:]=='\\' and test[-2:]!='\\\\' :
            self.test += 'n' # looks like someone wanted EOL to match?
        if mtype[:6]=='string' :
            if '/' in mtype : # for strings
                self.type, self.smod = \
                    mtype[:mtype.find('/')], mtype[mtype.find('/')+1:]
        else:
            for nm in '&+-' :
                if nm in mtype : # for integer-based
                    self.nmod, self.type, self.mask = (
                        nm,
                        mtype[:mtype.find(nm)],
                        # convert mask to int, autodetect base
                        int( mtype[mtype.find(nm)+1:], 0 )
                    )
                    break
        self.struct, self.size, self.cast = KnownTypes[ self.type ]
    def __str__(self):
        return '%s %s %s %s' % (
            self.offset, self.mtype, self.mtest, self.message
        )
    def __repr__(self):
        return 'MagicTest(%s,%s,%s,%s,line=%s,level=%s,subtests=\n%s%s)' % (
            `self.offset`, `self.mtype`, `self.mtest`, `self.message`,
            `self.line`, `self.level`,
            '\t'*self.level, pprint.pformat(self.subtests)
        )
    def run(self,file):
        """Run this test (and, on a match, its subtests) against *file*.

        *file* may be a path or an open file object; a path is opened and
        closed here.  Returns the accumulated message string on a match,
        None otherwise.
        """
        result = ''
        do_close = 0
        try:
            if type(file) == type('x') :
                file = open( file, 'r', BUFFER_SIZE )
                do_close = 1
            # else:
            #     saved_pos = file.tell()
            if self.mtype != 'true' :
                data = self.read(file)
                last = file.tell()
            else:
                # 'true' placeholder tests read nothing and always match.
                data = last = None
            if self.check( data ) :
                result = self.message+' '
                # Substitute the read value into printf-style messages.
                if has_format( result ) : result %= data
                for test in self.subtests :
                    m = test.run(file)
                    if m is not None : result += m
                return make_string( result )
        finally:
            if do_close :
                file.close()
            # else:
            #     file.seek( saved_pos, 0 )
    def get_mod_and_value(self):
        """Split self.test into (comparison operator, comparison value).

        Strings default to '=' and get their escapes expanded; numerics
        accept '=<>&^' plus the always-true 'x'.
        """
        if self.type[-6:] == 'string' :
            # "something like\tthis\n"
            if self.test[0] in '=<>' :
                mod, value = self.test[0], make_string( self.test[1:] )
            else:
                mod, value = '=', make_string( self.test )
        else:
            if self.test[0] in '=<>&^' :
                mod, value = self.test[0], a2i(self.test[1:])
            elif self.test[0] == 'x':
                mod = self.test[0]
                value = 0
            else:
                mod, value = '=', a2i(self.test)
        return mod, value
    def read(self,file):
        """Seek to this test's offset and read one value of this test's type.

        Returns the (cast and masked) value, or None on a short read.
        """
        mew( 's' )
        file.seek( self.offset(file), 0 ) # SEEK_SET
        mew( 'r' )
        try:
            data = rdata = None
            # XXX self.size might be 0 here...
            if self.size == 0 :
                # this is an ASCIIZ string...
                size = None
                if self.test != '>\\0' : # magic's hack for string read...
                    value = self.get_mod_and_value()[1]
                    size = (value=='\0') and None or len(value)
                rdata = data = read_asciiz( file, size=size )
            else:
                rdata = file.read( self.size )
                if not rdata or (len(rdata)!=self.size) : return None
                data = struct.unpack( self.struct, rdata )[0] # XXX hack??
        except:
            print >>sys.stderr, self
            print >>sys.stderr, '@%s struct=%s size=%d rdata=%s' % (
                self.offset, `self.struct`, self.size,`rdata`)
            raise
        mew( 'R' )
        if self.cast : data = self.cast( data )
        # Apply the '&'/'+'/'-' modifier parsed from the type, if any.
        if self.mask :
            try:
                if self.nmod == '&' : data &= self.mask
                elif self.nmod == '+' : data += self.mask
                elif self.nmod == '-' : data -= self.mask
                else: raise MagicTestError(self.nmod)
            except:
                print >>sys.stderr,'data=%s nmod=%s mask=%s' % (
                    `data`, `self.nmod`, `self.mask`
                )
                raise
        return data
    def check(self,data):
        """Compare *data* against this test's operator/value; return truthy on match."""
        mew('.')
        if self.mtype == 'true' :
            return '' # not None !
        mod, value = self.get_mod_and_value()
        if self.type[-6:] == 'string' :
            # "something like\tthis\n"
            # NOTE(review): the smod-normalized copy is built in xdata
            # but the comparisons below still use data — looks like the
            # modifiers never actually affect the result; confirm.
            if self.smod :
                xdata = data
                if 'b' in self.smod : # all blanks are optional
                    xdata = ''.join( data.split() )
                    value = ''.join( value.split() )
                if 'c' in self.smod : # all blanks are optional
                    xdata = xdata.upper()
                    value = value.upper()
                # if 'B' in self.smod : # compact blanks
                ### XXX sorry, i don't understand this :-(
                #     data = ' '.join( data.split() )
                #     if ' ' not in data : return None
            else:
                xdata = data
        try:
            if mod == '=' : result = data == value
            elif mod == '<' : result = data < value
            elif mod == '>' : result = data > value
            elif mod == '&' : result = data & value
            elif mod == '^' : result = (data & (~value)) == 0
            elif mod == 'x' : result = 1
            else : raise MagicTestError(self.test)
            if result :
                # Debug-only trace of the successful comparison.
                zdata, zval = `data`, `value`
                if self.mtype[-6:]!='string' :
                    try: zdata, zval = hex(data), hex(value)
                    except: zdata, zval = `data`, `value`
                if 0 : print >>sys.stderr, '%s @%s %s:%s %s %s => %s (%s)' % (
                    '>'*self.level, self.offset,
                    zdata, self.mtype, `mod`, zval, `result`,
                    self.message
                )
            return result
        except:
            print >>sys.stderr,'mtype=%s data=%s mod=%s value=%s' % (
                `self.mtype`, `data`, `mod`, `value`
            )
            raise
    def add(self,mt):
        """Attach subtest *mt* at its proper depth, synthesizing 'true'
        placeholder levels when the magic file skips a level."""
        if not isinstance(mt,MagicTest) :
            raise MagicTestError((mt,'incorrect subtest type %s'%(type(mt),)))
        if mt.level == self.level+1 :
            self.subtests.append( mt )
        elif self.subtests :
            # Deeper than a direct child: delegate to the newest subtree.
            self.subtests[-1].add( mt )
        elif mt.level > self.level+1 :
            # it's possible to get level 3 just after level 1 !!! :-(
            level = self.level + 1
            while level < mt.level :
                xmt = MagicTest(None,'true','x','',line=self.line,level=level)
                self.add( xmt )
                level += 1
            else:
                self.add( mt ) # retry...
        else:
            raise MagicTestError((mt,'incorrect subtest level %s'%(`mt.level`,)))
    def last_test(self):
        return self.subtests[-1]
#end class MagicTest
|
||||
|
||||
# Raised when an offset specification in the magic file cannot be parsed.
class OffsetError(MagicError): pass
|
||||
|
||||
class Offset:
    """A magic(5) offset: either a plain number or an indirect "(base.T+off)".

    Instances are callable: calling with the data file returns the
    concrete offset, dereferencing indirect offsets through the file.
    """
    # struct formats for the indirection size/byte-order letters.
    pos_format = {'b':'<B','B':'>B','s':'<H','S':'>H','l':'<I','L':'>I',}
    pattern0 = re.compile(r'''            # mere offset
        ^
        &?                                # possible ampersand
        (  0                              # just zero
         | [1-9]{1,1}[0-9]*               # decimal
         | 0[0-7]+                        # octal
         | 0x[0-9a-f]+                    # hex
        )
        $
        ''', re.X|re.I
    )
    pattern1 = re.compile(r'''            # indirect offset
        ^\(
        (?P<base>&?0                      # just zero
         |&?[1-9]{1,1}[0-9]*              # decimal
         |&?0[0-7]*                       # octal
         |&?0x[0-9A-F]+                   # hex
        )
        (?P<type>
         \.                               # this dot might be alone
         [BSL]?                           # one of this chars in either case
        )?
        (?P<sign>
         [-+]{0,1}
        )?
        (?P<off>0                         # just zero
         |[1-9]{1,1}[0-9]*                # decimal
         |0[0-7]*                         # octal
         |0x[0-9a-f]+                     # hex
        )?
        \)$''', re.X|re.I
    )
    def __init__(self,s):
        """Parse offset source string *s*; raises OffsetError when neither
        the direct nor the indirect pattern matches."""
        self.source = s
        self.value = None       # set for direct offsets only
        self.relative = 0       # leading '&' means relative to parent match
        self.base = self.type = self.sign = self.offs = None
        m = Offset.pattern0.match( s )
        if m : # just a number
            if s[0] == '&' :
                self.relative, self.value = 1, int( s[1:], 0 )
            else:
                self.value = int( s, 0 )
            return
        m = Offset.pattern1.match( s )
        if m : # real indirect offset
            try:
                self.base = m.group('base')
                if self.base[0] == '&' :
                    self.relative, self.base = 1, int( self.base[1:], 0 )
                else:
                    self.base = int( self.base, 0 )
                if m.group('type') : self.type = m.group('type')[1:]
                self.sign = m.group('sign')
                if m.group('off') : self.offs = int( m.group('off'), 0 )
                if self.sign == '-' : self.offs = 0 - self.offs
            except:
                print >>sys.stderr, '$$', m.groupdict()
                raise
            return
        raise OffsetError(`s`)
    def __call__(self,file=None):
        """Return the concrete offset, reading the pointer from *file*
        when this offset is indirect."""
        if self.value is not None : return self.value
        pos = file.tell()
        try:
            # NOTE(review): self.offset does not exist on this class —
            # this line looks like it should seek to self.base; the
            # indirect path appears untested.  Confirm before relying on it.
            if not self.relative : file.seek( self.offset, 0 )
            frmt = Offset.pos_format.get( self.type, 'I' )
            size = struct.calcsize( frmt )
            # NOTE(review): struct.unpack returns a tuple, so the += below
            # would fail; presumably [0] was intended.  Confirm.
            data = struct.unpack( frmt, file.read( size ) )
            if self.offs : data += self.offs
            return data
        finally:
            # Always restore the caller's file position.
            file.seek( pos, 0 )
    def __str__(self): return self.source
    def __repr__(self): return 'Offset(%s)' % `self.source`
#end class Offset
|
||||
|
||||
# Raised for problems loading or parsing a magic database file.
class MagicFileError(MagicError): pass
|
||||
|
||||
class MagicFile:
    """A parsed magic(5) database: loads the file into a tree of MagicTest
    objects and runs them against target files via detect()."""
    def __init__(self,filename=_magic):
        self.file = None
        self.tests = []         # top-level (level 0) MagicTest objects
        self.total_tests = 0    # count of all tests, any level
        self.load( filename )
        # per-detect() counters, filled in by detect()
        self.ack_tests = None
        self.nak_tests = None
    def __del__(self):
        self.close()
    def load(self,filename=None):
        """Open, parse, and close the magic database."""
        self.open( filename )
        self.parse()
        self.close()
    def open(self,filename=None):
        self.close()
        # Remember the filename so a later load(None) reopens the same file.
        if filename is not None :
            self.filename = filename
        self.file = open( self.filename, 'r', BUFFER_SIZE )
    def close(self):
        if self.file :
            self.file.close()
            self.file = None
    def parse(self):
        """Read the whole magic file, building the test tree in self.tests."""
        line_no = 0
        for line in self.file.xreadlines() :
            line_no += 1
            if not line or line[0]=='#' : continue
            line = line.lstrip().rstrip('\r\n')
            if not line or line[0]=='#' : continue
            try:
                x = self.parse_line( line )
                if x is None :
                    # Unparseable line: report it and move on.
                    print >>sys.stderr, '#[%04d]#'%line_no, line
                    continue
            except:
                print >>sys.stderr, '###[%04d]###'%line_no, line
                raise
            self.total_tests += 1
            level, offset, mtype, test, message = x
            new_test = MagicTest(offset,mtype,test,message,
                                 line=line_no,level=level)
            try:
                if level == 0 :
                    self.tests.append( new_test )
                else:
                    # Nested test: hang it off the most recent top-level test.
                    self.tests[-1].add( new_test )
            except:
                if 1 :
                    print >>sys.stderr, 'total tests=%s' % (
                        `self.total_tests`,
                    )
                    print >>sys.stderr, 'level=%s' % (
                        `level`,
                    )
                    print >>sys.stderr, 'tests=%s' % (
                        pprint.pformat(self.tests),
                    )
                raise
        else:
            # for-else: after a clean pass, drop any trailing non-top tests.
            while self.tests[-1].level > 0 :
                self.tests.pop()
    def parse_line(self,line):
        """Split one magic line into (level, Offset, type, test, message),
        or return None for blanks/comments."""
        # print >>sys.stderr, 'line=[%s]' % line
        if (not line) or line[0]=='#' : return None
        level = 0
        offset = mtype = test = message = ''
        mask = None
        # get optional level (count leading '>')
        while line and line[0]=='>' :
            line, level = line[1:], level+1
        # get offset
        while line and not line[0].isspace() :
            offset, line = offset+line[0], line[1:]
        try:
            offset = Offset(offset)
        except:
            print >>sys.stderr, 'line=[%s]' % line
            raise
        # skip spaces
        line = line.lstrip()
        # get type
        c = None
        while line :
            last_c, c, line = c, line[0], line[1:]
            if last_c!='\\' and c.isspace() :
                break # unescaped space - end of field
            else:
                mtype += c
                if last_c == '\\' :
                    c = None # don't fuck my brain with sequential backslashes
        # skip spaces
        line = line.lstrip()
        # get test
        c = None
        while line :
            last_c, c, line = c, line[0], line[1:]
            if last_c!='\\' and c.isspace() :
                break # unescaped space - end of field
            else:
                test += c
                if last_c == '\\' :
                    c = None # don't fuck my brain with sequential backslashes
        # skip spaces
        line = line.lstrip()
        # get message
        message = line
        # In MIME mode the magic.mime format keeps the type before a tab.
        if mime and line.find("\t") != -1:
            message=line[0:line.find("\t")]
        #
        # print '>>', level, offset, mtype, test, message
        return level, offset, mtype, test, message
    def detect(self,file):
        """Run every top-level test against *file*; return the joined match
        messages, or None when nothing matched."""
        self.ack_tests = 0
        self.nak_tests = 0
        answers = []
        for test in self.tests :
            message = test.run( file )
            if message :
                self.ack_tests += 1
                answers.append( message )
            else:
                self.nak_tests += 1
        if answers :
            return '; '.join( answers )
#end class MagicFile
|
||||
|
||||
def username(uid):
    """Return the login name for numeric *uid*, or '#<uid>' when unknown.

    Mirrors file(1)'s rendering of unresolvable ids.
    """
    try:
        return pwd.getpwuid( uid )[0]
    # Narrowed from a bare except: getpwuid raises KeyError for an
    # unknown uid and OverflowError for one outside the C uid_t range.
    except (KeyError, OverflowError):
        return '#%s'%uid
|
||||
|
||||
def groupname(gid):
    """Return the group name for numeric *gid*, or '#<gid>' when unknown.

    Mirrors file(1)'s rendering of unresolvable ids.
    """
    try:
        return grp.getgrgid( gid )[0]
    # Narrowed from a bare except: getgrgid raises KeyError for an
    # unknown gid and OverflowError for one outside the C gid_t range.
    except (KeyError, OverflowError):
        return '#%s'%gid
|
||||
|
||||
def get_file_type(fname,follow):
    """Return a file(1)-style description of what *fname* is.

    When *follow* is false, symlinks are reported as links (with their
    target) instead of being dereferenced.  On stat failure a
    "can't stat" message string is returned rather than raising.
    """
    t = None
    if not follow :
        try:
            st = os.lstat( fname ) # stat that entry, don't follow links!
        except os.error, why :
            pass
        else:
            if stat.S_ISLNK(st[stat.ST_MODE]) :
                t = 'symbolic link'
                try:
                    lnk = os.readlink( fname )
                except:
                    t += ' (unreadable)'
                else:
                    t += ' to '+lnk
    if t is None :
        try:
            st = os.stat( fname )
        except os.error, why :
            return "can't stat `%s' (%s)." % (why.filename,why.strerror)

        # Major/minor device numbers; meaningful only for block/char nodes.
        dmaj, dmin = (st.st_rdev>>8)&0x0FF, st.st_rdev&0x0FF

        if 0 : pass
        elif stat.S_ISSOCK(st.st_mode) : t = 'socket'
        elif stat.S_ISLNK (st.st_mode) : t = follow and 'symbolic link' or t
        elif stat.S_ISREG (st.st_mode) : t = 'file'
        elif stat.S_ISBLK (st.st_mode) : t = 'block special (%d/%d)'%(dmaj,dmin)
        elif stat.S_ISDIR (st.st_mode) : t = 'directory'
        elif stat.S_ISCHR (st.st_mode) : t = 'character special (%d/%d)'%(dmaj,dmin)
        elif stat.S_ISFIFO(st.st_mode) : t = 'pipe'
        else: t = '<unknown>'

    # Decorate with setuid/setgid/sticky flags from the stat we kept.
    if st.st_mode & stat.S_ISUID :
        t = 'setuid(%d=%s) %s'%(st.st_uid,username(st.st_uid),t)
    if st.st_mode & stat.S_ISGID :
        t = 'setgid(%d=%s) %s'%(st.st_gid,groupname(st.st_gid),t)
    if st.st_mode & stat.S_ISVTX :
        t = 'sticky '+t

    return t
|
||||
|
||||
# Usage text for main(); rendered as HELP % (argv[0], default magic path).
# Legend: '*' = not implemented, '+' = implemented differently.
# Fixed user-facing typos: "--verson" -> "--version", "stop st" -> "stop at",
# "for <namefile>" -> "from <namefile>".
HELP = '''%s [options] [files...]

Options:

  -?, --help             -- this help
  -m, --magic=<file>     -- use this magic <file> instead of %s
  -f, --files=<namefile> -- read filenames from <namefile>
* -C, --compile          -- write "compiled" magic file
  -b, --brief            -- don't prepend filenames to output lines
+ -c, --check            -- check the magic file
  -i, --mime             -- output MIME types
* -k, --keep-going       -- don't stop at the first match
  -n, --flush            -- flush stdout after each line
  -v, --version          -- print version and exit
* -z, --compressed       -- try to look inside compressed files
  -L, --follow           -- follow symlinks
  -s, --special          -- don't skip special files

* -- not implemented so far ;-)
+ -- implemented, but in another way...
'''
|
||||
|
||||
def main():
    """Command-line entry point: parse options, load the magic database,
    and classify each named file.  Returns a process exit status."""
    import getopt
    global _magic
    try:
        # Option flags, all defaulting to off.
        brief = 0
        flush = 0
        follow= 0
        mime = 0
        check = 0
        special=0
        try:
            opts, args = getopt.getopt(
                sys.argv[1:],
                '?m:f:CbciknvzLs',
                ( 'help',
                  # NOTE(review): the long option is registered as
                  # 'names=' but the dispatch below tests '--files=' —
                  # one of the two looks wrong; confirm against usage.
                  'magic=',
                  'names=',
                  'compile',
                  'brief',
                  'check',
                  'mime',
                  'keep-going',
                  'flush',
                  'version',
                  'compressed',
                  'follow',
                  'special',
                )
            )
        except getopt.error, why:
            print >>sys.stderr, sys.argv[0], why
            return 1
        else:
            files = None
            for o,v in opts :
                if o in ('-?','--help'):
                    print HELP % (
                        sys.argv[0],
                        _magic,
                    )
                    return 0
                elif o in ('-f','--files='):
                    files = v
                elif o in ('-m','--magic='):
                    _magic = v[:]
                elif o in ('-C','--compile'):
                    pass
                elif o in ('-b','--brief'):
                    brief = 1
                elif o in ('-c','--check'):
                    check = 1
                elif o in ('-i','--mime'):
                    mime = 1
                    # Prefer the ".mime" companion database when present.
                    if os.path.exists( _magic+'.mime' ) :
                        _magic += '.mime'
                        print >>sys.stderr,sys.argv[0]+':',\
                            "Using regular magic file `%s'" % _magic
                elif o in ('-k','--keep-going'):
                    pass
                elif o in ('-n','--flush'):
                    flush = 1
                elif o in ('-v','--version'):
                    print 'VERSION'
                    return 0
                elif o in ('-z','--compressed'):
                    pass
                elif o in ('-L','--follow'):
                    follow = 1
                elif o in ('-s','--special'):
                    special = 1
            else:
                # for-else: after all options, expand a -f file list into args.
                if files :
                    # NOTE(review): this splits `v`, the value of the LAST
                    # option seen, not `files` — correct only when -f was
                    # the final option.  Also `error` is not defined in
                    # this module; confirm both.
                    files = map(lambda x: x.strip(), v.split(','))
                    if '-' in files and '-' in args :
                        error( 1, 'cannot use STDIN simultaneously for file list and data' )
                    for file in files :
                        for name in (
                            (file=='-')
                            and sys.stdin
                            or open(file,'r',BUFFER_SIZE)
                        ).xreadlines():
                            name = name.strip()
                            if name not in args :
                                args.append( name )
        try:
            if check : print >>sys.stderr, 'Loading magic database...'
            t0 = time.time()
            m = MagicFile(_magic)
            t1 = time.time()
            if check :
                # --check only reports load statistics and exits.
                print >>sys.stderr, \
                    m.total_tests, 'tests loaded', \
                    'for', '%.2f' % (t1-t0), 'seconds'
                print >>sys.stderr, len(m.tests), 'tests at top level'
                return 0 # XXX "shortened" form ;-)

            # Column width so all results line up like file(1)'s output.
            mlen = max( map(len, args) )+1
            for arg in args :
                if not brief : print (arg + ':').ljust(mlen),
                ftype = get_file_type( arg, follow )
                # Only regular files (and, with -s, special ones) get
                # content sniffing; everything else is described by stat.
                if (special and ftype.find('special')>=0) \
                   or ftype[-4:] == 'file' :
                    t0 = time.time()
                    try:
                        t = m.detect( arg )
                    except (IOError,os.error), why:
                        t = "can't read `%s' (%s)" % (why.filename,why.strerror)
                    if ftype[-4:] == 'file' : t = ftype[:-4] + t
                    t1 = time.time()
                    print t and t or 'data'
                    if 0 : print \
                        '#\t%d tests ok, %d tests failed for %.2f seconds'%\
                        (m.ack_tests, m.nak_tests, t1-t0)
                else:
                    print mime and 'application/x-not-regular-file' or ftype
                if flush : sys.stdout.flush()
            # print >>sys.stderr, 'DONE'
        except:
            if check : return 1
            raise
        else:
            return 0
    finally:
        pass
|
||||
|
||||
# Script entry point: propagate main()'s status as the process exit code.
if __name__ == '__main__' :
    sys.exit( main() )
# vim:ai
# EOF #
|
|
@ -0,0 +1,104 @@
|
|||
#! /usr/bin/env python2.2
|
||||
|
||||
# Copyright 1994 by Lance Ellinghouse
|
||||
# Cathedral City, California Republic, United States of America.
|
||||
# All Rights Reserved
|
||||
# Permission to use, copy, modify, and distribute this software and its
|
||||
# documentation for any purpose and without fee is hereby granted,
|
||||
# provided that the above copyright notice appear in all copies and that
|
||||
# both that copyright notice and this permission notice appear in
|
||||
# supporting documentation, and that the name of Lance Ellinghouse
|
||||
# not be used in advertising or publicity pertaining to distribution
|
||||
# of the software without specific, written prior permission.
|
||||
# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO
|
||||
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
|
||||
# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE
|
||||
# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
|
||||
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
#
|
||||
# Modified by Jack Jansen, CWI, July 1995:
|
||||
# - Use binascii module to do the actual line-by-line conversion
|
||||
# between ascii and binary. This results in a 1000-fold speedup. The C
|
||||
# version is still 5 times faster, though.
|
||||
# - Arguments more compliant with python standard
|
||||
|
||||
"""Implementation of the UUencode and UUdecode functions.
|
||||
|
||||
encode(in_file, out_file [,name, mode])
|
||||
decode(in_file [, out_file, mode])
|
||||
"""
|
||||
|
||||
import binascii
|
||||
import os
|
||||
import sys
|
||||
from types import StringType
|
||||
|
||||
__all__ = ["Error", "decode"]
|
||||
|
||||
class Error(Exception):
    """Raised for uudecode failures (bad begin line, existing output file)."""
    pass
|
||||
|
||||
def decode(in_file, out_file=None, mode=None, quiet=0):
    """Decode uuencoded file.

    Unlike the stdlib uu.decode, this variant returns a triple
    (decoded filename, offset of the 'begin' line, offset after decoding)
    so callers can excise the encoded region from the surrounding text.
    """
    #
    # Open the input file, if needed.
    #
    if in_file == '-':
        in_file = sys.stdin
    elif isinstance(in_file, StringType):
        in_file = open(in_file)
    #
    # Read until a begin is encountered or we've exhausted the file
    #
    while 1:
        hdr = in_file.readline()
        if not hdr:
            raise Error, 'No valid begin line found in input file'
        if hdr[:5] != 'begin':
            continue
        hdrfields = hdr.split(" ", 2)
        if len(hdrfields) == 3 and hdrfields[0] == 'begin':
            try:
                # Validate that the mode field is octal before accepting
                # this as a real begin line.
                int(hdrfields[1], 8)
                # Remember where the encoded block starts for the caller.
                start_pos = in_file.tell() - len (hdr)
                break
            except ValueError:
                pass
    if out_file is None:
        out_file = hdrfields[2].rstrip()
        if os.path.exists(out_file):
            raise Error, 'Cannot overwrite existing file: %s' % out_file
    if mode is None:
        mode = int(hdrfields[1], 8)
    #
    # Open the output file
    #
    if out_file == '-':
        out_file = sys.stdout
    elif isinstance(out_file, StringType):
        fp = open(out_file, 'wb')
        try:
            # NOTE(review): os.path has no chmod, so this always raises
            # AttributeError and the mode is silently never applied;
            # presumably os.chmod was intended.
            os.path.chmod(out_file, mode)
        except AttributeError:
            pass
        out_file = fp
    #
    # Main decoding loop
    #
    s = in_file.readline()
    while s and s.strip() != 'end':
        try:
            data = binascii.a2b_uu(s)
        except binascii.Error, v:
            # Workaround for broken uuencoders by /Fredrik Lundh
            nbytes = (((ord(s[0])-32) & 63) * 4 + 5) / 3
            data = binascii.a2b_uu(s[:nbytes])
            if not quiet:
                sys.stderr.write("Warning: %s\n" % str(v))
        out_file.write(data)
        s = in_file.readline()
    # if not s:
    #     raise Error, 'Truncated input file'
    return (hdrfields[2].rstrip(), start_pos, in_file.tell())
|
Загрузка…
Ссылка в новой задаче