git @ Cat's Eye Technologies ellsync / master src / ellsync / main.py
master

Tree @master (Download .tar.gz)

main.py @master · raw · history · blame

# Copyright (c) 2024 Chris Pressey, Cat's Eye Technologies
# This file is distributed under an MIT license.  See LICENSES directory.
# SPDX-License-Identifier: LicenseRef-MIT-X-ellsync

from argparse import ArgumentParser
import json
import os
import sys
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir


# - - - - utilities - - - -


def clean_dir(dirname):
    """Return `dirname`, appending a '/' if it does not already end with one."""
    return dirname if dirname.endswith('/') else dirname + '/'


def run_command(argv, quiet=False):
    """Run an external command and return its exit code.

    The child's stderr is merged into its stdout.  Unless `quiet` is true,
    the command line is echoed to our stdout first, and then each line of
    the child's output is copied to our stdout as it arrives.

    The child's output is always read to the end, even when `quiet` is
    true: leaving the pipe unread would let a chatty child fill the pipe
    buffer and block forever, so that waiting on it would never return.

    :param argv: list of strings; the program and its arguments.
    :param quiet: if true, print neither the command line nor its output.
    :return: the exit code of the child process.
    """
    def pretty(s):
        # Quote arguments containing spaces so the echoed line is readable.
        return '"{}"'.format(s) if ' ' in s else s
    if not quiet:
        sys.stdout.write(' '.join([pretty(a) for a in argv]) + '\n')
    try:
        # errors='replace' keeps undecodable bytes in the child's output
        # from aborting the read loop below.
        p = Popen(
            argv, stderr=STDOUT, stdout=PIPE,
            encoding='utf-8', errors='replace'
        )
        decode_line = lambda line: line
    except TypeError:
        # python 2.x
        p = Popen(argv, stderr=STDOUT, stdout=PIPE)
        decode_line = lambda line: line.decode('utf-8')
    try:
        # Drain the pipe unconditionally; only echo when not quiet.
        for line in p.stdout:
            if not quiet:
                sys.stdout.write(decode_line(line))
                sys.stdout.flush()
    finally:
        p.stdout.close()
    return p.wait()


def traverse_directories(f, basedir, dirname):
    """Write the relative path of every non-directory entry under a tree.

    Entries of `basedir`/`dirname` are visited in sorted order.  Real
    directories (not symbolic links to directories) are descended into
    recursively; everything else has its path, relative to `basedir`,
    written to `f` followed by a newline.
    """
    here = os.path.join(basedir, dirname)
    entries = os.listdir(here)
    entries.sort()
    for entry in entries:
        relative = os.path.join(dirname, entry)
        absolute = os.path.join(here, entry)
        is_real_dir = os.path.isdir(absolute) and not os.path.islink(absolute)
        if not is_real_dir:
            f.write(relative + "\n")
            continue
        traverse_directories(f, basedir, relative)


def obtain_dirs_for_stream(router, stream_name):
    """Resolve a stream specifier to its (from_dir, to_dir) pair.

    `stream_name` is either a bare stream name or `name:subdirectory`.
    Only the first ':' separates the two parts, so the subdirectory may
    itself contain colons.  When a non-empty subdirectory is given it is
    joined onto both the stream's 'from' and 'to' directories.

    :param router: mapping of stream name to a mapping with 'from' and
        'to' entries.
    :param stream_name: stream specifier as described above.
    :return: tuple `(from_dir, to_dir)`.
    :raises KeyError: if the stream name is not present in `router`.
    """
    # partition() yields an empty subdir when there is no ':' at all, and
    # never fails to unpack when there are several.
    stream_name, _, subdir = stream_name.partition(':')
    stream = router[stream_name]
    from_dir = stream['from']
    to_dir = stream['to']
    if subdir:
        from_dir = os.path.join(from_dir, subdir)
        to_dir = os.path.join(to_dir, subdir)
    return from_dir, to_dir


