channel_success_rate.py 7.8 KB


  1. #-*- coding: utf-8 -*-
  2. #__author__ = "dukun"
  3. import json
  4. import time
  5. import pymysql
  6. import logging
  7. import datetime
  8. import numpy as np
  9. import pandas as pd
  10. import configparser
  11. from celery_tasks import cel
  12. #from DBUtils.PooledDB import PooledDB
  13. from dbutils.pooled_db import PooledDB
  14. #配置输出日志格式
  15. LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
  16. #配置输出时间的格式,注意月份和天数不要搞乱了
  17. DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '
  18. logging.basicConfig(level=logging.INFO,
  19. format=LOG_FORMAT,
  20. datefmt = DATE_FORMAT ,
  21. filename=r"./logs/channel.log" #有了filename参数就不会直接输出显示到控制台,而是直接写入文件
  22. )
  23. cf = configparser.RawConfigParser()
  24. #cf = configparser.ConfigParser()
  25. cf.read(r"./config.ini") # 读取配置文件,如果写文件的绝对路径,就可以不用os模块
  26. CHANNEL_IDS = json.loads(cf.get("channel_ids","channel_ids"))
  27. CHANNEL_HOST = cf.get("database1","host")
  28. CHANNEL_PORT = int(cf.get("database1","port"))
  29. CHANNEL_USER = cf.get("database1","user")
  30. CHANNEL_PASSWORD = cf.get("database1","password")
  31. CHANNEL_DB = cf.get("database1","db")
  32. STAT_HOST = cf.get("database2","host")
  33. STAT_PORT = int(cf.get("database2","port"))
  34. STAT_USER = cf.get("database2","user")
  35. STAT_PASSWORD = cf.get("database2","password")
  36. STAT_DB = cf.get("database2","db")
  37. #通道数据库
  38. channel_db_config = {
  39. 'host' : CHANNEL_HOST,
  40. 'port' : CHANNEL_PORT,
  41. 'user' : CHANNEL_USER,
  42. 'password' : CHANNEL_PASSWORD,
  43. 'db' : CHANNEL_DB,
  44. 'charset' : 'utf8',
  45. 'autocommit' : 1
  46. }
  47. #监控数据库
  48. stat_db_config = {
  49. 'host' : STAT_HOST,
  50. 'port' : STAT_PORT,
  51. 'user' : STAT_USER,
  52. 'password' : STAT_PASSWORD,
  53. 'db' : STAT_DB,
  54. 'charset' : 'utf8',
  55. 'autocommit' : 1
  56. }
  57. #创建数据库连接池
  58. def createPool(db_config):
  59. spool = PooledDB(pymysql, 5, **db_config)
  60. return spool
  61. @cel.task
  62. def get_channel_total_rate():
  63. conn = createPool(channel_db_config).connection()
  64. stat_conn = createPool(stat_db_config).connection()
  65. #print(conn)
  66. channel_sql = """ SELECT
  67. t1.order_id,trans_id,channel_name,flow_amount,status,t1.cnt
  68. FROM
  69. (SELECT
  70. order_id,trans_id,flow_amount,send_status AS status,COUNT(*) AS cnt
  71. FROM
  72. mobile_flow_dispatch_rec
  73. WHERE
  74. UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= 1800
  75. AND trans_id NOT IN(92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
  76. GROUP BY
  77. order_id,flow_amount,status )t1 LEFT JOIN access_channel_info
  78. ON t1.trans_id = access_channel_info.channel_seq_id"""
  79. ins_sql = 'INSERT INTO channel_success_rate VALUES(0,%s,%s,%s,%s,%s,%s,%s)'
  80. df = pd.read_sql(channel_sql,con = conn)
  81. df['flow_amount'] = df['flow_amount'].astype(int)
  82. faces = [30,50,100,200,300,500]
  83. rate_list = []
  84. date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
  85. for channel_id in CHANNEL_IDS:
  86. channel_df = df[df['trans_id'] == channel_id]
  87. if channel_df.empty is True:
  88. continue
  89. channel_name = channel_df['channel_name'].iloc[0]
  90. operator,channel_name = get_operator(channel_name)
  91. for face in faces:
  92. face_df = channel_df[channel_df['flow_amount'] == face]
  93. if face_df.empty is True:
  94. continue
  95. total_count = face_df.groupby('order_id').size().shape[0]
  96. succ_count = face_df[face_df['status'] == '2'].shape[0]
  97. if total_count > 0 :
  98. rate = succ_count / total_count
  99. else:
  100. rate = 0
  101. rate_list.append((date,channel_id,operator,channel_name,face,100,rate))
  102. try:
  103. stat_cursor = stat_conn.cursor()
  104. stat_cursor.executemany(ins_sql,rate_list)
  105. except:
  106. stat_conn.rollback()
  107. print('数据回滚')
  108. #print(rate_list)
  109. print(222222222222222222)
  110. conn.close()
  111. stat_conn.close()
  112. @cel.task
  113. def get_channel_rate():
  114. conn = createPool(channel_db_config).connection()
  115. stat_conn = createPool(stat_db_config).connection()
  116. #print(conn)
  117. channel_sql = """ SELECT
  118. t1.order_id,trans_id,channel_name,flow_amount,status,t1.cnt
  119. FROM
  120. (SELECT
  121. order_id,trans_id,flow_amount,send_status AS status,COUNT(*) AS cnt
  122. FROM
  123. mobile_flow_dispatch_rec
  124. WHERE
  125. UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= (3 * 3600)
  126. AND trans_id NOT IN(92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
  127. GROUP BY
  128. order_id,flow_amount,status )t1 LEFT JOIN access_channel_info
  129. ON t1.trans_id = access_channel_info.channel_seq_id"""
  130. ins_sql = 'INSERT INTO channel_success_rate VALUES(0,%s,%s,%s,%s,%s,%s,%s)'
  131. faces = [30,50,100,200,300,500]
  132. df = pd.read_sql(channel_sql,con = conn)
  133. df['flow_amount'] = df['flow_amount'].astype(int)
  134. #df['cnt'] = df['cnt'].astype(int)
  135. #print(df)
  136. sucess_rate = []
  137. for channel_id in CHANNEL_IDS:
  138. channel_df = df[df['trans_id'] == channel_id]
  139. if channel_df.empty is True:
  140. continue
  141. channel_name = channel_df['channel_name'].iloc[0]
  142. operator,channel_name = get_operator(channel_name)
  143. #print(channel_name)
  144. for face in faces:
  145. face_df = channel_df[channel_df['flow_amount'] == face]
  146. if face_df.empty is True:
  147. continue
  148. #print(face_df)
  149. total_count = face_df.groupby('order_id').size().shape[0]
  150. #print(total_count)
  151. #print(channel_df)
  152. order_ids = face_df.groupby('order_id').count().index
  153. #print(order_ids)
  154. suc_count1 = 0
  155. suc_count2 = 0
  156. suc_count3 = 0
  157. for order_id in order_ids:
  158. #print(order_id)
  159. la_df = face_df[face_df['order_id'] == order_id]
  160. if la_df[la_df['status'] == '2'].empty is False:
  161. #print(la_df)
  162. if la_df[la_df['status'] == '4'].empty is True:
  163. suc_count1 += 1
  164. elif la_df[la_df['status'] == '4']['cnt'].iloc[0] == 1:
  165. suc_count2 += 1
  166. elif la_df[la_df['status'] == '4']['cnt'].iloc[0] == 2:
  167. suc_count3 += 1
  168. date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
  169. #print(date)
  170. suc_rate1 = suc_count1 / total_count
  171. total_count2 = total_count - suc_count1
  172. if total_count2 > 0 :
  173. suc_rate2 = suc_count2 / total_count2
  174. else:
  175. suc_rate2 = 0
  176. total_count3 = total_count2 - suc_count3
  177. if total_count3 > 0 :
  178. suc_rate3 = suc_count3 / total_count3
  179. else:
  180. suc_rate3 = 0
  181. sucess_rate.append((date,channel_id,operator,channel_name,face,1,suc_rate1))
  182. sucess_rate.append((date,channel_id,operator,channel_name,face,2,suc_rate2 ))
  183. sucess_rate.append((date,channel_id,operator,channel_name,face,3,suc_rate3 ))
  184. try:
  185. stat_cursor = stat_conn.cursor()
  186. stat_cursor.executemany(ins_sql,sucess_rate)
  187. except:
  188. stat_conn.rollback()
  189. print('数据回滚')
  190. print(11111111111111111)
  191. #print(sucess_rate)
  192. conn.close()
  193. stat_conn.close()
  194. def get_operator(channel_name):
  195. operator = '移动'
  196. if '移动' in channel_name:
  197. operator = '移动'
  198. elif '联通' in channel_name:
  199. operator = '联通'
  200. elif '电信' in channel_name:
  201. operator = '电信'
  202. channel_name = channel_name.replace(operator,"")
  203. return operator,channel_name
  204. #get_channel_total_rate()
  205. #get_channel_rate()
  206. #if __name__ == '__main__':