# Copyright (c) 2024 Chris Pressey, Cat's Eye Technologies
# This file is distributed under an MIT license. See LICENSES directory.
# SPDX-License-Identifier: LicenseRef-MIT-X-ellsync
from argparse import ArgumentParser
import json
import os
import sys
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir
# - - - - utilities - - - -
def clean_dir(dirname):
    """Return `dirname`, with a '/' appended if it does not already end in one."""
    return dirname if dirname.endswith('/') else dirname + '/'
def run_command(argv, quiet=False):
    """Run an external command and return its exit code.

    The child's stderr is merged into its stdout.  Unless `quiet` is true,
    the command line is echoed to our stdout first (arguments containing a
    space are shown in double quotes) and the child's output is copied to
    our stdout line by line.

    When `quiet` is true the child's output is still read, but discarded.
    Leaving the pipe unread would let the child block forever once it
    filled the pipe buffer, and then the wait below would never return.

    :param argv: list of strings: the program to run and its arguments.
    :param quiet: if true, neither echo the command nor show its output.
    :return: the exit code of the child process.
    """
    def pretty(s):
        return '"{}"'.format(s) if ' ' in s else s

    if not quiet:
        sys.stdout.write(' '.join([pretty(a) for a in argv]) + '\n')
    try:
        p = Popen(argv, stderr=STDOUT, stdout=PIPE, encoding='utf-8')
        decode_line = lambda line: line
    except TypeError:
        # python 2.x
        p = Popen(argv, stderr=STDOUT, stdout=PIPE)
        decode_line = lambda line: line.decode('utf-8')
    try:
        if quiet:
            # Drain the underlying byte stream where there is one, so that
            # output we are going to throw away is never decoded.
            raw = getattr(p.stdout, 'buffer', p.stdout)
            while raw.read(65536):
                pass
        else:
            for line in p.stdout:
                sys.stdout.write(decode_line(line))
                sys.stdout.flush()
    finally:
        p.stdout.close()
    return p.wait()
def traverse_directories(f, basedir, dirname):
    """Write the names of all non-directory entries under a directory to `f`.

    Entries of `basedir`/`dirname` are visited in sorted order.  Real
    directories (not symbolic links to directories) are descended into
    recursively; every other entry is written to `f` as a path relative to
    `basedir`, one per line.

    :param f: writable text file object that receives the listing.
    :param basedir: directory that the written paths are relative to.
    :param dirname: subdirectory of `basedir` to list ('' for `basedir` itself).
    """
    here = os.path.join(basedir, dirname)
    for entry in sorted(os.listdir(here)):
        relative = os.path.join(dirname, entry)
        absolute = os.path.join(here, entry)
        descend = os.path.isdir(absolute) and not os.path.islink(absolute)
        if descend:
            traverse_directories(f, basedir, relative)
            continue
        f.write(relative + "\n")
def obtain_dirs_for_stream(router, stream_name):
    """Resolve a stream specification to its source and destination directories.

    `stream_name` is either the bare name of a stream in `router`, or
    `name:subdirectory`, in which case the subdirectory is joined onto both
    of the stream's directories.

    :param router: mapping of stream name to a mapping with 'from' and 'to' keys.
    :param stream_name: 'name' or 'name:subdirectory'.
    :return: a `(from_dir, to_dir)` tuple.
    :raises KeyError: if the stream is not present in `router`.
    """
    # Split at the first colon only: the subdirectory part may itself
    # contain colons, which a plain split-and-unpack would choke on.
    stream_name, _, subdir = stream_name.partition(':')
    stream = router[stream_name]
    from_dir = stream['from']
    to_dir = stream['to']
    if subdir:
        from_dir = os.path.join(from_dir, subdir)
        to_dir = os.path.join(to_dir, subdir)
    return from_dir, to_dir
def create_manifest_file_if_needed(manifest_filename, from_dir):
    """Create the manifest file from the contents of `from_dir`, unless it exists.

    If `manifest_filename` is already a regular file, nothing is done.
    Otherwise a notice is printed and the file is written with the listing
    that `traverse_directories` produces for `from_dir`.
    """
    if os.path.isfile(manifest_filename):
        return
    print("Cannot read {}, creating it".format(manifest_filename))
    with open(manifest_filename, "w") as manifest:
        traverse_directories(manifest, from_dir, '')
# - - - - commands - - - -
def list_(router, options):
    """Print the streams in `router` whose directories both currently exist.

    Streams are printed in sorted order.  A stream is skipped unless both
    its 'from' and its 'to' entries are existing directories.  With
    `options.stream_name_only` set only the name is printed; otherwise the
    name and both directories are.
    """
    for name, stream in sorted(router.items()):
        if not os.path.isdir(stream['from']) or not os.path.isdir(stream['to']):
            continue
        if options.stream_name_only:
            line = name
        else:
            line = "{}: {} => {}".format(name, stream['from'], stream['to'])
        print(line)
