diff --git a/Lib/pathlib/__init__.py b/Lib/pathlib/__init__.py index 12cf9f579cb32d..f5203f6661de82 100644 --- a/Lib/pathlib/__init__.py +++ b/Lib/pathlib/__init__.py @@ -11,6 +11,7 @@ import os import posixpath import sys +from collections import deque from errno import * from glob import _StringGlobber, _no_recurse_symlinks from itertools import chain @@ -1055,19 +1056,6 @@ def rmdir(self): """ os.rmdir(self) - def _delete(self): - """ - Delete this file or directory (including all sub-directories). - """ - if self.is_symlink() or self.is_junction(): - self.unlink() - elif self.is_dir(): - # Lazy import to improve module import time - import shutil - shutil.rmtree(self) - else: - self.unlink() - def rename(self, target): """ Rename this path to the target path. @@ -1105,7 +1093,7 @@ def copy(self, target, **kwargs): if not hasattr(target, 'with_segments'): target = self.with_segments(target) ensure_distinct_paths(self, target) - target._copy_from(self, **kwargs) + deque(target._iter_copy_from(self, **kwargs), maxlen=0) return target.joinpath() # Empty join to ensure fresh metadata. def copy_into(self, target_dir, **kwargs): @@ -1121,22 +1109,26 @@ def copy_into(self, target_dir, **kwargs): target = self.with_segments(target_dir, name) return self.copy(target, **kwargs) - def _copy_from(self, source, follow_symlinks=True, preserve_metadata=False): + def _iter_copy_from(self, source, follow_symlinks=True, + preserve_metadata=False): """ - Recursively copy the given path to this path. + Recursively copy the given path to this path. Yields a + (source, target) tuple after each path is copied. """ if not follow_symlinks and source.info.is_symlink(): self._copy_from_symlink(source, preserve_metadata) elif source.info.is_dir(): children = source.iterdir() os.mkdir(self) - for child in children: - self.joinpath(child.name)._copy_from( - child, follow_symlinks, preserve_metadata) + for src in children: + dst = self.joinpath(src.name) + yield from dst._iter_copy_from( + src, follow_symlinks, preserve_metadata) if preserve_metadata: copy_info(source.info, self) else: self._copy_from_file(source, preserve_metadata) + yield source, self def _copy_from_file(self, source, preserve_metadata=False): ensure_different_files(source, self) @@ -1177,24 +1169,32 @@ def move(self, target): """ Recursively move this file or directory tree to the given destination. """ + if not hasattr(target, 'with_segments'): + target = self.with_segments(target) + # Use os.replace() if the target is os.PathLike and on the same FS. + ensure_different_files(self, target) try: - target = self.with_segments(target) + os.replace(self, os.fspath(target)) except TypeError: pass + except OSError as err: + if err.errno != EXDEV: + raise else: - ensure_different_files(self, target) - try: - os.replace(self, target) - except OSError as err: - if err.errno != EXDEV: - raise - else: - return target.joinpath() # Empty join to ensure fresh metadata. + return target.joinpath() # Empty join to ensure fresh metadata. + # Fall back to copy+delete. - target = self.copy(target, follow_symlinks=False, preserve_metadata=True) - self._delete() - return target + ensure_distinct_paths(self, target) + for src, _dst in target._iter_copy_from( + self, follow_symlinks=False, preserve_metadata=True): + if src.info.is_symlink() or src.is_junction(): + src.unlink() + elif src.info.is_dir(): + src.rmdir() + else: + src.unlink() + return target.joinpath() # Empty join to ensure fresh metadata. def move_into(self, target_dir): """ diff --git a/Lib/pathlib/types.py b/Lib/pathlib/types.py index d8f5c34a1a7513..30b35e64ff393e 100644 --- a/Lib/pathlib/types.py +++ b/Lib/pathlib/types.py @@ -11,6 +11,7 @@ from abc import ABC, abstractmethod +from collections import deque from glob import _PathGlobber from io import text_encoding from pathlib._os import magic_open, ensure_distinct_paths, ensure_different_files, copyfileobj @@ -337,7 +338,7 @@ def copy(self, target, **kwargs): Recursively copy this file or directory tree to the given destination. """ ensure_distinct_paths(self, target) - target._copy_from(self, **kwargs) + deque(target._iter_copy_from(self, **kwargs), maxlen=0) return target.joinpath() # Empty join to ensure fresh metadata. def copy_into(self, target_dir, **kwargs): @@ -404,25 +405,25 @@ def write_text(self, data, encoding=None, errors=None, newline=None): with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f: return f.write(data) - def _copy_from(self, source, follow_symlinks=True): - """ - Recursively copy the given path to this path. - """ - stack = [(source, self)] - while stack: - src, dst = stack.pop() - if not follow_symlinks and src.info.is_symlink(): - dst.symlink_to(str(src.readlink()), src.info.is_dir()) - elif src.info.is_dir(): - children = src.iterdir() - dst.mkdir() - for child in children: - stack.append((child, dst.joinpath(child.name))) - else: - ensure_different_files(src, dst) - with magic_open(src, 'rb') as source_f: - with magic_open(dst, 'wb') as target_f: - copyfileobj(source_f, target_f) + def _iter_copy_from(self, source, follow_symlinks=True): + """ + Recursively copy the given path to this path. Yields a + (source, target) tuple after each path is copied. + """ + if not follow_symlinks and source.info.is_symlink(): + self.symlink_to(str(source.readlink()), source.info.is_dir()) + elif source.info.is_dir(): + children = source.iterdir() + self.mkdir() + for src in children: + dst = self.joinpath(src.name) + yield from dst._iter_copy_from(src, follow_symlinks) + else: + ensure_different_files(source, self) + with magic_open(source, 'rb') as source_f: + with magic_open(self, 'wb') as target_f: + copyfileobj(source_f, target_f) + yield source, self _JoinablePath.register(PurePath) diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index 8a313cc4292574..00c69fcf36b53a 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ b/Lib/test/test_pathlib/test_pathlib.py @@ -2096,174 +2096,6 @@ def test_rmdir(self): self.assertFileNotFound(p.stat) self.assertFileNotFound(p.unlink) - def test_delete_file(self): - p = self.cls(self.base) / 'fileA' - p._delete() - self.assertFalse(p.exists()) - self.assertFileNotFound(p._delete) - - def test_delete_dir(self): - base = self.cls(self.base) - base.joinpath('dirA')._delete() - self.assertFalse(base.joinpath('dirA').exists()) - self.assertFalse(base.joinpath('dirA', 'linkC').exists( - follow_symlinks=False)) - base.joinpath('dirB')._delete() - self.assertFalse(base.joinpath('dirB').exists()) - self.assertFalse(base.joinpath('dirB', 'fileB').exists()) - self.assertFalse(base.joinpath('dirB', 'linkD').exists( - follow_symlinks=False)) - base.joinpath('dirC')._delete() - self.assertFalse(base.joinpath('dirC').exists()) - self.assertFalse(base.joinpath('dirC', 'dirD').exists()) - self.assertFalse(base.joinpath('dirC', 'dirD', 'fileD').exists()) - self.assertFalse(base.joinpath('dirC', 'fileC').exists()) - self.assertFalse(base.joinpath('dirC', 'novel.txt').exists()) - - def test_delete_missing(self): - tmp = self.cls(self.base, 'delete') - tmp.mkdir() - # filename is guaranteed not to exist - filename = tmp / 'foo' - self.assertRaises(FileNotFoundError, filename._delete) - - @needs_symlinks - def test_delete_symlink(self): - tmp = self.cls(self.base, 'delete') - tmp.mkdir() - dir_ = tmp / 'dir' - dir_.mkdir() - link = tmp / 'link' - link.symlink_to(dir_) - link._delete() - self.assertTrue(dir_.exists()) - self.assertFalse(link.exists(follow_symlinks=False)) - - @needs_symlinks - def test_delete_inner_symlink(self): - tmp = self.cls(self.base, 'delete') - tmp.mkdir() - dir1 = tmp / 'dir1' - dir2 = dir1 / 'dir2' - dir3 = tmp / 'dir3' - for d in dir1, dir2, dir3: - d.mkdir() - file1 = tmp / 'file1' - file1.write_text('foo') - link1 = dir1 / 'link1' - link1.symlink_to(dir2) - link2 = dir1 / 'link2' - link2.symlink_to(dir3) - link3 = dir1 / 'link3' - link3.symlink_to(file1) - # make sure symlinks are removed but not followed - dir1._delete() - self.assertFalse(dir1.exists()) - self.assertTrue(dir3.exists()) - self.assertTrue(file1.exists()) - - @unittest.skipIf(sys.platform[:6] == 'cygwin', - "This test can't be run on Cygwin (issue #1071513).") - @os_helper.skip_if_dac_override - @os_helper.skip_unless_working_chmod - def test_delete_unwritable(self): - tmp = self.cls(self.base, 'delete') - tmp.mkdir() - child_file_path = tmp / 'a' - child_dir_path = tmp / 'b' - child_file_path.write_text("") - child_dir_path.mkdir() - old_dir_mode = tmp.stat().st_mode - old_child_file_mode = child_file_path.stat().st_mode - old_child_dir_mode = child_dir_path.stat().st_mode - # Make unwritable. - new_mode = stat.S_IREAD | stat.S_IEXEC - try: - child_file_path.chmod(new_mode) - child_dir_path.chmod(new_mode) - tmp.chmod(new_mode) - - self.assertRaises(PermissionError, tmp._delete) - finally: - tmp.chmod(old_dir_mode) - child_file_path.chmod(old_child_file_mode) - child_dir_path.chmod(old_child_dir_mode) - - @needs_windows - def test_delete_inner_junction(self): - import _winapi - tmp = self.cls(self.base, 'delete') - tmp.mkdir() - dir1 = tmp / 'dir1' - dir2 = dir1 / 'dir2' - dir3 = tmp / 'dir3' - for d in dir1, dir2, dir3: - d.mkdir() - file1 = tmp / 'file1' - file1.write_text('foo') - link1 = dir1 / 'link1' - _winapi.CreateJunction(str(dir2), str(link1)) - link2 = dir1 / 'link2' - _winapi.CreateJunction(str(dir3), str(link2)) - link3 = dir1 / 'link3' - _winapi.CreateJunction(str(file1), str(link3)) - # make sure junctions are removed but not followed - dir1._delete() - self.assertFalse(dir1.exists()) - self.assertTrue(dir3.exists()) - self.assertTrue(file1.exists()) - - @needs_windows - def test_delete_outer_junction(self): - import _winapi - tmp = self.cls(self.base, 'delete') - tmp.mkdir() - src = tmp / 'cheese' - dst = tmp / 'shop' - src.mkdir() - spam = src / 'spam' - spam.write_text('') - _winapi.CreateJunction(str(src), str(dst)) - dst._delete() - self.assertFalse(dst.exists()) - self.assertTrue(spam.exists()) - self.assertTrue(src.exists()) - - @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') - @unittest.skipIf(sys.platform == "vxworks", - "fifo requires special path on VxWorks") - def test_delete_on_named_pipe(self): - p = self.cls(self.base, 'pipe') - os.mkfifo(p) - p._delete() - self.assertFalse(p.exists()) - - p = self.cls(self.base, 'dir') - p.mkdir() - os.mkfifo(p / 'mypipe') - p._delete() - self.assertFalse(p.exists()) - - def test_delete_does_not_choke_on_failing_lstat(self): - try: - orig_lstat = os.lstat - tmp = self.cls(self.base, 'delete') - - def raiser(fn, *args, **kwargs): - if fn != tmp: - raise OSError() - else: - return orig_lstat(fn) - - os.lstat = raiser - - tmp.mkdir() - foo = tmp / 'foo' - foo.write_text('') - tmp._delete() - finally: - os.lstat = orig_lstat - @os_helper.skip_unless_hardlink def test_hardlink_to(self): P = self.cls(self.base) diff --git a/Misc/NEWS.d/next/Library/2025-05-10-20-55-04.gh-issue-132566.tEdoMp.rst b/Misc/NEWS.d/next/Library/2025-05-10-20-55-04.gh-issue-132566.tEdoMp.rst new file mode 100644 index 00000000000000..0d85396984d7d3 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-10-20-55-04.gh-issue-132566.tEdoMp.rst @@ -0,0 +1,2 @@ +Fix issue where :meth:`pathlib.Path.move` copied all paths before deleting +any when moving between filesystems.