add a release script for convenience

This commit is contained in:
Dan Mills 2010-03-03 15:52:23 -08:00
Родитель 06c1ec33c3
Коммит 5206a7bf02
7 изменённых файлов: 6 добавлений и 1201 удалений

Просмотреть файл

@ -1,178 +0,0 @@
#! /usr/bin/env python
"""
usage: python build_cross_platform_xpi.py <combined-xpi>
<xpi-1> <xpi-2> ... [xpi-n]
This script builds a cross-platform XPI for Weave by simply
extracting the given list of .xpi files in the root Weave
directory into the same temporary build directory; all files with
the same name must be identical. This directory is then zipped up
into a target .xpi in the root directory, whose name is passed as
the 'combined-xpi' argument on the command-line.
The original platform-specific XPIs can be built by running
'make xpi' in the root Weave directory. This will only create
the XPI for the platform it's run on; the other platform-specific
XPIs will need to be run on their respective platforms and moved
over.
Note that in the future, this process will be replaced by a Buildbot
trigger or something much less hackish. See #433927 for more thoughts
on this.
"""
import sys
import os
import glob
import subprocess
import zipfile
from cStringIO import StringIO
from distutils import dir_util
def call(*args):
"""
Call the given command-line and ensure the result is successful.
For instance:
>>> call("python", "-c", "print 'hi there'")
Alternatively, a single list can be passed in:
>>> call(["python", "-c", "print 'hi there'"])
An OSError will be raised if the operation wasn't successful:
>>> call(["python", "-c", "priont 'hi there'"])
Traceback (most recent call last):
...
OSError: Subprocess failed: python -c priont 'hi there'
"""
if isinstance(args[0], basestring):
cmdline = args
else:
assert len(args) == 1
cmdline = args[0]
if subprocess.call(cmdline) != 0:
raise OSError("Subprocess failed: %s" % " ".join(cmdline))
def nuke(path):
"""
Destroys the given path if it exists; equivalent to the Unix shell
command 'rm -rf'.
"""
if os.path.exists(path):
if os.path.isdir(path):
dir_util.remove_tree(path)
else:
os.remove(path)
def ensure_jars_are_identical(jar1data, jar2data):
jars = [zipfile.ZipFile(StringIO(jar1data), "r"),
zipfile.ZipFile(StringIO(jar2data), "r")]
not_in_jars = [
[name
for name in jars[1].namelist()
if name not in jars[0].namelist()],
[name
for name in jars[0].namelist()
if name not in jars[1].namelist()],
]
inconsistencies = [name
for name in jars[0].namelist()
if name in jars[1].namelist() and
jars[0].read(name) != jars[1].read(name)]
if not_in_jars[0] or not_in_jars[1] or inconsistencies:
print ("Jars in different platform-specific XPIs are not "
"identical.")
print not_in_jars[0]
print not_in_jars[1]
print inconsistencies
sys.exit(1)
def ensure_xpis_are_consistent(canonical_xpi, src_xpis):
canonical = zipfile.ZipFile(canonical_xpi, "r")
for filename in src_xpis:
zf = zipfile.ZipFile(filename, "r")
inconsistencies = [name
for name in canonical.namelist()
if name in zf.namelist() and
zf.read(name) != canonical.read(name)]
if inconsistencies:
actually_bad = []
for ifilename in inconsistencies:
if ifilename.endswith(".jar"):
print "Examining %s." % ifilename
# It's possible that the two archives are different
# but contain files that have the same contents;
# this is because zip files contain metadata about
# last modification dates, the system which
# created the archive, and so on, all of which
# are irrelevant for our purposes.
ensure_jars_are_identical(
canonical.read(ifilename),
zf.read(ifilename)
)
print "Contents of jars are consistent."
else:
actually_bad.append(ifilename)
if actually_bad:
print ("The following files are contained in two or more "
"platform-specific XPIs, yet are not identical:\n")
print "\n".join(actually_bad)
sys.exit(1)
def main(args=sys.argv[1:]):
if len(args) < 3:
usage_str = __import__("__main__").__doc__
print usage_str
sys.exit(1)
NEW_XPI = args[0]
BUILD_DIR = "__temp_build"
nuke(NEW_XPI)
nuke(BUILD_DIR)
src_xpis = args[1:]
print "Combining these XPIs into a platform-independent XPI:"
print "\n".join(src_xpis)
dir_util.mkpath(BUILD_DIR)
for filename in src_xpis:
call("unzip",
# Update existing files and create new ones if needed.
"-u",
# Overwrite existing files without prompting.
"-o",
"%s" % filename,
# Extract to the build dir.
"-d", BUILD_DIR)
print "Creating new XPI."
os.chdir(BUILD_DIR)
call(["zip",
# Use optimal compression.
"-9",
# Travel the directory structure recursively.
"-r",
"../%s" % NEW_XPI] + os.listdir("."))
os.chdir("..")
dir_util.remove_tree(BUILD_DIR)
print ("Ensuring the final XPI is consistent with its consitutent "
"parts.")
ensure_xpis_are_consistent(NEW_XPI, src_xpis)
print "Success!"
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "test":
import doctest
doctest.testmod(verbose=True)
else:
main()

