123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- #-*- coding: utf-8 -*-
- #__author__ = "dukun"
- import json
- import time
- import pymysql
- import logging
- import datetime
- import numpy as np
- import pandas as pd
- import configparser
- from celery_tasks import cel
- #from DBUtils.PooledDB import PooledDB
- from dbutils.pooled_db import PooledDB
# Layout of every log record.
LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
# Timestamp layout; take care not to mix up the month and day fields.
DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '

# Because ``filename`` is given, records are written to that file instead of
# being echoed to the console.
logging.basicConfig(
    filename=r"./logs/channel.log",
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
)
# Read the settings file. The path is relative; with an absolute path the os
# module would not be needed to locate it.
# RawConfigParser is used rather than ConfigParser, so option values are
# returned without interpolation.
cf = configparser.RawConfigParser()
cf.read(r"./config.ini")

# Channel ids to report on; the option value is parsed as JSON.
CHANNEL_IDS = json.loads(cf.get("channel_ids", "channel_ids"))

# Connection settings from the [database1] section.
CHANNEL_HOST = cf.get("database1", "host")
CHANNEL_PORT = cf.getint("database1", "port")
CHANNEL_USER = cf.get("database1", "user")
CHANNEL_PASSWORD = cf.get("database1", "password")
CHANNEL_DB = cf.get("database1", "db")

# Connection settings from the [database2] section.
STAT_HOST = cf.get("database2", "host")
STAT_PORT = cf.getint("database2", "port")
STAT_USER = cf.get("database2", "user")
STAT_PASSWORD = cf.get("database2", "password")
STAT_DB = cf.get("database2", "db")
# Channel database: keyword arguments handed to the connection pool.
channel_db_config = dict(
    host=CHANNEL_HOST,
    port=CHANNEL_PORT,
    user=CHANNEL_USER,
    password=CHANNEL_PASSWORD,
    db=CHANNEL_DB,
    charset='utf8',
    autocommit=1,
)

# Monitoring (statistics) database: same shape as the channel settings.
stat_db_config = dict(
    host=STAT_HOST,
    port=STAT_PORT,
    user=STAT_USER,
    password=STAT_PASSWORD,
    db=STAT_DB,
    charset='utf8',
    autocommit=1,
)
# Pools already built by createPool, keyed by their settings, so repeated
# calls share one pool instead of opening fresh connections every time.
_POOL_CACHE = {}


def createPool(db_config, mincached=5):
    """Return a PooledDB connection pool for the given settings.

    The pool is created on first use and then reused for every later call
    with the same settings. Previously each call built a brand-new pool
    (which opens ``mincached`` connections up front), so nothing was ever
    actually pooled between calls.

    Args:
        db_config: Mapping of keyword arguments forwarded to the pymysql
            connection (host, port, user, password, db, ...).
        mincached: Number of idle connections the pool opens initially.

    Returns:
        A ``PooledDB`` instance; call ``.connection()`` on it to borrow a
        connection and ``.close()`` on the connection to hand it back.
    """
    try:
        cache_key = (mincached, tuple(sorted(db_config.items())))
        hash(cache_key)
    except TypeError:
        # Settings that cannot be hashed/sorted cannot be cached; fall back
        # to the old behaviour of building a dedicated pool.
        return PooledDB(pymysql, mincached, **db_config)

    spool = _POOL_CACHE.get(cache_key)
    if spool is None:
        spool = PooledDB(pymysql, mincached, **db_config)
        _POOL_CACHE[cache_key] = spool
    return spool
