事情的起因是因为逛论坛的时候,看到有佬的帖子提供了一个随机小姐姐的API,然后想着能不能通过艾特钉钉机器人通过指令实现推送视频链接。然后就通过搜索以及claude实现了一个。
说明:
1.本人对python语言不熟悉,试过通过nodejs开发,但是本地调试一直网络连接错误,所以放弃选择了python,再通过Dingtalk官方文档,以及claude的帮助下实现的。
2.这个帖子只是抛砖引玉的,后续还想实现钉钉机器人卡片交互的(有没有佬实现了的,求教 )。官方文档: 互动卡片 - 概述。
3.stream模式开发的机器人只支持组织内部使用,也就是开发后只能在内部群机器人里添加。官方文档:服务端stream模式。不需要公网ip,本地即可开发,上手简单。
4.如果不想自己实现的,想通过API实现一个chatgpt的钉钉机器人,可以点这个 eryajf/chatgpt-dingtalk。
5.慎重使用,不要涉及敏感的东西。
前提:得有组织的开发权限,我的解决方案就是自己建一个组织。不需要认证。
创建应用
创建步骤参考文档:企业内部应用。在开发者后台点击右侧创建应用。
在左侧弹出框填入相应信息后点击保存即可。
创建完成后,在左侧点击添加应用能力选项,选择添加机器人(这边已经添加过了,所以显示的是配置)。
在机器人配置中注意消息接收模式选择stream模式。
发布机器人。在左侧面板点击版本管理与发布选项,创建新版本。
在左侧面板中凭证与基础信息中包含开发需要的Client ID和Client Secret。
本地开发
直接上代码,本地运行即可。
- 先执行安装代码需要的依赖库。
pip install -r requirements.txt
- requirements.txt文件如下:
beautifulsoup4==4.12.3 dingtalk_stream==0.21.0 Flask==3.0.3 jsonpath_ng==1.6.1 Requests==2.32.3 lxml==5.3.0
- 执行python代码:
from flask import Flask, request, jsonify import logging import threading import json import random import re from bs4 import BeautifulSoup from jsonpath_ng import jsonpath, parse from datetime import datetime import dingtalk_stream from dingtalk_stream import AckMessage import requests import signal import sys
app = Flask(__name__) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__)
# 钉钉应用的App Key和App Secret APP_KEY = "your_client_id" APP_SECRET = "your_client_secret" # 和风天气api WEATHER_KEY = "和风天气_key"
# 请求头 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", }
# 定义事件处理器 class MyEventHandler(dingtalk_stream.EventHandler): async def process(self, event: dingtalk_stream.EventMessage): print(event.headers.event_type, event.headers.event_id, event.headers.event_born_time, event.data) return AckMessage.STATUS_OK, "OK"
# 发送markdown文本消息到钉钉群 class SendMarkdownHandler(dingtalk_stream.ChatbotHandler): # 判断数据是否存在,则调用self.reply_markdown,否则调用self.reply_text def reply_wrapper(self, title, content, incoming_message): if content: self.reply_markdown(title, content, incoming_message) else: self.reply_text(f"获取{title}失败", incoming_message)
async def process(self, callback: dingtalk_stream.CallbackMessage): # 处理接收到的回调消息 incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) # logger.info(f"Received callback: {callback.headers.topic}, {callback.data}")
text_content = incoming_message.text.content.strip() # 帮助 if text_content in ["帮助", "help"]: markdown_text = help_info() self.reply_markdown("帮助信息", markdown_text, incoming_message) # coser elif text_content == "coser": markdown_text = fetch_coser_data() self.reply_wrapper("coser", markdown_text, incoming_message) elif text_content == "今日必应": markdown_text = fetch_bing_data() self.reply_wrapper("今日必应", markdown_text, incoming_message) elif text_content == "摸鱼": markdown_text = fetch_moyu_data() self.reply_wrapper("摸鱼人日历", markdown_text, incoming_message) elif text_content == "随机小姐姐": markdown_text = fetch_sister_data() self.reply_wrapper("随机小姐姐", markdown_text, incoming_message) elif match := re.match(r"^(.+)天气$", text_content): city = match.group(1) geo = get_city_geo(city) if geo: markdown_text = fetch_hourly_weather(geo) self.reply_wrapper(f"{city}天气", markdown_text, incoming_message) else: self.reply_text(f"未找到{city}的Geo信息", incoming_message) else: self.reply_text('未识别的指令。请发送"帮助"或"help"获取可用指令列表。', incoming_message)
return AckMessage.STATUS_OK, "OK"
# 定义帮助信息 def help_info(): markdown_text = "# 使用说明\n\n" markdown_text = "| 指令 | 描述 |\n" markdown_text += "| :----: | :----: |\n" markdown_text += "| **coser** | 获取coser图片 |\n" markdown_text += "| **随机小姐姐** | 获取随机小姐姐视频 |\n" markdown_text += "| **今日必应** | 获取今日必应图片 |\n" markdown_text += "| **摸鱼** | 获取摸鱼人日历 |\n" markdown_text += "| **[城市]天气** | 获取城市天气,比如杭州天气 |\n" return markdown_text
# 请求coser API def fetch_coser_data(): urls = ["https://3650000.xyz/api/?type=img&mode=8&proxy=sg"] selected_url_index = random.randint(0, len(urls) - 1) selected_url = urls[selected_url_index]
try: response = requests.get(selected_url, headers=headers, allow_redirects=True) if response.status_code == 200: soup = BeautifulSoup(response.text, "lxml") # 提取图片 URL img_tag = soup.find("img") img_url = img_tag["src"] markdown_text = f"![coser]({img_url})" return markdown_text else: logger.error(f"请求coser api失败,状态码为: {response.status_code}") return None except requests.RequestException as e: logger.error(f"发送coser api请求错误: {str(e)}") return None
# 获取今日必应图片 def fetch_bing_data(): url = "https://api.vvhan.com/api/bing?type=json" try: response = requests.get(url) if response.status_code == 200: data = json.loads(response.text) markdown_text = f"![{data['data']['title']}]({data['data']['url']})" return markdown_text else: logger.error(f"请求必应图片失败,状态码为: {response.status_code}") return None except requests.RequestException as e: logger.error(f"发送必应图片请求错误: {str(e)}") return None
# 获取摸鱼人日历 def fetch_moyu_data(): url = "https://api.vvhan.com/api/moyu?type=json" try: response = requests.get(url) if response.status_code == 200: data = json.loads(response.text) markdown_text = f"![摸鱼人日历{data['time']}]({data['url']})" return markdown_text else: logger.error(f"请求摸鱼人日历失败,状态码为: {response.status_code}") return None except requests.RequestException as e: logger.error(f"发送摸鱼人日历请求错误: {str(e)}") return None
# 获取随机小姐姐日历 def fetch_sister_data(): urls = [{"url": "https://api.qqsuu.cn/api/dm-xjj2?type=json", "path": "$.video"}] # 随机选择一个 URL selected_url = random.choice(urls) url = selected_url["url"] path = selected_url["path"] try: response = requests.get(url) if response.status_code == 200: data = response.json() jsonpath_expr = parse(path) matches = [match.value for match in jsonpath_expr.find(data)] if matches: markdown_text = f"[随机小姐姐]({matches[0]})" return markdown_text else: logger.error(f"未找到匹配的jsonpath: {path}") return None else: logger.error(f"请求随机小姐姐失败,状态码为: {response.status_code}") return None except requests.RequestException as e: logger.error(f"发送随机小姐姐请求错误: {str(e)}") return None
# 获取城市Geo信息 def get_city_geo(city): url = (f"https://geoapi.qweather.com/v2/city/lookup?location={city}&key={WEATHER_KEY}") try: response = requests.get(url) if response.status_code == 200: data = response.json() if data.get("location") and len(data["location"]) > 0: return data["location"][0] else: logger.error(f"未找到城市{city}的Geo信息") return None else: logger.error(f"请求城市信息失败,状态码为: {response.status_code}") return None except requests.RequestException as e: logger.error(f"发送城市信息请求错误: {str(e)}") return None
# 获取指定城市的逐小时天气预报 def fetch_hourly_weather(geo): url = (f"https://devapi.qweather.com/v7/weather/24h?location={geo['id']}&key={WEATHER_KEY}") try: response = requests.get(url) if response.status_code == 200: data = response.json() # 构建地点名称 location_name =(f"{geo.get('adm1', '')}"f"{geo.get('adm2', '')}") markdown_text = f"#### 猜您想找{location_name}{geo['name']},如果不是您想要的城市,请重新给出详细城市。{geo['name']}未来8小时天气预报:\n\n" markdown_text += "---\n\n" for item in data["hourly"][:8]: # 将时间字符串转换为datetime对象 time = datetime.fromisoformat(item['fxTime']) # 格式化时间字符串 formatted_time = time.strftime("%m月%d日 %H:%M")
markdown_text += f"**时间:** {formatted_time}\n\n" markdown_text += f"**天气:** {item['text']} <img src='https://cdn.jsdelivr.net/npm/qweather-icons@1.6.0/icons/{item['icon']}.svg' alt='QWeather Icons' width='24' height='24'>\n\n" markdown_text += f"**温度:** {item['temp']}℃\n\n" markdown_text += f"**风向:** {item['windDir']}\n\n" markdown_text += f"**风力等级(蒲福风级):** {item['windScale']}级\n\n" markdown_text += f"**降水概率:** {item.get('pop',0)}%\n\n" markdown_text += "---\n\n" return markdown_text else: logger.error(f"请求逐小时天气预报失败,状态码为: {response.status_code}") return None except requests.RequestException as e: logger.error(f"发送逐小时天气预报请求错误: {str(e)}")
# 启动钉钉Stream客户端 def start_dingtalk_client(): credential = dingtalk_stream.Credential(APP_KEY, APP_SECRET) client = dingtalk_stream.DingTalkStreamClient(credential) client.register_all_event_handler(MyEventHandler()) client.register_callback_handler( dingtalk_stream.chatbot.ChatbotMessage.TOPIC, SendMarkdownHandler() ) client.start_forever()
# 启动钉钉Stream客户端线程 thread = threading.Thread(target=start_dingtalk_client) # 设置为守护线程,主线程退出时,守护线程也会退出 thread.daemon = True thread.start()
# 捕获ctrl+c信号,退出程序 def signal_handler(signal, frame): sys.exit(0)
# Flask路由,用于健康检查或其他用途 @app.route("/") def index(): return "DingTalk Stream client is running."
if __name__ == "__main__": app.run(port=13434)
你可以自己在fetch_sister_data函数里添加api地址,path中$代表的是响应数据,比如data.text.video就是你要的链接地址,那么path就是$.text.video
添加机器人
在群设置-机器人-机器人管理-添加机器人中可以添加创建好的机器人。
效果预览
🖼 点我查看示例图
|