# -*- encoding=utf8 -*- import re import time import json import logging import pymysql import configparser import pandas as pd from DBUtils.PooledDB import PooledDB #配置输出日志格式 LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s' #配置输出时间格式 ''' DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a' logging.basicConfig(level = logging.INFO, format = LOG_FORMAT, datefmt = DATE_FORMAT, filename = r"./run.log") ''' cf = configparser.RawConfigParser() #cf = configparser.ConfigParser() cf.read(r"./channel_ids.ini") # 读取配置文件,如果写文件的绝对路径,就可以不用os模块 ys_id = cf.get("channel_ids","ys_ids") #亚杉id mf_id = cf.get("channel_ids","mf_ids") #满帆id zr_id = cf.get("channel_ids","zr_ids") #兆蓉id zx_id = cf.get("channel_ids","zx_ids") #智信id yxj_id = cf.get("channel_ids","yxj_ids") #易迅捷id fy_id = cf.get("channel_ids","fy_ids") #枫叶id YS_ID = json.loads(ys_id) MF_ID = json.loads(mf_id) ZR_ID = json.loads(zr_id) ZX_ID = json.loads(zx_id) YXJ_ID = json.loads(yxj_id) FY_ID = json.loads(fy_id) #通道查询db CHANNEL_DB = { 'host' : '47.95.217.180', 'port' : 3306, 'user' : 'rtech', 'password' : 'S5RE4dQ65MphYk7F', 'db' : 'fmp', 'charset' : 'utf8' #'autocommit' : 1 } #监控消息数据库连接信息 MONITOR_DB = { 'host' : '127.0.0.1', 'port' : 9001, 'user' : 'root', 'password' : 'nibuzhidaowozhidao', 'db' : 'monitoring', 'charset' : 'utf8', 'autocommit' : 1 } #创建数据库连接池 def createPool(db_config): spool = PooledDB(pymysql, 5, **db_config) return spool def get_channel_data(): #conn = createPool(CHANNEL_DB).connection() sql = """SELECT trans_id,mobile_no,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(create_date))/60 use_time FROM mobile_flow_dispatch_rec WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) >= 600 AND send_status = 1 ORDER BY use_time DESC ;""" df = pd.read_sql(sql,channel_conn) df['use_time'] = df['use_time'].astype(int) #print(df) #conn.close() return df def save_monitor_data(df,channel_ids,group_name,group_status): timestamp = time.time() str_time = time.strftime('%H:%M',time.localtime(timestamp)) #str_time_int = int(str(str_time).split(':')[-1]) #print(str(str_time).split(':')[-1]) ins_sql = " INSERT INTO mon_channel_overtime_order VALUES(0,%s,%s,%s,1,%s) " que_sql = " SELECT id FROM mon_channel_overtime_order WHERE group_name = '{}' " upd_sql = " UPDATE mon_channel_overtime_order SET msg = %s,status = 1,timestamp = %s WHERE id = %s" msg = str_time + ": 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{};卡单最长时间为:{} 分钟。 " #print(df) channel_df = df[df['trans_id'].isin(channel_ids)] #print(channel_df) if channel_df.empty is False: total_num = channel_df.shape[0] temp_msg = '' if total_num == 1: temp_msg = str_time + """: 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{},卡单时间为:{} 分钟 """.format(total_num,channel_df['mobile_no'].iloc[0],channel_df['use_time'].iloc[0]) elif 1 < total_num < 10 : temp_msg = msg.format(total_num,channel_df['mobile_no'].values.tolist(),channel_df['use_time'].iloc[0]) else: phone_list = channel_df['mobile_no'].head(10).tolist() temp_msg = msg.format(total_num,phone_list,channel_df['use_time'].iloc[0]) #print(temp_msg) que_df = pd.read_sql(que_sql.format(group_name),monitor_conn) cursor = monitor_conn.cursor() if que_df.empty is True: cursor.execute(ins_sql,(temp_msg,group_status,group_name,int(timestamp))) else: cursor.execute(upd_sql,(temp_msg,int(timestamp),int(que_df['id'].iloc[0]))) cursor.close() if __name__ == '__main__': channel_conn = createPool(CHANNEL_DB).connection() monitor_conn = createPool(MONITOR_DB).connection() ys_name = '亚杉话费下游对接' mf_name = '满帆' zr_name = '兆蓉' zx_name = '智信' yxj_name = '易迅捷' fy_name = '枫叶' df = get_channel_data() save_monitor_data(df,YS_ID,ys_name,2) save_monitor_data(df,MF_ID,mf_name,1) save_monitor_data(df,ZR_ID,zr_name,1) save_monitor_data(df,ZX_ID,zx_name,1) save_monitor_data(df,YXJ_ID,yxj_name,1) save_monitor_data(df,FY_ID,fy_name,1) channel_conn.close() monitor_conn.close()