# NOTE: repository-browser navigation chrome removed from the top of this file.
# Provenance: python/fifiin.py, "Dipple" repository, Cat's Eye Technologies,
# master branch.

#!/usr/bin/env python3

# SPDX-FileCopyrightText: Chris Pressey, the original author of this work, has dedicated it to the public domain.
# For more information, please refer to <https://unlicense.org/>
# SPDX-License-Identifier: Unlicense

"""
Find files that are in some dirs that are also in other dirs
and/or that are not in other dirs.

SYNOPSIS:

    fifiin.py {dir} {--that-are-elsewhere-in dir | --that-are-not-elsewhere-in dir}

To run the built-in tests:

    python3 -m unittest fifiin

"""

from argparse import ArgumentParser
import hashlib
import json
import os
import re
import shutil
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
assert StringIO
from subprocess import check_call
import sys
from tempfile import mkdtemp
import unittest

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(x, **kwargs): return x


def md5(filename):
    """Compute and return the hex MD5 digest of the named file.

    Args:
        filename: path of the file to hash.

    Returns:
        The hexadecimal digest string of the file's contents.
    """
    hash_ = hashlib.md5()
    # `with` guarantees the handle is closed even if reading raises
    # (the original left the file open on error).  Read in 64 KiB
    # chunks so arbitrarily large files never sit wholly in memory;
    # iter() with a b"" sentinel stops cleanly at EOF.
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hash_.update(chunk)
    return hash_.hexdigest()


def build_sizemap(directory, sizemap, exclude=None, verbose=False, with_extension=None):
    """Walk `directory`, recording regular files in `sizemap` keyed by size.

    Args:
        directory: root directory to traverse.
        sizemap: dict mapping file size (int) to a list of normalized
            paths of that size; updated in place.
        exclude: optional collection of normalized directory paths whose
            subtrees are skipped entirely.
        verbose: if True, print a note for each skipped directory.
        with_extension: if given, only files named `*.<with_extension>`
            are considered.
    """
    # Normalize the default: `x in None` raises TypeError, so calling
    # this function without `exclude` used to crash.
    if exclude is None:
        exclude = set()
    for root, dirs, files in os.walk(directory):
        if os.path.normpath(root) in exclude:
            if verbose:
                print("(skipping {})".format(os.path.normpath(root)))
            # Pruning `dirs` in place tells os.walk not to descend.
            dirs[:] = []
            continue
        for filename in files:
            if with_extension and not filename.endswith('.' + with_extension):
                continue
            full = os.path.normpath(os.path.join(root, filename))
            # Skip symlinks so a file is never counted twice.
            if os.path.islink(full):
                continue
            try:
                size = os.path.getsize(full)
                sizemap.setdefault(size, []).append(full)
            except IOError as e:
                # Best-effort: report unreadable entries, keep walking.
                print(str(e))


def build_sizemap_from_dirset(dirset, exclude=None, verbose=False, with_extension=None):
    """Build one size->paths map covering every directory in `dirset`.

    Directories are visited in sorted order; each must exist or OSError
    is raised.  Returns the accumulated sizemap dict.
    """
    accumulated = {}
    for raw_name in sorted(dirset):
        if verbose:
            print("traversing {}...".format(raw_name))
        path = os.path.normpath(raw_name)
        if not os.path.isdir(path):
            raise OSError("{} is not a dir".format(path))
        build_sizemap(
            path,
            accumulated,
            exclude=(exclude or set()),
            verbose=verbose,
            with_extension=with_extension,
        )
    if verbose:
        print('sizemap built with {} entries'.format(len(accumulated)))
    return accumulated


class TestFindFilesIn(unittest.TestCase):
    """End-to-end tests for main(), run against a scratch directory tree.

    setUp builds a tree with one duplicate file (file_A in both dirs)
    and one unique file (file_B only in 'recent'), then chdirs into it.
    """

    def setUp(self):
        super(TestFindFilesIn, self).setUp()
        # Capture stdout/stderr so main()'s JSON output can be inspected.
        self.saved_stdout = sys.stdout
        self.saved_stderr = sys.stderr
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        self.maxDiff = None
        self.dirname = mkdtemp()
        self.prevdir = os.getcwd()
        os.chdir(self.dirname)
        # os.makedirs replaces `check_call("mkdir -p ...", shell=True)`:
        # no shell interpolation, works on any platform.
        for dirname in ('canonical', 'recent'):
            os.makedirs(dirname, exist_ok=True)
        with open('canonical/file_A', 'w') as f:
            f.write('This is file A')
        with open('recent/file_A', 'w') as f:
            f.write('This is file A')
        with open('recent/file_B', 'w') as f:
            f.write('This is file B')

    def tearDown(self):
        os.chdir(self.prevdir)
        # shutil.rmtree replaces `rm -rf` via the shell: portable, and
        # safe even if the temp path contained shell metacharacters.
        shutil.rmtree(self.dirname)
        sys.stdout = self.saved_stdout
        sys.stderr = self.saved_stderr
        super(TestFindFilesIn, self).tearDown()

    def test_failure(self):
        # A nonexistent base dir must raise, not be silently ignored.
        with self.assertRaises(OSError):
            main(['not_existent_dir'])

    def test_that_are_also_in(self):
        main(['recent', '--that-are-elsewhere-in', 'canonical'])
        result = json.loads(sys.stdout.getvalue())
        self.assertEqual(result, [
            [[u'recent/file_A'], [u'canonical/file_A']]
        ])

    def test_that_are_not_in(self):
        main(['recent', '--that-are-not-elsewhere-in', 'canonical'])
        result = json.loads(sys.stdout.getvalue())
        self.assertEqual(result, [
            u'recent/file_B'
        ])


