Module transparentpath.gcsutils.transparentpath

Expand source code
import warnings
import builtins
import os
import json
import sys
import tempfile
from time import time
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import Union, Tuple, Any, Iterator, Optional, Iterable, List, Callable

from .methodtranslator import MultiMethodTranslator
import gcsfs
from fsspec.implementations.local import LocalFileSystem
from inspect import signature
import collections


class TPMultipleExistenceError(Exception):
    """Exception raised when a path's destination already contain more than
    one element.
    """

    def __init__(self, path, ls):
        self.path = path
        self.ls = ls
        self.message = (
            f"Error in TransparentPath: Multiple objects exist at path {path}.\nHere is the output of ls in the "
            f"parent directory:\n {self.ls}"
        )
        super().__init__(self.message)

    def __str__(self):
        return self.message


class TPCachingWarning(Warning):
    def __init__(self, message: str = ""):
        self.message = message
        super().__init__(self.message)


def errormessage(which) -> str:
    return (
        f"Support for {which} does not seem to be installed for TransparentPath.\n"
        f"You can change that by running 'pip install transparentpath[{which}]'."
    )


def errorfunction(which) -> Callable:
    # noinspection PyUnusedLocal
    def _errorfunction(*args, **kwargs):
        raise ImportError(errormessage(which))

    return _errorfunction


# So I can use it in myisinstance
class TransparentPath:
    def __fspath__(self) -> str:
        """Implemented later"""
        pass


builtins_isinstance = builtins.isinstance


def mysmallisinstance(obj1: Any, obj2) -> bool:
    """Will return True when testing whether a TransparentPath is a str (required to use open(TransparentPath()))
    or a TransparentPath, and False in every other cases (even pathlib.Path)."""

    if type(obj1) == TransparentPath:
        if obj2 == TransparentPath or obj2 == str:
            return True
        else:
            return False

    if obj2 == TransparentPath:
        if type(obj1) == TransparentPath:
            return True
        else:
            return False

    return builtins_isinstance(obj1, obj2)


def myisinstance(obj1: Any, obj2) -> bool:
    """Will return True when testing whether a TransparentPath is a str (required to use open(TransparentPath()))
    and False when testing whether a pathlib.Path is a TransparentPath."""

    if not (builtins_isinstance(obj2, list) or builtins_isinstance(obj2, set) or builtins_isinstance(obj2, tuple)):
        return mysmallisinstance(obj1, obj2)
    else:
        is_instance = False
        for _type in obj2:
            is_instance |= mysmallisinstance(obj1, _type)
        return is_instance


setattr(builtins, "isinstance", myisinstance)


def collapse_ddots(path: Union[Path, TransparentPath, str]) -> TransparentPath:
    """Collapses the double-dots (..) in the path

    Parameters
    ----------
    path: Union[Path, TransparentPath, str]
        The path containing double-dots

    Returns
    -------
    TransparentPath
        The collapsed path.

    """
    # noinspection PyUnresolvedReferences
    thetype = path.fs_kind if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    thebucket = path.bucket if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    notupdatecache = path.notupdatecache if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    when_checked = path.when_checked if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    when_updated = path.when_updated if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    update_expire = path.update_expire if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    check_expire = path.check_expire if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences

    newpath = Path(path) if type(path) == str else path

    if str(newpath) == ".." or str(newpath) == "/..":
        raise ValueError("Can not go before root")

    while ".." in newpath.parts:
        # noinspection PyUnresolvedReferences
        newnewpath = Path(newpath.parts[0])
        for part in newpath.parts[1:]:
            if part == "..":
                newnewpath = newnewpath.parent
            else:
                newnewpath /= part
        newpath = newnewpath

    if str(newpath) == str(path):
        return path
    return (
        TransparentPath(
            newpath,
            collapse=False,
            nocheck=True,
            fs=thetype,
            bucket=thebucket,
            notupdatecache=notupdatecache,
            when_checked=when_checked,
            when_updated=when_updated,
            update_expire=update_expire,
            check_expire=check_expire,
        )
        if thetype is not None
        else newpath
    )


def treat_remote_prefix(path: Union[Path, TransparentPath, str], bucket: str) -> Tuple[str, str]:
    splitted = str(path).split(TransparentPath.remote_prefix)
    if len(splitted) == 0:
        if bucket is None and TransparentPath.bucket is None:
            raise ValueError(
                "If using a path starting with 'gs://', you must include the bucket name in it unless it"
                "is specified with bucket= or if TransparentPath already has been set to use a specified bucket"
                "with set_global_fs"
            )
        path = str(path).replace(TransparentPath.remote_prefix, "", 1)

    else:
        bucket_from_path = splitted[1].split("/")[0]
        if bucket is not None:
            if bucket != bucket_from_path:
                raise ValueError(
                    f"Bucket name {bucket_from_path} was found in your path name, but it does "
                    f"not match the bucket name you specified with bucket={bucket}"
                )
        else:
            bucket = bucket_from_path

        path = str(path).replace(TransparentPath.remote_prefix, "", 1)
        if path.startswith(bucket_from_path) or (len(path) > 0 and path[1:].startswith(bucket_from_path)):
            path = path.replace(bucket_from_path, "", 1)
        if path.startswith("/"):
            path = path[1:]
    return path, bucket


def get_fs(
    fs_kind: str,
    bucket: Union[str, None] = None,
    token: Optional[Union[str, dict]] = None,
    path: Union[Path, None] = None,
) -> Tuple[Union[gcsfs.GCSFileSystem, LocalFileSystem], str, str]:
    """Gets the FileSystem object of either gcs or local (Default)

    If GCS is asked and bucket is specified, will check that it exists and is accessible.

    Parameters
    ----------
    fs_kind: str
        Returns GCSFileSystem if 'gcs_*', LocalFilsSystem if 'local'.
    bucket: str
        bucket name for GCS
    token: Optional[Union[str, dict]]
        credentials (default value = None)
    path: Pathlib.Path
        Only relevant if the method was called from TransparentPath.__init__() : will attempts to fetch the bucket
        from the path if bucket is not given

    Returns
    -------
    Tuple[Union[gcsfs.GCSFileSystem, LocalFileSystem], Union[None, str], Union[None, str], Union[None, str]]
        The FileSystem object, the project if on GCS else None, and the bucket if on GCS.
    """

    if fs_kind is None:
        fs_kind = ""
    if fs_kind == "" and token is not None:
        fs_kind = "gcs"
    fs_name = None
    if fs_kind == "local":
        bucket = None

    if path is not None and fs_kind != "local":
        # Called from TransparentPath.__init__()
        if bucket is not None:
            fs_name = check_bucket(bucket)
        if bucket is None and len(path.parts) > 0:
            bucket = path.parts[0]
            fs_name = check_bucket(bucket)
            if fs_name is None:
                bucket = None
        if bucket is None:
            bucket = TransparentPath.bucket
            fs_name = check_bucket(bucket)

        if fs_name is not None:
            return copy(TransparentPath.fss[fs_name]), fs_name, bucket

    if "gcs" in fs_kind or token is not None:

        # If bucket is specified, get the filesystem that contains it if it already exists. Else, create the filesystem.
        if bucket is not None:
            fs_name = check_bucket(bucket)
            if fs_name is not None:
                fs = copy(TransparentPath.fss[fs_name])
                return fs, fs_name, ""

        fs_name, project, token = extract_fs_name(token)
        if fs_name in TransparentPath.fss:
            pass
        elif token is None:
            fs = gcsfs.GCSFileSystem(project=project, asynchronous=False)
            TransparentPath.buckets_in_project[fs_name] = get_buckets(fs)
            TransparentPath.fss[fs_name] = fs
        else:
            fs = gcsfs.GCSFileSystem(project=project, asynchronous=False, token=token)
            TransparentPath.buckets_in_project[fs_name] = get_buckets(fs)
            TransparentPath.fss[fs_name] = fs

        ret_bucket = False
        if bucket is None and path is not None and len(path.parts) > 0:
            bucket = path.parts[0]
            ret_bucket = True
        if bucket is not None:
            if not bucket.endswith("/"):
                bucket += "/"
            if bucket not in TransparentPath.buckets_in_project[fs_name]:
                raise NotADirectoryError(f"Bucket {bucket} does not exist in any loaded projects")

        fs = copy(TransparentPath.fss[fs_name])
        if ret_bucket:
            return fs, fs_name, bucket
        else:
            return fs, fs_name, ""
    else:
        if "local" not in TransparentPath.fss:
            TransparentPath.fss["local"] = LocalFileSystem()
        return copy(TransparentPath.fss["local"]), "local", ""


def get_buckets(fs: gcsfs.GCSFileSystem) -> List[str]:
    """Return list of all buckets in the file system."""
    if "" not in fs.dircache:
        items = []
        page = fs.call("GET", "b/", project=fs.project, json_out=True)

        assert page["kind"] == "storage#buckets"
        items.extend(page.get("items", []))
        next_page_token = page.get("nextPageToken", None)

        while next_page_token is not None:
            page = fs.call(
                "GET",
                "b/",
                project=fs.project,
                pageToken=next_page_token,
                json_out=True,
            )

            assert page["kind"] == "storage#buckets"
            items.extend(page.get("items", []))
            next_page_token = page.get("nextPageToken", None)
        fs.dircache[""] = [{"name": i["name"] + "/", "size": 0, "type": "directory"} for i in items]
    return [b["name"] for b in fs.dircache[""]]


def check_bucket(bucket: Union[str, None]) -> Union[str, None]:
    """Check that the bucket exists in an initiated file system and returns the corresponding file system's name,
    or raises NotADirectoryError."""
    if bucket is None:
        return None
    bucket = str(bucket)
    if not bucket.endswith("/"):
        bucket += "/"
    fs = None
    for proj in TransparentPath.buckets_in_project:
        if bucket in TransparentPath.buckets_in_project[proj]:
            fs = proj
            break
    return fs


def check_kwargs(method: Callable, kwargs: dict):
    """Takes as argument a method and some kwargs. Will look in the method signature and return in two separate dict
    the kwargs that are in the signature and those that are not.

    If the method does not return any signature or if it explicitely accepts **kwargs, does not do anything
    """
    unexpected_kwargs = []
    s = ""
    try:
        sig = signature(method)
        if "kwargs" in sig.parameters or "kwds" in sig.parameters:
            return
        for arg in kwargs:
            if arg not in sig.parameters:
                unexpected_kwargs.append(f"{arg}={kwargs[arg]}")

        if len(unexpected_kwargs) > 0:
            s = f"You provided unexpected kwargs for method {method.__name__}:"
            s = "\n  - ".join([s] + unexpected_kwargs)
    except ValueError:
        return

    if s != "":
        raise ValueError(s)


def get_index_and_date_from_kwargs(**kwargs: dict) -> Tuple[int, bool, dict]:
    index_col = kwargs.get("index_col", None)
    parse_dates = kwargs.get("parse_dates", None)
    if index_col is not None:
        del kwargs["index_col"]
    if parse_dates is not None:
        del kwargs["parse_dates"]
    # noinspection PyTypeChecker
    return index_col, parse_dates, kwargs


def extract_fs_name(token: str = None) -> Tuple[str, str, Union[str, None]]:
    if token is None and "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
        fs = gcsfs.GCSFileSystem()
        project = fs.project
        if (
            project is None
            or fs.credentials is None
            or not hasattr(fs.credentials.credentials, "service_account_email")
            or fs.credentials.credentials.service_account_email is None
        ):
            raise EnvironmentError(
                "If no token is explicitely specified and GOOGLE_APPLICATION_CREDENTIALS environnement variable is not"
                " set, you need to have done gcloud init or to be on GCP already to create a TransparentPath"
            )
        email = fs.credentials.credentials.service_account_email
        return f"gcs_{project}_{email}", project, None
    elif token is None:
        token = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

    token = token.strip()
    if not TransparentPath(token, fs="local", nocheck=True, notupdatecache=True).is_file():
        raise FileNotFoundError(f"Credential file {token} not found")
    content = json.load(open(token))
    if "project_id" not in content:
        raise ValueError(f"Credential file {token} does not contain project_id key.")
    if "client_email" not in content:
        raise ValueError(f"Credential file {token} does not contain client_email key.")

    fs_name = f"gcs_{content['project_id']}_{content['client_email']}"
    TransparentPath.tokens[fs_name] = token
    return fs_name, content["project_id"], token


