Update hash db code to upgrade schema

This commit is contained in:
Andrew Senetar 2022-03-23 23:44:24 -05:00
parent e16df489bd
commit b6fe312693
Signed by: arsenetar
GPG Key ID: C63300DCE48AB2F1
1 changed files with 59 additions and 42 deletions

View File

@ -11,12 +11,13 @@
# resulting needless complexity and memory usage. It's been a while since I wanted to do that fork,
# and I'm doing it now.
import os
import xxhash
from math import floor
import logging
import sqlite3
from threading import Lock
from typing import Any
from typing import Any, AnyStr, Union
from hscommon.path import Path
from hscommon.util import nonone, get_file_ext
@ -83,9 +84,11 @@ class OperationError(FSError):
class FilesDB:
schema_version = 1
schema_version_description = "Changed from md5 to xxhash"
create_table_query = "CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, size INTEGER, mtime_ns INTEGER, entry_dt DATETIME, md5 BLOB, md5partial BLOB)"
drop_table_query = "DROP TABLE files;"
create_table_query = "CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, size INTEGER, mtime_ns INTEGER, entry_dt DATETIME, digest BLOB, digest_partial BLOB, digest_samples BLOB)"
drop_table_query = "DROP TABLE IF EXISTS files;"
select_query = "SELECT {key} FROM files WHERE path=:path AND size=:size and mtime_ns=:mtime_ns"
insert_query = """
INSERT INTO files (path, size, mtime_ns, entry_dt, {key}) VALUES (:path, :size, :mtime_ns, datetime('now'), :value)
@ -97,24 +100,37 @@ class FilesDB:
self.cur = None
self.lock = None
def connect(self, path):
# type: (str, ) -> None
def connect(self, path: Union[AnyStr, os.PathLike]) -> None:
self.conn = sqlite3.connect(path, check_same_thread=False)
self.cur = self.conn.cursor()
self.cur.execute(self.create_table_query)
self.lock = Lock()
self._check_upgrade()
def clear(self):
# type: () -> None
def _check_upgrade(self) -> None:
with self.lock:
has_schema = self.cur.execute(
"SELECT NAME FROM sqlite_master WHERE type='table' AND name='schema_version'"
).fetchall()
version = None
if has_schema:
version = self.cur.execute("SELECT version FROM schema_version ORDER BY version DESC").fetchone()[0]
else:
self.cur.execute("CREATE TABLE schema_version (version int PRIMARY KEY, description TEXT)")
if version != self.schema_version:
self.cur.execute(self.drop_table_query)
self.cur.execute(
"INSERT OR REPLACE INTO schema_version VALUES (:version, :description)",
{"version": self.schema_version, "description": self.schema_version_description},
)
self.cur.execute(self.create_table_query)
self.conn.commit()
def clear(self) -> None:
with self.lock:
self.cur.execute(self.drop_table_query)
self.cur.execute(self.create_table_query)
def get(self, path, key):
# type: (Path, str) -> bytes
def get(self, path: Path, key: str) -> Union[bytes, None]:
stat = path.stat()
size = stat.st_size
mtime_ns = stat.st_mtime_ns
@ -128,9 +144,7 @@ class FilesDB:
return None
def put(self, path, key, value):
# type: (Path, str, Any) -> None
def put(self, path: Path, key: str, value: Any) -> None:
stat = path.stat()
size = stat.st_size
mtime_ns = stat.st_mtime_ns
@ -141,15 +155,11 @@ class FilesDB:
{"path": str(path), "size": size, "mtime_ns": mtime_ns, "value": value},
)
def commit(self):
# type: () -> None
def commit(self) -> None:
with self.lock:
self.conn.commit()
def close(self):
# type: () -> None
def close(self) -> None:
with self.lock:
self.cur.close()
self.conn.close()
@ -214,6 +224,25 @@ class File:
partial_data = fp.read(size)
return xxhash.xxh128_digest(partial_data)
def _calc_digest_samples(self) -> bytes:
size = self.size
with self.path.open("rb") as fp:
# Chunk at 25% of the file
fp.seek(floor(size * 25 / 100), 0)
file_data = fp.read(CHUNK_SIZE)
file_hash = xxhash.xxh128(file_data)
# Chunk at 60% of the file
fp.seek(floor(size * 60 / 100), 0)
file_data = fp.read(CHUNK_SIZE)
file_hash.update(file_data)
# Last chunk of the file
fp.seek(-CHUNK_SIZE, 2)
file_data = fp.read(CHUNK_SIZE)
file_hash.update(file_data)
return file_hash.digest()
def _read_info(self, field):
# print(f"_read_info({field}) for {self}")
if field in ("size", "mtime"):
@ -222,18 +251,18 @@ class File:
self.mtime = nonone(stats.st_mtime, 0)
elif field == "digest_partial":
try:
self.digest_partial = filesdb.get(self.path, "md5partial")
self.digest_partial = filesdb.get(self.path, "digest_partial")
if self.digest_partial is None:
self.digest_partial = self._calc_digest_partial()
filesdb.put(self.path, "md5partial", self.digest_partial)
filesdb.put(self.path, "digest_partial", self.digest_partial)
except Exception as e:
logging.warning("Couldn't get digest_partial for %s: %s", self.path, e)
elif field == "digest":
try:
self.digest = filesdb.get(self.path, "md5")
self.digest = filesdb.get(self.path, "digest")
if self.digest is None:
self.digest = self._calc_digest()
filesdb.put(self.path, "md5", self.digest)
filesdb.put(self.path, "digest", self.digest)
except Exception as e:
logging.warning("Couldn't get digest for %s: %s", self.path, e)
elif field == "digest_samples":
@ -243,24 +272,12 @@ class File:
setattr(self, field, self.digest)
return
try:
with self.path.open("rb") as fp:
# Chunk at 25% of the file
fp.seek(floor(size * 25 / 100), 0)
file_data = fp.read(CHUNK_SIZE)
file_hash = xxhash.xxh128(file_data)
# Chunk at 60% of the file
fp.seek(floor(size * 60 / 100), 0)
file_data = fp.read(CHUNK_SIZE)
file_hash.update(file_data)
# Last chunk of the file
fp.seek(-CHUNK_SIZE, 2)
file_data = fp.read(CHUNK_SIZE)
file_hash.update(file_data)
setattr(self, field, file_hash.digest())
self.digest_samples = filesdb.get(self.path, "digest_samples")
if self.digest_samples is None:
self.digest_samples = self._calc_digest_samples()
filesdb.put(self.path, "digest_samples", self.digest_samples)
except Exception as e:
logging.error(f"Error computing digest_samples: {e}")
logging.warning(f"Couldn't get digest_samples for {self.path}: {e}")
def _read_all_info(self, attrnames=None):
"""Cache all possible info.