From 1b52feb8b843b05e7989803047b6fb7e57a9c244 Mon Sep 17 00:00:00 2001
From: Virgil Dupras
Date: Fri, 4 Mar 2011 11:15:04 +0100
Subject: [PATCH] Optimized the scanning process in PE.

---
 cocoa/base/en.lproj/core.strings |   2 +-
 cocoa/base/fr.lproj/core.strings |   2 +-
 core_pe/matchbase.py             | 131 ++++++++++++++++++++++---------
 qt/lang/fr.ts                    |   2 +-
 4 files changed, 97 insertions(+), 40 deletions(-)

diff --git a/cocoa/base/en.lproj/core.strings b/cocoa/base/en.lproj/core.strings
index ed27a88c..b9786151 100644
--- a/cocoa/base/en.lproj/core.strings
+++ b/cocoa/base/en.lproj/core.strings
@@ -19,7 +19,7 @@
 "Analyzed %d/%d pictures" = "Analyzed %d/%d pictures";
 "Preparing for matching" = "Preparing for matching";
-"Matched %d/%d pictures" = "Matched %d/%d pictures";
+"Performed %d/%d chunk matches" = "Performed %d/%d chunk matches";
 "Verified %d/%d matches" = "Verified %d/%d matches";
 "Removing dead tracks from your iTunes Library" = "Removing dead tracks from your iTunes Library";

diff --git a/cocoa/base/fr.lproj/core.strings b/cocoa/base/fr.lproj/core.strings
index 34edbe6e..c4df9736 100644
--- a/cocoa/base/fr.lproj/core.strings
+++ b/cocoa/base/fr.lproj/core.strings
@@ -18,7 +18,7 @@
 "Analyzed %d/%d pictures" = "Analyzé %d/%d images";
 "Preparing for matching" = "Préparation pour la comparaison";
-"Matched %d/%d pictures" = "Comparé %d/%d images";
+"Performed %d/%d chunk matches" = "%d/%d blocs d'images comparés";
 "Verified %d/%d matches" = "Vérifié %d/%d paires";
 "Removing dead tracks from your iTunes Library" = "Retrait des tracks mortes de votre librairie iTunes";

diff --git a/core_pe/matchbase.py b/core_pe/matchbase.py
index e3e5534e..b419239c 100644
--- a/core_pe/matchbase.py
+++ b/core_pe/matchbase.py
@@ -8,8 +8,9 @@
 import logging
 import multiprocessing
-from collections import defaultdict, deque
+from itertools import combinations
 
+from hscommon.util import extract
 from hscommon.trans import tr
 from jobprogress import job
@@ -17,15 +18,32 @@
 from core.engine import Match
 from .block import avgdiff, DifferentBlockCountError, NoBlocksError
 from .cache import Cache
 
+# OPTIMIZATION NOTES:
+# The bottleneck of the matching phase is the CPU, which is why we use multiprocessing. However,
+# another bottleneck that shows up when a lot of pictures are involved is disk IO, because blocks
+# constantly have to be read from disk by subprocesses. This problem is especially big on CPUs
+# with a lot of cores. Therefore, we must minimize disk IO. The best way to achieve that is to
+# separate the files to scan into "chunks", and it's chunk by chunk that blocks are read into
+# memory and compared to each other. Each file in a chunk has to be compared to every other file
+# in the chunk, of course, but also to the files in all other chunks. So chunkifying doesn't save
+# us any actual comparison, but the advantage is that instead of reading blocks from disk
+# number_of_files**2 times, we read them number_of_files*number_of_chunks times.
+# Determining the right chunk size is tricky, because if it's too big, too many blocks will be in
+# memory at the same time and we might end up with memory thrashing, which is awfully slow. So,
+# because our *real* bottleneck is the CPU, the chunk size must simply be big enough that the CPU
+# isn't starved by disk IO.
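+#
+# As a rough illustration of that saving (figures hypothetical, not measured): with 10000 files
+# and a chunk size of 1000, we get 10 chunks, so blocks are read from disk about
+# 10000 * 10 = 100000 times instead of 10000 ** 2 = 100000000 times, a thousandfold reduction
+# in reads for the same number of comparisons.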
+
 MIN_ITERATIONS = 3
 BLOCK_COUNT_PER_SIDE = 15
+DEFAULT_CHUNK_SIZE = 1000
+MIN_CHUNK_SIZE = 100
 
 # Enough so that we're sure that the main thread will not wait after a result.get() call
-# cpucount*2 should be enough to be sure that the spawned process will not wait after the results
+# cpucount+1 should be enough to be sure that the spawned process will not wait after the results
 # collection made by the main process.
-RESULTS_QUEUE_LIMIT = multiprocessing.cpu_count() * 2
+RESULTS_QUEUE_LIMIT = multiprocessing.cpu_count() + 1
 
-def prepare_pictures(pictures, cache_path, j=job.nulljob):
+def prepare_pictures(pictures, cache_path, with_dimensions, j=job.nulljob):
     # The MemoryError handlers in there use logging without first caring about whether or not
     # there is enough memory left to carry on the operation because it is assumed that the
     # MemoryError happens when trying to read an image file, which is freed from memory by the
@@ -36,7 +54,8 @@ def prepare_pictures(pictures, cache_path, j=job.nulljob):
     for picture in j.iter_with_progress(pictures, tr("Analyzed %d/%d pictures")):
         picture.unicode_path = str(picture.path)
         logging.debug("Analyzing picture at {}".format(picture.unicode_path))
-        picture.dimensions # pre-read dimensions
+        if with_dimensions:
+            picture.dimensions # pre-read dimensions
         try:
             if picture.unicode_path not in cache:
                 blocks = picture.get_blocks(BLOCK_COUNT_PER_SIDE)
@@ -53,18 +72,42 @@ def prepare_pictures(pictures, cache_path, j=job.nulljob):
     cache.close()
     return prepared
 
+def get_chunks(pictures):
+    min_chunk_count = multiprocessing.cpu_count() * 2 # have enough chunks to feed all subprocesses
+    chunk_count = len(pictures) // DEFAULT_CHUNK_SIZE
+    chunk_count = max(min_chunk_count, chunk_count)
+    chunk_size = (len(pictures) // chunk_count) + 1
+    chunk_size = max(MIN_CHUNK_SIZE, chunk_size)
+    logging.info("Creating {} chunks with a chunk size of {} for {} pictures".format(
+        chunk_count, chunk_size, len(pictures)))
+    chunks = [pictures[i:i+chunk_size] for i in range(0, len(pictures), chunk_size)]
+    return chunks
+
 def get_match(first, second, percentage):
     if percentage < 0:
         percentage = 0
     return Match(first, second, percentage)
 
-def async_compare(ref_id, other_ids, dbname, threshold):
+def async_compare(ref_ids, other_ids, dbname, threshold, picinfo):
+    # The list of ids in ref_ids has to be compared to the list of ids in other_ids. other_ids
+    # can be None, in which case ref_ids is compared with itself.
+    # picinfo is a dictionary {pic_id: (dimensions, is_ref)}.
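+    # For example (hypothetical values), an entry {42: ((1600, 1200), False)} would mean that
+    # picture 42 is 1600x1200 and is not a reference picture.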
     cache = Cache(dbname)
     limit = 100 - threshold
-    ref_blocks = cache[ref_id]
-    pairs = cache.get_multiple(other_ids)
+    ref_pairs = list(cache.get_multiple(ref_ids))
+    if other_ids is not None:
+        other_pairs = list(cache.get_multiple(other_ids))
+        comparisons_to_do = [(r, o) for r in ref_pairs for o in other_pairs]
+    else:
+        comparisons_to_do = list(combinations(ref_pairs, 2))
     results = []
-    for other_id, other_blocks in pairs:
+    for (ref_id, ref_blocks), (other_id, other_blocks) in comparisons_to_do:
+        ref_dimensions, ref_is_ref = picinfo[ref_id]
+        other_dimensions, other_is_ref = picinfo[other_id]
+        if ref_is_ref and other_is_ref:
+            continue
+        if ref_dimensions != other_dimensions:
+            continue
         try:
             diff = avgdiff(ref_blocks, other_blocks, limit, MIN_ITERATIONS)
             percentage = 100 - diff
@@ -76,48 +119,60 @@
     return results
 
 def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob):
+    def get_picinfo(p):
+        if match_scaled:
+            return (None, p.is_ref)
+        else:
+            return (p.dimensions, p.is_ref)
+
+    def collect_results(collect_all=False):
+        # Collect results and wait until the queue is small enough to accommodate new results.
+        nonlocal async_results, matches, comparison_count
+        limit = 0 if collect_all else RESULTS_QUEUE_LIMIT
+        while len(async_results) > limit:
+            ready, working = extract(lambda r: r.ready(), async_results)
+            for result in ready:
+                matches += result.get()
+                async_results.remove(result)
+                comparison_count += 1
+        progress_msg = tr("Performed %d/%d chunk matches") % (comparison_count, len(comparisons_to_do))
+        j.set_progress(comparison_count, progress_msg)
+
     j = j.start_subjob([3, 7])
-    pictures = prepare_pictures(pictures, cache_path, j)
+    pictures = prepare_pictures(pictures, cache_path, with_dimensions=not match_scaled, j=j)
     j = j.start_subjob([9, 1], tr("Preparing for matching"))
     cache = Cache(cache_path)
     id2picture = {}
-    dimensions2pictures = defaultdict(set)
     for picture in pictures:
         try:
             picture.cache_id = cache.get_id(picture.unicode_path)
             id2picture[picture.cache_id] = picture
-            if not match_scaled:
-                dimensions2pictures[picture.dimensions].add(picture)
         except ValueError:
             pass
     cache.close()
     pictures = [p for p in pictures if hasattr(p, 'cache_id')]
     pool = multiprocessing.Pool()
-    async_results = deque()
+    async_results = []
     matches = []
-    pictures_copy = set(pictures)
-    for ref in j.iter_with_progress(pictures, tr("Matched %d/%d pictures")):
-        others = pictures_copy if match_scaled else dimensions2pictures[ref.dimensions]
-        others.remove(ref)
-        if ref.is_ref:
-            # Don't spend time comparing two ref pics together.
-            others = [pic for pic in others if not pic.is_ref]
-        if others:
-            cache_ids = [f.cache_id for f in others]
-            # We limit the number of cache_ids we send for multi-processing because otherwise, we
-            # might get an error saying "String or BLOB exceeded size limit"
-            ARG_LIMIT = 1000
-            while cache_ids:
-                args = (ref.cache_id, cache_ids[:ARG_LIMIT], cache_path, threshold)
-                async_results.append(pool.apply_async(async_compare, args))
-                cache_ids = cache_ids[ARG_LIMIT:]
-            # We use a while here because it's possible that more than one result has been added if
-            # ARG_LIMIT has been reached.
-            while len(async_results) > RESULTS_QUEUE_LIMIT:
-                result = async_results.popleft()
-                matches.extend(result.get())
-    for result in async_results: # process the rest of the results
-        matches.extend(result.get())
+    chunks = get_chunks(pictures)
+    # We add a None element at the end of the chunk list because each chunk has to be compared
+    # with itself. Thus, each chunk will show up as a ref_chunk having other_chunk set to None once.
+    comparisons_to_do = list(combinations(chunks + [None], 2))
+    comparison_count = 0
+    j.start_job(len(comparisons_to_do))
+    for ref_chunk, other_chunk in comparisons_to_do:
+        picinfo = {p.cache_id: get_picinfo(p) for p in ref_chunk}
+        ref_ids = [p.cache_id for p in ref_chunk]
+        if other_chunk is not None:
+            other_ids = [p.cache_id for p in other_chunk]
+            picinfo.update({p.cache_id: get_picinfo(p) for p in other_chunk})
+        else:
+            other_ids = None
+        args = (ref_ids, other_ids, cache_path, threshold, picinfo)
+        async_results.append(pool.apply_async(async_compare, args))
+        collect_results()
+    collect_results(collect_all=True)
+    pool.close()
     result = []
     for ref_id, other_id, percentage in j.iter_with_progress(matches, tr("Verified %d/%d matches"), every=10):
@@ -126,6 +181,8 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nul
         if percentage == 100 and ref.md5 != other.md5:
             percentage = 99
         if percentage >= threshold:
+            ref.dimensions # pre-read dimensions for display in results
+            other.dimensions
             result.append(get_match(ref, other, percentage))
     return result

diff --git a/qt/lang/fr.ts b/qt/lang/fr.ts
index 849dbc5a..8e5ad3e4 100644
--- a/qt/lang/fr.ts
+++ b/qt/lang/fr.ts
@@ -84,7 +84,7 @@
         Matched %d/%d pictures
-        Comparé %d/%d images
+        %d/%d blocs d'images comparés
 
         Verified %d/%d matches
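
Note on the chunk pairing scheme: the following is a minimal standalone sketch, illustrative
only and not part of the patch (this get_chunks takes an explicit chunk_size and skips the
cpu_count/MIN_CHUNK_SIZE sizing logic above). It checks that pairing every two chunks, plus
pairing each chunk with None for the self-comparison case, covers every unordered pair of
pictures exactly once:

    from itertools import combinations

    def get_chunks(pictures, chunk_size):
        # Split the picture list into consecutive slices of at most chunk_size items.
        return [pictures[i:i + chunk_size] for i in range(0, len(pictures), chunk_size)]

    pictures = list(range(10))        # stand-ins for picture objects
    chunks = get_chunks(pictures, 4)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

    # Each (ref_chunk, other_chunk) job compares every picture of ref_chunk against every
    # picture of other_chunk; a (ref_chunk, None) job compares ref_chunk with itself.
    seen = []
    for ref_chunk, other_chunk in combinations(chunks + [None], 2):
        if other_chunk is None:
            pairs = combinations(ref_chunk, 2)
        else:
            pairs = ((r, o) for r in ref_chunk for o in other_chunk)
        seen.extend(frozenset(pair) for pair in pairs)

    # Full coverage with no duplicates: 10 * 9 / 2 == 45 unordered pairs.
    assert len(seen) == len(set(seen)) == len(pictures) * (len(pictures) - 1) // 2

Because None is appended at the end of the list, combinations() always yields it as the second
element of a pair, which is why async_compare only needs to handle None in its other_ids argument.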