def sync(router, options):
    """Sync every stream named in `options.stream_names`, in order.

    Each name is resolved with `obtain_dirs_for_stream` and the resulting
    pair of directories is handed to `_sync_directories`.  Afterwards, if
    `options.apply` is set, the external `sync` command is run.
    """
    for name in options.stream_names:
        source, destination = obtain_dirs_for_stream(router, name)
        _sync_directories(source, destination, options)
    if not options.apply:
        return
    run_command(['sync'])
def _sync_directories(from_dir, to_dir, options):
    """Run rsync from one directory to another according to `options`.

    Both directories must already exist.  With `options.reverse` the two
    directories swap roles; when that is combined with `options.apply`, a
    file named '.reverse-to-here' must be present in the directory that is
    normally the source.  Without `options.apply`, rsync is given
    '--dry-run'; with `options.thorough` it is given '--checksum'.

    :raises ValueError: if either directory is not present.
    :raises IOError: if an applied reverse sync lacks the signal file.
    """
    from_dir, to_dir = clean_dir(from_dir), clean_dir(to_dir)

    for candidate in (from_dir, to_dir):
        if os.path.isdir(candidate):
            continue
        raise ValueError("Directory '{}' is not present".format(candidate))

    if options.reverse:
        signal_filename = os.path.join(from_dir, '.reverse-to-here')
        signalled = os.path.isfile(signal_filename) if options.apply else True
        if not signalled:
            raise IOError(
                "To perform a reverse sync operation, you must create "
                "a file called '{}' to signal your intent. It will be "
                "deleted as part of the sync.".format(signal_filename)
            )
        from_dir, to_dir = to_dir, from_dir

    switches = []
    if not options.apply:
        switches.append('--dry-run')
    if options.thorough:
        switches.append('--checksum')

    run_command(
        ['rsync'] + switches + ['--archive', '--verbose', '--delete', from_dir, to_dir]
    )
def rename(router, options):
    """Rename a subdirectory on both sides of a sync stream.

    `options.stream_name` names the stream; a trailing ':' is tolerated but
    a 'stream:subdirectory' form is rejected.  The subdirectory
    `options.existing_subdir_name` must exist, and
    `options.new_subdir_name` must not exist as a directory, under both the
    stream's 'from' and 'to' directories.  Both sides are checked before
    either rename is carried out.

    :raises ValueError: if the stream name carries a subdirectory, if an
        existing subdirectory is missing, or if a new one already exists.
    :raises KeyError: if the stream is not present in `router`.
    """
    # Explicit check rather than `assert`, which is skipped under `python -O`
    # and would let a stray subdirectory part be silently ignored.
    stream_name, _, subdir = options.stream_name.partition(':')
    if subdir:
        raise ValueError(
            "Stream name '{}' must not include a subdirectory".format(options.stream_name)
        )
    stream = router[stream_name]

    # Validate both sides first, so that a problem on the second side is
    # found before the first side has been renamed.
    planned = []
    for base_dir in (stream['from'], stream['to']):
        existing_subdir = clean_dir(os.path.join(base_dir, options.existing_subdir_name))
        new_subdir = clean_dir(os.path.join(base_dir, options.new_subdir_name))
        if not os.path.isdir(existing_subdir):
            raise ValueError("Directory '{}' is not present".format(existing_subdir))
        if os.path.isdir(new_subdir):
            raise ValueError("Directory '{}' already exists".format(new_subdir))
        planned.append((existing_subdir, new_subdir))

    for existing_subdir, new_subdir in planned:
        print("Renaming {} to {}".format(existing_subdir, new_subdir))
        os.rename(existing_subdir, new_subdir)
