钉钉机器人stream模式实现互动对话

lincat · 2024-9-14 12:55:09 · 121 次点击

事情的起因是因为逛论坛的时候,看到有佬的帖子提供了一个随机小姐姐的API,然后想着能不能通过艾特钉钉机器人通过指令实现推送视频链接。然后就通过搜索以及claude实现了一个。



说明:



1.本人对python语言不熟悉,试过通过nodejs开发,但是本地调试一直网络连接错误,所以放弃选择了python,再通过Dingtalk官方文档,以及claude的帮助下实现的。

2.这个帖子只是抛砖引玉的,后续还想实现钉钉机器人卡片交互的(有没有佬实现了的,求教 :see_no_evil:)。官方文档: 互动卡片 - 概述

3.stream模式开发的机器人只支持组织内部使用,也就是开发后只能在内部群机器人里添加。官方文档:服务端stream模式。不需要公网ip,本地即可开发,上手简单。

4.如果不想自己实现的,想通过API实现一个chatgpt的钉钉机器人,可以点这个 eryajf/chatgpt-dingtalk

5.慎重使用,不要涉及敏感的东西。



前提:得有组织的开发权限,我的解决方案就是自己建一个组织。不需要认证。


创建应用




  1. 创建步骤参考文档:企业内部应用。在开发者后台点击右侧创建应用



    在左侧弹出框填入相应信息后点击保存即可。




  2. 创建完成后,在左侧点击添加应用能力选项,选择添加机器人(这边已经添加过了,所以显示的是配置)。



    在机器人配置中注意消息接收模式选择stream模式。




  3. 发布机器人。在左侧面板点击版本管理与发布选项,创建新版本。



    在左侧面板中凭证与基础信息中包含开发需要的Client IDClient Secret




本地开发


直接上代码,本地运行即可。



  1. 先执行安装代码需要的依赖库。


pip install -r requirements.txt


  1. requirements.txt文件如下:


beautifulsoup4==4.12.3
dingtalk_stream==0.21.0
Flask==3.0.3
jsonpath_ng==1.6.1
Requests==2.32.3
lxml==5.3.0


  1. 执行python代码:


from flask import Flask, request, jsonify
import logging
import threading
import json
import random
import re
from bs4 import BeautifulSoup
from jsonpath_ng import jsonpath, parse
from datetime import datetime
import dingtalk_stream
from dingtalk_stream import AckMessage
import requests
import signal
import sys

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 钉钉应用的App Key和App Secret
APP_KEY = "your_client_id"
APP_SECRET = "your_client_secret"
# 和风天气api
WEATHER_KEY = "和风天气_key"

# 请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}

# 定义事件处理器
class MyEventHandler(dingtalk_stream.EventHandler):
async def process(self, event: dingtalk_stream.EventMessage):
print(event.headers.event_type, event.headers.event_id, event.headers.event_born_time, event.data)
return AckMessage.STATUS_OK, "OK"


# 发送markdown文本消息到钉钉群
class SendMarkdownHandler(dingtalk_stream.ChatbotHandler):
# 判断数据是否存在,则调用self.reply_markdown,否则调用self.reply_text
def reply_wrapper(self, title, content, incoming_message):
if content:
self.reply_markdown(title, content, incoming_message)
else:
self.reply_text(f"获取{title}失败", incoming_message)

async def process(self, callback: dingtalk_stream.CallbackMessage):
# 处理接收到的回调消息
incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
# logger.info(f"Received callback: {callback.headers.topic}, {callback.data}")

