Source code for gen3.tools.download.drs_download

"""
Module for downloading and listing JSON DRS manifest and DRS objects. The main classes in
this module for downloading DRS objects are DownloadManager and Manifest.

    Examples:
        This generates the Gen3Jobs class pointed at the sandbox commons while
        using the credentials.json downloaded from the commons profile page.

        >>> datafiles = Manifest.load('sample/manifest_1.json')
            downloadManager = DownloadManager("source.my_commons.org",
                              Gen3Auth(refresh_file="~.gen3/my_credentials.json"), datafiles)
            for i in datafiles:
                print(i)
            downloadManager.download(datafiles, ".")

        See docs/howto/drsDownloading.md for more details

"""


import re
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
from json import load as json_load, loads as json_loads, JSONDecodeError
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import humanfriendly
import requests
import zipfile
from cdislogging import get_logger
from dataclasses_json import dataclass_json, LetterCase, Undefined
from dateutil import parser as date_parser
from tqdm import tqdm
from urllib.parse import urlparse

from gen3.auth import Gen3Auth, Gen3AuthError, decode_token
from gen3.auth import _handle_access_token_response
from gen3.tools.download.drs_resolvers import resolve_drs
from gen3.utils import remove_trailing_whitespace_and_slashes_in_url
from gen3.metadata import Gen3Metadata

DEFAULT_EXPIRE: timedelta = timedelta(hours=1)

# package formats we handle for unpacking
PACKAGE_EXTENSIONS = [".zip"]

logger = get_logger("__name__")


