之前

之前宝塔自动续签失效两次: 宝塔不能自动续签的 bug 修复

本以为已经好了,直到今天又失效,算你厉害,用不起。

https://i.imgur.com/lJfJ0d1.png

更新宝塔还是没用,坑爹。

我不理解: 比续签更复杂你们都能做,怎么到续签就出问题了。

不仅仅我一个人续签失败,挺多人都是这样。

我理解不了啊,你们是不是故意的???

通过 cursor 来写

给 ai 的:

我要自动续签 nginx 的证书,服务器是用的宝塔。
1. 有个 domains 变量,是一个列表
   域名有:
     - c.com
     - www.a.com b.com
     - a-admin.com v.xx.com ...
2. http 请求所有域名,根据域名证书是否小于 30 天,小于 30 天判定为过期。
3. 利用/home/xxx/acme.sh 来申请证书,使用阿里云的 DNS 解析。AccessKey:xxx  SecretKey:123456
4.  最后更新到 nginx 。

模型用的是 auto-select,给了屎一样的代码。

还说我 python 版低(我 3.12.3 ),也不知道用啥模型了,手动选择 3.7 才能用。

代码

改下配置就能用

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import ssl
import socket
import datetime
import subprocess
import os
import time
from typing import List, Tuple

# 域名列表
domains = [
    "a.com,www.a.com", 
    "admin.b.com,x.b.com",
    "c.com"
]

# 阿里云 DNS 配置
ALIYUN_ACCESS_KEY = "xxx"
ALIYUN_SECRET_KEY = "xx"

def check_cert_expiry(domain: str) -> Tuple[bool, int]:
    """
    检查证书是否过期
    返回: (是否过期, 剩余天数)
    对于多域名证书,检查每个域名并返回最短的剩余天数
    """
    try:
        # 处理多域名情况,逗号分隔的域名
        if ',' in domain:
            domains_list = domain.split(',')
            min_days_left = float('inf')  # 设置初始值为无穷大
            all_results = []
            
            # 检查每个域名
            for single_domain in domains_list:
                single_domain = single_domain.strip()
                expired, days = check_cert_expiry(single_domain)
                all_results.append((single_domain, expired, days))
                if days < min_days_left:
                    min_days_left = days
            
            # 打印所有域名的结果
            for single_domain, expired, days in all_results:
                print(f"  - 子域名 {single_domain} 剩余天数: {days}")
            
            # 如果最小天数小于 30 ,则需要续签
            return min_days_left < 30, min_days_left
            
        # 使用外部命令获取证书信息
        cmd = f"echo | openssl s_client -connect {domain}:443 -servername {domain} 2>/dev/null | openssl x509 -noout -dates"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"检查域名 {domain} 证书时出错: 无法连接或获取证书")
            return True, 0
        
        # 解析输出找到过期日期
        output = result.stdout
        not_after_line = [line for line in output.splitlines() if line.startswith('notAfter=')]
        
        if not not_after_line:
            print(f"检查域名 {domain} 证书时出错: 无法获取过期时间")
            return True, 0
            
        # 解析日期格式,例如: notAfter=May 30 12:00:00 2023 GMT
        date_str = not_after_line[0].split('=')[1]
        expires_date = datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
        days_left = (expires_date - datetime.datetime.now()).days
        
        print(f"域名 {domain} 证书到期日期: {expires_date.strftime('%Y-%m-%d')}, 剩余天数: {days_left}")
        return days_left < 30, days_left
    except Exception as e:
        print(f"检查域名 {domain} 证书时出错: {str(e)}")
        return True, 0  # 如果无法检查,默认为需要续签

def set_ali_env():
    """
    设置阿里云 DNS API 的环境变量
    """
    os.environ['Ali_Key'] = ALIYUN_ACCESS_KEY
    os.environ['Ali_Secret'] = ALIYUN_SECRET_KEY

def check_dns_record_exists(domain: str) -> bool:
    """
    检查指定域名的 DNS 验证记录是否存在
    """
    try:
        # 设置环境变量
        set_ali_env()
        
        # 验证记录的域名前缀
        acme_challenge = f"_acme-challenge.{domain}"
        
        # 使用阿里云 CLI 查询记录
        cmd = f"aliyun alidns DescribeDomainRecords --DomainName {domain.split('.')[-2]}.{domain.split('.')[-1]} --RRKeyWord _acme-challenge --Type TXT"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        
        # 检查输出中是否包含记录
        return acme_challenge in result.stdout
    except Exception as e:
        print(f"检查 DNS 记录时出错: {str(e)}")
        # 如果无法确定,假设记录存在,以确保安全
        return True

