[docs]classHttpFilerStrategy(FilerStrategy):"""Filer strategy for HTTP and HTTPS."""def__init__(self,payload:TesInput|TesOutput):"""Initialize the HTTP filer strategy. Args: payload: The payload to instantiate the strategy implementation. """super().__init__(payload)self.input=self.payload
[docs]asyncdefdownload_input_file(self,container_path:str):"""Download the input file from the HTTP or HTTPS URI. Args: container_path: The path to download the file to. """ifself.input.urlisNone:raiseValueError("URL is required")response=requests.get(self.input.url,stream=True,timeout=30)withopen(container_path,"wb")asf:forchunkinresponse.iter_content(chunk_size=8192):ifchunk:f.write(chunk)
[docs]asyncdefdownload_input_directory(self,container_path:str):"""Download the input directory from the HTTP or HTTPS URI. Args: container_path: The path to download the file to. """raiseNotImplementedError("Downloading directory over HTTP or HTTPS is not supported")
[docs]asyncdefupload_output_file(self,container_path:str):"""Upload the output file to the HTTP or HTTPS URI. Args: output: The output file to upload. container_path: The path to upload the file from. """raiseNotImplementedError("Uploading to HTTP or HTTPS is not supported")
[docs]asyncdefupload_output_directory(self,container_path:str):"""Upload the output directory to the HTTP or HTTPS URI. Args: output: The output file to upload. container_path: The path to upload the file from. """raiseNotImplementedError("Uploading to HTTP or HTTPS is not supported")
[docs]asyncdefupload_glob(self,glob_files:list[tuple[str,str,bool]]):"""Upload files using glob patterns."""raiseNotImplementedError("Uploading to HTTP or HTTPS is not supported")