channel_use_time.py

# -*- coding: utf-8 -*-
# __author__ = "dukun"
import json
import time
import pymysql
import logging
import datetime
import numpy as np
import pandas as pd
import configparser
from celery_tasks import cel
#from DBUtils.PooledDB import PooledDB
from dbutils.pooled_db import PooledDB
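
# This script aggregates, for each configured channel and flow face value, the
# average dispatch time (overall / successful / failed) over the last two hours
# of mobile_flow_dispatch_rec records, and writes the results into the
# channel_use_time statistics table.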
# Log line format
LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
# Timestamp format; be careful not to mix up month and day
DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '
logging.basicConfig(level=logging.INFO,
                    format=LOG_FORMAT,
                    datefmt=DATE_FORMAT,
                    # With filename set, log records are written to the file instead of the console
                    filename=r"./logs/channel.log"
                    )
cf = configparser.RawConfigParser()
#cf = configparser.ConfigParser()
# Read the config file; with an absolute file path the os module is not needed
cf.read(r"./config.ini")
CHANNEL_IDS = json.loads(cf.get("channel_ids", "channel_ids"))
CHANNEL_HOST = cf.get("database1", "host")
CHANNEL_PORT = int(cf.get("database1", "port"))
CHANNEL_USER = cf.get("database1", "user")
CHANNEL_PASSWORD = cf.get("database1", "password")
CHANNEL_DB = cf.get("database1", "db")
STAT_HOST = cf.get("database2", "host")
STAT_PORT = int(cf.get("database2", "port"))
STAT_USER = cf.get("database2", "user")
STAT_DB = cf.get("database2", "db")
STAT_PASSWORD = cf.get("database2", "password")
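
# A minimal config.ini sketch matching the keys read above; the hosts,
# credentials and channel ids shown here are placeholders, not values from the
# source. channel_ids must be valid JSON because it is parsed with json.loads.
#
#   [channel_ids]
#   channel_ids = [101, 102, 103]
#
#   [database1]
#   host = 127.0.0.1
#   port = 3306
#   user = channel_user
#   password = secret
#   db = channel_db
#
#   [database2]
#   host = 127.0.0.1
#   port = 3306
#   user = stat_user
#   password = secret
#   db = stat_db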
# Database connection settings
channel_db_config = {
    'host': CHANNEL_HOST,
    'port': CHANNEL_PORT,
    'user': CHANNEL_USER,
    'password': CHANNEL_PASSWORD,
    'db': CHANNEL_DB,
    'charset': 'utf8',
    'autocommit': 1
}
stat_db_config = {
    'host': STAT_HOST,
    'port': STAT_PORT,
    'user': STAT_USER,
    'password': STAT_PASSWORD,
    'db': STAT_DB,
    'charset': 'utf8',
    'autocommit': 1
}
# Create a database connection pool; the second positional argument (5) is
# PooledDB's mincached, i.e. the number of idle connections kept in the pool.
def createPool(db_config):
    spool = PooledDB(pymysql, 5, **db_config)
    return spool
@cel.task
def get_channel_use_time():
    conn = createPool(channel_db_config).connection()
    stat_conn = createPool(stat_db_config).connection()
    channel_sql = """SELECT trans_id, channel_name, flow_amount, use_time, status, cnt
                     FROM
                         (SELECT trans_id, flow_amount, SUM(UNIX_TIMESTAMP(modify_date) - UNIX_TIMESTAMP(create_date)) use_time,
                                 send_status status, COUNT(*) cnt
                          FROM
                              mobile_flow_dispatch_rec
                          WHERE UNIX_TIMESTAMP(now()) - UNIX_TIMESTAMP(create_date) <= (2 * 3600)
                          GROUP BY trans_id, status) t1 LEFT JOIN access_channel_info
                     ON access_channel_info.channel_seq_id = t1.trans_id
                     ORDER BY channel_name ; """
    ins_sql = """ INSERT INTO channel_use_time VALUES(0,%s,%s,%s,%s,%s,%s,%s) """
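    # The leading 0 is assumed to feed an auto-increment primary key; the seven
    # placeholders are filled, in order, with (stat_time, channel_id, operator,
    # channel_name, face_value, stat_type 1/2/3, avg_use_time), matching the
    # tuples appended to ins_list below.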
    df = pd.read_sql(channel_sql, conn)
    #print(df)
    #channel_ids = CHANNEL_IDS
    ins_list = []
    faces = [30, 50, 100, 200, 300, 500]
    date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    for channel_id in CHANNEL_IDS:
        channel_df = df[df['trans_id'] == channel_id]
        if channel_df.empty:
            continue
        channel_name = channel_df['channel_name'].iloc[0]
        operator, channel_name = get_operator(channel_name)
        for face in faces:
            face_df = channel_df[channel_df['flow_amount'] == face]
            if face_df.empty:
                continue
            succ_df = face_df[face_df['status'] == '2']
            fail_df = face_df[face_df['status'] == '4']
            # Average time of successful orders
            if not succ_df.empty:
                succ_time = succ_df['use_time'].iloc[0]
                #print(succ_time)
                succ_count = succ_df['cnt'].iloc[0]
                succ_avg_time = float(succ_time / succ_count)
            else:
                succ_avg_time = 0
            # Average time of failed orders
            if not fail_df.empty:
                fail_time = fail_df['use_time'].iloc[0]
                fail_count = fail_df['cnt'].iloc[0]
                fail_avg_time = float(fail_time / fail_count)
            else:
                fail_avg_time = 0
            total_df = face_df.groupby('trans_id').agg({'use_time': 'sum', 'cnt': 'sum'})
            #print(total_df)
            total_time = total_df['use_time'].iloc[0]
            total_count = total_df['cnt'].iloc[0]
            total_avg_time = float(total_time / total_count)
            #print(total_avg_time)
            #print(succ_avg_time)
            #print(fail_avg_time)
            ins_list.append((date, channel_id, operator, channel_name, face, 1, total_avg_time))  # overall average time
            ins_list.append((date, channel_id, operator, channel_name, face, 2, succ_avg_time))   # average time of successes
            ins_list.append((date, channel_id, operator, channel_name, face, 3, fail_avg_time))   # average time of failures
    stat_cursor = stat_conn.cursor()
    stat_cursor.executemany(ins_sql, ins_list)
    # Commit the batch insert
    stat_conn.commit()
    conn.close()
    stat_conn.close()
    logging.info("channel_use_time finished, %d rows inserted", len(ins_list))
def get_operator(channel_name):
    """Extract the carrier from the channel name and strip it from the name.

    Defaults to '移动' (China Mobile) when no carrier keyword is found.
    """
    operator = '移动'
    if '移动' in channel_name:
        operator = '移动'
    elif '联通' in channel_name:
        operator = '联通'
    elif '电信' in channel_name:
        operator = '电信'
    channel_name = channel_name.replace(operator, "")
    return operator, channel_name
if __name__ == '__main__':
    # Run the task synchronously when the script is executed directly
    get_channel_use_time()
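
# In a Celery deployment this task would normally be enqueued asynchronously,
# e.g. get_channel_use_time.delay(), or run on a schedule via Celery beat
# (assuming `cel` imported from celery_tasks is a Celery application instance).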