def renew_cert(domain: str) -> bool:
    """
    使用 acme.sh 续签证书
    支持多域名证书申请
    """
    try:
        # 先设置环境变量
        set_ali_env()
        
        acme_path = "/home/xxx/acme.sh"
        
        # 确保 acme.sh 有执行权限
        os.chmod(acme_path, 0o755)
        
        # 处理多域名情况
        domain_params = ""
        main_domain = ""
        if ',' in domain:
            domains_list = domain.split(',')
            main_domain = domains_list[0].strip()
            domain_params = f"-d {main_domain}"
            
            # 添加其他域名
            for alt_domain in domains_list[1:]:
                alt_domain = alt_domain.strip()
                domain_params += f" -d {alt_domain}"
        else:
            main_domain = domain
            domain_params = f"-d {domain}"
        
        # 检查并清理 DNS 记录
        needs_cleanup = False
        
        # 检查主域名
        if check_dns_record_exists(main_domain):
            print(f"域名 {main_domain} 存在 DNS 验证记录,需要清理")
            needs_cleanup = True
            # 清理主域名
            cleanup_cmd = f"{acme_path}/acme.sh --cleanup --domain {main_domain} --dns dns_ali"
            print(f"执行清理命令: {cleanup_cmd}")
            cleanup_process = subprocess.run(cleanup_cmd, shell=True, capture_output=True, text=True)
            print(f"清理结果: {cleanup_process.stdout}")
        else:
            print(f"域名 {main_domain} 不存在 DNS 验证记录,无需清理")
        
        # 检查其他域名
        if ',' in domain:
            for alt_domain in domain.split(',')[1:]:
                alt_domain = alt_domain.strip()
                if check_dns_record_exists(alt_domain):
                    print(f"域名 {alt_domain} 存在 DNS 验证记录,需要清理")
                    needs_cleanup = True
                    # 清理其他域名
                    alt_cleanup_cmd = f"{acme_path}/acme.sh --cleanup --domain {alt_domain} --dns dns_ali"
                    print(f"执行清理命令: {alt_cleanup_cmd}")
                    alt_cleanup_process = subprocess.run(alt_cleanup_cmd, shell=True, capture_output=True, text=True)
                    print(f"清理结果: {alt_cleanup_process.stdout}")
                else:
                    print(f"域名 {alt_domain} 不存在 DNS 验证记录,无需清理")
        
        # 如果进行了清理,等待 DNS 记录更新
        if needs_cleanup:
            print("等待 DNS 记录清理完成...")
            time.sleep(30)  # 等待 30 秒确保 DNS 记录已清理
        
        # 执行续签命令,明确指定使用 Let's Encrypt
        cmd = f"{acme_path}/acme.sh --issue --dns dns_ali {domain_params} --keylength 2048 --force --dnssleep 120 --server letsencrypt"
        print(f"执行命令: {cmd}")
        
        process = subprocess.Popen(
            cmd, 
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        
        # 获取输出
        stdout, stderr = process.communicate()
        
        if process.returncode == 0:
            print(f"续签输出: {stdout}")
            return True
        else:
            print(f"续签错误: {stderr}")
            
            # 如果仍然失败,尝试完全移除证书再重新申请
            if "DNS record already exists" in stderr:
                print("尝试完全移除证书后重新申请...")
                
                # 移除证书
                for d in domain.split(','):
                    d = d.strip()
                    remove_cmd = f"{acme_path}/acme.sh --remove -d {d}"
                    print(f"执行移除命令: {remove_cmd}")
                    subprocess.run(remove_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                
                # 再次等待
                print("等待 DNS 记录更新...")
                time.sleep(30)
                
                # 重新申请
                reissue_cmd = f"{acme_path}/acme.sh --issue --dns dns_ali {domain_params} --keylength 2048 --force --dnssleep 180 --server letsencrypt"
                print(f"执行重新申请命令: {reissue_cmd}")
                
                reissue_process = subprocess.Popen(
                    reissue_cmd, 
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True
                )
                
                reissue_stdout, reissue_stderr = reissue_process.communicate()
                
                if reissue_process.returncode == 0:
                    print(f"重新申请成功: {reissue_stdout}")
                    return True
                else:
                    print(f"重新申请失败: {reissue_stderr}")
                    return False
            
            return False
            
    except Exception as e:
        print(f"续签域名 {domain} 证书时出错: {str(e)}")
        return False

def deploy_cert(domain: str) -> bool:
    """
    部署证书到 Nginx
    支持多域名证书部署
    """
    try:
        acme_path = "/home/xxx/acme.sh"
        
        # 处理多域名情况,使用第一个域名作为主域名
        main_domain = domain.split(',')[0].strip() if ',' in domain else domain
        
        # 证书安装路径
        nginx_cert_path = f"/www/server/panel/vhost/cert/{main_domain}"
        
        # 确保目录存在
        os.makedirs(nginx_cert_path, exist_ok=True)
        
        # 部署证书
        cmd = f"{acme_path}/acme.sh --install-cert -d {main_domain} " \
              f"--key-file {nginx_cert_path}/privkey.pem " \
              f"--fullchain-file {nginx_cert_path}/fullchain.pem " 
            #   f"\ --reloadcmd 'service nginx force-reload'"  利用宝塔重启,而不是 acme.sh 重启
        print(f"执行命令: {cmd}")
        
        process = subprocess.Popen(
            cmd, 
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        
        # 获取输出
        stdout, stderr = process.communicate()
        
        if process.returncode == 0:
            print(f"部署输出: {stdout}")
            return True
        else:
            print(f"部署错误: {stderr}")
            return False
            
    except Exception as e:
        print(f"部署域名 {domain} 证书时出错: {str(e)}")
        return False

def update_nginx():
    """
    更新 Nginx 配置并重启服务
    """
    try:
        # 使用宝塔命令重载 Nginx
        print("重载 Nginx 配置...")
        reload_cmd = "bt reload nginx"
        reload_process = subprocess.Popen(
            reload_cmd, 
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        
        reload_stdout, reload_stderr = reload_process.communicate()
        
        if reload_process.returncode != 0:
            print(f"Nginx 重载错误: {reload_stderr}")
            return False
            
        # 完全重启 Nginx 以确保证书生效
        print("重启 Nginx 服务...")
        restart_cmd = "bt restart nginx"
        restart_process = subprocess.Popen(
            restart_cmd, 
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        
        restart_stdout, restart_stderr = restart_process.communicate()
        
        if restart_process.returncode == 0:
            print(f"Nginx 重启成功: {restart_stdout}")
            return True
        else:
            print(f"Nginx 重启错误: {restart_stderr}")
            return False
    except Exception as e:
        print(f"更新和重启 Nginx 时出错: {str(e)}")
        return False

def main():
    print(f"开始检查证书状态 - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    domains_to_renew = []
    
    # 检查所有域名的证书状态
    for domain in domains:
        print(f"检查域名: {domain}")
        is_expired, days_left = check_cert_expiry(domain)
        if is_expired:
            print(f"域名 {domain} 证书将在 {days_left} 天后过期,需要续签")
            domains_to_renew.append(domain)
        else:
            print(f"域名 {domain} 证书还有 {days_left} 天过期,无需续签")
    
    if not domains_to_renew:
        print("所有证书都在有效期内,无需续签")
        return
    
    # 续签需要更新的证书
    renewed_domains = []
    for domain in domains_to_renew:
        print(f"\n 正在续签域名 {domain} 的证书...")
        if renew_cert(domain):
            print(f"域名 {domain} 证书续签成功")
            # 部署证书
            if deploy_cert(domain):
                print(f"域名 {domain} 证书部署成功")
                renewed_domains.append(domain)
            else:
                print(f"域名 {domain} 证书部署失败")
        else:
            print(f"域名 {domain} 证书续签失败")
            
    # 如果有证书被续签并部署,更新 Nginx 配置
    if renewed_domains:
        print("\n 正在更新 Nginx 配置...")
        if update_nginx():
            print("Nginx 配置更新成功")
        else:
            print("Nginx 配置更新失败")
    
    print(f"\n 证书续签任务完成 - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"已续签的域名: {', '.join(renewed_domains) if renewed_domains else '无'}")

def force_renew_all():
    """
    强制更新所有域名的证书,用于测试
    """
    print(f"开始强制更新所有证书 - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    # 续签所有域名的证书
    renewed_domains = []
    for domain in domains:
        print(f"\n 正在更新域名 {domain} 的证书...")
        if renew_cert(domain):
            print(f"域名 {domain} 证书更新成功")
            # 部署证书
            if deploy_cert(domain):
                print(f"域名 {domain} 证书部署成功")
                renewed_domains.append(domain)
            else:
                print(f"域名 {domain} 证书部署失败")
        else:
            print(f"域名 {domain} 证书更新失败")
    
    # 如果有证书被更新并部署,更新 Nginx 配置
    if renewed_domains:
        print("\n 正在更新 Nginx 配置...")
        if update_nginx():
            print("Nginx 配置更新成功")
        else:
            print("Nginx 配置更新失败")
    
    print(f"\n 证书更新任务完成 - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"已更新的域名: {', '.join(renewed_domains) if renewed_domains else '无'}")

if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == '--force':
        force_renew_all()
    else:
        main()
举报· 619 次点击
登录 注册 站外分享
5 条回复  
HangoX 小成 6 天前
是会失败,很傻逼
adoal 小成 6 天前
为啥还要用 Python 写程序来干这事呢,dehydrated 或者 acme.sh 只要写个配置不就行了吗。
javalaw2010 小成 6 天前
直接 acme.sh ,我这边生产环境稳定跑好几年了。
shangfabao 初学 6 天前
同意楼上,这不重复造轮子么 acme.sh 很稳定
adoal 小成 6 天前
仔细看了一下,这段 Python 代码是在调用 acme.sh……那就更奇怪了。
返回顶部