зеркало из https://github.com/mozilla/pjs.git
313 строки
12 KiB
Python
313 строки
12 KiB
Python
# The contents of this file are subject to the Mozilla Public License Version
|
|
# 1.1 (the "License"); you may not use this file except in compliance with the
|
|
# License. You may obtain a copy of the License at http://www.mozilla.org/MPL/
|
|
#
|
|
# Software distributed under the License is distributed on an "AS IS" basis,
|
|
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
|
|
# the specific language governing rights and limitations under the License.
|
|
#
|
|
# The Original Code is the Python XPCOM language bindings.
|
|
#
|
|
# The Initial Developer of the Original Code is ActiveState Tool Corp.
|
|
# Portions created by ActiveState Tool Corp. are Copyright (C) 2000, 2001
|
|
# ActiveState Tool Corp. All Rights Reserved.
|
|
#
|
|
# Contributor(s): Mark Hammond <MarkH@ActiveState.com> (original author)
|
|
#
|
|
|
|
"""Implementation of Python file objects for Mozilla/xpcom.
|
|
|
|
Introduction:
|
|
This module defines various class that are implemented using
|
|
Mozilla streams. This allows you to open Mozilla URI's, and
|
|
treat them as Python file object.
|
|
|
|
Example:
|
|
>>> file = URIFile("chrome://whatever")
|
|
>>> data = file.read(5) # Pass no arg to read everything.
|
|
|
|
Known Limitations:
|
|
* Not all URL schemes will work from "python.exe" - most notably
|
|
"chrome://" and "http://" URLs - this is because a simple initialization of
|
|
xpcom by Python does not load up the full set of Mozilla URL handlers.
|
|
If you can work out how to correctly initialize the chrome registry and
|
|
setup a message queue.
|
|
|
|
Known Bugs:
|
|
* Only read ("r") mode is supported. Although write ("w") mode doesnt make
|
|
sense for HTTP type URLs, it potentially does for file:// etc type ones.
|
|
* No concept of text mode vs binary mode. It appears Mozilla takes care of
|
|
this internally (ie, all "text/???" mime types are text, rest are binary)
|
|
|
|
"""
|
|
|
|
from xpcom import components, Exception, _xpcom
|
|
import os
|
|
import threading # for locks.
|
|
|
|
NS_RDONLY = components.interfaces.nsIFileChannel.NS_RDONLY
|
|
NS_WRONLY = components.interfaces.nsIFileChannel.NS_WRONLY
|
|
NS_RDWR = components.interfaces.nsIFileChannel.NS_RDWR
|
|
NS_CREATE_FILE = components.interfaces.nsIFileChannel.NS_CREATE_FILE
|
|
NS_APPEND = components.interfaces.nsIFileChannel.NS_APPEND
|
|
NS_TRUNCATE = components.interfaces.nsIFileChannel.NS_TRUNCATE
|
|
NS_SYNC = components.interfaces.nsIFileChannel.NS_SYNC
|
|
NS_EXCL = components.interfaces.nsIFileChannel.NS_EXCL
|
|
|
|
# A helper function - you pass a progID, an interface, plus optionally the
|
|
# name of the "constructor" and its args.
|
|
def _construct(progid, interface, ctor = None, *args):
|
|
assert (ctor is None and not args) or (ctor is not None and args), \
|
|
"no point having a ctor with no args, or if you provide args, there must be a ctor!"
|
|
instance = components.classes[progid] \
|
|
.createInstance(interface)
|
|
if ctor is not None:
|
|
ctor = getattr(instance, ctor)
|
|
apply(ctor, args)
|
|
return instance
|
|
|
|
# A helper function that may come in useful
|
|
def LocalFileToURL(localFileName):
|
|
"Convert a filename to an XPCOM nsIFileURL object."
|
|
# Create an nsILocalFile
|
|
localFile = components.classes["@mozilla.org/file/local;1"] \
|
|
.createInstance(components.interfaces.nsILocalFile)
|
|
localFile.initWithPath(localFileName)
|
|
|
|
# Create a standard URL, but we QI for the FileURL interface!
|
|
url = components.classes["@mozilla.org/network/standard-url;1"] \
|
|
.createInstance(components.interfaces.nsIFileURL)
|
|
# Setting the "file" attribute causes initialization...
|
|
url.file = localFile
|
|
return url
|
|
|
|
# A base class for file objects.
|
|
class _File:
|
|
def __init__(self, name_thingy = None, mode="r"):
|
|
self.lockob = threading.Lock()
|
|
self.inputStream = self.outputStream = None
|
|
if name_thingy is not None:
|
|
self.init(name_thingy, mode)
|
|
|
|
def __del__(self):
|
|
self.close()
|
|
|
|
# The Moz file streams are not thread safe.
|
|
def _lock(self):
|
|
self.lockob.acquire()
|
|
def _release(self):
|
|
self.lockob.release()
|
|
def read(self, n = -1):
|
|
assert self.inputStream is not None, "Not setup for read!"
|
|
self._lock()
|
|
try:
|
|
return str(self.inputStream.read(n))
|
|
finally:
|
|
self._release()
|
|
|
|
def readlines(self):
|
|
# Not part of the xpcom interface, but handy for direct Python users.
|
|
# Not 100% faithful, but near enough for now!
|
|
lines = self.read().split("\n")
|
|
if len(lines) and len(lines[-1]) == 0:
|
|
lines = lines[:-1]
|
|
return [s+"\n" for s in lines ]
|
|
|
|
def write(self, data):
|
|
assert self.outputStream is not None, "Not setup for write!"
|
|
self._lock()
|
|
try:
|
|
self.outputStream.write(data, len(data))
|
|
finally:
|
|
self._release()
|
|
|
|
def close(self):
|
|
self._lock()
|
|
try:
|
|
if self.inputStream is not None:
|
|
self.inputStream.close()
|
|
self.inputStream = None
|
|
if self.outputStream is not None:
|
|
self.outputStream.close()
|
|
self.outputStream = None
|
|
self.channel = None
|
|
# leave self.fileInst alone - it should be available after close.
|
|
finally:
|
|
self._release()
|
|
|
|
def flush(self):
|
|
self._lock()
|
|
try:
|
|
if self.outputStream is not None: self.outputStream.flush()
|
|
finally:
|
|
self._release()
|
|
|
|
# A synchronous "file object" used to open a URI.
|
|
class URIFile(_File):
|
|
def init(self, url, mode="r"):
|
|
self.close()
|
|
if mode != "r":
|
|
raise ValueError, "only 'r' mode supported'"
|
|
io_service = components.classes["@mozilla.org/network/io-service;1"] \
|
|
.createInstance("nsIIOService")
|
|
if hasattr(url, "queryInterface"):
|
|
url_ob = url
|
|
else:
|
|
url_ob = components.classes["@mozilla.org/network/standard-url;1"] \
|
|
.createInstance(components.interfaces.nsIURL)
|
|
url_ob.spec = url
|
|
# Mozilla asserts and starts saying "NULL POINTER" if this is wrong!
|
|
if not url_ob.scheme:
|
|
raise ValueError, ("The URI '%s' is invalid (no scheme)"
|
|
% (url_ob.spec,))
|
|
self.channel = io_service.newChannelFromURI(url_ob)
|
|
self.inputStream = self.channel.openInputStream()
|
|
|
|
# A "file object" implemented using Netscape's native file support.
|
|
# Based on io.js - http://lxr.mozilla.org/seamonkey/source/xpcom/tests/utils/io.js
|
|
# You open this file using a local file name (as a string) so it really is pointless -
|
|
# you may as well be using a standard Python file object!
|
|
class LocalFile(_File):
|
|
def init(self, name, mode = "r"):
|
|
name = os.path.abspath(name) # Moz libraries under Linux fail with relative paths.
|
|
self.close()
|
|
self.fileInst = _construct('@mozilla.org/file/local;1', "nsILocalFile", "initWithPath", name)
|
|
self.inputStream = None
|
|
self.channel = None
|
|
if mode in ["w","a"]:
|
|
if mode== "w":
|
|
if self.fileInst.exists():
|
|
self.fileInst.delete(0)
|
|
moz_mode = NS_CREATE_FILE | NS_WRONLY
|
|
elif mode=="a":
|
|
moz_mode = NS_APPEND
|
|
else:
|
|
assert 0, "Can't happen!"
|
|
perms = 0644
|
|
if not self.fileInst.exists():
|
|
self.fileInst.create(components.interfaces.nsILocalFile.NORMAL_FILE_TYPE, 0644)
|
|
self.channel = _construct('@mozilla.org/network/local-file-channel;1', "nsIFileChannel", "init",
|
|
self.fileInst, moz_mode, perms)
|
|
self.outputStream = self.channel.openOutputStream()
|
|
self.fileInst.permissions = perms
|
|
|
|
elif mode == "r":
|
|
self.channel = _construct('@mozilla.org/network/local-file-channel;1', "nsIFileChannel", "init",
|
|
self.fileInst, NS_RDONLY, self.fileInst.permissions)
|
|
self.inputStream = self.channel.openInputStream()
|
|
else:
|
|
raise ValueError, "Unknown mode"
|
|
|
|
def read(self, n = -1):
|
|
if n == -1:
|
|
n = self.fileInst.fileSize
|
|
return _File.read(self, n)
|
|
|
|
|
|
##########################################################
|
|
##
|
|
## Test Code
|
|
##
|
|
##########################################################
|
|
def _DoTestRead(file, expected):
|
|
# read in a couple of chunks, just to test that our various arg combinations work.
|
|
got = file.read(3)
|
|
got = got + file.read(300)
|
|
got = got + file.read(0)
|
|
got = got + file.read()
|
|
if got != expected:
|
|
raise RuntimeError, "Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))
|
|
|
|
def _DoTestBufferRead(file, expected):
|
|
# read in a couple of chunks, just to test that our various arg combinations work.
|
|
buffer = _xpcom.AllocateBuffer(50)
|
|
got = ''
|
|
while 1:
|
|
# Note - we need to reach into the file object so we
|
|
# can get at the native buffer supported function.
|
|
num = file.inputStream.read(buffer)
|
|
if num == 0:
|
|
break
|
|
got = got + str(buffer[:num])
|
|
if got != expected:
|
|
raise RuntimeError, "Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))
|
|
|
|
def _TestLocalFile():
|
|
import tempfile, os
|
|
fname = tempfile.mktemp()
|
|
data = "Hello from Python"
|
|
test_file = LocalFile(fname, "w")
|
|
try:
|
|
test_file.write(data)
|
|
test_file.close()
|
|
# Make sure Python can read it OK.
|
|
f = open(fname, "r")
|
|
if f.read() != data:
|
|
print "Eeek - Python could not read the data back correctly!"
|
|
f.close()
|
|
# For the sake of the test, try a re-init.
|
|
test_file.init(fname, "r")
|
|
got = str(test_file.read())
|
|
if got != data:
|
|
print "Read the wrong data back - %r" % (got,)
|
|
else:
|
|
print "Read the correct data."
|
|
test_file.close()
|
|
# Try reading in chunks.
|
|
test_file = LocalFile(fname, "r")
|
|
got = test_file.read(10) + test_file.read()
|
|
if got != data:
|
|
print "Chunks the wrong data back - %r" % (got,)
|
|
else:
|
|
print "Chunks read the correct data."
|
|
test_file.close()
|
|
# XXX - todo - test "a" mode!
|
|
finally:
|
|
try:
|
|
os.unlink(fname)
|
|
except OSError, details:
|
|
print "Error removing temp test file:", details
|
|
|
|
def _TestAll():
|
|
# A mini test suite.
|
|
# Get a test file, and convert it to a file:// URI.
|
|
# check what we read is the same as when
|
|
# we read this file "normally"
|
|
fname = components.__file__
|
|
if fname[-1] in "cCoO": # fix .pyc/.pyo
|
|
fname = fname[:-1]
|
|
expected = open(fname, "rb").read()
|
|
# convert the fname to a URI.
|
|
url = LocalFileToURL(fname)
|
|
# First try passing a URL as a string.
|
|
_DoTestRead( URIFile( url.spec), expected)
|
|
print "Open as string test worked."
|
|
# Now with a URL object.
|
|
_DoTestRead( URIFile( url ), expected)
|
|
print "Open as URL test worked."
|
|
|
|
_DoTestBufferRead( URIFile( url ), expected)
|
|
print "File test using buffers worked."
|
|
|
|
# For the sake of testing, do our pointless, demo object!
|
|
_DoTestRead( LocalFile(fname), expected )
|
|
print "Local file read test worked."
|
|
|
|
# Now do the full test of our pointless, demo object!
|
|
_TestLocalFile()
|
|
|
|
def _TestURI(url):
|
|
test_file = URIFile(url)
|
|
print "Opened file is", test_file
|
|
got = test_file.read()
|
|
print "Read %d bytes of data from %r" % (len(got), url)
|
|
test_file.close()
|
|
|
|
if __name__=='__main__':
|
|
import sys
|
|
if len(sys.argv) < 2:
|
|
print "No URL specified on command line - performing self-test"
|
|
_TestAll()
|
|
else:
|
|
_TestURI(sys.argv[1])
|