#!/usr/bin/env python3
# SPDX-FileCopyrightText: Chris Pressey, the original author of this work, has dedicated it to the public domain.
# For more information, please refer to <https://unlicense.org/>
# SPDX-License-Identifier: Unlicense
"""
Find files that are in some dirs that are also in other dirs
and/or that are not in other dirs.
SYNOPSIS:
fifiin.py {dir} {--that-are-elsewhere-in dir | --that-are-not-elsewhere-in dir}
To run the built-in tests:
python3 -m unittest fifiin
"""
from argparse import ArgumentParser
import hashlib
import json
import os
import shutil
try:
    from StringIO import StringIO   # Python 2
except ImportError:
    from io import StringIO         # Python 3
assert StringIO
import sys
from tempfile import mkdtemp
import unittest
try:
    from tqdm import tqdm
except ImportError:
    # Fall back to a no-op wrapper when tqdm is not installed.
    def tqdm(x, **kwargs): return x
def md5(filename):
    """Compute and return the MD5 hash of the named file."""
    digest = hashlib.md5()
    with open(filename, "rb") as f:
        # Read in fixed-size chunks so large files need not fit in memory.
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
def build_sizemap(directory, sizemap, exclude=None, verbose=False, with_extension=None):
    """Walk `directory` and record each regular (non-symlink) file into
    `sizemap`, a dict mapping file size in bytes to a list of matching paths.
    Subtrees whose normalized root appears in `exclude` are skipped.
    """
    exclude = exclude or set()
    for root, dirs, files in os.walk(directory):
        if os.path.normpath(root) in exclude:
if verbose:
print("(skipping {})".format(os.path.normpath(root)))
dirs[:] = []
else:
for filename in files:
if with_extension and not filename.endswith('.' + with_extension):
continue
full = os.path.normpath(os.path.join(root, filename))
if os.path.islink(full):
continue
try:
size = os.path.getsize(full)
sizemap.setdefault(size, []).append(full)
                except OSError as e:
                    # Report the error but keep walking; send it to stderr so
                    # the JSON written to stdout stays parseable.
                    print(str(e), file=sys.stderr)
def build_sizemap_from_dirset(dirset, exclude=None, verbose=False, with_extension=None):
    """Build and return a single sizemap (size in bytes -> list of paths)
    covering every directory in `dirset`.  Raises OSError if any entry
    is not a directory.
    """
    sizemap = {}
for dirname in sorted(dirset):
if verbose:
print("traversing {}...".format(dirname))
dirname = os.path.normpath(dirname)
if not os.path.isdir(dirname):
raise OSError("{} is not a dir".format(dirname))
build_sizemap(dirname, sizemap, exclude=(exclude or set()), verbose=verbose, with_extension=with_extension)
if verbose:
print('sizemap built with {} entries'.format(len(sizemap)))
return sizemap
class TestFindFilesIn(unittest.TestCase):
def setUp(self):
super(TestFindFilesIn, self).setUp()
self.saved_stdout = sys.stdout
self.saved_stderr = sys.stderr
sys.stdout = StringIO()
sys.stderr = StringIO()
self.maxDiff = None
self.dirname = mkdtemp()
self.prevdir = os.getcwd()
os.chdir(self.dirname)
        for dirname in ('canonical', 'recent'):
            os.makedirs(dirname)
with open('canonical/file_A', 'w') as f:
f.write('This is file A')
with open('recent/file_A', 'w') as f:
f.write('This is file A')
with open('recent/file_B', 'w') as f:
f.write('This is file B')
def tearDown(self):
os.chdir(self.prevdir)
check_call("rm -rf {}".format(self.dirname), shell=True)
sys.stdout = self.saved_stdout
sys.stderr = self.saved_stderr
super(TestFindFilesIn, self).tearDown()
def test_failure(self):
with self.assertRaises(OSError):
main(['not_existent_dir'])
def test_that_are_also_in(self):
main(['recent', '--that-are-elsewhere-in', 'canonical'])
output = sys.stdout.getvalue()
result = json.loads(output)
self.assertEqual(result, [
[[u'recent/file_A'], [u'canonical/file_A']]
])
def test_that_are_not_in(self):
main(['recent', '--that-are-not-elsewhere-in', 'canonical'])
output = sys.stdout.getvalue()
result = json.loads(output)
self.assertEqual(result, [
u'recent/file_B'
])
### MAIN ###
def main(args):
argparser = ArgumentParser()
argparser.add_argument('base_dirs', nargs='+', metavar='DIRNAME', type=str,
help='Directories containing files we wish to query'
)
argparser.add_argument('--with-extension', metavar='EXTENSION', type=str, default=None,
help='File extension that the files to consider should have'
)
argparser.add_argument('--that-are-elsewhere-in', metavar='DIRNAME', type=str, action='append',
help='Directory representing a known location of the files'
)
argparser.add_argument('--that-are-not-elsewhere-in', metavar='DIRNAME', type=str, action='append',
help='Directory representing a known location where the files may not be'
)
argparser.add_argument('--verbose', default=False, action='store_true',
help='Produce more output about what is going on'
)
options = argparser.parse_args(args)
base_dirs = set([os.path.normpath(p) for p in options.base_dirs])
that_are_also_in = set(options.that_are_elsewhere_in or [])
that_are_not_in = set(options.that_are_not_elsewhere_in or [])
if that_are_also_in and that_are_not_in:
raise NotImplementedError(
"For now, --that-are-elsewhere-in and --that-are-not-elsewhere-in are mutually exclusive"
)
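    # Strategy: files can only be identical if they have the same size, so we
    # bucket files by size first and compute MD5 hashes only for files whose
    # size also occurs on the other side of the comparison.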
# Go through the base dirs, construct a base sizemap.
base_sizemap = build_sizemap_from_dirset(base_dirs, verbose=options.verbose, with_extension=options.with_extension)
if that_are_also_in:
target_sizemap = build_sizemap_from_dirset(that_are_also_in, exclude=base_dirs, verbose=options.verbose)
base_hashmap = {}
target_hashmap = {}
        # tqdm shows a progress bar while hashing if it is installed;
        # otherwise the fallback defined above is a no-op.
        for size, filenames in tqdm(base_sizemap.items()):
if size not in target_sizemap:
continue
for filename in filenames:
base_hashmap.setdefault(md5(filename), []).append(filename)
for filename in target_sizemap[size]:
target_hashmap.setdefault(md5(filename), []).append(filename)
result = []
for hash_, filenames in sorted(base_hashmap.items()):
if hash_ in target_hashmap:
result.append([filenames, target_hashmap[hash_]])
print(json.dumps(result, indent=4, sort_keys=True))
if that_are_not_in:
target_sizemap = build_sizemap_from_dirset(that_are_not_in, exclude=base_dirs, verbose=options.verbose)
found = []
        for size, filenames in tqdm(base_sizemap.items()):
if size not in target_sizemap:
found.extend(filenames)
else:
base_hash = {}
for filename in filenames:
base_hash.setdefault(md5(filename), []).append(filename)
target_hash = {}
for filename in target_sizemap[size]:
target_hash.setdefault(md5(filename), []).append(filename)
                for hash_, matching in base_hash.items():
                    if hash_ not in target_hash:
                        found.extend(matching)
print(json.dumps(sorted(found), indent=4, sort_keys=True))
if __name__ == '__main__':
main(sys.argv[1:])