# Add undefined=Undefined.EXCLUDE here because we only cares if the input manifest has the minimal required metadata fields for data download, any extra metadata fields should be ignored and should not cause a failure
[docs] @dataclass_json(letter_case=LetterCase.SNAKE, undefined=Undefined.EXCLUDE) @dataclass class Manifest: """Data class representing a Gen3 JSON manifest typically exported from a Gen3 discovery page. The class is passed to the DownloadManager to download or list all of the files in the manifest. The Download manager will cache additional information (if available) Attributes: object_id(str): the DRS object id. This is the only attribute that needs to be defined file_size(Optional[int]): the filesize of the object, if contained in the manifest file_name(Optional[str]): the name of the file pointed to by the DRS object id md5sum(Optional[str]): the checksum of the object commons_url(Optional[str]): url of the indexd server to retrieve file/bundle from """ object_id: str # only required member file_size: Optional[int] = -1 # -1 indicated not set file_name: Optional[str] = None md5sum: Optional[str] = None commons_url: Optional[str] = None
[docs] @staticmethod def load_manifest(path: Path): """Loads a json manifest""" with open(path, "rt") as fin: data = json_load(fin) return Manifest.schema().load(data, many=True)
[docs] @staticmethod def create_object_list(manifest) -> List["Downloadable"]: """Create a list of Downloadable instances from the manifest Args: manifest(list): list of manifest objects Returns: List of Downloadable instances """ results = [] for entry in manifest: results.append( Downloadable( object_id=entry.object_id, hostname=remove_trailing_whitespace_and_slashes_in_url( entry.commons_url ), ) ) return results
[docs] @staticmethod def load(filename: Path) -> Optional[List["Downloadable"]]: """ Method to load a json manifest and return a list of Bownloadable object. This list is passed to the DownloadManager methods of download, and list Args: filename(Path): path to manifest file Returns: list of Downloadable objects if successfully opened/parsed None otherwise """ try: manifest = Manifest.load_manifest(filename) return Manifest.create_object_list(manifest) except FileNotFoundError as ex: logger.critical(f"Error file not found: {ex.filename}") except JSONDecodeError as ex: logger.critical(f"format of manifest file is valid JSON: {ex.msg}") return None
@dataclass class KnownDRSEndpoint: """ Dataclass used internally by the DownloadManager class to cache hostnames and tokens for Gen3 commons possibly accessed using the Workspace Token Service (WTS). The endpoint is assumed to support DRS and therefore caches additional DRS information. Attributes: hostname (str): hostname of DRS server last_refresh_time (datetime): last time the token has been refreshed idp (Optional[str]): cached idp information access_token (Optional[str]): current bearer token initially None identifier (Optional[str]): DRS prefix (if used) use_wts (bool): if True use WTS to create tokens """ hostname: str expire: datetime = None idp: Optional[str] = None access_token: Optional[str] = None identifier: Optional[str] = None use_wts: bool = True @property def available(self): """If endpoint has access token it is available""" return self.access_token is not None def expired(self) -> bool: """check if WTS token is older than the default expiration date If not using the WTS return false and use standard Gen3 Auth """ if not self.use_wts: return False return datetime.now() > self.expire def renew_token(self, wts_server_name: str, server_access_token): """Gets a new token from the WTS and updates the token and refresh time Args: wts_server_name (str): hostname of WTS server server_access_token (str): token used to authenticate use of WTS """ token = wts_get_token( hostname=wts_server_name, idp=self.idp, access_token=server_access_token, ) token_info = decode_token(token) # TODO: this would break if user is trying to download object from different commons # keep BRH token and wts sparate self.access_token = token self.expire = datetime.fromtimestamp(token_info["exp"]) class DRSObjectType(str, Enum): """Enum defining the 3 possible DRS object types.""" unknown = "unknown" object = "object" bundle = "bundle"
[docs] @dataclass class Downloadable: """ Class handling the information for a DRS object. The information is populated from the manifest or by retrieving the information from a DRS server. Attributes: object_id (str): DRS object id (REQUIRED) object_type (DRSObjectType): type of DRS object hostname (str): hostname of DRS object file_size (int): size in bytes file_name (str): name of file updated_time (datetime): timestamp of last update to file created_time (datetime): timestamp when file is created access_methods (List[Dict[str, Any]]): list of access methods (e.g. s3) for DRS object children (List[Downloadable]): list of child objects (in the case of DRS bundles) _manager (DownloadManager): manager for this Downloadable """ object_id: str object_type: Optional[DRSObjectType] = DRSObjectType.unknown hostname: Optional[str] = None file_size: Optional[int] = -1 file_name: Optional[str] = None updated_time: Optional[datetime] = None created_time: Optional[datetime] = None access_methods: List[Dict[str, Any]] = field(default_factory=list) children: List["Downloadable"] = field(default_factory=list) _manager = None def __str__(self): return ( f'{self.file_name if self.file_name is not None else "not available" : >45}; ' f"{humanfriendly.format_size(self.file_size) :>12}; " f'{self.hostname if self.hostname is not None else "not resolved"}; ' f'{self.created_time.strftime("%m/%d/%Y, %H:%M:%S") if self.created_time is not None else "not available"}' ) def __repr__(self): return ( f'(Downloadable: {self.file_name if self.file_name is not None else "not available"}; ' f"{humanfriendly.format_size(self.file_size)}; " f'{self.hostname if self.hostname is not None else "not resolved"}; ' f'{self.created_time.strftime("%m/%d/%Y, %H:%M:%S") if self.created_time is not None else "not available"})' )
[docs] def download(self): """calls the manager to download this object. Allows Downloadables to be self downloading""" self._manager.download([self])
[docs] def pprint(self, indent: str = ""): """ Pretty prints the object information. This is used for listing an object. In the case of a DRS bundle the child objects are listed similar to the linux tree command """ from os import linesep res = self.__str__() + linesep child_indent = f"{indent} " pos = -1 for x in self.children: pos += 1 if pos == len(self.children) - 1: res += f"{child_indent}└── {x.pprint(child_indent)}" else: res += f"{child_indent}├── {x.pprint(child_indent)}" return res
[docs] @dataclass class DownloadStatus: """Stores the download status of objectIDs. The DataManager will return a list of DownloadStatus as a result of calling the download method Status is "pending" until it is downloaded or an error occurs. Attributes: filename (str): the name of the file to download status (str): status of file download initially "pending" start_time (Optional[datetime]): start time of download as datetime initially None end_time (Optional[datetime]): end time of download as datetime initially None """ filename: str status: str = "pending" start_time: Optional[datetime] = None end_time: Optional[datetime] = None def __str__(self): return ( f'filename: {self.filename if self.filename is not None else "not available"}; ' f"status: {self.status}; " f'start_time: {self.start_time.strftime("%m/%d/%Y, %H:%M:%S") if self.start_time is not None else "n/a"}; ' f'end_time: {self.end_time.strftime("%m/%d/%Y, %H:%M:%S") if self.start_time is not None else "n/a"}' ) def __repr__(self): return self.__str__()
def wts_external_oidc(hostname: str) -> Dict[str, Any]: """ Get the external_oidc from a connected WTS. Will report if WTS service is missing. Note that in some cases this can be considered a warning not a error. Args: hostname (str): hostname to access the WTS endpoint Returns: dict containing the oidc information """ oidc = {} try: response = requests.get(f"https://{hostname}/wts/external_oidc/") response.raise_for_status() data = response.json() if "providers" not in data: logger.warning( 'cannot find "providers". Likely no WTS service running for this commons' ) return oidc for item in data["providers"]: oidc[urlparse(item["base_url"]).netloc] = item except requests.exceptions.HTTPError as exc: logger.critical( f'HTTP Error ({exc.response.status_code}): {json_loads(exc.response.text).get("message", "")}' ) except JSONDecodeError as ex: logger.warning( f"Unable to process WTS response. Likely no WTS service running on this commons. " f"Certain commands might fail." ) return oidc def wts_get_token(hostname: str, idp: str, access_token: str): """ Gets a auth token from a Gen3 WTS server for the supplied idp Args: hostname (str): Gen3 common's WTS service idp: identity provider to use access_token: Gen3 Auth to use to with WTS Returns: Token for idp if successful, None if failure """ headers = { "Content-Type": "application/json", "Accept": "application/json", "Connection": "keep-alive", "Authorization": "bearer " + access_token, } try: url = f"https://{hostname}/wts/token/?idp={idp}" try: response = requests.get(url=url, headers=headers) response.raise_for_status() except requests.exceptions.HTTPError as exc: logger.critical( f"HTTP Error ({exc.response.status_code}): getting WTS token: {exc.response.text}" ) logger.critical( "Please make sure the target commons is connected on your profile page and that connection has not expired." ) return None return _handle_access_token_response(response, "token") except Gen3AuthError: logger.critical(f"Unable to authenticate your credentials with {hostname}") return None def get_drs_object_info(hostname: str, object_id: str) -> Optional[dict]: """ Retrieves information for a DRS object residing on the hostname Args: hostname (str): hostname of DRS object object_id (str): DRS object id Returns: GAG4H DRS object information if sucessful otherwise None """ try: response = requests.get(f"https://{hostname}/ga4gh/drs/v1/objects/{object_id}") response.raise_for_status() data = response.json() return data except requests.HTTPError as exc: if exc.response.status_code == 404: logger.critical( f"HTTP Error ({exc.response.status_code}): {object_id} not found at {hostname}" ) else: logger.critical( f"HTTP Error ({exc.response.status_code}): accessing object: {object_id}" ) return None except ConnectionError as exc: logger.critical(f"Connection Error {exc} when accessing object: {object_id}") return None def extract_filename_from_object_info(object_info: dict) -> Optional[str]: """Extracts the filename from the object_info. if filename is in object_info use that, otherwise try to extract it from the one of the access methods. Returns filename if found, else return None Args object_info (dict): DRS object dictionary """ if "name" in object_info and object_info["name"]: return object_info["name"] for access_method in object_info["access_methods"]: url = access_method["access_url"]["url"] parts = url.split("/") if parts: return parts[-1] return None def get_access_methods(object_info: dict) -> List[str]: """ Returns the DRS GA4GH access methods from the object_info. Args: object_info (dict): dict of GA4GH DRS Object information Returns: List of access methods """ if object_info is None: logger.critical("no access methods defined for this file") return [] return object_info["access_methods"] def get_drs_object_type(object_info: dict) -> DRSObjectType: """From the object info determine the type of object. Args: object_info (dict): DRS object dictionary Returns: type of object: either bundle or object """ if "form" in object_info: if object_info["form"] is None: return DRSObjectType.object return DRSObjectType(object_info["form"]) if "contents" in object_info and len(object_info["contents"]) > 0: return DRSObjectType.bundle else: return DRSObjectType.object def get_drs_object_timestamp(s: Optional[str]) -> Optional[datetime]: """returns the timestamp in datetime if not none otherwise returns None Args: s (Optional[str]): string to parse Returns: datetime if not None """ return date_parser.parse(s) if s is not None else None def add_drs_object_info(info: Downloadable) -> bool: """ Given a downloader object fill in the required fields from the resolved hostname. In the case of a bundle, try to resolve all object_ids contained in the bundle including other objects and bundles. Args: info (Downloadable): Downloadable to add information to Returns: True if object is valid and resolvable. """ if info.hostname is None: return False object_info = get_drs_object_info(info.hostname, info.object_id) if (object_info) is None: return False # Get common information we want info.file_name = extract_filename_from_object_info(object_info) info.file_size = object_info.get("size", -1) info.updated_time = get_drs_object_timestamp(object_info.get("updated_time", None)) info.created_time = get_drs_object_timestamp(object_info.get("created_time", None)) info.object_type = get_drs_object_type(object_info) if info.object_type == DRSObjectType.object: info.access_methods = get_access_methods(object_info) return True else: # a bundle,get everything else for item in object_info["contents"]: child_id = item.get("id", None) if child_id is None: continue child_object = Downloadable(hostname=info.hostname, object_id=child_id) add_drs_object_info(child_object) info.children.append(child_object) return True class InvisibleProgress: """ Invisible progress bar which stubs a tqdm progress bar """ def update(self, value): # pragma: no cover pass def download_file_from_url( url: str, filename: Path, show_progress: bool = True ) -> bool: """ Downloads a file using the URL. The URL is a pre-signed url created by the download manager from the access method of the DRS object. Args: url (str): URL to download from filename (str): name of the file to write data to show_progress (bool): show a progress bar (default) Returns: True if object has been downloaded """ try: response = requests.get(url, stream=True) response.raise_for_status() except requests.exceptions.Timeout: logger.critical(f"Was unable to get the download url: {url}. Timeout Error.") return False except requests.exceptions.HTTPError as exc: logger.critical( f"HTTP Error ({exc.response.status_code}): downloading file from {url}" ) return False total_size_in_bytes = int(response.headers.get("content-length", 0)) if total_size_in_bytes == 0: logger.critical(f"content-length is 0 and it should not be") return False total_downloaded = 0 block_size = 8092 # 8K blocks might want to tune this. progress_bar = ( tqdm( desc=f"{str(filename) : <45}", total=total_size_in_bytes, unit="iB", unit_scale=True, bar_format="{l_bar:45}{bar:35}{r_bar}{bar:-10b}", ) if show_progress else InvisibleProgress() ) # if the file name contains '/', create subdirectories and download there ensure_dirpath_exists(Path(os.path.dirname(filename))) try: with open(filename, "wb") as file: for data in response.iter_content(block_size): progress_bar.update(len(data)) total_downloaded += len(data) file.write(data) except IOError as ex: logger.critical(f"IOError opening {filename} for writing: {ex}") return False if total_downloaded != total_size_in_bytes: logger.critical( f"Error in downloading {filename}: expected {total_size_in_bytes} bytes, downloaded {total_downloaded} bytes" ) return False return True def unpackage_object(filepath: str): # allowed formats are set in PACKAGE_EXTENSIONS with zipfile.ZipFile(filepath, "r") as package: package.extractall(os.path.dirname(filepath)) def parse_drs_identifier(drs_candidate: str) -> Tuple[str, str, str]: """ Parses a DRS identifier to extract a hostname in the case of hostname based DRS otherwise it look for a DRS compact identifier. If neither one is recognized return an empty string and a type of 'unknown' Note: The regex expressions used to extract hostname or identifier has a potential for a false positive. Args: drs_candidate (str): a drs object identifier Returns: Tuple (str): tuple of hostname/drs prefix, guid, string: one of "hostname", "compact", "unknown" """ # determine if hostname or compact identifier or unknown drs_regex = r"drs://([A-Za-z0-9\.\-\~]+)/([A-Za-z0-9\.\-\_\~\/]+)" # either a drs prefix: matches = re.findall(drs_regex, drs_candidate, re.UNICODE) if len(matches) == 1: # this could be a hostname DRS id hostname_regex = ( r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" r"([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$" ) hostname_matches = re.findall(hostname_regex, matches[0][0], re.UNICODE) if len(hostname_matches) == 1: return matches[0][0], matches[0][1], "hostname" # possible compact rep compact_regex = r"([A-Za-z0-9\.\-\~]+)/([A-Za-z0-9\.\-\_\~\/]+)" matches = re.findall(compact_regex, drs_candidate, re.UNICODE) if len(matches) == 1 and len(matches[0]) == 2: return matches[0][0], matches[0][1], "compact" # can't figure out a this identifier return "", "", "unknown" def resolve_drs_hostname_from_id( object_id: str, resolved_drs_prefix_cache: dict, mds_url: str ) -> Optional[Tuple[str, str, str]]: """Resolves and returns a DRS identifier The resolved_drs_prefix_cache is updated if needed and is a potential side effect of this call Args: object_id (str): DRS object id to resolve resolved_drs_prefix_cache (dict) : cache of resolved DRS prefixes mds_url (str): the URL for the Gen3 Aggregate MDS to use to help resolved DRS hostname Returns: the hostname of the DRS server if resolved, otherwise it returns None """ hostname = None prefix, identifier, identifier_type = parse_drs_identifier(object_id) if identifier_type == "hostname": return prefix, identifier, identifier_type if identifier_type == "compact": if prefix not in resolved_drs_prefix_cache: hostname = resolve_drs(prefix, object_id, metadata_service_url=mds_url) if hostname is not None: resolved_drs_prefix_cache[prefix] = hostname else: hostname = resolved_drs_prefix_cache[prefix] return hostname, identifier, identifier_type def resolve_objects_drs_hostname_from_id( object_ids: List[Downloadable], resolved_drs_prefix_cache: dict, mds_url: str ) -> None: """Given a list of object_ids go through list and resolve + cache any unknown hosts Args: object_ids (List[Downloadable]): list of object to resolve resolved_drs_prefix_cache (dict): cache of resolved DRS prefixes mds_url (str): Gen3 metadata service to resolve DRS prefixes """ for entry in object_ids: if entry.hostname is None: # if resolution fails the entry hostname will still be None entry.hostname, nid, drs_type = resolve_drs_hostname_from_id( entry.object_id, resolved_drs_prefix_cache, mds_url ) if ( drs_type == "hostname" ): # drs_type is a hostname so object id will be the GUID entry.object_id = nid def ensure_dirpath_exists(path: Path) -> Path: """Utility to create a directory if missing. Returns the path so that the call can be inlined in a another call Args: path (Path): path to create Returns path of created directory """ assert path out_path: Path = path if not out_path.exists(): out_path.mkdir(parents=True, exist_ok=True) return out_path def get_download_url_using_drs( drs_hostname: str, object_id: str, access_method: str, access_token: str ) -> Optional[str]: """ Returns the presigned URL for a DRS object, from a DRS hostname, via the access method Args: drs_hostname (str): hostname of DRS server object_id (str): DRS object id access_method (str): access method to use access_token (str): access token to DRS server Returns: presigned url to object """ headers = { "Content-Type": "application/json", "Accept": "application/json", "Authorization": "bearer " + access_token, } try: response = requests.get( url=f"https://{drs_hostname}/ga4gh/drs/v1/objects/{object_id}/access/{access_method}", headers=headers, ) response.raise_for_status() data = response.json() return data.get("url", None) except requests.exceptions.Timeout: logger.critical(f"Was unable to download: {object_id}. Timeout Error.") except requests.exceptions.HTTPError as exc: logger.critical( f"HTTP Error ({exc.response.status_code}) when requesting download url from {access_method}" ) return None def get_user_auth(commons_url: str, access_token: str) -> Optional[List[str]]: """ Retrieves a user's authz for the commons based on the access token. Any error will be logged and None is returned Args: commons_url (str): hostname of Gen3 indexd access_token (str): user's auth token Returns: The authz object from the user endpoint """ """ """ headers = { "Content-Type": "application/json", "Accept": "application/json", "Authorization": "bearer " + access_token, } try: response = requests.get(url=f"https://{commons_url}/user/user", headers=headers) response.raise_for_status() data = response.json() authz = data["authz"] return authz except requests.exceptions.HTTPError as exc: logger.critical(f"HTTP Error ({exc.response.status_code}): getting user access") return None def list_auth(hostname: str, authz: dict): """ Prints the authz for a DRS hostname Args: hostname (str): hostname to list access authz (str): dictionary of authz stringts """ print( f"───────────────────────────────────────────────────────────────────────────────────────────────────────" ) print(f"Access for {hostname}:") if authz is not None and len(authz) > 0: for access, methods in authz.items(): print( f" {access : <55}: {' '.join(dict.fromkeys(x['method'] for x in methods).keys()):>40}" ) else: print(" No access")
[docs] class DownloadManager: """ Class to assist in downloading a list of Downloadable object which at a minimum is a json manifest of DRS object ids. The methods of interest are download and user_access. """ def __init__( self, hostname: str, auth: Gen3Auth, download_list: List[Downloadable], show_progress: bool = False, ): """ Initialize the DownloadManager so that is ready to start downloading. Note the downloadable objects are required so that all tokens are available to support the download. Args: hostname (str): Gen3 commons home commons auth (Gen3Auth) : Gen3 authentication download_list (List[Downloadable]): list of objects to download """ self.hostname = hostname self.access_token = auth.get_access_token() self.metadata = Gen3Metadata(auth) self.wts_endpoints = wts_external_oidc(hostname) self.resolved_compact_drs = {} # add COMMONS host as a DRSEndpoint as it does not use the WTS self.known_hosts = { self.hostname: KnownDRSEndpoint( hostname=self.hostname, access_token=self.access_token, use_wts=False, expire=datetime.fromtimestamp(decode_token(self.access_token)["exp"]), ) } self.download_list = download_list self.resolve_objects(self.download_list, show_progress)
[docs] def resolve_objects(self, object_list: List[Downloadable], show_progress: bool): """ Given an Downloadable object list, resolve the DRS hostnames and update each Downloadable Args: object_list (List[Downloadable]): list of Downloadable objects to resolve """ resolve_objects_drs_hostname_from_id( object_list, self.resolved_compact_drs, f"http://{self.hostname}/mds/aggregate/info", ) progress_bar = ( tqdm(desc=f"Resolving objects", total=len(object_list)) if show_progress else InvisibleProgress() ) for entry in object_list: add_drs_object_info(entry) # sugar to allow download objects to self download entry._manager = self progress_bar.update(1)
[docs] def cache_hosts_wts_tokens(self, object_list): """ Using the list of DRS host obtain a WTS token for all DRS hosts in the list. It's is possible """ # create two sets: one of the know WTS host and the other of the host in the manifest wts_endpoint_set = set(self.wts_endpoints.keys()) object_id_hostnames = { x.hostname for x in object_list if x.hostname is not None } drs_in_wts = wts_endpoint_set.intersection( object_id_hostnames ) # all DRS host in WTS drs_not_in_wts = object_id_hostnames.difference( wts_endpoint_set ) # all DRS host not in WTS for drs_hostname in drs_in_wts: endpoint = KnownDRSEndpoint( hostname=drs_hostname, idp=self.wts_endpoints[drs_hostname]["idp"], ) endpoint.renew_token(self.hostname, self.access_token) self.known_hosts[drs_hostname] = endpoint for drs_hostname in drs_not_in_wts: # if we already know the host then we don't need to reset the host if drs_hostname in self.known_hosts: continue # mark hostname as unavailable self.known_hosts[drs_hostname] = KnownDRSEndpoint( hostname=drs_hostname, ) logger.critical( f"Could not retrieve a token for {drs_hostname}: it is not available as a WTS endpoint." )
[docs] def get_fresh_token(self, drs_hostname: str) -> Optional[str]: """Will return and/or refresh and return a WTS token if hostname is known otherwise returns None. Args: drs_hostname (str): hostname to get token for Returns: access token if successful otherwise None """ if drs_hostname not in self.known_hosts: logger.critical(f"Could not find {drs_hostname} in cache.") return None if self.known_hosts[drs_hostname].available: if not self.known_hosts[drs_hostname].expired(): return self.known_hosts[drs_hostname].access_token else: # update the token self.known_hosts[drs_hostname].renew_token( self.hostname, self.access_token ) return self.known_hosts[drs_hostname].access_token return None
[docs] def download( self, object_list: List[Downloadable], save_directory: str = ".", show_progress: bool = False, unpack_packages: bool = True, delete_unpacked_packages: bool = False, ) -> Dict[str, Any]: """ Downloads objects to the directory or current working directory. The input is an list of Downloadable object created by loading a manifest using the Manifest class or a call to Manifest.load(... The download manager will download each file in the manifest, in the case of errors they are logged and it continues. The return value is a list of DownloadStatus object, detailing the results of the download. Args: object_list (List[Downloadable]): save_directory (str): directory to save to (will be created) show_progress (bool): show a download progress bar unpack_packages (bool): set to False to disable the unpacking of downloaded packages delete_unpacked_packages (bool): set to True to delete package files after unpacking them Returns: List of DownloadStatus objects for each object id in object_list """ self.cache_hosts_wts_tokens(object_list) output_dir = Path(save_directory) completed = { entry.object_id: DownloadStatus(filename=entry.file_name) for entry in object_list } for entry in object_list: # handle bundles first if entry.object_type is DRSObjectType.bundle: # append the filename to the directory path and child_dir = Path(save_directory, entry.file_name) # call download with the children object list child_status = self.download( entry.children, child_dir, show_progress, unpack_packages, delete_unpacked_packages, ) # when complete, append the return status completed[entry.object_id] = child_status continue if entry.hostname is None: logger.critical( f"{entry.hostname} was not resolved, skipping {entry.object_id}." f"Skipping {entry.file_name}" ) completed[entry.object_id].status = "error (resolving DRS host)" continue # check to see if we have tokens if entry.hostname not in self.known_hosts: logger.critical( f"{entry.hostname} is not present in this commons remote user access." f"Skipping {entry.file_name}" ) completed[entry.object_id].status = "error (resolving DRS host)" continue if self.known_hosts[entry.hostname].available is False: logger.critical( f"Was unable to get user authorization from {entry.hostname}. Skipping {entry.file_name}" ) completed[entry.object_id].status = "error (no auth)" continue drs_hostname = entry.hostname access_token = self.get_fresh_token(drs_hostname) if access_token is None: logger.critical( f"No access token defined for {entry.object_id}. Skipping" ) completed[entry.object_id].status = "error (no access token)" continue # TODO refine the selection of access_method if len(entry.access_methods) == 0: logger.critical( f"No access methods defined for {entry.object_id}. Skipping" ) completed[entry.object_id].status = "error (no access methods)" continue access_method = entry.access_methods[0]["access_id"] download_url = get_download_url_using_drs( drs_hostname, entry.object_id, access_method, access_token, ) if download_url is None: completed[entry.object_id].status = "error" continue completed[entry.object_id].start_time = datetime.now(timezone.utc) filepath = output_dir.joinpath(entry.file_name) res = download_file_from_url( url=download_url, filename=filepath, show_progress=show_progress ) # check if the file is a package; if so, unpack it in place ext = os.path.splitext(entry.file_name)[-1] if unpack_packages and ext in PACKAGE_EXTENSIONS: try: mds_entry = self.metadata.get(entry.object_id) except Exception: mds_entry = {} # no MDS or object not in MDS logger.debug( f"{entry.file_name} is not a package and will not be expanded" ) # if the metadata type is "package", then unpack if mds_entry.get("type") == "package": try: unpackage_object(filepath) except Exception as e: logger.critical( f"{entry.file_name} had an issue while being unpackaged: {e}" ) res = False if delete_unpacked_packages: filepath.unlink() if res: completed[entry.object_id].status = "downloaded" logger.debug( f"object {entry.object_id} has been successfully downloaded." ) else: completed[entry.object_id].status = "error" logger.debug(f"object {entry.object_id} has failed to be downloaded.") completed[entry.object_id].end_time = datetime.now(timezone.utc) return completed
[docs] def user_access(self): """ List the user's access permissions on each host needed to download DRS objects in the manifest. A useful way to determine if access permissions are one reason a download failed. Returns: list of authz for each DRS host """ results = {} self.cache_hosts_wts_tokens(self.download_list) for hostname in self.known_hosts.keys(): if self.known_hosts[hostname].available is False: logger.critical( f"Was unable to get user authorization from {hostname}." ) continue access_token = self.known_hosts[hostname].access_token authz = get_user_auth(hostname, access_token) results[hostname] = authz return results
def _download( hostname, auth, infile, output_dir=".", show_progress=False, unpack_packages=True, delete_unpacked_packages=False, ) -> Optional[Dict[str, Any]]: """ A convenience function used to download a json manifest. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth (str): Gen3 Auth instance infile (str): manifest file output_dir: directory to save downloaded files to show_progress: show progress bar unpack_packages (bool): set to False to disable the unpacking of downloaded packages delete_unpacked_packages (bool): set to True to delete package files after unpacking them Returns: List of DownloadStatus objects for each object id in object_list """ object_list = Manifest.load(Path(infile)) if object_list is None: logger.critical(f"Error loading {infile}") return None try: auth.get_access_token() except Gen3AuthError: logger.critical(f"Unable to authenticate your credentials with {hostname}") return downloader = DownloadManager( hostname=hostname, auth=auth, download_list=object_list, show_progress=show_progress, ) out_dir_path = ensure_dirpath_exists(Path(output_dir)) return downloader.download( object_list, str(out_dir_path), show_progress=show_progress, unpack_packages=unpack_packages, delete_unpacked_packages=delete_unpacked_packages, ) def _download_obj( hostname, auth, object_ids, output_dir=".", show_progress=False, unpack_packages=True, delete_unpacked_packages=False, ) -> Optional[Dict[str, Any]]: """ A convenience function used to download a single DRS object. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance object_ids (List[str]): DRS object id output_dir: directory to save downloaded files to show_progress: show progress bar unpack_packages (bool): set to False to disable the unpacking of downloaded packages delete_unpacked_packages (bool): set to True to delete package files after unpacking them Returns: List of DownloadStatus objects for the DRS object """ try: auth.get_access_token() except Gen3AuthError: logger.critical(f"Unable to authenticate your credentials with {hostname}") return None object_list = [Downloadable(object_id=object_id) for object_id in object_ids] downloader = DownloadManager( hostname=hostname, auth=auth, download_list=object_list, show_progress=show_progress, ) out_dir_path = ensure_dirpath_exists(Path(output_dir)) return downloader.download( object_list, str(out_dir_path), show_progress=show_progress, unpack_packages=unpack_packages, delete_unpacked_packages=delete_unpacked_packages, ) def _listfiles(hostname, auth, infile: str) -> bool: """ A wrapper function used by the cli to list files in a manifest. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance infile (str): manifest file Returns: True if successfully listed """ object_list = Manifest.load(Path(infile)) if object_list is None: return False try: auth.get_access_token() except Gen3AuthError: logger.critical(f"Unable to authenticate your credentials with {hostname}") return False except requests.exceptions.RequestException as ex: logger.critical( f"Unable to authenticate your credentials with {hostname}: {str(ex)}" ) return False DownloadManager( hostname=hostname, auth=auth, download_list=object_list, show_progress=True ) for x in object_list: print(x.pprint()) return True def _list_object(hostname, auth, object_id: str) -> bool: """ Lists a DRS object. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance object_id (str): DRS object Returns: True if successfully listed """ try: auth.get_access_token() except Gen3AuthError: logger.critical(f"Unable to authenticate your credentials with {hostname}") return False except requests.exceptions.RequestException as ex: logger.critical( f"Unable to authenticate your credentials with {hostname}: {ex}" ) return False object_list = [Downloadable(object_id=object_id)] DownloadManager( hostname=hostname, auth=auth, download_list=object_list, show_progress=False ) for x in object_list: print(x.pprint()) return True def _list_access(hostname, auth, infile: str) -> bool: """ A convenience function to list a users access for all DRS hostname in a manifest. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance infile (str): manifest file Returns: True if successfully listed """ object_list = Manifest.load(Path(infile)) if object_list is None: return False try: auth.get_access_token() except Gen3AuthError: logger.critical(f"Unable to authenticate your credentials with {hostname}") return False except requests.exceptions.RequestException as ex: logger.critical( f"Unable to authenticate your credentials with {hostname}: {ex}" ) return False download = DownloadManager( hostname=hostname, auth=auth, download_list=object_list, show_progress=False ) access = download.user_access() for h, access in access.items(): list_auth(h, access) return True # These functions are exposed to the SDK's cli under the drs-pull subcommand
[docs] def list_files_in_drs_manifest(hostname, auth, infile: str) -> bool: """ A wrapper function used by the cli to list files in a manifest. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance infile (str): manifest file Returns: True if successfully listed """ return _listfiles(hostname, auth, infile)
[docs] def list_drs_object(hostname, auth, object_id: str) -> bool: """ A convenience function used to list a DRS object. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance object_id (str): DRS object Returns: True if successfully listed """ return _list_object(hostname, auth, object_id) # pragma: no cover
[docs] def download_files_in_drs_manifest( hostname, auth, infile, output_dir, show_progress=True, unpack_packages=True, delete_unpacked_packages=False, ) -> None: """ A convenience function used to download a json manifest. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth (str): Gen3 Auth instance infile (str): manifest file output_dir: directory to save downloaded files to unpack_packages (bool): set to False to disable the unpacking of downloaded packages delete_unpacked_packages (bool): set to True to delete package files after unpacking them Returns: """ _download( hostname, auth, infile, output_dir, show_progress, unpack_packages, delete_unpacked_packages, )
def download_drs_objects( hostname, auth, object_ids, output_dir, show_progress=True, unpack_packages=True, delete_unpacked_packages=False, ) -> None: """ A convenience function used to download a single DRS object. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance object_ids (List[str]): DRS object ids output_dir: directory to save downloaded files to unpack_packages (bool): set to False to disable the unpacking of downloaded packages delete_unpacked_packages (bool): set to True to delete package files after unpacking them Returns: List of DownloadStatus objects for the DRS object """ return _download_obj( hostname, auth, object_ids, output_dir, show_progress, unpack_packages, delete_unpacked_packages, )
[docs] def list_access_in_drs_manifest(hostname, auth, infile) -> bool: """ A convenience function to list a users access for all DRS hostname in a manifest. Args: hostname (str): hostname of Gen3 commons to use for access and WTS auth: Gen3 Auth instance infile (str): manifest file Returns: True if successfully listed """ return _list_access(hostname, auth, infile)