#!/usr/bin/env python
r"""
Usage: schematic.py path/to/schema_files

schematic talks to your system over stdin on the command line.  It will do SQL
migrations by talking directly to your database.  It will also do migrations
for any language, so you can do almost anything in the migration.  It supports
all DBMSs that have a command line interface and doesn't care what programming
language you worship.  Win!

Schematic expects 1 argument which is the directory full of schema, DDL or
script files you wish to migrate.

Configuration is done in `settings.py`, which should look something like:

    # How to connect to the database
    db = 'mysql --silent -p blam -D pow'
    # The table where version info is stored.
    table = 'schema_version'
    # Optionally, how you want to handle something that's not SQL
    handlers = {'.py': 'python -B manage.py runscript migrations.%s'}

It's python so you can do whatever crazy things you want, and it's a separate
file so you can keep local settings out of version control.  schematic will
look for the settings module on the Python path and then in the migrations
directory.  A module named `schematic_settings` is tried first; `settings` is
the fallback.

Migrations are just files whose names start with a number, like
`001-adding-awesome.sql`.  They're matched against `'^\d+'` so you can put
zeros in front to keep ordering in `ls` happy, and whatever you want after the
migration number, such as text describing the migration.  If a file ends with
no extension or .sql it is treated as SQL and passed to your database.  If the
file ends with an extension in handlers, the file is passed to that command to
do with as it wishes.

schematic creates a table (named in settings.py) with one column, that holds
one row, which describes the current version of the database.  Any migration
file with a number greater than the current version will be applied to the
database and the version tracker will be upgraded.  The migration and version
bump are performed in a transaction.

The version-tracking table will initially be set to 0, so the 0th migration
could be a script that creates all your tables (for reference).  Migration
numbers are not required to increase linearly.

schematic doesn't pretend to be intelligent. Running migrations manually
without upgrading the version tracking will throw things off.

Tested on sqlite and mysql.

NOTE: any superfluous output, like column headers, will cause an error.  On
mysql, this is fixed by using the `--silent` parameter.
"""

import optparse
import os
import re
import six
import sys
import time
from subprocess import Popen, PIPE, STDOUT

# Name of the fallback settings module (without the ".py").
SETTINGS = 'settings'
# Names looked up on the settings module.
VARIABLES = ['db', 'table', 'handlers']
# Settings that may be omitted, mapped to their default values.
OPTIONAL_VARIABLES = {'handlers': {}}
# File extensions that are never treated as migrations.
IGNORE_FILES = ['.pyc']

# SQL templates; the first %s is always the version-tracking table name.
CREATE = 'CREATE TABLE %s (version INTEGER NOT NULL);'
COUNT = 'SELECT COUNT(version) FROM %s;'
SELECT = 'SELECT version FROM %s;'
INSERT = 'INSERT INTO %s (version) VALUES (%s);'
UPDATE = 'UPDATE %s SET version = %s;'
# Wraps a SQL migration and its version bump in a single transaction.
UPGRADE = 'BEGIN;\n%s\n%s\nCOMMIT;'


class SchematicError(Exception):
    """Base class for custom errors."""


class MissingSettings(SchematicError):
    """Raised when no settings module could be imported."""

    def __init__(self):
        super(MissingSettings, self).__init__("Couldn't import settings file")


class SettingsError(SchematicError):
    """Raised when a required variable is missing from the settings."""

    def __init__(self, key):
        super(SettingsError, self).__init__(
            "Couldn't find value for '%s' in %s.py" % (key, SETTINGS))


class DbError(SchematicError):
    """Raised when the database command reports a failure."""

    def __init__(self, cmd, stdout, stderr, returncode):
        msg = '\n'.join(
            ["Had trouble running this: %s", "stdout: %s", "stderr: %s",
             "returncode: %s"])
        super(DbError, self).__init__(
            msg % (cmd, stdout, stderr, returncode))


