# -*- encoding=utf8 -*- import re import time import json import logging import pandas as pd import pymysql from DBUtils.PooledDB import PooledDB #配置输出日志格式 LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s' #配置输出时间格式 DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a' logging.basicConfig(level = logging.INFO, format = LOG_FORMAT, datefmt = DATE_FORMAT, filename = r"./balance_monitoring.log") comment_re = re.compile( '(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?', re.DOTALL | re.MULTILINE ) #数据库连接 def conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db): db = pymysql.connect(host=mysql_host,port=mysql_port,user=mysql_user, password=mysql_password,db=mysql_db,charset = 'utf8', autocommit = 1) print("数据库连接成功") return db #创建数据库连接池 def createPool(db_config): spool = PooledDB(pymysql, 5, **db_config) return spool #查询余额 def getData(sql,db): df = pd.read_sql(sql,con=db) db.close #print(df) return df #解析带注释的json def parse_json(filename): with open(filename,encoding='utf8') as f: content = ''.join(f.readlines()) ## Looking for comments match = comment_re.search(content) while match: # single line comment content = content[:match.start()] + content[match.end():] match = comment_re.search(content) #print(content) # Return json file return json.loads(content) def saveData(): data_json = parse_json(r'./config.json') #print(json_data) ''' ys_df = chanel_df[(chanel_df['supplier_name'] == '亚杉') & (chanel_df['balance'] < 0)] mf_df = chanel_df[(chanel_df['supplier_name'] == '满帆起航') & (chanel_df['balance'] <= -45000)] zr_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉') & (chanel_df['balance'] <= 0)] zrwt_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉WT') & (chanel_df['balance'] <= 5000)] yxj_df = chanel_df[(chanel_df['supplier_name'] == '易迅捷') & (chanel_df['balance'] <= 10000)] fy_df = chanel_df[(chanel_df['supplier_name'] == '枫叶') & (chanel_df['balance'] <= 10000)] zx_df = chanel_df[(chanel_df['supplier_name'] == '智信') & (chanel_df['balance'] <= 2000)] sht_df = cus_df[(cus_df['customer_id'] == 54) & (cus_df['available_balance'] < 50000)] #十荟团 ylb_df = cus_df[(cus_df['customer_id'] == 47) & (cus_df['available_balance'] < 40000)] #云喇叭 fql_df = cus_df[(cus_df['customer_id'] == 66) & (cus_df['available_balance'] < 20000)] #分期乐 ''' #print(fql_df) timestamp = time.time() str_time = time.strftime('%H:%M',time.localtime(timestamp)) str_time_int = int(str(str_time).split(':')[-1]) print(str(str_time).split(':')[-1]) ins_sql = """ INSERT INTO balance_monitoring VALUES(0,%s,%s,1,%s) """ que_sql = " SELECT id FROM balance_monitoring WHERE msg LIKE '%{}%' AND group_name = '{}' " upd_sql = " UPDATE balance_monitoring SET msg = %s,status = 1,timestamp = %s WHERE id = %s" group_que_sql = " SELECT id FROM balance_monitoring WHERE group_name = '{}' " msg = str_time + ': {} 余额为 {},请及时充值' cus_msg = str_time + ': {} 可用额度为 {};额度不足,请及时处理' ord_msg = str_time + ': 超过1小时订单总数量为:{},手机号为:\n {}' sup_msg = str_time + ':超过30分钟无成功的通道:\n{}' ord_count_msg = ' {} 面额 {} :{} ; ' #art_msg = str_time + ':自动补单2中充值订单 :{}' #print(ord_df) #print(sup_df['channel_id']) #mon_cursor = mon_db.cursor() try: #print(3333) mon_cursor = mon_db.cursor() #print(mon_cursor) for item in data_json: #print(item) channels = item['channels'] customers = item['customers'] #print(channels) #print(customes) for channel in channels: supplier_name = channel['supplier_name'] balance = channel['balance'] #print(supplier_name) #print(balance) ch_df = chanel_df[(chanel_df['supplier_name'] == supplier_name) & (chanel_df['balance'] < balance)] #print(ch_df) if ch_df.empty is False: sup_name = ch_df['supplier_name'].values[0] balance = ch_df['balance'].values[0] channel_msg = msg.format(sup_name,balance) mon_cursor.execute(que_sql.format(supplier_name,group_name1)) channel_ids = mon_cursor.fetchall() #print('channel_id is {}'.format(channel_ids[0][0])) if channel_ids is not None: mon_cursor.execute(upd_sql,(channel_msg,int(timestamp),channel_ids[0][0])) else: mon_cursor.execute(ins_sql, (channel_msg,group_name1,int(timestamp))) #print(88888) for customer in customers: customer_id = customer['customer_id'] available_balance = customer['available_balance'] customer_df = cus_df[(cus_df['customer_id'] == customer_id) & (cus_df['available_balance'] < available_balance)] #if customer_df.empty is False: if((customer_df.empty is False) and (str_time_int % 30 == 0)): cus_name = customer_df['customer_name'].values[0] balance = customer_df['available_balance'].values[0] customer_msg = cus_msg.format(cus_name,balance) mon_cursor.execute(que_sql.format(customer_name,group_name3)) customer_ids = mon_cursor.fetchall() #print('channel_id is {}'.format(channel_ids[0][0])) if customer_ids is not None: mon_cursor.execute(upd_sql,(customer_msg,int(timestamp),customer_ids[0][0])) else: mon_cursor.execute(ins_sql,(customer_msg,group_name3,int(timestamp))) #print(customer_msg) ''' if(mf_df.empty is False): #print(444) sup_name = mf_df['supplier_name'].values[0] balance = mf_df['balance'].values[0] mf_msg = msg.format(sup_name,balance) mon_cursor.execute(ins_sql, (mf_msg,group_name1,int(timestamp))) #print(444) if(zr_df.empty is False): sup_name = zr_df['supplier_name'].values[0] balance = zr_df['balance'].values[0] zr_msg = msg.format(sup_name,balance) mon_cursor.execute(ins_sql, (zr_msg,group_name1,int(timestamp))) if(zrwt_df.empty is False): sup_name = zrwt_df['supplier_name'].values[0] balance = zrwt_df['balance'].values[0] zrwt_msg = msg.format(sup_name,balance) mon_cursor.execute(ins_sql, (zrwt_msg,group_name1,int(timestamp))) #print(sup_name) if(yxj_df.empty is False): sup_name = yxj_df['supplier_name'].values[0] balance = yxj_df['balance'].values[0] yxj_msg = msg.format(sup_name,balance) mon_cursor.execute(ins_sql, (yxj_msg,group_name1,int(timestamp))) #print(sup_name) if(fy_df.empty is False): sup_name = fy_df['supplier_name'].values[0] balance = fy_df['balance'].values[0] fy_msg = msg.format(sup_name,balance) mon_cursor.execute(ins_sql, (fy_msg,group_name1,int(timestamp))) #print(sup_name) if(zx_df.empty is False): sup_name = zx_df['supplier_name'].values[0] balance = zx_df['balance'].values[0] zx_msg = msg.format(sup_name,balance) mon_cursor.execute(ins_sql, (zx_msg,group_name1,int(timestamp))) #print(sup_name) if((sht_df.empty is False) and (str_time_int % 30 == 0)): cus_name = sht_df['customer_name'].values[0] balance = sht_df['available_balance'].values[0] sht_msg = cus_msg.format(cus_name,balance) mon_cursor.execute(ins_sql,(sht_msg,group_name3,int(timestamp))) #print(cus_name) if((ylb_df.empty is False) and (str_time_int % 30 == 0)): cus_name = ylb_df['customer_name'].values[0] balance = ylb_df['available_balance'].values[0] ylb_msg = cus_msg.format(cus_name,balance) mon_cursor.execute(ins_sql,(ylb_msg,group_name3,int(timestamp))) #print(222) if((fql_df.empty is False) and (str_time_int % 30 == 0)): cus_name = fql_df['customer_name'].values[0] balance = fql_df['available_balance'].values[0] fql_msg = cus_msg.format(cus_name,balance) mon_cursor.execute(ins_sql,(fql_msg,group_name3,int(timestamp))) #print('aaa') ''' #print(str_time_int % 10) #超时订单 if((ord_df.empty is False) and (str_time_int % 10 == 0)): #print(len(ord_df)) ord_que_sql = group_que_sql.format(group_name4) total_num = ord_df.shape[0] phone_list = [] if total_num >10: phone_list = ord_df['used_mobile'].head(10).tolist() else: phone_list = ord_df['used_mobile'].values.tolist() #print(total_num) #print(phone_list) ord_msg1 = ord_msg.format(total_num,phone_list) #print(ord_msg1) mon_cursor.execute(ord_que_sql) over_ids = mon_cursor.fetchall() if over_ids is not None: mon_cursor.execute(upd_sql,(ord_msg1,int(timestamp),over_ids[0][0])) else: mon_cursor.execute(ins_sql,(ord_msg1,group_name4,int(timestamp))) logging.info(ord_msg1) #sup_total_list = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120','121','122','123'] #补单中的订单: if(ord_count_df.empty is False) and (str_time_int % 10 == 0): #if(ord_count_df.empty is False) : ord_count_list = ord_count_df.values.tolist() #print(ord_count_list) temp_msg = '' total_count = 0 for i in range(len(ord_count_list)): total_count += ord_count_list[i][2] if i == 0: temp_msg = str_time + ' 补单中的订单数量为:\n {} 面额 {} :{} ; '.format(ord_count_list[i][0], int(ord_count_list[i][1]),ord_count_list[i][2]) else: temp_msg += ord_count_msg.format(ord_count_list[i][0],int(ord_count_list[i][1]),ord_count_list[i][2]) temp_msg = temp_msg + '补单总数量为:【{}】。'.format(total_count) #print(temp_msg) mon_cursor.execute(group_que_sql.format(group_name5)) recharge_ids = mon_cursor.fetchall() if recharge_ids is not None: mon_cursor.execute(upd_sql,(temp_msg,int(timestamp),recharge_ids[0][0])) else: mon_cursor.execute(ins_sql,(temp_msg1,group_name5,int(timestamp))) #mon_cursor.execute(ins_sql,(temp_msg,group_name5,int(timestamp))) #自动补单2中的订单 if(art_df.empty is False): #print(art_df) art_count_list = art_df.values.tolist() #print(art_count_list) temp_msg = '' total_count = 0 for i in range(len(art_count_list)): total_count += art_count_list[i][2] #print(total_count) if i == 0: temp_msg = str_time + ' 补单2中的订单数量为:\n {} 面额 {} :{} ; '.format(art_count_list[i][0], int(art_count_list[i][1]),art_count_list[i][2]) #iprint(temp_msg) else: temp_msg += ord_count_msg.format(art_count_list[i][0],int(art_count_list[i][1]),art_count_list[i][2]) temp_msg = temp_msg + '补单2总数量为:【{}】。'.format(total_count) #print(temp_msg) mon_cursor.execute(group_que_sql.format(group_name6)) recharge_ids = mon_cursor.fetchall() if recharge_ids is not None: mon_cursor.execute(upd_sql,(temp_msg,int(timestamp),recharge_ids[0][0])) else: mon_cursor.execute(ins_sql,(temp_msg,group_name6,int(timestamp))) #mon_cursor.execute(ins_sql,(temp_msg,group_name6,int(timestamp))) sup_total_list = ['83','84','85','89','90','91','95','105','116','118','119','120','121','122', '123','125','126','127','142','143','144'] sup_list = list(sup_df['channel_id'].values) #print(sup_list) dif_list = [i for i in sup_total_list if i not in sup_list] #print(dif_list) if (len(dif_list) > 0) and (str_time_int % 10 == 0): for i in range(len(dif_list)): if dif_list[i] == '83': dif_list[i] = '满帆移动WT' elif dif_list[i] == '84': dif_list[i] = '满帆联通WT' elif dif_list[i] == '85': dif_list[i] = '满帆电信WT' elif dif_list[i] == '89': dif_list[i] = '兆蓉移动' elif dif_list[i] == '90': dif_list[i] = '兆蓉联通' elif dif_list[i] == '91': dif_list[i] = '兆蓉电信' elif dif_list[i] == '95': dif_list[i] = '兆蓉移动WT' elif dif_list[i] == '142': dif_list[i] = '亚杉移动WT2' elif dif_list[i] == '105': dif_list[i] = '兆蓉电信WT' elif dif_list[i] == '116': dif_list[i] = '兆蓉联通WT' elif dif_list[i] == '144': dif_list[i] = '亚杉电信WT2' elif dif_list[i] == '118': dif_list[i] = '智信全国移动' elif dif_list[i] == '119': dif_list[i] = '智信全国联通' elif dif_list[i] == '120': dif_list[i] = '智信全国电信' elif dif_list[i] == '121': dif_list[i] = '易迅捷移动WT' elif dif_list[i] == '122': dif_list[i] = '易迅捷电信WT' elif dif_list[i] == '123': dif_list[i] = '易迅捷联通WT' elif dif_list[i] == '143': dif_list[i] = '亚杉联通WT2' elif dif_list[i] == '145': dif_list[i] = '幻星移动WT' elif dif_list[i] == '146': dif_list[i] = '幻星联通WT' elif dif_list[i] == '147': dif_list[i] = '幻星电信WT' elif dif_list[i] == '148': dif_list[i] = '立纵电信WT' elif dif_list[i] == '149': dif_list[i] = '立纵联通WT' elif dif_list[i] == '150': dif_list[i] = '立纵移动WT' elif dif_list[i] == '151': dif_list[i] = 'A11移动WT' elif dif_list[i] == '152': dif_list[i] = 'A11联通WT' elif dif_list[i] == '153': dif_list[i] = 'A11电信WT' elif dif_list[i] == '125': dif_list[i] = '枫叶移动WT' elif dif_list[i] == '126': dif_list[i] = '枫叶联通WT' elif dif_list[i] == '127': dif_list[i] = '枫叶电信WT' dif_list = sorted(dif_list) #print(dif_list) sup_msg1 = sup_msg.format(dif_list) mon_cursor.execute(group_que_sql.format(group_name2)) recharge_ids = mon_cursor.fetchall() if recharge_ids is not None: mon_cursor.execute(upd_sql,(sup_msg1,int(timestamp),recharge_ids[0][0])) else: mon_cursor.execute(ins_sql,(sup_msg1,group_name2,int(timestamp))) #mon_cursor.execute(ins_sql,(sup_msg1,group_name2,int(timestamp))) logging.info(sup_msg1) #print(dif_list) except Exception as ex: #mon_db.rollback() print(ex) print('数据回滚') finally: mon_cursor.close mon_db.close if __name__ == '__main__': #数据库连接信息 chanel_db_config = { 'host' : '47.95.217.180', 'port' : 3306, 'user' : 'root', 'password' : '93DkChZMgZRyCbWh', 'db' : 'fmp', 'charset' : 'utf8', 'autocommit' : 1 } #监控消息数据库连接信息 mon_db_config = { 'host' : '127.0.0.1', 'port' : 9001, 'user' : 'root', 'password' : 'nibuzhidaowozhidao', 'db' : 'monitoring', 'charset' : 'utf8', 'autocommit' : 1 } #数据库连接 #db = conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db) chanel_db = createPool(chanel_db_config).connection() mon_db = createPool(mon_db_config).connection() #通道余额 chanel_sql = '''SELECT supplier_name,balance FROM channel_supplier WHERE supplier_name like '%兆蓉%' OR supplier_name like '%满帆%' OR supplier_name like '%亚杉%' OR supplier_name like '%易迅捷%' OR supplier_name like '%枫叶%' OR supplier_name like '%智信%' ''' #客户余额 cus_sql = ''' SELECT customer_id,customer_name,(balance + credit_amount - current_amount) 'available_balance' FROM customer_info WHERE customer_id IN(47,54) ''' #超过1小时未处理订单 ord_sql = ''' SELECT used_mobile,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) 'used_time' FROM flow_order_info WHERE (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) >=3600 AND status NOT IN(4,6) AND channel_id NOT IN ( 101,102,103) ORDER BY used_time DESC; ''' #半小时之内成功的通道 sup_sql = ''' SELECT channel_id FROM flow_order_info WHERE (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(check_time)) <=1800 AND status = 6 AND channel_id NOT IN (92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115) GROUP BY channel_id; ''' #自动补单充值中的订单数量: ord_count_sql = """ SELECT substring_index(mobile_home,'-',-1) provider , flow_amount,count(*) count_num FROM flow_order_info WHERE status NOT IN(4,6) AND channel_id IN ( 96,97,98) GROUP BY provider,flow_amount ORDER BY provider,flow_amount""" #自动补单2充值中的订单 art_sql = """ SELECT substring_index(mobile_home,'-',-1) provider , flow_amount,count(*) count_num FROM flow_order_info WHERE status NOT IN(4,6) AND channel_id IN ( 113,114,115) GROUP BY provider,flow_amount ORDER BY provider,flow_amount """ chanel_df = getData(chanel_sql,chanel_db) cus_df = getData(cus_sql,chanel_db) ord_df = getData(ord_sql,chanel_db) sup_df = getData(sup_sql,chanel_db) ord_count_df = getData(ord_count_sql,chanel_db) art_df = getData(art_sql,chanel_db) chanel_db.close #群名: group_name1 = '通道余额监控群' group_name2 = '30分钟无成功通道监控群' group_name3 = '客户授信监控群' group_name4 = '超时订单监控群' group_name5 = '补单订单数量监控群' group_name6 = '自动补单2监控群' saveData()