def create_manifest_file_if_needed(manifest_filename, from_dir):
    """Create the manifest file from `from_dir` unless it already exists.

    If `manifest_filename` is an existing regular file, nothing is done.
    Otherwise a notice is printed and the file is written by traversing
    `from_dir` from its top level.
    """
    if os.path.isfile(manifest_filename):
        return
    print("Cannot read {}, creating it".format(manifest_filename))
    with open(manifest_filename, "w") as manifest:
        traverse_directories(manifest, from_dir, '')


# - - - - commands - - - -


def list_(router, options):
    """Print the streams whose 'from' and 'to' directories both exist.

    Streams are printed in sorted order.  When `options.stream_name_only`
    is set only the stream name is printed; otherwise the name is followed
    by the two directories.
    """
    for stream_name, stream in sorted(router.items()):
        both_present = os.path.isdir(stream['from']) and os.path.isdir(stream['to'])
        if not both_present:
            continue
        if options.stream_name_only:
            description = stream_name
        else:
            description = "{}: {} => {}".format(stream_name, stream['from'], stream['to'])
        print(description)


def sync(router, options):
    """Sync each stream named in `options.stream_names`, in order.

    Each stream specifier is resolved to its directory pair and handed to
    `_sync_directories`.  When `options.apply` is set, the external `sync`
    command is run once after all streams have been processed.
    """
    # A generator keeps resolution lazy: each stream is resolved only
    # just before it is synced.
    dir_pairs = (
        obtain_dirs_for_stream(router, name) for name in options.stream_names
    )
    for from_dir, to_dir in dir_pairs:
        _sync_directories(from_dir, to_dir, options)
    if not options.apply:
        return
    run_command(['sync'])


def _sync_directories(from_dir, to_dir, options):
    """Build and run the rsync command that syncs one directory pair.

    Both directories must exist.  With `options.reverse` the direction is
    swapped; if `options.apply` is also set, a '.reverse-to-here' file
    must be present in the original `from_dir` or IOError is raised.
    Without `options.apply` rsync is given '--dry-run'; with
    `options.thorough` it is given '--checksum'.

    :raises ValueError: if either directory is not present.
    :raises IOError: on an applied reverse sync without the signal file.
    """
    from_dir, to_dir = clean_dir(from_dir), clean_dir(to_dir)

    for required in (from_dir, to_dir):
        if os.path.isdir(required):
            continue
        raise ValueError("Directory '{}' is not present".format(required))

    if options.reverse:
        # The marker lives in what is normally the source, which is the
        # destination of a reverse sync.
        signal_filename = os.path.join(from_dir, '.reverse-to-here')
        if options.apply and not os.path.isfile(signal_filename):
            raise IOError(
                "To perform a reverse sync operation, you must create "
                "a file called '{}' to signal your intent. It will be "
                "deleted as part of the sync.".format(signal_filename)
            )
        from_dir, to_dir = to_dir, from_dir

    flags = []
    if not options.apply:
        flags.append('--dry-run')
    if options.thorough:
        flags.append('--checksum')
    command = ['rsync'] + flags + [
        '--archive', '--verbose', '--delete', from_dir, to_dir
    ]
    run_command(command)


