From d0785540527fb2fd27c4b93e02a7e39605cb64b9 Mon Sep 17 00:00:00 2001 From: Andrew Senetar Date: Wed, 3 Jun 2020 11:49:41 -0500 Subject: [PATCH] Windows Performance Improvement & Multi-Item support (#43) * Initial IFileOperation for Windows - Try using IFileOperation instead of SHFileOperation - Use pywin32 to accomplish this - Implement fallback when pywin32 not available - Handles paths like `C:\` just fine bu the `\\?\` paths in the test cause issue - Add batching for IFileOperation version (performance) - Minor formatting applied by editor * Fix issue with paths starting with \\?\ - Strip these characters off if present just like old implementation * Add windows version check, legacy list support - Add check for windows version for IFileOperation - Add list support to legacy version - Remove some debugging code - Fix bug in path converson Not sure if there is a better way to layout this file * Split plat_win into legacy and modern * Update other platforms for list support Formatter also ran on these so some other minor changes. * Add unit tests for multi-file calls --- send2trash/plat_gio.py | 24 +++--- send2trash/plat_osx.py | 36 +++++---- send2trash/plat_other.py | 116 +++++++++++++++++------------ send2trash/plat_win.py | 96 +++--------------------- send2trash/plat_win_legacy.py | 105 ++++++++++++++++++++++++++ send2trash/plat_win_modern.py | 65 +++++++++++++++++ tests/test_plat_other.py | 134 +++++++++++++++++++++++++++------- tests/test_plat_win.py | 46 ++++++++---- 8 files changed, 424 insertions(+), 198 deletions(-) create mode 100644 send2trash/plat_win_legacy.py create mode 100644 send2trash/plat_win_modern.py diff --git a/send2trash/plat_gio.py b/send2trash/plat_gio.py index 6184d80..9809fe7 100644 --- a/send2trash/plat_gio.py +++ b/send2trash/plat_gio.py @@ -7,13 +7,17 @@ from gi.repository import GObject, Gio from .exceptions import TrashPermissionError -def send2trash(path): - try: - f = Gio.File.new_for_path(path) - f.trash(cancellable=None) - except GObject.GError as e: - if e.code == Gio.IOErrorEnum.NOT_SUPPORTED: - # We get here if we can't create a trash directory on the same - # device. I don't know if other errors can result in NOT_SUPPORTED. - raise TrashPermissionError('') - raise OSError(e.message) + +def send2trash(paths): + if not isinstance(paths, list): + paths = [paths] + for path in paths: + try: + f = Gio.File.new_for_path(path) + f.trash(cancellable=None) + except GObject.GError as e: + if e.code == Gio.IOErrorEnum.NOT_SUPPORTED: + # We get here if we can't create a trash directory on the same + # device. I don't know if other errors can result in NOT_SUPPORTED. + raise TrashPermissionError("") + raise OSError(e.message) diff --git a/send2trash/plat_osx.py b/send2trash/plat_osx.py index 4ac830e..ac6634c 100644 --- a/send2trash/plat_osx.py +++ b/send2trash/plat_osx.py @@ -11,8 +11,8 @@ from ctypes.util import find_library from .compat import binary_type -Foundation = cdll.LoadLibrary(find_library('Foundation')) -CoreServices = cdll.LoadLibrary(find_library('CoreServices')) +Foundation = cdll.LoadLibrary(find_library("Foundation")) +CoreServices = cdll.LoadLibrary(find_library("CoreServices")) GetMacOSStatusCommentString = Foundation.GetMacOSStatusCommentString GetMacOSStatusCommentString.restype = c_char_p @@ -28,21 +28,29 @@ kFSFileOperationSkipSourcePermissionErrors = 0x02 kFSFileOperationDoNotMoveAcrossVolumes = 0x04 kFSFileOperationSkipPreflight = 0x08 + class FSRef(Structure): - _fields_ = [('hidden', c_char * 80)] + _fields_ = [("hidden", c_char * 80)] + def check_op_result(op_result): if op_result: - msg = GetMacOSStatusCommentString(op_result).decode('utf-8') + msg = GetMacOSStatusCommentString(op_result).decode("utf-8") raise OSError(msg) -def send2trash(path): - if not isinstance(path, binary_type): - path = path.encode('utf-8') - fp = FSRef() - opts = kFSPathMakeRefDoNotFollowLeafSymlink - op_result = FSPathMakeRefWithOptions(path, opts, byref(fp), None) - check_op_result(op_result) - opts = kFSFileOperationDefaultOptions - op_result = FSMoveObjectToTrashSync(byref(fp), None, opts) - check_op_result(op_result) + +def send2trash(paths): + if not isinstance(paths, list): + paths = [paths] + paths = [ + path.encode("utf-8") if not isinstance(path, binary_type) else path + for path in paths + ] + for path in paths: + fp = FSRef() + opts = kFSPathMakeRefDoNotFollowLeafSymlink + op_result = FSPathMakeRefWithOptions(path, opts, byref(fp), None) + check_op_result(op_result) + opts = kFSFileOperationDefaultOptions + op_result = FSMoveObjectToTrashSync(byref(fp), None, opts) + check_op_result(op_result) diff --git a/send2trash/plat_other.py b/send2trash/plat_other.py index 624eb99..dd45606 100644 --- a/send2trash/plat_other.py +++ b/send2trash/plat_other.py @@ -22,6 +22,7 @@ import os import os.path as op from datetime import datetime import stat + try: from urllib.parse import quote except ImportError: @@ -32,31 +33,35 @@ from .compat import text_type, environb from .exceptions import TrashPermissionError try: - fsencode = os.fsencode # Python 3 + fsencode = os.fsencode # Python 3 fsdecode = os.fsdecode except AttributeError: - def fsencode(u): # Python 2 + + def fsencode(u): # Python 2 return u.encode(sys.getfilesystemencoding()) + def fsdecode(b): return b.decode(sys.getfilesystemencoding()) + # The Python 3 versions are a bit smarter, handling surrogate escapes, # but these should work in most cases. -FILES_DIR = b'files' -INFO_DIR = b'info' -INFO_SUFFIX = b'.trashinfo' +FILES_DIR = b"files" +INFO_DIR = b"info" +INFO_SUFFIX = b".trashinfo" # Default of ~/.local/share [3] -XDG_DATA_HOME = op.expanduser(environb.get(b'XDG_DATA_HOME', b'~/.local/share')) -HOMETRASH_B = op.join(XDG_DATA_HOME, b'Trash') +XDG_DATA_HOME = op.expanduser(environb.get(b"XDG_DATA_HOME", b"~/.local/share")) +HOMETRASH_B = op.join(XDG_DATA_HOME, b"Trash") HOMETRASH = fsdecode(HOMETRASH_B) uid = os.getuid() -TOPDIR_TRASH = b'.Trash' -TOPDIR_FALLBACK = b'.Trash-' + text_type(uid).encode('ascii') +TOPDIR_TRASH = b".Trash" +TOPDIR_FALLBACK = b".Trash-" + text_type(uid).encode("ascii") + def is_parent(parent, path): - path = op.realpath(path) # In case it's a symlink + path = op.realpath(path) # In case it's a symlink if isinstance(path, text_type): path = fsencode(path) parent = op.realpath(parent) @@ -64,9 +69,11 @@ def is_parent(parent, path): parent = fsencode(parent) return path.startswith(parent) + def format_date(date): return date.strftime("%Y-%m-%dT%H:%M:%S") + def info_for(src, topdir): # ...it MUST not include a ".." directory, and for files not "under" that # directory, absolute pathnames must be used. [2] @@ -75,16 +82,18 @@ def info_for(src, topdir): else: src = op.relpath(src, topdir) - info = "[Trash Info]\n" + info = "[Trash Info]\n" info += "Path=" + quote(src) + "\n" info += "DeletionDate=" + format_date(datetime.now()) + "\n" return info + def check_create(dir): # use 0700 for paths [3] if not op.exists(dir): os.makedirs(dir, 0o700) + def trash_move(src, dst, topdir=None): filename = op.basename(src) filespath = op.join(dst, FILES_DIR) @@ -93,26 +102,30 @@ def trash_move(src, dst, topdir=None): counter = 0 destname = filename - while op.exists(op.join(filespath, destname)) or op.exists(op.join(infopath, destname + INFO_SUFFIX)): + while op.exists(op.join(filespath, destname)) or op.exists( + op.join(infopath, destname + INFO_SUFFIX) + ): counter += 1 - destname = base_name + b' ' + text_type(counter).encode('ascii') + ext + destname = base_name + b" " + text_type(counter).encode("ascii") + ext check_create(filespath) check_create(infopath) os.rename(src, op.join(filespath, destname)) - f = open(op.join(infopath, destname + INFO_SUFFIX), 'w') + f = open(op.join(infopath, destname + INFO_SUFFIX), "w") f.write(info_for(src, topdir)) f.close() + def find_mount_point(path): # Even if something's wrong, "/" is a mount point, so the loop will exit. # Use realpath in case it's a symlink - path = op.realpath(path) # Required to avoid infinite loop + path = op.realpath(path) # Required to avoid infinite loop while not op.ismount(path): path = op.split(path)[0] return path + def find_ext_volume_global_trash(volume_root): # from [2] Trash directories (1) check for a .Trash dir with the right # permissions set. @@ -126,13 +139,14 @@ def find_ext_volume_global_trash(volume_root): if not op.isdir(trash_dir) or op.islink(trash_dir) or not (mode & stat.S_ISVTX): return None - trash_dir = op.join(trash_dir, text_type(uid).encode('ascii')) + trash_dir = op.join(trash_dir, text_type(uid).encode("ascii")) try: check_create(trash_dir) except OSError: return None return trash_dir + def find_ext_volume_fallback_trash(volume_root): # from [2] Trash directories (1) create a .Trash-$uid dir. trash_dir = op.join(volume_root, TOPDIR_FALLBACK) @@ -145,48 +159,54 @@ def find_ext_volume_fallback_trash(volume_root): raise return trash_dir + def find_ext_volume_trash(volume_root): trash_dir = find_ext_volume_global_trash(volume_root) if trash_dir is None: trash_dir = find_ext_volume_fallback_trash(volume_root) return trash_dir + # Pull this out so it's easy to stub (to avoid stubbing lstat itself) def get_dev(path): return os.lstat(path).st_dev -def send2trash(path): - if isinstance(path, text_type): - path_b = fsencode(path) - elif isinstance(path, bytes): - path_b = path - elif hasattr(path, '__fspath__'): - # Python 3.6 PathLike protocol - return send2trash(path.__fspath__()) - else: - raise TypeError('str, bytes or PathLike expected, not %r' % type(path)) - if not op.exists(path_b): - raise OSError("File not found: %s" % path) - # ...should check whether the user has the necessary permissions to delete - # it, before starting the trashing operation itself. [2] - if not os.access(path_b, os.W_OK): - raise OSError("Permission denied: %s" % path) - # if the file to be trashed is on the same device as HOMETRASH we - # want to move it there. - path_dev = get_dev(path_b) +def send2trash(paths): + if not isinstance(paths, list): + paths = [paths] + for path in paths: + if isinstance(path, text_type): + path_b = fsencode(path) + elif isinstance(path, bytes): + path_b = path + elif hasattr(path, "__fspath__"): + # Python 3.6 PathLike protocol + return send2trash(path.__fspath__()) + else: + raise TypeError("str, bytes or PathLike expected, not %r" % type(path)) - # If XDG_DATA_HOME or HOMETRASH do not yet exist we need to stat the - # home directory, and these paths will be created further on if needed. - trash_dev = get_dev(op.expanduser(b'~')) + if not op.exists(path_b): + raise OSError("File not found: %s" % path) + # ...should check whether the user has the necessary permissions to delete + # it, before starting the trashing operation itself. [2] + if not os.access(path_b, os.W_OK): + raise OSError("Permission denied: %s" % path) + # if the file to be trashed is on the same device as HOMETRASH we + # want to move it there. + path_dev = get_dev(path_b) - if path_dev == trash_dev: - topdir = XDG_DATA_HOME - dest_trash = HOMETRASH_B - else: - topdir = find_mount_point(path_b) - trash_dev = get_dev(topdir) - if trash_dev != path_dev: - raise OSError("Couldn't find mount point for %s" % path) - dest_trash = find_ext_volume_trash(topdir) - trash_move(path_b, dest_trash, topdir) + # If XDG_DATA_HOME or HOMETRASH do not yet exist we need to stat the + # home directory, and these paths will be created further on if needed. + trash_dev = get_dev(op.expanduser(b"~")) + + if path_dev == trash_dev: + topdir = XDG_DATA_HOME + dest_trash = HOMETRASH_B + else: + topdir = find_mount_point(path_b) + trash_dev = get_dev(topdir) + if trash_dev != path_dev: + raise OSError("Couldn't find mount point for %s" % path) + dest_trash = find_ext_volume_trash(topdir) + trash_move(path_b, dest_trash, topdir) diff --git a/send2trash/plat_win.py b/send2trash/plat_win.py index 372fba7..a39833c 100644 --- a/send2trash/plat_win.py +++ b/send2trash/plat_win.py @@ -5,88 +5,16 @@ # http://www.hardcoded.net/licenses/bsd_license from __future__ import unicode_literals +from platform import version -from ctypes import (windll, Structure, byref, c_uint, - create_unicode_buffer, addressof, - GetLastError, FormatError) -from ctypes.wintypes import HWND, UINT, LPCWSTR, BOOL -import os.path as op - -from .compat import text_type - -kernel32 = windll.kernel32 -GetShortPathNameW = kernel32.GetShortPathNameW - -shell32 = windll.shell32 -SHFileOperationW = shell32.SHFileOperationW - - -class SHFILEOPSTRUCTW(Structure): - _fields_ = [ - ("hwnd", HWND), - ("wFunc", UINT), - ("pFrom", LPCWSTR), - ("pTo", LPCWSTR), - ("fFlags", c_uint), - ("fAnyOperationsAborted", BOOL), - ("hNameMappings", c_uint), - ("lpszProgressTitle", LPCWSTR), - ] - - -FO_MOVE = 1 -FO_COPY = 2 -FO_DELETE = 3 -FO_RENAME = 4 - -FOF_MULTIDESTFILES = 1 -FOF_SILENT = 4 -FOF_NOCONFIRMATION = 16 -FOF_ALLOWUNDO = 64 -FOF_NOERRORUI = 1024 - - -def get_short_path_name(long_name): - if not long_name.startswith('\\\\?\\'): - long_name = '\\\\?\\' + long_name - buf_size = GetShortPathNameW(long_name, None, 0) - # FIX: https://github.com/hsoft/send2trash/issues/31 - # If buffer size is zero, an error has occurred. - if not buf_size: - err_no = GetLastError() - raise WindowsError(err_no, FormatError(err_no), long_name[4:]) - output = create_unicode_buffer(buf_size) - GetShortPathNameW(long_name, output, buf_size) - return output.value[4:] # Remove '\\?\' for SHFileOperationW - - -def send2trash(path): - if not isinstance(path, text_type): - path = text_type(path, 'mbcs') - if not op.isabs(path): - path = op.abspath(path) - path = get_short_path_name(path) - fileop = SHFILEOPSTRUCTW() - fileop.hwnd = 0 - fileop.wFunc = FO_DELETE - # FIX: https://github.com/hsoft/send2trash/issues/17 - # Starting in python 3.6.3 it is no longer possible to use: - # LPCWSTR(path + '\0') directly as embedded null characters are no longer - # allowed in strings - # Workaround - # - create buffer of c_wchar[] (LPCWSTR is based on this type) - # - buffer is two c_wchar characters longer (double null terminator) - # - cast the address of the buffer to a LPCWSTR - # NOTE: based on how python allocates memory for these types they should - # always be zero, if this is ever not true we can go back to explicitly - # setting the last two characters to null using buffer[index] = '\0'. - buffer = create_unicode_buffer(path, len(path)+2) - fileop.pFrom = LPCWSTR(addressof(buffer)) - fileop.pTo = None - fileop.fFlags = FOF_ALLOWUNDO | FOF_NOCONFIRMATION | FOF_NOERRORUI | FOF_SILENT - fileop.fAnyOperationsAborted = 0 - fileop.hNameMappings = 0 - fileop.lpszProgressTitle = None - result = SHFileOperationW(byref(fileop)) - if result: - raise WindowsError(result, FormatError(result), path) +# if windows is vista or newer and pywin32 is available use IFileOperation +if int(version().split(".", 1)[0]) >= 6: + try: + # Attempt to use pywin32 to use IFileOperation + from .plat_win_modern import send2trash + except ImportError: + # use SHFileOperation as fallback + from .plat_win_legacy import send2trash +else: + # use SHFileOperation as fallback + from .plat_win_legacy import send2trash diff --git a/send2trash/plat_win_legacy.py b/send2trash/plat_win_legacy.py new file mode 100644 index 0000000..e3c404b --- /dev/null +++ b/send2trash/plat_win_legacy.py @@ -0,0 +1,105 @@ +# Copyright 2017 Virgil Dupras + +# This software is licensed under the "BSD" License as described in the "LICENSE" file, +# which should be included with this package. The terms are also available at +# http://www.hardcoded.net/licenses/bsd_license + +from __future__ import unicode_literals +import os.path as op +from .compat import text_type +from ctypes import ( + windll, + Structure, + byref, + c_uint, + create_unicode_buffer, + addressof, + GetLastError, + FormatError, +) +from ctypes.wintypes import HWND, UINT, LPCWSTR, BOOL + +kernel32 = windll.kernel32 +GetShortPathNameW = kernel32.GetShortPathNameW + +shell32 = windll.shell32 +SHFileOperationW = shell32.SHFileOperationW + + +class SHFILEOPSTRUCTW(Structure): + _fields_ = [ + ("hwnd", HWND), + ("wFunc", UINT), + ("pFrom", LPCWSTR), + ("pTo", LPCWSTR), + ("fFlags", c_uint), + ("fAnyOperationsAborted", BOOL), + ("hNameMappings", c_uint), + ("lpszProgressTitle", LPCWSTR), + ] + + +FO_MOVE = 1 +FO_COPY = 2 +FO_DELETE = 3 +FO_RENAME = 4 + +FOF_MULTIDESTFILES = 1 +FOF_SILENT = 4 +FOF_NOCONFIRMATION = 16 +FOF_ALLOWUNDO = 64 +FOF_NOERRORUI = 1024 + + +def get_short_path_name(long_name): + if not long_name.startswith("\\\\?\\"): + long_name = "\\\\?\\" + long_name + buf_size = GetShortPathNameW(long_name, None, 0) + # FIX: https://github.com/hsoft/send2trash/issues/31 + # If buffer size is zero, an error has occurred. + if not buf_size: + err_no = GetLastError() + raise WindowsError(err_no, FormatError(err_no), long_name[4:]) + output = create_unicode_buffer(buf_size) + GetShortPathNameW(long_name, output, buf_size) + return output.value[4:] # Remove '\\?\' for SHFileOperationW + + +def send2trash(paths): + if not isinstance(paths, list): + paths = [paths] + # convert data type + paths = [ + text_type(path, "mbcs") if not isinstance(path, text_type) else path + for path in paths + ] + # convert to full paths + paths = [op.abspath(path) if not op.isabs(path) else path for path in paths] + # get short path to handle path length issues + paths = [get_short_path_name(path) for path in paths] + # convert to a single string of null terminated paths + paths = "\0".join(paths) + fileop = SHFILEOPSTRUCTW() + fileop.hwnd = 0 + fileop.wFunc = FO_DELETE + # FIX: https://github.com/hsoft/send2trash/issues/17 + # Starting in python 3.6.3 it is no longer possible to use: + # LPCWSTR(path + '\0') directly as embedded null characters are no longer + # allowed in strings + # Workaround + # - create buffer of c_wchar[] (LPCWSTR is based on this type) + # - buffer is two c_wchar characters longer (double null terminator) + # - cast the address of the buffer to a LPCWSTR + # NOTE: based on how python allocates memory for these types they should + # always be zero, if this is ever not true we can go back to explicitly + # setting the last two characters to null using buffer[index] = '\0'. + buffer = create_unicode_buffer(paths, len(paths) + 2) + fileop.pFrom = LPCWSTR(addressof(buffer)) + fileop.pTo = None + fileop.fFlags = FOF_ALLOWUNDO | FOF_NOCONFIRMATION | FOF_NOERRORUI | FOF_SILENT + fileop.fAnyOperationsAborted = 0 + fileop.hNameMappings = 0 + fileop.lpszProgressTitle = None + result = SHFileOperationW(byref(fileop)) + if result: + raise WindowsError(result, FormatError(result), paths) diff --git a/send2trash/plat_win_modern.py b/send2trash/plat_win_modern.py new file mode 100644 index 0000000..54914f8 --- /dev/null +++ b/send2trash/plat_win_modern.py @@ -0,0 +1,65 @@ +# Copyright 2017 Virgil Dupras + +# This software is licensed under the "BSD" License as described in the "LICENSE" file, +# which should be included with this package. The terms are also available at +# http://www.hardcoded.net/licenses/bsd_license + +from __future__ import unicode_literals +import os.path as op +from .compat import text_type +from platform import version +import pythoncom +import pywintypes +from win32com.shell import shell, shellcon + + +def send2trash(paths): + if not isinstance(paths, list): + paths = [paths] + # convert data type + paths = [ + text_type(path, "mbcs") if not isinstance(path, text_type) else path + for path in paths + ] + # convert to full paths + paths = [op.abspath(path) if not op.isabs(path) else path for path in paths] + # remove the leading \\?\ if present + paths = [path[4:] if path.startswith("\\\\?\\") else path for path in paths] + # create instance of file operation object + fileop = pythoncom.CoCreateInstance( + shell.CLSID_FileOperation, None, pythoncom.CLSCTX_ALL, shell.IID_IFileOperation, + ) + # default flags to use + flags = ( + shellcon.FOF_NOCONFIRMATION + | shellcon.FOF_NOERRORUI + | shellcon.FOF_SILENT + | shellcon.FOFX_EARLYFAILURE + ) + # determine rest of the flags based on OS version + # use newer recommended flags if available + if int(version().split(".", 1)[0]) >= 8: + flags |= ( + 0x20000000 # FOFX_ADDUNDORECORD win 8+ + | 0x00080000 # FOFX_RECYCLEONDELETE win 8+ + ) + else: + flags |= shellcon.FOF_ALLOWUNDO + # set the flags + fileop.SetOperationFlags(flags) + # actually try to perform the operation, this section may throw a + # pywintypes.com_error which does not seem to create as nice of an + # error as OSError so wrapping with try to convert + try: + for path in paths: + item = shell.SHCreateItemFromParsingName(path, None, shell.IID_IShellItem) + fileop.DeleteItem(item) + result = fileop.PerformOperations() + aborted = fileop.GetAnyOperationsAborted() + # if non-zero result or aborted throw an exception + if result or aborted: + raise OSError(None, None, paths, result) + except pywintypes.com_error as error: + # convert to standard OS error, allows other code to get a + # normal errno + raise OSError(None, error.strerror, path, error.hresult) diff --git a/tests/test_plat_other.py b/tests/test_plat_other.py index ae4f391..32ff306 100644 --- a/tests/test_plat_other.py +++ b/tests/test_plat_other.py @@ -6,6 +6,7 @@ from os import path as op import send2trash.plat_other from send2trash.plat_other import send2trash as s2t from send2trash.compat import PY3 + try: from configparser import ConfigParser except ImportError: @@ -15,18 +16,22 @@ from tempfile import mkdtemp, NamedTemporaryFile, mktemp import shutil import stat import sys + # Could still use cleaning up. But no longer relies on ramfs. HOMETRASH = send2trash.plat_other.HOMETRASH + def touch(path): - with open(path, 'a'): + with open(path, "a"): os.utime(path, None) + class TestHomeTrash(unittest.TestCase): def setUp(self): self.file = NamedTemporaryFile( - dir=op.expanduser("~"), prefix='send2trash_test', delete=False) + dir=op.expanduser("~"), prefix="send2trash_test", delete=False + ) def test_trash(self): s2t(self.file.name) @@ -34,8 +39,35 @@ class TestHomeTrash(unittest.TestCase): def tearDown(self): name = op.basename(self.file.name) - os.remove(op.join(HOMETRASH, 'files', name)) - os.remove(op.join(HOMETRASH, 'info', name+'.trashinfo')) + os.remove(op.join(HOMETRASH, "files", name)) + os.remove(op.join(HOMETRASH, "info", name + ".trashinfo")) + + +class TestHomeMultiTrash(unittest.TestCase): + def setUp(self): + self.files = list( + map( + lambda index: NamedTemporaryFile( + dir=op.expanduser("~"), + prefix="send2trash_test{}".format(index), + delete=False, + ), + range(10), + ) + ) + + def test_multitrash(self): + filenames = [file.name for file in self.files] + s2t(filenames) + self.assertFalse(any([op.exists(filename) for filename in filenames])) + + def tearDown(self): + filenames = [op.basename(file.name) for file in self.files] + [os.remove(op.join(HOMETRASH, "files", filename)) for filename in filenames] + [ + os.remove(op.join(HOMETRASH, "info", filename + ".trashinfo")) + for filename in filenames + ] def _filesys_enc(): @@ -43,11 +75,12 @@ def _filesys_enc(): # Get canonical name of codec return codecs.lookup(enc).name -@unittest.skipIf(_filesys_enc() == 'ascii', 'ASCII filesystem') + +@unittest.skipIf(_filesys_enc() == "ascii", "ASCII filesystem") class TestUnicodeTrash(unittest.TestCase): def setUp(self): - self.name = u'send2trash_tést1' - self.file = op.join(op.expanduser(b'~'), self.name.encode('utf-8')) + self.name = u"send2trash_tést1" + self.file = op.join(op.expanduser(b"~"), self.name.encode("utf-8")) touch(self.file) def test_trash_bytes(self): @@ -62,10 +95,11 @@ class TestUnicodeTrash(unittest.TestCase): if op.exists(self.file): os.remove(self.file) - trash_file = op.join(HOMETRASH, 'files', self.name) + trash_file = op.join(HOMETRASH, "files", self.name) if op.exists(trash_file): os.remove(trash_file) - os.remove(op.join(HOMETRASH, 'info', self.name+'.trashinfo')) + os.remove(op.join(HOMETRASH, "info", self.name + ".trashinfo")) + # # Tests for files on some other volume than the user's home directory. @@ -76,25 +110,31 @@ class TestUnicodeTrash(unittest.TestCase): # class TestExtVol(unittest.TestCase): def setUp(self): - self.trashTopdir = mkdtemp(prefix='s2t') + self.trashTopdir = mkdtemp(prefix="s2t") if PY3: trashTopdir_b = os.fsencode(self.trashTopdir) else: trashTopdir_b = self.trashTopdir - self.fileName = 'test.txt' + self.fileName = "test.txt" self.filePath = op.join(self.trashTopdir, self.fileName) touch(self.filePath) self.old_ismount = old_ismount = op.ismount self.old_getdev = send2trash.plat_other.get_dev + def s_getdev(path): from send2trash.plat_other import is_parent + st = os.lstat(path) if is_parent(self.trashTopdir, path): - return 'dev' + return "dev" return st.st_dev + def s_ismount(path): - if op.realpath(path) in (op.realpath(self.trashTopdir), op.realpath(trashTopdir_b)): + if op.realpath(path) in ( + op.realpath(self.trashTopdir), + op.realpath(trashTopdir_b), + ): return True return old_ismount(path) @@ -106,23 +146,40 @@ class TestExtVol(unittest.TestCase): send2trash.plat_other.os.path.ismount = self.old_ismount shutil.rmtree(self.trashTopdir) + class TestTopdirTrash(TestExtVol): def setUp(self): TestExtVol.setUp(self) # Create a .Trash dir w/ a sticky bit - self.trashDir = op.join(self.trashTopdir, '.Trash') - os.mkdir(self.trashDir, 0o777|stat.S_ISVTX) + self.trashDir = op.join(self.trashTopdir, ".Trash") + os.mkdir(self.trashDir, 0o777 | stat.S_ISVTX) def test_trash(self): s2t(self.filePath) self.assertFalse(op.exists(self.filePath)) - self.assertTrue(op.exists(op.join(self.trashDir, str(os.getuid()), 'files', self.fileName))) - self.assertTrue(op.exists(op.join(self.trashDir, str(os.getuid()), 'info', self.fileName + '.trashinfo'))) + self.assertTrue( + op.exists(op.join(self.trashDir, str(os.getuid()), "files", self.fileName)) + ) + self.assertTrue( + op.exists( + op.join( + self.trashDir, + str(os.getuid()), + "info", + self.fileName + ".trashinfo", + ) + ) + ) # info relative path (if another test is added, with the same fileName/Path, # then it gets renamed etc.) cfg = ConfigParser() - cfg.read(op.join(self.trashDir, str(os.getuid()), 'info', self.fileName + '.trashinfo')) - self.assertEqual(self.fileName, cfg.get('Trash Info', 'Path', raw=True)) + cfg.read( + op.join( + self.trashDir, str(os.getuid()), "info", self.fileName + ".trashinfo" + ) + ) + self.assertEqual(self.fileName, cfg.get("Trash Info", "Path", raw=True)) + # Test .Trash-UID class TestTopdirTrashFallback(TestExtVol): @@ -130,13 +187,23 @@ class TestTopdirTrashFallback(TestExtVol): touch(self.filePath) s2t(self.filePath) self.assertFalse(op.exists(self.filePath)) - self.assertTrue(op.exists(op.join(self.trashTopdir, '.Trash-' + str(os.getuid()), 'files', self.fileName))) + self.assertTrue( + op.exists( + op.join( + self.trashTopdir, + ".Trash-" + str(os.getuid()), + "files", + self.fileName, + ) + ) + ) + # Test failure class TestTopdirFailure(TestExtVol): def setUp(self): TestExtVol.setUp(self) - os.chmod(self.trashTopdir, 0o500) # not writable to induce the exception + os.chmod(self.trashTopdir, 0o500) # not writable to induce the exception def test_trash(self): with self.assertRaises(OSError): @@ -144,9 +211,10 @@ class TestTopdirFailure(TestExtVol): self.assertTrue(op.exists(self.filePath)) def tearDown(self): - os.chmod(self.trashTopdir, 0o700) # writable to allow deletion + os.chmod(self.trashTopdir, 0o700) # writable to allow deletion TestExtVol.tearDown(self) + # Make sure it will find the mount point properly for a file in a symlinked path class TestSymlink(TestExtVol): def setUp(self): @@ -154,21 +222,31 @@ class TestSymlink(TestExtVol): # Use mktemp (race conditioney but no symlink equivalent) # Since is_parent uses realpath(), and our getdev uses is_parent, # this should work - self.slDir = mktemp(prefix='s2t', dir=op.expanduser('~')) + self.slDir = mktemp(prefix="s2t", dir=op.expanduser("~")) - os.mkdir(op.join(self.trashTopdir, 'subdir'), 0o700) - self.filePath = op.join(self.trashTopdir, 'subdir', self.fileName) + os.mkdir(op.join(self.trashTopdir, "subdir"), 0o700) + self.filePath = op.join(self.trashTopdir, "subdir", self.fileName) touch(self.filePath) - os.symlink(op.join(self.trashTopdir, 'subdir'), self.slDir) + os.symlink(op.join(self.trashTopdir, "subdir"), self.slDir) def test_trash(self): s2t(op.join(self.slDir, self.fileName)) self.assertFalse(op.exists(self.filePath)) - self.assertTrue(op.exists(op.join(self.trashTopdir, '.Trash-' + str(os.getuid()), 'files', self.fileName))) + self.assertTrue( + op.exists( + op.join( + self.trashTopdir, + ".Trash-" + str(os.getuid()), + "files", + self.fileName, + ) + ) + ) def tearDown(self): os.remove(self.slDir) TestExtVol.tearDown(self) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_plat_win.py b/tests/test_plat_win.py index 326e39c..17d8a7d 100644 --- a/tests/test_plat_win.py +++ b/tests/test_plat_win.py @@ -9,12 +9,16 @@ from tempfile import gettempdir from send2trash import send2trash as s2t -@unittest.skipIf(sys.platform != 'win32', 'Windows only') +@unittest.skipIf(sys.platform != "win32", "Windows only") class TestNormal(unittest.TestCase): def setUp(self): - self.dirname = '\\\\?\\' + op.join(gettempdir(), 'python.send2trash') - self.file = op.join(self.dirname, 'testfile.txt') + self.dirname = "\\\\?\\" + op.join(gettempdir(), "python.send2trash") + self.file = op.join(self.dirname, "testfile.txt") self._create_tree(self.file) + self.files = [ + op.join(self.dirname, "testfile{}.txt".format(index)) for index in range(10) + ] + [self._create_tree(file) for file in self.files] def tearDown(self): shutil.rmtree(self.dirname, ignore_errors=True) @@ -23,29 +27,38 @@ class TestNormal(unittest.TestCase): dirname = op.dirname(path) if not op.isdir(dirname): os.makedirs(dirname) - with open(path, 'w') as writer: - writer.write('send2trash test') + with open(path, "w") as writer: + writer.write("send2trash test") def test_trash_file(self): s2t(self.file) self.assertFalse(op.exists(self.file)) + def test_trash_multifile(self): + s2t(self.files) + self.assertFalse(any([op.exists(file) for file in self.files])) + def test_file_not_found(self): - file = op.join(self.dirname, 'otherfile.txt') + file = op.join(self.dirname, "otherfile.txt") self.assertRaises(WindowsError, s2t, file) -@unittest.skipIf(sys.platform != 'win32', 'Windows only') + +@unittest.skipIf(sys.platform != "win32", "Windows only") class TestLongPath(unittest.TestCase): def setUp(self): - filename = 'A' * 100 - self.dirname = '\\\\?\\' + op.join(gettempdir(), filename) - self.file = op.join( + filename = "A" * 100 + self.dirname = "\\\\?\\" + op.join(gettempdir(), filename) + path = op.join( self.dirname, filename, filename, # From there, the path is not trashable from Explorer filename, - filename + '.txt') + filename + "{}.txt", + ) + self.file = path.format("") self._create_tree(self.file) + self.files = [path.format(index) for index in range(10)] + [self._create_tree(file) for file in self.files] def tearDown(self): shutil.rmtree(self.dirname, ignore_errors=True) @@ -54,16 +67,21 @@ class TestLongPath(unittest.TestCase): dirname = op.dirname(path) if not op.isdir(dirname): os.makedirs(dirname) - with open(path, 'w') as writer: - writer.write('Looong filename!') + with open(path, "w") as writer: + writer.write("Looong filename!") def test_trash_file(self): s2t(self.file) self.assertFalse(op.exists(self.file)) + def test_trash_multifile(self): + s2t(self.files) + self.assertFalse(any([op.exists(file) for file in self.files])) + @unittest.skipIf( op.splitdrive(os.getcwd())[0] != op.splitdrive(gettempdir())[0], - 'Cannot trash long path from other drive') + "Cannot trash long path from other drive", + ) def test_trash_folder(self): s2t(self.dirname) self.assertFalse(op.exists(self.dirname))