Source code for gen3.file

import json
import requests
import json
import asyncio
import aiohttp
import aiofiles
import time
from tqdm import tqdm
from types import SimpleNamespace as Namespace
import os
import requests
from pathlib import Path

from cdislogging import get_logger

from gen3.index import Gen3Index
from gen3.utils import DEFAULT_BACKOFF_SETTINGS, raise_for_status_and_print_error
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

# Module-level logger obtained from cdislogging; download_single() reports
# progress and failures through it.
# NOTE(review): the logger name passed here is the literal string "__name__",
# not the module's __name__ value -- confirm whether that is intentional.
logging = get_logger("__name__")


# Number of extra attempts download_single() makes after a 5xx response.
MAX_RETRIES = 3


class Gen3File:
    """For interacting with Gen3 file management features.

    A class for interacting with the Gen3 file services: generating presigned
    download/upload URLs, deleting stored file locations and downloading a
    single file by GUID.

    Args:
        endpoint: Legacy first positional argument. When ``auth_provider`` is
            not given, this value is used as the auth provider instead.
        auth_provider (Gen3Auth): A Gen3Auth class instance.

    Examples:
        This generates the Gen3File class pointed at the sandbox commons while
        using the credentials.json downloaded from the commons profile page.

        >>> auth = Gen3Auth(refresh_file="credentials.json")
        ... file = Gen3File(auth)

    """

    def __init__(self, endpoint=None, auth_provider=None):
        # auth_provider legacy interface required endpoint as 1st arg
        self._auth_provider = auth_provider or endpoint
        self._endpoint = self._auth_provider.endpoint
        self.unsuccessful_downloads = []

    def get_presigned_url(self, guid, protocol=None):
        """Generates a presigned URL for a file.

        Retrieves a presigned url for a file giving access to a file for a
        limited time.

        Args:
            guid (str): The GUID for the object to retrieve.
            protocol (:obj:`str`, optional): The protocol to use for picking
                the available URL for generating the presigned URL.

        Returns:
            The decoded JSON body of the response, or the raw response text
            when the body is not valid JSON.

        Examples:

            >>> file.get_presigned_url(guid)

        """
        api_url = "{}/user/data/download/{}".format(self._endpoint, guid)
        if protocol:
            api_url += "?protocol={}".format(protocol)

        resp = requests.get(api_url, auth=self._auth_provider)
        raise_for_status_and_print_error(resp)

        try:
            return resp.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses; fall back to text
            return resp.text

    def delete_file(self, guid):
        """
        This method is DEPRECATED. Use delete_file_locations() instead.
        Delete all locations of a stored data file and remove its record from indexd

        Args:
            guid (str): provide a UUID for file id to delete

        Returns:
            text: requests.delete text result
        """
        print("This method is DEPRECATED. Use delete_file_locations() instead.")
        api_url = "{}/user/data/{}".format(self._endpoint, guid)
        output = requests.delete(api_url, auth=self._auth_provider).text

        return output

    def delete_file_locations(self, guid):
        """
        Delete all locations of a stored data file and remove its record from indexd

        Args:
            guid (str): provide a UUID for file id to delete

        Returns:
            requests.Response : requests.delete result
        """
        api_url = "{}/user/data/{}".format(self._endpoint, guid)
        output = requests.delete(api_url, auth=self._auth_provider)

        return output

    def upload_file(
        self, file_name, authz=None, protocol=None, expires_in=None, bucket=None
    ):
        """
        Get a presigned url for a file to upload

        Args:
            file_name (str): file_name to use for upload
            authz (list): authorization scope for the file as list of paths, optional.
            protocol (str): Storage protocol to use for upload: "s3", "az".
                If this isn't set, the default will be "s3"
            expires_in (int): Amount in seconds that the signed url will expire from datetime.utcnow().
                Be sure to use a positive integer.
                This value will also be treated as <= MAX_PRESIGNED_URL_TTL in the fence configuration.
            bucket (str): Bucket to upload to. The bucket must be configured in the Fence instance's
                `ALLOWED_DATA_UPLOAD_BUCKETS` setting. If not specified, Fence defaults to the
                `DATA_UPLOAD_BUCKET` setting.

        Returns:
            Document: json representation for the file upload, or the raw
            response text when the body is not valid JSON
        """
        api_url = f"{self._endpoint}/user/data/upload"

        # Only forward the options the caller actually supplied.
        body = {}
        if protocol:
            body["protocol"] = protocol
        if authz:
            body["authz"] = authz
        if expires_in:
            body["expires_in"] = expires_in
        if file_name:
            body["file_name"] = file_name
        if bucket:
            body["bucket"] = bucket

        headers = {"Content-Type": "application/json"}
        resp = requests.post(
            api_url, auth=self._auth_provider, json=body, headers=headers
        )
        raise_for_status_and_print_error(resp)

        try:
            return json.loads(resp.text)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass
            return resp.text

    @staticmethod
    def _ensure_dirpath_exists(path: Path) -> Path:
        """Utility to create a directory if missing.
        Returns the path so that the call can be inlined in another call

        Args:
            path (Path): path to create

        Returns:
            path of created directory
        """
        assert path

        out_path: Path = path
        # Skip mkdir entirely when something already exists at that location.
        if not out_path.exists():
            out_path.mkdir(parents=True, exist_ok=True)

        return out_path

    def download_single(self, object_id, path):
        """
        Download a single file using its GUID.

        Args:
            object_id (str): The file's unique ID
            path (str): Path to store the downloaded file at

        Returns:
            bool: True when the file was written (and, if the server sent a
            content-length header, the byte count matched); False otherwise.
        """
        try:
            url = self.get_presigned_url(object_id)
        except Exception as e:
            logging.critical(f"Unable to get a presigned URL for download: {e}")
            return False

        # get_presigned_url() returns raw text when the body is not JSON, so
        # make sure there really is a "url" entry before indexing into it.
        if not isinstance(url, dict) or "url" not in url:
            logging.critical(
                "Unable to get a presigned URL for download: "
                "response has no 'url' field"
            )
            return False
        download_url = url["url"]

        response = requests.get(download_url, stream=True)
        try:
            if response.status_code != 200:
                logging.error(f"Response code: {response.status_code}")
                if response.status_code < 500:
                    # client-side errors will not get better by retrying
                    return False

                for _ in range(MAX_RETRIES):
                    logging.info("Retrying now...")
                    # NOTE could be updated with exponential backoff
                    time.sleep(1)
                    response.close()
                    response = requests.get(download_url, stream=True)
                    if response.status_code == 200:
                        break

                if response.status_code != 200:
                    logging.critical("Response status not 200, try again later")
                    return False

            # The header is optional; without it the size check is skipped.
            content_length = response.headers.get("content-length")
            expected_size = int(content_length) if content_length is not None else None

            index = Gen3Index(self._auth_provider)
            record = index.get_record(object_id)
            filename = record["file_name"]

            out_path = Gen3File._ensure_dirpath_exists(Path(path))

            total_downloaded = 0
            with open(os.path.join(out_path, filename), "wb") as f:
                for data in response.iter_content(4096):
                    total_downloaded += len(data)
                    f.write(data)
        finally:
            # streamed responses hold their connection until closed
            response.close()

        if expected_size is not None and expected_size != total_downloaded:
            logging.error(f"File {filename} not downloaded successfully")
            return False

        logging.info(f"File {filename} downloaded successfully")
        return True

    def upload_file_to_guid(
        self, guid, file_name, protocol=None, expires_in=None, bucket=None
    ):
        """
        Get a presigned url for a file to upload to the specified existing GUID

        Args:
            guid (str): GUID of the existing record to upload to
            file_name (str): file_name to use for upload
            protocol (str): Storage protocol to use for upload: "s3", "az".
                If this isn't set, the default will be "s3"
            expires_in (int): Amount in seconds that the signed url will expire from datetime.utcnow().
                Be sure to use a positive integer.
                This value will also be treated as <= MAX_PRESIGNED_URL_TTL in the fence configuration.
            bucket (str): Bucket to upload to. The bucket must be configured in the Fence instance's
                `ALLOWED_DATA_UPLOAD_BUCKETS` setting. If not specified, Fence defaults to the
                `DATA_UPLOAD_BUCKET` setting.

        Returns:
            Document: json representation for the file upload
        """
        url = f"{self._endpoint}/user/data/upload/{guid}"

        # Only forward the options the caller actually supplied.
        params = {}
        if protocol:
            params["protocol"] = protocol
        if expires_in:
            params["expires_in"] = expires_in
        if file_name:
            params["file_name"] = file_name
        if bucket:
            params["bucket"] = bucket

        # Merge the options into the URL's query string (index 4 of the
        # urlparse tuple), URL-encoding the values on the way.
        url_parts = list(urlparse(url))
        query = dict(parse_qsl(url_parts[4]))
        query.update(params)
        url_parts[4] = urlencode(query)
        url = urlunparse(url_parts)

        resp = requests.get(url, auth=self._auth_provider)
        raise_for_status_and_print_error(resp)
        return resp.json()