class TransparentPath(os.PathLike):  # noqa : F811
    # noinspection PyUnresolvedReferences
    """
    A class that allows one to use a path in a local file system or a Google Cloud Storage (GCS) file system in the
    same way one would use a `pathlib.Path` object. One can use many different GCP projects at once.

    Create a path that points to GCS, and one that does not:
    >>> from transparentpath import Path
    >>> # Or : from transparentpath import TransparentPath
    >>> p = Path("gs://mybucket/some_path", token="some/cred/file.json")
    >>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
    >>> p3 = Path("bar")  # Will point to local path "bar"

    Set all paths to point to GCS by default:
    >>> from transparentpath import Path
    >>> Path.set_global_fs("gcs", token="some/cred/file.json")
    >>> p = Path("mybucket") / "some_path" # Will point to gs://mybucket/some_path
    >>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
    >>> p3 = Path("bar", fs="local")  # Will point to local path "bar"
    >>> p4 = Path("other_bucket")  # Will point to gs://other_bucket (assuming other_bucket is a bucket)
    >>> p5 = Path("not_a_bucket")  # Will point to local path "not_a_bucket" (assuming it is indeed, not a bucket)

    Set all paths to point to severral GCS projects by default:
    >>> from transparentpath import Path
    >>> Path.set_global_fs("gcs", token="some/cred/file.json")
    >>> Path.set_global_fs("gcs", token="some/other/cred/file.json")
    >>> p = Path("mybucket") / "some_path" # Will point to gs://mybucket/some_path
    >>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
    >>> p3 = Path("bar", fs="local")  # Will point to local path "bar"
    >>> p4 = Path("other_bucket")  # Will point to gs://other_bucket (assuming other_bucket is a bucket)
    >>> p5 = Path("not_a_bucket")  # Will point to local path "not_a_bucket" (assuming it is indeed, not a bucket)

    Here, *mybucket* and *other_bucket* can be on two different projects, as long as at least one of the
    credential files can access them.

    Set all paths to point to GCS by default, and specify a default bucket:
    >>> from transparentpath import Path
    >>> Path.set_global_fs("gcs", bucket="mybucket", token="some/cred/file.json")
    >>> p = Path("some_path")  # Will point to gs://mybucket/some_path/
    >>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
    >>> p3 = Path("bar", fs="local")  # Will point to local path "bar"
    >>> p4 = Path("other_bucket")  # Will point to gs://mybucket/other_bucket
    >>> p5 = Path("not_a_bucket")  # Will point to gs://mybucket/not_a_bucket

    The latest option is interesting if you have a code that should be able to run with paths being sometimes remote,
    sometimes local. To do that, you can use the class attribute `nas_dir`. Then when a path is created, if it starts by
    *nas_dir*'s path, *nas_dir*'s path is replaced by the bucket name. This is useful if, for instance, you have a
    backup of a bucket locally at let's say */my/local/backup*. Then you can do:
    >>> from transparentpath import Path
    >>> Path.nas_dir = "/my/local/backup"
    >>> Path.set_global_fs("gcs", bucket="mybucket", token="some/cred/file.json")
    >>> p = Path("some_path")  # Will point to gs://mybucket/some_path/
    >>> p3 = Path("/my/local/backup") / "some_path"  # Will ALSO point to gs://mybucket/some_path/

    >>> from transparentpath import Path
    >>> Path.nas_dir = "/my/local/backup"
    >>> # Path.set_global_fs("gcs", bucket="mybucket", token="some/cred/file.json")
    >>> p = Path("some_path")  # Will point to /my/local/backup/some_path/
    >>> p3 = Path("/my/local/backup") / "some_path"  # Will ALSO point to /my/local/backup/some_path/

    In all the previous examples, the *token* argument can be ommited if the environment variable
    **GOOGLE_APPLICATION_CREDENTIALS** is set and point to a *.json* credential file, or if your code runs on a GCP
    machine (VM, cluster...) with access to GCS.

    No matter whether you are using GCS or your local file system, here is a sample of what TransparentPath can do:
    >>> from transparentpath import Path
    >>> # Path.set_global_fs("gcs", bucket="bucket_name", project="project_name")
    >>> # The following lines will also work with the previous line uncommented
    >>>
    >>> # Reading a csv into a pandas' DataFrame and saving it as a parquet file
    >>> mypath = Path("foo") / "bar.csv"
    >>> df = mypath.read(index_col=0, parse_dates=True)
    >>> otherpath = mypath.with_suffix(".parquet")
    >>> otherpath.write(df)
    >>>
    >>> # Reading and writing a HDF5 file works on GCS and on local:
    >>> import numpy as np
    >>> mypath = Path("foo") / "bar.hdf5"  # can be .h5 too
    >>> with mypath.read() as ifile:
    >>>     arr = np.array(ifile["store1"])
    >>>
    >>> # Doing '..' from 'foo/bar.hdf5' will return 'foo'
    >>> # Then doing 'foo' + 'babar.hdf5' will return 'foo/babar.hdf5' ('+' and '/' are synonymes)
    >>> mypath.cd("..")  # Does not return a path but modifies inplace
    >>> with (mypath  + "babar.hdf5").write(None) as ofile:
    >>>     # Note here that we must explicitely give 'None' to the 'write' method in order for it
    >>>     # to return the open HDF5 file. We could also give a dict of {arr: "store1"} to directly
    >>>     # write the file.
    >>>     ofile["store1"] = arr
    >>>
    >>>
    >>> # Reading a text file. Can also use 'w', 'a', etc... also works with binaries.
    >>> mypath = Path("foo") / "bar.txt"
    >>> with open(mypath, "r") as ifile:
    >>>     lines = ifile.readlines()
    >>>
    >>> # open is overriden to understand gs://
    >>> with open("gs://bucket/file.txt", "r") as ifile:
    >>>     _ = ifile.readlines()
    >>>
    >>> mypath.is_file()
    >>> mypath.is_dir()
    >>> mypath.is_file()
    >>> files = mypath.parent.glob("*.csv")  # Returns a Iterator[TransparentPath], can be casted to list

    As you can see from the previous example, all methods returning a path from a TransparentPath return a
    TransparentPath.

    TransparentPath supports writing and reading Dask dataframes from and to csv, excel, parquet and HDF5, both locally
    and remotely. You need to have dask-dataframe and dask-distributed installed, which will be the case if you ran `pip
    install transparentpath[dask]`. Writing Dask dataframes does not require any additionnal arguments to be
    passed for the type will be checked before calling the appropriate writting method. Reading however requires you to
    pass the *use_dask* argument to the `transparentpath.gcsutils.transparentpath.TransparentPath.read()` method.
    If the file to read is HDF5, you will also need to specify *set_names*, matching the argument *key* of Dask's
    `read_hdf()` method.

    Note that if reading a remote HDF5, the file will be downloaded in your local tmp, then read. If not using Dask, the
    file is deleted after being read. But since Dask uses delayed processes, deleting the file might occure before the
    file is actually read, so the file is kept. Up to you to empty your */tmp* directory if it is not done automatically
    by your system.

    All instances of TransparentPath are absolute, even if created with relative paths.

    TransparentPaths are seen as instances of str:
    >>> from transparentpath import Path
    >>> path = Path()
    >>> isinstance(path, str)  # returns True

    This is required to allow
    >>> from transparentpath import Path
    >>> path = Path()
    >>> # noinspection PyTypeChecker
    >>> with open(path, "w/r/a/b...") as ifile:
    >>> ...
    to work. If you want to check whether path is actually a TransparentPath and nothing else, use
    >>> from transparentpath import Path
    >>> path = Path()
    >>> assert type(path) == Path
    >>> assert issubclass(path.__class__, Path)
    instead.

    Any method or attribute valid in `fsspec.implementations.local.LocalFileSystem`, `gcs.GCSFileSystem`, `pathlib.Path`
    or `str` can be used on a TransparentPath object.

    **Warnings about GCS behaviour**
    if you use GCS:\n
      1. Remember that directories are not a thing on GCS.\n
      2. You do not need the parent directories of a file on GCS to create the file : they will be created if they do
      not exist (that is not true localy however).\n
      3. If you delete a file that was alone in its parent directories, those directories disapear.\n
      4. If a file exists at the same path than a directory, then TransparentPath is not able to know which one is the
      file and which one is the directory, and will raise a
    `transparentpath.gcsutils.transparentpath.TPMultipleExistenceError` upon object creation. This
      check for multiplicity is done at almost every method in case an exterior source created a duplicate of the
      file/directory. This case can't happen locally. However, it can happen on remote if the cache is not updated
      frequently. Doing this check can significantly increase computation time (if using glob on a directory
      containing a lot of files for example). You can deactivate it either globally (TransparentPath._do_check =
      False and TransparentPath._do_update_cache = False), for a specific path (pass nockeck=True at path
      creation), or for glob and ls by passing fast=True as additional argument.


    TransparentPath on GCS is slow because of the verification for multiple existance and the cache updating.
    However one can tweak those a bit. As mentionned earlier, cache updating and multiple existence check can be
    deactivated for all paths by doing
    >>> from transparentpath import TransparentPath
    >>> TransparentPath._do_update_cache = False
    >>> TransparentPath._do_check = False

    They can also be deactivated for one path only by doing
    >>> p = TransparentPath("somepath", nocheck=True, notupdatecache=True)

    It is also possible to specify when to do those check : at path creation, path usage (read, write, exists...) or
    both. Here to it can be set on all paths or only some :
    >>> TransparentPath._when_checked = {"created": True, "used": False}  # Default value
    >>> TransparentPath._when_updated = {"created": True, "used": False}  # Default value
    >>> p = TransparentPath(
    >>>   "somepath", when_checked={"created": False, "used": False}, notupdatecache={"created": False, "used": False}
    >>> )

    There is also an expiration time in seconds for check and update : the operation is not done if it was done not a
    long time ago. Those expiration times are of 1 second by default and can be changed through :
    >>> TransparentPath._check_expire = 10
    >>> TransparentPath._update_expire = 10
    >>> p = TransparentPath("somepath", check_expire=0, update_expire=0)
    ```

    `transparentpath.gcsutils.transparentpath.TransparentPath.glob()` and
    `transparentpath.gcsutils.transparentpath.TransparentPath.ls()` have their own way to be accelerated :
    >>> p.glob("/*", fast=True)
    >>> p.ls("", fast=True)
    Basically, *fast=True* means "do not check and do not update the cache" for all the items found by the method.

    Builtin `open` is overloaded by TransparentPath to support giving a TransparentPath to it. If a method in a package
    you did not create uses `open` in a *with* statement, everything should work out of the box with a TransparentPath.

    However, if it uses the **output** of `open`, you will have to create a class to
    override this method and anything using its ouput. Indeed, `open` returns a file descriptor, not an IO, and I did
    not find a way to access file descriptors on gcs. For example, in the FileLock package, the `acquire` method calls
    the `_acquire` method which calls `os.open`, so I had to do that:

    >>> from filelock import FileLock
    >>> from transparentpath import Path
    >>>
    >>> class MyFileLock(FileLock):
    >>>     def _acquire(self):
    >>>         tmp_lock_file = self._lock_file
    >>>         if not type(tmp_lock_file) == Path:
    >>>             tmp_lock_file = Path(tmp_lock_file)
    >>>         try:
    >>>             fd = tmp_lock_file.open("x")
    >>>         except (IOError, OSError, FileExistsError):
    >>>             pass
    >>>         else:
    >>>             self._lock_file_fd = fd
    >>>         return None

    The original method was:
    >>>
    >>> import os
    >>> ...
    >>> def _acquire(self):
    >>>     open_mode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_TRUNC
    >>>     try:
    >>>         fd = os.open(self._lock_file, open_mode)
    >>>     except (IOError, OSError):
    >>>         pass
    >>>     else:
    >>>         self._lock_file_fd = fd
    >>>     return None
    >>> ...

    I tried to implement a working version of any method valid in pathlib.Path or in file systems, but futur changes
    in any of those will not be taken into account quickly. You can report missing supports by opening an issue.
    """

    @classmethod
    def reinit(cls):
        """Reinit all class attributes to their default values"""
        cls.remote_prefix = "gs://"
        cls.fss = {}
        cls.buckets_in_project = {}
        cls.fs_kind = None
        cls.bucket = None
        cls.nas_dir = None
        cls.unset = True
        cls.cwd = os.getcwd()
        cls.tokens = {}
        cls._do_update_cache = True
        cls._do_check = True
        cls._check_expire = 1
        cls._update_expire = 1
        cls._when_checked = {"used": False, "created": True}
        cls._when_updated = {"used": False, "created": True}
        cls.LOCAL_SEP = os.path.sep
        cls.cached_data_dict = collections.OrderedDict()
        cls.used_memory = 0
        cls.caching = "None"
        cls.caching_max_memory = 100

    @classmethod
    def show_state(cls):
        """Prints the state of the TransparentPath class"""
        print("remote_prefix: ", cls.remote_prefix)
        print("fss: ", cls.fss)
        print("buckets_in_project: ", cls.buckets_in_project)
        print("fs_kind: ", cls.fs_kind)
        print("bucket: ", cls.bucket)
        print("nas_dir: ", cls.nas_dir)
        print("unset: ", cls.unset)
        print("cwd: ", cls.cwd)
        print("tokens: ", cls.tokens)
        print("_do_update_cache: ", cls._do_update_cache)
        print("_do_check: ", cls._do_check)
        print("_check_expire: ", cls._check_expire)
        print("_update_expire: ", cls._update_expire)
        print("_when_updated: ", cls._when_updated)
        print("LOCAL_SEP: ", cls.LOCAL_SEP)
        print("cached_data_dict: ", cls.cached_data_dict)
        print("used_memory: ", cls.used_memory)
        print("caching: ", cls.caching)
        print("caching_max_memory: ", cls.caching_max_memory)

    @classmethod
    def get_state(cls) -> dict:
        """Returns the state of the TransparentPath class in a dictionnary"""
        state = {
            "remote_prefix": cls.remote_prefix,
            "fss": cls.fss,
            "buckets_in_project": cls.buckets_in_project,
            "fs_kind": cls.fs_kind,
            "bucket": cls.bucket,
            "nas_dir": cls.nas_dir,
            "unset": cls.unset,
            "cwd": cls.cwd,
            "tokens": cls.tokens,
            "_do_update_cache": cls._do_update_cache,
            "_do_check": cls._do_check,
            "_check_expire": cls._check_expire,
            "_update_expire": cls._update_expire,
            "_when_checked": cls._when_checked,
            "_when_updated": cls._when_updated,
            "LOCAL_SEP": cls.LOCAL_SEP,
            "cached_data_dict: ": cls.cached_data_dict,
            "used_memory: ": cls.used_memory,
            "caching: ": cls.caching,
            "caching_max_memory: ": cls.caching_max_memory,
        }
        return state

    remote_prefix = "gs://"
    """remote prefix of the known possible remote file system. For now, only GCS is supported, so default is gs://"""
    fss = {}
    """Declared filesystems. Keys are 'local' or 'gcs_cred_mail' and values are
     `fsspec.implementations.local.LocalFileSystem` or `gcsfs.GCSFileSystem` objects"""
    buckets_in_project = {}
    """Known buckets. Keys are 'gcs_cred_mail' and values are bucket names (str)"""
    fs_kind = None
    """Default fs kind ('local' or 'gcs')"""
    bucket = None
    """Default bucket"""
    nas_dir = None
    """If found in a remote path, is replaced by the bucket"""
    unset = True
    """False once `transparentpath.gcsutils.transparentpath.TransparentPath.set_global_fs` is called"""
    cwd = os.getcwd()
    """currend working directory"""
    tokens = {}
    """Known credentials. Keys are 'gcs_cred_mail' and values are credential files paths (str)"""
    _do_update_cache = True
    """If True, will update the cache according to 
    `transparentpath.gcsutils.transparentpath.TransparentPath._when_checked`"""
    _do_check = True
    """If True, will check for duplicate objects according to 
    `transparentpath.gcsutils.transparentpath.TransparentPath._when_updated`"""
    _update_expire = 1
    """Seconds before cache update expires. 0 means never."""
    _check_expire = 1
    """Seconds before a check for duplicate objects expires. 0 means never."""
    _when_updated = {"used": False, "created": True}
    """When to update the cache: at path creation and/or at path use."""
    _when_checked = {"used": False, "created": True}
    """When to check for duplicate objects: at path creation and/or at path use."""
    LOCAL_SEP = os.path.sep
    """Path separator, depends on the OS"""
    caching: str = "None"
    """Caching, meaning file content read from a transparentpath will be saved in tmp for quicker access later in the
    code. Deactivated by default"""
    caching_max_memory = 100
    """Max size allowed in cache (in MB)"""
    used_memory = 0
    """Current cache size (in MB)"""
    cached_data_dict = collections.OrderedDict()
    """caching memory as MB by saved object"""
    _attributes = [
        "bucket",
        "fs_kind",
        "fs",
        "nas_dir",
        "path",
        "sep",
        "nocheck",
        "notupdatecache",
        "last_check",
        "last_update",
        "update_expire",
        "check_expire",
        "when_checked",
        "when_updated",
    ]
    """List of class instance attribute names"""

    method_without_self_path = [
        "end_transaction",
        "get_mapper",
        "read_block",
        "start_transaction",
        "connect",
        "load_tokens",
    ]
    """Methods that do not use `transparentpath.gcsutils.transparentpath.TransparentPath.path`"""

    method_path_concat = []
    """Methods present here are assumed that their first argument must be concatenated with
    `transparentpath.gcsutils.transparentpath.TransparentPath.path`"""

    translations = {
        "mkdir": MultiMethodTranslator(
            "mkdir",
            ["local", "gcs"],
            ["mkdir", "self._do_nothing"],
            [{"parents": "create_parents"}, {"parents": ""}],
        ),
    }
    """To translate method args and kwargs between `fsspec.implementations.local.LocalFileSystem` 
    and `gcsfs.GCSFileSystem`"""

    @classmethod
    def set_global_fs(
        cls,
        fs: str,
        bucket: Union[str, None] = None,
        nas_dir: Optional[Union[TransparentPath, Path, str]] = None,
        token: Optional[Union[dict, str]] = None,
    ) -> None:
        """To call before creating any instance to set the file system.

        If not called, default file system is local. If the first parameter is 'local', the file system is local. If
        the first parameter is 'gcs', file system is GCS.

        Parameters
        ----------
        fs: str
            'gcs' will use GCSFileSystem, 'local' will use LocalFileSystem
        bucket: str
            The bucket name, only valid if using gcs (Default value =  None)
        nas_dir: Union[TransparentPath, Path, str]
            If specified, TransparentPath will delete any occurence of 'nas_dir' at the beginning of created paths if fs
            is gcs (Default value = None).
        token: Optional[Union[dict, str]]
            credentials (default value = None). If not specified, will use envvar GOOGLE_APPLICATION_CREDENTIALS. If not
            specified either, will try to log with default account, which will work is using a machine on GCP
            (VM, cluster...)

        Returns
        -------
        None
        """
        if "gcs" not in fs and fs != "local":
            raise ValueError(f"Unknown value {fs} for parameter 'fs'")

        cls.fs_kind = fs
        cls.bucket = bucket

        TransparentPath._set_nas_dir(cls, nas_dir)
        get_fs(cls.fs_kind, cls.bucket, token)
        TransparentPath.unset = False

    def __init__(
        self,
        path: Union[Path, TransparentPath, str] = ".",
        collapse: bool = True,
        fs: Optional[str] = "",
        bucket: Optional[str] = None,
        token: Optional[Union[dict, str]] = None,
        nocheck: Optional[bool] = None,
        notupdatecache: Optional[bool] = None,
        update_expire: Optional[int] = None,
        check_expire: Optional[int] = None,
        when_checked: Optional[dict] = None,
        when_updated: Optional[dict] = None,
        enable_caching: bool = False,
        **kwargs,
    ):
        """Creator of the TranparentPath object

        Parameters
        ----------
        path: Union[pathlib.Path, TransparentPath, str]
            The path of the object (Default value = '.')
        collapse: bool
            If True, will collapse any double dots ('..') in path. (Default value = True)
        fs: Optional[str]
            The file system to use, 'local' or 'gcs'. If None, uses the default one set by
            `transparentpath.gcsutils.transparentpath.TransparentPath.set_global_fs` if any, or 'local' (Default = None)
        bucket: Optional[str]
            The bucket name if using GCS and if path is not 'gs://bucket/...'
        token: Optional[Union[dict, str]]
            The path to google application credentials json file to use, if envvar GOOGLE_APPLICATION_CREDENTIALS
            is not set and the code is not running on a GCP machine.
        nocheck: bool
            If True, will not call check_multiplicity (quicker but less secure). Takes the value of
            not Transparentpath._do_check if None (Default value = None)
        notupdatecache: bool
            If True, will not call _invalidate_cache when doing operations on this path (quicker but less secure).
            Takes the value of not Transparentpath._do_update_cache if None (Default value = None)
        update_expire: Optional[int]
            Time in second after which the cache is considered obsolete and must be updated. Takes the value of
            Transparentpath._update_expire if None (Default value = None)
        check_expire: Optional[int]
            Time in second after which the check for multiple existence is considered obsolete and must be updated.
            Takes the value of Transparentpath._check_expire if None (Default value = None)
        when_checked: Optional[dict]
            Dict of the form {"used: True, "created": True}, that indicates when to check multiplicity of the path.
            Takes the value of Transparentpath._when_checked if None (Default value = None)
        when_updated: Optional[dict]
            Same as when_checked but for cache update.
        enable_caching: bool
            If True, will enable file caching, meaning file content read from a transparentpath will be saved in tmp
            for quicker access later in the code. Default is False.

        kwargs:
            Any optional kwargs valid for `pathlib.Path`
        """
        self.enable_caching = enable_caching

        if path is None:
            path = "."

        if (
            not (type(path) == type(Path("dummy")))  # noqa: E721
            and not (type(path) == str)
            and not (type(path) == TransparentPath)
        ):
            raise TypeError(f"Unsupported type {type(path)} for path")

        # I never remember whether I should use fs='local' or fs_kind='local'. That way I don't need to.
        if fs is None:
            fs = ""
        if "fs_kind" in kwargs and fs == "" and kwargs["fs_kind"] is not None and kwargs["fs_kind"] != "":
            fs = kwargs["fs_kind"]
            del kwargs["fs_kind"]
        if bucket == "":
            bucket = None

        # Copy path completely if it is a TransparentPath and we did not
        # ask for a new file system
        if type(path) == TransparentPath and fs == "":
            # noinspection PyUnresolvedReferences
            self.bucket = path.bucket
            # noinspection PyUnresolvedReferences
            self.fs_kind = path.fs_kind
            # noinspection PyUnresolvedReferences
            self.fs = copy(path.fs)
            # noinspection PyUnresolvedReferences
            self.nas_dir = path.nas_dir
            # noinspection PyUnresolvedReferences
            self.__path = path.path
            # noinspection PyUnresolvedReferences
            self.sep = path.sep
            # noinspection PyUnresolvedReferences
            self.nocheck = path.nocheck
            # noinspection PyUnresolvedReferences
            self.notupdatecache = path.notupdatecache
            # noinspection PyUnresolvedReferences
            self.last_check = path.last_check
            # noinspection PyUnresolvedReferences
            self.last_update = path.last_update
            # noinspection PyUnresolvedReferences
            self.update_expire = path.update_expire
            # noinspection PyUnresolvedReferences
            self.check_expire = path.check_expire
            # noinspection PyUnresolvedReferences
            self.when_checked = path.when_checked
            # noinspection PyUnresolvedReferences
            self.when_updated = path.when_updated
            # noinspection PyUnresolvedReferences
            self.token = path.token
            return

        # In case we initiate a path containing 'gs://'
        if str(path).startswith(TransparentPath.remote_prefix):
            if fs == "local":
                raise ValueError(
                    "You specified a path starting with 'gs://' but ask for it to be local. This is not possible."
                )
            path, bucket = treat_remote_prefix(path, bucket)
            fs = "gcs"

        self.__path = Path(str(path).encode("utf-8").decode("utf-8"), **kwargs)

        self.token = token
        self.fs, self.fs_kind, b = get_fs(fs, bucket, token, path=self.__path)
        if b != "":
            bucket = b

        self.bucket = bucket
        self.nas_dir = TransparentPath.nas_dir
        self.nocheck = nocheck if nocheck is not None else not TransparentPath._do_check
        self.notupdatecache = notupdatecache if notupdatecache is not None else not TransparentPath._do_update_cache
        self.last_check = 0
        self.last_update = 0
        self.update_expire = update_expire if update_expire is not None else TransparentPath._update_expire
        self.check_expire = check_expire if check_expire is not None else TransparentPath._check_expire
        self.when_checked = when_checked if when_checked is not None else TransparentPath._when_checked
        self.when_updated = when_updated if when_updated is not None else TransparentPath._when_updated

        if self.fs_kind == "local":
            self.sep = TransparentPath.LOCAL_SEP
            self.__path = self.__path.absolute()
        else:
            self.sep = "/"

        if collapse:
            self.__path = collapse_ddots(self.__path)

        if self.fs_kind == "local":

            # ON LOCAL

            if len(self.__path.parts) > 0 and self.__path.parts[0] == "..":
                raise ValueError("The path can not start with '..'")

        else:

            # ON GCS

            # Remove occurences of nas_dir at beginning of path, if any
            if self.nas_dir is not None and (
                str(self.__path).startswith(os.path.abspath(self.nas_dir) + os.sep) or str(self.__path) == self.nas_dir
            ):
                self.__path = self.__path.relative_to(self.nas_dir)

            if str(self.__path) == "." or str(self.__path) == "/":
                self.__path = Path(self.bucket)
            elif len(self.__path.parts) > 0:
                if self.__path.parts[0] == "..":
                    raise ValueError("Trying to access a path before bucket")
                if str(self.__path)[0] == "/":
                    self.__path = Path(str(self.__path)[1:])

                if not str(self.__path.parts[0]) == self.bucket:
                    self.__path = Path(self.bucket) / self.__path
            else:
                self.__path = Path(self.bucket) / self.__path
            # if len(self.__path.parts) > 1 and self.bucket in self.__path.parts[1:]:
            #     raise ValueError("You should never use your bucket name as a directory or file name.")

        if self.when_checked["created"] and not self.nocheck:
            self._check_multiplicity()
        elif self.when_updated["created"] and not self.notupdatecache:  # Else, because called by check_multiplicity
            self._update_cache()

    @property
    def path(self):
        return self.__path

    @path.setter
    def path(self, value):
        raise AttributeError("Can not set protected attribute 'path'")

    def __dask_tokenize__(self):
        return hash(self)

    def __contains__(self, item: str) -> bool:
        """Overload of 'in' operator
        Use __fspath__ instead of str(self) so that any method trying to assess whether the path is on gcs using
        '//' in path will return True.
        """
        return item in self.__fspath__()

    # noinspection PyUnresolvedReferences
    def __eq__(self, other: TransparentPath) -> bool:
        """Two paths are equal if their absolute pathlib.Path (double dots collapsed) are the same, and all other
        attributes are the same."""
        if not isinstance(other, TransparentPath):
            return False
        p1 = collapse_ddots(self)
        p2 = collapse_ddots(other)
        if p1.__fspath__() != p2.__fspath__():
            return False
        if p1.fs_kind != p2.fs_kind:
            return False
        return True

    def __lt__(self, other: TransparentPath) -> bool:
        return str(self) < str(other)

    def __gt__(self, other: TransparentPath) -> bool:
        return str(self) > str(other)

    def __le__(self, other: TransparentPath) -> bool:
        return str(self) <= str(other)

    def __ge__(self, other: TransparentPath) -> bool:
        return str(self) >= str(other)

    def __add__(self, other: str) -> TransparentPath:
        """Alias of truediv

        You can do :
        >>> from transparentpath import TransparentPath
        >>> p = TransparentPath("/chat")
        >>> p + "chien"
        /chat/chien

        If you want to add a string without having a '/' poping, use 'append':
        >>> from transparentpath import TransparentPath
        >>> p = TransparentPath("/chat")
        >>> p.append("chien")
        /chatchien
        """
        return self.__truediv__(other)

    def __iadd__(self, other: str) -> TransparentPath:
        return self.__itruediv__(other)

    def __radd__(self, other):
        raise TypeError(
            "You cannot div/add by a TransparentPath because they are all absolute path, which would result in a path "
            "before root "
        )

    def __truediv__(self, other: str) -> TransparentPath:
        """Overload of the division ('/') method
        TransparentPath behaves like pathlib.Path in regard to the division :
        it appends the denominator to the numerator.

        Parameters
        ----------
        other: str
            The relative path to append to self

        Returns
        -------
        TransparentPath
            The appended path
        """

        if other.startswith(self.sep):
            other = other[1:]

        if type(other) == str:
            return TransparentPath(
                self.__path / other,
                fs=self.fs_kind,
                bucket=self.bucket,
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            )
        else:
            raise TypeError(f"Can not divide a TransparentPath by a {type(other)}, only by a string.")

    def __itruediv__(self, other: str) -> TransparentPath:
        """itruediv will be an actual itruediv only if other is a str"""

        if other.startswith(self.sep):
            other = other[1:]

        if type(other) == str:
            self.__path /= other
            return self
        else:
            raise TypeError(f"Can not divide a TransparentPath by a {type(other)}, only by a string.")

    def __rtruediv__(self, other: Union[TransparentPath, Path, str]):
        raise TypeError(
            "You cannot div/add by a TransparentPath because they are all absolute path, which would result in a path "
            "before root "
        )

    def __str__(self) -> str:
        return self.__fspath__()

    def __repr__(self) -> str:
        return str(self.__path)

    def __fspath__(self) -> str:
        if self.fs_kind == "local":
            return str(self.__path)
        else:
            s = "".join([TransparentPath.remote_prefix, str(self.__path)])
            if TransparentPath.LOCAL_SEP != "/":
                s = s.replace(TransparentPath.LOCAL_SEP, "/")
            return s

    def __hash__(self) -> int:
        """Uniaue hash number.

        Two TransarentPath will have a same hash number if their fspath are the same and fs_kind (which will inlude
        the project name if remote) are the same."""
        hash_number = int.from_bytes((self.fs_kind + self.__fspath__()).encode(), "little") + int.from_bytes(
            self.fs_kind.encode(), "little"
        )
        return hash_number

    def __getattr__(self, obj_name: str) -> Any:
        """Overload of the __getattr__ method
        Is called when trying to fetch a method or attribute not implemeneted in the class. If it is a method,
        will then execute _obj_missing to check if the method has been translated, or exists in the file system
        object. If it is an attribute, will check whether it exists in pathlib.Path objects, and if so add it to
        self. If this new attribute is a pathlib.Path, casts it into a TransparentPath.


        Parameters
        ----------
        obj_name: str
            The method or attribute name


        Returns
        --------
        Any
            What the method/attribute 'obj_name' is supposed to return/be

        """

        if callable(self):
            raise AttributeError(f"{obj_name} does not belong to TransparentPath")

        if obj_name in TransparentPath._attributes:
            raise AttributeError(
                f"Attribute {obj_name} is expected to belong to TransparentPath but is not found. Something somehow "
                f"tried to access this attribute before a proper call to __init__. "
            )

        if obj_name in TransparentPath.translations:
            return lambda *args, **kwargs: self._obj_missing(obj_name, "translate", *args, **kwargs)

        elif obj_name in dir(self.fs):
            obj = getattr(self.fs, obj_name)
            if not callable(obj):
                exec(f"self.{obj_name} = obj")
            else:
                return lambda *args, **kwargs: self._obj_missing(obj_name, "fs", *args, **kwargs)

        elif obj_name in dir(self.__path):
            obj = getattr(self.__path, obj_name)
            if not callable(obj):
                # Fetch the self.path's attributes to set it to self
                if type(obj) == type(self.__path):  # noqa: E721
                    newpath = TransparentPath(
                        obj,
                        fs=self.fs_kind,
                        bucket=self.bucket,
                        notupdatecache=self.notupdatecache,
                        nocheck=self.nocheck,
                        when_checked=self.when_checked,
                        when_updated=self.when_updated,
                        update_expire=self.update_expire,
                        check_expire=self.check_expire,
                    )
                    setattr(self, obj_name, newpath)
                    return newpath
                elif isinstance(obj, Iterable):
                    obj = self._cast_iterable(obj)
                    setattr(self, obj_name, obj)
                    return obj
                else:
                    setattr(self, obj_name, obj)
                    return obj
            elif self.fs_kind == "local":
                return lambda *args, **kwargs: self._obj_missing(obj_name, "pathlib", *args, **kwargs)
            else:
                raise AttributeError(f"{obj_name} is not an attribute nor a method of TransparentPath")

        elif obj_name in dir(""):
            obj = getattr("", obj_name)
            if not callable(obj):
                # Fetch the string's attributes to set it to self
                setattr(self, obj_name, obj)
                return obj
            else:
                return lambda *args, **kwargs: self._obj_missing(obj_name, "str", *args, **kwargs)
        else:
            raise AttributeError(f"{obj_name} is not an attribute nor a method of TransparentPath")

    # /////////////// #
    # PRIVATE METHODS #
    # /////////////// #

    @staticmethod
    def _set_nas_dir(obj, nas_dir):
        if nas_dir is not None:
            if isinstance(nas_dir, TransparentPath):
                obj.nas_dir = nas_dir.__path
            elif isinstance(nas_dir, str):
                obj.nas_dir = Path(nas_dir)
            else:
                obj.nas_dir = nas_dir

    def _cast_fast(self, path: str) -> TransparentPath:
        return TransparentPath(
            path,
            fs=self.fs_kind,
            nocheck=True,
            notupdatecache=True,
            bucket=self.bucket,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )

    def _cast_slow(self, path: str) -> TransparentPath:
        return TransparentPath(
            path,
            fs=self.fs_kind,
            nocheck=False,
            notupdatecache=False,
            bucket=self.bucket,
            when_checked={"created": False, "used": False},
            when_updated={"created": False, "used": False},
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )

    def _obj_missing(self, obj_name: str, kind: str, *args, **kwargs) -> Any:
        """Method to catch any call to a method/attribute missing from the class.
        Tries to call the object on the class's FileSystem object or the instance's self.path (a pathlib.Path object) if
        FileSystem is local


        Parameters
        ----------
        obj_name: str
            The missing object's name

        kind: str
            Either 'fs', 'pathlib' or 'translate'

        args
            args to pass to the object

        kwargs
            kwargs to pass to the object


        Returns
        -------
        Any
            What the missing object is supposed to return

        """

        # Append the absolute path to self.path according to whether the object
        # needs it and whether we are in gcs or local
        new_args = self._transform_path(obj_name, *args)

        # Object is a method and exists in FileSystem object but has a
        # different name or its kwargs have different names, so use the
        # MethodTranslator class
        if kind == "translate":
            translated, new_args, new_kwargs = TransparentPath.translations[obj_name].translate(
                self.fs_kind, *new_args, **kwargs
            )
            if "self" in translated:
                the_method = getattr(TransparentPath, translated.split("self.")[1])
                to_ret = the_method(self, *args, **new_kwargs)
                return to_ret
            else:
                the_method = getattr(self.fs, translated)
                to_ret = the_method(*new_args, **new_kwargs)
            return to_ret

        # Here, could be a method or an attribute.
        # It exists in FileSystem and has same name and same kwargs (if is a
        # method).
        elif kind == "fs":
            the_obj = getattr(self.fs, obj_name)
            if callable(the_obj):
                if len(signature(the_obj).parameters) == 0:
                    to_ret = the_obj()
                else:
                    to_ret = the_obj(*new_args, **kwargs)
            else:
                return the_obj
            return to_ret

        # Method does not exist in FileSystem, but exists in pathlib,
        # so try that instead. Do not use new_args in that case, we do not need
        # absolute path
        elif kind == "pathlib":
            # If arrives there, then it must be a method. If it had been an
            # attribute, it would have been caught in __getattr__.
            the_method = getattr(Path, obj_name)
            to_ret = the_method(self.__path, *args, **kwargs)
            return to_ret
        elif kind == "str":
            # If arrives there, then it must be a method, and of str. If it had been an
            # attribute, it would have been caught in __getattr__.
            the_method = getattr(str, obj_name)
            to_ret = the_method(str(self), *args, **kwargs)
            return to_ret
        else:
            raise ValueError(f"Unknown value {kind} for attribute kind")

    def _transform_path(self, method_name: str, *args: Tuple) -> Tuple:
        """
        File system methods take self.path as first argument, so add its absolute path as first argument of args.
        Some, like ls or glob, are given a relative path to append to self.path, so we need to change the first
        element of args from args[0] to self.path / args[0]

        Parameters
        ----------
        method_name: str
            The method name, to check whether it needs to append self.path
            or not

        args: Tuple
            The args to pass to the method

        Returns
        -------
        Tuple
            Either the unchanged args, or args with the first element
            prepended by self, or args with a new first element (self)
        """
        new_args = [self]
        if method_name in TransparentPath.method_without_self_path:
            return args
        elif method_name in TransparentPath.method_path_concat:
            # Assumes first given arg in args must be concatenated with
            # absolute self.path
            if len(args) > 0:
                new_args = [str(self / str(args[0]))]
                if len(args) > 1:
                    new_args.append(args[1:])
            new_args = tuple(new_args)
        else:
            # noinspection PyTypeChecker
            new_args = tuple([str(self)] + list(args))
        return new_args

    def _update_cache(self):
        """Calls FileSystem's invalidate_cache() to discard the cache then calls a non-disruptive method (fs.info(
        bucket)) to update it.

        If local, on need to update the chache. Not even sure it needs to be invalidated...
        """

        if time() - self.last_update < self.update_expire:
            return

        self.fs.invalidate_cache()
        if "gcs" in self.fs_kind:
            try:
                self.fs.info(self.bucket)
            except FileNotFoundError:
                # noinspection PyStatementEffect
                self.buckets
        self.last_update = time()

    def _check_multiplicity(self) -> None:
        """Checks if several objects correspond to the path.
        Raises MultipleExistenceError if so, does nothing if not.
        """

        if time() - self.last_check < self.check_expire:
            return

        if not self.notupdatecache:
            self._update_cache()
        if str(self.__path) == self.bucket or str(self.__path) == "/":
            return
        if not self.exists():
            return

        collapsed = str(collapse_ddots(self.__path / ".."))
        if collapsed == "/":  # Can not ls on root directory anymore
            return
        thels = self.fs.ls(collapsed)
        if len(thels) > 1:
            thels = [Path(apath).name for apath in thels if Path(apath).name == self.name]
            if len(thels) > 1:
                raise TPMultipleExistenceError(self, thels)

        self.last_check = time()

    def _do_nothing(self) -> None:
        """does nothing (you don't say)"""
        pass

    # ////////////// #
    # PUBLIC METHODS #
    # ////////////// #

    def get_absolute(self) -> TransparentPath:
        """Returns self, since all TransparentPaths are absolute

        Returns
        -------
        TransparentPath
            self

        """
        return self

    @property
    def absolute(self) -> TransparentPath:
        """Returns self, since all TransparentPaths are absolute

        Returns
        -------
        TransparentPath
            self

        """
        return self

    def mkbucket(self, name: Optional[str] = None) -> None:
        raise NotImplementedError

    def rmbucket(self, name: Optional[str] = None) -> None:
        raise NotImplementedError

    def exist(self) -> bool:
        """To prevent typo of 'exist()' without an -s"""
        return self.exists()

    def exists(self) -> bool:
        if str(self.path) == "/" and self.fs_kind == "local":
            return True
        elif self.path == "gs://" and self.fs_kind == "gcs":
            return True
        updated = False
        if self.when_checked["used"] and not self.nocheck:
            self._check_multiplicity()
            updated = True
        elif self.when_updated["used"] and not self.notupdatecache:
            self._update_cache()
            updated = True
        if not self.fs.exists(self.__fspath__()):
            if not updated:
                self._update_cache()
                return self.fs.exists(self.__fspath__())
            else:
                return False
        return True

    def isfile(self) -> bool:
        return self.is_file()

    # noinspection PyUnusedLocal
    def isdir(self, *args, **kwargs) -> bool:
        return self.is_dir()

    # noinspection PyUnusedLocal
    def is_dir(self, *args, **kwargs) -> bool:
        """Check if self is a directory.


        Returns
        -------
        bool

        """
        if self.fs_kind == "local":
            if str(self.path) == "/":
                return True
            return self.__path.is_dir()
        else:
            if not self.exists():
                return False
            if self.is_file():
                return False
            return True

    def is_file(self) -> bool:
        """Check if self is a file
        On GCS, leaves are always files even if created with mkdir.


        Returns
        -------
        bool

        """

        if not self.exists():
            return False

        if self.fs_kind == "local":
            return self.__path.is_file()
        else:
            # GCS is shit and sometimes needs to be checked twice
            if self.info()["type"] == "file" and self.info()["type"] == "file":
                return True
            else:
                return False

    def unlink(self, **kwargs) -> None:
        """Alias of rm, to match pathlib.Path method"""
        self.rm(**kwargs)

    def rm(self, absent: str = "raise", ignore_kind: bool = False, **kwargs) -> None:
        """Removes the object pointed to by self if exists.
        Remember that leaves are always files on GCS, so rm will remove the path if it is a leaf on GCS


        Parameters
        ----------
        absent: str
            What to do if trying to remove an item that does not exist. Can
            be 'raise' or 'ignore' (Default value = 'raise')

        ignore_kind: bool
            If True, will remove anything pointed by self. If False,
            will raise an error if self points to a file and 'recursive' was
            specified in kwargs, or if self points to a dir and 'recursive'
            was not specified (Default value = False)

        kwargs
            The kwargs to pass to file system's rm method


        Returns
        -------
        None

        """

        if absent != "raise" and absent != "ignore":
            raise ValueError(f"Unexpected value for argument 'absent' : {absent}")

        # Asked to remove a directory...
        recursive = kwargs.get("recursive", False)

        if recursive:
            if not self.is_dir():
                # ...but self points to something that is not a directory!
                if self.exists():
                    # Delete anyway
                    if ignore_kind:
                        del kwargs["recursive"]
                        self.rm(absent, **kwargs)
                    # or raise
                    else:
                        raise NotADirectoryError("The path does not point to a directory!")
                # ...but self points to something that does not exist!
                else:
                    if absent == "raise":
                        raise NotADirectoryError("There is no directory here!")
                    else:
                        return
            # ...deletes the directory
            else:
                try:
                    self.fs.rm(self.__fspath__(), **kwargs)
                except OSError as e:
                    if "not found" in str(e).lower():
                        # It is possible that another parallel program deleted the object, in that case just pass
                        pass
                    else:
                        raise e
        # Asked to remove a file...
        else:
            # ...but self points to a directory!
            if self.is_dir():
                # Delete anyway
                if ignore_kind:
                    kwargs["recursive"] = True
                    self.rm(absent=absent, ignore_kind=True, **kwargs)
                # or raise
                else:
                    raise IsADirectoryError("The path points to a directory")
            else:
                # ... but nothing is at self
                if not self.exists():
                    if absent == "raise":
                        raise FileNotFoundError(f"Could not find file {self}")
                    else:
                        return
                else:
                    try:
                        self.fs.rm(self.__fspath__(), **kwargs)
                    except OSError as e:
                        if "not found" in str(e).lower():
                            # It is possible that another parallel program deleted the object, in that case just pass
                            pass
                        else:
                            raise e

    def rmdir(self, absent: str = "raise", ignore_kind: bool = False) -> None:
        """Removes the directory corresponding to self if exists
        Remember that leaves are always files on GCS, so rmdir will never remove a leaf on GCS


        Parameters
        ----------
        absent: str
            What to do if trying to remove an item that does not exist. Can
            be 'raise' or 'ignore' (Default value = 'raise')

        ignore_kind: bool
            If True, will remove anything pointed by self. If False,
            will raise an error if self points to a file and 'recursive' was
            specified in kwargs, or if self point to a dir and 'recursive'
            was not specified (Default value = False)

        """
        self.rm(absent=absent, ignore_kind=ignore_kind, recursive=True)

    def glob(
        self, wildcard: str = "*", fast: bool = False, i_am_sure_i_am_a_dir: bool = False
    ) -> Iterator[TransparentPath]:
        """Returns a list of TransparentPath matching the wildcard pattern.

        By default, the wildcard is '*'. It means 'thepath/*', so will glob in the directory.

        Parameters
        -----------
        wildcard: str
            The wilcard pattern to match, relative to self (Default value = "*")

        fast: bool
            If True, does not check multiplicity when converting output paths to TransparentPath, significantly
            speeding up the process (Default value = False)
        i_am_sure_i_am_a_dir: bool
            If True, will not check that self points to a directory. Saves time.


        Returns
        --------
        Iterator[TransparentPath]
            The list of items matching the pattern

        """

        if not i_am_sure_i_am_a_dir:
            if not self.is_dir():
                raise NotADirectoryError("The path must be a directory if you want to glob in it")

        if wildcard.startswith("/") or wildcard.startswith("\\"):
            wildcard = wildcard[1:]

        if wildcard.startswith("**/*"):
            wildcard = wildcard.replace("**/*", "**")

        path_to_glob = (self.__path / wildcard).__fspath__()

        try:
            if fast:
                to_ret = map(self._cast_fast, self.fs.glob(path_to_glob))
            else:
                to_ret = map(self._cast_slow, self.fs.glob(path_to_glob))
        except TypeError as e:
            if "list indices must be integers or slices, not str" in str(e):
                to_ret = []
            else:
                raise e
        return to_ret

    def with_suffix(self, suffix: str) -> TransparentPath:
        """Returns a new TransparentPath object with a changed suffix
        Uses the with_suffix method of pathlib.Path


        Parameters
        -----------
        suffix: str
            suffix to use, with the dot ('.pdf', '.py', etc ..). Can also use '' to remove the suffix.

        Returns
        --------
        TransparentPath

        """
        if not suffix.startswith(".") and not suffix == "":
            suffix = f".{suffix}"
        return TransparentPath(
            self.__path.with_suffix(suffix),
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )

    def ls(self, path_to_ls: str = "", fast: bool = False) -> Iterator[TransparentPath]:
        """Unlike glob, if on GCS, will also see directories.


        Parameters
        -----------
        path_to_ls: str
            Path to ls, relative to self (default value = "")
        fast: bool
            If True, does not check multiplicity when converting output
            paths to TransparentPath, significantly speeding up the process
            (Default value = False)


        Returns
        --------
        Iterator[TransparentPath]

        """

        if isinstance(path_to_ls, TransparentPath):
            raise TypeError("Can not use a TransparentPath as a argument of ls() : TransparentPath are all absolute")

        if not self.is_dir():
            raise NotADirectoryError("The path must be a directory if you want to ls in it")

        if fast:
            to_ret = map(self._cast_fast, self.fs.ls(str(self / path_to_ls)))
        else:
            to_ret = map(self._cast_slow, self.fs.ls(str(self / path_to_ls)))
        return to_ret

    def cd(self, path: Optional[str] = None) -> None:
        """cd-like command. Works inplace

        Will collapse double-dots ('..'), so not compatible with symlinks. If path is absolute (starts with '/' or
        bucket name or is empty), will return a path starting from root directory if FileSystem is local, from bucket
        if it is GCS. If passing None or "" , will have the same effect than "/" on GCS, will return the current
        working directory on local. If passing ".", will return a path at the location of self. Will raise an error
        if trying to access a path before root or bucket.


        Parameters
        ----------
        path: str
            The path to cd to. Absolute, or relative to self.
            (Default value = None)


        Returns
        -------
        None: works inplace

        """

        # Will collapse any '..'

        if not isinstance(path, str) or isinstance(path, TransparentPath):
            raise TypeError("Can only pass a string to TransparentPath's cd method")

        path = path.replace(TransparentPath.remote_prefix, "", 1)

        if "gcs" in self.fs_kind and str(path) == self.bucket or path == "" or str(path) == "/":
            self.__path = Path(self.bucket)
            return

        # If asked to cd to home, return path script calling directory
        if path == "" or path is None:
            self.__path = Path()
            return

        # noinspection PyUnresolvedReferences
        self.__path = self.__path / path

        if self.fs_kind == "local":
            # If asked for an absolute path
            if path.startswith("/"):
                self.__path = Path(path)
                return
            # noinspection PyUnresolvedReferences
            if len(self.__path.parts) == 0:
                return
            # noinspection PyUnresolvedReferences
            if self.__path.parts[0] == "..":
                raise ValueError("The first part of a path can not be '..'")
        else:
            # If asked for an absolute path
            if path.startswith("/"):
                self.__path = Path(self.bucket) / path[1:]
                return
            # noinspection PyUnresolvedReferences
            if self.__path == 1:  # On gcs, first part is bucket
                return
            # noinspection PyUnresolvedReferences
            if self.__path.parts[1] == "..":
                raise ValueError("Trying to access a path before bucket")

        # noinspection PyUnresolvedReferences
        self.__path = collapse_ddots(self.__path)

    def touch(self, present: str = "ignore", **kwargs) -> None:
        """Creates the file corresponding to self if does not exist.

        Raises FileExistsError if there already is an object that is not a file at self. Default behavior is to
        create parent directories of the file if needed. This can be canceled by passing 'create_parents=False', but
        only if not using GCS, since directories are not a thing on GCS.


        Parameters
        ----------
        present: str
            What to do if there is already something at self. Can be "raise" or "ignore" (Default value = "ignore")

        kwargs
            The kwargs to pass to file system's touch method


        Returns
        -------
        None

        """

        if present != "raise" and present != "ignore":
            raise ValueError(f"Unexpected value for argument 'present' : {present}")

        if self.exists():
            if self.is_file() and present == "raise":
                raise FileExistsError
            elif not self.is_file():
                raise FileExistsError(f"There is already an object at {self} which is not a file.")
            else:
                return

        for parent in reversed(self.parents):
            p = TransparentPath(
                parent,
                fs=self.fs_kind,
                bucket=self.bucket,
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            )
            if p.is_file():
                raise FileExistsError(f"A parent directory can not be created because there is already a file at {p}")
            elif not p.exists():
                p.mkdir()

        self.fs.touch(self.__fspath__(), **kwargs)

    def mkdir(self, present: str = "ignore", **kwargs) -> None:
        """Creates the directory corresponding to self if does not exist

        Remember that leaves are always files on GCS, so can not create a directory on GCS. Thus, the function will
        have no effect on GCS.


        Parameters
        ----------
        present: str
            What to do if there is already something at self. Can be "raise" or "ignore" (Default value = "ignore")

        kwargs
            The kwargs to pass to file system's mkdir method


        Returns
        -------
        None

        """

        if present != "raise" and present != "ignore":
            raise ValueError(f"Unexpected value for argument 'present' : {present}")

        if self.exists():
            if self.is_dir() and present == "raise":
                raise FileExistsError(f"There is already a directory at {self}")
            if not self.is_dir():
                raise FileExistsError(f"There is already an object at {self} and it is not a  directory")
            return

        for parent in reversed(self.parents):
            thefile = TransparentPath(
                parent,
                fs=self.fs_kind,
                bucket=self.bucket,
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            )
            if thefile.is_file():
                raise FileExistsError(
                    "A parent directory can not be created because there is already a file at" f" {thefile}"
                )

        if self.fs_kind == "local":
            # Use _obj_missing instead of callign mkdir directly because
            # file systems mkdir has some kwargs with different name than
            # pathlib.Path's  mkdir, and this is handled in _obj_missing
            self._obj_missing("mkdir", kind="translate", **kwargs)
        else:
            # Does not mean anything to create a directory on GCS
            pass

    def stat(self) -> dict:
        """Calls file system's stat method and translates the key to os.stat_result() keys

        Returns empty dict of path does not point to anything
        """

        if not self.exist():
            return {}

        key_translation = {
            "size": "st_size",
            "timeCreated": "st_ctime",
            "updated": "st_mtime",
            "created": "st_ctime",
            "mode": "st_mode",
            "uid": "st_uid",
            "gid": "st_gid",
            "mtime": "st_mtime",
        }

        stat = self.fs.stat(self.__fspath__())
        statkeys = list(stat.keys())
        for key in statkeys:
            if key in key_translation:
                if key == "timeCreated" or key == "updated":
                    dt = datetime.strptime(stat[key], "%Y-%m-%dT%H:%M:%S.%fZ")
                    stat[key] = dt.timestamp()
                if key == "created" or key == "mtime":
                    stat[key] = int(stat[key])
                stat[key_translation[key]] = stat[key]

        for key in key_translation.values():
            if key not in stat:
                stat[key] = None

        return stat

    def append(self, other: str) -> TransparentPath:
        return TransparentPath(
            str(self) + other,
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )

    def walk(self) -> Iterator[Tuple[TransparentPath, List[TransparentPath], List[TransparentPath]]]:
        """Like os.walk, except all outputs are TransparentPaths (so, absolute paths)

        Returns
        -------
        Iterator[Tuple[TransparentPath, List[TransparentPath], List[TransparentPath]]]
            root, dirs and files, like os.walk
        """

        if self.when_checked["used"] and not self.nocheck:
            self._check_multiplicity()
        # No need to update cache for walk

        outputs = self.fs.walk(self.__fspath__())
        for output in outputs:
            root = TransparentPath(
                output[0],
                fs=self.fs_kind,
                bucket=self.bucket,
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            )
            dirs = [root / p for p in output[1]]
            files = [root / p for p in output[2]]
            yield root, dirs, files

    @property
    def buckets(self) -> List[str]:
        if self.fs_kind == "local":
            return []
        return get_buckets(self.fs)

    def _cast_iterable(self, iter_: Iterable):
        """Used by self.walk"""
        if isinstance(iter_, Path) or isinstance(iter_, TransparentPath):
            return TransparentPath(
                iter_,
                fs=self.fs_kind,
                bucket=self.bucket,
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            )
        elif isinstance(iter_, str):
            return iter_
        elif not isinstance(iter_, Iterable):
            return iter_
        else:
            to_ret = [self._cast_iterable(item) for item in iter_]
            return to_ret

    def caching_ram(self, data, args, kwargs) -> None:
        """
        caching for ram
        """
        filesize = sys.getsizeof(data)
        if filesize > TransparentPath.caching_max_memory * 1000000:
            warnings.warn(
                f"You are trying to add in cache a file of {filesize / 1000000} MB, but the max memory "
                f"for caching is {TransparentPath.caching_max_memory} MB\nCaching canceled",
                TPCachingWarning,
            )
        else:
            while TransparentPath.used_memory + filesize > TransparentPath.caching_max_memory * 1000000:
                # Drop oldest file
                byename, byefile = TransparentPath.cached_data_dict.popitem(last=False)
                TransparentPath.used_memory -= sys.getsizeof(byefile)
                warnings.warn(
                    f"You have exceeded the max memory for caching of {TransparentPath.caching_max_memory} MB"
                    f"(old files {TransparentPath.used_memory / 1000000} MB, new file {filesize / 1000000})"
                    f"removing from cach : {byename}",
                    TPCachingWarning,
                )
            # Adding file to dict and filesize to total used memory
            TransparentPath.used_memory += filesize
            TransparentPath.cached_data_dict[self.__hash__()] = {"data": data, "args": args, "kwargs": kwargs}

    def caching_tmpfile(self, args, kwargs) -> None:
        """
        caching for tmpfile
        """
        temp_file = tempfile.NamedTemporaryFile(delete=True, suffix=self.suffix)
        self.get(temp_file.name)
        # noinspection PyUnresolvedReferences
        tempfilesize = temp_file.file.tell()
        if tempfilesize > TransparentPath.caching_max_memory * 1000000:
            warnings.warn(
                f"You are trying to add in cache a file of {tempfilesize / 1000000} MB, but the max memory "
                f"for caching is {TransparentPath.caching_max_memory} MB\nCaching canceled",
                TPCachingWarning,
            )
        else:
            while TransparentPath.used_memory + tempfilesize > TransparentPath.caching_max_memory * 1000000:
                byename, byefile = TransparentPath.cached_data_dict.popitem(last=False)
                byefile["file"].close()
                TransparentPath.used_memory -= byefile["memory"]
                warnings.warn(
                    f"You have exceeded the max memory for caching of {TransparentPath.caching_max_memory} MB"
                    f"(old files {TransparentPath.used_memory / 1000000} MB, new file {tempfilesize / 1000000})"
                    f"removing from cach : {byename}",
                    TPCachingWarning,
                )
                del byefile
            TransparentPath.used_memory += tempfilesize
            TransparentPath.cached_data_dict[self.__hash__()] = {
                "file": temp_file,
                "memory": tempfilesize,
                "args": args,
                "kwargs": kwargs,
            }

    def caching_saver(self, data, args, kwargs) -> None:
        """
        Save fetched data from read in tmp file or dict,
         if total of cach does not exceed caching_max_memory else remove oldest data

        To use ram caching set self.caching to "ram", to use tmp file caching set self.caching to "tmpfile"

        To disable caching, set self.caching to something else or self.enable_caching to False

        TransparentPath.caching_max_memory is in MB
        """
        if self.enable_caching:
            if self.caching == "ram":
                self.caching_ram(data, args, kwargs)

            elif self.caching == "tmpfile" and self.fs_kind != "local":
                self.caching_tmpfile(args, kwargs)

    def uncache(self) -> None:
        """
        remove data from cache
        """
        if self.enable_caching:
            if self.__hash__() in TransparentPath.cached_data_dict.keys():
                TransparentPath.cached_data_dict.pop(self.__hash__())
            else:
                warnings.warn(f"{self} is not in cache", TPCachingWarning)

    def refresh_cache(self) -> None:
        """
        update tp
        """
        if self.enable_caching:
            if self.__hash__() in TransparentPath.cached_data_dict.keys():
                arg = TransparentPath.cached_data_dict[self.__hash__()]["arg"]
                kwarg = TransparentPath.cached_data_dict[self.__hash__()]["kwarg"]
                self.uncache()
                self.read(*arg, **kwarg)
            else:
                warnings.warn(f"{self.__hash__()} is not in cache", TPCachingWarning)

    def change_suffix(self, suffix: str) -> None:
        if not suffix.startswith("."):
            suffix = f".{suffix}"

        self.path = self.path.with_suffix(suffix)
        if self.when_checked["created"] and not self.nocheck:
            self._check_multiplicity()
        elif self.when_updated["created"] and not self.notupdatecache:  # Else, because called by check_multiplicity
            self._update_cache()

    def read(
        self,
        *args,
        get_obj: bool = False,
        use_pandas: bool = False,
        use_dask: bool = False,
        **kwargs,
    ) -> Any:
        """Method used to read the content of the file located at self

        Will raise FileNotFound error if there is no file. Calls a specific method to read self based on the suffix
        of self.path:
            1: .csv : will use pandas's read_csv
            2: .parquet : will use pandas's read_parquet with pyarrow engine
            3: .hdf5 or .h5 : will use h5py.File or pd.HDFStore (if use_pandas = True). Since it does not support
            remote file systems, the file will be downloaded localy in a tmp file read, then removed.
            4: .json : will use open() method to get file content then json.loads to get a dict
            5: .xlsx : will use pd.read_excel
            6: any other suffix : will return a IO buffer to read from, or the string contained in the file if
            get_obj is False.

        For any of the reading method, the appropriate packages need to have been installed by calling
        `pip install transparentpath[something]`
        The possibilities for 'something' are 'pandas-csv', 'pandas-parquet', 'pandas-excel', 'hdf5', 'json', 'dask'.
        You can install all possible packages by putting 'all' in place of 'something'.

        The default installation of transperantpath is 'vanilla', which will only support read and write of text
         or binary files, and the use of with open(...).

        If self.enable_caching is True, will either save in tmp file (if self.caching == "tmpfile") or store the read
        data in a dict (if self.caching == "ram"), then if the path have already been read, will just return the
        previously stored data

        Parameters
        ----------
        get_obj: bool
            Only relevant for files that are not csv, parquet nor HDF5. If True returns the IO Buffer,
            else the string contained in the IO Buffer (Default value = False)
        use_pandas: bool
            Must pass it as True if hdf5 file was written using HDFStore and not h5py.File (Default value = False)
        use_dask: bool
            To return a Dask DataFrame instead of a pandas DataFrame. Only makes sense if file suffix is xlsx, csv,
            parquet. (Default value = False)
        args:
            any args to pass to the underlying reading method
        kwargs:
            any kwargs to pass to the underlying reading method

        Returns
        -------
        Any
        """
        if self.enable_caching:
            if self.caching == "ram":
                if self.__hash__() in TransparentPath.cached_data_dict.keys():
                    return TransparentPath.cached_data_dict[self.__hash__()]["data"]
            elif self.caching == "tmpfile" and self.fs_kind != "local":
                if self.__hash__() in TransparentPath.cached_data_dict.keys():
                    return TransparentPath(
                        TransparentPath.cached_data_dict[self.__hash__()]["file"].name, fs="local"
                    ).read(*args, get_obj, use_pandas, use_dask, **kwargs)
        if self.suffix == ".csv":
            ret = self.read_csv(use_dask=use_dask, **kwargs)
            self.caching_saver(
                ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
            )
            return ret
        elif self.suffix == ".parquet":
            index_col = None
            if "index_col" in kwargs:
                index_col = kwargs["index_col"]
                del kwargs["index_col"]
            # noinspection PyNoneFunctionAssignment
            content = self.read_parquet(use_dask=use_dask, **kwargs)
            if index_col:
                # noinspection PyUnresolvedReferences
                content.set_index(content.columns[index_col])
            self.caching_saver(
                content, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
            )
            return content
        elif self.suffix == ".hdf5" or self.suffix == ".h5":
            ret = self.read_hdf5(use_pandas=use_pandas, use_dask=use_dask, **kwargs)
            self.caching_saver(
                ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
            )
            return ret
        elif self.suffix == ".json":
            ret = self.read_json(*args, get_obj=get_obj, **kwargs)
            self.caching_saver(
                ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
            )
            return ret
        elif self.suffix in [".xlsx", ".xls", ".xlsm"]:
            ret = self.read_excel(use_dask=use_dask, **kwargs)
            self.caching_saver(
                ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
            )
            return ret
        else:
            ret = self.read_text(*args, get_obj=get_obj, **kwargs)
            self.caching_saver(
                ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
            )
            return ret

    # noinspection PyUnresolvedReferences
    def write(
        self,
        data: Any,
        *args,
        set_name: str = "data",
        use_pandas: bool = False,
        overwrite: bool = True,
        present: str = "ignore",
        make_parents: bool = False,
        **kwargs,
    ) -> Union[None, "pd.HDFStore", "h5py.File"]:
        """Method used to write the content of the file located at self
        Calls a specific method to write data based on the suffix of self.path:
            1: .csv : will use pandas's to_csv
            2: .parquet : will use pandas's to_parquet with pyarrow engine
            3: .hdf5 or .h5 : will use h5py.File. Since it does not support remote file systems, the file will be
            created localy in a tmp filen written to, then uploaded and removed localy.
            4: .json : will use jsonencoder.JSONEncoder class. Works with DataFrames and np.ndarrays too.
            5: .xlsx : will use pandas's to_excel
            5: any other suffix : uses self.open to write to an IO Buffer
        Parameters
        ----------
        data: Any
            The data to write
        set_name: str
            Name of the dataset to write. Only relevant if using HDF5 (Default value = 'data')
        use_pandas: bool
            Must pass it as True if hdf file must be written using HDFStore and not h5py.File
        overwrite: bool
            If True, any existing file will be overwritten. Only relevant for csv, hdf5 and parquet files,
            since others use the 'open' method, which args already specify what to do (Default value = True).
        present: str
            Indicates what to do if overwrite is False and file is present. Here too, only relevant for csv,
            hsf5 and parquet files.
        make_parents: bool
            If True and if the parent arborescence does not exist, it is created. (Default value = False)
        args:
            any args to pass to the underlying writting method
        kwargs:
            any kwargs to pass to the underlying reading method
        Returns
        -------
        Union[None, pd.HDFStore, h5py.File]
        """
        # Update cache and/or check multiplicity are called inside each specific reading method

        if make_parents and not self.parent.is_dir():
            self.parent.mkdir()

        if self.suffix != ".hdf5" and self.suffix != ".h5" and data is None:
            data = args[0]
            args = args[1:]

        if self.suffix == ".csv":
            ret = self.to_csv(
                data=data,
                overwrite=overwrite,
                present=present,
                **kwargs,
            )
            if ret is not None:
                # To skip the assert at the end of the function. Indeed if something is returned it means we used
                # Dask, which will have written files with a different name than self, so the assert would fail.
                return
        elif self.suffix == ".parquet":
            self.to_parquet(
                data=data,
                overwrite=overwrite,
                present=present,
                **kwargs,
            )
            if "dask" in str(type(data)):
                # noinspection PyUnresolvedReferences
                assert self.with_suffix("").is_dir(exist=True)
                return
        elif self.suffix == ".hdf5" or self.suffix == ".h5":
            ret = self.to_hdf5(
                data=data,
                set_name=set_name,
                use_pandas=use_pandas,
                **kwargs,
            )
            if ret is not None:
                # will not cache the changes for they will happen outside TransparentPath
                return ret
        elif self.suffix == ".json":
            self.to_json(
                data=data,
                overwrite=overwrite,
                present=present,
                **kwargs,
            )
        elif self.suffix == ".txt":
            self.write_stuff(
                *args,
                data=data,
                overwrite=overwrite,
                present=present,
                **kwargs,
            )
        elif self.suffix in [".xlsx", ".xls", ".xlsm"]:
            self.to_excel(
                data=data,
                overwrite=overwrite,
                present=present,
                **kwargs,
            )
        else:
            self.write_bytes(
                *args,
                data=data,
                overwrite=overwrite,
                present=present,
                **kwargs,
            )
        self.update_tpcache(data)
        assert self.is_file()

    def update_tpcache(self, data) -> None:
        if self.enable_caching:
            if self.caching == "ram":
                if self.__hash__() in TransparentPath.cached_data_dict.keys():
                    TransparentPath.cached_data_dict[self.__hash__()]["data"] = data
            elif self.caching == "tmpfile" and self.fs_kind != "local":
                if self.__hash__() in TransparentPath.cached_data_dict.keys():
                    TransparentPath(TransparentPath.cached_data_dict[self.__hash__()]["file"].name, fs="local").write(
                        data
                    )

    @property
    def download(self) -> Union[None, str]:
        """Returns a clickable link to download the file from GCS.

        Returns None if the path does not correspond to an existing file on GCS.
        """
        if self.fs_kind.startswith("gcs") and self.is_file():
            obj = str(self).replace(TransparentPath.remote_prefix, "").replace(" ", "%20")
            return f"https://storage.cloud.google.com/{obj}"
        return None

    @property
    def url(self) -> Union[None, str]:
        """Returns a clickable link to open the path in GCS

        Returns None if the path does not correspond to an existing file or directory.
        """
        obj = str(self).replace(TransparentPath.remote_prefix, "").replace(" ", "%20")
        if self.fs_kind.startswith("gcs"):
            project = self.fs.project
            if self.is_file():
                prefix = "https://console.cloud.google.com/storage/browser/_details/"
                postfix = f";tab=live_object?project={project}"
            elif self.is_dir():
                prefix = "https://console.cloud.google.com/storage/browser/"
                postfix = f";tab=objects?project={project}"
            else:
                return None
        else:
            if not self.exists():
                return None
            return f"file://{obj}"
        return f"{prefix}{obj}{postfix}"

    # READ CSV

    def read_csv(self, *args, **kwargs) -> Any:
        use_dask = False
        if "use_dask" in kwargs:
            use_dask = kwargs["use_dask"]
            del kwargs["use_dask"]
        if use_dask:
            return self.read_csv_dask(*args, **kwargs)
        else:
            return self.read_csv_classic(*args, **kwargs)

    def read_csv_dask(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.dask.read_csv`"""
        raise ImportError(errormessage("dask"))

    def read_csv_classic(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.pandas.read`"""
        raise ImportError(errormessage("pandas"))

    # READ HDF5

    def read_hdf5(self, *args, **kwargs) -> Any:
        use_dask = False
        if "use_dask" in kwargs:
            use_dask = kwargs["use_dask"]
            del kwargs["use_dask"]
        if use_dask:
            return self.read_hdf5_dask(*args, **kwargs)
        else:
            return self.read_hdf5_classic(*args, **kwargs)

    def read_hdf5_dask(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.dask.read_hdf5`"""
        raise ImportError(errormessage("dask,hdf5"))

    def read_hdf5_classic(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.hdf5.read`"""
        raise ImportError(errormessage("hdf5"))

    # READ EXCEL

    def read_excel(self, *args, **kwargs) -> Any:
        use_dask = False
        if "use_dask" in kwargs:
            use_dask = kwargs["use_dask"]
            del kwargs["use_dask"]
        if use_dask:
            return self.read_excel_dask(*args, **kwargs)
        else:
            return self.read_excel_classic(*args, **kwargs)

    def read_excel_dask(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.dask.read_excel`"""
        raise ImportError(errormessage("dask,excel"))

    def read_excel_classic(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.excel.read`"""
        raise ImportError(errormessage("excel"))

    # READ PARQUET

    def read_parquet(self, *args, **kwargs):
        use_dask = False
        if "use_dask" in kwargs:
            use_dask = kwargs["use_dask"]
            del kwargs["use_dask"]
        if use_dask:
            return self.read_parquet_dask(*args, **kwargs)
        else:
            return self.read_parquet_classic(*args, **kwargs)

    def read_parquet_dask(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.dask.read_parquet`"""
        raise ImportError(errormessage("dask,parquet"))

    def read_parquet_classic(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.parquet.read`"""
        raise ImportError(errormessage("parquet"))

    # READ JSON

    def read_json(self, *args, **kwargs) -> Any:
        """Overloaded in `transparentpath.io.json.read`"""
        raise ImportError(errormessage("json"))

    # WRITE CSV

    def to_csv(self, data, *args, **kwargs):
        if "dask" in str(type(data)):
            return self.to_csv_dask(data, *args, **kwargs)
        else:
            self.to_csv_classic(data, *args, **kwargs)

    def to_csv_classic(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.pandas.write`"""
        raise ImportError(errormessage("pandas"))

    def to_csv_dask(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.dask.write_csv`"""
        raise ImportError(errormessage("dask"))

    # WRITE HDF5

    def to_hdf5(self, data, *args, **kwargs):
        if "dask" in str(type(data)):
            return self.to_hdf5_dask(data, *args, **kwargs)
        else:
            return self.to_hdf5_classic(data, *args, **kwargs)

    def to_hdf5_classic(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.hdf5.write`"""
        raise ImportError(errormessage("hdf5"))

    def to_hdf5_dask(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.dask.write_hdf5`"""
        raise ImportError(errormessage("dask,hdf5"))

    # WRITE EXCEL

    def to_excel(self, data, *args, **kwargs):
        if "dask" in str(type(data)):
            self.to_excel_dask(data, *args, **kwargs)
        else:
            self.to_excel_classic(data, *args, **kwargs)

    def to_excel_classic(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.excel.write`"""
        raise ImportError(errormessage("excel"))

    def to_excel_dask(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.dask.write_excel`"""
        raise ImportError(errormessage("dask,excel"))

    # WRITE PARQUET

    def to_parquet(self, data, *args, **kwargs):
        if "dask" in str(type(data)):
            self.to_parquet_dask(data, *args, **kwargs)
        else:
            self.to_parquet_classic(data, *args, **kwargs)

    def to_parquet_classic(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.parquet.write`"""
        raise ImportError(errormessage("parquet"))

    def to_parquet_dask(self, *args, **kwargs):
        """Overloaded in `transparentpath.io.dask.write_parquet`"""
        raise ImportError(errormessage("dask,parquet"))

    def to_json(self, data, *args, **kwargs):
        """Overloaded in `transparentpath.io.json.write`"""
        raise ImportError(errormessage("json"))

    def to_plotly_json(self):
        """Overloaded in `transparentpath.io.json.to_plotly_json`"""
        raise ImportError(errormessage("json"))


# Do imports from detached files here because some of them import TransparentPath and need it fully declared.

# noinspection PyProtectedMember
from ..io._io import put, get, mv, cp, overload_open, read_text, write_stuff, write_bytes

# noinspection PyUnresolvedReferences, PyProtectedMember
from ..io import _zipfile

overload_open()
setattr(TransparentPath, "put", put)
setattr(TransparentPath, "get", get)
setattr(TransparentPath, "mv", mv)
setattr(TransparentPath, "cp", cp)
setattr(TransparentPath, "read_text", read_text)
setattr(TransparentPath, "write_stuff", write_stuff)
setattr(TransparentPath, "write_bytes", write_bytes)

try:
    # noinspection PyUnresolvedReferences,PyProtectedMember
    from transparentpath.io._joblib_load import overload_joblib_load

    overload_joblib_load()
except ImportError:
    pass

try:
    # noinspection PyUnresolvedReferences,PyProtectedMember
    from transparentpath.io._json import read, write, to_plotly_json

    setattr(TransparentPath, "read_json", read)
    setattr(TransparentPath, "to_json", write)
    setattr(TransparentPath, "to_plotly_json", to_plotly_json)
except ImportError:
    pass

try:
    # noinspection PyUnresolvedReferences,PyProtectedMember
    from ..io._pandas import read, write

    setattr(TransparentPath, "read_csv_classic", read)
    setattr(TransparentPath, "to_csv_classic", write)
except ImportError:
    pass

try:
    # noinspection PyUnresolvedReferences,PyProtectedMember
    from ..io._hdf5 import read, write

    setattr(TransparentPath, "read_hdf5_classic", read)
    setattr(TransparentPath, "to_hdf5_classic", write)
except ImportError:
    pass

try:
    # noinspection PyUnresolvedReferences,PyProtectedMember
    from ..io._parquet import read, write

    setattr(TransparentPath, "read_parquet_classic", read)
    setattr(TransparentPath, "to_parquet_classic", write)
except ImportError:
    pass

try:
    # noinspection PyUnresolvedReferences,PyProtectedMember
    from ..io._excel import read, write

    setattr(TransparentPath, "read_excel_classic", read)
    setattr(TransparentPath, "to_excel_classic", write)
except ImportError:
    pass

try:
    # noinspection PyProtectedMember
    from ..io._dask import (
        read_csv,
        write_csv,
        read_hdf5,
        write_hdf5,
        read_excel,
        write_excel,
        read_parquet,
        write_parquet,
    )

    setattr(TransparentPath, "read_csv_dask", read_csv)
    setattr(TransparentPath, "to_csv_dask", write_csv)
    setattr(TransparentPath, "read_hdf5_dask", read_hdf5)
    setattr(TransparentPath, "to_hdf5_dask", write_hdf5)
    setattr(TransparentPath, "read_excel_dask", read_excel)
    setattr(TransparentPath, "to_excel_dask", write_excel)
    setattr(TransparentPath, "read_parquet_dask", read_parquet)
    setattr(TransparentPath, "to_parquet_dask", write_parquet)
except ImportError:
    pass

Functions

def check_bucket(bucket: Optional[str]) ‑> Optional[str]

Check that the bucket exists in an initiated file system and returns the corresponding file system's name, or raises NotADirectoryError.

Expand source code
def check_bucket(bucket: Union[str, None]) -> Union[str, None]:
    """Check that the bucket exists in an initiated file system and returns the corresponding file system's name,
    or raises NotADirectoryError."""
    if bucket is None:
        return None
    bucket = str(bucket)
    if not bucket.endswith("/"):
        bucket += "/"
    fs = None
    for proj in TransparentPath.buckets_in_project:
        if bucket in TransparentPath.buckets_in_project[proj]:
            fs = proj
            break
    return fs
def check_kwargs(method: Callable, kwargs: dict)

Takes as argument a method and some kwargs. Will look in the method signature and return in two separate dict the kwargs that are in the signature and those that are not.

If the method does not return any signature or if it explicitely accepts **kwargs, does not do anything

Expand source code
def check_kwargs(method: Callable, kwargs: dict):
    """Takes as argument a method and some kwargs. Will look in the method signature and return in two separate dict
    the kwargs that are in the signature and those that are not.

    If the method does not return any signature or if it explicitely accepts **kwargs, does not do anything
    """
    unexpected_kwargs = []
    s = ""
    try:
        sig = signature(method)
        if "kwargs" in sig.parameters or "kwds" in sig.parameters:
            return
        for arg in kwargs:
            if arg not in sig.parameters:
                unexpected_kwargs.append(f"{arg}={kwargs[arg]}")

        if len(unexpected_kwargs) > 0:
            s = f"You provided unexpected kwargs for method {method.__name__}:"
            s = "\n  - ".join([s] + unexpected_kwargs)
    except ValueError:
        return

    if s != "":
        raise ValueError(s)
def collapse_ddots(path: Union[pathlib.Path, TransparentPath, str]) ‑> TransparentPath

Collapses the double-dots (..) in the path

Parameters

path : Union[Path, TransparentPath, str]
The path containing double-dots

Returns

TransparentPath
The collapsed path.
Expand source code
def collapse_ddots(path: Union[Path, TransparentPath, str]) -> TransparentPath:
    """Collapses the double-dots (..) in the path

    Parameters
    ----------
    path: Union[Path, TransparentPath, str]
        The path containing double-dots

    Returns
    -------
    TransparentPath
        The collapsed path.

    """
    # noinspection PyUnresolvedReferences
    thetype = path.fs_kind if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    thebucket = path.bucket if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    notupdatecache = path.notupdatecache if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    when_checked = path.when_checked if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    when_updated = path.when_updated if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    update_expire = path.update_expire if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences
    check_expire = path.check_expire if type(path) == TransparentPath else None
    # noinspection PyUnresolvedReferences

    newpath = Path(path) if type(path) == str else path

    if str(newpath) == ".." or str(newpath) == "/..":
        raise ValueError("Can not go before root")

    while ".." in newpath.parts:
        # noinspection PyUnresolvedReferences
        newnewpath = Path(newpath.parts[0])
        for part in newpath.parts[1:]:
            if part == "..":
                newnewpath = newnewpath.parent
            else:
                newnewpath /= part
        newpath = newnewpath

    if str(newpath) == str(path):
        return path
    return (
        TransparentPath(
            newpath,
            collapse=False,
            nocheck=True,
            fs=thetype,
            bucket=thebucket,
            notupdatecache=notupdatecache,
            when_checked=when_checked,
            when_updated=when_updated,
            update_expire=update_expire,
            check_expire=check_expire,
        )
        if thetype is not None
        else newpath
    )
def errorfunction(which) ‑> Callable
Expand source code
def errorfunction(which) -> Callable:
    # noinspection PyUnusedLocal
    def _errorfunction(*args, **kwargs):
        raise ImportError(errormessage(which))

    return _errorfunction
def errormessage(which) ‑> str
Expand source code
def errormessage(which) -> str:
    return (
        f"Support for {which} does not seem to be installed for TransparentPath.\n"
        f"You can change that by running 'pip install transparentpath[{which}]'."
    )
def extract_fs_name(token: str = None) ‑> Tuple[str, str, Optional[str]]
Expand source code
def extract_fs_name(token: str = None) -> Tuple[str, str, Union[str, None]]:
    if token is None and "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
        fs = gcsfs.GCSFileSystem()
        project = fs.project
        if (
            project is None
            or fs.credentials is None
            or not hasattr(fs.credentials.credentials, "service_account_email")
            or fs.credentials.credentials.service_account_email is None
        ):
            raise EnvironmentError(
                "If no token is explicitely specified and GOOGLE_APPLICATION_CREDENTIALS environnement variable is not"
                " set, you need to have done gcloud init or to be on GCP already to create a TransparentPath"
            )
        email = fs.credentials.credentials.service_account_email
        return f"gcs_{project}_{email}", project, None
    elif token is None:
        token = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

    token = token.strip()
    if not TransparentPath(token, fs="local", nocheck=True, notupdatecache=True).is_file():
        raise FileNotFoundError(f"Credential file {token} not found")
    content = json.load(open(token))
    if "project_id" not in content:
        raise ValueError(f"Credential file {token} does not contain project_id key.")
    if "client_email" not in content:
        raise ValueError(f"Credential file {token} does not contain client_email key.")

    fs_name = f"gcs_{content['project_id']}_{content['client_email']}"
    TransparentPath.tokens[fs_name] = token
    return fs_name, content["project_id"], token
def get_buckets(fs: gcsfs.core.GCSFileSystem) ‑> List[str]

Return list of all buckets in the file system.

Expand source code
def get_buckets(fs: gcsfs.GCSFileSystem) -> List[str]:
    """Return list of all buckets in the file system."""
    if "" not in fs.dircache:
        items = []
        page = fs.call("GET", "b/", project=fs.project, json_out=True)

        assert page["kind"] == "storage#buckets"
        items.extend(page.get("items", []))
        next_page_token = page.get("nextPageToken", None)

        while next_page_token is not None:
            page = fs.call(
                "GET",
                "b/",
                project=fs.project,
                pageToken=next_page_token,
                json_out=True,
            )

            assert page["kind"] == "storage#buckets"
            items.extend(page.get("items", []))
            next_page_token = page.get("nextPageToken", None)
        fs.dircache[""] = [{"name": i["name"] + "/", "size": 0, "type": "directory"} for i in items]
    return [b["name"] for b in fs.dircache[""]]
def get_fs(fs_kind: str, bucket: Optional[str] = None, token: Union[str, dict, None] = None, path: Optional[pathlib.Path] = None) ‑> Tuple[Union[gcsfs.core.GCSFileSystem, fsspec.implementations.local.LocalFileSystem], str, str]

Gets the FileSystem object of either gcs or local (Default)

If GCS is asked and bucket is specified, will check that it exists and is accessible.

Parameters

fs_kind : str
Returns GCSFileSystem if 'gcs_*', LocalFilsSystem if 'local'.
bucket : str
bucket name for GCS
token : Optional[Union[str, dict]]
credentials (default value = None)
path : Pathlib.Path
Only relevant if the method was called from TransparentPath.init() : will attempts to fetch the bucket from the path if bucket is not given

Returns

Tuple[Union[gcsfs.GCSFileSystem, LocalFileSystem], Union[None, str], Union[None, str], Union[None, str]]
The FileSystem object, the project if on GCS else None, and the bucket if on GCS.
Expand source code
def get_fs(
    fs_kind: str,
    bucket: Union[str, None] = None,
    token: Optional[Union[str, dict]] = None,
    path: Union[Path, None] = None,
) -> Tuple[Union[gcsfs.GCSFileSystem, LocalFileSystem], str, str]:
    """Gets the FileSystem object of either gcs or local (Default)

    If GCS is asked and bucket is specified, will check that it exists and is accessible.

    Parameters
    ----------
    fs_kind: str
        Returns GCSFileSystem if 'gcs_*', LocalFilsSystem if 'local'.
    bucket: str
        bucket name for GCS
    token: Optional[Union[str, dict]]
        credentials (default value = None)
    path: Pathlib.Path
        Only relevant if the method was called from TransparentPath.__init__() : will attempts to fetch the bucket
        from the path if bucket is not given

    Returns
    -------
    Tuple[Union[gcsfs.GCSFileSystem, LocalFileSystem], Union[None, str], Union[None, str], Union[None, str]]
        The FileSystem object, the project if on GCS else None, and the bucket if on GCS.
    """

    if fs_kind is None:
        fs_kind = ""
    if fs_kind == "" and token is not None:
        fs_kind = "gcs"
    fs_name = None
    if fs_kind == "local":
        bucket = None

    if path is not None and fs_kind != "local":
        # Called from TransparentPath.__init__()
        if bucket is not None:
            fs_name = check_bucket(bucket)
        if bucket is None and len(path.parts) > 0:
            bucket = path.parts[0]
            fs_name = check_bucket(bucket)
            if fs_name is None:
                bucket = None
        if bucket is None:
            bucket = TransparentPath.bucket
            fs_name = check_bucket(bucket)

        if fs_name is not None:
            return copy(TransparentPath.fss[fs_name]), fs_name, bucket

    if "gcs" in fs_kind or token is not None:

        # If bucket is specified, get the filesystem that contains it if it already exists. Else, create the filesystem.
        if bucket is not None:
            fs_name = check_bucket(bucket)
            if fs_name is not None:
                fs = copy(TransparentPath.fss[fs_name])
                return fs, fs_name, ""

        fs_name, project, token = extract_fs_name(token)
        if fs_name in TransparentPath.fss:
            pass
        elif token is None:
            fs = gcsfs.GCSFileSystem(project=project, asynchronous=False)
            TransparentPath.buckets_in_project[fs_name] = get_buckets(fs)
            TransparentPath.fss[fs_name] = fs
        else:
            fs = gcsfs.GCSFileSystem(project=project, asynchronous=False, token=token)
            TransparentPath.buckets_in_project[fs_name] = get_buckets(fs)
            TransparentPath.fss[fs_name] = fs

        ret_bucket = False
        if bucket is None and path is not None and len(path.parts) > 0:
            bucket = path.parts[0]
            ret_bucket = True
        if bucket is not None:
            if not bucket.endswith("/"):
                bucket += "/"
            if bucket not in TransparentPath.buckets_in_project[fs_name]:
                raise NotADirectoryError(f"Bucket {bucket} does not exist in any loaded projects")

        fs = copy(TransparentPath.fss[fs_name])
        if ret_bucket:
            return fs, fs_name, bucket
        else:
            return fs, fs_name, ""
    else:
        if "local" not in TransparentPath.fss:
            TransparentPath.fss["local"] = LocalFileSystem()
        return copy(TransparentPath.fss["local"]), "local", ""
def get_index_and_date_from_kwargs(**kwargs: dict) ‑> Tuple[int, bool, dict]
Expand source code
def get_index_and_date_from_kwargs(**kwargs: dict) -> Tuple[int, bool, dict]:
    index_col = kwargs.get("index_col", None)
    parse_dates = kwargs.get("parse_dates", None)
    if index_col is not None:
        del kwargs["index_col"]
    if parse_dates is not None:
        del kwargs["parse_dates"]
    # noinspection PyTypeChecker
    return index_col, parse_dates, kwargs
def myisinstance(obj1: Any, obj2) ‑> bool

Will return True when testing whether a TransparentPath is a str (required to use open(TransparentPath())) and False when testing whether a pathlib.Path is a TransparentPath.

Expand source code
def myisinstance(obj1: Any, obj2) -> bool:
    """Will return True when testing whether a TransparentPath is a str (required to use open(TransparentPath()))
    and False when testing whether a pathlib.Path is a TransparentPath."""

    if not (builtins_isinstance(obj2, list) or builtins_isinstance(obj2, set) or builtins_isinstance(obj2, tuple)):
        return mysmallisinstance(obj1, obj2)
    else:
        is_instance = False
        for _type in obj2:
            is_instance |= mysmallisinstance(obj1, _type)
        return is_instance
def mysmallisinstance(obj1: Any, obj2) ‑> bool

Will return True when testing whether a TransparentPath is a str (required to use open(TransparentPath())) or a TransparentPath, and False in every other cases (even pathlib.Path).

Expand source code
def mysmallisinstance(obj1: Any, obj2) -> bool:
    """Will return True when testing whether a TransparentPath is a str (required to use open(TransparentPath()))
    or a TransparentPath, and False in every other cases (even pathlib.Path)."""

    if type(obj1) == TransparentPath:
        if obj2 == TransparentPath or obj2 == str:
            return True
        else:
            return False

    if obj2 == TransparentPath:
        if type(obj1) == TransparentPath:
            return True
        else:
            return False

    return builtins_isinstance(obj1, obj2)
def treat_remote_prefix(path: Union[pathlib.Path, TransparentPath, str], bucket: str) ‑> Tuple[str, str]
Expand source code
def treat_remote_prefix(path: Union[Path, TransparentPath, str], bucket: str) -> Tuple[str, str]:
    splitted = str(path).split(TransparentPath.remote_prefix)
    if len(splitted) == 0:
        if bucket is None and TransparentPath.bucket is None:
            raise ValueError(
                "If using a path starting with 'gs://', you must include the bucket name in it unless it"
                "is specified with bucket= or if TransparentPath already has been set to use a specified bucket"
                "with set_global_fs"
            )
        path = str(path).replace(TransparentPath.remote_prefix, "", 1)

    else:
        bucket_from_path = splitted[1].split("/")[0]
        if bucket is not None:
            if bucket != bucket_from_path:
                raise ValueError(
                    f"Bucket name {bucket_from_path} was found in your path name, but it does "
                    f"not match the bucket name you specified with bucket={bucket}"
                )
        else:
            bucket = bucket_from_path

        path = str(path).replace(TransparentPath.remote_prefix, "", 1)
        if path.startswith(bucket_from_path) or (len(path) > 0 and path[1:].startswith(bucket_from_path)):
            path = path.replace(bucket_from_path, "", 1)
        if path.startswith("/"):
            path = path[1:]
    return path, bucket

Classes

class TPCachingWarning (message: str = '')

Base class for warning categories.

Expand source code
class TPCachingWarning(Warning):
    def __init__(self, message: str = ""):
        self.message = message
        super().__init__(self.message)

Ancestors

  • builtins.Warning
  • builtins.Exception
  • builtins.BaseException
class TPMultipleExistenceError (path, ls)

Exception raised when a path's destination already contain more than one element.

Expand source code
class TPMultipleExistenceError(Exception):
    """Exception raised when a path's destination already contain more than
    one element.
    """

    def __init__(self, path, ls):
        self.path = path
        self.ls = ls
        self.message = (
            f"Error in TransparentPath: Multiple objects exist at path {path}.\nHere is the output of ls in the "
            f"parent directory:\n {self.ls}"
        )
        super().__init__(self.message)

    def __str__(self):
        return self.message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class TransparentPath (path: Union[pathlib.Path, TransparentPath, str] = '.', collapse: bool = True, fs: Optional[str] = '', bucket: Optional[str] = None, token: Union[str, dict, None] = None, nocheck: Optional[bool] = None, notupdatecache: Optional[bool] = None, update_expire: Optional[int] = None, check_expire: Optional[int] = None, when_checked: Optional[dict] = None, when_updated: Optional[dict] = None, enable_caching: bool = False, **kwargs)

A class that allows one to use a path in a local file system or a Google Cloud Storage (GCS) file system in the same way one would use a pathlib.Path object. One can use many different GCP projects at once.

Create a path that points to GCS, and one that does not:

>>> from transparentpath import Path
>>> # Or : from transparentpath import TransparentPath
>>> p = Path("gs://mybucket/some_path", token="some/cred/file.json")
>>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
>>> p3 = Path("bar")  # Will point to local path "bar"

Set all paths to point to GCS by default:

>>> from transparentpath import Path
>>> Path.set_global_fs("gcs", token="some/cred/file.json")
>>> p = Path("mybucket") / "some_path" # Will point to gs://mybucket/some_path
>>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
>>> p3 = Path("bar", fs="local")  # Will point to local path "bar"
>>> p4 = Path("other_bucket")  # Will point to gs://other_bucket (assuming other_bucket is a bucket)
>>> p5 = Path("not_a_bucket")  # Will point to local path "not_a_bucket" (assuming it is indeed, not a bucket)

Set all paths to point to severral GCS projects by default:

>>> from transparentpath import Path
>>> Path.set_global_fs("gcs", token="some/cred/file.json")
>>> Path.set_global_fs("gcs", token="some/other/cred/file.json")
>>> p = Path("mybucket") / "some_path" # Will point to gs://mybucket/some_path
>>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
>>> p3 = Path("bar", fs="local")  # Will point to local path "bar"
>>> p4 = Path("other_bucket")  # Will point to gs://other_bucket (assuming other_bucket is a bucket)
>>> p5 = Path("not_a_bucket")  # Will point to local path "not_a_bucket" (assuming it is indeed, not a bucket)

Here, mybucket and other_bucket can be on two different projects, as long as at least one of the credential files can access them.

Set all paths to point to GCS by default, and specify a default bucket:

>>> from transparentpath import Path
>>> Path.set_global_fs("gcs", bucket="mybucket", token="some/cred/file.json")
>>> p = Path("some_path")  # Will point to gs://mybucket/some_path/
>>> p2 = p / "foo"  # Will point to gs://mybucket/some_path/foo
>>> p3 = Path("bar", fs="local")  # Will point to local path "bar"
>>> p4 = Path("other_bucket")  # Will point to gs://mybucket/other_bucket
>>> p5 = Path("not_a_bucket")  # Will point to gs://mybucket/not_a_bucket

The latest option is interesting if you have a code that should be able to run with paths being sometimes remote, sometimes local. To do that, you can use the class attribute nas_dir. Then when a path is created, if it starts by nas_dir's path, nas_dir's path is replaced by the bucket name. This is useful if, for instance, you have a backup of a bucket locally at let's say /my/local/backup. Then you can do:

>>> from transparentpath import Path
>>> Path.nas_dir = "/my/local/backup"
>>> Path.set_global_fs("gcs", bucket="mybucket", token="some/cred/file.json")
>>> p = Path("some_path")  # Will point to gs://mybucket/some_path/
>>> p3 = Path("/my/local/backup") / "some_path"  # Will ALSO point to gs://mybucket/some_path/
>>> from transparentpath import Path
>>> Path.nas_dir = "/my/local/backup"
>>> # Path.set_global_fs("gcs", bucket="mybucket", token="some/cred/file.json")
>>> p = Path("some_path")  # Will point to /my/local/backup/some_path/
>>> p3 = Path("/my/local/backup") / "some_path"  # Will ALSO point to /my/local/backup/some_path/

In all the previous examples, the token argument can be ommited if the environment variable GOOGLE_APPLICATION_CREDENTIALS is set and point to a .json credential file, or if your code runs on a GCP machine (VM, cluster…) with access to GCS.

No matter whether you are using GCS or your local file system, here is a sample of what TransparentPath can do:

>>> from transparentpath import Path
>>> # Path.set_global_fs("gcs", bucket="bucket_name", project="project_name")
>>> # The following lines will also work with the previous line uncommented
>>>
>>> # Reading a csv into a pandas' DataFrame and saving it as a parquet file
>>> mypath = Path("foo") / "bar.csv"
>>> df = mypath.read(index_col=0, parse_dates=True)
>>> otherpath = mypath.with_suffix(".parquet")
>>> otherpath.write(df)
>>>
>>> # Reading and writing a HDF5 file works on GCS and on local:
>>> import numpy as np
>>> mypath = Path("foo") / "bar.hdf5"  # can be .h5 too
>>> with mypath.read() as ifile:
>>>     arr = np.array(ifile["store1"])
>>>
>>> # Doing '..' from 'foo/bar.hdf5' will return 'foo'
>>> # Then doing 'foo' + 'babar.hdf5' will return 'foo/babar.hdf5' ('+' and '/' are synonymes)
>>> mypath.cd("..")  # Does not return a path but modifies inplace
>>> with (mypath  + "babar.hdf5").write(None) as ofile:
>>>     # Note here that we must explicitely give 'None' to the 'write' method in order for it
>>>     # to return the open HDF5 file. We could also give a dict of {arr: "store1"} to directly
>>>     # write the file.
>>>     ofile["store1"] = arr
>>>
>>>
>>> # Reading a text file. Can also use 'w', 'a', etc... also works with binaries.
>>> mypath = Path("foo") / "bar.txt"
>>> with open(mypath, "r") as ifile:
>>>     lines = ifile.readlines()
>>>
>>> # open is overriden to understand gs://
>>> with open("gs://bucket/file.txt", "r") as ifile:
>>>     _ = ifile.readlines()
>>>
>>> mypath.is_file()
>>> mypath.is_dir()
>>> mypath.is_file()
>>> files = mypath.parent.glob("*.csv")  # Returns a Iterator[TransparentPath], can be casted to list

As you can see from the previous example, all methods returning a path from a TransparentPath return a TransparentPath.

TransparentPath supports writing and reading Dask dataframes from and to csv, excel, parquet and HDF5, both locally and remotely. You need to have dask-dataframe and dask-distributed installed, which will be the case if you ran pip install transparentpath[dask]. Writing Dask dataframes does not require any additionnal arguments to be passed for the type will be checked before calling the appropriate writting method. Reading however requires you to pass the use_dask argument to the TransparentPath.read() method. If the file to read is HDF5, you will also need to specify set_names, matching the argument key of Dask's read_hdf() method.

Note that if reading a remote HDF5, the file will be downloaded in your local tmp, then read. If not using Dask, the file is deleted after being read. But since Dask uses delayed processes, deleting the file might occure before the file is actually read, so the file is kept. Up to you to empty your /tmp directory if it is not done automatically by your system.

All instances of TransparentPath are absolute, even if created with relative paths.

TransparentPaths are seen as instances of str:

>>> from transparentpath import Path
>>> path = Path()
>>> isinstance(path, str)  # returns True

This is required to allow

>>> from transparentpath import Path
>>> path = Path()
>>> # noinspection PyTypeChecker
>>> with open(path, "w/r/a/b...") as ifile:
>>> ...
to work. If you want to check whether path is actually a TransparentPath and nothing else, use
>>> from transparentpath import Path
>>> path = Path()
>>> assert type(path) == Path
>>> assert issubclass(path.__class__, Path)
instead.

Any method or attribute valid in fsspec.implementations.local.LocalFileSystem, gcs.GCSFileSystem, pathlib.Path or str can be used on a TransparentPath object.

Warnings about GCS behaviour if you use GCS:

  1. Remember that directories are not a thing on GCS.

  2. You do not need the parent directories of a file on GCS to create the file : they will be created if they do not exist (that is not true localy however).

  3. If you delete a file that was alone in its parent directories, those directories disapear.

  4. If a file exists at the same path than a directory, then TransparentPath is not able to know which one is the file and which one is the directory, and will raise a TPMultipleExistenceError upon object creation. This check for multiplicity is done at almost every method in case an exterior source created a duplicate of the file/directory. This case can't happen locally. However, it can happen on remote if the cache is not updated frequently. Doing this check can significantly increase computation time (if using glob on a directory containing a lot of files for example). You can deactivate it either globally (TransparentPath._do_check = False and TransparentPath._do_update_cache = False), for a specific path (pass nockeck=True at path creation), or for glob and ls by passing fast=True as additional argument.

TransparentPath on GCS is slow because of the verification for multiple existance and the cache updating. However one can tweak those a bit. As mentionned earlier, cache updating and multiple existence check can be deactivated for all paths by doing

>>> from transparentpath import TransparentPath
>>> TransparentPath._do_update_cache = False
>>> TransparentPath._do_check = False

They can also be deactivated for one path only by doing

>>> p = TransparentPath("somepath", nocheck=True, notupdatecache=True)

It is also possible to specify when to do those check : at path creation, path usage (read, write, exists…) or both. Here to it can be set on all paths or only some :

>>> TransparentPath._when_checked = {"created": True, "used": False}  # Default value
>>> TransparentPath._when_updated = {"created": True, "used": False}  # Default value
>>> p = TransparentPath(
>>>   "somepath", when_checked={"created": False, "used": False}, notupdatecache={"created": False, "used": False}
>>> )

There is also an expiration time in seconds for check and update : the operation is not done if it was done not a long time ago. Those expiration times are of 1 second by default and can be changed through :

>>> TransparentPath._check_expire = 10
>>> TransparentPath._update_expire = 10
>>> p = TransparentPath("somepath", check_expire=0, update_expire=0)


<code><a title="transparentpath.gcsutils.transparentpath.TransparentPath.glob" href="#transparentpath.gcsutils.transparentpath.TransparentPath.glob">TransparentPath.glob()</a></code> and
<code><a title="transparentpath.gcsutils.transparentpath.TransparentPath.ls" href="#transparentpath.gcsutils.transparentpath.TransparentPath.ls">TransparentPath.ls()</a></code> have their own way to be accelerated :
```python-repl
>>> p.glob("/*", fast=True)
>>> p.ls("", fast=True)
Basically, *fast=True* means "do not check and do not update the cache" for all the items found by the method.

Builtin open is overloaded by TransparentPath to support giving a TransparentPath to it. If a method in a package you did not create uses open in a with statement, everything should work out of the box with a TransparentPath.

However, if it uses the output of open, you will have to create a class to override this method and anything using its ouput. Indeed, open returns a file descriptor, not an IO, and I did not find a way to access file descriptors on gcs. For example, in the FileLock package, the acquire method calls the _acquire method which calls os.open, so I had to do that:

>>> from filelock import FileLock
>>> from transparentpath import Path
>>>
>>> class MyFileLock(FileLock):
>>>     def _acquire(self):
>>>         tmp_lock_file = self._lock_file
>>>         if not type(tmp_lock_file) == Path:
>>>             tmp_lock_file = Path(tmp_lock_file)
>>>         try:
>>>             fd = tmp_lock_file.open("x")
>>>         except (IOError, OSError, FileExistsError):
>>>             pass
>>>         else:
>>>             self._lock_file_fd = fd
>>>         return None

The original method was:

>>> import os
>>> ...
>>> def _acquire(self):
>>>     open_mode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_TRUNC
>>>     try:
>>>         fd = os.open(self._lock_file, open_mode)
>>>     except (IOError, OSError):
>>>         pass
>>>     else:
>>>         self._lock_file_fd = fd
>>>     return None
>>> ...

I tried to implement a working version of any method valid in pathlib.Path or in file systems, but futur changes in any of those will not be taken into account quickly. You can report missing supports by opening an issue.

Creator of the TranparentPath object

Parameters

path : Union[pathlib.Path, TransparentPath, str]
The path of the object (Default value = '.')
collapse : bool
If True, will collapse any double dots ('..') in path. (Default value = True)
fs : Optional[str]
The file system to use, 'local' or 'gcs'. If None, uses the default one set by TransparentPath.set_global_fs() if any, or 'local' (Default = None)
bucket : Optional[str]
The bucket name if using GCS and if path is not 'gs://bucket/…'
token : Optional[Union[dict, str]]
The path to google application credentials json file to use, if envvar GOOGLE_APPLICATION_CREDENTIALS is not set and the code is not running on a GCP machine.
nocheck : bool
If True, will not call check_multiplicity (quicker but less secure). Takes the value of not Transparentpath._do_check if None (Default value = None)
notupdatecache : bool
If True, will not call _invalidate_cache when doing operations on this path (quicker but less secure). Takes the value of not Transparentpath._do_update_cache if None (Default value = None)
update_expire : Optional[int]
Time in second after which the cache is considered obsolete and must be updated. Takes the value of Transparentpath._update_expire if None (Default value = None)
check_expire : Optional[int]
Time in second after which the check for multiple existence is considered obsolete and must be updated. Takes the value of Transparentpath._check_expire if None (Default value = None)
when_checked : Optional[dict]
Dict of the form {"used: True, "created": True}, that indicates when to check multiplicity of the path. Takes the value of Transparentpath._when_checked if None (Default value = None)
when_updated : Optional[dict]
Same as when_checked but for cache update.
enable_caching : bool
If True, will enable file caching, meaning file content read from a transparentpath will be saved in tmp for quicker access later in the code. Default is False.

kwargs: Any optional kwargs valid for pathlib.Path

Expand source code
class TransparentPath:
    def __fspath__(self) -> str:
        """Implemented later"""
        pass

Ancestors

  • os.PathLike
  • abc.ABC

Class variables

var LOCAL_SEP
var bucket
var buckets_in_project
var cached_data_dict
var caching : str
var caching_max_memory
var cli
var cwd
var fs_kind
var fss
var method_path_concat
var method_without_self_path
var nas_dir
var remote_prefix
var tokens
var translations
var unset
var used_memory

Static methods

def get_state() ‑> dict

Returns the state of the TransparentPath class in a dictionnary

Expand source code
@classmethod
def get_state(cls) -> dict:
    """Returns the state of the TransparentPath class in a dictionnary"""
    state = {
        "remote_prefix": cls.remote_prefix,
        "fss": cls.fss,
        "buckets_in_project": cls.buckets_in_project,
        "fs_kind": cls.fs_kind,
        "bucket": cls.bucket,
        "nas_dir": cls.nas_dir,
        "unset": cls.unset,
        "cwd": cls.cwd,
        "tokens": cls.tokens,
        "_do_update_cache": cls._do_update_cache,
        "_do_check": cls._do_check,
        "_check_expire": cls._check_expire,
        "_update_expire": cls._update_expire,
        "_when_checked": cls._when_checked,
        "_when_updated": cls._when_updated,
        "LOCAL_SEP": cls.LOCAL_SEP,
        "cached_data_dict: ": cls.cached_data_dict,
        "used_memory: ": cls.used_memory,
        "caching: ": cls.caching,
        "caching_max_memory: ": cls.caching_max_memory,
    }
    return state
def reinit()

Reinit all class attributes to their default values

Expand source code
@classmethod
def reinit(cls):
    """Reinit all class attributes to their default values"""
    cls.remote_prefix = "gs://"
    cls.fss = {}
    cls.buckets_in_project = {}
    cls.fs_kind = None
    cls.bucket = None
    cls.nas_dir = None
    cls.unset = True
    cls.cwd = os.getcwd()
    cls.tokens = {}
    cls._do_update_cache = True
    cls._do_check = True
    cls._check_expire = 1
    cls._update_expire = 1
    cls._when_checked = {"used": False, "created": True}
    cls._when_updated = {"used": False, "created": True}
    cls.LOCAL_SEP = os.path.sep
    cls.cached_data_dict = collections.OrderedDict()
    cls.used_memory = 0
    cls.caching = "None"
    cls.caching_max_memory = 100
def set_global_fs(fs: str, bucket: Optional[str] = None, nas_dir: Union[TransparentPath, pathlib.Path, str, None] = None, token: Union[str, dict, None] = None) ‑> None

To call before creating any instance to set the file system.

If not called, default file system is local. If the first parameter is 'local', the file system is local. If the first parameter is 'gcs', file system is GCS.

Parameters

fs : str
'gcs' will use GCSFileSystem, 'local' will use LocalFileSystem
bucket : str
The bucket name, only valid if using gcs (Default value = None)
nas_dir : Union[TransparentPath, Path, str]
If specified, TransparentPath will delete any occurence of 'nas_dir' at the beginning of created paths if fs is gcs (Default value = None).
token : Optional[Union[dict, str]]
credentials (default value = None). If not specified, will use envvar GOOGLE_APPLICATION_CREDENTIALS. If not specified either, will try to log with default account, which will work is using a machine on GCP (VM, cluster…)

Returns

None
 
Expand source code
@classmethod
def set_global_fs(
    cls,
    fs: str,
    bucket: Union[str, None] = None,
    nas_dir: Optional[Union[TransparentPath, Path, str]] = None,
    token: Optional[Union[dict, str]] = None,
) -> None:
    """To call before creating any instance to set the file system.

    If not called, default file system is local. If the first parameter is 'local', the file system is local. If
    the first parameter is 'gcs', file system is GCS.

    Parameters
    ----------
    fs: str
        'gcs' will use GCSFileSystem, 'local' will use LocalFileSystem
    bucket: str
        The bucket name, only valid if using gcs (Default value =  None)
    nas_dir: Union[TransparentPath, Path, str]
        If specified, TransparentPath will delete any occurence of 'nas_dir' at the beginning of created paths if fs
        is gcs (Default value = None).
    token: Optional[Union[dict, str]]
        credentials (default value = None). If not specified, will use envvar GOOGLE_APPLICATION_CREDENTIALS. If not
        specified either, will try to log with default account, which will work is using a machine on GCP
        (VM, cluster...)

    Returns
    -------
    None
    """
    if "gcs" not in fs and fs != "local":
        raise ValueError(f"Unknown value {fs} for parameter 'fs'")

    cls.fs_kind = fs
    cls.bucket = bucket

    TransparentPath._set_nas_dir(cls, nas_dir)
    get_fs(cls.fs_kind, cls.bucket, token)
    TransparentPath.unset = False
def show_state()

Prints the state of the TransparentPath class

Expand source code
@classmethod
def show_state(cls):
    """Prints the state of the TransparentPath class"""
    print("remote_prefix: ", cls.remote_prefix)
    print("fss: ", cls.fss)
    print("buckets_in_project: ", cls.buckets_in_project)
    print("fs_kind: ", cls.fs_kind)
    print("bucket: ", cls.bucket)
    print("nas_dir: ", cls.nas_dir)
    print("unset: ", cls.unset)
    print("cwd: ", cls.cwd)
    print("tokens: ", cls.tokens)
    print("_do_update_cache: ", cls._do_update_cache)
    print("_do_check: ", cls._do_check)
    print("_check_expire: ", cls._check_expire)
    print("_update_expire: ", cls._update_expire)
    print("_when_updated: ", cls._when_updated)
    print("LOCAL_SEP: ", cls.LOCAL_SEP)
    print("cached_data_dict: ", cls.cached_data_dict)
    print("used_memory: ", cls.used_memory)
    print("caching: ", cls.caching)
    print("caching_max_memory: ", cls.caching_max_memory)

Instance variables

var absoluteTransparentPath

Returns self, since all TransparentPaths are absolute

Returns

TransparentPath
self
Expand source code
@property
def absolute(self) -> TransparentPath:
    """Returns self, since all TransparentPaths are absolute

    Returns
    -------
    TransparentPath
        self

    """
    return self
var buckets : List[str]
Expand source code
@property
def buckets(self) -> List[str]:
    if self.fs_kind == "local":
        return []
    return get_buckets(self.fs)
var download : Optional[str]

Returns a clickable link to download the file from GCS.

Returns None if the path does not correspond to an existing file on GCS.

Expand source code
@property
def download(self) -> Union[None, str]:
    """Returns a clickable link to download the file from GCS.

    Returns None if the path does not correspond to an existing file on GCS.
    """
    if self.fs_kind.startswith("gcs") and self.is_file():
        obj = str(self).replace(TransparentPath.remote_prefix, "").replace(" ", "%20")
        return f"https://storage.cloud.google.com/{obj}"
    return None
var path
Expand source code
@property
def path(self):
    return self.__path
var url : Optional[str]

Returns a clickable link to open the path in GCS

Returns None if the path does not correspond to an existing file or directory.

Expand source code
@property
def url(self) -> Union[None, str]:
    """Returns a clickable link to open the path in GCS

    Returns None if the path does not correspond to an existing file or directory.
    """
    obj = str(self).replace(TransparentPath.remote_prefix, "").replace(" ", "%20")
    if self.fs_kind.startswith("gcs"):
        project = self.fs.project
        if self.is_file():
            prefix = "https://console.cloud.google.com/storage/browser/_details/"
            postfix = f";tab=live_object?project={project}"
        elif self.is_dir():
            prefix = "https://console.cloud.google.com/storage/browser/"
            postfix = f";tab=objects?project={project}"
        else:
            return None
    else:
        if not self.exists():
            return None
        return f"file://{obj}"
    return f"{prefix}{obj}{postfix}"

Methods

def append(self, other: str) ‑> TransparentPath
Expand source code
def append(self, other: str) -> TransparentPath:
    return TransparentPath(
        str(self) + other,
        fs=self.fs_kind,
        bucket=self.bucket,
        notupdatecache=self.notupdatecache,
        nocheck=self.nocheck,
        when_checked=self.when_checked,
        when_updated=self.when_updated,
        update_expire=self.update_expire,
        check_expire=self.check_expire,
    )
def caching_ram(self, data, args, kwargs) ‑> None

caching for ram

Expand source code
def caching_ram(self, data, args, kwargs) -> None:
    """
    caching for ram
    """
    filesize = sys.getsizeof(data)
    if filesize > TransparentPath.caching_max_memory * 1000000:
        warnings.warn(
            f"You are trying to add in cache a file of {filesize / 1000000} MB, but the max memory "
            f"for caching is {TransparentPath.caching_max_memory} MB\nCaching canceled",
            TPCachingWarning,
        )
    else:
        while TransparentPath.used_memory + filesize > TransparentPath.caching_max_memory * 1000000:
            # Drop oldest file
            byename, byefile = TransparentPath.cached_data_dict.popitem(last=False)
            TransparentPath.used_memory -= sys.getsizeof(byefile)
            warnings.warn(
                f"You have exceeded the max memory for caching of {TransparentPath.caching_max_memory} MB"
                f"(old files {TransparentPath.used_memory / 1000000} MB, new file {filesize / 1000000})"
                f"removing from cach : {byename}",
                TPCachingWarning,
            )
        # Adding file to dict and filesize to total used memory
        TransparentPath.used_memory += filesize
        TransparentPath.cached_data_dict[self.__hash__()] = {"data": data, "args": args, "kwargs": kwargs}
def caching_saver(self, data, args, kwargs) ‑> None

Save fetched data from read in tmp file or dict, if total of cach does not exceed caching_max_memory else remove oldest data

To use ram caching set self.caching to "ram", to use tmp file caching set self.caching to "tmpfile"

To disable caching, set self.caching to something else or self.enable_caching to False

TransparentPath.caching_max_memory is in MB

Expand source code
def caching_saver(self, data, args, kwargs) -> None:
    """
    Save fetched data from read in tmp file or dict,
     if total of cach does not exceed caching_max_memory else remove oldest data

    To use ram caching set self.caching to "ram", to use tmp file caching set self.caching to "tmpfile"

    To disable caching, set self.caching to something else or self.enable_caching to False

    TransparentPath.caching_max_memory is in MB
    """
    if self.enable_caching:
        if self.caching == "ram":
            self.caching_ram(data, args, kwargs)

        elif self.caching == "tmpfile" and self.fs_kind != "local":
            self.caching_tmpfile(args, kwargs)
def caching_tmpfile(self, args, kwargs) ‑> None

caching for tmpfile

Expand source code
def caching_tmpfile(self, args, kwargs) -> None:
    """
    caching for tmpfile
    """
    temp_file = tempfile.NamedTemporaryFile(delete=True, suffix=self.suffix)
    self.get(temp_file.name)
    # noinspection PyUnresolvedReferences
    tempfilesize = temp_file.file.tell()
    if tempfilesize > TransparentPath.caching_max_memory * 1000000:
        warnings.warn(
            f"You are trying to add in cache a file of {tempfilesize / 1000000} MB, but the max memory "
            f"for caching is {TransparentPath.caching_max_memory} MB\nCaching canceled",
            TPCachingWarning,
        )
    else:
        while TransparentPath.used_memory + tempfilesize > TransparentPath.caching_max_memory * 1000000:
            byename, byefile = TransparentPath.cached_data_dict.popitem(last=False)
            byefile["file"].close()
            TransparentPath.used_memory -= byefile["memory"]
            warnings.warn(
                f"You have exceeded the max memory for caching of {TransparentPath.caching_max_memory} MB"
                f"(old files {TransparentPath.used_memory / 1000000} MB, new file {tempfilesize / 1000000})"
                f"removing from cach : {byename}",
                TPCachingWarning,
            )
            del byefile
        TransparentPath.used_memory += tempfilesize
        TransparentPath.cached_data_dict[self.__hash__()] = {
            "file": temp_file,
            "memory": tempfilesize,
            "args": args,
            "kwargs": kwargs,
        }
def cd(self, path: Optional[str] = None) ‑> None

cd-like command. Works inplace

Will collapse double-dots ('..'), so not compatible with symlinks. If path is absolute (starts with '/' or bucket name or is empty), will return a path starting from root directory if FileSystem is local, from bucket if it is GCS. If passing None or "" , will have the same effect than "/" on GCS, will return the current working directory on local. If passing ".", will return a path at the location of self. Will raise an error if trying to access a path before root or bucket.

Parameters

path : str
The path to cd to. Absolute, or relative to self. (Default value = None)

Returns

None : works inplace
 
Expand source code
def cd(self, path: Optional[str] = None) -> None:
    """cd-like command. Works inplace

    Will collapse double-dots ('..'), so not compatible with symlinks. If path is absolute (starts with '/' or
    bucket name or is empty), will return a path starting from root directory if FileSystem is local, from bucket
    if it is GCS. If passing None or "" , will have the same effect than "/" on GCS, will return the current
    working directory on local. If passing ".", will return a path at the location of self. Will raise an error
    if trying to access a path before root or bucket.


    Parameters
    ----------
    path: str
        The path to cd to. Absolute, or relative to self.
        (Default value = None)


    Returns
    -------
    None: works inplace

    """

    # Will collapse any '..'

    if not isinstance(path, str) or isinstance(path, TransparentPath):
        raise TypeError("Can only pass a string to TransparentPath's cd method")

    path = path.replace(TransparentPath.remote_prefix, "", 1)

    if "gcs" in self.fs_kind and str(path) == self.bucket or path == "" or str(path) == "/":
        self.__path = Path(self.bucket)
        return

    # If asked to cd to home, return path script calling directory
    if path == "" or path is None:
        self.__path = Path()
        return

    # noinspection PyUnresolvedReferences
    self.__path = self.__path / path

    if self.fs_kind == "local":
        # If asked for an absolute path
        if path.startswith("/"):
            self.__path = Path(path)
            return
        # noinspection PyUnresolvedReferences
        if len(self.__path.parts) == 0:
            return
        # noinspection PyUnresolvedReferences
        if self.__path.parts[0] == "..":
            raise ValueError("The first part of a path can not be '..'")
    else:
        # If asked for an absolute path
        if path.startswith("/"):
            self.__path = Path(self.bucket) / path[1:]
            return
        # noinspection PyUnresolvedReferences
        if self.__path == 1:  # On gcs, first part is bucket
            return
        # noinspection PyUnresolvedReferences
        if self.__path.parts[1] == "..":
            raise ValueError("Trying to access a path before bucket")

    # noinspection PyUnresolvedReferences
    self.__path = collapse_ddots(self.__path)
def change_suffix(self, suffix: str) ‑> None
Expand source code
def change_suffix(self, suffix: str) -> None:
    if not suffix.startswith("."):
        suffix = f".{suffix}"

    self.path = self.path.with_suffix(suffix)
    if self.when_checked["created"] and not self.nocheck:
        self._check_multiplicity()
    elif self.when_updated["created"] and not self.notupdatecache:  # Else, because called by check_multiplicity
        self._update_cache()
def cp(self, other: Union[str, pathlib.Path, TransparentPath])

Used to copy a file or a directory on the same filesystem.

Expand source code
def cp(self, other: Union[str, Path, TransparentPath]):
    """Used to copy a file or a directory on the same filesystem."""

    # noinspection PyProtectedMember
    if not self.exist():
        raise FileNotFoundError(f"No such file or directory: {self}")

    if not type(other) == TransparentPath:
        other = TransparentPath(
            other,
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
            token=self.token,
        )
    if other.fs_kind != self.fs_kind:
        if self.fs_kind == "local":
            self.put(other)
        else:
            self.get(other)
        return

    # Do not use filesystem's copy if self is not a file, for it was coded by apes and is not able to use recursive
    # properly

    if self.is_file():
        self.fs.cp(self.__fspath__(), other)
        return

    if self.isdir() and self.info()["size"] == 0:
        other.mkdir()
        return

    for stuff in list(self.glob("**/*", fast=True)):
        # noinspection PyUnresolvedReferences
        if not stuff.is_file():
            continue
        # noinspection PyUnresolvedReferences
        relative = stuff.split(f"/{self.name}/")[-1]
        newpath = other / relative
        newpath.parent.mkdir(recursive=True)
        self.fs.cp(stuff.__fspath__(), newpath)
def exist(self) ‑> bool

To prevent typo of 'exist()' without an -s

Expand source code
def exist(self) -> bool:
    """To prevent typo of 'exist()' without an -s"""
    return self.exists()
def exists(self) ‑> bool
Expand source code
def exists(self) -> bool:
    if str(self.path) == "/" and self.fs_kind == "local":
        return True
    elif self.path == "gs://" and self.fs_kind == "gcs":
        return True
    updated = False
    if self.when_checked["used"] and not self.nocheck:
        self._check_multiplicity()
        updated = True
    elif self.when_updated["used"] and not self.notupdatecache:
        self._update_cache()
        updated = True
    if not self.fs.exists(self.__fspath__()):
        if not updated:
            self._update_cache()
            return self.fs.exists(self.__fspath__())
        else:
            return False
    return True
def get(self, loc: Union[str, pathlib.Path, TransparentPath])

used to get a remote file to local. Does not remove the remote file.

self must be a remote TransparentPath. If loc is a TransparentPath, it must be local. If it is a pathlib.Path or a str, it will be casted into a local TransparentPath.

Expand source code
def get(self, loc: Union[str, Path, TransparentPath]):
    """used to get a remote file to local. Does not remove the remote file.

    self must be a remote TransparentPath. If loc is a TransparentPath, it must be local. If it is a pathlib.Path or
    a str, it will be casted into a local TransparentPath. """

    def recursive(source, destination):
        if not source.exists():
            raise FileNotFoundError(f"Element {source} does not exist")
        if source.isfile():
            source.get(destination)
        elif source.isdir():
            for element in source.glob("*", fast=True):
                recursive(element, destination / element.name)
        else:
            raise ValueError(f"Element {source} exists is neither a file nor a dir, then what is it ?")

    if "gcs" not in self.fs_kind:
        raise ValueError("The calling instance of get() must be on GCS. To move a file localy, use the mv() method.")
    # noinspection PyUnresolvedReferences
    if type(loc) == TransparentPath and loc.fs_kind != "local":
        raise ValueError(
            "The second argument can not be a GCS "
            "TransparentPath. To move on gcs a file already"
            "on gcs, use mv(). To move a file from gcs, to"
            " local, use get()"
        )
    if type(loc) != TransparentPath:
        loc = TransparentPath(
            loc,
            fs="local",
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )

    # noinspection PyProtectedMember
    if not self.exist():
        raise FileNotFoundError(f"No such file or directory: {self}")

    if self.is_dir(exist=True):
        # Recursive fs.get does not find all existing elements, it seems, so overload it
        recursive(self, loc)
    else:
        self.fs.get(self.__fspath__(), loc.__fspath__())
def get_absolute(self) ‑> TransparentPath

Returns self, since all TransparentPaths are absolute

Returns

TransparentPath
self
Expand source code
def get_absolute(self) -> TransparentPath:
    """Returns self, since all TransparentPaths are absolute

    Returns
    -------
    TransparentPath
        self

    """
    return self
def glob(self, wildcard: str = '*', fast: bool = False, i_am_sure_i_am_a_dir: bool = False) ‑> Iterator[TransparentPath]

Returns a list of TransparentPath matching the wildcard pattern.

By default, the wildcard is ''. It means 'thepath/', so will glob in the directory.

Parameters

wildcard : str
The wilcard pattern to match, relative to self (Default value = "*")
fast : bool
If True, does not check multiplicity when converting output paths to TransparentPath, significantly speeding up the process (Default value = False)
i_am_sure_i_am_a_dir : bool
If True, will not check that self points to a directory. Saves time.

Returns

Iterator[TransparentPath]
The list of items matching the pattern
Expand source code
def glob(
    self, wildcard: str = "*", fast: bool = False, i_am_sure_i_am_a_dir: bool = False
) -> Iterator[TransparentPath]:
    """Returns a list of TransparentPath matching the wildcard pattern.

    By default, the wildcard is '*'. It means 'thepath/*', so will glob in the directory.

    Parameters
    -----------
    wildcard: str
        The wilcard pattern to match, relative to self (Default value = "*")

    fast: bool
        If True, does not check multiplicity when converting output paths to TransparentPath, significantly
        speeding up the process (Default value = False)
    i_am_sure_i_am_a_dir: bool
        If True, will not check that self points to a directory. Saves time.


    Returns
    --------
    Iterator[TransparentPath]
        The list of items matching the pattern

    """

    if not i_am_sure_i_am_a_dir:
        if not self.is_dir():
            raise NotADirectoryError("The path must be a directory if you want to glob in it")

    if wildcard.startswith("/") or wildcard.startswith("\\"):
        wildcard = wildcard[1:]

    if wildcard.startswith("**/*"):
        wildcard = wildcard.replace("**/*", "**")

    path_to_glob = (self.__path / wildcard).__fspath__()

    try:
        if fast:
            to_ret = map(self._cast_fast, self.fs.glob(path_to_glob))
        else:
            to_ret = map(self._cast_slow, self.fs.glob(path_to_glob))
    except TypeError as e:
        if "list indices must be integers or slices, not str" in str(e):
            to_ret = []
        else:
            raise e
    return to_ret
def is_dir(self, *args, **kwargs) ‑> bool

Check if self is a directory.

Returns

bool
 
Expand source code
def is_dir(self, *args, **kwargs) -> bool:
    """Check if self is a directory.


    Returns
    -------
    bool

    """
    if self.fs_kind == "local":
        if str(self.path) == "/":
            return True
        return self.__path.is_dir()
    else:
        if not self.exists():
            return False
        if self.is_file():
            return False
        return True
def is_file(self) ‑> bool

Check if self is a file On GCS, leaves are always files even if created with mkdir.

Returns

bool
 
Expand source code
def is_file(self) -> bool:
    """Check if self is a file
    On GCS, leaves are always files even if created with mkdir.


    Returns
    -------
    bool

    """

    if not self.exists():
        return False

    if self.fs_kind == "local":
        return self.__path.is_file()
    else:
        # GCS is shit and sometimes needs to be checked twice
        if self.info()["type"] == "file" and self.info()["type"] == "file":
            return True
        else:
            return False
def isdir(self, *args, **kwargs) ‑> bool
Expand source code
def isdir(self, *args, **kwargs) -> bool:
    return self.is_dir()
def isfile(self) ‑> bool
Expand source code
def isfile(self) -> bool:
    return self.is_file()
def ls(self, path_to_ls: str = '', fast: bool = False) ‑> Iterator[TransparentPath]

Unlike glob, if on GCS, will also see directories.

Parameters

path_to_ls : str
Path to ls, relative to self (default value = "")
fast : bool
If True, does not check multiplicity when converting output paths to TransparentPath, significantly speeding up the process (Default value = False)

Returns

Iterator[TransparentPath]
 
Expand source code
def ls(self, path_to_ls: str = "", fast: bool = False) -> Iterator[TransparentPath]:
    """Unlike glob, if on GCS, will also see directories.


    Parameters
    -----------
    path_to_ls: str
        Path to ls, relative to self (default value = "")
    fast: bool
        If True, does not check multiplicity when converting output
        paths to TransparentPath, significantly speeding up the process
        (Default value = False)


    Returns
    --------
    Iterator[TransparentPath]

    """

    if isinstance(path_to_ls, TransparentPath):
        raise TypeError("Can not use a TransparentPath as a argument of ls() : TransparentPath are all absolute")

    if not self.is_dir():
        raise NotADirectoryError("The path must be a directory if you want to ls in it")

    if fast:
        to_ret = map(self._cast_fast, self.fs.ls(str(self / path_to_ls)))
    else:
        to_ret = map(self._cast_slow, self.fs.ls(str(self / path_to_ls)))
    return to_ret
def mkbucket(self, name: Optional[str] = None) ‑> None
Expand source code
def mkbucket(self, name: Optional[str] = None) -> None:
    raise NotImplementedError
def mkdir(self, present: str = 'ignore', **kwargs) ‑> None

Creates the directory corresponding to self if does not exist

Remember that leaves are always files on GCS, so can not create a directory on GCS. Thus, the function will have no effect on GCS.

Parameters

present : str
What to do if there is already something at self. Can be "raise" or "ignore" (Default value = "ignore")
kwargs
The kwargs to pass to file system's mkdir method

Returns

None
 
Expand source code
def mkdir(self, present: str = "ignore", **kwargs) -> None:
    """Creates the directory corresponding to self if does not exist

    Remember that leaves are always files on GCS, so can not create a directory on GCS. Thus, the function will
    have no effect on GCS.


    Parameters
    ----------
    present: str
        What to do if there is already something at self. Can be "raise" or "ignore" (Default value = "ignore")

    kwargs
        The kwargs to pass to file system's mkdir method


    Returns
    -------
    None

    """

    if present != "raise" and present != "ignore":
        raise ValueError(f"Unexpected value for argument 'present' : {present}")

    if self.exists():
        if self.is_dir() and present == "raise":
            raise FileExistsError(f"There is already a directory at {self}")
        if not self.is_dir():
            raise FileExistsError(f"There is already an object at {self} and it is not a  directory")
        return

    for parent in reversed(self.parents):
        thefile = TransparentPath(
            parent,
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )
        if thefile.is_file():
            raise FileExistsError(
                "A parent directory can not be created because there is already a file at" f" {thefile}"
            )

    if self.fs_kind == "local":
        # Use _obj_missing instead of callign mkdir directly because
        # file systems mkdir has some kwargs with different name than
        # pathlib.Path's  mkdir, and this is handled in _obj_missing
        self._obj_missing("mkdir", kind="translate", **kwargs)
    else:
        # Does not mean anything to create a directory on GCS
        pass
def mv(self, other: Union[str, pathlib.Path, TransparentPath])

Used to move a file or a directory. Works between any filesystems.

Expand source code
def mv(self, other: Union[str, Path, TransparentPath]):
    """Used to move a file or a directory. Works between any filesystems."""

    if not type(other) == TransparentPath:
        other = TransparentPath(
            other,
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
            token=self.token,
        )

    if other.fs_kind != self.fs_kind:
        if self.fs_kind == "local":
            self.put(other)
            self.rm(absent="raise", ignore_kind=True)
        else:
            self.get(other)
            self.rm(absent="raise", ignore_kind=True)
        return

    # Do not use filesystem's move, for it is coded by apes and is not able to use recursive properly
    # self.fs.mv(self.__fspath__(), other, **kwargs)

    # noinspection PyProtectedMember
    if not self.exist():
        raise FileNotFoundError(f"No such file or directory: {self}")

    if self.is_file():
        self.fs.mv(self.__fspath__(), other)
        return

    for stuff in list(self.glob("**/*", fast=True)):
        # noinspection PyUnresolvedReferences
        if not stuff.is_file():
            continue
        # noinspection PyUnresolvedReferences
        relative = stuff.split(f"/{self.name}/")[-1]
        newpath = other / relative
        newpath.parent.mkdir(recursive=True)
        self.fs.mv(stuff.__fspath__(), newpath)
def put(self, dst: Union[str, pathlib.Path, TransparentPath])

used to push a local file to the cloud. Does not remove the local file.

self must be a local TransparentPath. If dst is a TransparentPath, it must be on GCS. If it is a pathlib.Path or a str, it will be casted into a GCS TransparentPath, so a gcs file system must have been set up before.

Expand source code
def put(self, dst: Union[str, Path, TransparentPath]):
    """used to push a local file to the cloud. Does not remove the local file.

    self must be a local TransparentPath. If dst is a TransparentPath, it must be on GCS. If it is a pathlib.Path
    or a str, it will be casted into a GCS TransparentPath, so a gcs file system must have been set up before. """
    if not self.fs_kind == "local":
        raise ValueError(
            "The calling instance of put() must be local. "
            "To move on gcs a file already on gcs, use mv("
            "). To move a file from gcs, to local, use get("
            "). "
        )
    # noinspection PyUnresolvedReferences
    if type(dst) == TransparentPath and "gcs" not in dst.fs_kind:
        raise ValueError(
            "The second argument can not be a local TransparentPath. To move a file localy, use the mv() method."
        )
    if type(dst) != TransparentPath:
        if TransparentPath.remote_prefix not in str(dst):
            if "gcs" not in "".join(TransparentPath.fss):
                raise ValueError("You need to set up a gcs file system before using the put() command.")
            dst = TransparentPath(dst, fs="gcs")
        else:
            dst = TransparentPath(dst)

    # noinspection PyProtectedMember
    if not self.exist():
        raise FileNotFoundError(f"No such file or directory: {self}")

    if self.is_dir():
        for item in self.glob("/*"):
            # noinspection PyUnresolvedReferences
            item.put(dst / item.name)
    else:
        with open(self, "rb") as f1:
            with open(dst, "wb") as f2:
                data = True
                while data:
                    data = f1.read(self.blocksize)
                    f2.write(data)
def read(self, *args, get_obj: bool = False, use_pandas: bool = False, use_dask: bool = False, **kwargs) ‑> Any

Method used to read the content of the file located at self

Will raise FileNotFound error if there is no file. Calls a specific method to read self based on the suffix of self.path: 1: .csv : will use pandas's read_csv 2: .parquet : will use pandas's read_parquet with pyarrow engine 3: .hdf5 or .h5 : will use h5py.File or pd.HDFStore (if use_pandas = True). Since it does not support remote file systems, the file will be downloaded localy in a tmp file read, then removed. 4: .json : will use open() method to get file content then json.loads to get a dict 5: .xlsx : will use pd.read_excel 6: any other suffix : will return a IO buffer to read from, or the string contained in the file if get_obj is False.

For any of the reading method, the appropriate packages need to have been installed by calling pip install transparentpath[something] The possibilities for 'something' are 'pandas-csv', 'pandas-parquet', 'pandas-excel', 'hdf5', 'json', 'dask'. You can install all possible packages by putting 'all' in place of 'something'.

The default installation of transperantpath is 'vanilla', which will only support read and write of text or binary files, and the use of with open(…).

If self.enable_caching is True, will either save in tmp file (if self.caching == "tmpfile") or store the read data in a dict (if self.caching == "ram"), then if the path have already been read, will just return the previously stored data

Parameters

get_obj : bool
Only relevant for files that are not csv, parquet nor HDF5. If True returns the IO Buffer, else the string contained in the IO Buffer (Default value = False)
use_pandas : bool
Must pass it as True if hdf5 file was written using HDFStore and not h5py.File (Default value = False)
use_dask : bool
To return a Dask DataFrame instead of a pandas DataFrame. Only makes sense if file suffix is xlsx, csv, parquet. (Default value = False)

args: any args to pass to the underlying reading method kwargs: any kwargs to pass to the underlying reading method

Returns

Any
 
Expand source code
def read(
    self,
    *args,
    get_obj: bool = False,
    use_pandas: bool = False,
    use_dask: bool = False,
    **kwargs,
) -> Any:
    """Method used to read the content of the file located at self

    Will raise FileNotFound error if there is no file. Calls a specific method to read self based on the suffix
    of self.path:
        1: .csv : will use pandas's read_csv
        2: .parquet : will use pandas's read_parquet with pyarrow engine
        3: .hdf5 or .h5 : will use h5py.File or pd.HDFStore (if use_pandas = True). Since it does not support
        remote file systems, the file will be downloaded localy in a tmp file read, then removed.
        4: .json : will use open() method to get file content then json.loads to get a dict
        5: .xlsx : will use pd.read_excel
        6: any other suffix : will return a IO buffer to read from, or the string contained in the file if
        get_obj is False.

    For any of the reading method, the appropriate packages need to have been installed by calling
    `pip install transparentpath[something]`
    The possibilities for 'something' are 'pandas-csv', 'pandas-parquet', 'pandas-excel', 'hdf5', 'json', 'dask'.
    You can install all possible packages by putting 'all' in place of 'something'.

    The default installation of transperantpath is 'vanilla', which will only support read and write of text
     or binary files, and the use of with open(...).

    If self.enable_caching is True, will either save in tmp file (if self.caching == "tmpfile") or store the read
    data in a dict (if self.caching == "ram"), then if the path have already been read, will just return the
    previously stored data

    Parameters
    ----------
    get_obj: bool
        Only relevant for files that are not csv, parquet nor HDF5. If True returns the IO Buffer,
        else the string contained in the IO Buffer (Default value = False)
    use_pandas: bool
        Must pass it as True if hdf5 file was written using HDFStore and not h5py.File (Default value = False)
    use_dask: bool
        To return a Dask DataFrame instead of a pandas DataFrame. Only makes sense if file suffix is xlsx, csv,
        parquet. (Default value = False)
    args:
        any args to pass to the underlying reading method
    kwargs:
        any kwargs to pass to the underlying reading method

    Returns
    -------
    Any
    """
    if self.enable_caching:
        if self.caching == "ram":
            if self.__hash__() in TransparentPath.cached_data_dict.keys():
                return TransparentPath.cached_data_dict[self.__hash__()]["data"]
        elif self.caching == "tmpfile" and self.fs_kind != "local":
            if self.__hash__() in TransparentPath.cached_data_dict.keys():
                return TransparentPath(
                    TransparentPath.cached_data_dict[self.__hash__()]["file"].name, fs="local"
                ).read(*args, get_obj, use_pandas, use_dask, **kwargs)
    if self.suffix == ".csv":
        ret = self.read_csv(use_dask=use_dask, **kwargs)
        self.caching_saver(
            ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
        )
        return ret
    elif self.suffix == ".parquet":
        index_col = None
        if "index_col" in kwargs:
            index_col = kwargs["index_col"]
            del kwargs["index_col"]
        # noinspection PyNoneFunctionAssignment
        content = self.read_parquet(use_dask=use_dask, **kwargs)
        if index_col:
            # noinspection PyUnresolvedReferences
            content.set_index(content.columns[index_col])
        self.caching_saver(
            content, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
        )
        return content
    elif self.suffix == ".hdf5" or self.suffix == ".h5":
        ret = self.read_hdf5(use_pandas=use_pandas, use_dask=use_dask, **kwargs)
        self.caching_saver(
            ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
        )
        return ret
    elif self.suffix == ".json":
        ret = self.read_json(*args, get_obj=get_obj, **kwargs)
        self.caching_saver(
            ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
        )
        return ret
    elif self.suffix in [".xlsx", ".xls", ".xlsm"]:
        ret = self.read_excel(use_dask=use_dask, **kwargs)
        self.caching_saver(
            ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
        )
        return ret
    else:
        ret = self.read_text(*args, get_obj=get_obj, **kwargs)
        self.caching_saver(
            ret, args, kwargs.update({"use_pandas": use_pandas, "use_dask": use_dask, "get_obj": get_obj})
        )
        return ret
def read_csv(self, *args, **kwargs) ‑> Any
Expand source code
def read_csv(self, *args, **kwargs) -> Any:
    use_dask = False
    if "use_dask" in kwargs:
        use_dask = kwargs["use_dask"]
        del kwargs["use_dask"]
    if use_dask:
        return self.read_csv_dask(*args, **kwargs)
    else:
        return self.read_csv_classic(*args, **kwargs)
def read_csv_classic(self, **kwargs) ‑> pandas.core.frame.DataFrame
Expand source code
def read(self, **kwargs) -> pd.DataFrame:

    if not self.is_file():
        raise FileNotFoundError(f"Could not find file {self}")

    # noinspection PyTypeChecker,PyUnresolvedReferences
    try:

        check_kwargs(pd.read_csv, kwargs)
        with self.fs.open(self.__fspath__()) as f:
            return pd.read_csv(f, **kwargs)

    except pd.errors.ParserError:
        # noinspection PyUnresolvedReferences
        raise pd.errors.ParserError(
            "Could not read data. Most likely, the file is encrypted."
            " Ask your cloud manager to remove encryption on it."
        )
def read_csv_dask(self, **kwargs) ‑> dask.dataframe.core.DataFrame
Expand source code
def read_csv(self, **kwargs) -> dd.DataFrame:

    if not self.nocheck:
        self._check_multiplicity()

    check_dask(self)

    if self.is_file():
        to_use = self
    else:
        to_use = self.with_suffix("")

    index_col, parse_dates, kwargs = get_index_and_date_from_kwargs(**kwargs)
    check_kwargs(dd.read_csv, kwargs)
    return apply_index_and_date_dd(index_col, parse_dates, dd.read_csv(to_use.__fspath__(), **kwargs))
def read_excel(self, *args, **kwargs) ‑> Any
Expand source code
def read_excel(self, *args, **kwargs) -> Any:
    use_dask = False
    if "use_dask" in kwargs:
        use_dask = kwargs["use_dask"]
        del kwargs["use_dask"]
    if use_dask:
        return self.read_excel_dask(*args, **kwargs)
    else:
        return self.read_excel_classic(*args, **kwargs)
def read_excel_classic(self, **kwargs) ‑> pandas.core.frame.DataFrame
Expand source code
def read(self, **kwargs) -> pd.DataFrame:

    if not self.is_file():
        raise FileNotFoundError(f"Could not find file {self}")

    check_kwargs(pd.read_excel, kwargs)
    # noinspection PyTypeChecker,PyUnresolvedReferences
    try:
        if self.fs_kind == "local":
            return pd.read_excel(self.__fspath__(), **kwargs)
        else:
            f = tempfile.NamedTemporaryFile(delete=False, suffix=self.suffix)
            f.close()  # deletes the tmp file, but we can still use its name
            self.get(f.name)
            data = pd.read_excel(f.name, **kwargs)
            Path(f.name).unlink()
            return data
    except pd.errors.ParserError:
        # noinspection PyUnresolvedReferences
        raise pd.errors.ParserError(
            "Could not read data. Most likely, the file is encrypted."
            " Ask your cloud manager to remove encryption on it."
        )
def read_excel_dask(self, **kwargs) ‑> pandas.core.frame.DataFrame
Expand source code
def read_excel(self, **kwargs) -> pd.DataFrame:

    if not excel_ok:
        raise ImportError(errormessage_excel)

    # noinspection PyProtectedMember
    if not self.nocheck:
        self._check_multiplicity()

    check_dask(self)

    check_kwargs(pd.read_excel, kwargs)
    # noinspection PyTypeChecker,PyUnresolvedReferences
    try:
        if self.fs_kind == "local":
            parts = delayed(pd.read_excel)(self.__fspath__(), **kwargs)
            return dd.from_delayed(parts)
        else:
            f = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
            f.close()  # deletes the tmp file, but we can still use its name
            self.get(f.name)
            parts = delayed(pd.read_excel)(f.name, **kwargs)
            data = dd.from_delayed(parts)
            # We should not delete the tmp file, since dask does its operations lasily.
            return data
    except pd.errors.ParserError:
        # noinspection PyUnresolvedReferences
        raise pd.errors.ParserError(
            "Could not read data. Most likely, the file is encrypted. Ask your cloud manager to remove encryption "
            "on it."
        )
def read_hdf5(self, *args, **kwargs) ‑> Any
Expand source code
def read_hdf5(self, *args, **kwargs) -> Any:
    use_dask = False
    if "use_dask" in kwargs:
        use_dask = kwargs["use_dask"]
        del kwargs["use_dask"]
    if use_dask:
        return self.read_hdf5_dask(*args, **kwargs)
    else:
        return self.read_hdf5_classic(*args, **kwargs)
def read_hdf5_classic(self: TransparentPath, use_pandas: bool = False, **kwargs) ‑> Union[h5py._hl.files.File, transparentpath.io._pandas.MyHDFStore]

Reads a HDF5 file. Must have been created by h5py.File or pd.HDFStore (specify use_pandas=True if so)

Since h5py.File/pd.HDFStore does not support GCS, first copy it in a tmp file.

Parameters

self : TransparentPath
 
use_pandas : bool
To use HDFStore instead of h5py.File (Default value = False)
kwargs
The kwargs to pass to h5py.File/pd.HDFStore method, or to dask.dataframe.read_hdf()

Returns

Union[h5py.File, MyHDFStore]
Opened h5py.File/pd.HDFStore
Expand source code
def read(self: TransparentPath, use_pandas: bool = False, **kwargs,) -> Union[h5py.File, MyHDFStore]:
    """Reads a HDF5 file. Must have been created by h5py.File or pd.HDFStore (specify use_pandas=True if so)

    Since h5py.File/pd.HDFStore does not support GCS, first copy it in a tmp file.


    Parameters
    ----------
    self: TransparentPath

    use_pandas: bool
        To use HDFStore instead of h5py.File (Default value = False)

    kwargs
        The kwargs to pass to h5py.File/pd.HDFStore method, or to dask.dataframe.read_hdf()


    Returns
    -------
    Union[h5py.File, MyHDFStore]
        Opened h5py.File/pd.HDFStore

    """

    mode = "r"
    if "mode" in kwargs:
        mode = kwargs["mode"]
        del kwargs["mode"]
    if "r" not in mode:
        raise ValueError("If using read_hdf5, mode must contain 'r'")

    class_to_use = h5py.File
    if use_pandas:
        class_to_use = MyHDFStore

    if not self.is_file():
        raise FileNotFoundError(f"Could not find file {self}")

    if self.fs_kind == "local":
        # Do not check kwargs since HDFStore and h5py both accepct kwargs anyway
        data = class_to_use(self.path, mode=mode, **kwargs)
    else:
        f = tempfile.NamedTemporaryFile(delete=False, suffix=".hdf5")
        f.close()  # deletes the tmp file, but we can still use its name
        self.get(f.name)
        # Do not check kwargs since HDFStore and h5py both accepct kwargs anyway
        data = class_to_use(f.name, mode=mode, **kwargs)
        Path(f.name).unlink()
    return data
def read_hdf5_dask(self, set_names: str = '', use_pandas: bool = False, **kwargs) ‑> dask.dataframe.core.DataFrame
Expand source code
def read_hdf5(self, set_names: str = "", use_pandas: bool = False, **kwargs) -> dd.DataFrame:

    if not hdf5_ok:
        raise ImportError(errormessage_hdf5)

    if use_pandas:
        raise NotImplementedError("Using dask in transparentpath does not support pandas's HDFStore")

    mode = "r"
    if "mode" in kwargs:
        mode = kwargs["mode"]
        del kwargs["mode"]
    if "r" not in mode:
        raise ValueError("If using read_hdf5, mode must contain 'r'")

    if not self.nocheck:
        self._check_multiplicity()

    check_dask(self)

    if len(set_names) == 0:
        raise ValueError(
            "If using Dask, you must specify the dataset name to extract using set_names='aname' or a wildcard."
        )

    check_kwargs(dd.read_hdf, kwargs)
    if self.fs_kind == "local":
        return dd.read_hdf(pattern=self.__fspath__(), key=set_names, **kwargs)
    f = tempfile.NamedTemporaryFile(delete=False, suffix=".hdf5")
    f.close()  # deletes the tmp file, but we can still use its name to download the remote file locally
    self.get(f.name)
    data = self.__class__.cli.submit(dd.read_hdf, f.name, set_names, **kwargs)
    # Do not delete the tmp file, since dask tasks are delayed
    return data.result()
def read_json(self, *args, get_obj, **kwargs)
Expand source code
def read(self, *args, get_obj, **kwargs):
    stringified = self.read_text(*args, get_obj=get_obj, **kwargs)
    dictified = json.loads(stringified, object_hook=json_obj_hook)
    if isinstance(dictified, str):
        try:
            dictified = json.loads(dictified)
        except TypeError:
            pass
    return dictified
def read_parquet(self, *args, **kwargs)
Expand source code
def read_parquet(self, *args, **kwargs):
    use_dask = False
    if "use_dask" in kwargs:
        use_dask = kwargs["use_dask"]
        del kwargs["use_dask"]
    if use_dask:
        return self.read_parquet_dask(*args, **kwargs)
    else:
        return self.read_parquet_classic(*args, **kwargs)
def read_parquet_classic(self, **kwargs) ‑> Union[pandas.core.frame.DataFrame, pandas.core.series.Series]
Expand source code
def read(self, **kwargs) -> Union[pd.DataFrame, pd.Series]:

    if not self.is_file():
        raise FileNotFoundError(f"Could not find file {self}")

    index_col, parse_dates, kwargs = get_index_and_date_from_kwargs(**kwargs)

    check_kwargs(pd.read_parquet, kwargs)
    if "engine" in kwargs:
        engine = kwargs["engine"]
        del kwargs["engine"]
    else:
        engine = "pyarrow"
    if self.fs_kind == "local":
        return apply_index_and_date(
            index_col, parse_dates, pd.read_parquet(self.__fspath__(), engine=engine, **kwargs)
        )

    elif engine == "pyarrow":
        return apply_index_and_date(
            index_col, parse_dates, pd.read_parquet(self.open("rb"), engine="pyarrow", **kwargs)
        )
    else:
        f = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
        f.close()  # deletes the tmp file, but we can still use its name
        self.get(f.name)
        data = pd.read_parquet(f.name, engine=engine, **kwargs)
        Path(f.name).unlink()
        return apply_index_and_date(index_col, parse_dates, data)
def read_parquet_dask(self, **kwargs) ‑> Union[dask.dataframe.core.DataFrame, dask.dataframe.core.Series]
Expand source code
def read_parquet(self, **kwargs) -> Union[dd.DataFrame, dd.Series]:

    if not parquet_ok:
        raise ImportError(errormessage_parquet)

    index_col, parse_dates, kwargs = get_index_and_date_from_kwargs(**kwargs)

    if not self.nocheck:
        self._check_multiplicity()

    check_dask(self)

    if self.is_file():
        to_use = self
    else:
        to_use = self.with_suffix("")
    check_kwargs(dd.read_parquet, kwargs)
    return apply_index_and_date_dd(
        index_col, parse_dates, dd.read_parquet(to_use.__fspath__(), engine="pyarrow", **kwargs)
    )
def read_text(self, *args, size: int = -1, get_obj: bool = False, **kwargs) ‑> Union[str, IO]
Expand source code
def read_text(self, *args, size: int = -1, get_obj: bool = False, **kwargs) -> Union[str, IO]:
    if not self.is_file():
        raise FileNotFoundError(f"Could not find file {self}")

    byte_mode = True
    if len(args) == 0:
        byte_mode = False
        args = ("rb",)
    if "b" not in args[0]:
        byte_mode = False
        args[0] += "b"
    if get_obj:
        return self.open(*args, **kwargs)

    with self.open(*args, **kwargs) as f:
        to_ret = f.read(size)
        if not byte_mode:
            to_ret = to_ret.decode()
    return to_ret
def refresh_cache(self) ‑> None

update tp

Expand source code
def refresh_cache(self) -> None:
    """
    update tp
    """
    if self.enable_caching:
        if self.__hash__() in TransparentPath.cached_data_dict.keys():
            arg = TransparentPath.cached_data_dict[self.__hash__()]["arg"]
            kwarg = TransparentPath.cached_data_dict[self.__hash__()]["kwarg"]
            self.uncache()
            self.read(*arg, **kwarg)
        else:
            warnings.warn(f"{self.__hash__()} is not in cache", TPCachingWarning)
def rm(self, absent: str = 'raise', ignore_kind: bool = False, **kwargs) ‑> None

Removes the object pointed to by self if exists. Remember that leaves are always files on GCS, so rm will remove the path if it is a leaf on GCS

Parameters

absent : str
What to do if trying to remove an item that does not exist. Can be 'raise' or 'ignore' (Default value = 'raise')
ignore_kind : bool
If True, will remove anything pointed by self. If False, will raise an error if self points to a file and 'recursive' was specified in kwargs, or if self points to a dir and 'recursive' was not specified (Default value = False)
kwargs
The kwargs to pass to file system's rm method

Returns

None
 
Expand source code
def rm(self, absent: str = "raise", ignore_kind: bool = False, **kwargs) -> None:
    """Removes the object pointed to by self if exists.
    Remember that leaves are always files on GCS, so rm will remove the path if it is a leaf on GCS


    Parameters
    ----------
    absent: str
        What to do if trying to remove an item that does not exist. Can
        be 'raise' or 'ignore' (Default value = 'raise')

    ignore_kind: bool
        If True, will remove anything pointed by self. If False,
        will raise an error if self points to a file and 'recursive' was
        specified in kwargs, or if self points to a dir and 'recursive'
        was not specified (Default value = False)

    kwargs
        The kwargs to pass to file system's rm method


    Returns
    -------
    None

    """

    if absent != "raise" and absent != "ignore":
        raise ValueError(f"Unexpected value for argument 'absent' : {absent}")

    # Asked to remove a directory...
    recursive = kwargs.get("recursive", False)

    if recursive:
        if not self.is_dir():
            # ...but self points to something that is not a directory!
            if self.exists():
                # Delete anyway
                if ignore_kind:
                    del kwargs["recursive"]
                    self.rm(absent, **kwargs)
                # or raise
                else:
                    raise NotADirectoryError("The path does not point to a directory!")
            # ...but self points to something that does not exist!
            else:
                if absent == "raise":
                    raise NotADirectoryError("There is no directory here!")
                else:
                    return
        # ...deletes the directory
        else:
            try:
                self.fs.rm(self.__fspath__(), **kwargs)
            except OSError as e:
                if "not found" in str(e).lower():
                    # It is possible that another parallel program deleted the object, in that case just pass
                    pass
                else:
                    raise e
    # Asked to remove a file...
    else:
        # ...but self points to a directory!
        if self.is_dir():
            # Delete anyway
            if ignore_kind:
                kwargs["recursive"] = True
                self.rm(absent=absent, ignore_kind=True, **kwargs)
            # or raise
            else:
                raise IsADirectoryError("The path points to a directory")
        else:
            # ... but nothing is at self
            if not self.exists():
                if absent == "raise":
                    raise FileNotFoundError(f"Could not find file {self}")
                else:
                    return
            else:
                try:
                    self.fs.rm(self.__fspath__(), **kwargs)
                except OSError as e:
                    if "not found" in str(e).lower():
                        # It is possible that another parallel program deleted the object, in that case just pass
                        pass
                    else:
                        raise e
def rmbucket(self, name: Optional[str] = None) ‑> None
Expand source code
def rmbucket(self, name: Optional[str] = None) -> None:
    raise NotImplementedError
def rmdir(self, absent: str = 'raise', ignore_kind: bool = False) ‑> None

Removes the directory corresponding to self if exists Remember that leaves are always files on GCS, so rmdir will never remove a leaf on GCS

Parameters

absent : str
What to do if trying to remove an item that does not exist. Can be 'raise' or 'ignore' (Default value = 'raise')
ignore_kind : bool
If True, will remove anything pointed by self. If False, will raise an error if self points to a file and 'recursive' was specified in kwargs, or if self point to a dir and 'recursive' was not specified (Default value = False)
Expand source code
def rmdir(self, absent: str = "raise", ignore_kind: bool = False) -> None:
    """Removes the directory corresponding to self if exists
    Remember that leaves are always files on GCS, so rmdir will never remove a leaf on GCS


    Parameters
    ----------
    absent: str
        What to do if trying to remove an item that does not exist. Can
        be 'raise' or 'ignore' (Default value = 'raise')

    ignore_kind: bool
        If True, will remove anything pointed by self. If False,
        will raise an error if self points to a file and 'recursive' was
        specified in kwargs, or if self point to a dir and 'recursive'
        was not specified (Default value = False)

    """
    self.rm(absent=absent, ignore_kind=ignore_kind, recursive=True)
def stat(self) ‑> dict

Calls file system's stat method and translates the key to os.stat_result() keys

Returns empty dict of path does not point to anything

Expand source code
def stat(self) -> dict:
    """Calls file system's stat method and translates the key to os.stat_result() keys

    Returns empty dict of path does not point to anything
    """

    if not self.exist():
        return {}

    key_translation = {
        "size": "st_size",
        "timeCreated": "st_ctime",
        "updated": "st_mtime",
        "created": "st_ctime",
        "mode": "st_mode",
        "uid": "st_uid",
        "gid": "st_gid",
        "mtime": "st_mtime",
    }

    stat = self.fs.stat(self.__fspath__())
    statkeys = list(stat.keys())
    for key in statkeys:
        if key in key_translation:
            if key == "timeCreated" or key == "updated":
                dt = datetime.strptime(stat[key], "%Y-%m-%dT%H:%M:%S.%fZ")
                stat[key] = dt.timestamp()
            if key == "created" or key == "mtime":
                stat[key] = int(stat[key])
            stat[key_translation[key]] = stat[key]

    for key in key_translation.values():
        if key not in stat:
            stat[key] = None

    return stat
def to_csv(self, data, *args, **kwargs)
Expand source code
def to_csv(self, data, *args, **kwargs):
    if "dask" in str(type(data)):
        return self.to_csv_dask(data, *args, **kwargs)
    else:
        self.to_csv_classic(data, *args, **kwargs)
def to_csv_classic(self, data: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], overwrite: bool = True, present: str = 'ignore', **kwargs)
Expand source code
def write(
    self, data: Union[pd.DataFrame, pd.Series], overwrite: bool = True, present: str = "ignore", **kwargs,
):

    if self.suffix != ".csv":
        warnings.warn(f"path {self} does not have '.csv' as suffix while using to_csv. The path will be "
                      f"changed to a path with '.csv' as suffix")
        self.change_suffix(".csv")

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    check_kwargs(data.to_csv, kwargs)
    with self.fs.open(self.__fspath__(), "w") as f:
        data.to_csv(f, **kwargs)
def to_csv_dask(self, data: dask.dataframe.core.DataFrame, overwrite: bool = True, present: str = 'ignore', **kwargs) ‑> Optional[None]
Expand source code
def write_csv(
    self, data: dd.DataFrame, overwrite: bool = True, present: str = "ignore", **kwargs,
) -> Union[None, List[TransparentPath]]:

    if self.suffix != ".csv":
        warnings.warn(f"path {self} does not have '.csv' as suffix while using to_csv. The path will be "
                      f"changed to a path with '.csv' as suffix")
        self.change_suffix(".csv")

    if not self.nocheck:
        self._check_multiplicity()

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    if self.__class__.cli is None:
        self.__class__.cli = client.Client()
    check_kwargs(dd.to_csv, kwargs)
    path_to_save = self
    if not path_to_save.stem.endswith("*"):
        path_to_save = path_to_save.parent / (path_to_save.stem + "_*.csv")
    # noinspection PyTypeChecker
    futures = self.__class__.cli.submit(dd.to_csv, data, path_to_save.__fspath__(), **kwargs)
    outfiles = [
        TransparentPath(f, fs=self.fs_kind, bucket=self.bucket) for f in futures.result()
    ]
    if len(outfiles) == 1:
        outfiles[0].mv(self)
    else:
        return outfiles
def to_excel(self, data, *args, **kwargs)
Expand source code
def to_excel(self, data, *args, **kwargs):
    if "dask" in str(type(data)):
        self.to_excel_dask(data, *args, **kwargs)
    else:
        self.to_excel_classic(data, *args, **kwargs)
def to_excel_classic(self, data: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], overwrite: bool = True, present: str = 'ignore', **kwargs) ‑> None
Expand source code
def write(
    self, data: Union[pd.DataFrame, pd.Series], overwrite: bool = True, present: str = "ignore", **kwargs,
) -> None:

    if self.suffix != ".xlsx" and self.suffix != ".xls" and self.suffix != ".xlsm":
        warnings.warn(f"path {self} does not have '.xls(x,m)' as suffix while using to_excel. The path will be "
                      f"changed to a path with '.xlsx' as suffix")
        self.change_suffix(".xlsx")

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    # noinspection PyTypeChecker

    if self.fs_kind == "local":
        data.to_excel(self.__fspath__(), **kwargs)
    else:
        with tempfile.NamedTemporaryFile(delete=True, suffix=self.suffix) as f:
            check_kwargs(data.to_excel, kwargs)
            data.to_excel(f.name, **kwargs)
            TransparentPath(
                path=f.name,
                fs="local",
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            ).put(self.path)
def to_excel_dask(self, data: Union[pandas.core.frame.DataFrame, pandas.core.series.Series, dask.dataframe.core.DataFrame], overwrite: bool = True, present: str = 'ignore', **kwargs) ‑> None
Expand source code
def write_excel(
    self,
    data: Union[pd.DataFrame, pd.Series, dd.DataFrame],
    overwrite: bool = True,
    present: str = "ignore",
    **kwargs,
) -> None:

    if not excel_ok:
        raise ImportError(errormessage_excel)

    if self.suffix != ".xlsx" and self.suffix != ".xls" and self.suffix != ".xlsm":
        warnings.warn(f"path {self} does not have '.xls(x,m)' as suffix while using to_excel. The path will be "
                      f"changed to a path with '.xlsx' as suffix")
        self.change_suffix(".xlsx")

    if not self.nocheck:
        self._check_multiplicity()

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    if self.fs_kind == "local":
        if self.__class__.cli is None:
            self.__class__.cli = client.Client()
        check_kwargs(pd.DataFrame.to_excel, kwargs)
        parts = delayed(pd.DataFrame.to_excel)(data, self.__fspath__(), **kwargs)
        parts.compute()
        return
    else:
        with tempfile.NamedTemporaryFile(delete=True, suffix=".xlsx") as f:
            if TransparentPath.cli is None:
                TransparentPath.cli = client.Client()
            check_kwargs(pd.DataFrame.to_excel, kwargs)
            parts = delayed(pd.DataFrame.to_excel)(data, f.name, **kwargs)
            parts.compute()
            TransparentPath(path=f.name, fs="local", bucket=self.bucket).put(self.path)
def to_hdf5(self, data, *args, **kwargs)
Expand source code
def to_hdf5(self, data, *args, **kwargs):
    if "dask" in str(type(data)):
        return self.to_hdf5_dask(data, *args, **kwargs)
    else:
        return self.to_hdf5_classic(data, *args, **kwargs)
def to_hdf5_classic(self: TransparentPath, data: Any = None, set_name: str = None, use_pandas: bool = False, **kwargs) ‑> Union[None, h5py._hl.files.File, transparentpath.io._pandas.MyHDFStore]

Parameters

self : TransparentPath
 
data : Any
The data to store. Can be None, in that case an opened file is returned (Default value = None)
set_name : str
The name of the dataset (Default value = None)
use_pandas : bool
To use pd.HDFStore object instead of h5py.File (Default = False)
**kwargs
 

Returns

Union[None, pd.HDFStore, h5py.File]
 
Expand source code
def write(
    self: TransparentPath, data: Any = None, set_name: str = None, use_pandas: bool = False, **kwargs,
) -> Union[None, h5py.File, MyHDFStore]:
    """

    Parameters
    ----------
    self: TransparentPath
    data: Any
        The data to store. Can be None, in that case an opened file is returned (Default value = None)
    set_name: str
        The name of the dataset (Default value = None)
    use_pandas: bool
        To use pd.HDFStore object instead of h5py.File (Default = False)
    **kwargs

    Returns
    -------
    Union[None, pd.HDFStore, h5py.File]

    """

    if self.suffix != ".hdf5" and self.suffix != "h5":
        warnings.warn(f"path {self} does not have '.h(df)5' as suffix while using to_hdf5. The path will be "
                      f"changed to a path with '.hdf5' as suffix")
        self.change_suffix(".hdf5")

    mode = "w"
    if "mode" in kwargs:
        mode = kwargs["mode"]
        del kwargs["mode"]

    if self.when_checked["used"] and not self.nocheck:
        self._check_multiplicity()

    # If no data is specified, an HDF5 file is returned, opened in write mode, or any other specified mode.
    if data is None:

        class_to_use = MyHDFFile
        if use_pandas:
            class_to_use = MyHDFStore

        if self.fs_kind == "local":
            return class_to_use(self.path, mode=mode, **kwargs)
        else:
            f = tempfile.NamedTemporaryFile(delete=True, suffix=".hdf5")
            return class_to_use(f, remote=self.path, mode=mode, **kwargs)
    else:

        if isinstance(data, dict):
            sets = data
        else:
            if set_name is None:
                set_name = "data"
            sets = {set_name: data}

        class_to_use = h5py.File
        if use_pandas:
            class_to_use = MyHDFStore

        if self.fs_kind == "local":
            thefile = class_to_use(self.path, mode=mode, **kwargs)
            for aset in sets:
                thefile[aset] = sets[aset]
            thefile.close()
        else:
            with tempfile.NamedTemporaryFile(delete=True, suffix=".hdf5") as f:
                thefile = class_to_use(f.name, mode=mode, **kwargs)
                for aset in sets:
                    thefile[aset] = sets[aset]
                thefile.close()
                TransparentPath(
                    path=f.name,
                    fs="local",
                    notupdatecache=self.notupdatecache,
                    nocheck=self.nocheck,
                    when_checked=self.when_checked,
                    when_updated=self.when_updated,
                    update_expire=self.update_expire,
                    check_expire=self.check_expire,
                ).put(self.path)
def to_hdf5_dask(self, data: Any = None, set_name: str = None, use_pandas: bool = False, **kwargs) ‑> Optional[None]
Expand source code
def write_hdf5(
    self, data: Any = None, set_name: str = None, use_pandas: bool = False, **kwargs,
) -> Union[None, "h5py.File"]:

    if not hdf5_ok:
        raise ImportError(errormessage_hdf5)

    if use_pandas:
        raise NotImplementedError("TransparentPath does not support storing Dask objects in pandas's HDFStore yet.")

    if self.suffix != ".hdf5" and self.suffix != "h5":
        warnings.warn(f"path {self} does not have '.h(df)5' as suffix while using to_hdf5. The path will be "
                      f"changed to a path with '.hdf5' as suffix")
        self.change_suffix(".hdf5")

    if not self.nocheck:
        self._check_multiplicity()

    if self.__class__.cli is None:
        self.__class__.cli = client.Client()
    check_kwargs(dd.to_hdf, kwargs)

    mode = "w"
    if "mode" in kwargs:
        mode = kwargs["mode"]
        del kwargs["mode"]

    if isinstance(data, dict):
        sets = data
    else:
        if set_name is None:
            set_name = "data"
        sets = {set_name: data}

    if self.fs_kind == "local":
        for aset in sets:
            dd.to_hdf(sets[aset], self.__fspath__(), aset, mode=mode, **kwargs)
    else:
        with tempfile.NamedTemporaryFile() as f:
            futures = self.__class__.cli.map(
                dd.to_hdf, list(sets.values()), [f.name] * len(sets), list(sets.keys()), mode=mode, **kwargs
            )
            self.__class__.cli.gather(futures)
            TransparentPath(path=f.name, fs="local", bucket=self.bucket).put(self.path)
    return
def to_json(self, data: Any, overwrite: bool = True, present: str = 'ignore', **kwargs)
Expand source code
def write(self, data: Any, overwrite: bool = True, present: str = "ignore", **kwargs):

    if self.suffix != ".json":
        warnings.warn(
            f"path {self} does not have '.json' as suffix while using to_json. The path will be "
            "changed to a path with '.json' as suffix"
        )
        self.change_suffix(".json")
    jsonified = json.dumps(data, cls=JSONEncoder)
    self.write_stuff(
        jsonified,
        "w",
        overwrite=overwrite,
        present=present,
        **kwargs,
    )
def to_parquet(self, data, *args, **kwargs)
Expand source code
def to_parquet(self, data, *args, **kwargs):
    if "dask" in str(type(data)):
        self.to_parquet_dask(data, *args, **kwargs)
    else:
        self.to_parquet_classic(data, *args, **kwargs)
def to_parquet_classic(self, data: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], overwrite: bool = True, present: str = 'ignore', columns_to_string: bool = True, to_dataframe: bool = True, **kwargs) ‑> None

Warning : if data is a Dask dataframe, the output will be written in a directory. For convenience, the directory if self.with_suffix(""). Reading is transparent and one can specify a path with .parquet suffix.

Expand source code
def write(
    self,
    data: Union[pd.DataFrame, pd.Series],
    overwrite: bool = True,
    present: str = "ignore",
    columns_to_string: bool = True,
    to_dataframe: bool = True,
    **kwargs,
) -> None:
    """
    Warning : if data is a Dask dataframe, the output will be written in a directory. For convenience, the directory
    if self.with_suffix(""). Reading is transparent and one can specify a path with .parquet suffix.
    """

    if self.suffix != ".parquet":
        warnings.warn(f"path {self} does not have '.parquet' as suffix while using to_parquet. The path will be "
                      f"changed to a path with '.parquet' as suffix")
        self.change_suffix(".parquet")

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    if to_dataframe and isinstance(data, pd.Series):
        name = data.name
        data = pd.DataFrame(data=data)
        if name is not None:
            data.columns = [name]

    if columns_to_string and not isinstance(data.columns[0], str):
        # noinspection PyUnresolvedReferences
        data.columns = data.columns.astype(str)

    # noinspection PyTypeChecker
    check_kwargs(data.to_parquet, kwargs)
    if "engine" in kwargs:
        engine = kwargs["engine"]
        del kwargs["engine"]
    else:
        engine = "pyarrow"
    if "compression" in kwargs:
        compression = kwargs["compression"]
        del kwargs["compression"]
    else:
        compression = "snappy"
    if (self.fs_kind != "local") and ((engine != "pyarrow") or (compression != "snappy")):
        with tempfile.NamedTemporaryFile(delete=True, suffix=".parquet") as f:
            data.to_parquet(f.name, engine=engine, compression=compression, **kwargs)
            TransparentPath(
                path=f.name,
                fs="local",
                notupdatecache=self.notupdatecache,
                nocheck=self.nocheck,
                when_checked=self.when_checked,
                when_updated=self.when_updated,
                update_expire=self.update_expire,
                check_expire=self.check_expire,
            ).put(self.path)
    elif self.fs_kind == "local":
        data.to_parquet(str(self), engine=engine, compression=compression, **kwargs)
    else:
        data.to_parquet(self.open("wb"), engine=engine, compression=compression, **kwargs)
def to_parquet_dask(self, data: Union[pandas.core.frame.DataFrame, pandas.core.series.Series, dask.dataframe.core.DataFrame], overwrite: bool = True, present: str = 'ignore', columns_to_string: bool = True, **kwargs) ‑> None
Expand source code
def write_parquet(
    self,
    data: Union[pd.DataFrame, pd.Series, dd.DataFrame],
    overwrite: bool = True,
    present: str = "ignore",
    columns_to_string: bool = True,
    **kwargs,
) -> None:

    if not parquet_ok:
        raise ImportError(errormessage_hdf5)

    if self.suffix != ".parquet":
        warnings.warn(f"path {self} does not have '.parquet' as suffix while using to_parquet. The path will be "
                      f"changed to a path with '.parquet' as suffix")
        self.change_suffix(".parquet")

    compression = kwargs.get("compression", None)

    if compression is not None and compression != "snappy":
        warnings.warn("TransparentPath can not write parquet files with a compression that is not snappy. You "
                      f"specified '{compression}', it will be replaced by 'snappy'.")

    if not self.nocheck:
        self._check_multiplicity()

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    if columns_to_string and not isinstance(data.columns[0], str):
        data.columns = data.columns.astype(str)

    if self.__class__.cli is None:
        self.__class__.cli = client.Client()
    check_kwargs(dd.to_parquet, kwargs)
    dd.to_parquet(data, self.with_suffix("").__fspath__(), engine="pyarrow", compression="snappy", **kwargs)
def to_plotly_json(self)

For compatibility with Plotly Dash

Expand source code
def to_plotly_json(self):
    """For compatibility with Plotly Dash"""
    return str(self)
def touch(self, present: str = 'ignore', **kwargs) ‑> None

Creates the file corresponding to self if does not exist.

Raises FileExistsError if there already is an object that is not a file at self. Default behavior is to create parent directories of the file if needed. This can be canceled by passing 'create_parents=False', but only if not using GCS, since directories are not a thing on GCS.

Parameters

present : str
What to do if there is already something at self. Can be "raise" or "ignore" (Default value = "ignore")
kwargs
The kwargs to pass to file system's touch method

Returns

None
 
Expand source code
def touch(self, present: str = "ignore", **kwargs) -> None:
    """Creates the file corresponding to self if does not exist.

    Raises FileExistsError if there already is an object that is not a file at self. Default behavior is to
    create parent directories of the file if needed. This can be canceled by passing 'create_parents=False', but
    only if not using GCS, since directories are not a thing on GCS.


    Parameters
    ----------
    present: str
        What to do if there is already something at self. Can be "raise" or "ignore" (Default value = "ignore")

    kwargs
        The kwargs to pass to file system's touch method


    Returns
    -------
    None

    """

    if present != "raise" and present != "ignore":
        raise ValueError(f"Unexpected value for argument 'present' : {present}")

    if self.exists():
        if self.is_file() and present == "raise":
            raise FileExistsError
        elif not self.is_file():
            raise FileExistsError(f"There is already an object at {self} which is not a file.")
        else:
            return

    for parent in reversed(self.parents):
        p = TransparentPath(
            parent,
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )
        if p.is_file():
            raise FileExistsError(f"A parent directory can not be created because there is already a file at {p}")
        elif not p.exists():
            p.mkdir()

    self.fs.touch(self.__fspath__(), **kwargs)
def uncache(self) ‑> None

remove data from cache

Expand source code
def uncache(self) -> None:
    """
    remove data from cache
    """
    if self.enable_caching:
        if self.__hash__() in TransparentPath.cached_data_dict.keys():
            TransparentPath.cached_data_dict.pop(self.__hash__())
        else:
            warnings.warn(f"{self} is not in cache", TPCachingWarning)

Alias of rm, to match pathlib.Path method

Expand source code
def unlink(self, **kwargs) -> None:
    """Alias of rm, to match pathlib.Path method"""
    self.rm(**kwargs)
def update_tpcache(self, data) ‑> None
Expand source code
def update_tpcache(self, data) -> None:
    if self.enable_caching:
        if self.caching == "ram":
            if self.__hash__() in TransparentPath.cached_data_dict.keys():
                TransparentPath.cached_data_dict[self.__hash__()]["data"] = data
        elif self.caching == "tmpfile" and self.fs_kind != "local":
            if self.__hash__() in TransparentPath.cached_data_dict.keys():
                TransparentPath(TransparentPath.cached_data_dict[self.__hash__()]["file"].name, fs="local").write(
                    data
                )
def walk(self) ‑> Iterator[Tuple[TransparentPath, List[TransparentPath], List[TransparentPath]]]

Like os.walk, except all outputs are TransparentPaths (so, absolute paths)

Returns

Iterator[Tuple[TransparentPath, List[TransparentPath], List[TransparentPath]]]
root, dirs and files, like os.walk
Expand source code
def walk(self) -> Iterator[Tuple[TransparentPath, List[TransparentPath], List[TransparentPath]]]:
    """Like os.walk, except all outputs are TransparentPaths (so, absolute paths)

    Returns
    -------
    Iterator[Tuple[TransparentPath, List[TransparentPath], List[TransparentPath]]]
        root, dirs and files, like os.walk
    """

    if self.when_checked["used"] and not self.nocheck:
        self._check_multiplicity()
    # No need to update cache for walk

    outputs = self.fs.walk(self.__fspath__())
    for output in outputs:
        root = TransparentPath(
            output[0],
            fs=self.fs_kind,
            bucket=self.bucket,
            notupdatecache=self.notupdatecache,
            nocheck=self.nocheck,
            when_checked=self.when_checked,
            when_updated=self.when_updated,
            update_expire=self.update_expire,
            check_expire=self.check_expire,
        )
        dirs = [root / p for p in output[1]]
        files = [root / p for p in output[2]]
        yield root, dirs, files
def with_suffix(self, suffix: str) ‑> TransparentPath

Returns a new TransparentPath object with a changed suffix Uses the with_suffix method of pathlib.Path

Parameters

suffix : str
suffix to use, with the dot ('.pdf', '.py', etc ..). Can also use '' to remove the suffix.

Returns

TransparentPath
 
Expand source code
def with_suffix(self, suffix: str) -> TransparentPath:
    """Returns a new TransparentPath object with a changed suffix
    Uses the with_suffix method of pathlib.Path


    Parameters
    -----------
    suffix: str
        suffix to use, with the dot ('.pdf', '.py', etc ..). Can also use '' to remove the suffix.

    Returns
    --------
    TransparentPath

    """
    if not suffix.startswith(".") and not suffix == "":
        suffix = f".{suffix}"
    return TransparentPath(
        self.__path.with_suffix(suffix),
        fs=self.fs_kind,
        bucket=self.bucket,
        notupdatecache=self.notupdatecache,
        nocheck=self.nocheck,
        when_checked=self.when_checked,
        when_updated=self.when_updated,
        update_expire=self.update_expire,
        check_expire=self.check_expire,
    )
def write(self, data: Any, *args, set_name: str = 'data', use_pandas: bool = False, overwrite: bool = True, present: str = 'ignore', make_parents: bool = False, **kwargs) ‑> Union[None, pd.HDFStore, h5py.File]

Method used to write the content of the file located at self Calls a specific method to write data based on the suffix of self.path: 1: .csv : will use pandas's to_csv 2: .parquet : will use pandas's to_parquet with pyarrow engine 3: .hdf5 or .h5 : will use h5py.File. Since it does not support remote file systems, the file will be created localy in a tmp filen written to, then uploaded and removed localy. 4: .json : will use jsonencoder.JSONEncoder class. Works with DataFrames and np.ndarrays too. 5: .xlsx : will use pandas's to_excel 5: any other suffix : uses self.open to write to an IO Buffer Parameters


data : Any
The data to write
set_name : str
Name of the dataset to write. Only relevant if using HDF5 (Default value = 'data')
use_pandas : bool
Must pass it as True if hdf file must be written using HDFStore and not h5py.File
overwrite : bool
If True, any existing file will be overwritten. Only relevant for csv, hdf5 and parquet files, since others use the 'open' method, which args already specify what to do (Default value = True).
present : str
Indicates what to do if overwrite is False and file is present. Here too, only relevant for csv, hsf5 and parquet files.
make_parents : bool
If True and if the parent arborescence does not exist, it is created. (Default value = False)

args: any args to pass to the underlying writting method kwargs: any kwargs to pass to the underlying reading method Returns


Union[None, pd.HDFStore, h5py.File]
 
Expand source code
def write(
    self,
    data: Any,
    *args,
    set_name: str = "data",
    use_pandas: bool = False,
    overwrite: bool = True,
    present: str = "ignore",
    make_parents: bool = False,
    **kwargs,
) -> Union[None, "pd.HDFStore", "h5py.File"]:
    """Method used to write the content of the file located at self
    Calls a specific method to write data based on the suffix of self.path:
        1: .csv : will use pandas's to_csv
        2: .parquet : will use pandas's to_parquet with pyarrow engine
        3: .hdf5 or .h5 : will use h5py.File. Since it does not support remote file systems, the file will be
        created localy in a tmp filen written to, then uploaded and removed localy.
        4: .json : will use jsonencoder.JSONEncoder class. Works with DataFrames and np.ndarrays too.
        5: .xlsx : will use pandas's to_excel
        5: any other suffix : uses self.open to write to an IO Buffer
    Parameters
    ----------
    data: Any
        The data to write
    set_name: str
        Name of the dataset to write. Only relevant if using HDF5 (Default value = 'data')
    use_pandas: bool
        Must pass it as True if hdf file must be written using HDFStore and not h5py.File
    overwrite: bool
        If True, any existing file will be overwritten. Only relevant for csv, hdf5 and parquet files,
        since others use the 'open' method, which args already specify what to do (Default value = True).
    present: str
        Indicates what to do if overwrite is False and file is present. Here too, only relevant for csv,
        hsf5 and parquet files.
    make_parents: bool
        If True and if the parent arborescence does not exist, it is created. (Default value = False)
    args:
        any args to pass to the underlying writting method
    kwargs:
        any kwargs to pass to the underlying reading method
    Returns
    -------
    Union[None, pd.HDFStore, h5py.File]
    """
    # Update cache and/or check multiplicity are called inside each specific reading method

    if make_parents and not self.parent.is_dir():
        self.parent.mkdir()

    if self.suffix != ".hdf5" and self.suffix != ".h5" and data is None:
        data = args[0]
        args = args[1:]

    if self.suffix == ".csv":
        ret = self.to_csv(
            data=data,
            overwrite=overwrite,
            present=present,
            **kwargs,
        )
        if ret is not None:
            # To skip the assert at the end of the function. Indeed if something is returned it means we used
            # Dask, which will have written files with a different name than self, so the assert would fail.
            return
    elif self.suffix == ".parquet":
        self.to_parquet(
            data=data,
            overwrite=overwrite,
            present=present,
            **kwargs,
        )
        if "dask" in str(type(data)):
            # noinspection PyUnresolvedReferences
            assert self.with_suffix("").is_dir(exist=True)
            return
    elif self.suffix == ".hdf5" or self.suffix == ".h5":
        ret = self.to_hdf5(
            data=data,
            set_name=set_name,
            use_pandas=use_pandas,
            **kwargs,
        )
        if ret is not None:
            # will not cache the changes for they will happen outside TransparentPath
            return ret
    elif self.suffix == ".json":
        self.to_json(
            data=data,
            overwrite=overwrite,
            present=present,
            **kwargs,
        )
    elif self.suffix == ".txt":
        self.write_stuff(
            *args,
            data=data,
            overwrite=overwrite,
            present=present,
            **kwargs,
        )
    elif self.suffix in [".xlsx", ".xls", ".xlsm"]:
        self.to_excel(
            data=data,
            overwrite=overwrite,
            present=present,
            **kwargs,
        )
    else:
        self.write_bytes(
            *args,
            data=data,
            overwrite=overwrite,
            present=present,
            **kwargs,
        )
    self.update_tpcache(data)
    assert self.is_file()
def write_bytes(self, data: Any, *args, overwrite: bool = True, present: str = 'ignore', **kwargs) ‑> None
Expand source code
def write_bytes(self, data: Any, *args, overwrite: bool = True, present: str = "ignore", **kwargs,) -> None:

    args = list(args)
    if len(args) == 0:
        args = ("wb",)
    if "b" not in args[0]:
        args[0] += "b"

    self.write_stuff(data, *tuple(args), overwrite=overwrite, present=present, **kwargs)
def write_stuff(self, data: Any, *args, overwrite: bool = True, present: str = 'ignore', **kwargs) ‑> None
Expand source code
def write_stuff(self, data: Any, *args, overwrite: bool = True, present: str = "ignore", **kwargs) -> None:

    if not overwrite and self.is_file() and present != "ignore":
        raise FileExistsError()

    args = list(args)
    if len(args) == 0:
        args = ("w",)

    with self.open(*args, **kwargs) as f:
        f.write(data)