def _total_rate_rows(df, date):
    """Build the overall success-rate rows for every configured channel.

    Args:
        df: DataFrame with the columns order_id, trans_id, channel_name,
            flow_amount (int) and status, as produced by the query in
            ``get_channel_total_rate``.
        date: Timestamp string stored with every row.

    Returns:
        A list of tuples
        ``(date, channel_id, operator, channel_name, face, 100, rate)``,
        one per channel/face value combination that has data.
    """
    rows = []
    for channel_id in CHANNEL_IDS:
        channel_df = df[df['trans_id'] == channel_id]
        if channel_df.empty:
            continue
        operator, channel_name = get_operator(channel_df['channel_name'].iloc[0])
        for face in (30, 50, 100, 200, 300, 500):
            face_df = channel_df[channel_df['flow_amount'] == face]
            if face_df.empty:
                continue
            # Distinct orders seen for this channel and face value.
            total_count = int(face_df['order_id'].nunique())
            # Rows whose status is '2' are the ones counted as successes.
            succ_count = len(face_df[face_df['status'] == '2'])
            rate = succ_count / total_count if total_count > 0 else 0
            # 100 is the fixed value written to the sixth column for these
            # overall rows (presumably a "total" marker - confirm against
            # the channel_success_rate schema).
            rows.append((date, channel_id, operator, channel_name, face, 100, rate))
    return rows


@cel.task
def get_channel_total_rate():
    """Compute and store the overall success rate per channel and face value.

    Reads the dispatch records created within the last 1800 seconds from the
    channel database, computes (rows with status '2') / (distinct orders)
    for every channel in CHANNEL_IDS and every face value, and inserts the
    results into ``channel_success_rate`` in the statistics database.

    Insert failures are logged with a traceback and otherwise swallowed, so
    the task stays best-effort. Both connections are always released.
    """
    channel_sql = """ SELECT
            t1.order_id,trans_id,channel_name,flow_amount,status,t1.cnt
        FROM
            (SELECT
                order_id,trans_id,flow_amount,send_status AS status,COUNT(*) AS cnt
            FROM
                mobile_flow_dispatch_rec
            WHERE
                UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= 1800
                AND trans_id NOT IN(92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
            GROUP BY
                order_id,flow_amount,status )t1 LEFT JOIN access_channel_info
        ON t1.trans_id = access_channel_info.channel_seq_id"""
    ins_sql = 'INSERT INTO channel_success_rate VALUES(0,%s,%s,%s,%s,%s,%s,%s)'

    # Hold the channel connection only for the duration of the read.
    conn = createPool(channel_db_config).connection()
    try:
        df = pd.read_sql(channel_sql, con=conn)
    finally:
        conn.close()

    df['flow_amount'] = df['flow_amount'].astype(int)
    date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    rate_list = _total_rate_rows(df, date)
    if not rate_list:
        # Nothing to store; avoid borrowing a connection for a no-op insert.
        return

    stat_conn = createPool(stat_db_config).connection()
    try:
        stat_cursor = stat_conn.cursor()
        try:
            stat_cursor.executemany(ins_sql, rate_list)
        finally:
            stat_cursor.close()
    except Exception:
        # Record the real cause instead of silently printing a fixed text.
        logging.exception('数据回滚')
        stat_conn.rollback()
    else:
        logging.info("get_channel_total_rate stored %d rows", len(rate_list))
    finally:
        stat_conn.close()
