balance_monitoring.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. # -*- encoding=utf8 -*-
  2. import time
  3. import logging
  4. import pandas as pd
  5. import pymysql
  6. from DBUtils.PooledDB import PooledDB
  7. #配置输出日志格式
  8. LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s'
  9. #配置输出时间格式
  10. DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a'
  11. logging.basicConfig(level = logging.INFO,
  12. format = LOG_FORMAT,
  13. datefmt = DATE_FORMAT,
  14. filename = r"./balance_monitoring.log")
  15. #数据库连接
  16. def conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db):
  17. db = pymysql.connect(host=mysql_host,port=mysql_port,user=mysql_user, password=mysql_password,db=mysql_db,charset = 'utf8',
  18. autocommit = 1)
  19. print("数据库连接成功")
  20. return db
  21. #创建数据库连接池
  22. def createPool(db_config):
  23. spool = PooledDB(pymysql, 5, **db_config)
  24. return spool
  25. #查询余额
  26. def getData(sql,db):
  27. df = pd.read_sql(sql,con=db)
  28. db.close
  29. #print(df)
  30. return df
  31. def saveData():
  32. ys_df = chanel_df[(chanel_df['supplier_name'] == '亚杉') & (chanel_df['balance'] < 0)]
  33. mf_df = chanel_df[(chanel_df['supplier_name'] == '满帆起航') & (chanel_df['balance'] <= -45000)]
  34. zr_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉') & (chanel_df['balance'] <= 0)]
  35. zrwt_df = chanel_df[(chanel_df['supplier_name'] == '兆蓉WT') & (chanel_df['balance'] <= 5000)]
  36. yxj_df = chanel_df[(chanel_df['supplier_name'] == '易迅捷') & (chanel_df['balance'] <= 10000)]
  37. sht_df = cus_df[(cus_df['customer_id'] == 54) & (cus_df['available_balance'] < 50000)] #十荟团
  38. ylb_df = cus_df[(cus_df['customer_id'] == 47) & (cus_df['available_balance'] < 40000)] #云喇叭
  39. fql_df = cus_df[(cus_df['customer_id'] == 66) & (cus_df['available_balance'] < 20000)] #分期乐
  40. #print(fql_df)
  41. timestamp = time.time()
  42. str_time = time.strftime('%H:%M',time.localtime(timestamp))
  43. str_time_int = int(str(str_time).split(':')[-1])
  44. print(str(str_time).split(':')[-1])
  45. ins_sql = """ INSERT INTO balance_monitoring VALUES(0,%s,%s,1,%s) """
  46. msg = str_time + ': {} 余额为 {},请及时充值'
  47. cus_msg = str_time + ': {} 可用额度为 {};额度不足,请及时处理'
  48. ord_msg = str_time + ': 超过1小时订单总数量为:{},手机号为:\n {}'
  49. sup_msg = str_time + ':超过30分钟无成功的通道:\n{}'
  50. #print(ord_df)
  51. #print(sup_df['channel_id'])
  52. #mon_cursor = mon_db.cursor()
  53. try:
  54. #print(3333)
  55. mon_cursor = mon_db.cursor()
  56. #print(mon_cursor)
  57. if(mf_df.empty is False):
  58. #print(444)
  59. sup_name = mf_df['supplier_name'].values[0]
  60. balance = mf_df['balance'].values[0]
  61. mf_msg = msg.format(sup_name,balance)
  62. mon_cursor.execute(ins_sql, (mf_msg,group_name1,int(timestamp)))
  63. #print(444)
  64. if(zr_df.empty is False):
  65. sup_name = zr_df['supplier_name'].values[0]
  66. balance = zr_df['balance'].values[0]
  67. zr_msg = msg.format(sup_name,balance)
  68. mon_cursor.execute(ins_sql, (zr_msg,group_name1,int(timestamp)))
  69. if(zrwt_df.empty is False):
  70. sup_name = zrwt_df['supplier_name'].values[0]
  71. balance = zrwt_df['balance'].values[0]
  72. zrwt_msg = msg.format(sup_name,balance)
  73. mon_cursor.execute(ins_sql, (zrwt_msg,group_name1,int(timestamp)))
  74. #print(sup_name)
  75. if(yxj_df.empty is False):
  76. sup_name = yxj_df['supplier_name'].values[0]
  77. balance = yxj_df['balance'].values[0]
  78. yxj_msg = msg.format(sup_name,balance)
  79. mon_cursor.execute(ins_sql, (yxj_msg,group_name1,int(timestamp)))
  80. #print(sup_name)
  81. if((sht_df.empty is False) and (str_time_int % 30 == 0)):
  82. cus_name = sht_df['customer_name'].values[0]
  83. balance = sht_df['available_balance'].values[0]
  84. sht_msg = cus_msg.format(cus_name,balance)
  85. mon_cursor.execute(ins_sql,(sht_msg,group_name3,int(timestamp)))
  86. #print(cus_name)
  87. if((ylb_df.empty is False) and (str_time_int % 30 == 0)):
  88. cus_name = ylb_df['customer_name'].values[0]
  89. balance = ylb_df['available_balance'].values[0]
  90. ylb_msg = cus_msg.format(cus_name,balance)
  91. mon_cursor.execute(ins_sql,(ylb_msg,group_name3,int(timestamp)))
  92. #print(222)
  93. if((fql_df.empty is False) and (str_time_int % 30 == 0)):
  94. cus_name = fql_df['customer_name'].values[0]
  95. balance = fql_df['available_balance'].values[0]
  96. fql_msg = cus_msg.format(cus_name,balance)
  97. mon_cursor.execute(ins_sql,(fql_msg,group_name3,int(timestamp)))
  98. #print('aaa')
  99. #print(str_time_int % 10)
  100. if((ord_df.empty is False) and (str_time_int % 10 == 0)):
  101. print(len(ord_df))
  102. total_num = ord_df.shape[0]
  103. phone_list = []
  104. if total_num >10:
  105. phone_list = ord_df['used_mobile'].head(10).tolist()
  106. else:
  107. phone_list = ord_df['used_mobile'].values.tolist()
  108. #print(total_num)
  109. #print(phone_list)
  110. ord_msg1 = ord_msg.format(total_num,phone_list)
  111. #print(ord_msg1)
  112. mon_cursor.execute(ins_sql,(ord_msg1,group_name4,int(timestamp)))
  113. logging.info(ord_msg1)
  114. #sup_total_list = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120','121','122','123']
  115. sup_total_list = ['83','84','85','89','90','91','95','104','105','116','117','121','122','123']
  116. sup_list = list(sup_df['channel_id'].values)
  117. #print(sup_list)
  118. dif_list = [i for i in sup_total_list if i not in sup_list]
  119. #print(dif_list)
  120. if (len(dif_list) > 0) and (str_time_int % 10 == 0):
  121. for i in range(len(dif_list)):
  122. if dif_list[i] == '83':
  123. dif_list[i] = '满帆移动网厅'
  124. elif dif_list[i] == '84':
  125. dif_list[i] = '满帆联通网厅'
  126. elif dif_list[i] == '85':
  127. dif_list[i] = '满帆电信网厅'
  128. elif dif_list[i] == '89':
  129. dif_list[i] = '兆蓉移动'
  130. elif dif_list[i] == '90':
  131. dif_list[i] = '兆蓉联通'
  132. elif dif_list[i] == '91':
  133. dif_list[i] = '兆蓉电信'
  134. elif dif_list[i] == '95':
  135. dif_list[i] = '兆蓉移动网厅'
  136. elif dif_list[i] == '104':
  137. dif_list[i] = '亚杉移动网厅'
  138. elif dif_list[i] == '105':
  139. dif_list[i] = '兆蓉电信网厅'
  140. elif dif_list[i] == '116':
  141. dif_list[i] = '兆蓉联通网厅'
  142. elif dif_list[i] == '117':
  143. dif_list[i] = '亚杉电信网厅'
  144. elif dif_list[i] == '118':
  145. dif_list[i] = '智信全国移动'
  146. elif dif_list[i] == '119':
  147. dif_list[i] = '智信全联通'
  148. elif dif_list[i] == '120':
  149. dif_list[i] = '智信全国电信'
  150. elif dif_list[i] == '121':
  151. dif_list[i] = '易迅捷移动网厅'
  152. elif dif_list[i] == '122':
  153. dif_list[i] = '易迅捷电信网厅'
  154. elif dif_list[i] == '123':
  155. dif_list[i] = '易迅捷联通网厅'
  156. dif_list = sorted(dif_list)
  157. #print(dif_list)
  158. sup_msg1 = sup_msg.format(dif_list)
  159. mon_cursor.execute(ins_sql,(sup_msg1,group_name2,int(timestamp)))
  160. logging.info(sup_msg1)
  161. #print(dif_list)
  162. except:
  163. #mon_db.rollback()
  164. print('数据回滚')
  165. finally:
  166. mon_cursor.close
  167. mon_db.close
  168. if __name__ == '__main__':
  169. #数据库连接信息
  170. chanel_db_config = {
  171. 'host' : '47.95.217.180',
  172. 'port' : 3306,
  173. 'user' : 'root',
  174. 'password' : '93DkChZMgZRyCbWh',
  175. 'db' : 'fmp',
  176. 'charset' : 'utf8',
  177. 'autocommit' : 1
  178. }
  179. #监控消息数据库连接信息
  180. mon_db_config = {
  181. 'host' : '127.0.0.1',
  182. 'port' : 9001,
  183. 'user' : 'root',
  184. 'password' : 'nibuzhidaowozhidao',
  185. 'db' : 'monitoring',
  186. 'charset' : 'utf8',
  187. 'autocommit' : 1
  188. }
  189. #数据库连接
  190. #db = conMysql(mysql_host,mysql_port,mysql_user,mysql_password,mysql_db)
  191. chanel_db = createPool(chanel_db_config).connection()
  192. mon_db = createPool(mon_db_config).connection()
  193. #通道余额
  194. chanel_sql = '''SELECT supplier_name,balance FROM channel_supplier
  195. WHERE
  196. supplier_name like '%兆蓉%' OR supplier_name like '%满帆%' OR supplier_name like '%亚杉%'
  197. OR supplier_name like '%易迅捷%'
  198. '''
  199. #客户余额
  200. cus_sql = '''
  201. SELECT customer_id,customer_name,(balance + credit_amount - current_amount) 'available_balance'
  202. FROM customer_info
  203. WHERE customer_id IN(47,54)
  204. '''
  205. #超过1小时未处理订单
  206. ord_sql = ''' SELECT used_mobile,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) 'used_time'
  207. FROM flow_order_info
  208. WHERE
  209. (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(apply_date)) >=3600 AND status NOT IN(4,6)
  210. ORDER BY used_time DESC;
  211. '''
  212. #半小时之内成功的通道
  213. sup_sql = '''
  214. SELECT
  215. channel_id
  216. FROM
  217. flow_order_info
  218. WHERE
  219. (UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(check_time)) <=1800 AND
  220. status = 6 AND
  221. channel_id NOT IN (92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
  222. GROUP BY channel_id;
  223. '''
  224. chanel_df = getData(chanel_sql,chanel_db)
  225. cus_df = getData(cus_sql,chanel_db)
  226. ord_df = getData(ord_sql,chanel_db)
  227. sup_df = getData(sup_sql,chanel_db)
  228. chanel_db.close
  229. #群名:
  230. group_name1 = '通道余额监控群'
  231. group_name2 = '30分钟无成功通道监控群'
  232. group_name3 = '客户授信监控群'
  233. group_name4 = '超时订单监控群'
  234. saveData()