Source code for antinex_client.ai_client

import json
import requests
import logging
import time
from spylunking.log.setup_logging import console_logger
from antinex_client.utils import ev
from antinex_client.consts import LOGIN_SUCCESS
from antinex_client.consts import LOGIN_NOT_ATTEMPTED
from antinex_client.consts import LOGIN_FAILED
from antinex_client.consts import SUCCESS
from antinex_client.consts import FAILED
from antinex_client.consts import ERROR
from antinex_client.consts import NOT_SET


# Resolve the logger level from the string returned by
# ev("AI_CLIENT_LEVEL", "info"); any value not listed in the
# table below falls back to logging.INFO.
log_level_str = ev(
    "AI_CLIENT_LEVEL",
    "info").lower()
log_level = {
    "info": logging.INFO,
    "debug": logging.DEBUG,
    # "silent" only lets CRITICAL records through
    "silent": logging.CRITICAL,
    "critical": logging.CRITICAL,
    "error": logging.ERROR,
}.get(log_level_str, logging.INFO)

# module-wide logger used by AIClient
log = console_logger(
    name='ai_client',
    log_level=log_level)


class AIClient:
    """
    AntiNex Python AI Client

    This can use either environment variables or keyword
    arguments to create a valid client.

    Every public request method returns the dictionary built by
    ``build_response``: ``{"status": ..., "error": ..., "data": ...}``.
    Requests that come back with an HTTP 401 trigger a fresh login
    and are then re-sent (bounded by ``max_retries``).
    """

    def __init__(
            self,
            user=ev(
                "API_USER",
                "user-not-set"),
            password=ev(
                "API_PASSWORD",
                "password-not-set"),
            url=ev(
                "API_URL",
                "http://localhost:8010"),
            email=ev(
                "API_EMAIL",
                "email-not-set"),
            verbose=True,
            ca_dir=None,
            cert_file=None,
            key_file=None,
            debug=False):
        """__init__

        NOTE: the ``ev(...)`` defaults are evaluated once, when the
        class is defined, not on every instantiation.

        :param user: username
        :param email: email address
        :param password: password for the user
        :param url: url running the django rest framework
        :param verbose: log progress and error messages
        :param ca_dir: optional path to CA bundle dir
        :param cert_file: optional path to x509 ssl cert file
        :param key_file: optional path to x509 ssl private key
        :param debug: turn on debugging - this will print passwords
                      to stdout
        """

        self.user = user
        self.email = email
        self.password = password
        self.url = url
        self.verbose = verbose
        self.ca_dir = ca_dir
        self.cert_file = cert_file
        self.key_file = key_file

        # value handed to requests as ``verify``: a path when one
        # was provided, otherwise False
        # NOTE(review): False turns off TLS certificate verification
        # - confirm this default is intended outside of local dev
        self.use_verify = False
        if self.ca_dir:
            self.use_verify = self.ca_dir
        elif self.cert_file:
            self.use_verify = self.cert_file

        # client-side certificate pair, only when both files are set
        self.cert = None
        if self.cert_file and self.key_file:
            self.cert = (
                self.cert_file,
                self.key_file)

        # debugging always implies verbose logging
        self.debug = debug
        if self.debug:
            self.verbose = True

        self.api_urls = {
            "login": "{}/api-token-auth/".format(self.url),
            "job": "{}/ml/".format(self.url),
            "prepare": "{}/mlprepare/".format(self.url),
            "results": "{}/mlresults/".format(self.url),
            "create_user": "{}/users/".format(self.url)
        }
        self.token = "not-logged-in-no-token"
        self.login_status = LOGIN_NOT_ATTEMPTED
        self.user_id = None
        self.max_retries = 10
        self.login_retry_wait_time = 0.1  # in seconds

        # records seen so far, keyed by str(id)
        self.all_prepares = {}
        self.all_jobs = {}
        self.all_results = {}
    # end of __init__

    def login(
            self):
        """login

        Post the username and password to the login url and store
        the returned token.

        :return: ``LOGIN_SUCCESS`` or ``LOGIN_FAILED``
        """

        auth_url = self.api_urls["login"]

        if self.verbose:
            log.info(("log in user={} url={} ca_dir={} cert={}")
                     .format(
                        self.user,
                        auth_url,
                        self.ca_dir,
                        self.cert))

        use_headers = {
            "Content-type": "application/json"
        }
        login_data = {
            "username": self.user,
            "password": self.password
        }

        if self.debug:
            # this prints the password
            log.info((
                "LOGIN with body={} headers={} url={} "
                "verify={} cert={}").format(
                    login_data,
                    use_headers,
                    auth_url,
                    self.use_verify,
                    self.cert))

        response = requests.post(
            auth_url,
            verify=self.use_verify,
            cert=self.cert,
            data=json.dumps(login_data),
            headers=use_headers)

        if self.debug:
            log.info(("LOGIN response status_code={} text={} reason={}")
                     .format(
                        response.status_code,
                        response.text,
                        response.reason))

        user_token = ""
        if response.status_code == 200:
            # a 200 without a usable token is handled as a failed
            # login below instead of raising
            try:
                user_token = json.loads(response.text).get("token", "")
            except (ValueError, AttributeError):
                user_token = ""

        if user_token:
            self.token = user_token
            self.login_status = LOGIN_SUCCESS

            if self.verbose:
                log.debug("login success")
        else:
            log.error(("failed to login user={} to url={} text={}")
                      .format(
                        self.user,
                        auth_url,
                        response.text))
            self.login_status = LOGIN_FAILED
        # if the user token exists

        return self.login_status
    # end of login

    def is_logged_in(
            self):
        """is_logged_in

        :return: True when the last login attempt succeeded
        """
        return self.login_status == LOGIN_SUCCESS
    # end of is_logged_in

    def get_token(
            self):
        """get_token

        :return: the current token string
        """
        return self.token
    # end of get_token

    def get_auth_header(
            self):
        """get_auth_header

        :return: request headers holding the JSON content type and
                 the ``JWT <token>`` authorization value
        """
        return {
            "Content-type": "application/json",
            "Authorization": "JWT {}".format(self.get_token())
        }
    # end of get_auth_header

    def build_response(
            self,
            status=NOT_SET,
            error="",
            data=None):
        """build_response

        :param status: status code
        :param error: error message
        :param data: dictionary to send back
        :return: dictionary with ``status``, ``error`` and ``data``
        """
        return {
            "status": status,
            "error": error,
            "data": data
        }
    # end of build_response

    def retry_login(
            self):
        """retry_login

        Try to log in until it works or ``max_retries`` is exceeded,
        sleeping ``login_retry_wait_time`` seconds after each failure.
        Returns right away when the client is already logged in.

        :return: ``build_response`` dictionary with ``SUCCESS``
                 or ``ERROR``
        """

        if not self.user or not self.password:
            return self.build_response(
                status=ERROR,
                error="please set the user and password")

        attempt = 0
        while True:
            if self.is_logged_in():
                return self.build_response(
                    status=SUCCESS)

            if self.verbose:
                log.debug(("login attempt={} max={}")
                          .format(
                            attempt,
                            self.max_retries))

            if self.login() == LOGIN_SUCCESS:
                return self.build_response(
                    status=SUCCESS)

            time.sleep(
                self.login_retry_wait_time)

            attempt += 1
            if attempt > self.max_retries:
                return self.build_response(
                    status=ERROR,
                    error="failed logging in user={} retries={}".format(
                        self.user,
                        self.max_retries))
        # end of while not logged in
    # end of retry_login

    def _response_details(
            self,
            response):
        """Format the text and reason of a response for error data."""
        return "text={} reason={}".format(
            response.text,
            response.reason)
    # end of _response_details

    def _unexpected_status(
            self,
            response):
        """Build the ERROR node for an unhandled status code."""
        err_msg = ("failed with "
                   "status_code={} text={} reason={}").format(
                       response.status_code,
                       response.text,
                       response.reason)
        if self.verbose:
            log.error(err_msg)
        return self.build_response(
            status=ERROR,
            error=err_msg)
    # end of _unexpected_status

    def _request(
            self,
            method,
            url,
            label,
            subject,
            payload=None):
        """Send one authenticated request, logging in again on a 401.

        :param method: ``"get"`` or ``"post"``
        :param url: url to call
        :param label: prefix for debug log messages
        :param subject: what is being fetched or posted (logging only)
        :param payload: serialized body for a post
        :return: ``(response, None)`` for any non-401 response or
                 ``(None, error_node)`` when re-authentication failed
        """

        auth_attempts = 0
        while True:
            if self.debug:
                log.info((
                    "{} attempting to {}={} to url={} "
                    "verify={} cert={}").format(
                        label,
                        method,
                        subject,
                        url,
                        self.use_verify,
                        self.cert))

            if method == "post":
                response = requests.post(
                    url,
                    verify=self.use_verify,
                    cert=self.cert,
                    data=payload,
                    headers=self.get_auth_header())
            else:
                response = requests.get(
                    url,
                    verify=self.use_verify,
                    cert=self.cert,
                    headers=self.get_auth_header())

            if self.debug:
                log.info(("{} response status_code={} text={} reason={}")
                         .format(
                            label,
                            response.status_code,
                            response.text,
                            response.reason))

            if response.status_code != 401:
                return response, None

            # stop instead of re-sending forever when the server
            # keeps rejecting freshly issued tokens
            auth_attempts += 1
            if auth_attempts > self.max_retries:
                err_msg = ("failed with "
                           "status_code={} text={} reason={}").format(
                               response.status_code,
                               response.text,
                               response.reason)
                if self.verbose:
                    log.error(err_msg)
                return None, self.build_response(
                    status=ERROR,
                    error=err_msg)

            # the server rejected the current token, so mark the
            # login as stale - otherwise retry_login returns SUCCESS
            # without fetching a new token
            self.login_status = LOGIN_NOT_ATTEMPTED
            login_res = self.retry_login()
            if login_res["status"] != SUCCESS:
                if self.verbose:
                    log.error(
                        "retry login attempts failed")
                return None, self.build_response(
                    status=login_res["status"],
                    error=login_res["error"])
            # if able to log back in just retry the call
        # end of while handling 401s
    # end of _request

    def _get_by_id(
            self,
            kind,
            url_key,
            cache,
            record_id,
            label):
        """Fetch one record by id and remember it in ``cache``.

        :param kind: ``prepare``, ``job`` or ``result`` (messages)
        :param url_key: key into ``self.api_urls``
        :param cache: dictionary that stores the record by str(id)
        :param record_id: database id to fetch
        :param label: prefix for debug log messages
        :return: ``build_response`` dictionary
        """

        if not record_id:
            err_msg = "missing {}_id for get_{}_by_id".format(
                kind,
                kind)
            log.error(err_msg)
            return self.build_response(
                status=ERROR,
                error=err_msg)

        if self.debug:
            log.info(("user={} getting {}={}")
                     .format(
                        self.user,
                        kind,
                        record_id))

        url = "{}{}".format(
            self.api_urls[url_key],
            record_id)

        response, failure = self._request(
            "get",
            url,
            label,
            record_id)
        if failure is not None:
            return failure

        if response.status_code != 200:
            return self._unexpected_status(response)

        if self.verbose:
            log.debug("deserializing")

        record = json.loads(
            response.text)
        found_id = record.get(
            "id",
            None)

        if not found_id:
            return self.build_response(
                status=ERROR,
                error="missing {}.id".format(kind),
                data=self._response_details(response))

        cache[str(found_id)] = record

        if self.debug:
            log.info(("added {}={} all_{}s={}")
                     .format(
                        kind,
                        found_id,
                        kind,
                        len(cache)))

        return self.build_response(
            status=SUCCESS,
            error="",
            data=record)
    # end of _get_by_id

    def get_prepare_by_id(
            self,
            prepare_id=None):
        """get_prepare_by_id

        :param prepare_id: MLPrepare.id in the database
        :return: ``build_response`` dictionary, ``data`` is the
                 prepare record on ``SUCCESS``
        """
        return self._get_by_id(
            "prepare",
            "prepare",
            self.all_prepares,
            prepare_id,
            "PREPARE")
    # end of get_prepare_by_id

    def get_job_by_id(
            self,
            job_id=None):
        """get_job_by_id

        :param job_id: MLJob.id in the database
        :return: ``build_response`` dictionary, ``data`` is the
                 job record on ``SUCCESS``
        """
        return self._get_by_id(
            "job",
            "job",
            self.all_jobs,
            job_id,
            "JOB")
    # end of get_job_by_id

    def get_result_by_id(
            self,
            result_id=None):
        """get_result_by_id

        :param result_id: MLJobResult.id in the database
        :return: ``build_response`` dictionary, ``data`` is the
                 result record on ``SUCCESS``
        """
        return self._get_by_id(
            "result",
            "results",
            self.all_results,
            result_id,
            "RESULT")
    # end of get_result_by_id

    def run_job(
            self,
            body):
        """run_job

        :param body: dictionary to launch job
        :return: ``build_response`` dictionary, ``data`` is the
                 deserialized response on ``SUCCESS``
        """

        if self.verbose:
            log.info(("user={} starting job={}")
                     .format(
                        self.user,
                        str(body)[0:32]))

        url = "{}".format(
            self.api_urls["job"])
        payload = json.dumps(body)

        response, failure = self._request(
            "post",
            url,
            "JOB",
            payload,
            payload=payload)
        if failure is not None:
            return failure

        if response.status_code != 201:
            return self._unexpected_status(response)

        if self.verbose:
            log.debug("deserializing")

        res_dict = json.loads(
            response.text)
        job_data = res_dict.get(
            "job",
            None)
        result_data = res_dict.get(
            "results",
            None)

        if not job_data:
            return self.build_response(
                status=ERROR,
                error="job failed",
                data=self._response_details(response))

        job_id = job_data.get(
            "id",
            None)
        # a response without "results" is reported as a missing
        # result id instead of raising
        result_id = None
        if result_data:
            result_id = result_data.get(
                "id",
                None)

        if not job_id:
            return self.build_response(
                status=ERROR,
                error="missing job.id",
                data=self._response_details(response))
        if not result_id:
            return self.build_response(
                status=ERROR,
                error="missing result.id",
                data=self._response_details(response))

        self.all_jobs[str(job_id)] = job_data
        self.all_results[str(result_id)] = result_data

        if self.verbose:
            log.info(("added job={} result={} "
                      "all_jobs={} all_results={}")
                     .format(
                        job_id,
                        result_id,
                        len(self.all_jobs),
                        len(self.all_results)))

        return self.build_response(
            status=SUCCESS,
            error="",
            data=res_dict)
    # end of run_job

    def wait_for_job_to_finish(
            self,
            job_id,
            sec_to_sleep=5.0,
            max_retries=100000):
        """wait_for_job_to_finish

        Poll the job, and once its status allows it the job's
        result, until the result status is ``finished``.

        :param job_id: MLJob.id to wait on
        :param sec_to_sleep: seconds to sleep during polling
        :param max_retries: max retries until stopping
        :return: ``build_response`` dictionary, on ``SUCCESS`` the
                 ``data`` holds the ``job`` and ``result`` records
        """

        retry_attempt = 1
        while True:

            if self.debug:
                log.info(("JOBSTATUS getting job.id={} details")
                         .format(
                            job_id))

            response = self.get_job_by_id(job_id)

            if self.debug:
                log.info(("JOBSTATUS got job.id={} response={}")
                         .format(
                            job_id,
                            response))

            if response["status"] != SUCCESS:
                log.error(("JOBSTATUS failed to get job.id={} with error={}")
                          .format(
                            job_id,
                            response["error"]))
                return self.build_response(
                    status=ERROR,
                    error=response["error"],
                    data=response["data"])
            # stop if this failed getting the job details

            job_data = response.get(
                "data",
                None)

            if not job_data:
                return self.build_response(
                    status=ERROR,
                    error="failed to find job dictionary in response",
                    data=response["data"])

            job_status = job_data["status"]

            # these job states move on to polling the result record
            if job_status in ("finished", "completed", "launched"):

                if self.debug:
                    log.info(("job.id={} is done with status={}")
                             .format(
                                job_id,
                                job_status))

                result_id = job_data["predict_manifest"]["result_id"]

                if self.debug:
                    log.info(("JOBRESULT getting result.id={} details")
                             .format(
                                result_id))

                response = self.get_result_by_id(result_id)

                if self.debug:
                    log.info(("JOBRESULT got result.id={} response={}")
                             .format(
                                result_id,
                                response))

                if response["status"] != SUCCESS:
                    log.error(("JOBRESULT failed to get "
                               "result.id={} with error={}")
                              .format(
                                result_id,
                                response["error"]))
                    return self.build_response(
                        status=ERROR,
                        error=response["error"],
                        data=response["data"])
                # stop if this failed getting the result details

                result_data = response.get(
                    "data",
                    None)

                if result_data["status"] == "finished":
                    full_response = {
                        "job": job_data,
                        "result": result_data
                    }
                    return self.build_response(
                        status=SUCCESS,
                        error="",
                        data=full_response)

                if retry_attempt % 100 == 0:
                    if self.verbose:
                        log.info(("result_id={} are not done retry={}")
                                 .format(
                                    result_id,
                                    retry_attempt))

                retry_attempt += 1
                if retry_attempt > max_retries:
                    err_msg = ("failed waiting "
                               "for job.id={} result.id={} "
                               "to finish").format(
                                job_id,
                                result_id)
                    log.error(err_msg)
                    return self.build_response(
                        status=ERROR,
                        error=err_msg)

                time.sleep(sec_to_sleep)
                # wait while results are written to the db
            else:
                retry_attempt += 1
                if retry_attempt > max_retries:
                    err_msg = ("failed waiting "
                               "for job.id={} to finish").format(
                                job_id)
                    log.error(err_msg)
                    return self.build_response(
                        status=ERROR,
                        error=err_msg)

                if self.verbose:
                    if retry_attempt % 100 == 0:
                        log.info(("waiting on job.id={} retry={}")
                                 .format(
                                    job_id,
                                    retry_attempt))
                # if logging just to show this is running

                time.sleep(sec_to_sleep)
        # end of while waiting for the job to finish
    # end of wait_for_job_to_finish

    def run_prepare(
            self,
            body):
        """run_prepare

        :param body: dictionary to launch prepare
        :return: ``build_response`` dictionary, ``data`` is the
                 prepare record on ``SUCCESS``
        """

        if self.verbose:
            log.info(("user={} starting prepare={}")
                     .format(
                        self.user,
                        str(body)[0:32]))

        url = "{}".format(
            self.api_urls["prepare"])
        payload = json.dumps(body)

        response, failure = self._request(
            "post",
            url,
            "PREPARE",
            payload,
            payload=payload)
        if failure is not None:
            return failure

        if response.status_code != 201:
            return self._unexpected_status(response)

        if self.verbose:
            log.info(("deserializing={}")
                     .format(
                        response.text))

        prepare_data = json.loads(
            response.text)

        if not prepare_data:
            return self.build_response(
                status=ERROR,
                error="prepare failed",
                data=self._response_details(response))

        prepare_id = prepare_data.get(
            "id",
            None)

        if not prepare_id:
            return self.build_response(
                status=ERROR,
                error="missing prepare.id",
                data=self._response_details(response))

        self.all_prepares[str(prepare_id)] = prepare_data

        if self.verbose:
            log.info(("added prepare={} all_prepares={}")
                     .format(
                        prepare_id,
                        len(self.all_prepares)))

        return self.build_response(
            status=SUCCESS,
            error="",
            data=prepare_data)
    # end of run_prepare

    def wait_for_prepare_to_finish(
            self,
            prepare_id,
            sec_to_sleep=5.0,
            max_retries=100000):
        """wait_for_prepare_to_finish

        Poll the prepare record until its status is ``finished``
        or ``completed``.

        :param prepare_id: MLPrepare.id to wait on
        :param sec_to_sleep: seconds to sleep during polling
        :param max_retries: max retries until stopping
        :return: ``build_response`` dictionary, ``data`` is the
                 prepare record on ``SUCCESS``
        """

        retry_attempt = 1
        while True:

            if self.debug:
                log.info(("PREPSTATUS getting prepare.id={} details")
                         .format(
                            prepare_id))

            response = self.get_prepare_by_id(prepare_id)

            if self.debug:
                log.info(("PREPSTATUS got prepare.id={} response={}")
                         .format(
                            prepare_id,
                            response))

            if response["status"] != SUCCESS:
                log.error(("PREPSTATUS failed to get prepare.id={} "
                           "with error={}")
                          .format(
                            prepare_id,
                            response["error"]))
                return self.build_response(
                    status=ERROR,
                    error=response["error"],
                    data=response["data"])
            # stop if this failed getting the prepare details

            prepare_data = response.get(
                "data",
                None)

            if not prepare_data:
                return self.build_response(
                    status=ERROR,
                    error="failed to find prepare dictionary in response",
                    data=response["data"])

            prepare_status = prepare_data["status"]

            if prepare_status in ("finished", "completed"):
                return self.build_response(
                    status=SUCCESS,
                    error="",
                    data=prepare_data)

            retry_attempt += 1
            if retry_attempt > max_retries:
                err_msg = ("failed waiting "
                           "for prepare.id={} to finish").format(
                            prepare_id)
                log.error(err_msg)
                return self.build_response(
                    status=ERROR,
                    error=err_msg)

            if self.verbose:
                if retry_attempt % 100 == 0:
                    log.info(("waiting on prepare.id={} retry={}")
                             .format(
                                prepare_id,
                                retry_attempt))
            # if logging just to show this is running

            time.sleep(sec_to_sleep)
        # end of while waiting for the prepare to finish
    # end of wait_for_prepare_to_finish

# end of AIClient