Dockerfile、CronJob、Deployment
# Image for the xjob_alert cron runner: a cron daemon that periodically runs
# /xjob_alert.py with one of the job names xpair, xfep, status2 or status109.
FROM python:3.7.9

# OS packages first: they change least often, so this layer stays cached.
# update + install share one layer and the apt lists are removed in that same
# layer. vim is no longer installed; debugging tools do not belong in the image.
RUN apt-get update \
    && apt-get install -y --no-install-recommends cron \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest on its own so that editing the script does not
# invalidate the (slow) pip layer.
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt

COPY xjob_alert.py /xjob_alert.py

# Create the log files up front so `tail -f` in CMD has something to follow
# before the first cron run.
RUN chmod 0644 /xjob_alert.py \
    && touch /var/log/xfep.log /var/log/xpair.log /var/log/status2.log /var/log/status109.log

# Install root's crontab in a single layer from a generated file. This replaces
# four `crontab -l | ... | crontab -` layers, which only worked because the
# failing `crontab -l` (no crontab yet) was masked by the pipe.
# NOTE: cron starts jobs with a minimal environment; container environment
# variables are not visible to the scheduled commands.
RUN printf '%s\n' \
        '*/3 * * * * /usr/local/bin/python /xjob_alert.py xpair >>/var/log/xpair.log 2>&1' \
        '*/10 * * * * /usr/local/bin/python /xjob_alert.py xfep >>/var/log/xfep.log 2>&1' \
        '*/5 * * * * /usr/local/bin/python /xjob_alert.py status2 >>/var/log/status2.log 2>&1' \
        '0 */6 * * * /usr/local/bin/python /xjob_alert.py status109 >>/var/log/status109.log 2>&1' \
        > /tmp/xjob_alert.cron \
    && crontab /tmp/xjob_alert.cron \
    && rm /tmp/xjob_alert.cron

# Exec form with an explicit shell: `cron` daemonizes itself, then `exec`
# replaces the shell with tail so the foreground process is PID 1 and receives
# SIGTERM directly. The cron daemon needs root, so no USER switch is made.
CMD ["/bin/sh", "-c", "cron && exec tail -f /var/log/xpair.log"]
---
# CronJob: runs the "xpair" check of xjob_alert.py every 5 minutes.
# NOTE(review): batch/v1beta1 was removed in Kubernetes 1.25; switch to
# batch/v1 (available since 1.21) once the target cluster supports it.
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: xpair-alert
spec:
  schedule: "*/5 * * * *"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: xjob-alert
              image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.2
              imagePullPolicy: IfNotPresent
              # Use the interpreter the image installs its dependencies for
              # (/usr/local/bin/python, as in the image's crontab) and the
              # absolute path the Dockerfile copies the script to.
              command:
                - /usr/local/bin/python
                - /xjob_alert.py
                - xpair
          restartPolicy: OnFailure
---
# CronJob: runs the "xfep" check of xjob_alert.py every 30 minutes.
# NOTE(review): same apiVersion caveat as the xpair-alert CronJob.
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: xfep-alert
spec:
  schedule: "*/30 * * * *"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: xjob-alert
              image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.2
              imagePullPolicy: IfNotPresent
              # Same interpreter and absolute script path as the image's crontab.
              command:
                - /usr/local/bin/python
                - /xjob_alert.py
                - xfep
          restartPolicy: OnFailure
---
# Deployment: one long-running pod of the xjob_alert image; the image's default
# command is used (no command override here).
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: xjob-alert
  name: xjob-alert
  namespace: production
spec:
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: xjob-alert
  template:
    metadata:
      labels:
        app: xjob-alert
    spec:
      containers:
        - name: xjob-alert
          # NOTE(review): the image comes from the "development/" repository
          # but is deployed to the "production" namespace - confirm intended.
          image: 634192295060.dkr.ecr.us-east-1.amazonaws.com/development/xjob_alert:0.0.3
          imagePullPolicy: Always
      restartPolicy: Always
      imagePullSecrets:
        - name: regcred
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Alerting script for stuck or evicted jobs.

Usage: xjob_alert.py {xfep|xpair|status2|status109}

