123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- # -*- encoding=utf8 -*-
- import re
- import time
- import json
- import logging
- import pymysql
- import configparser
- import pandas as pd
- from DBUtils.PooledDB import PooledDB
- #配置输出日志格式
- LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s'
- #配置输出时间格式
- '''
- DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a'
- logging.basicConfig(level = logging.INFO,
- format = LOG_FORMAT,
- datefmt = DATE_FORMAT,
- filename = r"./run.log")
- '''
- cf = configparser.RawConfigParser()
- #cf = configparser.ConfigParser()
- cf.read(r"./channel_ids.ini") # 读取配置文件,如果写文件的绝对路径,就可以不用os模块
- ys_id = cf.get("channel_ids","ys_ids") #亚杉id
- mf_id = cf.get("channel_ids","mf_ids") #满帆id
- zr_id = cf.get("channel_ids","zr_ids") #兆蓉id
- zx_id = cf.get("channel_ids","zx_ids") #智信id
- yxj_id = cf.get("channel_ids","yxj_ids") #易迅捷id
- fy_id = cf.get("channel_ids","fy_ids") #枫叶id
- YS_ID = json.loads(ys_id)
- MF_ID = json.loads(mf_id)
- ZR_ID = json.loads(zr_id)
- ZX_ID = json.loads(zx_id)
- YXJ_ID = json.loads(yxj_id)
- FY_ID = json.loads(fy_id)
- #通道查询db
- CHANNEL_DB = {
- 'host' : '47.95.217.180',
- 'port' : 3306,
- 'user' : 'rtech',
- 'password' : 'S5RE4dQ65MphYk7F',
- 'db' : 'fmp',
- 'charset' : 'utf8'
- #'autocommit' : 1
- }
- #监控消息数据库连接信息
- MONITOR_DB = {
- 'host' : '127.0.0.1',
- 'port' : 9001,
- 'user' : 'root',
- 'password' : 'nibuzhidaowozhidao',
- 'db' : 'monitoring',
- 'charset' : 'utf8',
- 'autocommit' : 1
- }
- #创建数据库连接池
- def createPool(db_config):
- spool = PooledDB(pymysql, 5, **db_config)
- return spool
- def get_channel_data():
- #conn = createPool(CHANNEL_DB).connection()
-
- sql = """SELECT trans_id,mobile_no,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(create_date))/60 use_time
- FROM
- mobile_flow_dispatch_rec
- WHERE
- UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) >= 600 AND send_status = 1
- ORDER BY use_time DESC ;"""
- df = pd.read_sql(sql,channel_conn)
- df['use_time'] = df['use_time'].astype(int)
- #print(df)
- #conn.close()
- return df
- def save_monitor_data(df,channel_ids,group_name,group_status):
- timestamp = time.time()
- str_time = time.strftime('%H:%M',time.localtime(timestamp))
- #str_time_int = int(str(str_time).split(':')[-1])
- #print(str(str_time).split(':')[-1])
- ins_sql = " INSERT INTO mon_channel_overtime_order VALUES(0,%s,%s,%s,1,%s) "
- que_sql = " SELECT id FROM mon_channel_overtime_order WHERE group_name = '{}' "
- upd_sql = " UPDATE mon_channel_overtime_order SET msg = %s,status = 1,timestamp = %s WHERE id = %s"
- msg = str_time + ": 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{};卡单最长时间为:{} 分钟。 "
- #print(df)
- channel_df = df[df['trans_id'].isin(channel_ids)]
- #print(channel_df)
- if channel_df.empty is False:
- total_num = channel_df.shape[0]
-
- temp_msg = ''
- if total_num == 1:
- temp_msg = str_time + """: 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{},卡单时间为:{} 分钟
- """.format(total_num,channel_df['mobile_no'].iloc[0],channel_df['use_time'].iloc[0])
- elif 1 < total_num < 10 :
- temp_msg = msg.format(total_num,channel_df['mobile_no'].values.tolist(),channel_df['use_time'].iloc[0])
-
- else:
- phone_list = channel_df['mobile_no'].head(10).tolist()
- temp_msg = msg.format(total_num,phone_list,channel_df['use_time'].iloc[0])
-
- #print(temp_msg)
- que_df = pd.read_sql(que_sql.format(group_name),monitor_conn)
-
- cursor = monitor_conn.cursor()
- if que_df.empty is True:
- cursor.execute(ins_sql,(temp_msg,group_status,group_name,int(timestamp)))
- else:
- cursor.execute(upd_sql,(temp_msg,int(timestamp),int(que_df['id'].iloc[0])))
- cursor.close()
- if __name__ == '__main__':
- channel_conn = createPool(CHANNEL_DB).connection()
- monitor_conn = createPool(MONITOR_DB).connection()
- ys_name = '亚杉话费下游对接'
- mf_name = '满帆'
- zr_name = '兆蓉'
- zx_name = '智信'
- yxj_name = '易迅捷'
- fy_name = '枫叶'
- df = get_channel_data()
- save_monitor_data(df,YS_ID,ys_name,2)
- save_monitor_data(df,MF_ID,mf_name,1)
- save_monitor_data(df,ZR_ID,zr_name,1)
- save_monitor_data(df,ZX_ID,zx_name,1)
- save_monitor_data(df,YXJ_ID,yxj_name,1)
- save_monitor_data(df,FY_ID,fy_name,1)
- channel_conn.close()
- monitor_conn.close()
|