1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2026-01-22 14:41:39 +00:00

Additional type hints in hscommon

This commit is contained in:
2022-05-11 00:50:34 -05:00
parent 7865e4aeac
commit d5eeab4a17
6 changed files with 133 additions and 111 deletions

View File

@@ -7,6 +7,9 @@
# http://www.gnu.org/licenses/gpl-3.0.html
from typing import Any, Callable, Generator, Iterator, List, Union
class JobCancelled(Exception):
"The user has cancelled the job"
@@ -36,7 +39,7 @@ class Job:
"""
# ---Magic functions
def __init__(self, job_proportions, callback):
def __init__(self, job_proportions: Union[List[int], int], callback: Callable) -> None:
"""Initialize the Job with 'jobcount' jobs. Start every job with
start_job(). Every time the job progress is updated, 'callback' is called
'callback' takes a 'progress' int param, and a optional 'desc'
@@ -55,12 +58,12 @@ class Job:
self._currmax = 1
# ---Private
def _subjob_callback(self, progress, desc=""):
def _subjob_callback(self, progress: int, desc: str = "") -> bool:
"""This is the callback passed to children jobs."""
self.set_progress(progress, desc)
return True # if JobCancelled has to be raised, it will be at the highest level
def _do_update(self, desc):
def _do_update(self, desc: str) -> None:
"""Calls the callback function with a % progress as a parameter.
The parameter is a int in the 0-100 range.
@@ -78,13 +81,16 @@ class Job:
raise JobCancelled()
# ---Public
def add_progress(self, progress=1, desc=""):
def add_progress(self, progress: int = 1, desc: str = "") -> None:
self.set_progress(self._progress + progress, desc)
def check_if_cancelled(self):
def check_if_cancelled(self) -> None:
self._do_update("")
def iter_with_progress(self, iterable, desc_format=None, every=1, count=None):
# TODO type hint iterable
def iter_with_progress(
self, iterable, desc_format: Union[str, None] = None, every: int = 1, count: Union[int, None] = None
) -> Generator[Any, None, None]:
"""Iterate through ``iterable`` while automatically adding progress.
WARNING: We need our iterable's length. If ``iterable`` is not a sequence (that is,
@@ -107,7 +113,7 @@ class Job:
desc = desc_format % (count, count)
self.set_progress(100, desc)
def start_job(self, max_progress=100, desc=""):
def start_job(self, max_progress: int = 100, desc: str = "") -> None:
"""Begin work on the next job. You must not call start_job more than
'jobcount' (in __init__) times.
'max' is the job units you are to perform.
@@ -122,7 +128,7 @@ class Job:
self._currmax = max(1, max_progress)
self._do_update(desc)
def start_subjob(self, job_proportions, desc=""):
def start_subjob(self, job_proportions: Union[List[int], int], desc: str = "") -> "Job":
"""Starts a sub job. Use this when you want to split a job into
multiple smaller jobs. Pretty handy when starting a process where you
know how many subjobs you will have, but don't know the work unit count
@@ -132,7 +138,7 @@ class Job:
self.start_job(100, desc)
return Job(job_proportions, self._subjob_callback)
def set_progress(self, progress, desc=""):
def set_progress(self, progress: int, desc: str = "") -> None:
"""Sets the progress of the current job to 'progress', and call the
callback
"""
@@ -143,29 +149,29 @@ class Job:
class NullJob:
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
# Null job does nothing
pass
def add_progress(self, *args, **kwargs):
def add_progress(self, *args, **kwargs) -> None:
# Null job does nothing
pass
def check_if_cancelled(self):
def check_if_cancelled(self) -> None:
# Null job does nothing
pass
def iter_with_progress(self, sequence, *args, **kwargs):
def iter_with_progress(self, sequence, *args, **kwargs) -> Iterator:
return iter(sequence)
def start_job(self, *args, **kwargs):
def start_job(self, *args, **kwargs) -> None:
# Null job does nothing
pass
def start_subjob(self, *args, **kwargs):
def start_subjob(self, *args, **kwargs) -> "NullJob":
return NullJob()
def set_progress(self, *args, **kwargs):
def set_progress(self, *args, **kwargs) -> None:
# Null job does nothing
pass

View File

@@ -8,6 +8,7 @@
from threading import Thread
import sys
from typing import Callable, Tuple, Union
from hscommon.jobprogress.job import Job, JobInProgressError, JobCancelled
@@ -28,15 +29,15 @@ class ThreadedJobPerformer:
last_error = None
# --- Protected
def create_job(self):
def create_job(self) -> Job:
if self._job_running:
raise JobInProgressError()
self.last_progress = -1
self.last_progress: Union[int, None] = -1
self.last_desc = ""
self.job_cancelled = False
return Job(1, self._update_progress)
def _async_run(self, *args):
def _async_run(self, *args) -> None:
target = args[0]
args = tuple(args[1:])
self._job_running = True
@@ -52,7 +53,7 @@ class ThreadedJobPerformer:
self._job_running = False
self.last_progress = None
def reraise_if_error(self):
def reraise_if_error(self) -> None:
"""Reraises the error that happened in the thread if any.
Call this after the caller of run_threaded detected that self._job_running returned to False
@@ -60,13 +61,13 @@ class ThreadedJobPerformer:
if self.last_error is not None:
raise self.last_error.with_traceback(self.last_traceback)
def _update_progress(self, newprogress, newdesc=""):
def _update_progress(self, newprogress: int, newdesc: str = "") -> bool:
self.last_progress = newprogress
if newdesc:
self.last_desc = newdesc
return not self.job_cancelled
def run_threaded(self, target, args=()):
def run_threaded(self, target: Callable, args: Tuple = ()) -> None:
if self._job_running:
raise JobInProgressError()
args = (target,) + args