# -*- coding: utf-8 -*-
# __author__ = "dukun"
- import json
- import time
- import pymysql
- import logging
- import numpy as np
- import pandas as pd
- import configparser
- from celery_tasks import cel
- #from DBUtils.PooledDB import PooledDB
- from dbutils.pooled_db import PooledDB
- from datetime import datetime,timedelta,date
# Layout of every log record.
LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
# Timestamp layout used for %(asctime)s; take care not to swap month and day.
DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '
# Supplying ``filename`` makes logging write to that file instead of the console.
logging.basicConfig(
    filename=r"./logs/manage.log",
    datefmt=DATE_FORMAT,
    format=LOG_FORMAT,
    level=logging.INFO,
)
# Connection settings are read from ./config.ini (relative to the working directory).
cf = configparser.RawConfigParser()
cf.read(r"./config.ini")

# [database1] section: channel database.
CHANNEL_HOST = cf.get("database1", "host")
CHANNEL_PORT = cf.getint("database1", "port")
CHANNEL_USER = cf.get("database1", "user")
CHANNEL_PASSWORD = cf.get("database1", "password")
CHANNEL_DB = cf.get("database1", "db")

# [database2] section: monitoring (statistics) database.
STAT_HOST = cf.get("database2", "host")
STAT_PORT = cf.getint("database2", "port")
STAT_USER = cf.get("database2", "user")
STAT_PASSWORD = cf.get("database2", "password")
STAT_DB = cf.get("database2", "db")

# Channel database: keyword arguments handed to the connection pool.
channel_db_config = dict(
    host=CHANNEL_HOST,
    port=CHANNEL_PORT,
    user=CHANNEL_USER,
    password=CHANNEL_PASSWORD,
    db=CHANNEL_DB,
    charset='utf8',
    autocommit=1,
)

# Monitoring database: keyword arguments handed to the connection pool.
stat_db_config = dict(
    host=STAT_HOST,
    port=STAT_PORT,
    user=STAT_USER,
    password=STAT_PASSWORD,
    db=STAT_DB,
    charset='utf8',
    autocommit=1,
)
def createPool(db_config):
    """Create a database connection pool.

    Builds a ``PooledDB`` on top of ``pymysql``. The literal ``5`` is passed
    as PooledDB's second positional argument (``mincached`` in DBUtils) and
    ``db_config`` is forwarded as keyword arguments to the driver.

    :param db_config: mapping of connection keyword arguments.
    :return: the new ``PooledDB`` instance.
    """
    return PooledDB(pymysql, 5, **db_config)
# (display label, result column of get_yesterday_use_time) in insert order.
_USE_TIME_BUCKETS = (
    ('小于10分钟', 'A'),
    ('10-20分钟', 'B'),
    ('20-30分钟', 'C'),
    ('30-40分钟', 'D'),
    ('40-50分钟', 'E'),
    ('50-60分钟', 'F'),
    ('大于60分钟', 'G'),
)


def _zero_if_null(value):
    """Return 0 for a SQL NULL (None/NaN), otherwise the value unchanged."""
    return 0 if pd.isna(value) else value


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def _rate_column(profit, income):
    """Element-wise profit / income with infinities (income == 0) turned into NaN."""
    return (profit / income).replace([np.inf, -np.inf], np.nan)


def _build_manage_rows(month_df, yesterday_df):
    """Merge month-to-date and yesterday figures into rows for manage_data.

    Returns one list per enterprise in the column order
    enterprise_id, customer_name, income, profit, yes_rate, month_income,
    month_profit, month_rate, today_income, today_profit, followed by a
    summary row whose id is '0001'.
    """
    merged = pd.merge(month_df, yesterday_df, how='outer',
                      on=['enterprise_id', 'customer_name'])
    merged['yes_rate'] = _rate_column(merged['profit'], merged['income'])
    # The monthly rate must be relative to the monthly income, mirroring
    # how the summary row below is computed.
    merged['month_rate'] = _rate_column(merged['month_profit'], merged['month_income'])
    merged = merged.fillna(0)
    # -1 marks "not applicable" for the intraday columns of these rows.
    merged['today_income'] = -1
    merged['today_profit'] = -1
    merged = merged.reindex(columns=[
        'enterprise_id', 'customer_name', 'income', 'profit', 'yes_rate',
        'month_income', 'month_profit', 'month_rate',
        'today_income', 'today_profit',
    ])

    yes_income_sum = float(merged['income'].sum())
    yes_profit_sum = float(merged['profit'].sum())
    month_income_sum = float(merged['month_income'].sum())
    month_profit_sum = float(merged['month_profit'].sum())

    rows = merged.values.tolist()
    rows.append([
        '0001', '',
        yes_income_sum, yes_profit_sum,
        _safe_ratio(yes_profit_sum, yes_income_sum),
        month_income_sum, month_profit_sum,
        _safe_ratio(month_profit_sum, month_income_sum),
        -1, -1,
    ])
    return rows


