# channel_order.py
# -*- encoding=utf8 -*-
import configparser
import json
import logging
import os
import re
import time

import pandas as pd
import pymysql
from DBUtils.PooledDB import PooledDB
# Log record format string.
LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s'
# Timestamp format and logging.basicConfig() setup. Both are currently disabled:
# they sit inside a bare string literal, which is evaluated and discarded.
'''
DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a'
logging.basicConfig(level = logging.INFO,
format = LOG_FORMAT,
datefmt = DATE_FORMAT,
filename = r"./run.log")
'''
  20. cf = configparser.RawConfigParser()
  21. #cf = configparser.ConfigParser()
  22. cf.read(r"./channel_ids.ini") # 读取配置文件,如果写文件的绝对路径,就可以不用os模块
  23. ys_id = cf.get("channel_ids","ys_ids") #亚杉id
  24. mf_id = cf.get("channel_ids","mf_ids") #满帆id
  25. zr_id = cf.get("channel_ids","zr_ids") #兆蓉id
  26. zx_id = cf.get("channel_ids","zx_ids") #智信id
  27. yxj_id = cf.get("channel_ids","yxj_ids") #易迅捷id
  28. fy_id = cf.get("channel_ids","fy_ids") #枫叶id
  29. YS_ID = json.loads(ys_id)
  30. MF_ID = json.loads(mf_id)
  31. ZR_ID = json.loads(zr_id)
  32. ZX_ID = json.loads(zx_id)
  33. YXJ_ID = json.loads(yxj_id)
  34. FY_ID = json.loads(fy_id)
  35. #通道查询db
  36. CHANNEL_DB = {
  37. 'host' : '47.95.217.180',
  38. 'port' : 3306,
  39. 'user' : 'rtech',
  40. 'password' : 'S5RE4dQ65MphYk7F',
  41. 'db' : 'fmp',
  42. 'charset' : 'utf8'
  43. #'autocommit' : 1
  44. }
  45. #监控消息数据库连接信息
  46. MONITOR_DB = {
  47. 'host' : '127.0.0.1',
  48. 'port' : 9001,
  49. 'user' : 'root',
  50. 'password' : 'nibuzhidaowozhidao',
  51. 'db' : 'monitoring',
  52. 'charset' : 'utf8',
  53. 'autocommit' : 1
  54. }
  55. #创建数据库连接池
  56. def createPool(db_config):
  57. spool = PooledDB(pymysql, 5, **db_config)
  58. return spool
  59. def get_channel_data():
  60. #conn = createPool(CHANNEL_DB).connection()
  61. sql = """SELECT trans_id,mobile_no,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(create_date))/60 use_time
  62. FROM
  63. mobile_flow_dispatch_rec
  64. WHERE
  65. UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) >= 600 AND send_status = 1
  66. ORDER BY use_time DESC ;"""
  67. df = pd.read_sql(sql,channel_conn)
  68. df['use_time'] = df['use_time'].astype(int)
  69. #print(df)
  70. #conn.close()
  71. return df
  72. def save_monitor_data(df,channel_ids,group_name,group_status):
  73. timestamp = time.time()
  74. str_time = time.strftime('%H:%M',time.localtime(timestamp))
  75. #str_time_int = int(str(str_time).split(':')[-1])
  76. #print(str(str_time).split(':')[-1])
  77. ins_sql = " INSERT INTO mon_channel_overtime_order VALUES(0,%s,%s,%s,1,%s) "
  78. que_sql = " SELECT id FROM mon_channel_overtime_order WHERE group_name = '{}' "
  79. upd_sql = " UPDATE mon_channel_overtime_order SET msg = %s,status = 1,timestamp = %s WHERE id = %s"
  80. msg = str_time + ": 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{};卡单最长时间为:{} 分钟。 "
  81. #print(df)
  82. channel_df = df[df['trans_id'].isin(channel_ids)]
  83. #print(channel_df)
  84. if channel_df.empty is False:
  85. total_num = channel_df.shape[0]
  86. temp_msg = ''
  87. if total_num == 1:
  88. temp_msg = str_time + """: 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{},卡单时间为:{} 分钟
  89. """.format(total_num,channel_df['mobile_no'].iloc[0],channel_df['use_time'].iloc[0])
  90. elif 1 < total_num < 10 :
  91. temp_msg = msg.format(total_num,channel_df['mobile_no'].values.tolist(),channel_df['use_time'].iloc[0])
  92. else:
  93. phone_list = channel_df['mobile_no'].head(10).tolist()
  94. temp_msg = msg.format(total_num,phone_list,channel_df['use_time'].iloc[0])
  95. #print(temp_msg)
  96. que_df = pd.read_sql(que_sql.format(group_name),monitor_conn)
  97. cursor = monitor_conn.cursor()
  98. if que_df.empty is True:
  99. cursor.execute(ins_sql,(temp_msg,group_status,group_name,int(timestamp)))
  100. else:
  101. cursor.execute(upd_sql,(temp_msg,int(timestamp),int(que_df['id'].iloc[0])))
  102. cursor.close()
  103. if __name__ == '__main__':
  104. channel_conn = createPool(CHANNEL_DB).connection()
  105. monitor_conn = createPool(MONITOR_DB).connection()
  106. ys_name = '亚杉话费下游对接'
  107. mf_name = '满帆'
  108. zr_name = '兆蓉'
  109. zx_name = '智信'
  110. yxj_name = '易迅捷'
  111. fy_name = '枫叶'
  112. df = get_channel_data()
  113. save_monitor_data(df,YS_ID,ys_name,2)
  114. save_monitor_data(df,MF_ID,mf_name,1)
  115. save_monitor_data(df,ZR_ID,zr_name,1)
  116. save_monitor_data(df,ZX_ID,zx_name,1)
  117. save_monitor_data(df,YXJ_ID,yxj_name,1)
  118. save_monitor_data(df,FY_ID,fy_name,1)
  119. channel_conn.close()
  120. monitor_conn.close()