Source code for portmodlib.fs

# Copyright 2019-2021 Portmod Authors
# Distributed under the terms of the GNU General Public License v3

import os
import shutil
import stat
from fnmatch import fnmatch, fnmatchcase
from functools import lru_cache
from pathlib import Path
from shutil import copystat
from typing import Callable, Generator, List, Optional, Set, Tuple, Union

from portmodlib.portmod import _get_hash
from portmodlib.source import HashAlg

# 32MB buffer seems to give the best balance between performance on large files
# and on small files
HASH_BUF_SIZE = 32 * 1024 * 1024

try:
    DirEntryStr = os.DirEntry[str]
except TypeError:
    # Python 3.7 and 3.8 don't like subscripting os.DirEntry
    DirEntryStr = os.DirEntry  # type: ignore


def onerror(func, path, _exc_info):
    """
    Error handler for ``shutil.rmtree``.

    If the error is due to an access error (read only file)
    it attempts to add write permission and then retries.

    If the error is for another reason it re-raises the error.

    Usage : ``shutil.rmtree(path, onerror=onerror)``
    """
    if not os.access(path, os.W_OK):
        # Is the error an access error ?
        os.chmod(path, stat.S_IWUSR)
        func(path)
    else:
        raise  # pylint: disable=misplaced-bare-raise


def _move2(src: DirEntryStr, dest: str):
    if os.path.islink(src):
        return os.symlink(os.readlink(src.path), dest)
    return shutil.move(src.path, dest)


def _patch_file(
    src: os.DirEntry,
    dst: str,
    overwrite: bool = True,
    move_function: Callable[[os.DirEntry, str], None] = _move2,
):
    if os.path.exists(dst) and src.is_file():
        if overwrite:
            os.remove(dst)
        else:
            raise FileExistsError(f"File {dst} already exists")

    move_function(src, dst)


def _iter_files_to_patch(
    src: Union[str, DirEntryStr],
    dst: str,
    *,
    ignore: Optional[Callable[[str, List[str]], Set[str]]] = None,
    case_sensitive: bool = True,
) -> Generator[Tuple[os.DirEntry, str], None, None]:
    with os.scandir(src) as itr:
        entries = list(itr)
    if ignore is not None:
        ignored_names = ignore(os.fspath(src), [x.name for x in entries])
    else:
        ignored_names = set()

    for entry in entries:
        if entry.name in ignored_names:
            continue
        if case_sensitive:
            dstname = os.path.join(dst, entry.name)
        else:
            dstname = ci_exists(os.path.join(dst, entry.name)) or os.path.join(
                dst, entry.name
            )

        if entry.is_symlink():
            yield (entry, dstname)
        elif entry.is_dir():
            yield from _iter_files_to_patch(
                entry,
                dstname,
                ignore=ignore,
                case_sensitive=case_sensitive,
            )
        else:
            yield (entry, dstname)


