"""Automated "double-over-half" campaign statistics report.

Fetches the daily group-market "double-over-half" notice page, extracts the
relevant figures and per-region call-outs, and emails a styled HTML report.

Author: subk
Created: 2025/5/1 13:44
Desc: processes the 2025 group-market "double-over-half" notice data
Version: 1.1
"""

import requests
import pandas as pd
import re
import smtplib
import schedule
import time
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# ── User configuration ─────────────────────────────────────
# NOTE(review): credentials are hard-coded in source; move them to
# environment variables or a config file before sharing this script.
username    = "XXX.com"          # SMTP login, also used as the From address
password    = "3B3XXXXXXXXXXXX200"
smtp_server = "smtp.XX.com"
smtp_port   = 465                # implicit-SSL port, matches SMTP_SSL below
recipients  = [
    "XX",
    "XX"
]

# Regions (counties / customer segments) tracked in every per-region table.
REGIONS = ["涟水","清江浦","淮安区","淮阴区","盱眙","淮开","金湖","洪泽","战客"]
# Key weak metrics watched in the weekly dispatch; laggard metrics found on
# the notice page are matched against this list when building the report.
KEY_WEAK = [
    "战客客户净增","政企新入网","政企有效新增","电子学生证","云视讯新增终端数",
    "和对讲新增终端数","商客市场计费宽带新增","智算资源销售","专线FTTO新增",
    "战客集团渗透","泛住宿场景宽带新增","泛云主机","上网专线新增条数",
    "BC融合成员数","欠费回收"
]

# ── State control ──────────────────────────────────────────
last_sent_date = None  # "YYYYMMDD" of the last successful send; None = not yet sent

# ── 通用函数 ───────────────────────────────────────────────
def get_dates():
    """Return (today's display label, yesterday's date stamp).

    The label is formatted as "YYYY年MM月DD日" for display; the stamp is
    "YYYYMMDD" and is used as the `day_time` query parameter of the data URL.
    """
    now = datetime.now()
    label = now.strftime("%Y年%m月%d日")
    stamp = (now - timedelta(days=1)).strftime("%Y%m%d")
    return label, stamp

def send_email(subject: str, html_body: str):
    """Send `html_body` as an HTML email to the configured recipients.

    Uses the module-level SMTP settings (username/password/server/port)
    and connects over implicit SSL.
    """
    message = MIMEMultipart()
    message["Subject"] = subject
    message["From"] = username
    message["To"] = ", ".join(recipients)
    message.attach(MIMEText(html_body, "html", "utf-8"))
    with smtplib.SMTP_SSL(smtp_server, smtp_port) as conn:
        conn.login(username, password)
        conn.sendmail(username, recipients, message.as_string())

def fetch_web_content(url: str) -> str:
    """GET `url` (10 s timeout) and return the response body.

    Raises `requests.HTTPError` on a non-2xx status.
    """
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.text

def highlight_keyword(content: str, keyword: str) -> str:
    """Wrap every occurrence of `keyword` in `content` with a gold-highlight span."""
    marked = f"<span style='background-color:#FFD700'>{keyword}</span>"
    return content.replace(keyword, marked)

def print_styled_table(df: pd.DataFrame, title: str) -> str:
    last = df.index[-1]
    styler = (
        df.style
          .set_table_attributes('style="margin-left:auto;margin-right:auto;border-collapse:collapse;"')
          .set_caption(f"<strong style='font-size:1.3em;color:#0000FF'>{title}</strong>")
          .set_table_styles([
              {"selector":"thead th",
               "props":[("background-color","#4390FF"),("font-weight","bold"),
                        ("font-size","14px"),("border","1px solid #AAA"),("padding","8px")]},
              {"selector":"tbody th", "props":[("border","1px solid #AAA"),("padding","6px")]}
          ])
          .set_properties(**{"border":"1px solid #AAA","padding":"6px","text-align":"center"})
          .apply(lambda row: ["background-color:#F2F3F4" if row.name==last else "" for _ in row], axis=1)
    )
    return f"<div style='margin:30px 0;'>{styler.to_html()}</div>"