6
tools/build/release Executable file
Просмотреть файл

@ -0,0 +1,6 @@
#!/bin/sh
make clean
make xpi
make
scp dist/xpi/*.xpi dist/stage/update.rdf people.mozilla.com:public_html/account-manager/.

Просмотреть файл

@ -1,22 +0,0 @@
#!/bin/sh
function error() {
[ -z "$1" ] || echo $1
echo "Usage: $0: <x86 sdkdir> <ppc sdkdir> <destination dir>"
exit 1
}
[ -z "$3" ] && error ""
[ -d "$1" ] || error "i386 sdk directory does not exist"
[ -d "$2" ] || error "ppc sdk directory does not exist"
[ -d "$3" ] && error "destination (universal) sdk directory already exists"
cp -r $1 $3
for i in $(ls $1/lib); do
lipo -create -output $3/lib/$i -arch ppc $2/lib/$i -arch i386 $1/lib/$i
done
for i in $(ls $1/bin); do
lipo -create -output $3/bin/$i -arch ppc $2/bin/$i -arch i386 $1/bin/$i
done

Просмотреть файл

@ -1,78 +0,0 @@
"""
Usage: python share.py <root-dir> <owner> <cmd-json>
This utility can be used in conjunction with an Apache-based
WebDAV server to create .htaccess files with readability
permissions that are set by a privileged remote client.
<root-dir> is the local root directory where per-user
directories are kept.
<owner> is the name of the user who is sharing a
directory. Their user directory is assumed to be in
<root-dir>/<owner>.
<cmd-json> is JSON that contains information about the
share. It must have the following keys:
version: This must be 1.
directory: This is the directory to be shared, relative to
the owner's user directory.
share_to_users: This is a list of the users who should be
given access to read the directory. If it is
a list with "all" as its only element, then
the directory is readable by anyone.
If successful, the script displays nothing and exits with a
return code of 0. Otherwise, it displays an error and exits
with a nonzero return code.
"""
import os
import sys
import json
def make_read_share_htaccess(owner, users):
if users == ["all"]:
read_require_perms = "valid-user"
else:
users = set(users + [owner])
users = list(users)
users.sort()
read_require_perms = "user %s" % " ".join(users)
lines = [
"Options +Indexes",
"<Limit GET PROPFIND>",
"Require %s" % read_require_perms,
"</Limit>",
"<LimitExcept GET PROPFIND>",
"Require user %s" % owner,
"</LimitExcept>",
""
]
return "\n".join(lines)
def write_htaccess(root_dir, owner, cmd, file_open = open):
htaccess = make_read_share_htaccess(owner,
cmd["share_to_users"])
user_root_dir = os.path.join(root_dir, owner)
path = os.path.join(user_root_dir, cmd["directory"], ".htaccess")
path = os.path.normpath(path)
if not path.startswith(user_root_dir):
raise Exception("Path doesn't start with user root dir: %s" % path)
file_open(path, "w").write(htaccess)
def write_htaccess_from_json(root_dir, owner, cmd_json,
write_htaccess = write_htaccess):
write_htaccess(root_dir, owner, json.read(cmd_json))
if __name__ == "__main__":
args = sys.argv[1:]
if len(args) < 3:
__main__ = __import__("__main__")
print __main__.__doc__
sys.exit(1)
root_dir, owner, cmd_json = args
write_htaccess_from_json(root_dir, owner, cmd_json)

Просмотреть файл

@ -1,66 +0,0 @@
import unittest
from share import *
class Tests(unittest.TestCase):
def test_write_htaccess_user_works(self, root_dir = None, owner = None,
cmd = None):
if not root_dir:
root_dir = "/home/foo/blarg"
owner = "johndoe"
cmd = {"version" : 1,
"directory" : "public",
"share_to_users" : ["janedoe"]}
def mock_open(path, flag):
assert flag == "w"
assert path == "/home/foo/blarg/johndoe/public/.htaccess"
class MockFile:
def write(self, data):
assert "Require user janedoe johndoe\n" in data
assert "Require user johndoe\n" in data
return MockFile()
write_htaccess(root_dir, owner, cmd, file_open = mock_open)
def test_write_htaccess_all_works(self, root_dir = None, owner = None,
cmd = None):
if not root_dir:
root_dir = "/home/foo/blarg"
owner = "johndoe"
cmd = {"version" : 1,
"directory" : "public",
"share_to_users" : ["all"]}
def mock_open(path, flag):
assert flag == "w"
assert path == "/home/foo/blarg/johndoe/public/.htaccess"
class MockFile:
def write(self, data):
assert "Require valid-user\n" in data
assert "Require user johndoe\n" in data
return MockFile()
write_htaccess(root_dir, owner, cmd, file_open = mock_open)
def test_write_htaccess_from_json_works(self):
write_htaccess_from_json(
"/home/foo/blarg",
"johndoe",
'{"version": 1,"directory":"public","share_to_users":["all"]}',
write_htaccess = self.test_write_htaccess_all_works
)
def test_write_htaccess_stops_evil_dirs(self):
cmd = {"version" : 1,
"directory" : "../valuable-stuff",
"share_to_users" : ["all"]}
try:
write_htaccess("/Users", "johndoe", cmd)
except Exception, e:
assert "Path doesn't start with user root dir" in str(e)
return
assert "Exception not thrown"
if __name__ == "__main__":
unittest.main()

Просмотреть файл

@ -1,477 +0,0 @@
"""
weave_server.py [options]
This is a simple reference implementation for a Weave server,
which can also be used to test the Weave client against.
This server behaves like a standard Weave server, with the
following limitations and/or enhancements:
* Responses to CAPTCHA challenges during new user registration
are always accepted, with the exception that the magic word
'bad' always fails (this was added for testing purposes).
* Validation emails are not sent, though the server tells
clients that they are.
* The url at '/state/' can be accessed to retrieve a snapshot of
the current state of the server. This data can be saved to a
file and restored later with the '--state' command-line
option. The state is also just a Pythonic representation of
the server's state, and as such is relatively human-readable
and can be hand-edited if necessary.
* Timeout values sent with HTTP LOCK requests are ignored.
* The server operates over HTTP, not HTTPS.
"""
from wsgiref.simple_server import make_server
from optparse import OptionParser
import httplib
import base64
import logging
import pprint
import cgi
DEFAULT_PORT = 8000
DEFAULT_REALM = "services.mozilla.com"
CAPTCHA_FAILURE_MAGIC_WORD = "bad"
class HttpResponse(object):
def __init__(self, code, content = "", content_type = "text/plain"):
self.status = "%s %s" % (code, httplib.responses.get(code, ""))
self.headers = [("Content-type", content_type)]
if code == httplib.UNAUTHORIZED:
self.headers += [("WWW-Authenticate",
"Basic realm=\"%s\"" % DEFAULT_REALM)]
if not content:
content = self.status
self.content = content
class HttpRequest(object):
def __init__(self, environ):
self.environ = environ
content_length = environ.get("CONTENT_LENGTH")
if content_length:
stream = environ["wsgi.input"]
self.contents = stream.read(int(content_length))
else:
self.contents = ""
class Perms(object):
# Special identifier to indicate 'everyone' instead of a
# particular user.
EVERYONE = 0
def __init__(self, readers=None, writers=None):
if not readers:
readers = []
if not writers:
writers = []
self.readers = readers
self.writers = writers
def __is_privileged(self, user, access_list):
return (user in access_list or self.EVERYONE in access_list)
def can_read(self, user):
return self.__is_privileged(user, self.readers)
def can_write(self, user):
return self.__is_privileged(user, self.writers)
def __acl_repr(self, acl):
items = []
for item in acl:
if item == self.EVERYONE:
items.append("Perms.EVERYONE")
else:
items.append(repr(item))
return "[" + ", ".join(items) + "]"
def __repr__(self):
return "Perms(readers=%s, writers=%s)" % (
self.__acl_repr(self.readers),
self.__acl_repr(self.writers)
)
def requires_read_access(function):
function._requires_read_access = True
return function
def requires_write_access(function):
function._requires_write_access = True
return function
class WeaveApp(object):
"""
WSGI app for the Weave server.
"""
__CAPTCHA_HTML = '<html><head></head><body><script type=\'text/javascript\'>var RecaptchaOptions = {theme: \'red\', lang: \'en\'};</script><script type="text/javascript" src="http://api.recaptcha.net/challenge?k=6Lc_HwIAAAAAACneEwAadA-wKZCOrjo36TFQv160"></script>\n\n\t<noscript>\n \t\t<iframe src="http://api.recaptcha.net/noscript?k=6Lc_HwIAAAAAACneEwAadA-wKZCOrjo36TFQv160" height="300" width="500" frameborder="0"></iframe><br/>\n \t\t<textarea name="recaptcha_challenge_field" rows="3" cols="40"></textarea>\n \t\t<input type="hidden" name="recaptcha_response_field" value="manual_challenge"/>\n\t</noscript></body></html>'
def __init__(self, state=None):
self.contents = {}
self.dir_perms = {"/" : Perms(readers=[Perms.EVERYONE])}
self.passwords = {}
self.email = {}
self.locks = {}
self._tokenIds = 0
if state:
self.__setstate__(state)
def add_user(self, username, password, email = None):
assert username, "Username cannot be empty"
assert password, "Password cannot be empty"
home_dir = "/user/%s/" % username
public_dir = home_dir + "public/"
self.dir_perms[home_dir] = Perms(readers=[username],
writers=[username])
self.dir_perms[public_dir] = Perms(readers=[Perms.EVERYONE],
writers=[username])
self.passwords[username] = password
if email:
self.email[email] = username
def __get_perms_for_path(self, path):
possible_perms = [dirname for dirname in self.dir_perms
if path.startswith(dirname)]
possible_perms.sort(key = len)
perms = possible_perms[-1]
return self.dir_perms[perms]
def __get_files_in_dir(self, path):
return [filename for filename in self.contents
if filename.startswith(path)]
def __api_share(self, path):
params = cgi.parse_qs(self.request.contents)
user = params["uid"][0]
password = params["password"][0]
if self.passwords.get(user) != password:
return HttpResponse(httplib.UNAUTHORIZED)
else:
import json
cmd = json.read(params["cmd"][0])
dirname = "/user/%s/%s" % (user, cmd["directory"])
if not dirname.endswith("/"):
dirname += "/"
readers = []
for reader in cmd["share_to_users"]:
if reader == "all":
readers.append(Perms.EVERYONE)
else:
readers.append(reader)
if user not in readers:
readers.append(user)
self.dir_perms[dirname] = Perms(readers = readers,
writers = [user])
return HttpResponse(httplib.OK, "OK")
# Registration API
def __api_register_check(self, what, where):
what = what.strip("/")
if what.strip() == "":
return HttpResponse(400,
self.ERR_WRONG_HTTP_METHOD)
if what in where:
return HttpResponse(httplib.OK,
self.ERR_UID_OR_EMAIL_IN_USE)
else:
return HttpResponse(httplib.OK,
self.ERR_UID_OR_EMAIL_AVAILABLE)
ERR_UID_OR_EMAIL_AVAILABLE = "1"
ERR_WRONG_HTTP_METHOD = "-1"
ERR_MISSING_UID = "-2"
ERR_INVALID_UID = "-3"
ERR_UID_OR_EMAIL_IN_USE = "0"
ERR_EMAIL_IN_USE = "-5"
ERR_MISSING_PASSWORD = "-8"
ERR_MISSING_RECAPTCHA_CHALLENGE_FIELD = "-6"
ERR_MISSING_RECAPTCHA_RESPONSE_FIELD = "-7"
ERR_MISSING_NEW = "-11"
ERR_INCORRECT_PASSWORD = "-12"
ERR_ACCOUNT_CREATED_VERIFICATION_SENT = "2"
ERR_ACCOUNT_CREATED = "3"
__REQUIRED_CHANGE_PASSWORD_FIELDS = ["uid", "password", "new"]
__REQUIRED_NEW_ACCOUNT_FIELDS = ["uid",
"password",
"recaptcha_challenge_field",
"recaptcha_response_field"]
__FIELD_ERRORS = {
"uid" : ERR_MISSING_UID,
"password" : ERR_MISSING_PASSWORD,
"new" : ERR_MISSING_NEW,
"recaptcha_challenge_field" : ERR_MISSING_RECAPTCHA_CHALLENGE_FIELD,
"recaptcha_response_field" : ERR_MISSING_RECAPTCHA_RESPONSE_FIELD
}
def __get_fields(self, required_fields):
params = cgi.parse_qs(self.request.contents)
fields = {}
for name in params:
fields[name] = params[name][0]
for name in required_fields:
if not fields.get(name):
return HttpResponse(httplib.BAD_REQUEST,
self.__FIELD_ERRORS[name])
return fields
def __api_create_account(self, path):
fields = self.__get_fields(self.__REQUIRED_NEW_ACCOUNT_FIELDS)
if isinstance(fields, HttpResponse):
return fields
if fields["uid"] in self.passwords:
return HttpResponse(httplib.BAD_REQUEST,
self.ERR_UID_OR_EMAIL_IN_USE)
if fields["recaptcha_response_field"] == CAPTCHA_FAILURE_MAGIC_WORD:
return HttpResponse(httplib.EXPECTATION_FAILED)
if fields.get("mail"):
if self.email.get(fields["mail"]):
return HttpResponse(httplib.BAD_REQUEST,
self.ERR_EMAIL_IN_USE)
# TODO: We're not actually sending an email...
body_code = self.ERR_ACCOUNT_CREATED_VERIFICATION_SENT
else:
body_code = self.ERR_ACCOUNT_CREATED
self.add_user(fields["uid"], fields["password"],
fields.get("mail"))
return HttpResponse(httplib.CREATED, body_code)
def __api_change_password(self, path):
fields = self.__get_fields(self.__REQUIRED_CHANGE_PASSWORD_FIELDS)
if isinstance(fields, HttpResponse):
return fields
if not self.passwords.get(fields["uid"]):
return HttpResponse(httplib.BAD_REQUEST,
self.ERR_INVALID_UID)
if self.passwords[fields["uid"]] != fields["password"]:
return HttpResponse(httplib.BAD_REQUEST,
self.ERR_INCORRECT_PASSWORD)
self.passwords[fields["uid"]] = fields["new"]
return HttpResponse(httplib.OK)
# HTTP method handlers
@requires_write_access
def _handle_LOCK(self, path):
if path in self.locks:
return HttpResponse(httplib.LOCKED)
token = "opaquelocktoken:%d" % self._tokenIds
self._tokenIds += 1
self.locks[path] = token
response = """<?xml version="1.0" encoding="utf-8"?>
<D:prop xmlns:D="DAV:">
<D:lockdiscovery>
<D:activelock>
<D:locktoken>
<D:href>%s</D:href>
</D:locktoken>
</D:activelock>
</D:lockdiscovery>
</D:prop>""" % token
return HttpResponse(httplib.OK, response, content_type="text/xml")
@requires_write_access
def _handle_UNLOCK(self, path):
token = self.request.environ["HTTP_LOCK_TOKEN"]
if path not in self.locks:
return HttpResponse(httplib.BAD_REQUEST)
if token == "<%s>" % self.locks[path]:
del self.locks[path]
return HttpResponse(httplib.NO_CONTENT)
return HttpResponse(httplib.BAD_REQUEST)
@requires_write_access
def _handle_MKCOL(self, path):
return HttpResponse(httplib.OK)
@requires_write_access
def _handle_PUT(self, path):
self.contents[path] = self.request.contents
return HttpResponse(httplib.OK)
def _handle_POST(self, path):
if path == "/api/share/":
return self.__api_share(path)
elif path == "/api/register/new/":
return self.__api_create_account(path)
elif path == "/api/register/chpwd/":
return self.__api_change_password(path)
else:
return HttpResponse(httplib.NOT_FOUND)
@requires_write_access
def _handle_PROPFIND(self, path):
response = """<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:ns0="DAV:">"""
path_template = """<D:response>
<D:href>%(href)s</D:href>
<D:propstat>
<D:prop>
%(props)s
</D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
</D:response>"""
if path in self.locks:
props = "<D:locktoken><D:href>%s</D:href></D:locktoken>" % (
self.locks[path]
)
else:
props = ""
response += path_template % {"href": path,
"props": props}
if path.endswith("/"):
for filename in self.__get_files_in_dir(path):
response += path_template % {"href" : filename,
"props" : ""}
response += """</D:multistatus>"""
return HttpResponse(httplib.MULTI_STATUS, response,
content_type="text/xml")
@requires_write_access
def _handle_DELETE(self, path):
response = HttpResponse(httplib.OK)
if path.endswith("/"):
# Delete a directory.
for filename in self.__get_files_in_dir(path):
del self.contents[filename]
else:
# Delete a file.
if path not in self.contents:
response = HttpResponse(httplib.NOT_FOUND)
else:
del self.contents[path]
return response
@requires_read_access
def _handle_GET(self, path):
if path in self.contents:
return HttpResponse(httplib.OK, self.contents[path])
elif path == "/state/":
state_str = pprint.pformat(self.__getstate__())
return HttpResponse(httplib.OK, state_str)
elif path == "/api/register/new/":
return HttpResponse(httplib.OK, self.__CAPTCHA_HTML,
content_type = "text/html")
elif path.startswith("/api/register/check/"):
return self.__api_register_check(path[20:], self.passwords)
elif path.startswith("/api/register/chkmail/"):
return self.__api_register_check(path[22:], self.email)
elif path.endswith("/"):
return self.__show_index(path)
else:
return HttpResponse(httplib.NOT_FOUND)
def __getstate__(self):
state = {}
state.update(self.__dict__)
del state["request"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
def __show_index(self, path):
output = []
for filename in self.__get_files_in_dir(path):
output.append("<p><a href=\"%s\">%s</a></p>" % (filename,
filename))
if output:
output = "".join(output)
else:
output = ("<p>There are no files under the "
"directory <tt>%s</tt>.</p>" % (path))
return HttpResponse(httplib.OK, output, content_type="text/html")
def __process_handler(self, handler):
response = None
auth = self.request.environ.get("HTTP_AUTHORIZATION")
if auth:
user, password = base64.b64decode(auth.split()[1]).split(":")
if self.passwords.get(user) != password:
response = HttpResponse(httplib.UNAUTHORIZED)
else:
user = Perms.EVERYONE
if response is None:
path = self.request.environ["PATH_INFO"]
perms = self.__get_perms_for_path(path)
checks = []
if hasattr(handler, "_requires_read_access"):
checks.append(perms.can_read)
if hasattr(handler, "_requires_write_access"):
checks.append(perms.can_write)
for check in checks:
if not check(user):
response = HttpResponse(httplib.UNAUTHORIZED)
if response is None:
response = handler(path)
return response
def __call__(self, environ, start_response):
"""
Main WSGI application method.
"""
self.request = HttpRequest(environ)
method = "_handle_%s" % environ["REQUEST_METHOD"]
# See if we have a method called 'handle_<method>', where
# <method> is the name of the HTTP method to call. If we do,
# then call it.
if hasattr(self, method):
handler = getattr(self, method)
response = self.__process_handler(handler)
else:
response = HttpResponse(
httplib.METHOD_NOT_ALLOWED,
"Method %s is not yet implemented." % method
)
start_response(response.status, response.headers)
return [response.content]
if __name__ == "__main__":
usage = __import__("__main__").__doc__
parser = OptionParser(usage = usage)
parser.add_option("-s", "--state", dest="state_filename",
help="retrieve server state from filename")
options, args = parser.parse_args()
print "Weave Development Server"
print
print "Run this script with '-h' for usage information."
logging.basicConfig(level=logging.DEBUG)
if options.state_filename:
filename = options.state_filename
logging.info("Setting initial state from '%s'." % filename)
data = open(filename, "r").read()
state = eval(data)
app = WeaveApp(state)
else:
app = WeaveApp()
logging.info("Serving on port %d." % DEFAULT_PORT)
httpd = make_server('', DEFAULT_PORT, app)
httpd.serve_forever()

Просмотреть файл

@ -1,380 +0,0 @@
"""
Simple script to test Weave server support and ensure that it
works properly.
"""
import sys
import urllib
import urllib2
import httplib
from urlparse import urlsplit
from xml.etree import cElementTree as ET
import json
import weave_server
import threading
class DavRequest(urllib2.Request):
def __init__(self, method, *args, **kwargs):
urllib2.Request.__init__(self, *args, **kwargs)
self.__method = method
def get_method(self):
return self.__method
class DavHandler(urllib2.BaseHandler):
def _normal_response(self, req, fp, code, msg, headers):
return fp
# Multi-status
http_error_207 = _normal_response
# Created
http_error_201 = _normal_response
# Accepted
http_error_202 = _normal_response
# No content
http_error_204 = _normal_response
class WeaveSession(object):
def __init__(self, username, password, server_url,
realm = weave_server.DEFAULT_REALM):
self.username = username
self.server_url = server_url
self.server = urlsplit(server_url).netloc
self.realm = realm
self.password = password
def clone(self):
return WeaveSession(self.username, self.password,
self.server_url, self.realm)
def _open(self, req):
davHandler = DavHandler()
authHandler = urllib2.HTTPBasicAuthHandler()
authHandler.add_password(self.realm,
self.server,
self.username,
self.password)
opener = urllib2.build_opener(authHandler, davHandler)
return opener.open(req)
def _get_user_url(self, path, user = None):
if not user:
user = self.username
if path.startswith("/"):
path = path[1:]
url = "%s/user/%s/%s" % (self.server_url,
user,
path)
return url
def list_files(self, path):
xml_data = ("<?xml version=\"1.0\" encoding=\"utf-8\" ?>"
"<D:propfind xmlns:D='DAV:'><D:prop/></D:propfind>")
url = self._get_user_url(path)
headers = {"Content-type" : "text/xml; charset=\"utf-8\"",
"Depth" : "1"}
req = DavRequest("PROPFIND", url, xml_data, headers = headers)
result_xml = self._open(req).read()
multistatus = ET.XML(result_xml)
hrefs = multistatus.findall(".//{DAV:}href")
root = hrefs[0].text
return [href.text[len(root):] for href in hrefs[1:]]
def create_dir(self, path):
req = DavRequest("MKCOL", self._get_user_url(path))
self._open(req)
def remove_dir(self, path):
if not path[-1] == "/":
path += "/"
self.delete_file(path)
def get_file(self, path, user = None):
obj = self._open(self._get_user_url(path, user))
return obj.read()
def put_file(self, path, data):
req = DavRequest("PUT", self._get_user_url(path), data)
self._open(req)
def delete_file(self, path):
req = DavRequest("DELETE", self._get_user_url(path))
self._open(req)
def lock_file(self, path):
headers = {"Content-type" : "text/xml; charset=\"utf-8\"",
"Depth" : "infinity",
"Timeout": "Second-600"}
xml_data = ("<?xml version=\"1.0\" encoding=\"utf-8\" ?>\n"
"<D:lockinfo xmlns:D=\"DAV:\">\n"
" <D:locktype><D:write/></D:locktype>\n"
" <D:lockscope><D:exclusive/></D:lockscope>\n"
"</D:lockinfo>")
req = DavRequest("LOCK", self._get_user_url(path), xml_data,
headers = headers)
result_xml = self._open(req).read()
response = ET.XML(result_xml)
token = response.find(".//{DAV:}locktoken/{DAV:}href").text
return token
def unlock_file(self, path, token):
headers = {"Lock-Token" : "<%s>" % token}
req = DavRequest("UNLOCK", self._get_user_url(path),
headers = headers)
self._open(req)
def ensure_unlock_file(self, path):
xml_data = ("<?xml version=\"1.0\" encoding=\"utf-8\" ?>"
"<D:propfind xmlns:D='DAV:'>"
"<D:prop><D:lockdiscovery/></D:prop></D:propfind>")
url = self._get_user_url(path)
headers = {"Content-type" : "text/xml; charset=\"utf-8\"",
"Depth" : "0"}
req = DavRequest("PROPFIND", url, xml_data, headers = headers)
try:
result_xml = self._open(req).read()
except urllib2.HTTPError, e:
return
multistatus = ET.XML(result_xml)
href = multistatus.find(".//{DAV:}locktoken/{DAV:}href")
if href is not None:
self.unlock_file(path, href.text)
def does_email_exist(self, email):
return self._does_entity_exist("chkmail", email)
def does_username_exist(self, username):
return self._does_entity_exist("check", username)
def _does_entity_exist(self, entity_kind, entity):
url = "%s/api/register/%s/%s" % (self.server_url,
entity_kind,
entity)
result = int(self._open(url).read())
if result == 0:
return True
elif result == 1:
return False
else:
raise Exception("Unexpected result code: %d" % result)
def change_password(self, new_password):
url = "%s/api/register/chpwd/" % (self.server_url)
postdata = urllib.urlencode({"uid" : self.username,
"password" : self.password,
"new" : new_password})
req = urllib2.Request(url, postdata)
self._open(req).read()
self.password = new_password
def share_with_users(self, path, users):
url = "%s/api/share/" % (self.server_url)
cmd = {"version" : 1,
"directory" : path,
"share_to_users" : users}
postdata = urllib.urlencode({"cmd" : json.write(cmd),
"uid" : self.username,
"password" : self.password})
req = urllib2.Request(url, postdata)
result = self._open(req).read()
if result != "OK":
raise Exception("Share attempt failed: %s" % result)
def ensure_weave_disallows_php(session):
print "Ensuring that weave disallows PHP upload and execution."
session.put_file("phptest.php", "<?php echo 'hai2u!' ?>")
try:
if session.get_file("phptest.php") == "hai2u!":
raise Exception("Weave server allows PHP execution!")
finally:
session.delete_file("phptest.php")
def _do_test(session_1, session_2):
print "Ensuring that user '%s' exists." % session_1.username
assert session_1.does_username_exist(session_1.username)
print "Ensuring that user '%s' exists." % session_2.username
assert session_1.does_username_exist(session_2.username)
print "Changing password of user '%s' to 'blarg'." % session_1.username
old_pwd = session_1.password
old_session = session_1.clone()
try:
session_1.change_password("blarg")
except urllib2.HTTPError, e:
if (e.code == httplib.BAD_REQUEST and
e.read() == weave_server.WeaveApp.ERR_INCORRECT_PASSWORD):
print ("That didn't work; an old run of this test may "
"have been aborted. Trying to revert...")
session_1.password = "blarg"
session_1.change_password(old_pwd)
print "Revert successful, attempting to change password again."
session_1.change_password("blarg")
else:
raise
try:
print "Ensuring we can't log in using old password."
old_session.change_password("fnarg")
except urllib2.HTTPError, e:
if e.code != httplib.BAD_REQUEST:
raise
content = e.read()
if content != weave_server.WeaveApp.ERR_INCORRECT_PASSWORD:
raise AssertionError("Bad return value: %s" % content)
else:
raise AssertionError("We could log in using the old password!")
print "Reverting back to old password."
session_1.change_password(old_pwd)
print "Ensuring that file is not locked."
session_1.ensure_unlock_file("test_lock")
print "Locking file"
session_1.lock_file("test_lock")
print "Unlocking file by querying for its token"
session_1.ensure_unlock_file("test_lock")
print "Locking file again"
token = session_1.lock_file("test_lock")
try:
print "Ensuring that we can't re-lock the file."
session_1.lock_file("test_lock")
except urllib2.HTTPError, e:
if e.code != httplib.LOCKED:
raise
else:
raise AssertionError("We can re-lock the file!")
print "Unlocking file"
session_1.unlock_file("test_lock", token)
print "Ensuring that PROPFIND on the user's home dir works."
files = session_1.list_files("")
print "Cleaning up any files left over from a failed previous test."
if "blargle/bloop" in files:
session_1.delete_file("blargle/bloop")
if "blargle/" in files:
session_1.remove_dir("blargle")
print "Creating directory."
session_1.create_dir("blargle")
print "Ensuring that directory indexes don't raise errors."
session_1.get_file("")
try:
print "Creating temporary file."
session_1.put_file("blargle/bloop", "hai2u!")
print "Verifying that temporary file is listed."
assert "bloop" in session_1.list_files("blargle/")
try:
assert session_1.get_file("blargle/bloop") == "hai2u!"
session_1.share_with_users("blargle", [])
try:
print "Ensuring user 2 can't read user 1's file."
session_2.get_file("blargle/bloop", session_1.username)
except urllib2.HTTPError, e:
if e.code != httplib.UNAUTHORIZED:
raise
else:
raise AssertionError("User 2 can read user 1's file!")
print "Sharing directory with user 2."
session_1.share_with_users("blargle", [session_2.username])
print "Ensuring user 2 can read user 1's file."
assert session_2.get_file("blargle/bloop",
session_1.username) == "hai2u!"
print "Sharing directory with everyone."
session_1.share_with_users("blargle", ["all"])
print "Ensuring user 2 can read user 1's file."
assert session_2.get_file("blargle/bloop",
session_1.username) == "hai2u!"
finally:
session_1.delete_file("blargle/bloop")
finally:
print "Removing directory."
session_1.remove_dir("blargle")
ensure_weave_disallows_php(session_1)
print "Test complete."
def redirect_stdio(func):
def wrapper(*args, **kwargs):
from cStringIO import StringIO
old_stdio = [sys.stdout, sys.stderr]
stream = StringIO()
sys.stderr = sys.stdout = stream
try:
try:
return func(*args, **kwargs)
except Exception, e:
import traceback
traceback.print_exc()
raise Exception("Test failed:\n\n%s" % stream.getvalue())
finally:
sys.stderr, sys.stdout = old_stdio
wrapper.__name__ = func.__name__
return wrapper
@redirect_stdio
def test_weave_server():
server_url = "http://127.0.0.1:%d" % weave_server.DEFAULT_PORT
username_1 = "foo"
password_1 = "test123"
username_2 = "bar"
password_2 = "test1234"
start_event = threading.Event()
def run_server():
app = weave_server.WeaveApp()
app.add_user(username_1, password_1)
app.add_user(username_2, password_2)
httpd = weave_server.make_server('', weave_server.DEFAULT_PORT, app)
start_event.set()
while 1:
request, client_address = httpd.socket.accept()
httpd.process_request(request, client_address)
thread = threading.Thread(target=run_server)
thread.setDaemon(True)
thread.start()
start_event.wait()
session_1 = WeaveSession(username_1, password_1, server_url)
session_2 = WeaveSession(username_2, password_2, server_url)
_do_test(session_1, session_2)
if __name__ == "__main__":
args = sys.argv[1:]
if len(args) < 5:
print ("usage: %s <server-url> <username-1> <password-1> "
"<username-2> <password-2>" % sys.argv[0])
sys.exit(1)
server_url = args[0]
username_1 = args[1]
password_1 = args[2]
username_2 = args[3]
password_2 = args[4]
session_1 = WeaveSession(username_1, password_1, server_url)
session_2 = WeaveSession(username_2, password_2, server_url)
_do_test(session_1, session_2)