dgse qt: removed all hsfs usages.

--HG--
extra : convert_revision : svn%3Ac306627e-7827-47d3-bdf0-9a457c9553a1/trunk%40200
This commit is contained in:
hsoft 2009-10-23 12:56:52 +00:00
parent 49165125e4
commit b2b316b642
17 changed files with 334 additions and 233 deletions

View File

@ -14,13 +14,13 @@ import os
import os.path as op
import logging
from hsutil import job, io, files
from hsutil import io, files
from hsutil.path import Path
from hsutil.reg import RegistrableApplication, RegistrationRequired
from hsutil.misc import flatten, first
from hsutil.str import escape
from . import directories, results, scanner, export
from . import directories, results, scanner, export, fs
JOB_SCAN = 'job_scan'
JOB_LOAD = 'job_load'
@ -98,13 +98,8 @@ class DupeGuru(RegistrableApplication):
return ['---'] * len(self.data.COLUMNS)
def _get_file(self, str_path):
p = Path(str_path)
for d in self.directories:
if p not in d.path:
continue
result = d.find_path(p[d.path:])
if result is not None:
return result
path = Path(str_path)
return fs.get_file(path, self.directories.fileclasses)
@staticmethod
def _recycle_dupe(dupe):
@ -150,7 +145,7 @@ class DupeGuru(RegistrableApplication):
2 = absolute re-creation.
"""
source_path = dupe.path
location_path = dupe.root.path
location_path = first(p for p in self.directories if dupe.path in p)
dest_path = Path(destination)
if dest_type == 2:
dest_path = dest_path + source_path[1:-1] #Remove drive letter and filename

View File

@ -12,13 +12,12 @@ from AppKit import *
import logging
import os.path as op
import hsfs as fs
from hsutil import io, cocoa, job
from hsutil.cocoa import install_exception_hook
from hsutil.misc import stripnone
from hsutil.reg import RegistrationRequired
import app, data
from . import app, fs
JOBID2TITLE = {
app.JOB_SCAN: "Scanning for duplicates",
@ -43,8 +42,6 @@ class DupeGuru(app.DupeGuru):
logging.basicConfig(level=LOGGING_LEVEL, format='%(levelname)s %(message)s')
logging.debug('started in debug mode')
install_exception_hook()
if data_module is None:
data_module = data
appsupport = NSSearchPathForDirectoriesInDomains(NSApplicationSupportDirectory, NSUserDomainMask, True)[0]
appdata = op.join(appsupport, appdata_subdir)
app.DupeGuru.__init__(self, data_module, appdata, appid)
@ -91,15 +88,15 @@ class DupeGuru(app.DupeGuru):
except IndexError:
return (None,None)
def GetDirectory(self,node_path,curr_dir=None):
def get_folder_path(self, node_path, curr_path=None):
if not node_path:
return curr_dir
if curr_dir is not None:
l = curr_dir.dirs
return curr_path
current_index = node_path[0]
if curr_path is None:
curr_path = self.directories[current_index]
else:
l = self.directories
d = l[node_path[0]]
return self.GetDirectory(node_path[1:],d)
curr_path = self.directories.get_subfolders(curr_path)[current_index]
return self.get_folder_path(node_path[1:], curr_path)
def RefreshDetailsTable(self,dupe,group):
l1 = self._get_display_info(dupe, group, False)
@ -146,13 +143,13 @@ class DupeGuru(app.DupeGuru):
def RemoveSelected(self):
self.results.remove_duplicates(self.selected_dupes)
def RenameSelected(self,newname):
def RenameSelected(self, newname):
try:
d = self.selected_dupes[0]
d = d.move(d.parent,newname)
d.rename(newname)
return True
except (IndexError,fs.FSError),e:
logging.warning("dupeGuru Warning: %s" % str(e))
except (IndexError, fs.FSError) as e:
logging.warning("dupeGuru Warning: %s" % unicode(e))
return False
def RevealSelected(self):
@ -214,9 +211,9 @@ class DupeGuru(app.DupeGuru):
self.results.dupes[row] for row in rows if row in xrange(len(self.results.dupes))
]
def SetDirectoryState(self,node_path,state):
d = self.GetDirectory(node_path)
self.directories.set_state(d.path,state)
def SetDirectoryState(self, node_path, state):
p = self.get_folder_path(node_path)
self.directories.set_state(p, state)
def sort_dupes(self,key,asc):
self.results.sort_dupes(key,asc,self.display_delta_values)
@ -245,8 +242,9 @@ class DupeGuru(app.DupeGuru):
return [len(g.dupes) for g in self.results.groups]
elif tag == 1: #Directories
try:
dirs = self.GetDirectory(node_path).dirs if node_path else self.directories
return [d.dircount for d in dirs]
path = self.get_folder_path(node_path)
subfolders = self.directories.get_subfolders(path)
return [len(self.directories.get_subfolders(path)) for path in subfolders]
except IndexError: # node_path out of range
return []
else: #Power Marker
@ -270,8 +268,8 @@ class DupeGuru(app.DupeGuru):
return result
elif tag == 1: #Directories
try:
d = self.GetDirectory(node_path)
return [d.name, self.directories.get_state(d.path)]
path = self.get_folder_path(node_path)
return [path[-1], self.directories.get_state(path)]
except IndexError: # node_path out of range
return []

View File

@ -9,11 +9,12 @@
import xml.dom.minidom
from hsfs import phys
import hsfs as fs
from hsutil import io
from hsutil.files import FileOrPath
from hsutil.path import Path
from . import fs
(STATE_NORMAL,
STATE_REFERENCE,
STATE_EXCLUDED) = range(3)
@ -26,15 +27,14 @@ class InvalidPathError(Exception):
class Directories(object):
#---Override
def __init__(self):
def __init__(self, fileclasses=[fs.File]):
self._dirs = []
self.states = {}
self.dirclass = phys.Directory
self.special_dirclasses = {}
self.fileclasses = fileclasses
def __contains__(self,path):
for d in self._dirs:
if path in d.path:
def __contains__(self, path):
for p in self._dirs:
if path in p:
return True
return False
@ -53,8 +53,7 @@ class Directories(object):
if path[-1].startswith('.'): # hidden
return STATE_EXCLUDED
def _get_files(self, from_dir):
from_path = from_dir.path
def _get_files(self, from_path):
state = self.get_state(from_path)
if state == STATE_EXCLUDED:
# Recursively get files from folders with lots of subfolder is expensive. However, there
@ -62,14 +61,17 @@ class Directories(object):
# through self.states and see if we must continue, or we can stop right here to save time
if not any(p[:len(from_path)] == from_path for p in self.states):
return
result = []
for subdir in from_dir.dirs:
for file in self._get_files(subdir):
yield file
if state != STATE_EXCLUDED:
for file in from_dir.files:
file.is_ref = state == STATE_REFERENCE
yield file
try:
subdir_paths = [from_path + name for name in io.listdir(from_path) if io.isdir(from_path + name)]
for subdir_path in subdir_paths:
for file in self._get_files(subdir_path):
yield file
if state != STATE_EXCLUDED:
for file in fs.get_files(from_path, fileclasses=self.fileclasses):
file.is_ref = state == STATE_REFERENCE
yield file
except (EnvironmentError, fs.InvalidPath):
pass
#---Public
def add_path(self, path):
@ -80,29 +82,30 @@ class Directories(object):
under it will be removed. Can also raise InvalidPathError if 'path' does not exist.
"""
if path in self:
raise AlreadyThereError
self._dirs = [d for d in self._dirs if d.path not in path]
try:
dirclass = self.special_dirclasses.get(path, self.dirclass)
d = dirclass(None, unicode(path))
d[:] #If an InvalidPath exception has to be raised, it will be raised here
self._dirs.append(d)
return d
except fs.InvalidPath:
raise AlreadyThereError()
if not io.exists(path):
raise InvalidPathError()
self._dirs = [p for p in self._dirs if p not in path]
self._dirs.append(path)
@staticmethod
def get_subfolders(path):
"""returns a sorted list of paths corresponding to subfolders in `path`"""
try:
names = [name for name in io.listdir(path) if io.isdir(path + name)]
names.sort(key=lambda x:x.lower())
return [path + name for name in names]
except EnvironmentError:
return []
def get_files(self):
"""Returns a list of all files that are not excluded.
Returned files also have their 'is_ref' attr set.
"""
for d in self._dirs:
d.force_update()
try:
for file in self._get_files(d):
yield file
except fs.InvalidPath:
pass
for path in self._dirs:
for file in self._get_files(path):
yield file
def get_state(self, path):
"""Returns the state of 'path' (One of the STATE_* const.)
@ -123,8 +126,8 @@ class Directories(object):
doc = xml.dom.minidom.parse(infile)
except:
return
root_dir_nodes = doc.getElementsByTagName('root_directory')
for rdn in root_dir_nodes:
root_path_nodes = doc.getElementsByTagName('root_directory')
for rdn in root_path_nodes:
if not rdn.getAttributeNode('path'):
continue
path = rdn.getAttributeNode('path').nodeValue
@ -144,9 +147,9 @@ class Directories(object):
with FileOrPath(outfile, 'wb') as fp:
doc = xml.dom.minidom.Document()
root = doc.appendChild(doc.createElement('directories'))
for root_dir in self:
root_dir_node = root.appendChild(doc.createElement('root_directory'))
root_dir_node.setAttribute('path', unicode(root_dir.path).encode('utf-8'))
for root_path in self:
root_path_node = root.appendChild(doc.createElement('root_directory'))
root_path_node.setAttribute('path', unicode(root_path).encode('utf-8'))
for path, state in self.states.iteritems():
state_node = root.appendChild(doc.createElement('state'))
state_node.setAttribute('path', unicode(path).encode('utf-8'))

View File

@ -19,7 +19,7 @@ import hashlib
import logging
from hsutil import io
from hsutil.misc import nonone
from hsutil.misc import nonone, flatten
from hsutil.str import get_file_ext
class FSError(Exception):
@ -129,48 +129,22 @@ class File(object):
#--- Public
@classmethod
def can_handle(cls, path):
return io.isfile(path)
return not io.islink(path) and io.isfile(path)
def copy(self, destpath, newname=None, force=False):
if newname is None:
newname = self.name
destpath = destpath + newname
if (not force) and (io.exists(destpath)):
raise AlreadyExistsError(self, destpath[:-1])
try:
io.copy(self.path, destpath)
except EnvironmentError:
raise OperationError(self)
if not io.exists(destpath):
raise OperationError(self)
def move(self, destpath, newname=None, force=False):
if newname is None:
newname = self.name
destpath = destpath + newname
def rename(self, newname):
if newname == self.name:
return
destpath = self.path[:-1] + newname
if io.exists(destpath):
if force:
io.remove(destpath)
else:
raise AlreadyExistsError(self, destpath[:-1])
raise AlreadyExistsError(newname, self.path[:-1])
try:
io.move(self.path, destpath)
io.rename(self.path, destpath)
except EnvironmentError:
raise OperationError(self)
if not io.exists(destpath):
raise OperationError(self)
self.path = destpath
def rename(self, newname):
newpath = self.path[:-1] + newname
if io.exists(newpath):
raise AlreadyExistsError(newname, self.path[:-1])
try:
io.rename(self.path, newpath)
except OSError:
raise OperationError(self)
self.path = newpath
#--- Properties
@property
def extension(self):
@ -181,10 +155,25 @@ class File(object):
return self.path[-1]
def get_files(path, fileclass=File):
assert issubclass(fileclass, File)
def get_file(path, fileclasses=[File]):
for fileclass in fileclasses:
if fileclass.can_handle(path):
return fileclass(path)
def get_files(path, fileclasses=[File]):
assert all(issubclass(fileclass, File) for fileclass in fileclasses)
try:
paths = [path + name for name in io.listdir(path)]
return [fileclass(path) for path in paths if not io.islink(path) and io.isfile(path)]
result = []
for path in paths:
file = get_file(path, fileclasses=fileclasses)
if file is not None:
result.append(file)
return result
except EnvironmentError:
raise InvalidPath(path)
def get_all_files(path, fileclasses=[File]):
subfolders = [path + name for name in io.listdir(path) if not io.islink(path + name) and io.isdir(path + name)]
subfiles = flatten(get_all_files(subpath, fileclasses=fileclasses) for subpath in subfolders)
return subfiles + get_files(path, fileclasses=fileclasses)

View File

@ -33,7 +33,7 @@ class Scanner(object):
self.discarded_file_count = 0
def _getmatches(self, files, j):
if not self.size_threshold:
if self.size_threshold:
j = j.start_subjob([2, 8])
for f in j.iter_with_progress(files, 'Read size of %d/%d files'):
f.size # pre-read, makes a smoother progress if read here (especially for bundles)

View File

@ -18,10 +18,10 @@ from hsutil.path import Path
from hsutil.testcase import TestCase
from hsutil.decorators import log_calls
from hsutil import io
import hsfs.phys
from . import data
from .results_test import GetTestGroups
from .. import engine, data
from .. import engine, fs
try:
from ..app_cocoa import DupeGuru as DupeGuruBase
except ImportError:
@ -35,7 +35,6 @@ class DupeGuru(DupeGuruBase):
def _start_job(self, jobid, func):
func(nulljob)
def r2np(rows):
#Transforms a list of rows [1,2,3] into a list of node paths [[1],[2],[3]]
return [[i] for i in rows]
@ -310,15 +309,15 @@ class TCDupeGuru(TestCase):
class TCDupeGuru_renameSelected(TestCase):
def setUp(self):
p = Path(tempfile.mkdtemp())
fp = open(str(p + 'foo bar 1'),mode='w')
p = self.tmppath()
fp = open(unicode(p + 'foo bar 1'),mode='w')
fp.close()
fp = open(str(p + 'foo bar 2'),mode='w')
fp = open(unicode(p + 'foo bar 2'),mode='w')
fp.close()
fp = open(str(p + 'foo bar 3'),mode='w')
fp = open(unicode(p + 'foo bar 3'),mode='w')
fp.close()
refdir = hsfs.phys.Directory(None,str(p))
matches = engine.getmatches(refdir.files)
files = fs.get_files(p)
matches = engine.getmatches(files)
groups = engine.get_groups(matches)
g = groups[0]
g.prioritize(lambda x:x.name)
@ -327,45 +326,41 @@ class TCDupeGuru_renameSelected(TestCase):
self.app = app
self.groups = groups
self.p = p
self.refdir = refdir
def tearDown(self):
shutil.rmtree(str(self.p))
self.files = files
def test_simple(self):
app = self.app
refdir = self.refdir
g = self.groups[0]
app.SelectPowerMarkerNodePaths(r2np([0]))
self.assert_(app.RenameSelected('renamed'))
self.assert_('renamed' in refdir)
self.assert_('foo bar 2' not in refdir)
self.assert_(g.dupes[0] is refdir['renamed'])
self.assert_(g.dupes[0] in refdir)
assert app.RenameSelected('renamed')
names = io.listdir(self.p)
assert 'renamed' in names
assert 'foo bar 2' not in names
eq_(g.dupes[0].name, 'renamed')
def test_none_selected(self):
app = self.app
refdir = self.refdir
g = self.groups[0]
app.SelectPowerMarkerNodePaths([])
self.mock(logging, 'warning', log_calls(lambda msg: None))
self.assert_(not app.RenameSelected('renamed'))
assert not app.RenameSelected('renamed')
msg = logging.warning.calls[0]['msg']
self.assertEqual('dupeGuru Warning: list index out of range', msg)
self.assert_('renamed' not in refdir)
self.assert_('foo bar 2' in refdir)
self.assert_(g.dupes[0] is refdir['foo bar 2'])
eq_('dupeGuru Warning: list index out of range', msg)
names = io.listdir(self.p)
assert 'renamed' not in names
assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2')
def test_name_already_exists(self):
app = self.app
refdir = self.refdir
g = self.groups[0]
app.SelectPowerMarkerNodePaths(r2np([0]))
self.mock(logging, 'warning', log_calls(lambda msg: None))
self.assert_(not app.RenameSelected('foo bar 1'))
assert not app.RenameSelected('foo bar 1')
msg = logging.warning.calls[0]['msg']
self.assert_(msg.startswith('dupeGuru Warning: \'foo bar 2\' already exists in'))
self.assert_('foo bar 1' in refdir)
self.assert_('foo bar 2' in refdir)
self.assert_(g.dupes[0] is refdir['foo bar 2'])
assert msg.startswith('dupeGuru Warning: \'foo bar 1\' already exists in')
names = io.listdir(self.p)
assert 'foo bar 1' in names
assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2')

View File

@ -13,12 +13,11 @@ from hsutil.testcase import TestCase
from hsutil import io
from hsutil.path import Path
from hsutil.decorators import log_calls
import hsfs as fs
import hsfs.phys
import hsutil.files
from hsutil.job import nulljob
from .. import data, app
from . import data
from .. import app, fs
from ..app import DupeGuru as DupeGuruBase
class DupeGuru(DupeGuruBase):
@ -59,27 +58,27 @@ class TCDupeGuru(TestCase):
# The goal here is just to have a test for a previous blowup I had. I know my test coverage
# for this unit is pathetic. What's done is done. My approach now is to add tests for
# every change I want to make. The blowup was caused by a missing import.
dupe_parent = fs.Directory(None, 'foo')
dupe = fs.File(dupe_parent, 'bar')
dupe.copy = log_calls(lambda dest, newname: None)
p = self.tmppath()
io.open(p + 'foo', 'w').close()
self.mock(hsutil.files, 'copy', log_calls(lambda source_path, dest_path: None))
self.mock(os, 'makedirs', lambda path: None) # We don't want the test to create that fake directory
self.mock(fs.phys, 'Directory', fs.Directory) # We don't want an error because makedirs didn't work
app = DupeGuru()
app.copy_or_move(dupe, True, 'some_destination', 0)
app.directories.add_path(p)
[f] = app.directories.get_files()
app.copy_or_move(f, True, 'some_destination', 0)
self.assertEqual(1, len(hsutil.files.copy.calls))
call = hsutil.files.copy.calls[0]
self.assertEqual('some_destination', call['dest_path'])
self.assertEqual(dupe.path, call['source_path'])
self.assertEqual(f.path, call['source_path'])
def test_copy_or_move_clean_empty_dirs(self):
tmppath = Path(self.tmpdir())
sourcepath = tmppath + 'source'
io.mkdir(sourcepath)
io.open(sourcepath + 'myfile', 'w')
tmpdir = hsfs.phys.Directory(None, unicode(tmppath))
myfile = tmpdir['source']['myfile']
app = DupeGuru()
app.directories.add_path(tmppath)
[myfile] = app.directories.get_files()
self.mock(app, 'clean_empty_dirs', log_calls(lambda path: None))
app.copy_or_move(myfile, False, tmppath + 'dest', 0)
calls = app.clean_empty_dirs.calls
@ -87,9 +86,14 @@ class TCDupeGuru(TestCase):
self.assertEqual(sourcepath, calls[0]['path'])
def test_Scan_with_objects_evaluating_to_false(self):
class FakeFile(fs.File):
def __nonzero__(self):
return False
# At some point, any() was used in a wrong way that made Scan() wrongly return 1
app = DupeGuru()
f1, f2 = [fs.File(None, 'foo') for i in range(2)]
f1, f2 = [FakeFile('foo') for i in range(2)]
f1.is_ref, f2.is_ref = (False, False)
assert not (bool(f1) and bool(f2))
app.directories.get_files = lambda: [f1, f2]

45
base/py/tests/data.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# Created By: Virgil Dupras
# Created On: 2009-10-23
# $Id$
# Copyright 2009 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "HS" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/hs_license
# data module for tests
from hsutil.str import format_size
from dupeguru.data import format_path, cmp_value
COLUMNS = [
{'attr':'name','display':'Filename'},
{'attr':'path','display':'Directory'},
{'attr':'size','display':'Size (KB)'},
{'attr':'extension','display':'Kind'},
]
METADATA_TO_READ = ['size']
def GetDisplayInfo(dupe, group, delta):
size = dupe.size
m = group.get_match_of(dupe)
if m and delta:
r = group.ref
size -= r.size
return [
dupe.name,
format_path(dupe.path),
format_size(size, 0, 1, False),
dupe.extension,
]
def GetDupeSortKey(dupe, get_group, key, delta):
r = cmp_value(getattr(dupe, COLUMNS[key]['attr']))
if delta and (key == 2):
r -= cmp_value(getattr(get_group().ref, COLUMNS[key]['attr']))
return r
def GetGroupSortKey(group, key):
return cmp_value(getattr(group.ref, COLUMNS[key]['attr']))

View File

@ -10,20 +10,43 @@
import os.path as op
import os
import time
import shutil
from nose.tools import eq_
from hsutil import job, io
from hsutil import io
from hsutil.path import Path
from hsutil.testcase import TestCase
import hsfs.phys
from hsfs.tests import phys_test
from ..directories import *
testpath = Path(TestCase.datadirpath())
def create_fake_fs(rootpath):
rootpath = rootpath + 'fs'
io.mkdir(rootpath)
io.mkdir(rootpath + 'dir1')
io.mkdir(rootpath + 'dir2')
io.mkdir(rootpath + 'dir3')
fp = io.open(rootpath + 'file1.test', 'w')
fp.write('1')
fp.close()
fp = io.open(rootpath + 'file2.test', 'w')
fp.write('12')
fp.close()
fp = io.open(rootpath + 'file3.test', 'w')
fp.write('123')
fp.close()
fp = io.open(rootpath + ('dir1', 'file1.test'), 'w')
fp.write('1')
fp.close()
fp = io.open(rootpath + ('dir2', 'file2.test'), 'w')
fp.write('12')
fp.close()
fp = io.open(rootpath + ('dir3', 'file3.test'), 'w')
fp.write('123')
fp.close()
return rootpath
class TCDirectories(TestCase):
def test_empty(self):
d = Directories()
@ -33,13 +56,11 @@ class TCDirectories(TestCase):
def test_add_path(self):
d = Directories()
p = testpath + 'utils'
added = d.add_path(p)
d.add_path(p)
self.assertEqual(1,len(d))
self.assert_(p in d)
self.assert_((p + 'foobar') in d)
self.assert_(p[:-1] not in d)
self.assertEqual(p,added.path)
self.assert_(d[0] is added)
p = self.tmppath()
d.add_path(p)
self.assertEqual(2,len(d))
@ -53,13 +74,13 @@ class TCDirectories(TestCase):
self.assertRaises(AlreadyThereError, d.add_path, p + 'foobar')
self.assertEqual(1, len(d))
def test_AddPath_containing_paths_already_there(self):
def test_add_path_containing_paths_already_there(self):
d = Directories()
d.add_path(testpath + 'utils')
self.assertEqual(1, len(d))
added = d.add_path(testpath)
self.assertEqual(1, len(d))
self.assert_(added is d[0])
d.add_path(testpath)
eq_(len(d), 1)
eq_(d[0], testpath)
def test_AddPath_non_latin(self):
p = Path(self.tmpdir())
@ -114,7 +135,7 @@ class TCDirectories(TestCase):
def test_set_state_keep_state_dict_size_to_minimum(self):
d = Directories()
p = Path(phys_test.create_fake_fs(self.tmpdir()))
p = create_fake_fs(self.tmppath())
d.add_path(p)
d.set_state(p,STATE_REFERENCE)
d.set_state(p + 'dir1',STATE_REFERENCE)
@ -129,7 +150,7 @@ class TCDirectories(TestCase):
def test_get_files(self):
d = Directories()
p = Path(phys_test.create_fake_fs(self.tmpdir()))
p = create_fake_fs(self.tmppath())
d.add_path(p)
d.set_state(p + 'dir1',STATE_REFERENCE)
d.set_state(p + 'dir2',STATE_EXCLUDED)
@ -177,52 +198,28 @@ class TCDirectories(TestCase):
except LookupError:
self.fail()
def test_default_dirclass(self):
self.assert_(Directories().dirclass is hsfs.phys.Directory)
def test_dirclass(self):
class MySpecialDirclass(hsfs.phys.Directory): pass
d = Directories()
d.dirclass = MySpecialDirclass
d.add_path(testpath)
self.assert_(isinstance(d[0], MySpecialDirclass))
def test_load_from_file_with_invalid_path(self):
#This test simulates a load from file resulting in a
#InvalidPath raise. Other directories must be loaded.
d1 = Directories()
d1.add_path(testpath + 'utils')
#Will raise InvalidPath upon loading
d1.add_path(self.tmppath()).name = 'does_not_exist'
p = self.tmppath()
d1.add_path(p)
io.rmdir(p)
tmpxml = op.join(self.tmpdir(), 'directories_testunit.xml')
d1.save_to_file(tmpxml)
d2 = Directories()
d2.load_from_file(tmpxml)
self.assertEqual(1, len(d2))
def test_load_from_file_with_same_paths(self):
#This test simulates a load from file resulting in a
#AlreadyExists raise. Other directories must be loaded.
d1 = Directories()
p1 = self.tmppath()
p2 = self.tmppath()
d1.add_path(p1)
d1.add_path(p2)
#Will raise AlreadyExists upon loading
d1.add_path(self.tmppath()).name = unicode(p1)
tmpxml = op.join(self.tmpdir(), 'directories_testunit.xml')
d1.save_to_file(tmpxml)
d2 = Directories()
d2.load_from_file(tmpxml)
self.assertEqual(2, len(d2))
def test_unicode_save(self):
d = Directories()
p1 = self.tmppath() + u'hello\xe9'
io.mkdir(p1)
io.mkdir(p1 + u'foo\xe9')
d.add_path(p1)
d.set_state(d[0][0].path, STATE_EXCLUDED)
d.set_state(p1 + u'foo\xe9', STATE_EXCLUDED)
tmpxml = op.join(self.tmpdir(), 'directories_testunit.xml')
try:
d.save_to_file(tmpxml)
@ -231,7 +228,7 @@ class TCDirectories(TestCase):
def test_get_files_refreshes_its_directories(self):
d = Directories()
p = Path(phys_test.create_fake_fs(self.tmpdir()))
p = create_fake_fs(self.tmppath())
d.add_path(p)
files = d.get_files()
self.assertEqual(6, len(list(files)))
@ -258,16 +255,6 @@ class TCDirectories(TestCase):
d.set_state(hidden_dir_path, STATE_NORMAL)
self.assertEqual(d.get_state(hidden_dir_path), STATE_NORMAL)
def test_special_dirclasses(self):
# if a path is in special_dirclasses, use this class instead
class MySpecialDirclass(hsfs.phys.Directory): pass
d = Directories()
p1 = self.tmppath()
p2 = self.tmppath()
d.special_dirclasses[p1] = MySpecialDirclass
self.assert_(isinstance(d.add_path(p2), hsfs.phys.Directory))
self.assert_(isinstance(d.add_path(p1), MySpecialDirclass))
def test_default_path_state_override(self):
# It's possible for a subclass to override the default state of a path
class MyDirectories(Directories):

View File

@ -16,8 +16,8 @@ from hsutil.path import Path
from hsutil.testcase import TestCase
from hsutil.misc import first
from . import engine_test
from .. import data, engine
from . import engine_test, data
from .. import engine
from ..results import *
class NamedObject(engine_test.NamedObject):

View File

@ -132,8 +132,6 @@ def test_content_scan_doesnt_put_md5_in_words_at_the_end():
f[1].md5 = f[1].md5partial = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'
r = s.GetDupeGroups(f)
g = r[0]
eq_(g.ref.words, ['--'])
eq_(g.dupes[0].words, ['--'])
def test_extension_is_not_counted_in_filename_scan():
s = Scanner()

View File

@ -16,10 +16,10 @@ import os.path as op
from PyQt4.QtCore import Qt, QTimer, QObject, QCoreApplication, QUrl, SIGNAL
from PyQt4.QtGui import QProgressDialog, QDesktopServices, QFileDialog, QDialog, QMessageBox
import hsfs as fs
from hsutil import job
from hsutil.reg import RegistrationRequired
from dupeguru import fs
from dupeguru.app import (DupeGuru as DupeGuruBase, JOB_SCAN, JOB_LOAD, JOB_MOVE, JOB_COPY,
JOB_DELETE)

View File

@ -47,7 +47,14 @@ class DirectoryNode(TreeNode):
return DirectoryNode(self.model, self, ref, row)
def _getChildren(self):
return self.ref.dirs
return self.model._dirs.get_subfolders(self.ref)
@property
def name(self):
if self.parent is not None:
return self.ref[-1]
else:
return unicode(self.ref)
class DirectoriesModel(TreeModel):
@ -70,13 +77,13 @@ class DirectoriesModel(TreeModel):
node = index.internalPointer()
if role == Qt.DisplayRole:
if index.column() == 0:
return node.ref.name
return node.name
else:
return STATES[self._dirs.get_state(node.ref.path)]
return STATES[self._dirs.get_state(node.ref)]
elif role == Qt.EditRole and index.column() == 1:
return self._dirs.get_state(node.ref.path)
return self._dirs.get_state(node.ref)
elif role == Qt.ForegroundRole:
state = self._dirs.get_state(node.ref.path)
state = self._dirs.get_state(node.ref)
if state == 1:
return QBrush(Qt.blue)
elif state == 2:
@ -101,6 +108,6 @@ class DirectoriesModel(TreeModel):
if not index.isValid() or role != Qt.EditRole or index.column() != 1:
return False
node = index.internalPointer()
self._dirs.set_state(node.ref.path, value)
self._dirs.set_state(node.ref, value)
return True

View File

@ -11,12 +11,11 @@ import logging
from AppKit import *
from hsfs.phys import Directory as DirectoryBase
from hsfs.phys.bundle import Bundle
from hsutil import io
from hsutil.path import Path
from hsutil.misc import extract
from hsutil.str import get_file_ext
from dupeguru import fs
from dupeguru.app_cocoa import DupeGuru as DupeGuruBase
from dupeguru.directories import Directories as DirectoriesBase, STATE_EXCLUDED
from . import data
@ -32,27 +31,17 @@ else: # Tiger
def is_bundle(str_path): # just return a list of a few known bundle extensions.
return get_file_ext(str_path) in ('app', 'pages', 'numbers')
class DGDirectory(DirectoryBase):
def _create_sub_file(self, name, with_parent=True):
if is_bundle(unicode(self.path + name)):
parent = self if with_parent else None
return Bundle(parent, name)
else:
return super(DGDirectory, self)._create_sub_file(name, with_parent)
def _fetch_subitems(self):
subdirs, subfiles = super(DGDirectory, self)._fetch_subitems()
apps, normal_dirs = extract(lambda name: is_bundle(unicode(self.path + name)), subdirs)
subfiles += apps
return normal_dirs, subfiles
class Bundle(BundleBase):
@classmethod
def can_handle(cls, path):
return not io.islink(path) and io.isdir(path) and is_bundle(unicode(path))
class Directories(DirectoriesBase):
ROOT_PATH_TO_EXCLUDE = map(Path, ['/Library', '/Volumes', '/System', '/bin', '/sbin', '/opt', '/private', '/dev'])
HOME_PATH_TO_EXCLUDE = [Path('Library')]
def __init__(self):
DirectoriesBase.__init__(self)
self.dirclass = DGDirectory
DirectoriesBase.__init__(self, fileclasses=[Bundle, fs.File])
def _default_state_for_path(self, path):
result = DirectoriesBase._default_state_for_path(self, path)

43
se/py/fs.py Normal file
View File

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# Created By: Virgil Dupras
# Created On: 2009-10-23
# $Id$
# Copyright 2009 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "HS" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/hs_license
import hashlib
from hsutil import io
from hsutil.misc import nonone
from dupeguru import fs
class Bundle(fs.File):
"""This class is for Mac OSX bundles (.app). Bundles are seen by the OS as
normal directories, but I don't want that in dupeGuru. I want dupeGuru
to see them as files.
"""
def _read_info(self, field):
if field in ('size', 'ctime', 'mtime'):
files = fs.get_all_files(self.path)
size = sum((file.size for file in files), 0)
self.size = size
stats = io.stat(self.path)
self.ctime = nonone(stats.st_ctime, 0)
self.mtime = nonone(stats.st_mtime, 0)
elif field in ('md5', 'md5partial'):
# What's sensitive here is that we must make sure that subfiles'
# md5 are always added up in the same order, but we also want a
# different md5 if a file gets moved in a different subdirectory.
def get_dir_md5_concat():
files = fs.get_all_files(self.path)
files.sort(key=lambda f:f.path)
md5s = [getattr(f, field) for f in files]
return ''.join(md5s)
md5 = hashlib.md5(get_dir_md5_concat())
digest = md5.digest()
setattr(self, field, digest)

0
se/py/tests/__init__.py Normal file
View File

48
se/py/tests/fs_test.py Normal file
View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# Created By: Virgil Dupras
# Created On: 2009-10-23
# $Id$
# Copyright 2009 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "HS" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/hs_license
import hashlib
from nose.tools import eq_
from hsutil.testcase import TestCase
from dupeguru.fs import File
from dupeguru.tests.directories_test import create_fake_fs
from .. import fs
class TCBundle(TestCase):
def test_size_aggregates_subfiles(self):
p = create_fake_fs(self.tmppath())
b = fs.Bundle(p)
eq_(b.size, 12)
def test_md5_aggregate_subfiles_sorted(self):
#dir.allfiles can return child in any order. Thus, bundle.md5 must aggregate
#all files' md5 it contains, but it must make sure that it does so in the
#same order everytime.
p = create_fake_fs(self.tmppath())
b = fs.Bundle(p)
md5s = File(p + ('dir1', 'file1.test')).md5
md5s += File(p + ('dir2', 'file2.test')).md5
md5s += File(p + ('dir3', 'file3.test')).md5
md5s += File(p + 'file1.test').md5
md5s += File(p + 'file2.test').md5
md5s += File(p + 'file3.test').md5
md5 = hashlib.md5(md5s)
eq_(b.md5, md5.digest())
def test_has_file_attrs(self):
#a Bundle must behave like a file, so it must have ctime and mtime attributes
b = fs.Bundle(self.tmppath())
assert b.mtime > 0
assert b.ctime > 0
eq_(b.extension, '')