123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- # -*- encoding=utf8 -*-
- import re
- import time
- import json
- import logging
- import pandas as pd
- import pymysql
- from DBUtils.PooledDB import PooledDB
- #配置输出日志格式
- LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s'
- #配置输出时间格式
- DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a'
- logging.basicConfig(level = logging.INFO,
- format = LOG_FORMAT,
- datefmt = DATE_FORMAT,
- filename = r"./balance_monitoring.log")
- comment_re = re.compile(
- '(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?',
- re.DOTALL | re.MULTILINE
- )
- #数据库连接
- def conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db):
- db = pymysql.connect(host=mysql_host,port=mysql_port,user=mysql_user, password=mysql_password,db=mysql_db,charset = 'utf8',
- autocommit = 1)
- print("数据库连接成功")
- return db
- #创建数据库连接池
- def createPool(db_config):
- spool = PooledDB(pymysql, 5, **db_config)
- return spool
- #查询余额
- def getData(sql,db):
- df = pd.read_sql(sql,con=db)
- db.close
- #print(df)
- return df
- #解析带注释的json
- def parse_json(filename):
- with open(filename,encoding='utf8') as f:
- content = ''.join(f.readlines())
- ## Looking for comments
- match = comment_re.search(content)
- while match:
- # single line comment
- content = content[:match.start()] + content[match.end():]
- match = comment_re.search(content)
-
- #print(content)
- # Return json file
- return json.loads(content)
- def saveData():
- data_json = parse_json(r'./config.json')
- #print(json_data)
- '''
- ys_df = chanel_df[(chanel_df['supplier_name'] == '亚杉') & (chanel_df['balance'] < 0)]
- mf_df = chanel_df[(chanel_df['supplier_name'] == '满帆起航') & (chanel_df['balance'] <= -45000)]
- zr_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉') & (chanel_df['balance'] <= 0)]
- zrwt_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉WT') & (chanel_df['balance'] <= 5000)]
- yxj_df = chanel_df[(chanel_df['supplier_name'] == '易迅捷') & (chanel_df['balance'] <= 10000)]
- fy_df = chanel_df[(chanel_df['supplier_name'] == '枫叶') & (chanel_df['balance'] <= 10000)]
- zx_df = chanel_df[(chanel_df['supplier_name'] == '智信') & (chanel_df['balance'] <= 2000)]
- sht_df = cus_df[(cus_df['customer_id'] == 54) & (cus_df['available_balance'] < 50000)] #十荟团
- ylb_df = cus_df[(cus_df['customer_id'] == 47) & (cus_df['available_balance'] < 40000)] #云喇叭
- fql_df = cus_df[(cus_df['customer_id'] == 66) & (cus_df['available_balance'] < 20000)] #分期乐
- '''
-
- #print(fql_df)
- timestamp = time.time()
- str_time = time.strftime('%H:%M',time.localtime(timestamp))
- str_time_int = int(str(str_time).split(':')[-1])
- print(str(str_time).split(':')[-1])
- ins_sql = """ INSERT INTO balance_monitoring VALUES(0,%s,%s,1,%s) """
- msg = str_time + ': {} 余额为 {},请及时充值'
- cus_msg = str_time + ': {} 可用额度为 {};额度不足,请及时处理'
- ord_msg = str_time + ': 超过1小时订单总数量为:{},手机号为:\n {}'
- sup_msg = str_time + ':超过30分钟无成功的通道:\n{}'
- ord_count_msg = ' {} 面额 {} :{} ; '
- art_msg = str_time + ':自动补单2中充值订单 :{}'
- #print(ord_df)
- #print(sup_df['channel_id'])
- #mon_cursor = mon_db.cursor()
- try:
- #print(3333)
- mon_cursor = mon_db.cursor()
- #print(mon_cursor)
- for item in data_json:
- #print(item)
- channels = item['channels']
- customers = item['customers']
- #print(channels)
- #print(customes)
- for channel in channels:
- supplier_name = channel['supplier_name']
- balance = channel['balance']
- #print(supplier_name)
- #print(balance)
- ch_df = chanel_df[(chanel_df['supplier_name'] == supplier_name) & (chanel_df['balance'] < balance)]
- #print(ch_df)
- if ch_df.empty is False:
- sup_name = ch_df['supplier_name'].values[0]
- balance = ch_df['balance'].values[0]
- channel_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (channel_msg,group_name1,int(timestamp)))
- #print(88888)
-
- for customer in customers:
- customer_id = customer['customer_id']
- available_balance = customer['available_balance']
- customer_df = cus_df[(cus_df['customer_id'] == 54) & (cus_df['available_balance'] < 50000)]
-
- #if customer_df.empty is True:
- if((customer_df.empty is False) and (str_time_int % 30 == 0)):
- cus_name = customer_df['customer_name'].values[0]
- balance = customer_df['available_balance'].values[0]
- customer_msg = cus_msg.format(cus_name,balance)
- mon_cursor.execute(ins_sql,(customer_msg,group_name3,int(timestamp)))
- #print(customer_msg)
- '''
- if(mf_df.empty is False):
- #print(444)
- sup_name = mf_df['supplier_name'].values[0]
- balance = mf_df['balance'].values[0]
- mf_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (mf_msg,group_name1,int(timestamp)))
- #print(444)
- if(zr_df.empty is False):
- sup_name = zr_df['supplier_name'].values[0]
- balance = zr_df['balance'].values[0]
- zr_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (zr_msg,group_name1,int(timestamp)))
- if(zrwt_df.empty is False):
- sup_name = zrwt_df['supplier_name'].values[0]
- balance = zrwt_df['balance'].values[0]
- zrwt_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (zrwt_msg,group_name1,int(timestamp)))
- #print(sup_name)
- if(yxj_df.empty is False):
- sup_name = yxj_df['supplier_name'].values[0]
- balance = yxj_df['balance'].values[0]
- yxj_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (yxj_msg,group_name1,int(timestamp)))
- #print(sup_name)
- if(fy_df.empty is False):
- sup_name = fy_df['supplier_name'].values[0]
- balance = fy_df['balance'].values[0]
- fy_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (fy_msg,group_name1,int(timestamp)))
- #print(sup_name)
- if(zx_df.empty is False):
- sup_name = zx_df['supplier_name'].values[0]
- balance = zx_df['balance'].values[0]
- zx_msg = msg.format(sup_name,balance)
- mon_cursor.execute(ins_sql, (zx_msg,group_name1,int(timestamp)))
- #print(sup_name)
- if((sht_df.empty is False) and (str_time_int % 30 == 0)):
- cus_name = sht_df['customer_name'].values[0]
- balance = sht_df['available_balance'].values[0]
- sht_msg = cus_msg.format(cus_name,balance)
- mon_cursor.execute(ins_sql,(sht_msg,group_name3,int(timestamp)))
- #print(cus_name)
-
- if((ylb_df.empty is False) and (str_time_int % 30 == 0)):
- cus_name = ylb_df['customer_name'].values[0]
- balance = ylb_df['available_balance'].values[0]
- ylb_msg = cus_msg.format(cus_name,balance)
- mon_cursor.execute(ins_sql,(ylb_msg,group_name3,int(timestamp)))
- #print(222)
-
- if((fql_df.empty is False) and (str_time_int % 30 == 0)):
- cus_name = fql_df['customer_name'].values[0]
- balance = fql_df['available_balance'].values[0]
- fql_msg = cus_msg.format(cus_name,balance)
- mon_cursor.execute(ins_sql,(fql_msg,group_name3,int(timestamp)))
- #print('aaa')
- '''
- print(str_time_int % 10)
- if((ord_df.empty is False) and (str_time_int % 10 == 0)):
- #print(len(ord_df))
- total_num = ord_df.shape[0]
- phone_list = []
- if total_num >10:
- phone_list = ord_df['used_mobile'].head(10).tolist()
- else:
- phone_list = ord_df['used_mobile'].values.tolist()
- #print(total_num)
- #print(phone_list)
- ord_msg1 = ord_msg.format(total_num,phone_list)
- #print(ord_msg1)
- mon_cursor.execute(ins_sql,(ord_msg1,group_name4,int(timestamp)))
- logging.info(ord_msg1)
- #sup_total_list = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120','121','122','123']
-
- #补单中的订单:
- if(ord_count_df.empty is False) and (str_time_int % 10 == 0):
- #if(ord_count_df.empty is False) :
- ord_count_list = ord_count_df.values.tolist()
- #print(ord_count_list)
- temp_msg = ''
- total_count = 0
- for i in range(len(ord_count_list)):
- total_count += ord_count_list[i][2]
- if i == 0:
- temp_msg = str_time + ' 补单中的订单数量为:\n {} 面额 {} :{} ; '.format(ord_count_list[i][0],
- int(ord_count_list[i][1]),ord_count_list[i][2])
- else:
- temp_msg += ord_count_msg.format(ord_count_list[i][0],int(ord_count_list[i][1]),ord_count_list[i][2])
- temp_msg = temp_msg + '补单总数量为:【{}】。'.format(total_count)
- #print(temp_msg)
- mon_cursor.execute(ins_sql,(temp_msg,group_name5,int(timestamp)))
-
- #自动补单2中的订单
- if(art_df.empty is False):
- #print(art_df)
- art_df['flow_amount'] = art_df['flow_amount'].apply(int)
- art_msg1 = art_msg.format(art_df.values.tolist())
- mon_cursor.execute(ins_sql,(art_msg1,group_name6,int(timestamp)))
-
- sup_total_list = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120','121','122',
- '123','124','125','126','127']
- sup_list = list(sup_df['channel_id'].values)
- #print(sup_list)
- dif_list = [i for i in sup_total_list if i not in sup_list]
- #print(dif_list)
- if (len(dif_list) > 0) and (str_time_int % 10 == 0):
- for i in range(len(dif_list)):
- if dif_list[i] == '83':
- dif_list[i] = '满帆移动网厅'
- elif dif_list[i] == '84':
- dif_list[i] = '满帆联通网厅'
- elif dif_list[i] == '85':
- dif_list[i] = '满帆电信网厅'
- elif dif_list[i] == '89':
- dif_list[i] = '兆蓉移动'
- elif dif_list[i] == '90':
- dif_list[i] = '兆蓉联通'
- elif dif_list[i] == '91':
- dif_list[i] = '兆蓉电信'
- elif dif_list[i] == '95':
- dif_list[i] = '兆蓉移动网厅'
- elif dif_list[i] == '104':
- dif_list[i] = '亚杉移动网厅'
- elif dif_list[i] == '105':
- dif_list[i] = '兆蓉电信网厅'
- elif dif_list[i] == '116':
- dif_list[i] = '兆蓉联通网厅'
- elif dif_list[i] == '117':
- dif_list[i] = '亚杉电信网厅'
- elif dif_list[i] == '118':
- dif_list[i] = '智信全国移动'
- elif dif_list[i] == '119':
- dif_list[i] = '智信全国联通'
- elif dif_list[i] == '120':
- dif_list[i] = '智信全国电信'
- elif dif_list[i] == '121':
- dif_list[i] = '易迅捷移动网厅'
- elif dif_list[i] == '122':
- dif_list[i] = '易迅捷电信网厅'
- elif dif_list[i] == '123':
- dif_list[i] = '易迅捷联通网厅'
- elif dif_list[i] == '124':
- dif_list[i] = '亚杉联通网厅'
- elif dif_list[i] == '125':
- dif_list[i] = '枫叶移动网厅'
- elif dif_list[i] == '126':
- dif_list[i] = '枫叶联通网厅'
- elif dif_list[i] == '127':
- dif_list[i] = '枫叶电信网厅'
- dif_list = sorted(dif_list)
- #print(dif_list)
- sup_msg1 = sup_msg.format(dif_list)
- mon_cursor.execute(ins_sql,(sup_msg1,group_name2,int(timestamp)))
- logging.info(sup_msg1)
- #print(dif_list)
-
- except:
- #mon_db.rollback()
- print('数据回滚')
-
- finally:
- mon_cursor.close
- mon_db.close
- if __name__ == '__main__':
- #数据库连接信息
- chanel_db_config = {
- 'host' : '47.95.217.180',
- 'port' : 3306,
- 'user' : 'root',
- 'password' : '93DkChZMgZRyCbWh',
- 'db' : 'fmp',
- 'charset' : 'utf8',
- 'autocommit' : 1
- }
- #监控消息数据库连接信息
- mon_db_config = {
- 'host' : '127.0.0.1',
- 'port' : 9001,
- 'user' : 'root',
- 'password' : 'nibuzhidaowozhidao',
- 'db' : 'monitoring',
- 'charset' : 'utf8',
- 'autocommit' : 1
- }
- #数据库连接
- #db = conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db)
- chanel_db = createPool(chanel_db_config).connection()
- mon_db = createPool(mon_db_config).connection()
- #通道余额
- chanel_sql = '''SELECT supplier_name,balance FROM channel_supplier
- WHERE
- supplier_name like '%兆蓉%' OR supplier_name like '%满帆%' OR supplier_name like '%亚杉%'
- OR supplier_name like '%易迅捷%' OR supplier_name like '%枫叶%' OR supplier_name like '%智信%'
- '''
-
- #客户余额
- cus_sql = '''
- SELECT customer_id,customer_name,(balance + credit_amount - current_amount) 'available_balance'
- FROM customer_info
- WHERE customer_id IN(47,54)
- '''
-
- #超过1小时未处理订单
- ord_sql = ''' SELECT used_mobile,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) 'used_time'
- FROM flow_order_info
- WHERE
- (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) >=3600 AND status NOT IN(4,6) AND
- channel_id NOT IN ( 101,102,103)
- ORDER BY used_time DESC;
- '''
-
- #半小时之内成功的通道
- sup_sql = '''
- SELECT
- channel_id
- FROM
- flow_order_info
- WHERE
- (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(check_time)) <=1800 AND
- status = 6 AND
- channel_id NOT IN (92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
- GROUP BY channel_id;
- '''
- #自动补单充值中的订单数量:
- ord_count_sql = """ SELECT
- substring_index(mobile_home,'-',-1) provider , flow_amount,count(*) count_num
- FROM
- flow_order_info
- WHERE
- status NOT IN(4,6) AND channel_id IN ( 96,97,98)
- GROUP BY provider,flow_amount ORDER BY provider,flow_amount"""
- #自动补单2充值中的订单
- art_sql = """ SELECT
- used_mobile , flow_amount
- FROM
- flow_order_info
- WHERE
- status NOT IN(4,6) AND channel_id IN ( 113,114,115)
- ORDER BY flow_amount """
- chanel_df = getData(chanel_sql,chanel_db)
- cus_df = getData(cus_sql,chanel_db)
- ord_df = getData(ord_sql,chanel_db)
- sup_df = getData(sup_sql,chanel_db)
- ord_count_df = getData(ord_count_sql,chanel_db)
- art_df = getData(art_sql,chanel_db)
- chanel_db.close
-
- #群名:
- group_name1 = '通道余额监控群'
- group_name2 = '30分钟无成功通道监控群'
- group_name3 = '客户授信监控群'
- group_name4 = '超时订单监控群'
- group_name5 = '补单订单数量监控群'
- group_name6 = '自动补单2监控群'
- saveData()
|