Source code for gen3.index

import aiohttp
import backoff
import requests
import urllib.parse
from cdislogging import get_logger

import indexclient.client as client

from gen3.utils import DEFAULT_BACKOFF_SETTINGS, raise_for_status_and_print_error
from gen3.auth import Gen3Auth

logging = get_logger(__name__)


class Gen3Index:
    """
    A class for interacting with the Gen3 Index services.

    Args:
        endpoint (str): public endpoint for reading/querying indexd -
            only necessary if auth_provider not provided
        auth_provider (Gen3Auth): A Gen3Auth class instance or indexd basic creds tuple

    Examples:
        This generates the Gen3Index class pointed at the sandbox commons while
        using the credentials.json downloaded from the commons profile page.

        >>> auth = Gen3Auth(refresh_file="credentials.json")
        ... index = Gen3Index(auth)
    """

    def __init__(self, endpoint=None, auth_provider=None, service_location="index"):
        # legacy interface required endpoint as 1st arg
        if endpoint and isinstance(endpoint, Gen3Auth):
            auth_provider = endpoint
            endpoint = None
        if auth_provider and isinstance(auth_provider, Gen3Auth):
            endpoint = auth_provider.endpoint

        endpoint = endpoint.strip("/")
        # if running locally, indexd is deployed by itself without a location
        # relative to the commons
        if "http://localhost" in endpoint:
            service_location = ""

        if not endpoint.endswith(service_location):
            endpoint += "/" + service_location

        self.client = client.IndexClient(endpoint, auth=auth_provider)

    ### Get Requests

    def is_healthy(self):
        """
        Return whether indexd is healthy or not
        """
        try:
            response = self.client._get("_status")
            response.raise_for_status()
        except Exception:
            return False

        return response.text == "Healthy"

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_version(self):
        """
        Return the version of indexd
        """
        response = self.client._get("_version")
        raise_for_status_and_print_error(response)
        return response.json()

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_stats(self):
        """
        Return basic info about the records in indexd
        """
        response = self.client._get("_stats")
        raise_for_status_and_print_error(response)
        return response.json()

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_all_records(self, limit=None, paginate=False):
        """
        Get a list of all records
        """
        all_records = []

        url = "index/"
        if limit:
            url += f"?limit={limit}"
        response = self.client._get(url)
        raise_for_status_and_print_error(response)

        records = response.json().get("records")
        all_records.extend(records)

        if paginate and records:
            previous_did = None
            start_did = records[-1].get("did")

            while start_did != previous_did:
                previous_did = start_did

                params = {"start": f"{start_did}"}
                url_parts = list(urllib.parse.urlparse(url))
                query = dict(urllib.parse.parse_qsl(url_parts[4]))
                query.update(params)
                url_parts[4] = urllib.parse.urlencode(query)
                url = urllib.parse.urlunparse(url_parts)

                response = self.client._get(url)
                raise_for_status_and_print_error(response)
                records = response.json().get("records")
                all_records.extend(records)

                if records:
                    start_did = response.json().get("records")[-1].get("did")

        return all_records

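    # A minimal usage sketch, assuming an authenticated Gen3Index instance
    # named `index` (constructed as in the class docstring) and a hypothetical
    # page size:
    #
    #   records = index.get_all_records(limit=1000, paginate=True)
    #   print(f"fetched {len(records)} records")
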
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_records_on_page(self, limit=None, page=None):
        """
        Get a list of all records given the page and page size limit
        """
        params = {}
        url = "index/"

        if limit is not None:
            params["limit"] = limit

        if page is not None:
            params["page"] = page

        query = urllib.parse.urlencode(params)

        response = self.client._get(url + "?" + query)
        raise_for_status_and_print_error(response)

        return response.json().get("records")

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_get_record(self, guid=None, _ssl=None):
        """
        Asynchronous function to request a record from indexd.

        Args:
            guid (str): record guid

        Returns:
            dict: indexd record
        """
        url = f"{self.client.url}/index/{guid}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url, ssl=_ssl) as response:
                raise_for_status_and_print_error(response)
                response = await response.json()

        return response

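    # A minimal sketch of driving the coroutine, assuming `index` as above and
    # a hypothetical GUID:
    #
    #   import asyncio
    #   record = asyncio.run(index.async_get_record("dg.1234/abc-123"))
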
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_get_records_on_page(self, limit=None, page=None, _ssl=None):
        """
        Asynchronous function to request a page from indexd.

        Args:
            page (int/str): indexd page to request

        Returns:
            List[dict]: List of indexd records from the page
        """
        params = {}

        if limit is not None:
            params["limit"] = limit

        if page is not None:
            params["page"] = page

        query = urllib.parse.urlencode(params)
        url = f"{self.client.url}/index" + "?" + query

        async with aiohttp.ClientSession() as session:
            async with session.get(url, ssl=_ssl) as response:
                response = await response.json()

        return response.get("records")

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_get_records_from_checksum(
        self, checksum, checksum_type="md5", _ssl=None
    ):
        """
        Asynchronous function to request records from indexd matching checksum.

        Args:
            checksum (str): indexd checksum to request
            checksum_type (str): type of checksum, defaults to md5

        Returns:
            List[dict]: List of indexd records
        """
        params = {"hash": f"{checksum_type}:{checksum}"}
        query = urllib.parse.urlencode(params)
        url = f"{self.client.url}/index" + "?" + query

        async with aiohttp.ClientSession() as session:
            async with session.get(url, ssl=_ssl) as response:
                response = await response.json()

        return response.get("records")

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get(self, guid, dist_resolution=True):
        """
        Get the metadata associated with the given id, alias, or
        distributed identifier

        Args:
            guid: string - record id
            dist_resolution: boolean - *optional* Specify whether to use
                distributed resolution or not
        """
        rec = self.client.global_get(guid, dist_resolution)

        if not rec:
            return rec

        return rec.to_json()

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_urls(self, size=None, hashes=None, guids=None):
        """
        Get a list of urls that match query params

        Args:
            size: integer - object size
            hashes: string - hashes specified as algorithm:value
            guids: list - list of ids
        """
        if guids:
            guids = ",".join(guids)
        p = {"size": size, "hash": hashes, "ids": guids}
        urls = self.client._get("urls", params=p).json()
        return [url for _, url in urls.items()]

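    # Usage sketch with hypothetical size and hash values, using the
    # algorithm:value hash format described in the docstring:
    #
    #   urls = index.get_urls(size=1024, hashes="md5:ab167e49d25b488939b1ede42752458b")
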
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_record(self, guid):
        """
        Get the metadata associated with a given id
        """
        rec = self.client.get(guid)

        if not rec:
            return rec

        return rec.to_json()

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_record_doc(self, guid):
        """
        Get the indexclient Document object associated with a given id
        (unlike get_record, this does not convert the result to json)
        """
        return self.client.get(guid)

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_with_params(self, params=None):
        """
        Return a document object corresponding to the supplied parameters, such as
        ``{'hashes': {'md5': '...'}, 'size': '...', 'metadata': {'file_state': '...'}}``.

        - need to include all the hashes in the request
        - index client like signpost or indexd will need to handle the query param
          `'hash': 'hash_type:hash'`
        """
        rec = self.client.get_with_params(params)

        if not rec:
            return rec

        return rec.to_json()

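    # Usage sketch, assuming `index` as above and hypothetical hash/size values:
    #
    #   record = index.get_with_params(
    #       {"hashes": {"md5": "ab167e49d25b488939b1ede42752458b"}, "size": 1024}
    #   )
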
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_get_with_params(self, params, _ssl=None):
        """
        Return a document object corresponding to the supplied parameters

        - need to include all the hashes in the request
        - need to handle the query param `'hash': 'hash_type:hash'`

        Args:
            params (dict): params to search with
            _ssl (None, optional): whether or not to use ssl

        Returns:
            Document: json representation of an entry in indexd
        """
        query_params = urllib.parse.urlencode(params)
        url = f"{self.client.url}/index/?{query_params}"

        async with aiohttp.ClientSession() as session:
            async with session.get(url, ssl=_ssl) as response:
                # aiohttp's raise_for_status() is synchronous and must not
                # be awaited
                response.raise_for_status()
                response = await response.json()

        return response

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_latest_version(self, guid, has_version=False):
        """
        Get the metadata of the latest index record version associated
        with the given id

        Args:
            guid: string - record id
            has_version: boolean - *optional* exclude entries without a version
        """
        rec = self.client.get_latest_version(guid, has_version)

        if not rec:
            return rec

        return rec.to_json()

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_versions(self, guid):
        """
        Get the metadata of all index record versions associated with
        the given id

        Args:
            guid: string - record id
        """
        response = self.client._get(f"/index/{guid}/versions")
        raise_for_status_and_print_error(response)
        versions = response.json()

        return [r for _, r in versions.items()]

    ### Post Requests

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def create_record(
        self,
        hashes,
        size,
        did=None,
        urls=None,
        file_name=None,
        metadata=None,
        baseid=None,
        acl=None,
        urls_metadata=None,
        version=None,
        authz=None,
        description=None,
        content_created_date=None,
        content_updated_date=None,
    ):
        """
        Create a new record and add it to the index

        Args:
            hashes (dict): {hash type: hash value,}
                eg ``hashes={'md5': 'ab167e49d25b488939b1ede42752458b'}``
            size (int): file size metadata associated with a given uuid
            did (str): provide a UUID for the new indexd to be made
            urls (list): list of URLs where you can download the UUID
            acl (list): access control list
            authz (list): RBAC strings
            file_name (str): name of the file associated with a given UUID
            metadata (dict): additional key value metadata for this entry
            urls_metadata (dict): metadata attached to each url
            baseid (str): optional baseid to group with previous entries versions
            version (str): entry version string
            description (str): optional description of the object
            content_created_date (datetime): optional creation date and time of the
                content being indexed
            content_updated_date (datetime): optional update date and time of the
                content being indexed

        Returns:
            Document: json representation of an entry in indexd
        """
        if urls is None:
            urls = []
        json = {
            "urls": urls,
            "hashes": hashes,
            "size": size,
            "file_name": file_name,
            "metadata": metadata,
            "urls_metadata": urls_metadata,
            "baseid": baseid,
            "acl": acl,
            "authz": authz,
            "version": version,
            "description": description,
            "content_created_date": content_created_date,
            "content_updated_date": content_updated_date,
        }
        if did:
            json["did"] = did
        rec = self.client.create(**json)

        return rec.to_json()

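    # Usage sketch with a hypothetical storage URL and authz resource path:
    #
    #   record = index.create_record(
    #       hashes={"md5": "ab167e49d25b488939b1ede42752458b"},
    #       size=1024,
    #       urls=["s3://my-bucket/my-object"],
    #       authz=["/programs/myprogram/projects/myproject"],
    #   )
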
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_create_record(
        self,
        hashes,
        size,
        did=None,
        urls=None,
        file_name=None,
        metadata=None,
        baseid=None,
        acl=None,
        urls_metadata=None,
        version=None,
        authz=None,
        _ssl=None,
        description=None,
        content_created_date=None,
        content_updated_date=None,
    ):
        """
        Asynchronous function to create a record in indexd.

        Args:
            hashes (dict): {hash type: hash value,}
                eg ``hashes={'md5': 'ab167e49d25b488939b1ede42752458b'}``
            size (int): file size metadata associated with a given uuid
            did (str): provide a UUID for the new indexd to be made
            urls (list): list of URLs where you can download the UUID
            acl (list): access control list
            authz (list): RBAC strings
            file_name (str): name of the file associated with a given UUID
            metadata (dict): additional key value metadata for this entry
            urls_metadata (dict): metadata attached to each url
            baseid (str): optional baseid to group with previous entries versions
            version (str): entry version string
            description (str): optional description of the object
            content_created_date (datetime): optional creation date and time of the
                content being indexed
            content_updated_date (datetime): optional update date and time of the
                content being indexed

        Returns:
            Document: json representation of an entry in indexd
        """
        async with aiohttp.ClientSession() as session:
            if urls is None:
                urls = []
            json = {
                "form": "object",
                "hashes": hashes,
                "size": size,
                "urls": urls,
            }

            if did:
                json["did"] = did
            if file_name:
                json["file_name"] = file_name
            if metadata:
                json["metadata"] = metadata
            if baseid:
                json["baseid"] = baseid
            if acl:
                json["acl"] = acl
            if urls_metadata:
                json["urls_metadata"] = urls_metadata
            if version:
                json["version"] = version
            if authz:
                json["authz"] = authz
            if description:
                json["description"] = description
            if content_created_date:
                json["content_created_date"] = content_created_date
            if content_updated_date:
                json["content_updated_date"] = content_updated_date

            # aiohttp only allows basic auth with their built in auth, so we
            # need to manually add JWT auth header
            headers = {"Authorization": self.client.auth._get_auth_value()}

            async with session.post(
                f"{self.client.url}/index/",
                json=json,
                headers=headers,
                ssl=_ssl,
            ) as response:
                assert response.status == 200, await response.json()
                response = await response.json()

        return response

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def create_blank(self, uploader, file_name=None):
        """
        Create a blank record

        Args:
            uploader (str): uploader of the blank record
            file_name (str): *optional* name of the file associated with the record
        """
        json = {"uploader": uploader, "file_name": file_name}
        response = self.client._post(
            "index/blank",
            headers={"content-type": "application/json"},
            auth=self.client.auth,
            data=client.json_dumps(json),
        )
        raise_for_status_and_print_error(response)
        rec = response.json()
        return self.get_record(rec["did"])

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def create_new_version(
        self,
        guid,
        hashes,
        size,
        did=None,
        urls=None,
        file_name=None,
        metadata=None,
        acl=None,
        urls_metadata=None,
        version=None,
        authz=None,
        description=None,
        content_created_date=None,
        content_updated_date=None,
    ):
        """
        Add new version for the document associated to the provided uuid

        Since data content is immutable, when you want to change the size or
        hash, a new index document with a new uuid needs to be created as its
        new version. That uuid is returned in the did field of the response.
        The old index document is not deleted.

        Args:
            guid (string): record id
            hashes (dict): {hash type: hash value,}
                eg ``hashes={'md5': 'ab167e49d25b488939b1ede42752458b'}``
            size (int): file size metadata associated with a given uuid
            did (str): provide a UUID for the new indexd to be made
            urls (list): list of URLs where you can download the UUID
            file_name (str): name of the file associated with a given UUID
            metadata (dict): additional key value metadata for this entry
            acl (list): access control list
            urls_metadata (dict): metadata attached to each url
            version (str): entry version string
            authz (str): RBAC string
            description (str): optional description of the object
            content_created_date (datetime): optional creation date and time of the
                content being indexed
            content_updated_date (datetime): optional update date and time of the
                content being indexed

            body: json/dictionary format - Metadata object that needs to be added
                to the store. Providing size and at least one hash is necessary
                and sufficient. Note: it is a good idea to add a version number
        """
        if urls is None:
            urls = []
        json = {
            "urls": urls,
            "form": "object",
            "hashes": hashes,
            "size": size,
            "file_name": file_name,
            "metadata": metadata,
            "urls_metadata": urls_metadata,
            "acl": acl,
            "authz": authz,
            "version": version,
            "description": description,
            "content_created_date": content_created_date,
            "content_updated_date": content_updated_date,
        }
        if did:
            json["did"] = did
        response = self.client._post(
            "index",
            guid,
            headers={"content-type": "application/json"},
            data=client.json_dumps(json),
            auth=self.client.auth,
        )
        raise_for_status_and_print_error(response)
        rec = response.json()

        if rec and "did" in rec:
            return self.get_record(rec["did"])

        return None

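    # Usage sketch with a hypothetical GUID; the new version receives its own
    # did, which is returned in the response:
    #
    #   new_version = index.create_new_version(
    #       "dg.1234/abc-123",
    #       hashes={"md5": "ab167e49d25b488939b1ede42752458b"},
    #       size=2048,
    #       version="2",
    #   )
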
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_records(self, dids):
        """
        Get a list of documents given a list of dids

        Args:
            dids: list - a list of record ids

        Returns:
            list: json representing index records
        """
        try:
            response = self.client._post(
                "bulk/documents", json=dids, auth=self.client.auth
            )
        except requests.HTTPError as exception:
            if exception.response.status_code == 404:
                return None
            else:
                raise exception

        return response.json()

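    # Usage sketch with hypothetical GUIDs, fetched in a single bulk request:
    #
    #   records = index.get_records(["dg.1234/abc-123", "dg.1234/def-456"])
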
    ### Put Requests

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def update_blank(self, guid, rev, hashes, size, urls=None, authz=None):
        """
        Update only hashes and size for a blank index

        Args:
            guid (string): record id
            rev (string): data revision - simple consistency mechanism
            hashes (dict): {hash type: hash value,}
                eg ``hashes={'md5': 'ab167e49d25b488939b1ede42752458b'}``
            size (int): file size metadata associated with a given uuid
            urls (list): *optional* list of URLs where you can download the UUID
            authz (list): *optional* RBAC strings
        """
        params = {"rev": rev}
        json = {"hashes": hashes, "size": size}
        if urls:
            json["urls"] = urls
        if authz:
            json["authz"] = authz

        response = self.client._put(
            "index/blank",
            guid,
            headers={"content-type": "application/json"},
            params=params,
            auth=self.client.auth,
            data=client.json_dumps(json),
        )
        raise_for_status_and_print_error(response)
        rec = response.json()
        return self.get_record(rec["did"])

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def update_record(
        self,
        guid,
        file_name=None,
        urls=None,
        version=None,
        metadata=None,
        acl=None,
        authz=None,
        urls_metadata=None,
        description=None,
        content_created_date=None,
        content_updated_date=None,
    ):
        """
        Update an existing entry in the index

        Args:
            guid: string - record id
            body: json/dictionary format - index record information that needs
                to be updated.
                - cannot update size or hash; use a new version for that
        """
        updatable_attrs = {
            "file_name": file_name,
            "urls": urls,
            "version": version,
            "metadata": metadata,
            "acl": acl,
            "authz": authz,
            "urls_metadata": urls_metadata,
            "description": description,
            "content_created_date": content_created_date,
            "content_updated_date": content_updated_date,
        }
        rec = self.client.get(guid)
        for k, v in updatable_attrs.items():
            if v is not None:
                # setattr is the idiomatic (and safer) equivalent of the
                # previous exec()-based attribute assignment
                setattr(rec, k, v)
        rec.patch()
        return rec.to_json()

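    # Usage sketch with a hypothetical GUID; only mutable fields can be
    # updated here (size and hashes require create_new_version instead):
    #
    #   updated = index.update_record(
    #       "dg.1234/abc-123",
    #       file_name="renamed.bam",
    #       metadata={"file_state": "validated"},
    #   )
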
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_update_record(
        self,
        guid,
        file_name=None,
        urls=None,
        version=None,
        metadata=None,
        acl=None,
        authz=None,
        urls_metadata=None,
        _ssl=None,
        description=None,
        content_created_date=None,
        content_updated_date=None,
        **kwargs,
    ):
        """
        Asynchronous function to update a record in indexd.

        Args:
            guid: string - record id
            body: json/dictionary format - index record information that needs
                to be updated.
                - cannot update size or hash; use a new version for that
        """
        async with aiohttp.ClientSession() as session:
            updatable_attrs = {
                "file_name": file_name,
                "urls": urls,
                "version": version,
                "metadata": metadata,
                "acl": acl,
                "authz": authz,
                "urls_metadata": urls_metadata,
                "description": description,
                "content_created_date": content_created_date,
                "content_updated_date": content_updated_date,
            }
            record = await self.async_get_record(guid)
            revision = record.get("rev")
            for key, value in updatable_attrs.items():
                if value is not None:
                    record[key] = value

            # strip the read-only fields that indexd's update endpoint
            # does not accept
            del record["created_date"]
            del record["rev"]
            del record["updated_date"]
            del record["version"]
            del record["uploader"]
            del record["form"]
            del record["urls_metadata"]
            del record["baseid"]
            del record["size"]
            del record["hashes"]
            del record["did"]

            logging.info(f"PUT-ing record: {record}")

            # aiohttp only allows basic auth with their built in auth, so we
            # need to manually add JWT auth header
            headers = {"Authorization": self.client.auth._get_auth_value()}

            async with session.put(
                f"{self.client.url}/index/{guid}?rev={revision}",
                json=record,
                headers=headers,
                ssl=_ssl,
            ) as response:
                assert response.status == 200, await response.json()
                response = await response.json()

        return response

    ### Delete Requests

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def delete_record(self, guid):
        """
        Delete an entry from the index

        Args:
            guid: string - record id

        Returns:
            Document: the deleted record, or None if no record was found
        """
        rec = self.client.get(guid)
        if rec:
            rec.delete()
        return rec

    ### Query Requests

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def query_urls(self, pattern):
        """
        Query all record URLs for given pattern

        Args:
            pattern (str): pattern to match against indexd urls

        Returns:
            List[records]: indexd records with urls matching pattern
        """
        response = self.client._get(f"/_query/urls/q?include={pattern}")
        raise_for_status_and_print_error(response)
        return response.json()

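    # Usage sketch with a hypothetical bucket name as the match pattern:
    #
    #   matches = index.query_urls("s3://my-bucket")
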
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    async def async_query_urls(self, pattern, _ssl=None):
        """
        Asynchronous function to query urls from indexd.

        Args:
            pattern (str): pattern to match against indexd urls

        Returns:
            List[records]: indexd records with urls matching pattern
        """
        url = f"{self.client.url}/_query/urls/q?include={pattern}"
        async with aiohttp.ClientSession() as session:
            logging.debug(f"request: {url}")
            async with session.get(url, ssl=_ssl) as response:
                raise_for_status_and_print_error(response)
                response = await response.json()

        return response

    ## Mint GUID Requests

    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_valid_guids(self, count=None):
        """
        Get a list of valid GUIDs without indexing

        Args:
            count (int): number of GUIDs to request

        Returns:
            List[str]: list of valid indexd GUIDs
        """
        url = "/guid/mint"
        if count:
            url += f"?count={count}"
        response = self.client._get(url)
        response.raise_for_status()
        return response.json().get("guids", [])

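    # Usage sketch: mint ten GUIDs (carrying the instance prefix, if one is
    # configured) without creating any index records:
    #
    #   guids = index.get_valid_guids(count=10)
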
    @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
    def get_guids_prefix(self):
        """
        Get the prefix for GUIDs if there is one

        Returns:
            str: prefix for this instance
        """
        response = self.client._get("/guid/prefix")
        response.raise_for_status()
        return response.json().get("prefix")


def _print_func_name(function):
    return "{}.{}".format(function.__module__, function.__name__)


def _print_kwargs(kwargs):
    return ", ".join("{}={}".format(k, repr(v)) for k, v in list(kwargs.items()))