Optimized the scanning process in PE.

This commit is contained in:
Virgil Dupras 2011-03-04 11:15:04 +01:00
parent ec8e915830
commit 1b52feb8b8
4 changed files with 97 additions and 40 deletions

View File

@ -19,7 +19,7 @@
"Analyzed %d/%d pictures" = "Analyzed %d/%d pictures"; "Analyzed %d/%d pictures" = "Analyzed %d/%d pictures";
"Preparing for matching" = "Preparing for matching"; "Preparing for matching" = "Preparing for matching";
"Matched %d/%d pictures" = "Matched %d/%d pictures"; "Performed %d/%d chunk matches" = "Performed %d/%d chunk matches";
"Verified %d/%d matches" = "Verified %d/%d matches"; "Verified %d/%d matches" = "Verified %d/%d matches";
"Removing dead tracks from your iTunes Library" = "Removing dead tracks from your iTunes Library"; "Removing dead tracks from your iTunes Library" = "Removing dead tracks from your iTunes Library";

View File

@ -18,7 +18,7 @@
"Analyzed %d/%d pictures" = "Analyzé %d/%d images"; "Analyzed %d/%d pictures" = "Analyzé %d/%d images";
"Preparing for matching" = "Préparation pour la comparaison"; "Preparing for matching" = "Préparation pour la comparaison";
"Matched %d/%d pictures" = "Comparé %d/%d images"; "Performed %d/%d chunk matches" = "%d/%d blocs d'images comparés";
"Verified %d/%d matches" = "Vérifié %d/%d paires"; "Verified %d/%d matches" = "Vérifié %d/%d paires";
"Removing dead tracks from your iTunes Library" = "Retrait des tracks mortes de votre librairie iTunes"; "Removing dead tracks from your iTunes Library" = "Retrait des tracks mortes de votre librairie iTunes";

View File

