监控kafka队列长度

jiaomrswang 2020-09-23

#!/usr/local/python37/bin/python
"""Monitor Kafka consumer-group lag and send an SMS alert when it is too high.

The script runs ``kafka-consumer-groups.sh --describe`` for the ``logstash``
group, keeps the partitions whose LAG exceeds ``PARTITION_LAG_MIN``, sums the
remaining lag per topic and, if any topic total exceeds ``TOPIC_LAG_ALERT``,
sends the resulting table as an SMS through an HTTP gateway (via ``curl``).
When nothing exceeds the threshold it prints ``OK``.
"""
import datetime
import os
import re
import subprocess
import sys
import time
import urllib.parse
from typing import List, Tuple

import numpy as np
import pandas as pd

# Command that describes the consumer group. Passed as an argument list so no
# shell is involved and no quoting is required.
KAFKA_DESCRIBE_CMD = [
    "sh",
    "/opt/elk/kafka_node1/bin/kafka-consumer-groups.sh",
    "--describe",
    "--bootstrap-server",
    "192.168.10.100:9092",
    "--group",
    "logstash",
]

# Zero-based positions of the columns read from each output row (the original
# awk filter used $1 and $5).
# NOTE(review): this assumes the tool prints TOPIC first and LAG fifth; newer
# Kafka releases may print a different column layout - confirm against the
# installed version.
TOPIC_FIELD = 0
LAG_FIELD = 4

# A partition is only counted when its lag is strictly greater than this.
PARTITION_LAG_MIN = 20
# A topic triggers an alert when its summed lag is strictly greater than this.
TOPIC_LAG_ALERT = 100

# NOTE(review): the gateway credentials are embedded in the URL; consider
# moving them to configuration or the environment.
SMS_URL_PREFIX = (
    "http://sms.domain.com/Smsweb/sms"
    "?pid=smsPid&pwd=Mjdfadklfae&phone=1111111111&msg="
)


def fetch_consumer_group_description() -> str:
    """Run the Kafka describe command and return its standard output."""
    completed = subprocess.run(
        KAFKA_DESCRIBE_CMD,
        stdout=subprocess.PIPE,
        universal_newlines=True,
        check=False,
    )
    if completed.returncode != 0:
        # Do not abort: the output is still parsed, but make the failure
        # visible instead of silently reporting "OK" on an empty result.
        print(
            "warning: kafka-consumer-groups.sh exited with status "
            + str(completed.returncode),
            file=sys.stderr,
        )
    return completed.stdout


def parse_lags(
    describe_output: str, min_partition_lag: int = PARTITION_LAG_MIN
) -> List[Tuple[str, int]]:
    """Extract ``(topic, lag)`` pairs from the describe output.

    Rows that are too short, or whose LAG column is not an integer (the
    header row, or ``-`` for partitions without a committed offset), are
    skipped. Only rows with ``lag > min_partition_lag`` are returned.
    """
    rows = []
    for line in describe_output.splitlines():
        fields = line.split()
        if len(fields) <= LAG_FIELD:
            continue
        try:
            lag = int(fields[LAG_FIELD])
        except ValueError:
            continue
        if lag > min_partition_lag:
            rows.append((fields[TOPIC_FIELD], lag))
    return rows


def find_lagging_topics(
    rows: List[Tuple[str, int]], alert_lag: int = TOPIC_LAG_ALERT
) -> pd.DataFrame:
    """Sum lag per topic and return the topics whose total exceeds *alert_lag*.

    The result has the columns ``topics`` and ``LAG``; it is empty when no
    topic is above the threshold.
    """
    if not rows:
        return pd.DataFrame(columns=["topics", "LAG"])
    frame = pd.DataFrame(rows, columns=["topics", "LAG"])
    totals = frame.groupby("topics", as_index=False).sum()
    return totals.loc[totals["LAG"] > alert_lag, ["topics", "LAG"]]


def build_sms_url(message: str) -> str:
    """Return the SMS gateway URL with *message* percent-encoded."""
    # Spaces and newlines (and anything else unsafe) must be escaped,
    # otherwise the gateway request fails.
    return SMS_URL_PREFIX + urllib.parse.quote(message, safe="")


def send_sms(url: str) -> str:
    """Request *url* with curl and return its combined stdout/stderr text."""
    completed = subprocess.run(
        ["curl", url],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        check=False,
    )
    return completed.stdout.rstrip("\n")


def main() -> None:
    """Check the consumer-group lag once and alert by SMS if needed."""
    lagging = find_lagging_topics(parse_lags(fetch_consumer_group_description()))
    if lagging.empty:
        print("OK")
        return
    url = build_sms_url(str(lagging))
    print(url)
    print(send_sms(url))


if __name__ == "__main__":
    main()

相关推荐