class FilerStrategy(ABC):
    """Filer strategy interface.

    Concrete strategies implement the transfer primitives (the abstract
    methods below) for one storage backend. The shared `download` and
    `upload` entry points translate the payload path into a path under the
    filer PVC and dispatch to the matching primitive.
    """

    def __init__(self, payload: TesInput | TesOutput) -> None:
        """Initialize the filer strategy.

        Args:
            payload: input or output object from the TES task request.
        """
        self.payload = payload

    @abstractmethod
    async def download_input_file(self, container_path: str):
        """Download file from storage and mount to PVC.

        Args:
            container_path: The path inside the container to which the file
                needs to be downloaded from the storage.
        """

    @abstractmethod
    async def download_input_directory(self, container_path: str):
        """Download the directory content from storage and mount to PVC.

        Args:
            container_path: The path inside the container to which the
                directory content needs to be downloaded from the storage.
        """

    @abstractmethod
    async def upload_output_file(self, container_path: str):
        """Upload file to storage created by executors, mounted to PVC.

        Args:
            container_path: The path inside the container from where the
                file needs to be uploaded to the storage.
        """

    @abstractmethod
    async def upload_output_directory(self, container_path: str):
        """Upload directory to storage created by executors, mounted to PVC.

        Args:
            container_path: The path inside the container from where the
                directory needs to be uploaded to the storage.
        """

    @abstractmethod
    async def upload_glob(self, glob_files: list[tuple[str, str, bool]]):
        """Upload files and directories when wildcards are present.

        Args:
            glob_files: List of tuples containing
                (file_path, relative_path, is_directory), as produced by
                `_get_glob_files`.
        """

    def _get_container_path(self, path: str) -> str:
        """Get the container path for the file.

        The leading slashes of `path` are dropped and the remainder is joined
        onto `core_constants.K8s.FILER_PVC_PATH`, so `/data/f1/f2/file1`
        becomes `<FILER_PVC_PATH>/data/f1/f2/file1` and the original
        directory structure is retained below the PVC path.

        Note:
            This method creates the parent directory of the returned path if
            it doesn't exist; the path itself is not created.

        Args:
            path: The path of the file.

        Returns:
            str: The corresponding path below the filer PVC path.
        """
        container_path = os.path.join(
            core_constants.K8s.FILER_PVC_PATH,
            # A leading "/" would make `os.path.join` discard the PVC base.
            path.lstrip("/"),
        )
        os.makedirs(os.path.dirname(container_path), exist_ok=True)
        return container_path

    @staticmethod
    def _strip_base_dir(path: str, base: str) -> str:
        """Strip the directory `base` from the front of `path`.

        The prefix is only removed when it ends on a path-component
        boundary, so a sibling such as `/transfer_other/x` is not mistaken
        for a child of `/transfer`.

        Args:
            path: The path to rewrite.
            base: The base directory to remove.

        Returns:
            str: The remainder of `path` with exactly one leading `/`, or
                `path` unchanged when it does not live under `base`.
        """
        if not path.startswith(base):
            return path

        remainder = path[len(base):]
        on_component_boundary = (
            not remainder
            or remainder.startswith("/")
            or not base
            or base.endswith("/")
        )
        if not on_component_boundary:
            return path
        return "/" + remainder.lstrip("/")

    def _get_path_as_in_exec_pod(self, path: str) -> str:
        """Get the path of the file as it was in exec pod.

        Note:
            This is done because file structure mounted in the filer pod is
            different from that of the executor pod.

        Args:
            path: The string path obtained from glob.

        Returns:
            str: Path of the file as it was in the executor pod; `path` is
                returned unchanged when it is not below the filer PVC path.
        """
        return self._strip_base_dir(path, core_constants.K8s.FILER_PVC_PATH)

    async def download(self) -> None:
        """Download file from storage and mount to PVC.

        Resolves the payload path to its location below the filer PVC path
        (creating the parent directory as a side effect) and delegates to
        `download_input_file` when the payload type is `TesFileType.FILE`,
        otherwise to `download_input_directory`.
        """
        container_path = self._get_container_path(self.payload.path)
        if self.payload.type == TesFileType.FILE:
            await self.download_input_file(container_path)
        else:
            await self.download_input_directory(container_path)

    def _get_glob_files(
        self, container_path: str
    ) -> list[tuple[str, str, bool]]:
        """Get the list of the files and directories from wildcards.

        Note:
            tuple[0] is the path of the file/directory, tuple[1] is the path
            as seen in the executor pod from which the prefix `path_prefix`
            and any leading slashes have been removed, and tuple[2] is a
            boolean indicating if the item is a directory.

            Each protocol might handle that URL differently, hence each
            `upload_glob` method should take care of this URL based on its
            own implementation and requirement.

        Args:
            container_path: Glob pattern, below the filer PVC path, to expand.

        Returns:
            list[tuple[str, str, bool]]: List of tuple of file/directory
                path, its prefix removed path that needs to be appended to
                url, and whether it's a directory.
        """
        # These narrow the payload type; `upload` only calls this method for
        # a `TesOutput` that has a `path_prefix`.
        assert isinstance(self.payload, TesOutput)
        assert self.payload.path_prefix is not None

        path_prefix = self.payload.path_prefix
        glob_files: list[tuple[str, str, bool]] = []
        for item in glob(container_path):
            relative_path = (
                self._get_path_as_in_exec_pod(item)
                .removeprefix(path_prefix)
                .lstrip("/")
            )
            glob_files.append((item, relative_path, os.path.isdir(item)))
        return glob_files

    async def upload(self) -> None:
        """Upload file to storage created by executors, mounted to PVC.

        Resolves the payload path to its location below the filer PVC path
        (creating the parent directory as a side effect) and then:

        - for a `TesOutput` with a non-empty `path_prefix`, expands the path
          as a glob pattern and delegates to `upload_glob`;
        - for a payload of type `TesFileType.FILE`, delegates to
          `upload_output_file`;
        - otherwise delegates to `upload_output_directory`.
        """
        container_path = self._get_container_path(self.payload.path)
        if isinstance(self.payload, TesOutput) and self.payload.path_prefix:
            await self.upload_glob(self._get_glob_files(container_path))
        elif self.payload.type == TesFileType.FILE:
            await self.upload_output_file(container_path)
        else:
            await self.upload_output_directory(container_path)