def _build_use_time_rows(use_time_df):
    """Turn the single-row duration histogram into (label, value) tuples."""
    rows = [
        (label, int(_zero_if_null(use_time_df[column].iloc[0])))
        for label, column in _USE_TIME_BUCKETS
    ]
    # avg_time is divided by 60 before being stored (seconds -> minutes,
    # given that the query computes it from UNIX timestamps).
    avg_time = _zero_if_null(use_time_df['avg_time'].iloc[0])
    rows.append(('平均时长', float(avg_time / 60)))
    return rows


@cel.task
def save_yesterday_data():
    """Aggregate yesterday's and month-to-date figures and store them.

    Reads from the channel database (get_month_data, get_yesterday_data,
    get_yesterday_use_time) and inserts the resulting rows into the
    ``manage_data`` and ``yesterday_use_time`` tables of the monitoring
    database. Failures while writing are logged and rolled back, not raised.
    """
    yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
    # The date is generated locally, so formatting it into the SQL is safe.
    ins_manage_sql = "INSERT INTO manage_data VALUES(0,'{}',%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)".format(yesterday)
    ins_use_time_sql = "INSERT INTO yesterday_use_time VALUES(0,'{}',%s,%s)".format(yesterday)

    conn = createPool(channel_db_config).connection()
    try:
        month_df = get_month_data(conn)
        yesterday_df = get_yesterday_data(conn)
        use_time_df = get_yesterday_use_time(conn)
    finally:
        conn.close()

    ins_manage_list = _build_manage_rows(month_df, yesterday_df)
    ins_use_time_list = _build_use_time_rows(use_time_df)

    stat_conn = createPool(stat_db_config).connection()
    try:
        cursor = stat_conn.cursor()
        try:
            # The connection runs with autocommit on, so an explicit
            # transaction is needed for rollback() to undo a partial write.
            stat_conn.begin()
            cursor.executemany(ins_manage_sql, ins_manage_list)
            cursor.executemany(ins_use_time_sql, ins_use_time_list)
            stat_conn.commit()
        except Exception:
            stat_conn.rollback()
            logging.exception("save_yesterday_data: failed to store rows for %s", yesterday)
        else:
            logging.info("save_yesterday_data: stored rows for %s", yesterday)
        finally:
            cursor.close()
    finally:
        stat_conn.close()
@cel.task
def save_now_data():
    """Store today's running income and profit in ``manage_data``.

    Fetches the figures with get_now_data() from the channel database and
    writes them to the row whose ``customer_id`` is '0000' in the monitoring
    database: the row is inserted when missing, otherwise updated in place.
    Failures while writing are logged and rolled back, not raised.
    """
    conn = createPool(channel_db_config).connection()
    try:
        income, profit = get_now_data(conn)
    finally:
        conn.close()

    # SUM() over zero rows arrives as NULL (None/NaN); store 0 instead of
    # sending an invalid literal to the database.
    income = 0.0 if pd.isna(income) else float(income)
    profit = 0.0 if pd.isna(profit) else float(profit)
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    que_sql = "SELECT 1 FROM manage_data WHERE customer_id = '0000' LIMIT 1"
    ins_sql = ("INSERT INTO manage_data (time,customer_id,today_income,today_profit) "
               "VALUES(%s,'0000',%s,%s)")
    upd_sql = ("UPDATE manage_data SET time = %s,today_income= %s,today_profit=%s "
               "WHERE customer_id = '0000'")
    params = (now_time, income, profit)

    stat_conn = createPool(stat_db_config).connection()
    try:
        cursor = stat_conn.cursor()
        try:
            cursor.execute(que_sql)
            row_exists = cursor.fetchone() is not None
            cursor.execute(upd_sql if row_exists else ins_sql, params)
        except Exception:
            stat_conn.rollback()
            logging.exception("save_now_data: failed to store intraday figures")
        else:
            logging.info("save_now_data: stored intraday figures at %s", now_time)
        finally:
            cursor.close()
    finally:
        stat_conn.close()
