gists/image_deduplication/idup.py
2018-10-11 16:45:31 +02:00

136 lines
3.7 KiB
Python

#!/usr/bin/env python3
# find duplicate images given a hamming distance threshold.
# employs dhash to do the heavy lifting.
# doesn't recurse into "./_duplicate/" so you can dump things there if you wish.
# dependencies: pillow, dhash
import sys, os, os.path, pickle
from PIL import Image
import dhash
def lament(*messages, **options):
    """Print the given values to standard error.

    Takes the same positional and keyword arguments as print(); the
    output stream is fixed to sys.stderr, so passing file= is an error.
    """
    stream = sys.stderr
    print(*messages, file=stream, **options)
def result(diff, p1, p2): # TODO: rename
    """Write one match record to standard output.

    The record is a single line of three tab-separated fields: the
    difference value followed by the two paths.
    """
    template = "{}\t{}\t{}"
    record = template.format(diff, p1, p2)
    print(record)
# file name of the pickled path-to-hash database (relative to the cwd).
dbname = "idup.db"
# file extensions (including the dot) that are considered images.
exts = ".jpeg .jpg .png".split()
# directory tree to scan, and the sub-directory that is left alone.
rootpath = "."
ignore_dir = os.path.join(rootpath, "_duplicate")

"""verbosity:
-1: only unrecoverable errors.
0: include failures.
1: include image opening/hashing.
2: the kitchen sink.
"""
verbosity = 1

# the single required argument is the hamming distance threshold.
pname = sys.argv[0]
if len(sys.argv) < 2:
    print(
        "usage: {} {{threshold}}".format(pname),
        " utilizes {} in the current working directory".format(dbname),
        sep="\n",
    )
    sys.exit(1)

args = sys.argv[1:]
threshold = int(args[0])

paths = {} # path to hash mapping.

# a missing or zero-length database file means there is nothing to load.
db_is_usable = os.path.exists(dbname) and os.path.getsize(dbname) > 0
if not db_is_usable:
    if verbosity >= 0:
        lament("warning: no database found. starting from scratch.")
else:
    # NOTE: pickle can execute arbitrary code; only load a database you trust.
    with open(dbname, "rb") as f:
        paths = pickle.load(f)
    #lament("loaded", len(paths.keys()), "hashes")

# forget entries whose files have disappeared since the last run,
# reporting each one as "#d".
existing = {}
for path, digest in paths.items():
    if os.path.exists(path):
        existing[path] = digest
    elif verbosity >= 0:
        lament("#d", path)
paths = existing
def compare_hash(h1, h2):
    """Return the hamming distance between two hashes.

    h1 and h2 are byte strings. Each is read as a big-endian integer and
    the result is the number of bit positions in which the two differ.
    """
    # byte strings can't be xor'd directly, so go through integers.
    left, right = (int.from_bytes(raw, byteorder="big") for raw in (h1, h2))
    differing_bits = left ^ right
    # count the set bits in the binary representation.
    return format(differing_bits, "b").count("1")
def _hash_image(path):
    """Return the dhash of the image at path as bytes, or None on failure.

    Failure means Image.open or dhash.dhash_row_col raised OSError.
    The image is always closed once it has been opened.
    """
    try:
        image = Image.open(path)
    except OSError:
        return None
    try:
        row, col = dhash.dhash_row_col(image)
    except OSError:
        return None
    finally:
        image.close()
    return dhash.format_bytes(row, col)


def _scan_images():
    """Walk rootpath and add a hash to paths for every image not yet known."""
    for dn, dns, fns in os.walk(rootpath):
        # prune in place so os.walk never descends into the ignored
        # directory; merely skipping its own entry would still visit
        # any sub-directories it contains.
        dns[:] = [d for d in dns if os.path.join(dn, d) != ignore_dir]
        for fn in fns:
            ext = os.path.splitext(fn)[1]
            # compare case-insensitively so e.g. ".JPG" is picked up too.
            if ext.lower() not in exts:
                continue
            path = os.path.join(dn, fn)
            if path in paths:
                # already hashed on a previous run.
                if verbosity >= 2:
                    lament("#s", path)
                continue
            h = _hash_image(path)
            if h is None:
                if verbosity >= 0:
                    lament("#f", path)
                continue
            if verbosity >= 1:
                lament("#o", path)
            # stored immediately so an interrupted run keeps its progress.
            paths[path] = h


def _report_exact_matches():
    """Report every path whose hash equals that of another path (diff -1)."""
    # when several paths share a hash, the last one seen represents the group.
    hashes = {h: p for p, h in paths.items()}
    for p1, h in paths.items():
        p2 = hashes[h]
        if p1 != p2:
            result(-1, p1, p2)


def _report_fuzzy_matches():
    """Report each pair of paths whose hashes differ by at most threshold bits.

    Pairs with identical hashes are skipped; the exact pass covers those.
    """
    seen = set()
    for p1, h1 in paths.items():
        if verbosity >= 2:
            lament("#c", p1)
        # anything already used as p1 has been compared against everything.
        seen.add(p1)
        for p2, h2 in paths.items():
            if p2 in seen:
                continue
            if h1 == h2:
                continue
            diff = compare_hash(h1, h2)
            if diff <= threshold:
                result(diff, p1, p2)


def run():
    """Hash any new images under rootpath, then report duplicates.

    New hashes are added to the module-level paths mapping. Matches are
    emitted through result(): exact matches first (diff of -1), then,
    if threshold is positive, pairs within threshold bits of each other.
    """
    _scan_images()
    # first pass: exact hash matching.
    _report_exact_matches()
    # second pass: fuzzy hash matching.
    if threshold <= 0:
        return
    _report_fuzzy_matches()
try:
    run()
except KeyboardInterrupt:
    if verbosity >= 0:
        lament("# interrupted")
finally:
    # always save whatever has been hashed so far, whether run()
    # finished, was interrupted, or raised.
    if os.path.exists(dbname):
        backup = dbname + ".bak"
        # os.replace overwrites an existing backup in a single step on
        # every platform, so no separate remove is needed beforehand.
        os.replace(dbname, backup)
    with open(dbname, "wb") as f:
        pickle.dump(paths, f)