text_content = incoming_message.text.content.strip()
# 帮助
if text_content in ["帮助", "help"]:
markdown_text = help_info()
self.reply_markdown("帮助信息", markdown_text, incoming_message)
# coser
elif text_content == "coser":
markdown_text = fetch_coser_data()
self.reply_wrapper("coser", markdown_text, incoming_message)
elif text_content == "今日必应":
markdown_text = fetch_bing_data()
self.reply_wrapper("今日必应", markdown_text, incoming_message)
elif text_content == "摸鱼":
markdown_text = fetch_moyu_data()
self.reply_wrapper("摸鱼人日历", markdown_text, incoming_message)
elif text_content == "随机小姐姐":
markdown_text = fetch_sister_data()
self.reply_wrapper("随机小姐姐", markdown_text, incoming_message)
elif match := re.match(r"^(.+)天气$", text_content):
city = match.group(1)
geo = get_city_geo(city)
if geo:
markdown_text = fetch_hourly_weather(geo)
self.reply_wrapper(f"{city}天气", markdown_text, incoming_message)
else:
self.reply_text(f"未找到{city}的Geo信息", incoming_message)
else:
self.reply_text('未识别的指令。请发送"帮助"或"help"获取可用指令列表。', incoming_message)

return AckMessage.STATUS_OK, "OK"

# 定义帮助信息
def help_info():
markdown_text = "# 使用说明\n\n"
markdown_text = "| 指令 | 描述 |\n"
markdown_text += "| :----: | :----: |\n"
markdown_text += "| **coser** | 获取coser图片 |\n"
markdown_text += "| **随机小姐姐** | 获取随机小姐姐视频 |\n"
markdown_text += "| **今日必应** | 获取今日必应图片 |\n"
markdown_text += "| **摸鱼** | 获取摸鱼人日历 |\n"
markdown_text += "| **[城市]天气** | 获取城市天气,比如杭州天气 |\n"
return markdown_text

# 请求coser API
def fetch_coser_data():
urls = ["https://3650000.xyz/api/?type=img&mode=8&proxy=sg"]
selected_url_index = random.randint(0, len(urls) - 1)
selected_url = urls[selected_url_index]

try:
response = requests.get(selected_url, headers=headers, allow_redirects=True)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "lxml")
# 提取图片 URL
img_tag = soup.find("img")
img_url = img_tag["src"]
markdown_text = f"![coser]({img_url})"
return markdown_text
else:
logger.error(f"请求coser api失败,状态码为: {response.status_code}")
return None
except requests.RequestException as e:
logger.error(f"发送coser api请求错误: {str(e)}")
return None

# 获取今日必应图片
def fetch_bing_data():
url = "https://api.vvhan.com/api/bing?type=json"
try:
response = requests.get(url)
if response.status_code == 200:
data = json.loads(response.text)
markdown_text = f"![{data['data']['title']}]({data['data']['url']})"
return markdown_text
else:
logger.error(f"请求必应图片失败,状态码为: {response.status_code}")
return None
except requests.RequestException as e:
logger.error(f"发送必应图片请求错误: {str(e)}")
return None

# 获取摸鱼人日历
def fetch_moyu_data():
url = "https://api.vvhan.com/api/moyu?type=json"
try:
response = requests.get(url)
if response.status_code == 200:
data = json.loads(response.text)
markdown_text = f"![摸鱼人日历{data['time']}]({data['url']})"
return markdown_text
else:
logger.error(f"请求摸鱼人日历失败,状态码为: {response.status_code}")
return None
except requests.RequestException as e:
logger.error(f"发送摸鱼人日历请求错误: {str(e)}")
return None

# 获取随机小姐姐日历
def fetch_sister_data():
urls = [{"url": "https://api.qqsuu.cn/api/dm-xjj2?type=json", "path": "$.video"}]
# 随机选择一个 URL
selected_url = random.choice(urls)
url = selected_url["url"]
path = selected_url["path"]
try:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
jsonpath_expr = parse(path)
matches = [match.value for match in jsonpath_expr.find(data)]
if matches:
markdown_text = f"[随机小姐姐]({matches[0]})"
return markdown_text
else:
logger.error(f"未找到匹配的jsonpath: {path}")
return None
else:
logger.error(f"请求随机小姐姐失败,状态码为: {response.status_code}")
return None
except requests.RequestException as e:
logger.error(f"发送随机小姐姐请求错误: {str(e)}")
return None