def verify(router, options):
    """Compare every file listed in a manifest across both sides of a stream.

    The manifest is `options.manifest` if given, otherwise a file in the
    system temporary directory named after the stream; it is created from
    the source directory if it does not exist.  Each listed path is
    compared with `diff --brief --new-file` and reported as OK or BAD.
    With `options.continue_from`, manifest entries are skipped until that
    path is seen.  Once the traversal is complete the manifest is deleted.

    :raises ValueError: if any file failed verification, or if
        `options.continue_from` was given but never found in the manifest
        (in which case the manifest is kept).
    """
    from_dir, to_dir = obtain_dirs_for_stream(router, options.stream_name)

    active = False
    if options.continue_from:
        print("Continuing from {}".format(options.continue_from))
    else:
        active = True

    if not options.manifest:
        stream_name = options.stream_name
        stream_name = stream_name.replace('/', '_').replace(':', '_')
        manifest_filename = os.path.join(
            gettempdir(),
            "ellsync-manifest-{}.lst".format(stream_name)
        )
    else:
        manifest_filename = options.manifest

    create_manifest_file_if_needed(manifest_filename, from_dir)

    print("Traversing manifest {}".format(manifest_filename))
    bads = []
    with open(manifest_filename, "r") as f:
        for line in f:
            # Remove only the line terminator: leading or trailing spaces
            # may be a genuine part of the filename.
            path = line.rstrip('\r\n')
            if not path:
                # A blank line names no file; joined onto from_dir it would
                # denote the directory itself.
                continue
            from_path = os.path.join(from_dir, path)
            to_path = os.path.join(to_dir, path)
            assert not os.path.isdir(from_path)
            if not active:
                if path == options.continue_from:
                    print("Found {}, resuming verify".format(options.continue_from))
                    active = True
                else:
                    continue
            exit_code = run_command(
                ['diff', '--brief', '--new-file', from_path, to_path],
                quiet=True
            )
            if exit_code == 0:
                print("[OK!] {}".format(path))
            else:
                print("[BAD] {}".format(path))
                bads.append(path)

    if not active:
        # The resume point never appeared, so not one file was compared.
        # Reporting success (and discarding the manifest) would be wrong.
        raise ValueError(
            "'{}' was not found in manifest {}; no files were verified".format(
                options.continue_from, manifest_filename
            )
        )

    print("Traversal complete, deleting manifest {}".format(manifest_filename))
    os.unlink(manifest_filename)

    if bads:
        print("Files that FAILED verification:")
        for path in bads:
            print("[BAD] {}".format(path))
        raise ValueError("Not all files in run verified successfully")
    else:
        print("All files in run verified successfully.")
# - - - - driver - - - -
def main(args):
    """Parse `args`, load the router file, and run the chosen subcommand.

    `args` is the list of command-line arguments, excluding the program
    name.  The first positional argument names a JSON file holding the
    router description; it is followed by one of the subcommands `list`,
    `sync`, `verify` or `rename`.  If no subcommand was given, the usage
    message is printed and the process exits with status 1.
    """
    argparser = ArgumentParser()
    argparser.add_argument(
        'router', metavar='ROUTER', type=str,
        help='JSON file containing the backup router description'
    )
    argparser.add_argument('--version', action='version', version="%(prog)s 0.6")

    commands = argparser.add_subparsers()

    # - - - - list - - - -
    list_cmd = commands.add_parser('list', help='List available sync streams')
    list_cmd.add_argument(
        '--stream-name-only', default=False, action='store_true',
        help='Output only the names of the available streams'
    )
    list_cmd.set_defaults(func=list_)

    # - - - - sync - - - -
    sync_cmd = commands.add_parser(
        'sync', help='Sync contents across one or more sync streams'
    )
    sync_cmd.add_argument(
        'stream_names', metavar='STREAM', type=str, nargs='+',
        help='Name of stream (or stream:subdirectory) to sync contents across'
    )
    sync_cmd.add_argument(
        '--apply', default=False, action='store_true',
        help='Actually run the rsync command'
    )
    sync_cmd.add_argument(
        '--thorough', default=False, action='store_true',
        help='Ignore the timestamp on all destination files, to ensure content is synced'
    )
    sync_cmd.add_argument(
        '--reverse', default=False, action='store_true',
        help='Reverse the direction of the sync operation'
    )
    sync_cmd.set_defaults(func=sync)

    # - - - - verify - - - -
    verify_cmd = commands.add_parser(
        'verify', help='Verify contents on both sides of a sync stream match'
    )
    verify_cmd.add_argument(
        'stream_name', metavar='STREAM', type=str,
        help='Name of stream (or stream:subdirectory) to verify across'
    )
    verify_cmd.add_argument(
        '--manifest', metavar='FILENAME', default=None, type=str,
        help='Specify the name of the manifest file to use. If not given, a name derived from '
             'the name of the stream will be used. If the file does not exist, it will be created.'
    )
    verify_cmd.add_argument(
        '--continue-from', metavar='FILENAME', default=None, type=str,
        help='If given, skip over filenames in the manifest until seeing this filename'
    )
    verify_cmd.set_defaults(func=verify)

    # - - - - rename - - - -
    rename_cmd = commands.add_parser(
        'rename', help='Rename a subdirectory in both source and dest of sync stream'
    )
    rename_cmd.add_argument(
        'stream_name', metavar='STREAM', type=str,
        help='Name of stream to operate under'
    )
    rename_cmd.add_argument(
        'existing_subdir_name', metavar='DIRNAME', type=str,
        help='Existing subdirectory to be renamed'
    )
    rename_cmd.add_argument(
        'new_subdir_name', metavar='DIRNAME', type=str,
        help='New name for subdirectory'
    )
    rename_cmd.set_defaults(func=rename)

    options = argparser.parse_args(args)

    with open(options.router, 'r') as router_file:
        router = json.load(router_file)

    # `func` is only set when a subcommand was actually named.
    handler = getattr(options, 'func', None)
    if handler is None:
        argparser.print_usage()
        sys.exit(1)
    handler(router, options)