def rename(router, options):
    """Rename a subdirectory on both sides of a sync stream.

    `options.stream_name` names the stream; a trailing ':' is tolerated
    but a subdirectory part is not.  The directory
    `options.existing_subdir_name` must exist, and
    `options.new_subdir_name` must not exist, under both the stream's
    'from' and 'to' directories.  All four checks are made before either
    rename is attempted.

    :raises ValueError: if the stream specifier names a subdirectory, if
        an existing directory is missing, or if a new directory already
        exists.
    :raises KeyError: if the stream name is not present in `router`.
    """
    # Validate explicitly rather than with assert, which is stripped when
    # Python runs with -O.
    stream_name, _, subdir = options.stream_name.partition(':')
    if subdir:
        raise ValueError(
            "Stream '{}' must not specify a subdirectory".format(options.stream_name)
        )

    stream = router[stream_name]
    from_dir = stream['from']
    to_dir = stream['to']

    existing_subdir_a = clean_dir(os.path.join(from_dir, options.existing_subdir_name))
    new_subdir_a = clean_dir(os.path.join(from_dir, options.new_subdir_name))

    if not os.path.isdir(existing_subdir_a):
        raise ValueError("Directory '{}' is not present".format(existing_subdir_a))
    if os.path.isdir(new_subdir_a):
        raise ValueError("Directory '{}' already exists".format(new_subdir_a))

    existing_subdir_b = clean_dir(os.path.join(to_dir, options.existing_subdir_name))
    new_subdir_b = clean_dir(os.path.join(to_dir, options.new_subdir_name))

    if not os.path.isdir(existing_subdir_b):
        raise ValueError("Directory '{}' is not present".format(existing_subdir_b))
    if os.path.isdir(new_subdir_b):
        raise ValueError("Directory '{}' already exists".format(new_subdir_b))

    print("Renaming {} to {}".format(existing_subdir_a, new_subdir_a))
    os.rename(existing_subdir_a, new_subdir_a)
    print("Renaming {} to {}".format(existing_subdir_b, new_subdir_b))
    os.rename(existing_subdir_b, new_subdir_b)


def verify(router, options):
    """Compare every file listed in a manifest across both sides of a stream.

    The manifest is `options.manifest` if given, otherwise a file in the
    system temporary directory named after the stream; it is created from
    the stream's source directory if it does not exist.  Each listed path
    is compared with `diff --brief --new-file` and reported as OK or BAD.

    With `options.continue_from`, entries are skipped until that path is
    seen; verification resumes with that entry.  If the path never
    appears, an error is raised and the manifest is left in place, rather
    than reporting success for a run that verified nothing.

    After a completed traversal the manifest is deleted.

    :raises ValueError: if a manifest entry is a directory, if the
        `continue_from` path is not in the manifest, or if any file
        failed verification.
    """
    from_dir, to_dir = obtain_dirs_for_stream(router, options.stream_name)

    active = False
    if options.continue_from:
        print("Continuing from {}".format(options.continue_from))
    else:
        active = True

    if not options.manifest:
        stream_name = options.stream_name
        # Make the stream specifier safe to embed in a filename.
        stream_name = stream_name.replace('/', '_').replace(':', '_')
        manifest_filename = os.path.join(
            gettempdir(),
            "ellsync-manifest-{}.lst".format(stream_name)
        )
    else:
        manifest_filename = options.manifest
    create_manifest_file_if_needed(manifest_filename, from_dir)

    print("Traversing manifest {}".format(manifest_filename))

    bads = []
    with open(manifest_filename, "r") as f:
        for line in f:
            # Remove only the line terminator: filenames may legitimately
            # begin or end with whitespace.
            path = line.rstrip('\n')
            if not path:
                # Tolerate blank lines in the manifest.
                continue

            from_path = os.path.join(from_dir, path)
            to_path = os.path.join(to_dir, path)

            # Explicit check rather than assert, which -O would strip.
            if os.path.isdir(from_path):
                raise ValueError(
                    "Manifest entry '{}' is a directory, not a file".format(path)
                )

            if not active:
                if path == options.continue_from:
                    print("Found {}, resuming verify".format(options.continue_from))
                    active = True
                else:
                    continue

            exit_code = run_command(
                ['diff', '--brief', '--new-file', from_path, to_path],
                quiet=True
            )
            if exit_code == 0:
                print("[OK!] {}".format(path))
            else:
                print("[BAD] {}".format(path))
                bads.append(path)

    if not active:
        # The resume point was never reached, so no file was compared.
        # Keep the manifest so the run can be retried.
        raise ValueError(
            "'{}' was not found in manifest {}; no files were verified".format(
                options.continue_from, manifest_filename
            )
        )

    print("Traversal complete, deleting manifest {}".format(manifest_filename))
    os.unlink(manifest_filename)
    if bads:
        print("Files that FAILED verification:")
        for path in bads:
            print("[BAD] {}".format(path))
        raise ValueError("Not all files in run verified successfully")
    else:
        print("All files in run verified successfully.")