# ── 解析函数 ───────────────────────────────────────────────
def parse_overall_table(soup: BeautifulSoup) -> pd.DataFrame:
    """Parse the first HTML table on the page into county/front/back columns.

    The table has a two-row header; the two levels are flattened by string
    concatenation, then the "总体情况…前三项目/后三项目" columns are selected
    and coerced to numeric (NaN on parse failure).

    Raises ValueError if either expected column is missing.
    """
    raw = pd.read_html(str(soup), header=[0, 1])[0]
    # Flatten the two-level header into single strings ("top" + "sub").
    raw.columns = [top + (sub if isinstance(sub, str) else '') for top, sub in raw.columns]
    raw = raw.rename(columns={raw.columns[0]: 'county'})

    def _find(suffix):
        # First column whose flattened name is 总体情况…<suffix>, else None.
        for col in raw.columns:
            if col.startswith('总体情况') and col.endswith(suffix):
                return col
        return None

    front_col = _find('前三项目')
    back_col = _find('后三项目')
    if not front_col or not back_col:
        raise ValueError("未找到“前三项目”或“后三项目”列")
    result = raw[['county', front_col, back_col]].rename(columns={front_col: 'front', back_col: 'back'})
    result['front'] = pd.to_numeric(result['front'], errors='coerce')
    result['back'] = pd.to_numeric(result['back'], errors='coerce')
    return result

def extract_lianshui(df: pd.DataFrame) -> dict:
    """Pull Lianshui's front/back project counts and its rank on each.

    Rank is 1-based position after sorting the column in descending order.
    Raises KeyError if no row has county == '涟水'.
    """
    row = df.loc[df['county'] == '涟水']
    if row.empty:
        raise KeyError("涟水行缺失")
    front = int(row['front'].values[0])
    back = int(row['back'].values[0])

    def _rank(col: str) -> int:
        # Position of 涟水 after a stable descending sort on `col`.
        ordered = df.sort_values(col, ascending=False).reset_index(drop=True)
        return int((ordered['county'] == '涟水').idxmax() + 1)

    return {
        'front': front,
        'back': back,
        'front_rank': _rank('front'),
        'back_rank': _rank('back'),
    }

def extract_deadline_text(soup: BeautifulSoup) -> str:
    """Return the first 'title_content' div text that starts with 截止, or ''."""
    divs = soup.find_all("div", class_="title_content")
    texts = (div.get_text(strip=True) for div in divs)
    return next((t for t in texts if t.startswith("截止")), "")

def _collect_region_metrics(soup: BeautifulSoup, font_color: str) -> dict:
    """Map each region in REGIONS to the metrics that call it out in `font_color`.

    Notice sections look like "<n>、<metric>双过半…"; within each section,
    regions appear inside colored <font> tags as "区域(数值)".  Regions not
    present in REGIONS are ignored.

    NOTE(review): the region pattern matches ASCII parentheses only — confirm
    the source pages never use fullwidth () here.
    """
    collected = {region: [] for region in REGIONS}
    section_pat = re.compile(r'^\d+、(.+?)双过半')
    region_pat = re.compile(r'([\u4e00-\u9fa5]+)\([^)]*\)')
    for div in soup.find_all("div", class_="title_content"):
        m = section_pat.match(div.get_text(strip=True))
        if not m:
            continue
        metric = m.group(1).strip()
        for font in div.find_all("font", attrs={"color": font_color}):
            for region in region_pat.findall(font.get_text()):
                if region in collected:
                    collected[region].append(metric)
    return collected

def extract_fall_behind_metrics(soup: BeautifulSoup) -> dict:
    """Metrics where each region is called out as falling behind (red font)."""
    # FIX: this and extract_top_metrics were byte-for-byte duplicates except
    # for the font color; the shared logic now lives in _collect_region_metrics.
    return _collect_region_metrics(soup, "red")

def extract_top_metrics(soup: BeautifulSoup) -> dict:
    """Metrics where each region is called out as leading (blue font)."""
    return _collect_region_metrics(soup, "blue")

# ── 报表生成 ───────────────────────────────────────────────
def _metrics_frame(per_region: dict, row_prefix: str, count_label: str) -> pd.DataFrame:
    """Tabulate per-region metric lists for display.

    One column per region (padded with '' to a rectangle), plus a final row
    holding each region's metric count.  Row labels are "<row_prefix>1..N"
    followed by `count_label`.
    """
    width = max(len(v) for v in per_region.values())
    frame = pd.DataFrame({r: per_region[r] + [''] * (width - len(per_region[r])) for r in REGIONS})
    frame.loc[frame.shape[0]] = {r: len(per_region[r]) for r in REGIONS}
    frame.index = [f"{row_prefix}{i + 1}" for i in range(frame.shape[0] - 1)] + [count_label]
    return frame