def _retry_rate_rows(df, date):
    """Build the per-attempt success-rate rows for every configured channel.

    For each channel in CHANNEL_IDS and each face value, orders that have a
    status '2' row are bucketed by their status '4' row:

    * level 1 - no status '4' row at all,
    * level 2 - the status '4' row has cnt == 1,
    * level 3 - the status '4' row has cnt == 2.

    (Status '4' presumably marks a failed attempt - confirm against the
    dispatch table.) Each level's rate is its count divided by the orders
    still outstanding after the previous levels.

    Args:
        df: DataFrame with the columns order_id, trans_id, channel_name,
            flow_amount (int), status and cnt.
        date: Timestamp string stored with every row.

    Returns:
        A list of tuples
        ``(date, channel_id, operator, channel_name, face, level, rate)``
        with three entries (levels 1, 2, 3) per channel/face combination.
    """
    rows = []
    for channel_id in CHANNEL_IDS:
        channel_df = df[df['trans_id'] == channel_id]
        if channel_df.empty:
            continue
        operator, channel_name = get_operator(channel_df['channel_name'].iloc[0])
        for face in (30, 50, 100, 200, 300, 500):
            face_df = channel_df[channel_df['flow_amount'] == face]
            if face_df.empty:
                continue

            total_count = 0
            suc_count1 = suc_count2 = suc_count3 = 0
            # One pass over the orders instead of re-filtering the frame
            # once per order id.
            for _, order_df in face_df.groupby('order_id'):
                total_count += 1
                statuses = order_df['status']
                if not (statuses == '2').any():
                    continue
                failed_cnt = order_df.loc[statuses == '4', 'cnt']
                if failed_cnt.empty:
                    suc_count1 += 1
                elif failed_cnt.iloc[0] == 1:
                    suc_count2 += 1
                elif failed_cnt.iloc[0] == 2:
                    suc_count3 += 1

            suc_rate1 = suc_count1 / total_count if total_count > 0 else 0

            total_count2 = total_count - suc_count1
            suc_rate2 = suc_count2 / total_count2 if total_count2 > 0 else 0

            # Orders still outstanding after level 2. The original code
            # subtracted suc_count3 here, which removed the numerator from
            # its own denominator and could yield rates above 1.
            total_count3 = total_count2 - suc_count2
            suc_rate3 = suc_count3 / total_count3 if total_count3 > 0 else 0

            rows.append((date, channel_id, operator, channel_name, face, 1, suc_rate1))
            rows.append((date, channel_id, operator, channel_name, face, 2, suc_rate2))
            rows.append((date, channel_id, operator, channel_name, face, 3, suc_rate3))
    return rows


@cel.task
def get_channel_rate():
    """Compute and store per-attempt success rates per channel and face value.

    Reads the dispatch records created within the last 3 hours from the
    channel database, derives the level 1/2/3 success rates (see
    ``_retry_rate_rows``) and inserts them into ``channel_success_rate`` in
    the statistics database.

    Insert failures are logged with a traceback and otherwise swallowed, so
    the task stays best-effort. Both connections are always released.
    """
    channel_sql = """ SELECT
            t1.order_id,trans_id,channel_name,flow_amount,status,t1.cnt
        FROM
            (SELECT
                order_id,trans_id,flow_amount,send_status AS status,COUNT(*) AS cnt
            FROM
                mobile_flow_dispatch_rec
            WHERE
                UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= (3 * 3600)
                AND trans_id NOT IN(92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115)
            GROUP BY
                order_id,flow_amount,status )t1 LEFT JOIN access_channel_info
        ON t1.trans_id = access_channel_info.channel_seq_id"""
    ins_sql = 'INSERT INTO channel_success_rate VALUES(0,%s,%s,%s,%s,%s,%s,%s)'

    # Hold the channel connection only for the duration of the read.
    conn = createPool(channel_db_config).connection()
    try:
        df = pd.read_sql(channel_sql, con=conn)
    finally:
        conn.close()

    df['flow_amount'] = df['flow_amount'].astype(int)
    # One timestamp for the whole run so all rows of a run share it.
    date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    sucess_rate = _retry_rate_rows(df, date)
    if not sucess_rate:
        # Nothing to store; avoid borrowing a connection for a no-op insert.
        return

    stat_conn = createPool(stat_db_config).connection()
    try:
        stat_cursor = stat_conn.cursor()
        try:
            stat_cursor.executemany(ins_sql, sucess_rate)
        finally:
            stat_cursor.close()
    except Exception:
        # Record the real cause instead of silently printing a fixed text.
        logging.exception('数据回滚')
        stat_conn.rollback()
    else:
        logging.info("get_channel_rate stored %d rows", len(sucess_rate))
    finally:
        stat_conn.close()
-
def get_operator(channel_name):
    """Split a channel name into its carrier and the remaining name.

    The carriers '移动', '联通' and '电信' are looked for in that order; the
    first one contained in ``channel_name`` wins. When none is present the
    carrier defaults to '移动'.

    Returns:
        A ``(operator, channel_name)`` tuple where every occurrence of the
        chosen carrier has been removed from the name.
    """
    carrier = next(
        (candidate for candidate in ('移动', '联通', '电信') if candidate in channel_name),
        '移动',
    )
    return carrier, channel_name.replace(carrier, "")
- #get_channel_total_rate()
- #get_channel_rate()
- #if __name__ == '__main__':
|