# - - - - driver - - - -


def main(args):
    """Parse the command line and dispatch to the selected subcommand.

    `args` is the list of command-line arguments (without the program
    name).  The first positional argument is the router JSON file; it is
    followed by one of the subcommands `list`, `sync`, `verify` or
    `rename`.  If no subcommand is given, usage is printed and the
    process exits with status 1 before the router file is opened.
    """
    argparser = ArgumentParser()

    argparser.add_argument('router', metavar='ROUTER', type=str,
        help='JSON file containing the backup router description'
    )
    argparser.add_argument('--version', action='version', version="%(prog)s 0.6")

    subparsers = argparser.add_subparsers()

    # - - - - list - - - -
    parser_list = subparsers.add_parser('list', help='List available sync streams')
    parser_list.add_argument('--stream-name-only', default=False, action='store_true',
        help='Output only the names of the available streams'
    )
    parser_list.set_defaults(func=list_)

    # - - - - sync - - - -
    parser_sync = subparsers.add_parser('sync', help='Sync contents across one or more sync streams')
    parser_sync.add_argument('stream_names', metavar='STREAM', type=str, nargs='+',
        help='Name of stream (or stream:subdirectory) to sync contents across'
    )
    parser_sync.add_argument('--apply', default=False, action='store_true',
        help='Actually run the rsync command'
    )
    parser_sync.add_argument('--thorough', default=False, action='store_true',
        help='Ignore the timestamp on all destination files, to ensure content is synced'
    )
    parser_sync.add_argument('--reverse', default=False, action='store_true',
        help='Reverse the direction of the sync operation'
    )
    parser_sync.set_defaults(func=sync)

    # - - - - verify - - - -
    parser_verify = subparsers.add_parser('verify', help='Verify contents on both sides of a sync stream match')
    parser_verify.add_argument('stream_name', metavar='STREAM', type=str,
        help='Name of stream (or stream:subdirectory) to verify across'
    )
    parser_verify.add_argument('--manifest', metavar='FILENAME', default=None, type=str,
        help='Specify the name of the manifest file to use.  If not given, a name derived from '
             'the name of the stream will be used.  If the file does not exist, it will be created.'
    )
    parser_verify.add_argument('--continue-from', metavar='FILENAME', default=None, type=str,
        help='If given, skip over filenames in the manifest until seeing this filename'
    )
    parser_verify.set_defaults(func=verify)

    # - - - - rename - - - -
    parser_rename = subparsers.add_parser(
        'rename', help='Rename a subdirectory in both source and dest of sync stream'
    )
    parser_rename.add_argument('stream_name', metavar='STREAM', type=str,
        help='Name of stream to operate under'
    )
    parser_rename.add_argument('existing_subdir_name', metavar='DIRNAME', type=str,
        help='Existing subdirectory to be renamed'
    )
    parser_rename.add_argument('new_subdir_name', metavar='DIRNAME', type=str,
        help='New name for subdirectory'
    )
    parser_rename.set_defaults(func=rename)

    options = argparser.parse_args(args)

    # Resolve the subcommand before touching the router file, so that a
    # missing subcommand yields the usage message rather than a file or
    # JSON error.  `func` is only set when a subcommand was given.
    func = getattr(options, 'func', None)
    if func is None:
        argparser.print_usage()
        sys.exit(1)

    with open(options.router, 'r') as f:
        router = json.load(f)
    func(router, options)