class MultipleMigrations(SchematicError):
    """Raised when two migration files share the same number."""

    def __init__(self, num):
        super(MultipleMigrations, self).__init__(
            'Multiple migrations with number: %d' % num)


class ExternalError(DbError):
    """Raised when an external (non-SQL) handler command fails."""


def get_settings(schema_dir):
    """Import and validate the settings module.

    ``schema_dir`` is appended to ``sys.path`` so a settings module living
    next to the migrations can be found.  ``schematic_settings`` is tried
    first, then ``settings``.

    Raises MissingSettings if neither module can be imported, and
    SettingsError if a non-optional variable is missing.
    """
    # Also search for settings in the schema_dir (added only once).
    if schema_dir not in sys.path:
        sys.path.append(schema_dir)
    try:
        import schematic_settings as settings
    except ImportError:
        try:
            import settings
        except ImportError:
            raise MissingSettings()
    for key in VARIABLES:
        if not hasattr(settings, key) and key not in OPTIONAL_VARIABLES:
            raise SettingsError(key)
    return settings


def say(db, command):
    """Pipe ``command`` to the ``db`` shell command and return its stdout.

    Raises DbError if anything is written to stderr or the return code is
    non-zero.  The returned value is the raw (bytes) output of the process.
    """
    process = Popen(db, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = process.communicate(command.encode('utf-8'))
    if stderr or process.returncode != 0:
        raise DbError(command, stdout, stderr, process.returncode)
    return stdout


def ext(command):
    """Run an external shell command and return its combined output.

    Raises ExternalError if the command exits with a non-zero return code.
    """
    # stderr is merged into stdout, so the second value from communicate()
    # is always None.  Output on stderr alone is deliberately not an error:
    # we don't want to break on deprecation warnings.  If you want to stop
    # the migration, make the command exit non-zero.
    process = Popen(
        command, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        raise ExternalError(command, stdout, stderr, process.returncode)
    return stdout


def table_check(db, table):
    """Make sure the version-tracking table exists and holds a row."""
    try:
        # Try a count to see if the table is there.
        count = int(say(db, COUNT % table))
    except DbError:
        # Try to create the table.
        say(db, CREATE % table)
        count = 0
    # Start tracking at version 0.
    if count == 0:
        say(db, INSERT % (table, 0))


def find_upgrades(schema_dir):
    """Return a dict mapping migration number to file path.

    Only regular files directly inside ``schema_dir`` whose names start with
    digits are considered; files with an extension in IGNORE_FILES are
    skipped.

    Raises MultipleMigrations if two files share the same number.
    """
    upgrades = {}
    for name in os.listdir(schema_dir):
        path = os.path.join(schema_dir, name)
        if not os.path.isfile(path):
            continue
        if os.path.splitext(path)[1] in IGNORE_FILES:
            continue
        match = re.match(r'^(\d+)', name)
        if not match:
            continue
        num = int(match.group(0))
        if num in upgrades:
            raise MultipleMigrations(num)
        upgrades[num] = path
    return upgrades


def run_upgrades(db, table, schema_dir, maximum=None, handlers=None,
                 fake=False):
    """Apply (or fake) every migration newer than the current version.

    ``maximum`` stops the run before the first migration whose number is
    greater than it; ``None`` means no limit.  ``handlers`` maps file
    extensions to command templates for non-SQL migrations.  With ``fake``
    the version is bumped straight to the newest pending migration without
    running anything (``maximum`` is not consulted in that mode).
    """
    if handlers is None:
        handlers = {}
    current = get_version(db, table)
    pending = sorted(
        (version, path)
        for version, path in find_upgrades(schema_dir).items()
        if version > current)

    if fake:
        if not pending:
            # Nothing newer than the current version; there is no target
            # version to fake up to.
            print('No pending migrations to fake')
            return
        version = pending[-1][0]
        print('Faking migrations all the way up to %s' % version)
        say(db, UPDATE % (table, version))
        return

    for version, path in pending:
        # Compare against None explicitly so that a maximum of 0 is honored.
        if maximum is not None and version > maximum:
            print('Reached max version: %s' % maximum)
            break
        start = time.time()
        upgrade(db, table, version, path, handlers)
        print('That took %.2f seconds' % (time.time() - start))
        print('#' * 50, '\n')


def upgrade(db, table, version, path, handlers):
    """Run a single migration file and record ``version`` as applied.

    Files with no extension or ``.sql`` are sent to the database together
    with the version bump inside one transaction.  Files whose extension is
    in ``handlers`` are run through that command, after which the version is
    bumped separately.

    Raises NotImplementedError for any other extension.
    """
    extension = os.path.splitext(path)[1]
    handler = handlers.get(extension)
    bump = UPDATE % (table, version)
    if not extension or extension == '.sql':
        with open(path) as migration:
            sql = migration.read()
        print('Running SQL migration %s:\n' % version, sql)
        say(db, UPGRADE % (sql, bump))
    elif handler:
        # The handler template receives the file name without its extension.
        cmd = handler % (os.path.splitext(os.path.basename(path))[0])
        print('Running %s migration %s:\n' % (extension, version), cmd)
        output = ext(cmd)
        if not isinstance(output, str):
            # Python 3 hands back bytes; decode so the text is printed
            # rather than its b'...' repr.
            output = output.decode('utf-8', 'replace')
        print(output)
        say(db, bump)
    else:
        raise NotImplementedError("Don't know how to migrate: %s" % path)


def get_version(db, table):
    """Return the current schema version stored in ``table`` as an int."""
    return int(say(db, SELECT % table))


def main(schema_dir, maximum=None, fake=False):
    """Load settings, prepare the tracking table and run the migrations."""
    settings = get_settings(schema_dir)
    db, table = settings.db, settings.table
    table_check(db, table)
    if maximum is not None:
        print('Up to max: %s' % maximum)
    run_upgrades(db, table, schema_dir, maximum,
                 getattr(settings, 'handlers',
                         OPTIONAL_VARIABLES['handlers']),
                 fake)


def update(schema_dir, version):
    """Set the tracked schema version without running any migrations."""
    settings = get_settings(schema_dir)
    table_check(settings.db, settings.table)
    say(settings.db, UPDATE % (settings.table, version))


def version(schema_dir):
    """Print the currently tracked schema version."""
    settings = get_settings(schema_dir)
    print(get_version(settings.db, settings.table))


if __name__ == '__main__':
    dir_ = '/path/to/migrations/dir'
    error = "Expected a directory: %s" % dir_
    parser = optparse.OptionParser(usage="Usage: %%prog %s" % dir_)
    # Parsed as an int so that garbage is rejected by the option parser
    # instead of being interpolated into the UPDATE statement.
    parser.add_option('-u', '--update', dest='update', default=None,
                      action='store', type='int',
                      help='Update schema tracking table to this version '
                           '(without running any migrations)')
    parser.add_option('-v', '--version', dest='version', default=False,
                      action='store_true',
                      help='Print the current schema version (without '
                           'running any migrations)')
    parser.add_option('-m', '--max', dest='max', default=None,
                      action='store', type='int',
                      help='Stop running migrations after the specified '
                           'revision.')
    parser.add_option('-F', '--fake', dest='fake', default=False,
                      action='store_true',
                      help='Mark migrations as run without actually running '
                           'them')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error(error)
    path = os.path.realpath(args[0])
    if not os.path.isdir(path):
        parser.error(error)
    try:
        # "is not None" so that "--update 0" still resets the version.
        if options.update is not None:
            update(path, options.update)
        elif options.version:
            version(path)
        else:
            main(path, options.max, options.fake)
    except SchematicError as exc:
        print('Error:', exc)
        sys.exit(1)