# -*- coding: utf-8 -*-
"""Celery tasks that copy order income/profit aggregates into the monitoring DB.

Two tasks are defined:

* ``save_yesterday_data`` writes per-enterprise income/profit rows (yesterday
  and month-to-date) to ``manage_data`` and the processing-time distribution
  of yesterday's orders to ``yesterday_use_time``.
* ``save_now_data`` keeps the single ``customer_id = '0000'`` row of
  ``manage_data`` up to date with today's running income/profit.

Connection settings are read from ``./config.ini`` (sections ``database1``
and ``database2``) when the module is imported.
"""
# __author__ = "dukun"
import json
import time
import pymysql
import logging
import numpy as np
import pandas as pd
import configparser
from contextlib import closing
from celery_tasks import cel
# Older DBUtils releases expose the same class as ``DBUtils.PooledDB``.
from dbutils.pooled_db import PooledDB
from datetime import datetime, timedelta, date

# Log record format.
LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
# Timestamp format (take care not to mix up month and day).
DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
    # With ``filename`` set, records go to the file instead of the console.
    filename=r"./logs/manage.log",
)
logger = logging.getLogger(__name__)

cf = configparser.RawConfigParser()
cf.read(r"./config.ini")

CHANNEL_HOST = cf.get("database1", "host")
CHANNEL_PORT = int(cf.get("database1", "port"))
CHANNEL_USER = cf.get("database1", "user")
CHANNEL_PASSWORD = cf.get("database1", "password")
CHANNEL_DB = cf.get("database1", "db")

STAT_HOST = cf.get("database2", "host")
STAT_PORT = int(cf.get("database2", "port"))
STAT_USER = cf.get("database2", "user")
STAT_PASSWORD = cf.get("database2", "password")
STAT_DB = cf.get("database2", "db")

# Channel database (source of the order data).
channel_db_config = {
    'host': CHANNEL_HOST,
    'port': CHANNEL_PORT,
    'user': CHANNEL_USER,
    'password': CHANNEL_PASSWORD,
    'db': CHANNEL_DB,
    'charset': 'utf8',
    'autocommit': 1,
}

# Monitoring database (destination of the aggregates).
stat_db_config = {
    'host': STAT_HOST,
    'port': STAT_PORT,
    'user': STAT_USER,
    'password': STAT_PASSWORD,
    'db': STAT_DB,
    'charset': 'utf8',
    'autocommit': 1,
}

# Per-enterprise income/profit for one order table and one time window.
# ``{table}`` is either the live table or a weekly ``flow_order_info_<suffix>``
# table; ``{start}``/``{end}`` are 'YYYY-MM-DD HH:MM:SS' strings produced by
# this module (never user input).
_ENTERPRISE_PROFIT_SQL = """SELECT enterprise_id,customer_name,income,profit
FROM (SELECT enterprise_id,
             SUM(price) income,
             SUM(price - operator_balance_price - partner_balance_price) profit
      FROM {table}
      WHERE check_time BETWEEN '{start}' AND '{end}'
        AND status = 6
        AND enterprise_id NOT IN (57,61,63,64,72)
        AND channel_id NOT IN ( 101,102,103,110,111,112)
      GROUP BY enterprise_id) t1
LEFT JOIN customer_info ON customer_info.customer_id = t1.enterprise_id"""

# (label stored in yesterday_use_time, column of the use-time query result).
_USE_TIME_BUCKETS = (
    ('小于10分钟', 'A'),
    ('10-20分钟', 'B'),
    ('20-30分钟', 'C'),
    ('30-40分钟', 'D'),
    ('40-50分钟', 'E'),
    ('50-60分钟', 'F'),
    ('大于60分钟', 'G'),
)

# Pools are created lazily and reused, so a task run does not open a fresh
# set of pooled connections every time it is invoked.
_pools = {}


def createPool(db_config):
    """Create a database connection pool.

    Args:
        db_config: keyword arguments forwarded to ``pymysql.connect``.

    Returns:
        A ``PooledDB`` built on pymysql with 5 as its second positional
        argument (``mincached`` in DBUtils' signature).
    """
    spool = PooledDB(pymysql, 5, **db_config)
    return spool


