Source code for gen3.jobs

"""
Contains class for interacting with Gen3's Job Dispatching Service(s).
"""
import aiohttp
import asyncio
import backoff
import json
import requests
import urllib.parse
from cdislogging import get_logger

import sys
import time

from gen3.utils import (
    append_query_params,
    DEFAULT_BACKOFF_SETTINGS,
    raise_for_status_and_print_error,
)

# sower's "action" mapping to the relevant job
INGEST_METADATA_JOB = "ingest-metadata-manifest"
DBGAP_METADATA_JOB = "get-dbgap-metadata"
INDEX_MANIFEST_JOB = "index-object-manifest"
DOWNLOAD_MANIFEST_JOB = "download-indexd-manifest"
MERGE_MANIFEST_JOB = "merge-manifests"

logging = get_logger("__name__")


[docs] class Gen3Jobs: """ A class for interacting with the Gen3's Job Dispatching Service(s). Examples: This generates the Gen3Jobs class pointed at the sandbox commons while using the credentials.json downloaded from the commons profile page. >>> auth = Gen3Auth(refresh_file="credentials.json") ... jobs = Gen3Jobs(auth) """ def __init__(self, endpoint=None, auth_provider=None, service_location="job"): """ Initialization for instance of the class to setup basic endpoint info. Args: auth_provider (Gen3Auth, optional): Gen3Auth class to handle passing your token, required for admin endpoints service_location (str, optional): deployment location relative to the endpoint provided """ # auth_provider legacy interface required endpoint as 1st arg auth_provider = auth_provider or endpoint endpoint = auth_provider.endpoint.strip("/") # if running locally, mds is deployed by itself without a location relative # to the commons if "http://localhost" in endpoint: service_location = "" if not endpoint.endswith(service_location): endpoint += "/" + service_location self.endpoint = endpoint.rstrip("/") self._auth_provider = auth_provider
[docs] async def async_run_job_and_wait(self, job_name, job_input, _ssl=None, **kwargs): """ Asynchronous function to create a job, wait for output, and return. Will sleep in a linear delay until the job is done, starting with 1 second. Args: _ssl (None, optional): whether or not to use ssl job_name (str): name for the job, can use globals in this file job_input (Dict): dictionary of input for the job Returns: Dict: Response from the endpoint """ job_create_response = await self.async_create_job(job_name, job_input) status = {"status": "Running"} sleep_time = 3 while status.get("status") == "Running": logging.info(f"job still running, waiting for {sleep_time} seconds...") time.sleep(sleep_time) sleep_time *= 1.5 status = await self.async_get_status(job_create_response.get("uid")) logging.info(f"{status}") logging.info(f"Job is finished!") if status.get("status") != "Completed": raise Exception(f"Job status not complete: {status.get('status')}.") response = await self.async_get_output(job_create_response.get("uid")) return response
[docs] def is_healthy(self): """ Return if is healthy or not Returns: bool: True if healthy """ try: response = requests.get( self.endpoint + "/_status", auth=self._auth_provider ) raise_for_status_and_print_error(response) except Exception as exc: logging.error(exc) return False return response.text == "Healthy"
[docs] @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) def get_version(self): """ Return the version Returns: str: the version """ response = requests.get(self.endpoint + "/_version", auth=self._auth_provider) raise_for_status_and_print_error(response) return response.json().get("version")
[docs] @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) def list_jobs(self): """ List all jobs """ response = requests.get(self.endpoint + "/list", auth=self._auth_provider) raise_for_status_and_print_error(response) return response.json()
[docs] @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) def create_job(self, job_name, job_input): """ Create a job with given name and input Args: job_name (str): name for the job, can use globals in this file job_input (Dict): dictionary of input for the job Returns: Dict: Response from the endpoint """ data = {"action": job_name, "input": job_input} response = requests.post( self.endpoint + "/dispatch", json=data, auth=self._auth_provider ) raise_for_status_and_print_error(response) return response.json()
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) async def async_create_job(self, job_name, job_input, _ssl=None, **kwargs): async with aiohttp.ClientSession() as session: url = self.endpoint + f"/dispatch" url_with_params = append_query_params(url, **kwargs) data = json.dumps({"action": job_name, "input": job_input}) # aiohttp only allows basic auth with their built in auth, so we # need to manually add JWT auth header headers = {"Authorization": self._auth_provider._get_auth_value()} async with session.post( url_with_params, data=data, headers=headers, ssl=_ssl ) as response: raise_for_status_and_print_error(response) response = await response.json(content_type=None) return response
[docs] @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) def get_status(self, job_id): """ Get the status of a previously created job """ response = requests.get( self.endpoint + f"/status?UID={job_id}", auth=self._auth_provider ) raise_for_status_and_print_error(response) return response.json()
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) async def async_get_status(self, job_id, _ssl=None, **kwargs): async with aiohttp.ClientSession() as session: url = self.endpoint + f"/status?UID={job_id}" url_with_params = append_query_params(url, **kwargs) # aiohttp only allows basic auth with their built in auth, so we # need to manually add JWT auth header headers = {"Authorization": self._auth_provider._get_auth_value()} async with session.get( url_with_params, headers=headers, ssl=_ssl ) as response: raise_for_status_and_print_error(response) response = await response.json(content_type=None) return response
[docs] @backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) def get_output(self, job_id): """ Get the output of a previously completed job """ response = requests.get( self.endpoint + f"/output?UID={job_id}", auth=self._auth_provider ) raise_for_status_and_print_error(response) return response.json()
@backoff.on_exception(backoff.expo, Exception, **DEFAULT_BACKOFF_SETTINGS) async def async_get_output(self, job_id, _ssl=None, **kwargs): async with aiohttp.ClientSession() as session: url = self.endpoint + f"/output?UID={job_id}" url_with_params = append_query_params(url, **kwargs) # aiohttp only allows basic auth with their built in auth, so we # need to manually add JWT auth header headers = {"Authorization": self._auth_provider._get_auth_value()} async with session.get( url_with_params, headers=headers, ssl=_ssl ) as response: raise_for_status_and_print_error(response) response = await response.json(content_type=None) return response