Source code for gen3.metadata
"""
Contains class for interacting with Gen3's Metadata Service.
"""
import aiohttp
import backoff
from datetime import datetime
import requests
import json
import os
from urllib.parse import urlparse
from gen3.utils import (
append_query_params,
DEFAULT_BACKOFF_SETTINGS,
BACKOFF_NO_LOG_IF_NOT_RETRIED,
_verify_schema,
)
from gen3.auth import Gen3Auth
from gen3.tools.utils import (
RECORD_TYPE_STANDARD_KEY,
GUID_COLUMN_NAMES,
FILENAME_COLUMN_NAMES,
SIZE_COLUMN_NAMES,
MD5_COLUMN_NAMES,
ACLS_COLUMN_NAMES,
URLS_COLUMN_NAMES,
AUTHZ_COLUMN_NAMES,
PREV_GUID_COLUMN_NAMES,
)
from cdislogging import get_logger
logging = get_logger("__name__")
PACKAGE_CONTENTS_STANDARD_KEY = "package_contents"
PACKAGE_CONTENTS_SCHEMA = {
"type": "array",
"items": {
"type": "object",
"properties": {
"file_name": {
"type": "string",
},
"size": {
"type": "integer",
},
"hashes": {
"type": "object",
},
},
"required": ["file_name"],
"additionalProperties": True,
},
}
[docs]
class Gen3Metadata:
"""
A class for interacting with the Gen3 Metadata services.
Examples:
This generates the Gen3Metadata class pointed at the sandbox commons while
using the credentials.json downloaded from the commons profile page.
>>> auth = Gen3Auth(refresh_file="credentials.json")
... metadata = Gen3Metadata(auth)
Attributes:
endpoint (str): public endpoint for reading/querying metadata - only necessary if auth_provider not provided
auth_provider (Gen3Auth): auth manager
"""
def __init__(
self,
endpoint=None,
auth_provider=None,
service_location="mds",
admin_endpoint_suffix="-admin",
):
"""
Initialization for instance of the class to setup basic endpoint info.
Args:
endpoint (str): URL for a Data Commons that has metadata service deployed
auth_provider (Gen3Auth, optional): Gen3Auth class to handle passing your
token, required for admin endpoints
service_location (str, optional): deployment location relative to the
endpoint provided
"""
# legacy interface required endpoint as 1st arg
if endpoint and isinstance(endpoint, Gen3Auth):
auth_provider = endpoint
endpoint = None
if auth_provider and isinstance(auth_provider, Gen3Auth):
endpoint = auth_provider.endpoint
endpoint = endpoint.strip("/")
# if running locally, mds is deployed by itself without a location relative
# to the commons
if "http://localhost" in endpoint:
service_location = ""
admin_endpoint_suffix = ""
if not endpoint.endswith(service_location):
endpoint += "/" + service_location
self.endpoint = endpoint.rstrip("/")
self.admin_endpoint = endpoint.rstrip("/") + admin_endpoint_suffix
self._auth_provider = auth_provider
[docs]
def is_healthy(self):
"""
Return if is healthy or not
Returns:
bool: True if healthy
"""
try:
response = requests.get(
self.endpoint + "/_status", auth=self._auth_provider
)
response.raise_for_status()
except Exception as exc:
logging.error(exc)
return False
return response.json().get("status") == "OK"
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def get_version(self):
"""
Return the version
Returns:
str: the version
"""
response = requests.get(self.endpoint + "/version", auth=self._auth_provider)
response.raise_for_status()
return response.text
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def get_index_key_paths(self):
"""
List all the metadata key paths indexed in the database.
Returns:
List: list of metadata key paths
"""
response = requests.get(
self.admin_endpoint + "/metadata_index", auth=self._auth_provider
)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def create_index_key_path(self, path):
"""
Create a metadata key path indexed in the database.
Args:
path (str): metadata key path
"""
response = requests.post(
self.admin_endpoint + f"/metadata_index/{path}", auth=self._auth_provider
)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def delete_index_key_path(self, path):
"""
List all the metadata key paths indexed in the database.
Args:
path (str): metadata key path
"""
response = requests.delete(
self.admin_endpoint + f"/metadata_index/{path}", auth=self._auth_provider
)
response.raise_for_status()
return response
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def query(
self,
query,
return_full_metadata=False,
limit=10,
offset=0,
use_agg_mds=False,
**kwargs,
):
"""
Query the metadata given a query.
Query format is based off the logic used in the service:
'''
Without filters, this will return all data. Add filters as query strings like this:
GET /metadata?a=1&b=2
This will match all records that have metadata containing all of:
{"a": 1, "b": 2}
The values are always treated as strings for filtering. Nesting is supported:
GET /metadata?a.b.c=3
Matching records containing:
{"a": {"b": {"c": 3}}}
Providing the same key with more than one value filters records whose value of the given key matches any of the given values. But values of different keys must all match. For example:
GET /metadata?a.b.c=3&a.b.c=33&a.b.d=4
Matches these:
{"a": {"b": {"c": 3, "d": 4}}}
{"a": {"b": {"c": 33, "d": 4}}}
{"a": {"b": {"c": "3", "d": 4, "e": 5}}}
But won't match these:
{"a": {"b": {"c": 3}}}
{"a": {"b": {"c": 3, "d": 5}}}
{"a": {"b": {"d": 5}}}
{"a": {"b": {"c": "333", "d": 4}}}
'''
Args:
query (str): mds query as defined by the metadata api
return_full_metadata (bool, optional): if False will just return a list of guids
limit (int, optional): max num records to return
offset (int, optional): offset for output
Returns:
List: list of guids matching query
OR if return_full_metadata=True
Dict{guid: {metadata}}: Dictionary with GUIDs as keys and associated
metadata JSON blobs as values
"""
url = self.endpoint + f"/metadata?{query}"
url_with_params = append_query_params(
url, data=return_full_metadata, limit=limit, offset=offset, **kwargs
)
logging.debug(f"hitting: {url_with_params}")
response = requests.get(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_get(self, guid, _ssl=None, **kwargs):
"""
Asynchronous function to get metadata
Args:
guid (str): guid to use
_ssl (None, optional): whether or not to use ssl
Returns:
Dict: metadata for given guid
"""
async with aiohttp.ClientSession() as session:
url = self.endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
async with session.get(url_with_params, ssl=_ssl) as response:
response.raise_for_status()
response = await response.json()
return response
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def get(self, guid, **kwargs):
"""
Get the metadata associated with the guid
Args:
guid (str): guid to use
Returns:
Dict: metadata for given guid
"""
url = self.endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
response = requests.get(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def batch_create(self, metadata_list, overwrite=True, **kwargs):
"""
Create the list of metadata associated with the list of guids
Args:
metadata_list (List[Dict{"guid": "", "data": {}}]): list of metadata
objects in a specific format. Expects a dict with "guid" and "data"
fields where "data" is another JSON blob to add to the mds
overwrite (bool, optional): whether or not to overwrite existing data
"""
url = self.admin_endpoint + f"/metadata"
if len(metadata_list) > 1 and (
"guid" not in metadata_list[0] and "data" not in metadata_list[0]
):
logging.warning(
"it looks like your metadata list for bulk create is malformed. "
"the expected format is a list of dicts that have 2 keys: 'guid' "
"and 'data', where 'guid' is a string and 'data' is another dict. "
f"The first element doesn't match that pattern: {metadata_list[0]}"
)
url_with_params = append_query_params(url, overwrite=overwrite, **kwargs)
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {metadata_list}")
response = requests.post(
url_with_params, json=metadata_list, auth=self._auth_provider
)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def create(self, guid, metadata, aliases=None, overwrite=False, **kwargs):
"""
Create the metadata associated with the guid
Args:
guid (str): guid to use
metadata (Dict): dictionary representing what will end up a JSON blob
attached to the provided GUID as metadata
overwrite (bool, optional): whether or not to overwrite existing data
"""
aliases = aliases or []
url = self.admin_endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, overwrite=overwrite, **kwargs)
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {metadata}")
response = requests.post(
url_with_params, json=metadata, auth=self._auth_provider
)
response.raise_for_status()
if aliases:
try:
self.create_aliases(guid=guid, aliases=aliases, merge=overwrite)
except Exception:
logging.error(
"Error while attempting to create aliases: "
f"'{aliases}' to GUID: '{guid}' with merge={overwrite}. "
"GUID metadata record was created successfully and "
"will NOT be deleted."
)
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_create(
self,
guid,
metadata,
aliases=None,
overwrite=False,
_ssl=None,
**kwargs,
):
"""
Asynchronous function to create metadata
Args:
guid (str): guid to use
metadata (Dict): dictionary representing what will end up a JSON blob
attached to the provided GUID as metadata
overwrite (bool, optional): whether or not to overwrite existing data
_ssl (None, optional): whether or not to use ssl
"""
aliases = aliases or []
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, overwrite=overwrite, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {metadata}")
async with session.post(
url_with_params, json=metadata, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
response = await response.json()
if aliases:
logging.info(f"creating aliases: {aliases}")
try:
await self.async_create_aliases(
guid=guid, aliases=aliases, _ssl=_ssl
)
except Exception:
logging.error(
"Error while attempting to create aliases: "
f"'{aliases}' to GUID: '{guid}'. "
"GUID metadata record was created successfully and "
"will NOT be deleted."
)
return response
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def update(self, guid, metadata, aliases=None, merge=False, **kwargs):
"""
Update the metadata associated with the guid
Args:
guid (str): guid to use
metadata (Dict): dictionary representing what will end up a JSON blob
attached to the provided GUID as metadata
"""
aliases = aliases or []
url = self.admin_endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {metadata}")
response = requests.put(
url_with_params, json=metadata, auth=self._auth_provider
)
response.raise_for_status()
if aliases:
try:
self.update_aliases(guid=guid, aliases=aliases, merge=merge)
except Exception:
logging.error(
"Error while attempting to update aliases: "
f"'{aliases}' to GUID: '{guid}'. "
"GUID metadata record was created successfully and "
"will NOT be deleted."
)
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
async def async_update(
self, guid, metadata, aliases=None, merge=False, _ssl=None, **kwargs
):
"""
Asynchronous function to update metadata
Args:
guid (str): guid to use
metadata (Dict): dictionary representing what will end up a JSON blob
attached to the provided GUID as metadata
aliases (list[str], optional): List of aliases to update the GUID with
merge (bool, optional): Whether or not to merge metadata AND aliases
with existing values
_ssl (None, optional): whether or not to use ssl
**kwargs: Description
"""
aliases = aliases or []
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, merge=merge, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
async with session.put(
url_with_params, json=metadata, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
response = await response.json()
if aliases:
try:
await self.async_update_aliases(
guid=guid, aliases=aliases, merge=merge, _ssl=_ssl
)
except Exception:
logging.error(
"Error while attempting to update aliases: "
f"'{aliases}' to GUID: '{guid}' with merge={merge}. "
"GUID metadata record was created successfully and "
"will NOT be deleted."
)
return response
[docs]
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS)
def delete(self, guid, **kwargs):
"""
Delete the metadata associated with the guid
Args:
guid (str): guid to use
"""
url = self.admin_endpoint + f"/metadata/{guid}"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
response = requests.delete(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.json()
#
# Alias Support
#
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def get_aliases(self, guid, **kwargs):
"""
Get Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
**kwargs: additional query params
Returns:
requests.Response: response from the request to get aliases
"""
url = self.endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
response = requests.get(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_get_aliases(self, guid, _ssl=None, **kwargs):
"""
Asyncronously get Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
_ssl (None, optional): whether or not to use ssl
**kwargs: additional query params
Returns:
requests.Response: response from the request to get aliases
"""
async with aiohttp.ClientSession() as session:
url = self.endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
logging.debug(f"hitting: {url_with_params}")
async with session.get(
url_with_params, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
return await response.json()
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def delete_alias(self, guid, alias, **kwargs):
"""
Delete single Alias for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
**kwargs: additional query params
Returns:
requests.Response: response from the request to delete aliases
"""
url = self.admin_endpoint + f"/metadata/{guid}/aliases/{alias}"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
response = requests.delete(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.json()
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_delete_alias(self, guid, alias, _ssl=None, **kwargs):
"""
Asyncronously delete single Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
_ssl (None, optional): whether or not to use ssl
**kwargs: additional query params
Returns:
requests.Response: response from the request to delete aliases
"""
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}/aliases/{alias}"
url_with_params = append_query_params(url, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
logging.debug(f"hitting: {url_with_params}")
async with session.delete(
url_with_params, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
return await response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def create_aliases(self, guid, aliases, **kwargs):
"""
Create Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
aliases (list[str]): Aliases to set for the guid
**kwargs: additional query params
Returns:
requests.Response: response from the request to create aliases
"""
url = self.admin_endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, **kwargs)
data = {"aliases": aliases}
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {data}")
response = requests.post(url_with_params, json=data, auth=self._auth_provider)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_create_aliases(self, guid, aliases, _ssl=None, **kwargs):
"""
Asyncronously create Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
aliases (list[str]): Aliases to set for the guid
_ssl (None, optional): whether or not to use ssl
**kwargs: additional query params
Returns:
requests.Response: response from the request to create aliases
"""
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
data = {"aliases": aliases}
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {data}")
async with session.post(
url_with_params, json=data, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
return await response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def update_aliases(self, guid, aliases, merge=False, **kwargs):
"""
Update Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
aliases (list[str]): Aliases to set for the guid
merge (bool, optional): Whether or not to aliases with existing values
**kwargs: additional query params
Returns:
requests.Response: response from the request to update aliases
"""
url = self.admin_endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, merge=merge, **kwargs)
data = {"aliases": aliases}
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {data}")
response = requests.put(url_with_params, json=data, auth=self._auth_provider)
response.raise_for_status()
return response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_update_aliases(
self, guid, aliases, merge=False, _ssl=None, **kwargs
):
"""
Asyncronously update Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
aliases (list[str]): Aliases to set for the guid
merge (bool, optional): Whether or not to aliases with existing values
_ssl (None, optional): whether or not to use ssl
**kwargs: additional query params
Returns:
requests.Response: response from the request to update aliases
"""
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, merge=merge, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
data = {"aliases": aliases}
logging.debug(f"hitting: {url_with_params}")
logging.debug(f"data: {data}")
async with session.put(
url_with_params, json=data, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
return await response.json()
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def delete_aliases(self, guid, **kwargs):
"""
Delete all Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
**kwargs: additional query params
Returns:
requests.Response: response from the request to delete aliases
"""
url = self.admin_endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
response = requests.delete(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.text
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_delete_aliases(self, guid, _ssl=None, **kwargs):
"""
Asyncronously delete all Aliases for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
_ssl (None, optional): whether or not to use ssl
**kwargs: additional query params
Returns:
requests.Response: response from the request to delete aliases
"""
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}/aliases"
url_with_params = append_query_params(url, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
logging.debug(f"hitting: {url_with_params}")
async with session.delete(
url_with_params, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
return await response.text
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
def delete_alias(self, guid, alias, **kwargs):
"""
Delete single Alias for the given guid
Args:
guid (TYPE): Globally unique ID for the metadata blob
alias (str): alternative identifier (alias) to delete
**kwargs: additional query params
Returns:
requests.Response: response from the request to delete aliases
"""
url = self.admin_endpoint + f"/metadata/{guid}/aliases/{alias}"
url_with_params = append_query_params(url, **kwargs)
logging.debug(f"hitting: {url_with_params}")
response = requests.delete(url_with_params, auth=self._auth_provider)
response.raise_for_status()
return response.text
[docs]
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_NO_LOG_IF_NOT_RETRIED)
async def async_delete_alias(self, guid, alias, _ssl=None, **kwargs):
"""
Asyncronously delete single Aliases for the given guid
Args:
guid (str): Globally unique ID for the metadata blob
alias (str): alternative identifier (alias) to delete
_ssl (None, optional): whether or not to use ssl
**kwargs: additional query params
Returns:
requests.Response: response from the request to delete aliases
"""
async with aiohttp.ClientSession() as session:
url = self.admin_endpoint + f"/metadata/{guid}/aliases/{alias}"
url_with_params = append_query_params(url, **kwargs)
# aiohttp only allows basic auth with their built in auth, so we
# need to manually add JWT auth header
headers = {"Authorization": self._auth_provider._get_auth_value()}
logging.debug(f"hitting: {url_with_params}")
async with session.delete(
url_with_params, headers=headers, ssl=_ssl
) as response:
response.raise_for_status()
return await response.text
def _prepare_metadata(
self, metadata, indexd_doc, force_metadata_columns_even_if_empty
):
"""
Validate and generate the provided metadata for submission to the metadata
service.
If the record is of type "package", also prepare package metadata.
Args:
metadata (dict): metadata provided by the submitter
indexd_doc (dict): the indexd document created for this data
force_metadata_columns_even_if_empty (bool): see description in calling function
Returns:
dict: metadata ready to be submitted to the metadata service
"""
def _extract_non_indexd_metadata(metadata):
"""
Get the "additional metadata": metadata that was provided but is
not stored in indexd, so should be stored in the metadata service.
"""
return {
k: v
for k, v in metadata.items()
if k.lower()
not in GUID_COLUMN_NAMES
+ FILENAME_COLUMN_NAMES
+ SIZE_COLUMN_NAMES
+ MD5_COLUMN_NAMES
+ ACLS_COLUMN_NAMES
+ URLS_COLUMN_NAMES
+ AUTHZ_COLUMN_NAMES
+ PREV_GUID_COLUMN_NAMES
}
to_submit = _extract_non_indexd_metadata(metadata)
# some additional metadata columns must be validated
valid = True
# validate package columns
record_type = to_submit.pop(RECORD_TYPE_STANDARD_KEY, "").strip().lower()
package_contents = to_submit.pop(PACKAGE_CONTENTS_STANDARD_KEY, None)
if record_type == "package":
if package_contents:
package_contents = json.loads(package_contents)
if not _verify_schema(package_contents, PACKAGE_CONTENTS_SCHEMA):
logging.error(
f"ERROR: {package_contents} is not in package contents format"
)
valid = False
# generate package metadata
package_metadata = self._get_package_metadata(
metadata,
indexd_doc.file_name,
indexd_doc.size,
indexd_doc.hashes,
indexd_doc.urls,
package_contents,
)
to_submit.update(package_metadata)
elif package_contents:
logging.error(
f"ERROR: tried to set '{PACKAGE_CONTENTS_STANDARD_KEY}' for a non-package row. Ignoring '{PACKAGE_CONTENTS_STANDARD_KEY}'. Set '{RECORD_TYPE_STANDARD_KEY}' to 'package' to create packages."
)
valid = False
if not valid:
raise Exception(f"Metadata is not valid: {metadata}")
if not force_metadata_columns_even_if_empty:
# remove any empty columns if we're not being forced to include them
to_submit = {
key: value
for key, value in to_submit.items()
if value is not None and value != ""
}
return to_submit
def _get_package_metadata(
self, submitted_metadata, file_name, file_size, hashes, urls, contents
):
"""
The MDS Objects API currently expects files that have not been
uploaded yet. For files we only needs to index, not upload, create
object records manually by generating the expected object fields.
TODO: update the MDS objects API to not create upload URLs if the
relevant data is provided.
"""
def _get_filename_from_urls(submitted_metadata, urls):
file_name = ""
if not urls:
logging.warning(f"No URLs provided for: {submitted_metadata}")
for url in urls:
_file_name = os.path.basename(url)
if not file_name:
file_name = _file_name
else:
if file_name != _file_name:
logging.warning(
f"Received multiple URLs with different file names; will use the first URL (file name '{file_name}'): {submitted_metadata}"
)
return file_name
file_name_from_url = _get_filename_from_urls(submitted_metadata, urls)
if not file_name:
file_name = file_name_from_url
now = str(datetime.utcnow())
metadata = {
"type": "package",
"package": {
"version": "0.1",
"file_name": file_name,
"created_time": now,
"updated_time": now,
"size": file_size,
"hashes": hashes,
"contents": contents or None,
},
"_upload_status": "uploaded",
}
return metadata