balance_monitoring.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. # -*- encoding=utf8 -*-
  2. import re
  3. import time
  4. import json
  5. import logging
  6. import pandas as pd
  7. import pymysql
  8. from DBUtils.PooledDB import PooledDB
  9. #配置输出日志格式
  10. LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s'
  11. #配置输出时间格式
  12. DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a'
  13. logging.basicConfig(level = logging.INFO,
  14. format = LOG_FORMAT,
  15. datefmt = DATE_FORMAT,
  16. filename = r"./balance_monitoring.log")
  17. comment_re = re.compile(
  18. '(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?',
  19. re.DOTALL | re.MULTILINE
  20. )
  21. #数据库连接
  22. def conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db):
  23. db = pymysql.connect(host=mysql_host,port=mysql_port,user=mysql_user, password=mysql_password,db=mysql_db,charset = 'utf8',
  24. autocommit = 1)
  25. print("数据库连接成功")
  26. return db
  27. #创建数据库连接池
  28. def createPool(db_config):
  29. spool = PooledDB(pymysql, 5, **db_config)
  30. return spool
  31. #查询余额
  32. def getData(sql,db):
  33. df = pd.read_sql(sql,con=db)
  34. db.close
  35. #print(df)
  36. return df
  37. #解析带注释的json
  38. def parse_json(filename):
  39. with open(filename,encoding='utf8') as f:
  40. content = ''.join(f.readlines())
  41. ## Looking for comments
  42. match = comment_re.search(content)
  43. while match:
  44. # single line comment
  45. content = content[:match.start()] + content[match.end():]
  46. match = comment_re.search(content)
  47. #print(content)
  48. # Return json file
  49. return json.loads(content)
  50. def saveData():
  51. data_json = parse_json(r'./config.json')
  52. #print(json_data)
  53. '''
  54. ys_df = chanel_df[(chanel_df['supplier_name'] == '亚杉') & (chanel_df['balance'] < 0)]
  55. mf_df = chanel_df[(chanel_df['supplier_name'] == '满帆起航') & (chanel_df['balance'] <= -45000)]
  56. zr_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉') & (chanel_df['balance'] <= 0)]
  57. zrwt_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉WT') & (chanel_df['balance'] <= 5000)]
  58. yxj_df = chanel_df[(chanel_df['supplier_name'] == '易迅捷') & (chanel_df['balance'] <= 10000)]
  59. fy_df = chanel_df[(chanel_df['supplier_name'] == '枫叶') & (chanel_df['balance'] <= 10000)]
  60. zx_df = chanel_df[(chanel_df['supplier_name'] == '智信') & (chanel_df['balance'] <= 2000)]
  61. sht_df = cus_df[(cus_df['customer_id'] == 54) & (cus_df['available_balance'] < 50000)] #十荟团
  62. ylb_df = cus_df[(cus_df['customer_id'] == 47) & (cus_df['available_balance'] < 40000)] #云喇叭
  63. fql_df = cus_df[(cus_df['customer_id'] == 66) & (cus_df['available_balance'] < 20000)] #分期乐
  64. '''
  65. #print(fql_df)
  66. timestamp = time.time()
  67. str_time = time.strftime('%H:%M',time.localtime(timestamp))
  68. str_time_int = int(str(str_time).split(':')[-1])
  69. print(str(str_time).split(':')[-1])
  70. ins_sql = """ INSERT INTO balance_monitoring VALUES(0,%s,%s,1,%s) """
  71. que_sql = " SELECT id FROM balance_monitoring WHERE msg LIKE '%{}%' AND group_name = '{}' "
  72. upd_sql = " UPDATE balance_monitoring SET msg = %s,status = 1,timestamp = %s WHERE id = %s"
  73. group_que_sql = " SELECT id FROM balance_monitoring WHERE group_name = '{}' "
  74. msg = str_time + ': {} 余额为 {},请及时充值'
  75. cus_msg = str_time + ': {} 可用额度为 {};额度不足,请及时处理'
  76. ord_msg = str_time + ': 超过1小时订单总数量为:{},手机号为:\n {}'
  77. sup_msg = str_time + ':超过30分钟无成功的通道:\n{}'
  78. ord_count_msg = ' {} 面额 {} :{} ; '
  79. #art_msg = str_time + ':自动补单2中充值订单 :{}'
  80. #print(ord_df)
  81. #print(sup_df['channel_id'])
  82. #mon_cursor = mon_db.cursor()
  83. try:
  84. #print(3333)
  85. mon_cursor = mon_db.cursor()
  86. #print(mon_cursor)
  87. for item in data_json:
  88. #print(item)
  89. channels = item['channels']
  90. customers = item['customers']
  91. #print(channels)
  92. #print(customes)
  93. for channel in channels:
  94. supplier_name = channel['supplier_name']
  95. balance = channel['balance']
  96. #print(supplier_name)
  97. #print(balance)
  98. ch_df = chanel_df[(chanel_df['supplier_name'] == supplier_name) & (chanel_df['balance'] < balance)]
  99. #print(ch_df)
  100. if ch_df.empty is False:
  101. sup_name = ch_df['supplier_name'].values[0]
  102. balance = ch_df['balance'].values[0]
  103. channel_msg = msg.format(sup_name,balance)
  104. #print(channel_msg)
  105. mon_cursor.execute(que_sql.format(supplier_name,group_name1))
  106. channel_ids = mon_cursor.fetchall()
  107. #print('channel_id is {}'.format(channel_ids[0][0]))
  108. if len(channel_ids) > 0 :
  109. mon_cursor.execute(upd_sql,(channel_msg,int(timestamp),channel_ids[0][0]))
  110. else:
  111. mon_cursor.execute(ins_sql, (channel_msg,group_name1,int(timestamp)))
  112. #print(88888)
  113. for customer in customers:
  114. customer_id = customer['customer_id']
  115. available_balance = customer['available_balance']
  116. customer_df = cus_df[(cus_df['customer_id'] == customer_id) & (cus_df['available_balance'] < available_balance)]
  117. #if customer_df.empty is False:
  118. if((customer_df.empty is False) and (str_time_int % 30 == 0)):
  119. cus_name = customer_df['customer_name'].values[0]
  120. balance = customer_df['available_balance'].values[0]
  121. customer_msg = cus_msg.format(cus_name,balance)
  122. mon_cursor.execute(que_sql.format(customer_name,group_name3))
  123. customer_ids = mon_cursor.fetchall()
  124. #print('channel_id is {}'.format(channel_ids[0][0]))
  125. if customer_ids is not None:
  126. mon_cursor.execute(upd_sql,(customer_msg,int(timestamp),customer_ids[0][0]))
  127. else:
  128. mon_cursor.execute(ins_sql,(customer_msg,group_name3,int(timestamp)))
  129. #print(customer_msg)
  130. '''
  131. if(mf_df.empty is False):
  132. #print(444)
  133. sup_name = mf_df['supplier_name'].values[0]
  134. balance = mf_df['balance'].values[0]
  135. mf_msg = msg.format(sup_name,balance)
  136. mon_cursor.execute(ins_sql, (mf_msg,group_name1,int(timestamp)))
  137. #print(444)
  138. if(zr_df.empty is False):
  139. sup_name = zr_df['supplier_name'].values[0]
  140. balance = zr_df['balance'].values[0]
  141. zr_msg = msg.format(sup_name,balance)
  142. mon_cursor.execute(ins_sql, (zr_msg,group_name1,int(timestamp)))
  143. if(zrwt_df.empty is False):
  144. sup_name = zrwt_df['supplier_name'].values[0]
  145. balance = zrwt_df['balance'].values[0]
  146. zrwt_msg = msg.format(sup_name,balance)
  147. mon_cursor.execute(ins_sql, (zrwt_msg,group_name1,int(timestamp)))
  148. #print(sup_name)
  149. if(yxj_df.empty is False):
  150. sup_name = yxj_df['supplier_name'].values[0]
  151. balance = yxj_df['balance'].values[0]
  152. yxj_msg = msg.format(sup_name,balance)
  153. mon_cursor.execute(ins_sql, (yxj_msg,group_name1,int(timestamp)))
  154. #print(sup_name)
  155. if(fy_df.empty is False):
  156. sup_name = fy_df['supplier_name'].values[0]
  157. balance = fy_df['balance'].values[0]
  158. fy_msg = msg.format(sup_name,balance)
  159. mon_cursor.execute(ins_sql, (fy_msg,group_name1,int(timestamp)))
  160. #print(sup_name)
  161. if(zx_df.empty is False):
  162. sup_name = zx_df['supplier_name'].values[0]
  163. balance = zx_df['balance'].values[0]
  164. zx_msg = msg.format(sup_name,balance)
  165. mon_cursor.execute(ins_sql, (zx_msg,group_name1,int(timestamp)))
  166. #print(sup_name)
  167. if((sht_df.empty is False) and (str_time_int % 30 == 0)):
  168. cus_name = sht_df['customer_name'].values[0]
  169. balance = sht_df['available_balance'].values[0]
  170. sht_msg = cus_msg.format(cus_name,balance)
  171. mon_cursor.execute(ins_sql,(sht_msg,group_name3,int(timestamp)))
  172. #print(cus_name)
  173. if((ylb_df.empty is False) and (str_time_int % 30 == 0)):
  174. cus_name = ylb_df['customer_name'].values[0]
  175. balance = ylb_df['available_balance'].values[0]
  176. ylb_msg = cus_msg.format(cus_name,balance)
  177. mon_cursor.execute(ins_sql,(ylb_msg,group_name3,int(timestamp)))
  178. #print(222)
  179. if((fql_df.empty is False) and (str_time_int % 30 == 0)):
  180. cus_name = fql_df['customer_name'].values[0]
  181. balance = fql_df['available_balance'].values[0]
  182. fql_msg = cus_msg.format(cus_name,balance)
  183. mon_cursor.execute(ins_sql,(fql_msg,group_name3,int(timestamp)))
  184. #print('aaa')
  185. '''
  186. #print(str_time_int % 10)
  187. #超时订单
  188. if((ord_df.empty is False) and (str_time_int % 10 == 0)):
  189. #print(len(ord_df))
  190. ord_que_sql = group_que_sql.format(group_name4)
  191. total_num = ord_df.shape[0]
  192. phone_list = []
  193. '''
  194. if total_num >10:
  195. phone_list = ord_df['used_mobile'].head(10).tolist()
  196. else:
  197. phone_list = ord_df['used_mobile'].values.tolist()
  198. '''
  199. if total_num >=50:
  200. phone_list = ord_df['used_mobile'].head(10).tolist()
  201. #print(total_num)
  202. #print(phone_list)
  203. ord_msg1 = ord_msg.format(total_num,phone_list)
  204. #print(ord_msg1)
  205. mon_cursor.execute(ord_que_sql)
  206. over_ids = mon_cursor.fetchall()
  207. if over_ids is not None:
  208. mon_cursor.execute(upd_sql,(ord_msg1,int(timestamp),over_ids[0][0]))
  209. else:
  210. mon_cursor.execute(ins_sql,(ord_msg1,group_name4,int(timestamp)))
  211. logging.info(ord_msg1)
  212. #sup_total_list = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120','121','122','123']
  213. #补单中的订单:
  214. if(ord_count_df.empty is False) and (str_time_int % 10 == 0):
  215. #if(ord_count_df.empty is False) :
  216. ord_count_list = ord_count_df.values.tolist()
  217. #print(ord_count_list)
  218. temp_msg = ''
  219. total_count = 0
  220. for i in range(len(ord_count_list)):
  221. total_count += ord_count_list[i][2]
  222. if i == 0:
  223. temp_msg = str_time + ' 补单中的订单数量为:\n {} 面额 {} :{} ; '.format(ord_count_list[i][0],
  224. int(ord_count_list[i][1]),ord_count_list[i][2])
  225. else:
  226. temp_msg += ord_count_msg.format(ord_count_list[i][0],int(ord_count_list[i][1]),ord_count_list[i][2])
  227. temp_msg = temp_msg + '补单总数量为:【{}】。'.format(total_count)
  228. #print(temp_msg)
  229. mon_cursor.execute(group_que_sql.format(group_name5))
  230. recharge_ids = mon_cursor.fetchall()
  231. if recharge_ids is not None:
  232. mon_cursor.execute(upd_sql,(temp_msg,int(timestamp),recharge_ids[0][0]))
  233. else:
  234. mon_cursor.execute(ins_sql,(temp_msg1,group_name5,int(timestamp)))
  235. #mon_cursor.execute(ins_sql,(temp_msg,group_name5,int(timestamp)))
  236. #自动补单2中的订单
  237. if(art_df.empty is False):
  238. #print(art_df)
  239. art_count_list = art_df.values.tolist()
  240. #print(art_count_list)
  241. temp_msg = ''
  242. total_count = 0
  243. for i in range(len(art_count_list)):
  244. total_count += art_count_list[i][2]
  245. #print(total_count)
  246. if i == 0:
  247. temp_msg = str_time + ' 补单2中的订单数量为:\n {} 面额 {} :{} ; '.format(art_count_list[i][0],
  248. int(art_count_list[i][1]),art_count_list[i][2])
  249. #iprint(temp_msg)
  250. else:
  251. temp_msg += ord_count_msg.format(art_count_list[i][0],int(art_count_list[i][1]),art_count_list[i][2])
  252. temp_msg = temp_msg + '补单2总数量为:【{}】。'.format(total_count)
  253. #print(temp_msg)
  254. mon_cursor.execute(group_que_sql.format(group_name6))
  255. recharge_ids = mon_cursor.fetchall()
  256. if recharge_ids is not None:
  257. mon_cursor.execute(upd_sql,(temp_msg,int(timestamp),recharge_ids[0][0]))
  258. else:
  259. mon_cursor.execute(ins_sql,(temp_msg,group_name6,int(timestamp)))
  260. #mon_cursor.execute(ins_sql,(temp_msg,group_name6,int(timestamp)))
  261. sup_total_list = data_json[0]['channel_ids']
  262. #print(type(sup_total_list))
  263. sup_list = list(sup_df['channel_id'].values)
  264. #print(sup_list)
  265. dif_list = [i for i in sup_total_list if i not in sup_list]
  266. #print(dif_list)
  267. if (len(dif_list) > 0) and (str_time_int % 10 == 0):
  268. for i in range(len(dif_list)):
  269. if dif_list[i] == '83':
  270. dif_list[i] = '满帆移动'
  271. elif dif_list[i] == '84':
  272. dif_list[i] = '满帆联通'
  273. elif dif_list[i] == '85':
  274. dif_list[i] = '满帆电信'
  275. elif dif_list[i] == '89':
  276. dif_list[i] = '兆蓉移动'
  277. elif dif_list[i] == '90':
  278. dif_list[i] = '兆蓉联通'
  279. elif dif_list[i] == '91':
  280. dif_list[i] = '兆蓉电信'
  281. elif dif_list[i] == '95':
  282. dif_list[i] = '兆蓉移动低价'
  283. elif dif_list[i] == '142':
  284. dif_list[i] = '亚杉移动'
  285. elif dif_list[i] == '105':
  286. dif_list[i] = '兆蓉电信低价'
  287. elif dif_list[i] == '116':
  288. dif_list[i] = '兆蓉联通低价'
  289. elif dif_list[i] == '144':
  290. dif_list[i] = '亚杉电信'
  291. elif dif_list[i] == '118':
  292. dif_list[i] = '智信全国移动'
  293. elif dif_list[i] == '119':
  294. dif_list[i] = '智信全国联通'
  295. elif dif_list[i] == '120':
  296. dif_list[i] = '智信全国电信'
  297. elif dif_list[i] == '121':
  298. dif_list[i] = '易迅捷移动'
  299. elif dif_list[i] == '122':
  300. dif_list[i] = '易迅捷电信'
  301. elif dif_list[i] == '123':
  302. dif_list[i] = '易迅捷联通'
  303. elif dif_list[i] == '143':
  304. dif_list[i] = '亚杉联通2'
  305. elif dif_list[i] == '145':
  306. dif_list[i] = '幻星移动'
  307. elif dif_list[i] == '146':
  308. dif_list[i] = '幻星联通'
  309. elif dif_list[i] == '147':
  310. dif_list[i] = '幻星电信'
  311. elif dif_list[i] == '148':
  312. dif_list[i] = 'L26电信'
  313. elif dif_list[i] == '149':
  314. dif_list[i] = 'L26联通'
  315. elif dif_list[i] == '150':
  316. dif_list[i] = 'L26移动'
  317. elif dif_list[i] == '151':
  318. dif_list[i] = 'A11移动'
  319. elif dif_list[i] == '152':
  320. dif_list[i] = 'A11联通'
  321. elif dif_list[i] == '153':
  322. dif_list[i] = 'A11电信'
  323. elif dif_list[i] == '125':
  324. dif_list[i] = '枫叶移动'
  325. elif dif_list[i] == '126':
  326. dif_list[i] = '枫叶联通'
  327. elif dif_list[i] == '127':
  328. dif_list[i] = '枫叶电信'
  329. elif dif_list[i] == '156':
  330. dif_list[i] = 'H1移动'
  331. elif dif_list[i] == '157':
  332. dif_list[i] = 'H1联通'
  333. elif dif_list[i] == '158':
  334. dif_list[i] = 'H1电信'
  335. dif_list = sorted(dif_list)
  336. #print(dif_list)
  337. sup_msg1 = sup_msg.format(dif_list)
  338. mon_cursor.execute(group_que_sql.format(group_name2))
  339. recharge_ids = mon_cursor.fetchall()
  340. if recharge_ids is not None:
  341. mon_cursor.execute(upd_sql,(sup_msg1,int(timestamp),recharge_ids[0][0]))
  342. else:
  343. mon_cursor.execute(ins_sql,(sup_msg1,group_name2,int(timestamp)))
  344. #mon_cursor.execute(ins_sql,(sup_msg1,group_name2,int(timestamp)))
  345. logging.info(sup_msg1)
  346. #print(dif_list)
  347. except Exception as ex:
  348. #mon_db.rollback()
  349. print(ex)
  350. print('数据回滚')
  351. finally:
  352. mon_cursor.close
  353. mon_db.close
  354. if __name__ == '__main__':
  355. #数据库连接信息
  356. chanel_db_config = {
  357. 'host' : '47.95.217.180',
  358. 'port' : 3306,
  359. 'user' : 'root',
  360. 'password' : '93DkChZMgZRyCbWh',
  361. 'db' : 'fmp',
  362. 'charset' : 'utf8',
  363. 'autocommit' : 1
  364. }
  365. #监控消息数据库连接信息
  366. mon_db_config = {
  367. 'host' : '127.0.0.1',
  368. 'port' : 9001,
  369. 'user' : 'root',
  370. 'password' : 'nibuzhidaowozhidao',
  371. 'db' : 'monitoring',
  372. 'charset' : 'utf8',
  373. 'autocommit' : 1
  374. }
  375. #数据库连接
  376. #db = conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db)
  377. chanel_db = createPool(chanel_db_config).connection()
  378. mon_db = createPool(mon_db_config).connection()
  379. #通道余额
  380. chanel_sql = "SELECT supplier_name,balance FROM channel_supplier"
  381. #客户余额
  382. cus_sql = '''
  383. SELECT customer_id,customer_name,(balance + credit_amount - current_amount) 'available_balance'
  384. FROM customer_info
  385. WHERE customer_id IN(47,54)
  386. '''
  387. #超过1小时未处理订单
  388. ord_sql = ''' SELECT used_mobile,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) 'used_time'
  389. FROM flow_order_info
  390. WHERE
  391. (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) >=3600 AND status NOT IN(4,6) AND
  392. channel_id NOT IN ( 101,102,103)
  393. ORDER BY used_time DESC;
  394. '''
  395. #半小时之内成功的通道
  396. sup_sql = '''
  397. SELECT
  398. channel_id
  399. FROM
  400. flow_order_info
  401. WHERE
  402. (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(check_time)) <=1800 AND
  403. status = 6 AND
  404. channel_id NOT IN (92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
  405. GROUP BY channel_id;
  406. '''
  407. #自动补单充值中的订单数量:
  408. ord_count_sql = """ SELECT
  409. substring_index(mobile_home,'-',-1) provider , flow_amount,count(*) count_num
  410. FROM
  411. flow_order_info
  412. WHERE
  413. status NOT IN(4,6) AND channel_id IN ( 96,97,98)
  414. GROUP BY provider,flow_amount ORDER BY provider,flow_amount"""
  415. #自动补单2充值中的订单
  416. art_sql = """ SELECT
  417. substring_index(mobile_home,'-',-1) provider , flow_amount,count(*) count_num
  418. FROM
  419. flow_order_info
  420. WHERE
  421. status NOT IN(4,6) AND channel_id IN ( 113,114,115)
  422. GROUP BY provider,flow_amount ORDER BY provider,flow_amount """
  423. chanel_df = getData(chanel_sql,chanel_db)
  424. cus_df = getData(cus_sql,chanel_db)
  425. ord_df = getData(ord_sql,chanel_db)
  426. sup_df = getData(sup_sql,chanel_db)
  427. ord_count_df = getData(ord_count_sql,chanel_db)
  428. art_df = getData(art_sql,chanel_db)
  429. chanel_db.close
  430. #群名:
  431. group_name1 = '通道余额监控群'
  432. group_name2 = '30分钟无成功通道监控群'
  433. group_name3 = '客户授信监控群'
  434. group_name4 = '超时订单监控群'
  435. group_name5 = '补单订单数量监控群'
  436. group_name6 = '自动补单2监控群'
  437. saveData()