# Modified version of shutil.copytree from
# https://github.com/python/cpython
# Python software and documentation are licensed under the
# Python Software Foundation License Version 2
[docs]def patch_dir( src: Union[str, os.DirEntry], dst: str, *, overwrite: bool = True, ignore: Optional[Callable[[str, List[str]], Set[str]]] = None, case_sensitive: bool = True, move_function: Callable[[os.DirEntry, str], None] = _move2, ) -> str: """ Copies src ontop of dst args: src: Source directory to copy from dst: Destination directory to copy to overwrite: If true, overwrite existing files. ignore: A callable which, given a directory and its contents, should return a set of files to ignore case_sensitive: If False, treat file and directory names as case insensitive move_function: The function to use to transfer individual files. Default is shutil.move (modified to accept a DirEntry). The signature should match shutil.copy2. raises: FileExistsError returns: Returns dst """ for src_file, dst_file in _iter_files_to_patch( src, dst, ignore=ignore, case_sensitive=case_sensitive, ): parent_dir = os.path.dirname(dst_file) if not os.path.isdir(parent_dir): os.makedirs(parent_dir) try: copystat(os.path.dirname(src_file.path), parent_dir) except OSError as why: if getattr(why, "winerror", None) is None: raise why _patch_file( src_file, dst_file, overwrite=overwrite, move_function=move_function, ) return dst
def ci_exists(path: str, *, prefix: Optional[str] = None) -> Optional[str]: """ Checks if a path exists, ignoring case. If the path exists but is ambiguous the result is not guaranteed args: path: The path to check. This path must either be absolute, or be relative to the prefix prefix: A leading path to ignore. If path is relative, it is treated as relative to this directory If prefix is None, path is treated as relative to the current working directory. Case-insensitive checks will not be performed on the components of the prefix """ if os.path.isabs(path) and os.path.exists(path): return path if os.path.isabs(path): partial_path = prefix or "/" else: partial_path = prefix or os.getcwd() if os.path.exists(os.path.join(partial_path, path)): return os.path.join(partial_path, path) if os.path.isabs(path): # Ignore leading empty component when splitting absolute paths components = os.path.normpath(path).split(os.sep)[1:] else: components = os.path.normpath(path).split(os.sep) for component in components: found = False # The entry that exists is not a directory, so it cannot have any contents if not os.path.isdir(partial_path): return None for entryname in os.listdir(partial_path): if entryname.lower() == component.lower(): partial_path = os.path.join(partial_path, entryname) found = True break if not found: return None if os.path.exists(partial_path): return partial_path return None def get_tree_size(path): """Return total size of files in given path and subdirs.""" total = 0 for entry in os.scandir(path): if entry.is_dir(follow_symlinks=False): total += get_tree_size(entry.path) else: total += entry.stat(follow_symlinks=False).st_size return total @lru_cache(maxsize=None) def get_hash(filename: str, funcs=(HashAlg.BLAKE3,)) -> List[str]: """Hashes the given file""" return _get_hash(filename, [func.value for func in funcs], HASH_BUF_SIZE) def is_parent(path: str, prefix: str) -> bool: """ Returns true if and only if prefix is a parent directory of path args: path: An absolute path prefix: An absolute path returns: True if and only if prefix is a parent directory of path """ path = os.path.normpath(os.path.abspath(path)) prefix = os.path.normpath(os.path.abspath(prefix)) return ( os.path.splitdrive(path)[0] == os.path.splitdrive(prefix)[0] and os.path.commonpath([path, prefix]) == prefix ) def make_unique_filename(path: str, *, case_insensitive: bool = False) -> str: numeric_component = 1 new_name = os.path.basename(path) directory = os.path.dirname(path) while (case_insensitive and ci_exists(new_name, prefix=directory)) or ( not case_insensitive and os.path.exists(os.path.join(directory, new_name)) ): name, ext = os.path.splitext(os.path.basename(path)) new_name = f"{name}.{numeric_component}{ext}" return os.path.join(directory, new_name) def match(path: Path, pattern: str) -> bool: """ Returns true if the glob-style pattern matches the given path, relative to root The pattern can contain :py:mod:`fnmatch`-style patterns however it breaks them up per path component, and ``**`` can be used to match recursively. args: path: a relative path pattern: A relative glob-style pattern to match path to. returns: True if and only if path matches pattern. """ assert not path.is_absolute() pattern_path = Path(pattern) path_parts = list(reversed(path.parts)) pattern_parts = list(reversed(pattern_path.parts)) def match_inner(path_parts: List[str], pattern_parts: List[str]) -> bool: while path_parts and pattern_parts: path_part = path_parts.pop() pattern_part = pattern_parts.pop() if pattern_part == "**": # Should match all remaining path parts # Base case: Match ** against nothing. # Otherwise, re-insert it into the pattern and try on the remaining path return match_inner( path_parts + [path_part], list(pattern_parts) ) or match_inner(list(path_parts), pattern_parts + [pattern_part]) if os.environ.get("CASE_INSENSITIVE_FILES"): if not fnmatch(path_part, pattern_part): return False else: if not fnmatchcase(path_part, pattern_part): return False # Only match if there are no components left return not path_parts and not pattern_parts return match_inner(path_parts, pattern_parts)