def _get_pool(name, db_config):
    """Return the cached pool registered under *name*, creating it on first use."""
    pool = _pools.get(name)
    if pool is None:
        pool = _pools[name] = createPool(db_config)
    return pool


def _yesterday_str():
    """Return yesterday's local date formatted as 'YYYY-MM-DD'."""
    return (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")


def _number_or_zero(value):
    """Return *value*, or 0 when it is None/NaN (SQL aggregate over no rows)."""
    return 0 if pd.isna(value) else value


def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def _build_manage_rows(month_df, yesterday_df):
    """Merge month-to-date and yesterday figures into ``manage_data`` rows.

    Each row is ``[enterprise_id, customer_name, income, profit, yes_rate,
    month_income, month_profit, month_rate, today_income, today_profit]``;
    the last row is the grand total stored under customer id ``'0001'``.
    ``today_income``/``today_profit`` are written as the placeholder ``-1``.
    """
    df = pd.merge(month_df, yesterday_df, how='outer',
                  on=['enterprise_id', 'customer_name'])

    df['yes_rate'] = df['profit'] / df['income']
    # The monthly margin is relative to the monthly income (the grand-total
    # row below is computed the same way).
    df['month_rate'] = df['month_profit'] / df['month_income']
    # x / 0 yields +/-inf, which fillna() does not touch; treat it like NaN.
    for rate_column in ('yes_rate', 'month_rate'):
        df[rate_column] = df[rate_column].replace([np.inf, -np.inf], np.nan)
    df = df.fillna(0)

    df['today_income'] = -1
    df['today_profit'] = -1
    df = df.reindex(columns=['enterprise_id', 'customer_name', 'income', 'profit',
                             'yes_rate', 'month_income', 'month_profit',
                             'month_rate', 'today_income', 'today_profit'])

    yes_profit_sum = float(df['profit'].sum())
    yes_income_sum = float(df['income'].sum())
    month_profit_sum = float(df['month_profit'].sum())
    month_income_sum = float(df['month_income'].sum())

    rows = df.values.tolist()
    rows.append(['0001', '',
                 yes_income_sum, yes_profit_sum,
                 _safe_ratio(yes_profit_sum, yes_income_sum),
                 month_income_sum, month_profit_sum,
                 _safe_ratio(month_profit_sum, month_income_sum),
                 -1, -1])
    return rows


def _build_use_time_rows(use_time_df):
    """Turn the one-row use-time result into ``(label, value)`` insert rows.

    Bucket counts are stored as ints; the average duration is divided by 60
    before being stored. NULL aggregates (no matching orders) become 0.
    """
    rows = [
        (label, int(_number_or_zero(use_time_df[column].iloc[0])))
        for label, column in _USE_TIME_BUCKETS
    ]
    avg_time = _number_or_zero(use_time_df['avg_time'].iloc[0])
    rows.append(('平均时长', float(avg_time / 60)))
    return rows


@cel.task
def save_yesterday_data():
    """Store yesterday's and month-to-date aggregates in the monitoring DB.

    Reads from the channel database, then inserts into ``manage_data`` and
    ``yesterday_use_time``. Errors while reading propagate to the caller;
    errors while inserting are logged and swallowed.
    """
    yesterday = _yesterday_str()
    ins_manage_sql = "INSERT INTO manage_data VALUES(0,'{}',%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)".format(yesterday)
    ins_use_time_sql = "INSERT INTO yesterday_use_time VALUES(0,'{}',%s,%s)".format(yesterday)

    # closing() hands the connection back to the pool even if a query raises.
    with closing(_get_pool('channel', channel_db_config).connection()) as conn:
        month_df = get_month_data(conn)
        yesterday_df = get_yesterday_data(conn)
        use_time_df = get_yesterday_use_time(conn)

    ins_manage_list = _build_manage_rows(month_df, yesterday_df)
    ins_use_time_list = _build_use_time_rows(use_time_df)

    with closing(_get_pool('stat', stat_db_config).connection()) as stat_conn:
        try:
            with closing(stat_conn.cursor()) as cursor:
                cursor.executemany(ins_manage_sql, ins_manage_list)
                cursor.executemany(ins_use_time_sql, ins_use_time_list)
        except Exception:
            # Best effort, as before: report the failure but do not raise.
            stat_conn.rollback()
            logger.exception("failed to store yesterday's data for %s", yesterday)

    # Completion marker written to stdout (kept from the original).
    print(44444444444)


@cel.task
def save_now_data():
    """Insert or update the ``customer_id = '0000'`` row with today's totals.

    Today's running income/profit are read from the channel database; when
    there are no matching orders yet the totals are stored as 0. Errors while
    writing are logged and swallowed.
    """
    with closing(_get_pool('channel', channel_db_config).connection()) as conn:
        income, profit = get_now_data(conn)

    # Plain floats: NULL sums become 0 and numpy scalars become values the
    # driver can escape.
    income = float(_number_or_zero(income))
    profit = float(_number_or_zero(profit))
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    que_sql = " SELECT * FROM manage_data WHERE customer_id = '0000' "
    ins_sql = """ INSERT INTO manage_data (time,customer_id,today_income,today_profit)
                  VALUES(%s,'0000', %s,%s) """
    upd_sql = """ UPDATE manage_data SET time = %s,today_income= %s,today_profit=%s
                  WHERE customer_id = '0000' """
    params = (now_time, income, profit)

    with closing(_get_pool('stat', stat_db_config).connection()) as stat_conn:
        with closing(stat_conn.cursor()) as cursor:
            cursor.execute(que_sql)
            row_exists = cursor.fetchone() is not None
            try:
                cursor.execute(upd_sql if row_exists else ins_sql, params)
            except Exception:
                # Best effort, as before: report the failure but do not raise.
                stat_conn.rollback()
                logger.exception("failed to store today's running totals")

    # Completion marker written to stdout (kept from the original).
    print(555555555555)


def get_now_data(conn):
    """Return ``(income, profit)`` for orders checked since today 00:00:00.

    Args:
        conn: open connection to the channel database.

    Returns:
        The two values of the single result row; they are NULL-like
        (None/NaN) when no order matches.
    """
    time_start = datetime.today().strftime("%Y-%m-%d") + ' 00:00:00'
    sql = """SELECT SUM(price) income,
                    SUM(price - operator_balance_price - partner_balance_price) profit
             FROM flow_order_info
             WHERE check_time BETWEEN '{}' AND now()
               AND status = 6
               AND enterprise_id NOT IN (57,61,63,64,72)
               AND channel_id NOT IN ( 101,102,103,110,111,112) """.format(time_start)
    df = pd.read_sql(sql, conn)
    return df['income'].iloc[0], df['profit'].iloc[0]


# Fetch this month's total profit.
def get_month_data(conn):
    """Return month-to-date income/profit per enterprise.

    The window runs from the first day of the month (of the previous month
    when today is the 1st) to yesterday 23:59:59. Weeks before the current
    ISO week are read from ``flow_order_info_<year><week>`` tables, the
    current week from the live ``flow_order_info`` table.

    Args:
        conn: open connection to the channel database.

    Returns:
        DataFrame with columns ``enterprise_id``, ``customer_name``,
        ``month_income`` and ``month_profit``.
    """
    today = datetime.today()
    yesterday_end = _yesterday_str() + ' 23:59:59'
    period_end = datetime(today.year, today.month, today.day)  # today 00:00

    year, month = today.year, today.month
    if period_end.day == 1:
        # On the 1st, "month to date" still means the month that just ended.
        if month == 1:
            year, month = year - 1, 12
        else:
            month -= 1
    period_start = datetime(year, month, 1)
    time_start = period_start.strftime("%Y-%m-%d %H:%M:%S")

    first_week = period_start.isocalendar()[1]
    last_week = period_end.isocalendar()[1]
    if first_week > last_week:
        # ISO week numbers wrap around at a year boundary, which leaves the
        # range below empty (the original produced invalid SQL here).
        # NOTE(review): only the live table is queried in this case, so the
        # result may miss archived weeks - confirm the archive table naming
        # scheme before extending this.
        logger.warning(
            "ISO week range %s..%s wraps a year boundary; querying only "
            "flow_order_info for the month starting %s",
            first_week, last_week, time_start)

    # One sub-query per archived week, then the live table for the current week.
    queries = [
        _ENTERPRISE_PROFIT_SQL.format(
            table="flow_order_info_{0}{1:02d}".format(year, week),
            start=time_start, end=yesterday_end)
        for week in range(first_week, last_week)
    ]
    queries.append(_ENTERPRISE_PROFIT_SQL.format(
        table="flow_order_info", start=time_start, end=yesterday_end))

    # UNION ALL: a plain UNION would drop a week whose (enterprise, income,
    # profit) row happens to equal another week's, under-counting the sums.
    last_sql = ''' SELECT enterprise_id,customer_name,SUM(income) month_income,SUM(profit) month_profit
                   FROM ({})t6 GROUP BY enterprise_id '''.format(' UNION ALL '.join(queries))
    return pd.read_sql(last_sql, conn)


def get_yesterday_data(conn):
    """Return yesterday's income/profit per enterprise from the live table.

    Args:
        conn: open connection to the channel database.

    Returns:
        DataFrame with columns ``enterprise_id``, ``customer_name``,
        ``income`` and ``profit``.
    """
    yesterday = _yesterday_str()
    yes_profit_sql = _ENTERPRISE_PROFIT_SQL.format(
        table="flow_order_info",
        start=yesterday + ' 00:00:00',
        end=yesterday + ' 23:59:59')
    return pd.read_sql(yes_profit_sql, conn)


def get_yesterday_use_time(conn):
    """Return the distribution of ``check_time - apply_date`` for yesterday.

    Args:
        conn: open connection to the channel database.

    Returns:
        One-row DataFrame with bucket counts ``A``..``G`` (bounds 600, 1200,
        ..., 3600 on the UNIX-timestamp difference) and ``avg_time``, the
        mean difference. All values are NULL when no order matches.
    """
    yesterday = _yesterday_str()
    sql = """ SELECT
                SUM(CASE WHEN diff_time BETWEEN 0 AND 600 THEN 1 ELSE 0 END) AS 'A',
                SUM(CASE WHEN diff_time BETWEEN 601 AND 1200 THEN 1 ELSE 0 END) AS 'B',
                SUM(CASE WHEN diff_time BETWEEN 1201 AND 1800 THEN 1 ELSE 0 END) AS 'C',
                SUM(CASE WHEN diff_time BETWEEN 1801 AND 2400 THEN 1 ELSE 0 END) AS 'D',
                SUM(CASE WHEN diff_time BETWEEN 2401 AND 3000 THEN 1 ELSE 0 END) AS 'E',
                SUM(CASE WHEN diff_time BETWEEN 3001 AND 3600 THEN 1 ELSE 0 END) AS 'F',
                SUM(CASE WHEN diff_time > 3600 THEN 1 ELSE 0 END) AS 'G',
                (SUM(diff_time) /COUNT(*)) AS 'avg_time'
              FROM (
                SELECT (UNIX_TIMESTAMP(check_time) - UNIX_TIMESTAMP(apply_date)) AS diff_time
                FROM flow_order_info
                WHERE apply_date BETWEEN '{}' AND '{}'
                  AND status = 6
                  AND channel_id NOT IN ( 101,102,103,110,111,112 )
              ) AS t1 """.format(yesterday + ' 00:00:00', yesterday + ' 23:59:59')
    return pd.read_sql(sql, conn)


# NOTE(review): this runs the task synchronously every time the module is
# imported (including by a Celery worker). Kept because it is existing
# module-level behaviour; confirm whether it should sit behind a
# ``__main__`` guard instead.
save_yesterday_data()