Source code for gen3.tools.indexing.verify_manifest

"""
Module for indexing actions that verify a manifest of
indexed file objects against indexd's API. Supports
multiple processes and coroutines using Python's asyncio library.

The default expected manifest format is a Comma-Separated Values (CSV) file
with a row for every record and a header row with these field names:
guid,authz,acl,file_size,md5,urls,file_name

Fields that are lists (like acl, authz, and urls) separate the values with spaces.
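
For example, a row of such a manifest might look like this (hypothetical values,
shown only to illustrate the format):

guid,authz,acl,file_size,md5,urls,file_name
93d9af72-b0f1-450c-a5c6-7d3d8d2083b4,/programs/DEV/projects/test,DEV test,42,73d643ec3f4beb9020eef0beed440ad0,s3://example-bucket/example.txt gs://example-bucket/example.txt,example.txt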

There is a default mapping for those column names above but you can override it.
Fields that expect lists (like acl, authz, and urls) by default assume these values are
separated with spaces. If you need alternate behavior, you can simply override the
`manifest_row_parsers` for specific fields and replace the default parsing function
with a custom one. For example:

```
from gen3.tools import indexing
from gen3.tools.indexing.verify_manifest import manifest_row_parsers

def _get_authz_from_row(row):
    return [row.get("authz").strip().strip("[").strip("]").strip("'")]

# override default parsers
manifest_row_parsers["authz"] = _get_authz_from_row

indexing.verify_object_manifest(COMMONS)
```

The output from this verification is a file containing any errors in the following
format:

{guid}|{error_name}|expected {value_from_manifest}|actual {value_from_indexd}
ex: 93d9af72-b0f1-450c-a5c6-7d3d8d2083b4|authz|expected ['']|actual ['/programs/DEV/projects/test']

Attributes:
    CURRENT_DIR (str): directory this file is in
    MAX_CONCURRENT_REQUESTS (int): maximum number of desired concurrent requests across
        processes/threads
"""
import aiohttp
import asyncio
import csv
from cdislogging import get_logger

import os
import time

from gen3.index import Gen3Index
from gen3.utils import get_or_create_event_loop_for_thread

MAX_CONCURRENT_REQUESTS = 24
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))

logging = get_logger(__name__)


def _get_guid_from_row(row):
    """
    Given a row from the manifest, return the field representing expected indexd guid.

    Args:
        row (dict): column_name:row_value

    Returns:
        str: guid
    """
    guid = row.get("guid")
    if not guid:
        guid = row.get("GUID")
    return guid


def _get_md5_from_row(row):
    """
    Given a row from the manifest, return the field representing file's md5 sum.

    Args:
        row (dict): column_name:row_value

    Returns:
        str: md5 sum for file
    """
    if "md5" in row:
        return row["md5"]
    elif "md5sum" in row:
        return row["md5sum"]
    else:
        return None


def _get_file_size_from_row(row):
    """
    Given a row from the manifest, return the field representing file's size in bytes.

    Args:
        row (dict): column_name:row_value

    Returns:
        int: integer representing file size in bytes
    """
    try:
        if "file_size" in row:
            return int(row["file_size"])
        elif "size" in row:
            return int(row["size"])
        else:
            return None
    except Exception:
        logging.warning(f"could not convert this to an int: {row.get('file_size')}")
        return row.get("file_size")


def _get_acl_from_row(row):
    """
    Given a row from the manifest, return the field representing file's expected acls.

    Args:
        row (dict): column_name:row_value

    Returns:
        List[str]: acls for the indexd record
    """
    return [item for item in row.get("acl", "").strip().split(" ") if item]


def _get_authz_from_row(row):
    """
    Given a row from the manifest, return the field representing file's expected authz
    resources.

    Args:
        row (dict): column_name:row_value

    Returns:
        List[str]: authz resources for the indexd record
    """
    return [item for item in row.get("authz", "").strip().split(" ") if item]


def _get_urls_from_row(row):
    """
    Given a row from the manifest, return the field representing file's expected urls.

    Args:
        row (dict): column_name:row_value

    Returns:
        List[str]: urls for indexd record file location(s)
    """
    if "urls" in row:
        return [item for item in row.get("urls", "").strip().split(" ") if item]
    elif "url" in row:
        return [item for item in row.get("urls", "").strip().split(" ") if item]
    else:
        return []


def _get_file_name_from_row(row):
    """
    Given a row from the manifest, return the field representing file's expected file_name.

    Args:
        row (dict): column_name:row_value

    Returns:
        str: file_name for the indexd record
    """
    if "file_name" in row:
        return row["file_name"]
    elif "filename" in row:
        return row["filename"]
    elif "name" in row:
        return row["name"]
    else:
        return None


manifest_row_parsers = {
    "guid": _get_guid_from_row,
    "md5": _get_md5_from_row,
    "file_size": _get_file_size_from_row,
    "acl": _get_acl_from_row,
    "authz": _get_authz_from_row,
    "urls": _get_urls_from_row,
    "file_name": _get_file_name_from_row,
}


async def async_verify_object_manifest(
    commons_url,
    manifest_file,
    max_concurrent_requests=MAX_CONCURRENT_REQUESTS,
    manifest_row_parsers=manifest_row_parsers,
    manifest_file_delimiter=None,
    output_filename=f"verify-manifest-errors-{time.time()}.log",
):
    """
    Verify the file object records in the given manifest against indexd.

    Args:
        commons_url (str): root domain for commons where indexd lives
        manifest_file (str): the file to verify against
        max_concurrent_requests (int): the maximum number of concurrent requests allowed
        manifest_row_parsers (Dict{indexd_field:func_to_parse_row}): Row parsers
        manifest_file_delimiter (str): delimiter in manifest_file
        output_filename (str): filename for output logs
    """
    start_time = time.perf_counter()
    logging.info(f"start time: {start_time}")

    # if delimiter not specified, try to get based on file ext
    if not manifest_file_delimiter:
        file_ext = os.path.splitext(manifest_file)
        if file_ext[-1].lower() == ".tsv":
            manifest_file_delimiter = "\t"
        else:
            # default, assume CSV
            manifest_file_delimiter = ","
        logging.debug(
            f"detected {manifest_file_delimiter} as delimiter between columns"
        )

    await _verify_all_index_records_in_file(
        commons_url,
        manifest_file,
        manifest_file_delimiter,
        max_concurrent_requests,
        output_filename,
    )

    end_time = time.perf_counter()
    logging.info(f"end time: {end_time}")
    logging.info(f"run time: {end_time - start_time}")


async def _verify_all_index_records_in_file(
    commons_url,
    manifest_file,
    manifest_file_delimiter,
    max_concurrent_requests,
    output_filename,
):
    """
    Read the manifest, verify each record against indexd, and write any errors to a
    file. This function creates a semaphore to limit the number of concurrent http
    connections that get opened to send requests to indexd.

    It then uses asyncio to start a number of coroutines. Steps:
        1) read the manifest rows into a queue
        2) put a final "DONE" in the queue for each coroutine so it knows when to stop
        3) coroutines read rows from the queue, compare them to the indexd records,
           and put any errors on an output queue
        4) write the output queue to the output file

    Args:
        commons_url (str): root domain for commons where indexd lives
        manifest_file (str): the file to verify against
        manifest_file_delimiter (str): delimiter in manifest_file
        output_filename (str, optional): filename for output
        max_concurrent_requests (int): the maximum number of concurrent requests allowed
    """
    max_requests = int(max_concurrent_requests)
    logging.debug(f"max concurrent requests: {max_requests}")
    lock = asyncio.Semaphore(max_requests)
    queue = asyncio.Queue()
    output_queue = asyncio.Queue()

    with open(manifest_file, encoding="utf-8-sig") as manifest:
        reader = csv.DictReader(manifest, delimiter=manifest_file_delimiter)
        for row in reader:
            new_row = {}
            for key, value in row.items():
                if value:
                    value = value.strip()
                new_row[key.strip()] = value
            await queue.put(new_row)

    for _ in range(0, int(max_concurrent_requests + (max_concurrent_requests / 4))):
        await queue.put("DONE")

    await asyncio.gather(
        *(
            _parse_from_queue(queue, lock, commons_url, output_queue)
            # why "+ (max_concurrent_requests / 4)"?
            # This is because the max requests at any given time could be
            # waiting for metadata responses all at once and there's processing done
            # before that semaphore, so this just adds a few extra processes to get
            # through the queue up to that point of metadata requests so it's ready
            # right away when a lock is released. Not entirely necessary but speeds
            # things up a tiny bit to always ensure something is waiting for that lock
            for x in range(
                0, int(max_concurrent_requests + (max_concurrent_requests / 4))
            )
        )
    )

    output_filename = os.path.abspath(output_filename)
    logging.info(
        f"done processing, writing output queue to single file {output_filename}"
    )

    # remove existing output if it exists
    if os.path.isfile(output_filename):
        os.unlink(output_filename)

    with open(output_filename, "w") as outfile:
        while not output_queue.empty():
            line = await output_queue.get()
            outfile.write(line)

    logging.info(f"done writing output to file {output_filename}")


async def _parse_from_queue(queue, lock, commons_url, output_queue):
    """
    Keep getting items from the queue and verifying that indexd contains the expected
    fields from that row. If there are any issues, log errors into a file. Return
    when nothing is left in the queue.

    Args:
        queue (asyncio.Queue): queue to read manifest rows from
        lock (asyncio.Semaphore): semaphore used to limit the number of concurrent
            http connections
        commons_url (str): root domain for commons where indexd lives
        output_queue (asyncio.Queue): queue for output
    """
    loop = get_or_create_event_loop_for_thread()

    row = await queue.get()

    while row != "DONE":
        guid = manifest_row_parsers["guid"](row)
        authz = manifest_row_parsers["authz"](row)
        acl = manifest_row_parsers["acl"](row)
        file_size = manifest_row_parsers["file_size"](row)
        md5 = manifest_row_parsers["md5"](row)
        urls = manifest_row_parsers["urls"](row)
        file_name = manifest_row_parsers["file_name"](row)

        actual_record = await _get_record_from_indexd(guid, commons_url, lock)
        if not actual_record:
            output = f"{guid}|no_record|expected {row}|actual None\n"
            await output_queue.put(output)
            logging.error(output)
        else:
            logging.info(f"verifying {guid}...")

            if sorted(authz) != sorted(actual_record["authz"]):
                output = (
                    f"{guid}|authz|expected {authz}|actual {actual_record['authz']}\n"
                )
                await output_queue.put(output)
                logging.error(output)

            if sorted(acl) != sorted(actual_record["acl"]):
                output = f"{guid}|acl|expected {acl}|actual {actual_record['acl']}\n"
                await output_queue.put(output)
                logging.error(output)

            if file_size != actual_record["size"]:
                if (
                    not file_size
                    and file_size != 0
                    and not actual_record["size"]
                    and actual_record["size"] != 0
                ):
                    # actual and expected are both either empty string or None
                    # so even though they're not equal, they represent a null value
                    # and we don't need to consider this an error in validation
                    pass
                else:
                    output = f"{guid}|file_size|expected {file_size}|actual {actual_record['size']}\n"
                    await output_queue.put(output)
                    logging.error(output)

            if md5 != actual_record["hashes"].get("md5"):
                if (
                    not md5
                    and md5 != 0
                    and not actual_record["hashes"].get("md5")
                    and actual_record["hashes"].get("md5") != 0
                ):
                    # actual and expected are both either empty string or None
                    # so even though they're not equal, they represent a null value
                    # and we don't need to consider this an error in validation
                    pass
                else:
                    output = f"{guid}|md5|expected {md5}|actual {actual_record['hashes'].get('md5')}\n"
                    await output_queue.put(output)
                    logging.error(output)

            urls = [url.replace("%20", " ") for url in urls]
            if sorted(urls) != sorted(actual_record["urls"]):
                output = f"{guid}|urls|expected {urls}|actual {actual_record['urls']}\n"
                await output_queue.put(output)
                logging.error(output)

            if not actual_record["file_name"] and file_name:
                # if the actual record name is "" or None but something was specified
                # in the manifest, we have a problem
                output = f"{guid}|file_name|expected {file_name}|actual {actual_record['file_name']}\n"
                await output_queue.put(output)
                logging.error(output)

        row = await queue.get()


async def _get_record_from_indexd(guid, commons_url, lock):
    """
    Gets a semaphore then requests a record for the given guid

    Args:
        guid (str): indexd record globally unique id
        commons_url (str): root domain for commons where indexd lives
        lock (asyncio.Semaphore): semaphore used to limit the number of concurrent
            http connections
    """
    index = Gen3Index(commons_url)
    async with lock:
        # default ssl handling unless it's explicitly http://
        ssl = None
        if "https" not in commons_url:
            ssl = False

        record = None
        try:
            return await index.async_get_record(guid, _ssl=ssl)
        except aiohttp.client_exceptions.ClientResponseError as exc:
            logging.warning(f"couldn't get record. error: {exc}")
        return record
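

if __name__ == "__main__":
    # Minimal usage sketch: the commons URL and manifest filename below are
    # placeholder values and would need to be replaced with real ones.
    asyncio.run(
        async_verify_object_manifest(
            commons_url="https://example-commons.org/",
            manifest_file="manifest.tsv",
            output_filename="verify-manifest-errors.log",
        )
    )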