manage_data.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. #-*- coding: utf-8 -*-
  2. #__author__ = "dukun"
  3. import json
  4. import time
  5. import pymysql
  6. import logging
  7. import numpy as np
  8. import pandas as pd
  9. import configparser
  10. from celery_tasks import cel
  11. #from DBUtils.PooledDB import PooledDB
  12. from dbutils.pooled_db import PooledDB
  13. from datetime import datetime,timedelta,date
  14. #配置输出日志格式
  15. LOG_FORMAT = "%(asctime)s %(filename)s[line:%(lineno)d] %(name)s %(levelname)s %(pathname)s %(message)s "
  16. #配置输出时间的格式,注意月份和天数不要搞乱了
  17. DATE_FORMAT = '%Y-%m-%d %H:%M:%S %a '
  18. logging.basicConfig(level=logging.INFO,
  19. format=LOG_FORMAT,
  20. datefmt = DATE_FORMAT ,
  21. filename=r"./logs/manage.log" #有了filename参数就不会直接输出显示到控制台,而是直接写入文件
  22. )
  23. cf = configparser.RawConfigParser()
  24. cf.read(r"./config.ini")
  25. CHANNEL_HOST = cf.get("database1","host")
  26. CHANNEL_PORT = int(cf.get("database1","port"))
  27. CHANNEL_USER = cf.get("database1","user")
  28. CHANNEL_PASSWORD = cf.get("database1","password")
  29. CHANNEL_DB = cf.get("database1","db")
  30. STAT_HOST = cf.get("database2","host")
  31. STAT_PORT = int(cf.get("database2","port"))
  32. STAT_USER = cf.get("database2","user")
  33. STAT_PASSWORD = cf.get("database2","password")
  34. STAT_DB = cf.get("database2","db")
  35. #通道数据库
  36. channel_db_config = {
  37. 'host' : CHANNEL_HOST,
  38. 'port' : CHANNEL_PORT,
  39. 'user' : CHANNEL_USER,
  40. 'password' : CHANNEL_PASSWORD,
  41. 'db' : CHANNEL_DB,
  42. 'charset' : 'utf8',
  43. 'autocommit' : 1
  44. }
  45. #监控数据库
  46. stat_db_config = {
  47. 'host' : STAT_HOST,
  48. 'port' : STAT_PORT,
  49. 'user' : STAT_USER,
  50. 'password' : STAT_PASSWORD,
  51. 'db' : STAT_DB,
  52. 'charset' : 'utf8',
  53. 'autocommit' : 1
  54. }
  55. #创建数据库连接池
  56. def createPool(db_config):
  57. spool = PooledDB(pymysql, 5, **db_config)
  58. return spool
  59. @cel.task
  60. def save_yesterday_data():
  61. today = datetime.today()
  62. yesterday = today - timedelta(days=1)
  63. yesterday = yesterday.strftime("%Y-%m-%d")
  64. ins_manage_sql = "INSERT INTO manage_data VALUES(0,'{}',%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)".format(yesterday)
  65. ins_use_time_sql = "INSERT INTO yesterday_use_time VALUES(0,'{}',%s,%s)".format(yesterday)
  66. #print(ins_manage_sql)
  67. conn = createPool(channel_db_config).connection()
  68. stat_conn = createPool(stat_db_config).connection()
  69. month_df = get_month_data(conn)
  70. yesterday_df = get_yesterday_data(conn)
  71. use_time_df = get_yesterday_use_time(conn)
  72. df = pd.merge(month_df,yesterday_df,how = 'outer',on=['enterprise_id','customer_name'])
  73. #df = df.fillna(0)
  74. #df['yes_rate'] = 0
  75. df['yes_rate'] = df['profit'] / df['income']
  76. df['month_rate'] = df['month_profit'] / df['income']
  77. df = df.fillna(0)
  78. df['today_income'] = -1
  79. df['today_profit'] = -1
  80. df = df.reindex(columns=['enterprise_id','customer_name','income','profit','yes_rate','month_income','month_profit',
  81. 'month_rate','today_income','today_profit'])
  82. yes_profit_sum = float(df['profit'].sum())
  83. yes_income_sum = float(df['income'].sum())
  84. month_profit_sum = float(df['month_profit'].sum())
  85. month_income_sum = float(df['month_income'].sum())
  86. yes_profit_rate = yes_profit_sum /yes_income_sum
  87. month_profit_rate = month_profit_sum /month_income_sum
  88. ins_manage_list = df.values.tolist()
  89. ins_manage_list.append(['0001','',yes_income_sum,yes_profit_sum,yes_profit_rate,month_income_sum,
  90. month_profit_sum,month_profit_rate,-1,-1] )
  91. #print(ins_manage_list)
  92. ins_use_time_list = []
  93. ins_use_time_list.append(('小于10分钟',int(use_time_df['A'].iloc[0])))
  94. ins_use_time_list.append(('10-20分钟',int(use_time_df['B'].iloc[0])))
  95. ins_use_time_list.append(('20-30分钟',int(use_time_df['C'].iloc[0])))
  96. ins_use_time_list.append(('30-40分钟',int(use_time_df['D'].iloc[0])))
  97. ins_use_time_list.append(('40-50分钟',int(use_time_df['E'].iloc[0])))
  98. ins_use_time_list.append(('50-60分钟',int(use_time_df['F'].iloc[0])))
  99. ins_use_time_list.append(('大于60分钟',int(use_time_df['G'].iloc[0])))
  100. ins_use_time_list.append(('平均时长',float(use_time_df['avg_time'].iloc[0]/60)))
  101. #print(use_time_df)
  102. #print(ins_use_time_list)
  103. cursor = stat_conn.cursor()
  104. try:
  105. cursor.executemany(ins_manage_sql,ins_manage_list)
  106. cursor.executemany(ins_use_time_sql,ins_use_time_list)
  107. except Exception as ex :
  108. stat_conn.rollback()
  109. print(ex)
  110. #print(month_df)
  111. #print(yesterday_df)
  112. #print(df)
  113. #print(use_time_df)
  114. conn.close()
  115. stat_conn.close()
  116. print(44444444444)
  117. @cel.task
  118. def save_now_data():
  119. conn = createPool(channel_db_config).connection()
  120. stat_conn = createPool(stat_db_config).connection()
  121. income,profit = get_now_data(conn)
  122. #print(income,profit)
  123. #now = today.strftime("%Y-%m-%d %H:%M:%S")
  124. now_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
  125. #print(now_time)
  126. que_sql = " SELECT * FROM manage_data WHERE customer_id = '0000' "
  127. ins_sql = """ INSERT INTO manage_data (time,customer_id,today_income,today_profit) VALUES('{}','0000',
  128. {},{}) """.format(now_time,income,profit)
  129. upd_sql = """ UPDATE manage_data SET time = '{}',today_income= {},today_profit={}
  130. WHERE customer_id = '0000' """.format(now_time,income,profit)
  131. #print(upd_sql)
  132. df = pd.read_sql(que_sql,stat_conn)
  133. cursor = stat_conn.cursor()
  134. try:
  135. if df.empty is True:
  136. cursor.execute(ins_sql)
  137. else:
  138. cursor.execute(upd_sql)
  139. except Exception as ex:
  140. print(ex)
  141. stat_conn.rollback()
  142. cursor.close()
  143. conn.close()
  144. stat_conn.close()
  145. print(555555555555)
  146. def get_now_data(conn):
  147. today = datetime.today()
  148. time_start = today.strftime("%Y-%m-%d") + ' 00:00:00'
  149. sql = """SELECT SUM(price) income,SUM(price - operator_balance_price - partner_balance_price) profit
  150. FROM
  151. flow_order_info
  152. WHERE
  153. check_time BETWEEN '{}' AND now() AND status = 6
  154. AND enterprise_id NOT IN (57,61,63,64,72)
  155. AND channel_id NOT IN ( 101,102,103,110,111,112) """.format(time_start)
  156. #print(sql)
  157. df = pd.read_sql(sql,conn)
  158. return df['income'].iloc[0],df['profit'].iloc[0]
  159. #获取本月总利润
  160. def get_month_data(conn):
  161. DATE = None
  162. STEP = None
  163. today = datetime.today()
  164. year = today.year
  165. month = today.month
  166. yesterday = today - timedelta(days=1)
  167. yesterday = yesterday.strftime("%Y-%m-%d")
  168. yesterdayStart = yesterday + ' 00:00:00'
  169. yesterdayEnd = yesterday + ' 23:59:59'
  170. date2 = datetime(year, month, today.day)
  171. if date2.day == 1:
  172. if month == 1:
  173. month = 12
  174. year = year -1
  175. else:
  176. month = month - 1
  177. date1 = datetime(year, month, 1)
  178. date1_week = date1.isocalendar()[1]
  179. date2_week = date2.isocalendar()[1]
  180. if DATE is None:
  181. DATE = date1
  182. if STEP is None:
  183. STEP = (date2 - date1).days
  184. DATE_STR = DATE.strftime("%Y-%m")
  185. time_start = DATE.strftime("%Y-%m-%d %H:%M:%S")
  186. #print(time_start)
  187. time_end = (DATE + timedelta(days=STEP)).strftime("%Y-%m-%d %H:%M:%S")
  188. #print(time_start)
  189. #print(time_end)
  190. profit_temp_sql = """SELECT enterprise_id,customer_name,income,profit
  191. FROM
  192. (SELECT enterprise_id,SUM(price) income,SUM(price - operator_balance_price - partner_balance_price) profit
  193. FROM
  194. flow_order_info_{}
  195. WHERE
  196. check_time BETWEEN '{}' AND '{}' AND status = 6
  197. AND enterprise_id NOT IN (57,61,63,64,72)
  198. AND channel_id NOT IN ( 101,102,103,110,111,112)
  199. GROUP BY enterprise_id) t1 LEFT JOIN customer_info
  200. ON customer_info.customer_id = t1.enterprise_id"""
  201. profit_sql_temp_1 = """SELECT SUM(price - operator_balance_price - partner_balance_price -
  202. price * cast(0.06 / 1.06 AS decimal(12,4))) profits
  203. FROM
  204. flow_order_info_{}
  205. WHERE
  206. apply_date BETWEEN '{}' AND '{}' AND status = 6 AND enterprise_id IN (72)
  207. AND channel_id NOT IN ( 101,102,103,110,111,112)"""
  208. profit_end_sql = """ SELECT enterprise_id,customer_name,income,profit
  209. FROM
  210. (SELECT enterprise_id,SUM(price) income,SUM(price - operator_balance_price - partner_balance_price) profit
  211. FROM
  212. flow_order_info
  213. WHERE
  214. check_time BETWEEN '{}' AND '{}' AND status = 6
  215. AND enterprise_id NOT IN (57,61,63,64,72)
  216. AND channel_id NOT IN ( 101,102,103,110,111,112)
  217. GROUP BY enterprise_id) t1 LEFT JOIN customer_info
  218. ON customer_info.customer_id = t1.enterprise_id"""
  219. profit_end_sql_1 = """SELECT SUM(price - operator_balance_price - partner_balance_price
  220. - price * cast(0.06 / 1.06 AS decimal(12,4))) profit
  221. FROM
  222. flow_order_info
  223. WHERE
  224. apply_date BETWEEN '{}' AND '{}' AND status = 6 AND enterprise_id IN (72)
  225. AND channel_id NOT IN ( 101,102,103,110,111,112)"""
  226. sql = ''
  227. for i in range(date1_week,date2_week+1):
  228. str_table_suffix = "{0}{1:02d}".format(year,i)
  229. if i == date2_week:
  230. if (today - date2) < timedelta(days=7):
  231. sql += profit_end_sql.format(time_start,yesterdayEnd)
  232. else:
  233. sql += prifit_temp_sql.format(str_table_suffix,time_start,yesterdayEnd)
  234. else :
  235. sql += profit_temp_sql.format(str_table_suffix,time_start,yesterdayEnd) + ' UNION '
  236. last_sql = ''' SELECT enterprise_id,customer_name,SUM(income) month_income,SUM(profit) month_profit
  237. FROM ({})t6 GROUP BY enterprise_id
  238. '''.format(sql)
  239. #print(last_sql)
  240. df = pd.read_sql(last_sql,conn)
  241. return df
  242. def get_yesterday_data(conn):
  243. today = datetime.today()
  244. yesterday = today - timedelta(days=1)
  245. yesterday = yesterday.strftime("%Y-%m-%d")
  246. yesterdayStart = yesterday + ' 00:00:00'
  247. yesterdayEnd = yesterday + ' 23:59:59'
  248. yes_profit_sql = """SELECT enterprise_id,customer_name,income,profit
  249. FROM
  250. (SELECT enterprise_id,SUM(price) income,SUM(price - operator_balance_price - partner_balance_price) profit
  251. FROM
  252. flow_order_info
  253. WHERE
  254. check_time BETWEEN '{}' AND '{}' AND status = 6
  255. AND enterprise_id NOT IN (57,61,63,64,72)
  256. AND channel_id NOT IN ( 101,102,103,110,111,112)
  257. GROUP BY enterprise_id) t1 LEFT JOIN customer_info
  258. ON customer_info.customer_id = t1.enterprise_id""".format(yesterdayStart,yesterdayEnd)
  259. yes_profit_sql_1 = """SELECT SUM(price) sum_income,
  260. SUM(operator_balance_price + partner_balance_price
  261. + price * cast(0.06 / 1.06 AS decimal(12,4))) sum_cost
  262. FROM
  263. flow_order_info
  264. WHERE
  265. apply_date BETWEEN '{}' AND '{}' AND status = 6 AND enterprise_id IN (72)
  266. AND channel_id NOT IN ( 101,102,103,110,111,112)""".format(yesterdayStart,yesterdayEnd)
  267. #print(yes_profit_sql)
  268. df = pd.read_sql(yes_profit_sql,conn)
  269. return df
  270. def get_yesterday_use_time(conn):
  271. today = datetime.today()
  272. yesterday = today - timedelta(days=1)
  273. yesterday = yesterday.strftime("%Y-%m-%d")
  274. yesterdayStart = yesterday + ' 00:00:00'
  275. yesterdayEnd = yesterday + ' 23:59:59'
  276. sql = """ SELECT
  277. SUM(CASE WHEN diff_time BETWEEN 0 AND 600 THEN 1 ELSE 0 END) AS 'A',
  278. SUM(CASE WHEN diff_time BETWEEN 601 AND 1200 THEN 1 ELSE 0 END) AS 'B',
  279. SUM(CASE WHEN diff_time BETWEEN 1201 AND 1800 THEN 1 ELSE 0 END) AS 'C',
  280. SUM(CASE WHEN diff_time BETWEEN 1801 AND 2400 THEN 1 ELSE 0 END) AS 'D',
  281. SUM(CASE WHEN diff_time BETWEEN 2401 AND 3000 THEN 1 ELSE 0 END) AS 'E',
  282. SUM(CASE WHEN diff_time BETWEEN 3001 AND 3600 THEN 1 ELSE 0 END) AS 'F',
  283. SUM(CASE WHEN diff_time > 3600 THEN 1 ELSE 0 END) AS 'G',
  284. (SUM(diff_time) /COUNT(*)) AS 'avg_time'
  285. FROM
  286. ( SELECT
  287. (UNIX_TIMESTAMP(check_time) - UNIX_TIMESTAMP(apply_date)) AS diff_time
  288. FROM
  289. flow_order_info
  290. WHERE
  291. apply_date BETWEEN '{}' AND '{}' AND
  292. status = 6 AND
  293. channel_id NOT IN ( 101,102,103,110,111,112 ) ) AS t1 """.format(yesterdayStart,yesterdayEnd)
  294. df = pd.read_sql(sql,conn)
  295. return df
# Runs the daily aggregation once, immediately, whenever this module is
# executed or imported.
# NOTE(review): because the call is at module level it also fires when a
# Celery worker merely imports this module - confirm that is intended, or
# guard it with ``if __name__ == "__main__":``.
save_yesterday_data()
#save_now_data()