#-*- coding: utf-8 -*-
#__author__ = "dukun"
"""Celery task that records average channel dispatch durations.

The task reads the last two hours of rows from ``mobile_flow_dispatch_rec``
in the channel database, computes the overall / success / failure average
duration for each tracked channel, and inserts the results into
``channel_use_time`` in the statistics database.
"""
import contextlib
import time
import pymysql
import logging
import datetime
import numpy as np
import pandas as pd
from celery_tasks import cel
# Legacy import path (pre-rename DBUtils module layout), kept for reference:
#from DBUtils.PooledDB import PooledDB
from dbutils.pooled_db import PooledDB

# Log record format.
LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
# Timestamp format; take care not to mix up the month and day fields.
DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '
logging.basicConfig(level=logging.INFO,
                    format=LOG_FORMAT,
                    datefmt=DATE_FORMAT,
                    # With ``filename`` set, records go to the file instead
                    # of being echoed to the console.
                    filename=r"./logs/channel.log"
                    )

# Database connection settings.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# environment variables or a secrets store.
channel_db_config = {
    'host' : '47.95.217.180',
    'port' : 3306,
    'user' : 'root',
    'password' : '93DkChZMgZRyCbWh',
    'db' : 'fmp',
    'charset' : 'utf8',
    'autocommit' : 1
}

stat_db_config = {
    'host' : '127.0.0.1',
    'port' : 3306,
    'user' : 'root',
    'password' : 'nibuzhidaowozhidao',
    'db' : 'statistical',
    'charset' : 'utf8',
    'autocommit' : 1
}

# Channels whose durations are recorded. The ids are compared as strings
# against the ``trans_id`` column.
# NOTE(review): assumes ``trans_id`` and ``status`` come back from the driver
# as strings; if they are integers no row will ever match -- TODO confirm.
_CHANNEL_IDS = ('83', '84', '85', '89', '90', '91', '95', '104', '105', '116',
                '117', '118', '119', '120', '121', '122', '123', '124', '125',
                '126', '127', '128')

# Status values used to pick the success / failure rows.
_STATUS_SUCCESS = '2'
_STATUS_FAILURE = '4'

# Values written to the fourth inserted column to tell the averages apart.
_KIND_TOTAL = 1
_KIND_SUCCESS = 2
_KIND_FAILURE = 3

# Summed duration and row count per (trans_id, status) over the last two
# hours, joined with the channel name.
_CHANNEL_SQL = """
    SELECT trans_id, channel_name, use_time, status, cnt
    FROM (SELECT trans_id,
                 SUM(UNIX_TIMESTAMP(modify_date) - UNIX_TIMESTAMP(create_date)) use_time,
                 send_status status,
                 COUNT(*) cnt
          FROM mobile_flow_dispatch_rec
          WHERE UNIX_TIMESTAMP(now())-UNIX_TIMESTAMP(create_date) <= (2 * 3600)
          GROUP BY trans_id,status) t1
    LEFT JOIN access_channel_info
           ON access_channel_info.channel_seq_id = t1.trans_id
    ORDER BY channel_name ;
"""

_INSERT_SQL = """ INSERT INTO channel_use_time VALUES(0,%s,%s,%s,%s,%s) """

# One pool per database, created lazily on first use and then reused, so a
# task run does not open a fresh set of pooled connections every time.
_pools = {}


# Create a database connection pool.
def createPool(db_config):
    """Return a new ``PooledDB`` pool built from *db_config*.

    The pool is created with five initial idle connections; *db_config* is
    forwarded as keyword arguments to the ``pymysql`` connection factory.
    """
    spool = PooledDB(pymysql, 5, **db_config)
    return spool


def _get_pool(name, db_config):
    """Return the cached pool registered under *name*, creating it if needed."""
    pool = _pools.get(name)
    if pool is None:
        pool = _pools[name] = createPool(db_config)
    return pool


def _average_duration(frame):
    """Return ``use_time / cnt`` of the first row of *frame*, or 0 if empty."""
    if frame.empty:
        return 0
    return float(frame['use_time'].iloc[0] / frame['cnt'].iloc[0])


def _build_rows(df, date):
    """Build the insert parameter tuples for every tracked channel in *df*.

    For each id in ``_CHANNEL_IDS`` that has rows in *df*, three tuples
    ``(date, channel_id, channel_name, kind, average)`` are produced, in the
    order overall, success, failure. Channels without rows are skipped.
    """
    rows = []
    for channel_id in _CHANNEL_IDS:
        channel_df = df[df['trans_id'] == channel_id]
        if channel_df.empty:
            continue

        channel_name = channel_df['channel_name'].iloc[0]
        succ_avg_time = _average_duration(
            channel_df[channel_df['status'] == _STATUS_SUCCESS])
        fail_avg_time = _average_duration(
            channel_df[channel_df['status'] == _STATUS_FAILURE])
        # Overall average across every status of this channel.
        total_avg_time = float(
            channel_df['use_time'].sum() / channel_df['cnt'].sum())

        rows.append((date, channel_id, channel_name, _KIND_TOTAL, total_avg_time))    # overall average duration
        rows.append((date, channel_id, channel_name, _KIND_SUCCESS, succ_avg_time))  # success average duration
        rows.append((date, channel_id, channel_name, _KIND_FAILURE, fail_avg_time))  # failure average duration
    return rows


@cel.task
def get_channel_use_time():
    """Compute per-channel average durations and store them.

    Reads the aggregated dispatch records from the channel database, builds
    the overall / success / failure averages for each tracked channel, and
    bulk-inserts them into ``channel_use_time`` in the statistics database.
    Both connections and the cursor are released even if a step raises.
    """
    channel_pool = _get_pool('channel', channel_db_config)
    stat_pool = _get_pool('stat', stat_db_config)

    with contextlib.closing(channel_pool.connection()) as conn, \
            contextlib.closing(stat_pool.connection()) as stat_conn:
        df = pd.read_sql(_CHANNEL_SQL, conn)
        date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
        ins_list = _build_rows(df, date)

        if ins_list:
            with contextlib.closing(stat_conn.cursor()) as stat_cursor:
                stat_cursor.executemany(_INSERT_SQL, ins_list)
        # NOTE(review): kept from the original. The connection is configured
        # with autocommit=1, so the insert above is not expected to be undone
        # by this call; confirm whether ``commit()`` was intended.
        stat_conn.rollback()

    # Completion marker printed to stdout (kept from the original).
    print(3333333333333333)