Source code for antinex_client.scripts.ai_env_predict

#!/usr/bin/env python

import os
import sys
import json
import argparse
import pandas as pd
from spylunking.log.setup_logging import console_logger
from antinex_client.utils import ev
from antinex_client.utils import ppj
from antinex_client.consts import LOGIN_FAILED
from antinex_client.consts import SUCCESS
from antinex_client.consts import ERROR
from antinex_client.consts import FAILED
from antinex_client.build_ai_client_from_env import build_ai_client_from_env
from antinex_client.generate_ai_request import generate_ai_request


log = console_logger(
    name='env_predict')


[docs]def start_predictions(): """start_predictions Using environment variables, create an AntiNex AI Client. You can also use command line args if you want. This can train a new deep neural network if it does not exist or it can use an existing pre-trained deep neural network within the AntiNex Core to make new predictions. """ parser = argparse.ArgumentParser( description=( "Python client to make Predictions " "using a Pre-trained Deep Neural Network " "with AntiNex Django Rest Framework")) parser.add_argument( "-f", help=( "file to use default ./examples/" "predict-rows-scaler-full-django.json"), required=False, dest="datafile") parser.add_argument( "-m", help="send mock data", required=False, dest="use_fake_rows", action="store_true") parser.add_argument( "-b", help=( "optional - path to CA bundle directory for " "client encryption over HTTP"), required=False, dest="ca_dir") parser.add_argument( "-c", help=( "optional - path to x509 certificate for " "client encryption over HTTP"), required=False, dest="cert_file") parser.add_argument( "-k", help=( "optional - path to x509 key file for " "client encryption over HTTP"), required=False, dest="key_file") parser.add_argument( "-s", help="silent", required=False, dest="silent", action="store_true") parser.add_argument( "-d", help="debug", required=False, dest="debug", action="store_true") args = parser.parse_args() datafile = ev( "DATAFILE", "./examples/predict-rows-scaler-full-django.json") ca_dir = os.getenv( "API_CA_BUNDLE_DIR", None) cert_file = os.getenv( "API_CERT_FILE", None) key_file = os.getenv( "API_KEY_FILE", None) verbose = bool(str(ev( "API_CLIENT_VERBOSE", "1")).lower() == "1") debug = bool(str(ev( "API_CLIENT_DEBUG", "0")).lower() == "1") use_fake_rows = False if args.use_fake_rows: use_fake_rows = True if args.datafile: datafile = args.datafile if args.ca_dir: ca_dir = args.ca_dir if args.cert_file: cert_file = args.cert_file if args.key_file: key_file = args.key_file if args.silent: verbose = False if args.debug: debug = True if verbose: log.info("creating client") client = build_ai_client_from_env( ca_dir=ca_dir, cert_file=cert_file, key_file=key_file, verbose=verbose, debug=debug) if verbose: log.info(("loading request in datafile={}") .format( datafile)) # pass in full or partial prediction record dictionaries # the generate_ai_request will fill in gaps with defaults fake_rows_for_predicting = [ { "tcp_seq": 1 }, { "tcp_seq": 2 }, { "tcp_seq": 3 }, { "tcp_seq": 4 } ] res_gen = None if use_fake_rows: res_gen = generate_ai_request( predict_rows=fake_rows_for_predicting) else: req_with_org_rows = None with open(datafile, "r") as f: req_with_org_rows = json.loads(f.read()) res_gen = generate_ai_request( predict_rows=req_with_org_rows["predict_rows"]) # end of sending mock data from this file or a file on disk if res_gen["status"] != SUCCESS: log.error(("failed generate_ai_request with error={}") .format( res_gen["error"])) sys.exit(1) req_body = res_gen["data"] if verbose: log.info("running job") job_was_started = False response = client.run_job( body=req_body) if response["status"] == SUCCESS: log.info(("job started with response={}") .format( response["data"])) job_was_started = True elif response["status"] == FAILED: log.error(("job failed with error='{}' with response={}") .format( response["error"], response["data"])) elif response["status"] == ERROR: log.error(("job had an error='{}' with response={}") .format( response["error"], response["data"])) elif response["status"] == LOGIN_FAILED: log.error(("job reported user was not able to log in " "with an error='{}' with response={}") .format( response["error"], response["data"])) if not job_was_started: sys.exit(1) if debug: log.info(("parsing response data={}") .format( response["data"])) else: if verbose: log.info("parsing data") res_data = response["data"] job_data = res_data.get( "job", None) result_data = res_data.get( "results", None) if not job_data: log.error(("missing job dictionary in response data={}") .format( response["data"])) sys.exit(1) if not result_data: log.error(("missing results dictionary in response data={}") .format( response["data"])) sys.exit(1) job_id = job_data.get("id", None) job_status = job_data.get("status", None) result_id = result_data.get("id", None) result_status = result_data.get("status", None) log.info(("started job.id={} job.status={} with " "result.id={} result.status={}") .format( job_id, job_status, result_id, result_status)) job_results = client.wait_for_job_to_finish( job_id=job_id) if job_results["status"] != SUCCESS: log.error(("failed waiting for job.id={} to finish error={} data={}") .format( job_id, job_results["error"], job_results["data"])) sys.exit(1) final_job = job_results["data"]["job"] final_result = job_results["data"]["result"] log.info(("job={}") .format( ppj(final_job))) log.info(("result={}") .format( ppj(final_result))) log.info(("job.id={} is done") .format( job_id)) predictions = final_result["predictions_json"].get( "predictions", []) log.info(("loading predictions={} into pandas dataframe") .format( len(predictions))) df = pd.DataFrame(predictions) log.info(("dataframe={}") .format( df))
# end of start_predictions if __name__ == "__main__": start_predictions()