# 获取城市Geo信息
def get_city_geo(city):
url = (f"https://geoapi.qweather.com/v2/city/lookup?location={city}&key={WEATHER_KEY}")
try:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if data.get("location") and len(data["location"]) > 0:
return data["location"][0]
else:
logger.error(f"未找到城市{city}的Geo信息")
return None
else:
logger.error(f"请求城市信息失败,状态码为: {response.status_code}")
return None
except requests.RequestException as e:
logger.error(f"发送城市信息请求错误: {str(e)}")
return None

# 获取指定城市的逐小时天气预报
def fetch_hourly_weather(geo):
url = (f"https://devapi.qweather.com/v7/weather/24h?location={geo['id']}&key={WEATHER_KEY}")
try:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
# 构建地点名称
location_name =(f"{geo.get('adm1', '')}"f"{geo.get('adm2', '')}")
markdown_text = f"#### 猜您想找{location_name}{geo['name']},如果不是您想要的城市,请重新给出详细城市。{geo['name']}未来8小时天气预报:\n\n"
markdown_text += "---\n\n"
for item in data["hourly"][:8]:
# 将时间字符串转换为datetime对象
time = datetime.fromisoformat(item['fxTime'])
# 格式化时间字符串
formatted_time = time.strftime("%m月%d日 %H:%M")

markdown_text += f"**时间:** {formatted_time}\n\n"
markdown_text += f"**天气:** {item['text']} <img src='https://cdn.jsdelivr.net/npm/qweather-icons@1.6.0/icons/{item['icon']}.svg' alt='QWeather Icons' width='24' height='24'>\n\n"
markdown_text += f"**温度:** {item['temp']}℃\n\n"
markdown_text += f"**风向:** {item['windDir']}\n\n"
markdown_text += f"**风力等级(蒲福风级):** {item['windScale']}级\n\n"
markdown_text += f"**降水概率:** {item.get('pop',0)}%\n\n"
markdown_text += "---\n\n"
return markdown_text
else:
logger.error(f"请求逐小时天气预报失败,状态码为: {response.status_code}")
return None
except requests.RequestException as e:
logger.error(f"发送逐小时天气预报请求错误: {str(e)}")

# 启动钉钉Stream客户端
def start_dingtalk_client():
credential = dingtalk_stream.Credential(APP_KEY, APP_SECRET)
client = dingtalk_stream.DingTalkStreamClient(credential)
client.register_all_event_handler(MyEventHandler())
client.register_callback_handler(
dingtalk_stream.chatbot.ChatbotMessage.TOPIC, SendMarkdownHandler()
)
client.start_forever()

# 启动钉钉Stream客户端线程
thread = threading.Thread(target=start_dingtalk_client)
# 设置为守护线程,主线程退出时,守护线程也会退出
thread.daemon = True
thread.start()

# 捕获ctrl+c信号,退出程序
def signal_handler(signal, frame):
sys.exit(0)

# Flask路由,用于健康检查或其他用途
@app.route("/")
def index():
return "DingTalk Stream client is running."

if __name__ == "__main__":
app.run(port=13434)

你可以自己在fetch_sister_data函数里添加api地址,path中$代表的是响应数据,比如data.text.video就是你要的链接地址,那么path就是$.text.video


添加机器人


在群设置-机器人-机器人管理-添加机器人中可以添加创建好的机器人。

image


效果预览



🖼 点我查看示例图




举报· 121 次点击
登录 注册 站外分享
9 条回复  
youxiyouxi 初学 2024-9-14 12:55:09

我去 谢谢佬一点都不会Python,我先看看

hw7622 初学 2024-9-14 12:55:09

我就是用钉钉机器人接了个讯飞星火的api当玩具玩呢

handsome 限制会员 2024-9-14 12:55:09

感谢大佬教程

lal 初学 2024-9-14 12:55:09

牛的,刚好有点这种需求,去试试,感谢

jonty 小成 2024-10-30 10:44:48

正好在找企钉机器人的创建教程,感谢

返回顶部