#-*- coding: utf-8 -*- #__author__ = "dukun" import time import pymysql import logging import datetime import numpy as np import pandas as pd from celery_tasks import cel #from DBUtils.PooledDB import PooledDB from dbutils.pooled_db import PooledDB #配置输出日志格式 LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s " #配置输出时间的格式,注意月份和天数不要搞乱了 DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a ' logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt = DATE_FORMAT , filename=r"./logs/channel.log" #有了filename参数就不会直接输出显示到控制台,而是直接写入文件 ) #数据库连接信息 channel_db_config = { 'host' : '47.95.217.180', 'port' : 3306, 'user' : 'root', 'password' : '93DkChZMgZRyCbWh', 'db' : 'fmp', 'charset' : 'utf8', 'autocommit' : 1 } stat_db_config = { 'host' : '127.0.0.1', 'port' : 3306, 'user' : 'root', 'password' : 'nibuzhidaowozhidao', 'db' : 'statistical', 'charset' : 'utf8', 'autocommit' : 1 } #创建数据库连接池 def createPool(db_config): spool = PooledDB(pymysql, 5, **db_config) return spool @cel.task def get_channel_total_rate(): conn = createPool(channel_db_config).connection() stat_conn = createPool(stat_db_config).connection() #print(conn) channel_sql = """ SELECT t1.order_id,trans_id,channel_name,status,t1.cnt FROM (SELECT order_id,trans_id,send_status AS status,COUNT(*) AS cnt FROM mobile_flow_dispatch_rec WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= 1800 AND trans_id NOT IN(92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115) GROUP BY order_id,status )t1 LEFT JOIN access_channel_info ON t1.trans_id = access_channel_info.channel_seq_id""" ins_sql = 'INSERT INTO channel_success_rate VALUES(0,%s,%s,%s,%s,%s,%s)' df = pd.read_sql(channel_sql,con = conn) channel_ids = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120', '121','122','123','124','125','126','127','128'] rate_list = [] date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") for channel_id in channel_ids: channel_df = df[df['trans_id'] == channel_id] if channel_df.empty is True: continue total_count = channel_df.groupby('order_id').size().shape[0] succ_count = channel_df[channel_df['status'] == '2'].shape[0] if total_count > 0 : rate = succ_count / total_count else: rate = 0 channel_name = channel_df['channel_name'].iloc[0] rate_list.append((date,channel_id,channel_name,-100,100,rate)) try: stat_cursor = stat_conn.cursor() stat_cursor.executemany(ins_sql,rate_list) except: stat_conn.rollback() print('数据回滚') print(222222222222222222) conn.close() stat_conn.close() @cel.task def get_channel_rate(): conn = createPool(channel_db_config).connection() stat_conn = createPool(stat_db_config).connection() #print(conn) channel_sql = """ SELECT t1.order_id,trans_id,channel_name,flow_amount,status,t1.cnt FROM (SELECT order_id,trans_id,flow_amount,send_status AS status,COUNT(*) AS cnt FROM mobile_flow_dispatch_rec WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= (3 * 3600) AND trans_id NOT IN(92,93,94,96,97,98,101,102,103,107,108,109,110,111,112,113,114,115) GROUP BY order_id,status )t1 LEFT JOIN access_channel_info ON t1.trans_id = access_channel_info.channel_seq_id""" ins_sql = 'INSERT INTO channel_success_rate VALUES(0,%s,%s,%s,%s,%s,%s)' channel_ids = ['83','84','85','89','90','91','95','104','105','116','117','118','119','120', '121','122','123','124','125','126','127','128'] faces = [30,50,100,200,300,500] df = pd.read_sql(channel_sql,con = conn) df['flow_amount'] = df['flow_amount'].astype(int) #df['cnt'] = df['cnt'].astype(int) #print(df) sucess_rate = [] for channel_id in channel_ids: channel_df = df[df['trans_id'] == channel_id] if channel_df.empty is True: continue channel_name = channel_df['channel_name'].iloc[0] #print(channel_name) for face in faces: face_df = channel_df[channel_df['flow_amount'] == face] if face_df.empty is True: continue #print(face_df) total_count = face_df.groupby('order_id').size().shape[0] #print(total_count) #print(channel_df) order_ids = face_df.groupby('order_id').count().index #print(order_ids) suc_count1 = 0 suc_count2 = 0 suc_count3 = 0 for order_id in order_ids: #print(order_id) la_df = face_df[face_df['order_id'] == order_id] if la_df[la_df['status'] == '2'].empty is False: #print(la_df) if la_df[la_df['status'] == '4'].empty is True: suc_count1 += 1 elif la_df[la_df['status'] == '4']['cnt'].iloc[0] == 1: suc_count2 += 1 elif la_df[la_df['status'] == '4']['cnt'].iloc[0] == 2: suc_count3 += 1 date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") #print(date) suc_rate1 = suc_count1 / total_count total_count2 = total_count - suc_count1 if total_count2 > 0 : suc_rate2 = suc_count2 / total_count2 else: suc_rate2 = 0 total_count3 = total_count2 - suc_count3 if total_count3 > 0 : suc_rate3 = suc_count3 / total_count3 else: suc_rate3 = 0 sucess_rate.append((date,channel_id,channel_name,face,1,suc_rate1)) sucess_rate.append((date,channel_id,channel_name,face,2,suc_rate2 )) sucess_rate.append((date,channel_id,channel_name,face,3,suc_rate3 )) try: stat_cursor = stat_conn.cursor() stat_cursor.executemany(ins_sql,sucess_rate) except: stat_conn.rollback() print('数据回滚') print(11111111111111111) conn.close() stat_conn.close() #get_channel_total_rate() #if __name__ == '__main__':