### MAIN ###


def _hash_by_md5(filenames):
    """Group the given paths by the MD5 digest of their contents.

    Returns a dict mapping hex digest -> list of paths whose contents
    produce that digest.
    """
    hashmap = {}
    for filename in filenames:
        hashmap.setdefault(md5(filename), []).append(filename)
    return hashmap


def main(args):
    """Entry point: find files under the base dirs that also exist (or
    don't exist) elsewhere, and print the result as JSON on stdout.

    Files are matched by content: sizes are compared first as a cheap
    pre-filter, then MD5 digests are computed only for size collisions.

    Args:
        args: command-line arguments (excluding the program name).

    Raises:
        OSError: if any named directory does not exist.
        NotImplementedError: if both query modes are given at once.
    """
    argparser = ArgumentParser()

    argparser.add_argument('base_dirs', nargs='+', metavar='DIRNAME', type=str,
        help='Directories containing files we wish to query'
    )

    argparser.add_argument('--with-extension', metavar='EXTENSION', type=str, default=None,
        help='File extension that the files to consider should have'
    )
    argparser.add_argument('--that-are-elsewhere-in', metavar='DIRNAME', type=str, action='append',
        help='Directory representing a known location of the files'
    )
    argparser.add_argument('--that-are-not-elsewhere-in', metavar='DIRNAME', type=str, action='append',
        help='Directory representing a known location where the files may not be'
    )
    argparser.add_argument('--verbose', default=False, action='store_true',
        help='Produce more output about what is going on'
    )

    options = argparser.parse_args(args)

    base_dirs = set(os.path.normpath(p) for p in options.base_dirs)
    that_are_also_in = set(options.that_are_elsewhere_in or [])
    that_are_not_in = set(options.that_are_not_elsewhere_in or [])

    if that_are_also_in and that_are_not_in:
        raise NotImplementedError(
            "For now, --that-are-elsewhere-in and --that-are-not-elsewhere-in are mutually exclusive"
        )

    # Go through the base dirs, construct a base sizemap.

    base_sizemap = build_sizemap_from_dirset(base_dirs, verbose=options.verbose, with_extension=options.with_extension)

    if that_are_also_in:
        target_sizemap = build_sizemap_from_dirset(that_are_also_in, exclude=base_dirs, verbose=options.verbose)

        base_hashmap = {}
        target_hashmap = {}

        # Hash only files whose size occurs in both maps: hashing is
        # the expensive step, the size test is the cheap pre-filter.
        for size, base_files in base_sizemap.items():
            if size not in target_sizemap:
                continue
            for hash_, paths in _hash_by_md5(base_files).items():
                base_hashmap.setdefault(hash_, []).extend(paths)
            for hash_, paths in _hash_by_md5(target_sizemap[size]).items():
                target_hashmap.setdefault(hash_, []).extend(paths)

        # Each result entry pairs the base copies with the target copies
        # of one identical content.
        result = []
        for hash_, base_files in sorted(base_hashmap.items()):
            if hash_ in target_hashmap:
                result.append([base_files, target_hashmap[hash_]])

        print(json.dumps(result, indent=4, sort_keys=True))

    if that_are_not_in:
        target_sizemap = build_sizemap_from_dirset(that_are_not_in, exclude=base_dirs, verbose=options.verbose)

        found = []

        for size, base_files in base_sizemap.items():
            if size not in target_sizemap:
                # No target file has this size, so none can match.
                found.extend(base_files)
            else:
                # Size collision: fall back to content comparison.
                target_hashes = _hash_by_md5(target_sizemap[size])
                for hash_, matching in _hash_by_md5(base_files).items():
                    if hash_ not in target_hashes:
                        found.extend(matching)

        print(json.dumps(sorted(found), indent=4, sort_keys=True))


# Script entry point: forward the CLI arguments (sans program name).
if __name__ == '__main__':
    main(sys.argv[1:])