def get_now_data(conn):
    """Return today's income and profit so far as a ``(income, profit)`` pair.

    Sums ``flow_order_info`` rows with ``status = 6`` whose ``check_time``
    lies between local midnight today and the database's ``now()``,
    excluding the hard-coded enterprise and channel ids.

    :param conn: DB-API connection to the channel database.
    :return: tuple of two floats; both are 0.0 when no row matches.
    """
    # Generated locally (never user input), so formatting it into the SQL is safe.
    time_start = datetime.today().strftime("%Y-%m-%d") + ' 00:00:00'
    # COALESCE: SUM() over zero rows is NULL, which would otherwise reach
    # the caller as NaN/None.
    sql = """SELECT COALESCE(SUM(price), 0) income,
                    COALESCE(SUM(price - operator_balance_price - partner_balance_price), 0) profit
             FROM
                 flow_order_info
             WHERE
                 check_time BETWEEN '{}' AND now() AND status = 6
                 AND enterprise_id NOT IN (57,61,63,64,72)
                 AND channel_id NOT IN ( 101,102,103,110,111,112) """.format(time_start)
    df = pd.read_sql(sql, conn)

    return float(df['income'].iloc[0]), float(df['profit'].iloc[0])
-
# Per-enterprise income/profit for one order table within a time window.
_MONTH_PROFIT_SQL = """SELECT enterprise_id,customer_name,income,profit
    FROM
        (SELECT enterprise_id,SUM(price) income,SUM(price - operator_balance_price - partner_balance_price) profit
        FROM
            {table}
        WHERE
            check_time BETWEEN '{start}' AND '{end}' AND status = 6
            AND enterprise_id NOT IN (57,61,63,64,72)
            AND channel_id NOT IN ( 101,102,103,110,111,112)
        GROUP BY enterprise_id) t1 LEFT JOIN customer_info
    ON customer_info.customer_id = t1.enterprise_id"""


def _iso_weeks_between(first_day, last_day):
    """Return the distinct (ISO year, ISO week) pairs from first_day to last_day, in order."""
    weeks = []
    day = first_day
    while day <= last_day:
        key = tuple(day.isocalendar())[:2]
        if not weeks or weeks[-1] != key:
            weeks.append(key)
        day += timedelta(days=1)
    return weeks


# Get this month's total profit.
def get_month_data(conn):
    """Return month-to-date income and profit per enterprise.

    The window runs from the first day of yesterday's month (so on the 1st
    it covers the whole previous month) up to yesterday 23:59:59. Every ISO
    week in that span except the current one is read from a weekly table
    ``flow_order_info_<year><week>``; the current week is read from
    ``flow_order_info``. The per-table results are summed per enterprise.

    :param conn: DB-API connection to the channel database.
    :return: DataFrame with columns enterprise_id, customer_name,
        month_income, month_profit.
    """
    today = datetime.today()
    today_midnight = datetime(today.year, today.month, today.day)
    yesterday = today_midnight - timedelta(days=1)
    month_start = datetime(yesterday.year, yesterday.month, 1)

    time_start = month_start.strftime("%Y-%m-%d %H:%M:%S")
    yesterday_end = yesterday.strftime("%Y-%m-%d") + ' 23:59:59'

    # Walking the calendar (instead of range(week1, week2 + 1)) keeps the
    # list non-empty when the span crosses an ISO-year boundary, e.g. when
    # January 1st still belongs to week 52/53 of the previous year.
    weeks = _iso_weeks_between(month_start, today_midnight)
    # NOTE(review): the suffix uses the ISO year of each week; this matches
    # the calendar year everywhere except around New Year - confirm the
    # naming of the weekly tables there.
    tables = ["flow_order_info_{0}{1:02d}".format(iso_year, iso_week)
              for iso_year, iso_week in weeks[:-1]]
    # The last entry is always the current week, which lives in the main table.
    tables.append("flow_order_info")

    # UNION ALL, not UNION: identical per-week rows for an enterprise must
    # both be counted rather than collapsed into one.
    sql = ' UNION ALL '.join(
        _MONTH_PROFIT_SQL.format(table=table, start=time_start, end=yesterday_end)
        for table in tables
    )

    last_sql = ''' SELECT enterprise_id,customer_name,SUM(income) month_income,SUM(profit) month_profit
        FROM ({})t6 GROUP BY enterprise_id,customer_name
        '''.format(sql)
    df = pd.read_sql(last_sql, conn)

    return df
