From b6fe3126936ad18b982816aaa0a17ce6bf12968b Mon Sep 17 00:00:00 2001
From: Andrew Senetar
Date: Wed, 23 Mar 2022 23:44:24 -0500
Subject: [PATCH] Update hash db code to upgrade schema

---
 core/fs.py | 101 +++++++++++++++++++++++++++++++----------------------
 1 file changed, 59 insertions(+), 42 deletions(-)

diff --git a/core/fs.py b/core/fs.py
index fc235a4a..a4cee502 100644
--- a/core/fs.py
+++ b/core/fs.py
@@ -11,12 +11,13 @@
 # resulting needless complexity and memory usage. It's been a while since I wanted to do that fork,
 # and I'm doing it now.
 
+import os
 import xxhash
 from math import floor
 import logging
 import sqlite3
 from threading import Lock
-from typing import Any
+from typing import Any, AnyStr, Union
 
 from hscommon.path import Path
 from hscommon.util import nonone, get_file_ext
@@ -83,9 +84,11 @@ class OperationError(FSError):
 
 
 class FilesDB:
+    schema_version = 1
+    schema_version_description = "Changed from md5 to xxhash"
 
-    create_table_query = "CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, size INTEGER, mtime_ns INTEGER, entry_dt DATETIME, md5 BLOB, md5partial BLOB)"
-    drop_table_query = "DROP TABLE files;"
+    create_table_query = "CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, size INTEGER, mtime_ns INTEGER, entry_dt DATETIME, digest BLOB, digest_partial BLOB, digest_samples BLOB)"
+    drop_table_query = "DROP TABLE IF EXISTS files;"
     select_query = "SELECT {key} FROM files WHERE path=:path AND size=:size and mtime_ns=:mtime_ns"
     insert_query = """
         INSERT INTO files (path, size, mtime_ns, entry_dt, {key}) VALUES (:path, :size, :mtime_ns, datetime('now'), :value)
@@ -97,24 +100,37 @@ class FilesDB:
         self.cur = None
         self.lock = None
 
-    def connect(self, path):
-        # type: (str, ) -> None
-
+    def connect(self, path: Union[AnyStr, os.PathLike]) -> None:
         self.conn = sqlite3.connect(path, check_same_thread=False)
         self.cur = self.conn.cursor()
-        self.cur.execute(self.create_table_query)
         self.lock = Lock()
+        self._check_upgrade()
 
-    def clear(self):
-        # type: () -> None
+    def _check_upgrade(self) -> None:
+        with self.lock:
+            has_schema = self.cur.execute(
+                "SELECT NAME FROM sqlite_master WHERE type='table' AND name='schema_version'"
+            ).fetchall()
+            version = None
+            if has_schema:
+                version = self.cur.execute("SELECT version FROM schema_version ORDER BY version DESC").fetchone()[0]
+            else:
+                self.cur.execute("CREATE TABLE schema_version (version int PRIMARY KEY, description TEXT)")
+            if version != self.schema_version:
+                self.cur.execute(self.drop_table_query)
+                self.cur.execute(
+                    "INSERT OR REPLACE INTO schema_version VALUES (:version, :description)",
+                    {"version": self.schema_version, "description": self.schema_version_description},
+                )
+                self.cur.execute(self.create_table_query)
+            self.conn.commit()
 
+    def clear(self) -> None:
         with self.lock:
             self.cur.execute(self.drop_table_query)
             self.cur.execute(self.create_table_query)
 
-    def get(self, path, key):
-        # type: (Path, str) -> bytes
-
+    def get(self, path: Path, key: str) -> Union[bytes, None]:
         stat = path.stat()
         size = stat.st_size
         mtime_ns = stat.st_mtime_ns
@@ -128,9 +144,7 @@ class FilesDB:
 
         return None
 
-    def put(self, path, key, value):
-        # type: (Path, str, Any) -> None
-
+    def put(self, path: Path, key: str, value: Any) -> None:
         stat = path.stat()
         size = stat.st_size
         mtime_ns = stat.st_mtime_ns
@@ -141,15 +155,11 @@ class FilesDB:
             {"path": str(path), "size": size, "mtime_ns": mtime_ns, "value": value},
         )
 
-    def commit(self):
-        # type: () -> None
-
+    def commit(self) -> None:
         with self.lock:
             self.conn.commit()
 
-    def close(self):
-        # type: () -> None
-
+    def close(self) -> None:
         with self.lock:
             self.cur.close()
             self.conn.close()
@@ -214,6 +224,25 @@ class File:
             partial_data = fp.read(size)
             return xxhash.xxh128_digest(partial_data)
 
+    def _calc_digest_samples(self) -> bytes:
+        size = self.size
+        with self.path.open("rb") as fp:
+            # Chunk at 25% of the file
+            fp.seek(floor(size * 25 / 100), 0)
+            file_data = fp.read(CHUNK_SIZE)
+            file_hash = xxhash.xxh128(file_data)
+
+            # Chunk at 60% of the file
+            fp.seek(floor(size * 60 / 100), 0)
+            file_data = fp.read(CHUNK_SIZE)
+            file_hash.update(file_data)
+
+            # Last chunk of the file
+            fp.seek(-CHUNK_SIZE, 2)
+            file_data = fp.read(CHUNK_SIZE)
+            file_hash.update(file_data)
+            return file_hash.digest()
+
     def _read_info(self, field):
         # print(f"_read_info({field}) for {self}")
         if field in ("size", "mtime"):
@@ -222,18 +251,18 @@ class File:
             self.mtime = nonone(stats.st_mtime, 0)
         elif field == "digest_partial":
             try:
-                self.digest_partial = filesdb.get(self.path, "md5partial")
+                self.digest_partial = filesdb.get(self.path, "digest_partial")
                 if self.digest_partial is None:
                     self.digest_partial = self._calc_digest_partial()
-                    filesdb.put(self.path, "md5partial", self.digest_partial)
+                    filesdb.put(self.path, "digest_partial", self.digest_partial)
             except Exception as e:
                 logging.warning("Couldn't get digest_partial for %s: %s", self.path, e)
         elif field == "digest":
             try:
-                self.digest = filesdb.get(self.path, "md5")
+                self.digest = filesdb.get(self.path, "digest")
                 if self.digest is None:
                     self.digest = self._calc_digest()
-                    filesdb.put(self.path, "md5", self.digest)
+                    filesdb.put(self.path, "digest", self.digest)
             except Exception as e:
                 logging.warning("Couldn't get digest for %s: %s", self.path, e)
         elif field == "digest_samples":
@@ -243,24 +272,12 @@ class File:
                 setattr(self, field, self.digest)
                 return
             try:
-                with self.path.open("rb") as fp:
-                    # Chunk at 25% of the file
-                    fp.seek(floor(size * 25 / 100), 0)
-                    file_data = fp.read(CHUNK_SIZE)
-                    file_hash = xxhash.xxh128(file_data)
-
-                    # Chunk at 60% of the file
-                    fp.seek(floor(size * 60 / 100), 0)
-                    file_data = fp.read(CHUNK_SIZE)
-                    file_hash.update(file_data)
-
-                    # Last chunk of the file
-                    fp.seek(-CHUNK_SIZE, 2)
-                    file_data = fp.read(CHUNK_SIZE)
-                    file_hash.update(file_data)
-                    setattr(self, field, file_hash.digest())
+                self.digest_samples = filesdb.get(self.path, "digest_samples")
+                if self.digest_samples is None:
+                    self.digest_samples = self._calc_digest_samples()
+                    filesdb.put(self.path, "digest_samples", self.digest_samples)
             except Exception as e:
-                logging.error(f"Error computing digest_samples: {e}")
+                logging.warning(f"Couldn't get digest_samples for {self.path}: {e}")
 
     def _read_all_info(self, attrnames=None):
         """Cache all possible info.
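
Note: the core of this patch is the version check in FilesDB._check_upgrade: if the
version stored in a one-row schema_version table differs from what the code expects,
the files cache table is dropped and rebuilt, so stale md5-era rows are never read
back as xxhash digests. A minimal standalone sketch of that pattern, assuming only
the standard library's sqlite3 (the check_upgrade name and SCHEMA_VERSION constant
are illustrative, not part of the patch; table and column names mirror the diff):

    import sqlite3

    SCHEMA_VERSION = 1

    def check_upgrade(conn):
        cur = conn.cursor()
        # Does a schema_version table exist yet?
        has_schema = cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='schema_version'"
        ).fetchall()
        version = None
        if has_schema:
            row = cur.execute(
                "SELECT version FROM schema_version ORDER BY version DESC"
            ).fetchone()
            version = row[0] if row else None
        else:
            cur.execute("CREATE TABLE schema_version (version INT PRIMARY KEY, description TEXT)")
        if version != SCHEMA_VERSION:
            # Cache contents are cheap to regenerate, so a schema change just drops them.
            cur.execute("DROP TABLE IF EXISTS files")
            cur.execute(
                "INSERT OR REPLACE INTO schema_version VALUES (?, ?)",
                (SCHEMA_VERSION, "Changed from md5 to xxhash"),
            )
            cur.execute(
                "CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, size INTEGER, "
                "mtime_ns INTEGER, entry_dt DATETIME, digest BLOB, digest_partial BLOB, "
                "digest_samples BLOB)"
            )
        conn.commit()

    conn = sqlite3.connect(":memory:")
    check_upgrade(conn)  # first call: creates schema_version, then the files table
    check_upgrade(conn)  # second call: version matches, cached rows survive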
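
The extracted _calc_digest_samples method hashes three fixed-position chunks (25% in,
60% in, and the tail) rather than the whole file. A sketch of the same idea outside
the File class, assuming xxhash is installed; CHUNK_SIZE's value here is an assumption
for illustration (fs.py defines its own constant, not shown in this diff), and the
caller is expected to skip files smaller than a chunk, as _read_info does via its
size check:

    import xxhash
    from math import floor

    CHUNK_SIZE = 1024 * 1024  # assumed value for this sketch

    def calc_digest_samples(path, size):
        # Sample three chunks at fixed relative offsets instead of hashing everything.
        with open(path, "rb") as fp:
            fp.seek(floor(size * 25 / 100), 0)
            file_hash = xxhash.xxh128(fp.read(CHUNK_SIZE))
            fp.seek(floor(size * 60 / 100), 0)
            file_hash.update(fp.read(CHUNK_SIZE))
            fp.seek(-CHUNK_SIZE, 2)  # caller must ensure size >= CHUNK_SIZE
            file_hash.update(fp.read(CHUNK_SIZE))
        return file_hash.digest()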