def generate_report():
    """Fetch yesterday's notice page and build the email.

    Returns (subject, html_body).  Raises KeyError when the 涟水 row is
    missing and propagates any fetch/parse errors.
    """
    _, yesterday = get_dates()
    url = f"http://10.XXXXXXXXXXXXXXXXXXXXX&cfg_id=205&day_time={yesterday}"
    html_raw = fetch_web_content(url)
    soup = BeautifulSoup(html_raw, "html.parser")
    overall = parse_overall_table(soup)

    if '涟水' not in overall['county'].values:
        raise KeyError("涟水行不存在")

    parts = []
    # Deadline banner + one-line Lianshui summary.
    parts.append(f"<p style='text-align:center;font-size:1.2em;color:#FF0000;'>{extract_deadline_text(soup)}</p>")
    ls = extract_lianshui(overall)
    summary = (f"涟水前三项目数{ls['front']},排名{ls['front_rank']};"
               f"后三项目数{ls['back']},排名{ls['back_rank']}。")
    parts.append(f"<p style='text-align:center;font-size:1.2em;font-weight:bold;'>{summary}</p>")

    # Three per-region tables: laggards, leaders, and laggards matching the
    # key watch list.  The table construction (previously three copy-pasted
    # stanzas) is factored into _metrics_frame.
    weak = extract_fall_behind_metrics(soup)
    parts.append(print_styled_table(_metrics_frame(weak, "后三项目", "后三项目个数"), "后三项目列表"))

    topm = extract_top_metrics(soup)
    parts.append(print_styled_table(_metrics_frame(topm, "前三项目", "前三项目个数"), "前三项目列表"))

    matched = {r: [m for m in weak[r] if m in KEY_WEAK] for r in REGIONS}
    parts.append(print_styled_table(_metrics_frame(matched, "重点弱项", "重点弱项个数"), "周调度重点弱项匹配情况"))

    # Raw page appended for reference, with every "涟水" occurrence highlighted.
    # BUG FIX: the original string literal here was split across a raw newline
    # (a SyntaxError); the <br> now lives inside the same literal.
    highlighted = highlight_keyword(html_raw, "涟水")
    parts.append("<hr><div style='margin-top:20px;font-size:13px;'><strong>原始页面内容(含涟水高亮):</strong><br>")
    parts.append(f"<div style='border:1px solid #ccc; padding:10px;'>{highlighted}</div></div>")

    subject = f"【抢先版】政企\"双过半\"通报:{summary}"
    return subject, "".join(parts)

# ── 调度逻辑 ───────────────────────────────────────────────
def job_wrapper():
    """Scheduled tick: from 15:00 onward, send the report once per day.

    Marks `last_sent_date` only after a successful send, so a failed attempt
    is retried on the next tick.
    """
    global last_sent_date
    now = datetime.now()
    today_str = now.strftime("%Y%m%d")

    already_sent = last_sent_date == today_str
    if now.hour < 15 or already_sent:
        return
    try:
        subject, body = generate_report()
        send_email(subject, body)
    except Exception as e:
        print(f"[{now}] 报告生成/发送失败:{e}")
    else:
        last_sent_date = today_str
        print(f"[{now}] 报告已发送,已标记今日。")

def main():
    """Send the report once immediately, then poll every 5 minutes.

    The immediate send marks today as done only when run at/after 15:00,
    so the scheduled 15:00 send still fires on early manual runs.  Runs
    forever driving the `schedule` queue.
    """
    global last_sent_date
    now = datetime.now()
    today_str = now.strftime("%Y%m%d")

    try:
        subject, body = generate_report()
        send_email(subject, body)
        if now.hour >= 15:
            last_sent_date = today_str
            print(f"[{now}] 手动执行:报告已发送并标记。")
        else:
            print(f"[{now}] 手动执行:报告已发送(不标记发送状态)。")
    except Exception as e:
        # FIX: the original swallowed the exception entirely, so real failures
        # (auth errors, page-layout changes) looked like "data not ready yet".
        # Keep the friendly message but surface the actual error.
        print(f"手动执行:请稍等,数据暂时没出来。({e})")

    schedule.every(5).minutes.do(job_wrapper)

    while True:
        schedule.run_pending()
        time.sleep(1)

if __name__ == '__main__':
    main()