CloudXli 2019-12-05
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019-12-02 15:16
# @Author : Anthony
# @Email :
# @File : check-ossfs.py
"""ossfs availability check.

Runs two checks in parallel and sends a DingTalk alert when one fails:

* whether a process named ``ossfs`` exists;
* whether /xxxx/file is mounted successfully, judged by the third column
  (the owner) that ``ls -lh /alidata/`` prints for entries matching "file".
"""
import json
import subprocess
import threading

import psutil
import requests


class DingTalk_Base:
    """Minimal client that posts text messages to a DingTalk robot webhook."""

    def __init__(self):
        self.__headers = {'Content-Type': 'application/json;charset=utf-8'}
        self.url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'

    def send_msg(self, text, timeout=10):
        """Post *text* to the robot webhook and return the raw response body.

        Args:
            text: Message content to send.
            timeout: Seconds to wait for the webhook before giving up, so a
                stalled connection cannot hang the check indefinitely.

        Returns:
            The response body as bytes.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        payload = {
            "msgtype": "text",
            "text": {
                "content": text,
            },
            "at": {
                "atMobiles": [
                    "",
                ],
                "isAtAll": False,
            },
        }
        response = requests.post(
            self.url,
            json.dumps(payload),
            headers=self.__headers,
            timeout=timeout,
        )
        return response.content


class DingTalk_Disaster(DingTalk_Base):
    """Robot used for alert notifications."""

    def __init__(self):
        super().__init__()
        # Fill in the robot's webhook URL here.
        self.url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx"


def check_oss_pid():
    """Check that a process named ``ossfs`` is running.

    Relies on the module-level ``ding`` robot created in the ``__main__``
    section below.

    Returns:
        True if an ossfs process was found; otherwise the response body
        (bytes) of the alert that was sent.
    """
    # process_iter() skips processes that exit while we are scanning, unlike
    # psutil.pids() + psutil.Process(pid), which raises NoSuchProcess.
    for proc in psutil.process_iter(attrs=["name"]):
        # The name is None when it could not be read (e.g. access denied).
        proc_name = proc.info["name"] or ""
        if proc_name.strip() == "ossfs":
            return True
    # Only alert once every process has been inspected.
    return ding.send_msg(
        "ossfs报警通知: 机器:机器名称 消息内容:ossfs进程不存在,请及时处理..."
    )


def get_ossfs_name():
    """Check the owner reported for the "file" entry under /alidata/.

    Sends an alert as soon as a matching entry's owner column is not "root".
    Relies on the module-level ``ding`` robot created in the ``__main__``
    section below.

    Returns:
        The response body (bytes) of the alert if one was sent, else None.
    """
    # Constant command, no external input is interpolated into the shell.
    args = "ls -lh /alidata/|grep file|awk '{print $3}'"
    try:
        completed = subprocess.run(
            args,
            shell=True,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # NOTE(review): empty output (no matching entry, or /alidata/ missing)
        # raises no alert, same as before - confirm this is intended.
        for raw_line in completed.stdout.splitlines():
            owner = raw_line.decode().strip()
            if owner != "root":
                return ding.send_msg(
                    "ossfs报警通知: 机器:语文课堂 消息内容:/xxxx/file/挂载失败,请及时处理..."
                )
    except Exception as e:
        # Best-effort check at the thread boundary: report and carry on.
        print(e)
    return None


if __name__ == "__main__":
    # Module-level robot shared by both check functions.
    ding = DingTalk_Disaster()
    threads = [
        threading.Thread(target=get_ossfs_name),
        threading.Thread(target=check_oss_pid),
    ]
    for t in threads:
        t.start()
    # Wait for both checks so the script's lifetime is explicit.
    for t in threads:
        t.join()