#!/usr/bin/env python3 # find duplicate images given a hamming distance threshold. # employs dhash to do the heavy lifting. # doesn't recurse into "./_duplicate/" so you can dump things there if you wish. # dependencies: pillow, dhash import sys, os, os.path, pickle from PIL import Image import dhash def lament(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) def result(diff, p1, p2): # TODO: rename print("{}\t{}\t{}".format(diff, p1, p2)) dbname = "idup.db" exts = ".jpeg .jpg .png".split() rootpath = "." ignore_dir = os.path.join(rootpath, "_duplicate") """verbosity: -1: only unrecoverable errors. 0: include failures. 1: include image opening/hashing. 2: the kitchen sink. """ verbosity = 1 pname = sys.argv[0] if len(sys.argv) <= 1: print("usage: {} {{threshold}}".format(pname)) print(" utilizes {} in the current working directory".format(dbname)) sys.exit(1) args = sys.argv[1:] threshold = int(args[0]) paths = {} # path to hash mapping. if os.path.exists(dbname) and os.path.getsize(dbname) > 0: with open(dbname, "rb") as f: paths = pickle.load(f) #lament("loaded", len(paths.keys()), "hashes") else: if verbosity >= 0: lament("warning: no database found. starting from scratch.") existing = dict((path, h) for path, h in paths.items() if os.path.exists(path)) for path in paths.keys(): if path not in existing: if verbosity >= 0: lament("#d", path) paths = existing def compare_hash(h1, h2): # hashes are in byte strings, so we have to convert them to integers. i1 = int.from_bytes(h1, byteorder="big") i2 = int.from_bytes(h2, byteorder="big") # return the hamming distance. return bin(i1 ^ i2).count('1') def run(): for dn, _, fns in os.walk(rootpath): if dn == ignore_dir: continue for fn in fns: name, ext = os.path.splitext(fn) path = os.path.join(dn, fn) if ext not in exts: continue if path in paths: if verbosity >= 2: lament("#s", path) continue try: image = Image.open(path) except OSError: if verbosity >= 0: lament("#f", path) else: try: row, col = dhash.dhash_row_col(image) except OSError: if verbosity >= 0: lament("#f", path) else: if verbosity >= 1: lament("#o", path) h = dhash.format_bytes(row, col) paths[path] = h finally: image.close() # first pass: exact hash matching. hashes = dict((v, k) for k, v in paths.items()) for p1, h in paths.items(): p2 = hashes[h] if p1 != p2: result(-1, p1, p2) # second pass: fuzzy hash matching. if threshold <= 0: return seen = set() for p1, h1 in paths.items(): if verbosity >= 2: lament("#c", p1) seen.add(p1) for p2, h2 in paths.items(): if p2 in seen: continue if h1 == h2: continue diff = compare_hash(h1, h2) if diff <= threshold: result(diff, p1, p2) try: run() except KeyboardInterrupt: if verbosity >= 0: lament("# interrupted") finally: if os.path.exists(dbname): backup = dbname+".bak" if os.path.exists(backup): os.remove(backup) os.rename(dbname, dbname+".bak") with open(dbname, "wb") as f: pickle.dump(paths, f)