channel_order.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # -*- encoding=utf8 -*-
  2. import re
  3. import time
  4. import json
  5. import logging
  6. import pymysql
  7. import configparser
  8. import pandas as pd
  9. from DBUtils.PooledDB import PooledDB
  10. #配置输出日志格式
  11. LOG_FORMAT = '%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s'
  12. #配置输出时间格式
  13. '''
  14. DATE_FORMAT = '%Y-%m-%d %H-%M-%S %a'
  15. logging.basicConfig(level = logging.INFO,
  16. format = LOG_FORMAT,
  17. datefmt = DATE_FORMAT,
  18. filename = r"./run.log")
  19. '''
  20. '''
  21. cf = configparser.RawConfigParser()
  22. #cf = configparser.ConfigParser()
  23. cf.read(r"./channel_ids11.ini") # 读取配置文件,如果写文件的绝对路径,就可以不用os模块
  24. ys_id = cf.get("channel_ids","ys_ids") #亚杉id
  25. mf_id = cf.get("channel_ids","mf_ids") #满帆id
  26. zr_id = cf.get("channel_ids","zr_ids") #兆蓉id
  27. zx_id = cf.get("channel_ids","zx_ids") #智信id
  28. yxj_id = cf.get("channel_ids","yxj_ids") #易迅捷id
  29. fy_id = cf.get("channel_ids","fy_ids") #枫叶id
  30. lz_id = cf.get("channel_ids","lz_ids") #枫叶id
  31. hy_id = cf.get("channel_ids","hy_ids") #枫叶id
  32. YS_ID = json.loads(ys_id)
  33. MF_ID = json.loads(mf_id)
  34. ZR_ID = json.loads(zr_id)
  35. ZX_ID = json.loads(zx_id)
  36. YXJ_ID = json.loads(yxj_id)
  37. FY_ID = json.loads(fy_id)
  38. LZ_ID = json.loads(lz_id)
  39. HY_ID = json.loads(hy_id)
  40. '''
  41. #通道查询db
  42. CHANNEL_DB = {
  43. 'host' : '47.95.217.180',
  44. 'port' : 3306,
  45. 'user' : 'rtech',
  46. 'password' : 'S5RE4dQ65MphYk7F',
  47. 'db' : 'fmp',
  48. 'charset' : 'utf8'
  49. #'autocommit' : 1
  50. }
  51. #监控消息数据库连接信息
  52. MONITOR_DB = {
  53. 'host' : '127.0.0.1',
  54. 'port' : 9001,
  55. 'user' : 'root',
  56. 'password' : 'nibuzhidaowozhidao',
  57. 'db' : 'monitoring',
  58. 'charset' : 'utf8',
  59. 'autocommit' : 1
  60. }
  61. #创建数据库连接池
  62. def createPool(db_config):
  63. spool = PooledDB(pymysql, 5, **db_config)
  64. return spool
  65. def get_channel_data():
  66. #conn = createPool(CHANNEL_DB).connection()
  67. sql = """SELECT trans_id,mobile_no,(UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(create_date))/60 use_time
  68. FROM
  69. mobile_flow_dispatch_rec
  70. WHERE
  71. UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) >= 600 AND send_status = 1
  72. ORDER BY use_time DESC ;"""
  73. df = pd.read_sql(sql,channel_conn)
  74. df['use_time'] = df['use_time'].astype(int)
  75. #print(df)
  76. #conn.close()
  77. return df
  78. def save_monitor_data(df,channel_ids,group_name,group_status):
  79. timestamp = time.time()
  80. str_time = time.strftime('%H:%M',time.localtime(timestamp))
  81. #str_time_int = int(str(str_time).split(':')[-1])
  82. #print(str(str_time).split(':')[-1])
  83. ins_sql = " INSERT INTO mon_channel_overtime_order VALUES(0,%s,%s,%s,1,%s) "
  84. que_sql = " SELECT id FROM mon_channel_overtime_order WHERE group_name = '{}' "
  85. upd_sql = " UPDATE mon_channel_overtime_order SET msg = %s,status = 1,timestamp = %s WHERE id = %s"
  86. msg = str_time + ": 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{};卡单最长时间为:{} 分钟。 "
  87. #print(df)
  88. channel_df = df[df['trans_id'].isin(channel_ids)]
  89. #print(channel_df)
  90. if channel_df.empty is False:
  91. total_num = channel_df.shape[0]
  92. temp_msg = ''
  93. if total_num == 1:
  94. temp_msg = str_time + """: 超过10分钟未返回状态的订单总数量为:{} ,手机号为:{},卡单时间为:{} 分钟
  95. """.format(total_num,channel_df['mobile_no'].iloc[0],channel_df['use_time'].iloc[0])
  96. elif 1 < total_num < 10 :
  97. temp_msg = msg.format(total_num,channel_df['mobile_no'].values.tolist(),channel_df['use_time'].iloc[0])
  98. else:
  99. phone_list = channel_df['mobile_no'].head(10).tolist()
  100. temp_msg = msg.format(total_num,phone_list,channel_df['use_time'].iloc[0])
  101. #print(temp_msg)
  102. que_df = pd.read_sql(que_sql.format(group_name),monitor_conn)
  103. cursor = monitor_conn.cursor()
  104. if que_df.empty is True:
  105. cursor.execute(ins_sql,(temp_msg,group_status,group_name,int(timestamp)))
  106. else:
  107. cursor.execute(upd_sql,(temp_msg,int(timestamp),int(que_df['id'].iloc[0])))
  108. cursor.close()
  109. if __name__ == '__main__':
  110. channel_conn = createPool(CHANNEL_DB).connection()
  111. monitor_conn = createPool(MONITOR_DB).connection()
  112. '''
  113. ys_name = '亚杉话费下游对接'
  114. mf_name = '满帆起航(供)-蓝色火焰'
  115. zr_name = '兆蓉-州宇(蓝色火焰)'
  116. zx_name = '智信-蓝色火焰'
  117. yxj_name = '易迅捷&蓝色火焰'
  118. fy_name = '枫叶-蓝色火焰'
  119. lz_name = '立纵-蓝色火焰'
  120. hy_name = '蓝色火焰-华翼科技'
  121. df = get_channel_data()
  122. save_monitor_data(df,YS_ID,ys_name,2)
  123. save_monitor_data(df,MF_ID,mf_name,1)
  124. save_monitor_data(df,ZR_ID,zr_name,1)
  125. save_monitor_data(df,ZX_ID,zx_name,1)
  126. save_monitor_data(df,YXJ_ID,yxj_name,1)
  127. save_monitor_data(df,FY_ID,fy_name,1)
  128. save_monitor_data(df,LZ_ID,lz_name,1)
  129. save_monitor_data(df,HY_ID,hy_name,1)
  130. '''
  131. df = get_channel_data()
  132. with open("./channels.json","r") as fp:
  133. channels = fp.read()
  134. channels_json = json.loads(channels)
  135. #print(channels_json)
  136. for item in channels_json:
  137. channel_ids = item["channel_ids"]
  138. group_name = item["group_name"]
  139. group_type = item["type"]
  140. #print(group_name)
  141. save_monitor_data(df,channel_ids,group_name,group_type)
  142. channel_conn.close()
  143. monitor_conn.close()