Each job queries the ``job`` table of the "anakin" PostgreSQL database and,
when the query returns rows, posts a text message to a Feishu bot webhook.
"""
import psycopg2
import requests as requests
from psycopg2 import sql
import json, os, time, sys

# Whole minutes elapsed since the job's ``create_at`` (used via to_timestamp,
# i.e. as epoch seconds). date_part('minute', <interval>) must NOT be used for
# this: it returns only the minutes field (0-59) of the interval, so a job that
# has been stuck for 1h03m would be reported as 3 minutes and never alert.
_ELAPSED_MINUTES_SQL = "floor(date_part('epoch', now() - to_timestamp(create_at)) / 60)"

# Job names accepted on the command line; each is a NewDrugAlert method.
_JOB_NAMES = ("xfep", "xpair", "status2", "status109")


def _env(name, default):
    """Return environment variable ``name``, or ``default`` when unset or empty."""
    return os.environ.get(name) or default


class NewDrugAlert:
    """Runs the monitoring queries and sends the resulting Feishu alerts."""

    # SECURITY NOTE(review): the credentials and webhook URLs below are secrets
    # committed to source. They are kept only as fallbacks so existing
    # deployments keep working; supply the LUKE_DB_* environment variables
    # instead and rotate / remove these values.
    _DB_DEFAULTS = {
        "pro": {
            "database": "anakin",
            "user": "anakin",
            "password": "81b88ebb893",
            "host": "luke-pro.ct9zn8ktuvkc.us-east-1.rds.amazonaws.com",
            "port": "5432",
        },
        "dev": {
            "database": "anakin",
            "user": "postgres",
            "password": "xtalpi123",
            "host": "luke-postgres.development.svc",
            "port": "5432",
        },
    }

    # Feishu bot webhooks keyed by the ``bussiness`` argument of alert().
    _WEBHOOKS = {
        "test": 'https://open.feishu.cn/open-apis/bot/v2/hook/e9a0edf5-46f2-44c4-8b67-b2d12a8579e2',
        "guyan": 'https://open.feishu.cn/open-apis/bot/v2/hook/4663ac9b-bb95-4fbc-9124-c83e347a5f89',
        "xinyao": 'https://open.feishu.cn/open-apis/bot/v2/hook/8d738505-682a-4fc7-8cb0-21c5ea1e2d2a',
    }

    # Seconds to wait for the webhook before giving up, so a hung HTTP call
    # cannot pile up overlapping cron runs.
    _HTTP_TIMEOUT = 10

    def __init__(self, env):
        """Select the database connection parameters.

        Args:
            env: 'pro' for production; 'dev' or '' for development.

        Raises:
            ValueError: if ``env`` is none of the above (previously this left
                ``self.parms`` unset and failed later with AttributeError).
        """
        profile = 'dev' if env == '' else env
        if profile not in self._DB_DEFAULTS:
            raise ValueError("unknown environment %r (expected 'pro', 'dev' or '')" % (env,))
        # Each parameter can be overridden with LUKE_DB_<KEY>, e.g. LUKE_DB_PASSWORD.
        self.parms = {
            key: _env("LUKE_DB_" + key.upper(), value)
            for key, value in self._DB_DEFAULTS[profile].items()
        }

    def job(self, sql, job_type):
        """Execute ``sql`` and return all rows.

        Args:
            sql: complete SQL statement, executed without bound parameters.
            job_type: label used only in the log line.

        Returns:
            The list of row tuples from ``cursor.fetchall()``.
        """
        conn = psycopg2.connect(**self.parms)
        try:
            cur = conn.cursor()
            cur.execute(sql)
            rows = cur.fetchall()
        finally:
            # Close the connection even when the query fails.
            conn.close()
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ":", job_type, "job select results:", rows)
        return rows

    def alert(self, msg, job_type, bussiness):
        """Post ``msg`` as a text message to the Feishu webhook of ``bussiness``.

        Args:
            msg: message text.
            job_type: label used only in the log line.
            bussiness: webhook key: "guyan", "xinyao" or "test".

        Returns:
            The raw response body (bytes) returned by the webhook.

        Raises:
            ValueError: if ``bussiness`` has no webhook (previously an
                UnboundLocalError on ``fs_url``).
        """
        try:
            fs_url = self._WEBHOOKS[bussiness]
        except KeyError:
            raise ValueError("no webhook configured for bussiness %r" % (bussiness,)) from None
        headers = {'Content-Type': 'application/json;charset=utf-8'}
        text = {"msg_type": "text", "content": {"msg": "", "text": msg}}
        res = requests.post(fs_url, json.dumps(text), headers=headers,
                            timeout=self._HTTP_TIMEOUT).content
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ":", job_type, "job alert results:", res)
        return res

    def xpair(self):
        """Alert on xpair jobs that have been in status 1/2/3 for 5+ minutes.

        Rows are bucketed by elapsed minutes into 5-10, 10-20 and 20+. An
        alert is sent only when the query returned rows.
        """
        job_type = "xpair"
        query = (
            "select handle,\"user\",status,cost_dimension,"
            + _ELAPSED_MINUTES_SQL + " as time_diff "
            "from job where status in (1,2,3) "
            "and image LIKE '%tasksys_fep_map_py%' "
            "and \"group\"::text LIKE '%xpair%' "
            "and " + _ELAPSED_MINUTES_SQL + " >= 5 "
            "order by create_at desc;"
        )
        job_res = self.job(query, job_type)
        alert_msg = ''
        over_5 = "xpair任务1、2、3状态超过5分钟:"
        over_10 = "xpair任务1、2、3状态超过10分钟:"
        over_20 = "xpair任务1、2、3状态超过20分钟:"
        if job_res:
            for m in job_res:
                minutes = m[-1]  # time_diff column
                if 5 <= minutes < 10:
                    over_5 = over_5 + "\n" + str(m)
                elif 10 <= minutes < 20:
                    over_10 = over_10 + "\n" + str(m)
                else:
                    # The query already guarantees >= 5, so this is 20+.
                    over_20 = over_20 + "\n" + str(m)
            alert_msg = alert_msg + over_20 + "\n" + over_10 + "\n" + over_5
            self.alert(alert_msg, job_type, bussiness="xinyao")
        print(alert_msg)

    def xfep(self):
        """Alert on xfep jobs on gcloud/DefaultGPU in status 1/2/3 for 30+ minutes."""
        job_type = "xfep"
        query = (
            "select handle,\"user\",status,cost_dimension,"
            + _ELAPSED_MINUTES_SQL + " as time_diff "
            "from job where cluster_id in ('gcloud','DefaultGPU') "
            "and status in (1,2,3) "
            "and image like '%tasksys_fep_py3%' "
            "and " + _ELAPSED_MINUTES_SQL + " > 30 "
            "order by time_diff desc;"
        )
        job_res = self.job(query, job_type)
        alert_msg = 'xfep任务1、2、3状态持续超过30分钟:'
        if job_res:
            for m in job_res:
                alert_msg = alert_msg + "\n" + str(m)
            self.alert(alert_msg, job_type, bussiness="xinyao")
        print(alert_msg)

    def status2(self):
        """Alert on gcloud/DefaultGPU xfep jobs that have stayed in status 2 for 30+ minutes.

        The elapsed time is measured from the last element of the last
        ``job_times`` entry.
        NOTE(review): assumes ``job_times`` is a list of entries whose last
        element is an epoch-seconds timestamp - confirm against the schema.
        """
        job_type = "status 2"
        query = (
            "select handle,\"user\",status,cluster_id,cost_dimension,job_times from job "
            "where status in (2) "
            "and image like '%tasksys_fep_py3%' "
            "and cluster_id in ('gcloud','DefaultGPU') "
            "order by status desc, job_times asc"
        )
        job_res = self.job(query, job_type)
        now_time = time.time()
        alert_msg = 'gcloud、DefaultGPU集群2状态持续超过30分钟(请确认队列是否阻塞):'
        if job_res:
            for m in job_res:
                job_times = m[-1]
                if not job_times or not job_times[-1]:
                    # No recorded transition to measure from; skip rather than crash.
                    continue
                time_diff = int((now_time - job_times[-1][-1]) / 60)
                if time_diff > 30:
                    alert_msg = alert_msg + "\n" + str(m[0]) + "," + str(m[1]) + "," + str(m[3]) \
                        + "," + str(m[4]) + "," + str(time_diff) + "min"
            # Preserved from the original: an alert is sent only when the message
            # contains "xtalpi" (presumably part of the user column - TODO confirm).
            if alert_msg.count("xtalpi") > 0:
                self.alert(alert_msg, job_type, bussiness="xinyao")
        print(alert_msg)

    def status109(self):
        """Alert on jobs created in the last 30 days, now in status 4 or 109,
        whose ``job_times`` contains an entry starting with 109.
        """
        job_type = "status 109"
        # Explicit spaces between clauses: the original joined "< 30" and
        # "order by" with a bare line continuation, which could fuse the tokens.
        query = (
            "select handle,\"user\",status,cluster_id,cost_dimension, "
            "to_char(to_timestamp(create_at),'yyyy-MM-dd HH24:MI:SS') as create_at,job_times from job "
            "where status in (4,109) "
            "and job_times::text like '%109%' "
            "and date_part('day',now()::timestamp-to_char(to_timestamp(create_at),'yyyy-MM-dd HH24:MI:SS')::timestamp) < 30 "
            "order by create_at desc"
        )
        job_res = self.job(query, job_type)
        alert_msg = "近一个月有被驱逐过记录的且目前未终止状态的handle:"
        if job_res:
            for row in job_res:
                # The text LIKE filter can also match timestamps containing
                # "109", so confirm a real entry whose first element is 109.
                for m in row[-1]:
                    if m[0] == 109:
                        alert_msg = alert_msg + "\n" + str(row[0]) \
                            + "," + str(row[1]) + "," + str(row[2]) + "," + str(row[5])
                        break
            self.alert(alert_msg, job_type, bussiness="guyan")
        print(alert_msg)


def main(argv=None):
    """Run the job named by the first command-line argument.

    Returns:
        0 on success, 2 when the job name is missing or unknown.
    """
    args = sys.argv[1:] if argv is None else argv
    if not args or args[0] not in _JOB_NAMES:
        sys.stderr.write("usage: xjob_alert.py {%s}\n" % "|".join(_JOB_NAMES))
        return 2
    # Production stays the default; set LUKE_ENV=dev to target the dev database.
    alert = NewDrugAlert(_env("LUKE_ENV", "pro"))
    getattr(alert, args[0])()
    return 0


if __name__ == '__main__':
    sys.exit(main())