def get_yesterday_data(conn):
    """Return yesterday's income and profit per enterprise.

    Sums ``flow_order_info`` rows with ``status = 6`` whose ``check_time``
    falls within yesterday (00:00:00 to 23:59:59, local date), excluding the
    hard-coded enterprise and channel ids, and attaches ``customer_name``
    via a left join on ``customer_info``.

    :param conn: DB-API connection to the channel database.
    :return: DataFrame with columns enterprise_id, customer_name, income, profit.
    """
    # Generated locally (never user input), so formatting it into the SQL is safe.
    yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
    window_start = yesterday + ' 00:00:00'
    window_end = yesterday + ' 23:59:59'

    yes_profit_sql = """SELECT enterprise_id,customer_name,income,profit
        FROM
            (SELECT enterprise_id,SUM(price) income,SUM(price - operator_balance_price - partner_balance_price) profit
            FROM
                flow_order_info
            WHERE
                check_time BETWEEN '{}' AND '{}' AND status = 6
                AND enterprise_id NOT IN (57,61,63,64,72)
                AND channel_id NOT IN ( 101,102,103,110,111,112)
            GROUP BY enterprise_id) t1 LEFT JOIN customer_info
        ON customer_info.customer_id = t1.enterprise_id""".format(window_start, window_end)

    return pd.read_sql(yes_profit_sql, conn)
def get_yesterday_use_time(conn):
    """Return a one-row histogram of yesterday's order processing durations.

    For ``flow_order_info`` rows with ``status = 6`` whose ``apply_date``
    falls within yesterday (excluding the hard-coded channel ids), the
    duration is ``UNIX_TIMESTAMP(check_time) - UNIX_TIMESTAMP(apply_date)``
    in seconds. Columns ``A``..``F`` count durations in successive
    600-second buckets from 0 to 3600, ``G`` counts durations above 3600,
    and ``avg_time`` is the mean duration. All columns are 0 when no row
    matches.

    :param conn: DB-API connection to the channel database.
    :return: DataFrame with a single row and columns A, B, C, D, E, F, G, avg_time.
    """
    # Generated locally (never user input), so formatting it into the SQL is safe.
    yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
    window_start = yesterday + ' 00:00:00'
    window_end = yesterday + ' 23:59:59'

    # COALESCE: with zero matching rows every SUM() (and the average) is
    # NULL, which callers cannot convert with int()/float().
    sql = """ SELECT
        COALESCE(SUM(CASE WHEN diff_time BETWEEN 0 AND 600 THEN 1 ELSE 0 END), 0) AS 'A',
        COALESCE(SUM(CASE WHEN diff_time BETWEEN 601 AND 1200 THEN 1 ELSE 0 END), 0) AS 'B',
        COALESCE(SUM(CASE WHEN diff_time BETWEEN 1201 AND 1800 THEN 1 ELSE 0 END), 0) AS 'C',
        COALESCE(SUM(CASE WHEN diff_time BETWEEN 1801 AND 2400 THEN 1 ELSE 0 END), 0) AS 'D',
        COALESCE(SUM(CASE WHEN diff_time BETWEEN 2401 AND 3000 THEN 1 ELSE 0 END), 0) AS 'E',
        COALESCE(SUM(CASE WHEN diff_time BETWEEN 3001 AND 3600 THEN 1 ELSE 0 END), 0) AS 'F',
        COALESCE(SUM(CASE WHEN diff_time > 3600 THEN 1 ELSE 0 END), 0) AS 'G',
        COALESCE(SUM(diff_time) /COUNT(*), 0) AS 'avg_time'
        FROM
        ( SELECT
            (UNIX_TIMESTAMP(check_time) - UNIX_TIMESTAMP(apply_date)) AS diff_time
          FROM
            flow_order_info
          WHERE
            apply_date BETWEEN '{}' AND '{}' AND
            status = 6 AND
            channel_id NOT IN ( 101,102,103,110,111,112 ) ) AS t1 """.format(window_start, window_end)

    return pd.read_sql(sql, conn)
# Manual entry point: run the daily aggregation once when this file is
# executed as a script. Guarded so that merely importing the module (for
# example when a Celery worker loads its tasks) does not write to the
# database as a side effect. save_now_data() can be run the same way.
if __name__ == "__main__":
    save_yesterday_data()
|