@ -8,8 +8,9 @@
import logging import logging
import multiprocessing import multiprocessing
from collections import defaultdict, deque from itertools import combinations
from hscommon.util import extract
from hscommon.trans import tr from hscommon.trans import tr
from jobprogress import job from jobprogress import job
@ -17,15 +18,32 @@ from core.engine import Match
from .block import avgdiff, DifferentBlockCountError, NoBlocksError from .block import avgdiff, DifferentBlockCountError, NoBlocksError
from .cache import Cache from .cache import Cache
# OPTIMIZATION NOTES:
# The bottleneck of the matching phase is CPU, which is why we use multiprocessing. However, another
# bottleneck that shows up when a lot of pictures are involved is Disk IO, because blocks
# constantly have to be read from disks by subprocesses. This problem is especially big on CPUs
# with a lot of cores. Therefore, we must minimize Disk IOs. The best way to achieve that is to
# separate the files to scan in "chunks" and it's by chunk that blocks are read in memory and
# compared to each other. Each file in a chunk has to be compared to each other, of course, but also
# to files in other chunks. So chunkifying doesn't save us any actual comparison, but the advantage
# is that instead of reading blocks from disk number_of_files**2 times, we read it
# number_of_files*number_of_chunks times.
# Determining the right chunk size is tricky, because if it's too big, too many blocks will be in
# memory at the same time and we might end up with memory thrashing, which is awfully slow. So,
# because our *real* bottleneck is CPU, the chunk size must simply be enough so that the CPU isn't
# starved by Disk IOs.
MIN_ITERATIONS = 3 MIN_ITERATIONS = 3
BLOCK_COUNT_PER_SIDE = 15 BLOCK_COUNT_PER_SIDE = 15
DEFAULT_CHUNK_SIZE = 1000
MIN_CHUNK_SIZE = 100
# Enough so that we're sure that the main thread will not wait after a result.get() call # Enough so that we're sure that the main thread will not wait after a result.get() call
# cpucount*2 should be enough to be sure that the spawned process will not wait after the results # cpucount+1 should be enough to be sure that the spawned process will not wait after the results
# collection made by the main process. # collection made by the main process.
RESULTS_QUEUE_LIMIT = multiprocessing.cpu_count() * 2 RESULTS_QUEUE_LIMIT = multiprocessing.cpu_count() + 1
def prepare_pictures(pictures, cache_path, j=job.nulljob): def prepare_pictures(pictures, cache_path, with_dimensions, j=job.nulljob):
# The MemoryError handlers in there use logging without first caring about whether or not # The MemoryError handlers in there use logging without first caring about whether or not
# there is enough memory left to carry on the operation because it is assumed that the # there is enough memory left to carry on the operation because it is assumed that the
# MemoryError happens when trying to read an image file, which is freed from memory by the # MemoryError happens when trying to read an image file, which is freed from memory by the
@ -36,6 +54,7 @@ def prepare_pictures(pictures, cache_path, j=job.nulljob):
for picture in j.iter_with_progress(pictures, tr("Analyzed %d/%d pictures")): for picture in j.iter_with_progress(pictures, tr("Analyzed %d/%d pictures")):
picture.unicode_path = str(picture.path) picture.unicode_path = str(picture.path)
logging.debug("Analyzing picture at {}".format(picture.unicode_path)) logging.debug("Analyzing picture at {}".format(picture.unicode_path))
if with_dimensions:
picture.dimensions # pre-read dimensions picture.dimensions # pre-read dimensions
try: try:
if picture.unicode_path not in cache: if picture.unicode_path not in cache:
@ -53,18 +72,42 @@ def prepare_pictures(pictures, cache_path, j=job.nulljob):
cache.close() cache.close()
return prepared return prepared
def get_chunks(pictures):
    """Split ``pictures`` into consecutive slices ("chunks") of equal maximum size.

    The chunk size is derived from a target chunk count: at least twice the CPU count, or
    ``len(pictures) // DEFAULT_CHUNK_SIZE`` when that is larger. The resulting size is never
    allowed to drop below ``MIN_CHUNK_SIZE``, so small inputs end up in fewer chunks than the
    target count.

    :param pictures: a sequence supporting ``len()`` and slicing.
    :returns: a list of slices of ``pictures`` which, concatenated in order, give back
              ``pictures``. The list is empty when ``pictures`` is empty.
    """
    picture_count = len(pictures)
    # Have enough chunks to feed all subprocesses.
    min_chunk_count = multiprocessing.cpu_count() * 2
    chunk_count = max(min_chunk_count, picture_count // DEFAULT_CHUNK_SIZE)
    # The "+ 1" guarantees that chunk_count chunks of that size cover every picture (and that
    # the size is never 0).
    chunk_size = max(MIN_CHUNK_SIZE, (picture_count // chunk_count) + 1)
    chunks = [pictures[i:i+chunk_size] for i in range(0, picture_count, chunk_size)]
    # Log the number of chunks actually created: because of the MIN_CHUNK_SIZE floor it can be
    # much lower than the chunk_count we aimed for.
    logging.info("Creating {} chunks with a chunk size of {} for {} pictures".format(
        len(chunks), chunk_size, picture_count))
    return chunks
def get_match(first, second, percentage):
    """Build a ``Match`` for ``first`` and ``second``, never with a negative percentage."""
    # A percentage below zero is replaced by 0; any other value is passed through untouched.
    clamped = 0 if percentage < 0 else percentage
    return Match(first, second, clamped)
def async_compare(ref_id, other_ids, dbname, threshold): def async_compare(ref_ids, other_ids, dbname, threshold, picinfo):
# The list of ids in ref_ids have to be compared to the list of ids in other_ids. other_ids
# can be None. In this case, ref_ids has to be compared with itself
# picinfo is a dictionary {pic_id: (dimensions, is_ref)}
cache = Cache(dbname) cache = Cache(dbname)
limit = 100 - threshold limit = 100 - threshold
ref_blocks = cache[ref_id] ref_pairs = list(cache.get_multiple(ref_ids))
pairs = cache.get_multiple(other_ids) if other_ids is not None:
other_pairs = list(cache.get_multiple(other_ids))
comparisons_to_do = [(r, o) for r in ref_pairs for o in other_pairs]
else:
comparisons_to_do = list(combinations(ref_pairs, 2))
results = [] results = []
for other_id, other_blocks in pairs: for (ref_id, ref_blocks), (other_id, other_blocks) in comparisons_to_do:
ref_dimensions, ref_is_ref = picinfo[ref_id]
other_dimensions, other_is_ref = picinfo[other_id]
if ref_is_ref and other_is_ref:
continue
if ref_dimensions != other_dimensions:
continue
try: try:
diff = avgdiff(ref_blocks, other_blocks, limit, MIN_ITERATIONS) diff = avgdiff(ref_blocks, other_blocks, limit, MIN_ITERATIONS)
percentage = 100 - diff percentage = 100 - diff
@ -76,48 +119,60 @@ def async_compare(ref_id, other_ids, dbname, threshold):
return results return results
def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob): def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob):
def get_picinfo(p):
if match_scaled:
return (None, p.is_ref)
else:
return (p.dimensions, p.is_ref)
def collect_results(collect_all=False):
# Collect results and wait until the queue is small enough to accommodate a new result.
nonlocal async_results, matches, comparison_count
limit = 0 if collect_all else RESULTS_QUEUE_LIMIT
while len(async_results) > limit:
ready, working = extract(lambda r: r.ready(), async_results)
for result in ready:
matches += result.get()
async_results.remove(result)
comparison_count += 1
progress_msg = tr("Performed %d/%d chunk matches") % (comparison_count, len(comparisons_to_do))
j.set_progress(comparison_count, progress_msg)
j = j.start_subjob([3, 7]) j = j.start_subjob([3, 7])
pictures = prepare_pictures(pictures, cache_path, j) pictures = prepare_pictures(pictures, cache_path, with_dimensions=not match_scaled, j=j)
j = j.start_subjob([9, 1], tr("Preparing for matching")) j = j.start_subjob([9, 1], tr("Preparing for matching"))
cache = Cache(cache_path) cache = Cache(cache_path)
id2picture = {} id2picture = {}
dimensions2pictures = defaultdict(set)
for picture in pictures: for picture in pictures:
try: try:
picture.cache_id = cache.get_id(picture.unicode_path) picture.cache_id = cache.get_id(picture.unicode_path)
id2picture[picture.cache_id] = picture id2picture[picture.cache_id] = picture
if not match_scaled:
dimensions2pictures[picture.dimensions].add(picture)
except ValueError: except ValueError:
pass pass
cache.close() cache.close()
pictures = [p for p in pictures if hasattr(p, 'cache_id')] pictures = [p for p in pictures if hasattr(p, 'cache_id')]
pool = multiprocessing.Pool() pool = multiprocessing.Pool()
async_results = deque() async_results = []
matches = [] matches = []
pictures_copy = set(pictures) chunks = get_chunks(pictures)
for ref in j.iter_with_progress(pictures, tr("Matched %d/%d pictures")): # We add a None element at the end of the chunk list because each chunk has to be compared
others = pictures_copy if match_scaled else dimensions2pictures[ref.dimensions] # with itself. Thus, each chunk will show up as a ref_chunk having other_chunk set to None once.
others.remove(ref) comparisons_to_do = list(combinations(chunks + [None], 2))
if ref.is_ref: comparison_count = 0
# Don't spend time comparing two ref pics together. j.start_job(len(comparisons_to_do))
others = [pic for pic in others if not pic.is_ref] for ref_chunk, other_chunk in comparisons_to_do:
if others: picinfo = {p.cache_id: get_picinfo(p) for p in ref_chunk}
cache_ids = [f.cache_id for f in others] ref_ids = [p.cache_id for p in ref_chunk]
# We limit the number of cache_ids we send for multi-processing because otherwise, we if other_chunk is not None:
# might get an error saying "String or BLOB exceeded size limit" other_ids = [p.cache_id for p in other_chunk]
ARG_LIMIT = 1000 picinfo.update({p.cache_id: get_picinfo(p) for p in other_chunk})
while cache_ids: else:
args = (ref.cache_id, cache_ids[:ARG_LIMIT], cache_path, threshold) other_ids = None
args = (ref_ids, other_ids, cache_path, threshold, picinfo)
async_results.append(pool.apply_async(async_compare, args)) async_results.append(pool.apply_async(async_compare, args))
cache_ids = cache_ids[ARG_LIMIT:] collect_results()
# We use a while here because it's possible that more than one result has been added if collect_results(collect_all=True)
# ARG_LIMIT has been reached. pool.close()
while len(async_results) > RESULTS_QUEUE_LIMIT:
result = async_results.popleft()
matches.extend(result.get())
for result in async_results: # process the rest of the results
matches.extend(result.get())
result = [] result = []
for ref_id, other_id, percentage in j.iter_with_progress(matches, tr("Verified %d/%d matches"), every=10): for ref_id, other_id, percentage in j.iter_with_progress(matches, tr("Verified %d/%d matches"), every=10):
@ -126,6 +181,8 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nul
if percentage == 100 and ref.md5 != other.md5: if percentage == 100 and ref.md5 != other.md5:
percentage = 99 percentage = 99
if percentage >= threshold: if percentage >= threshold:
ref.dimensions # pre-read dimensions for display in results
other.dimensions
result.append(get_match(ref, other, percentage)) result.append(get_match(ref, other, percentage))
return result return result

View File

@ -84,7 +84,7 @@
</message> </message>
<message> <message>
<source>Matched %d/%d pictures</source> <source>Matched %d/%d pictures</source>
<translation>Comparé %d/%d images</translation> <translation>%d/%d blocs d'images comparés</translation>
</message> </message>
<message> <message>
<source>Verified %d/%d matches</source> <source>Verified %d/%d matches</source>