智能时刻 25-12-14 21:00
微博认证:科技博主 超话主持人(AI创造营超话) 微博解说视频博主 头条文章作者

#智能时刻的观察[超话]#

🤖 ​​CrewAI Flows:如何赋能复杂的现实世界自动化?​​ 🤖

🔝 #生成式AI# #大语言模型LLM# #自然语言处理NLP# #机器学习ML# #MLOps# #机器学习#

在我前的文章中,探讨了CrewAI框架的基础知识。希望已经有机会深入了解并尝试了一些令人兴奋的自动化用例!现在,让深入探讨CrewAI框架的一个激动人心的特性:​​CrewAI Flows!​​

🧩 ​​什么是CrewAI Flows?​​
​​CrewAI Flows​​ 是一个用于简化和管理工作流创建的功能!它让开发者能够高效地组合编码任务和"工作组(Crews)",为复杂的AI自动化构建坚实的框架。在现实世界中,工作流(Workflows)并非总是顺序或层级的,它们可能是混合(Mix)、并行(Parallel)或异步(Asynchronous)的🧬。Flows让您能够轻松管理这些复杂性,并通过集成(Integrations)(例如执行Python代码)与外部世界交互!
🔹 Flows提供​​结构化的、事件驱动的工作流​​!
🔹 它们​​无缝连接任务​​、​​管理状态​​并​​控制AI应用中的执行流程​​!

💡 ​​核心特性:​​

​​简化的创建工作流:​​ 轻松链接工作组(Crews)和任务,构建复杂的AI工作流!🔗
​​状态管理:​​ 在任务之间​​毫不费力地管理和共享状态​​! 🧩
​​事件驱动架构:​​ 实现动态且响应式的工作流!⚡
​​灵活的控制流:​​ 支持条件逻辑(Conditional Logic)、循环(Loops)和分支(Branching)!🔄
​​灵活的输入:​​ 处理结构化和非结构化的状态管理!📦
🎯 ​​案例理解:销售管道线索工作流​​
为了更好地理解,让我们看一个涉及多个步骤的​​线索评分(Lead Qualification)工作流​​示例(图:销售管道流程图 - 描述:展示从获取线索到最终操作的步骤)。

这个工作流利用CrewAI Flows自动化线索评分和互动,将线索无缝地通过销售管道的各个阶段!
🔹 ​​1-获取线索:​​ 从数据库(本例使用模拟数据)检索线索📥。每条线索包括姓名、职位、公司、邮箱和用例。
🔹 ​​2-线索评分:​​ 使用预配置的评分工作组对获取的线索进行评估,根据特定标准分配分数并存储供后续使用!📊
🔹 ​​3-线索过滤:​​ 过滤掉低质量线索,只保留分数高于70的线索!确保专注于高潜力客户!✅
🔹 ​​4-记录和分类:​​ 📑
🔹 ​​5-线索路由和操作:​​ 🚦
🔹 ​​6-低优先级线索的邮件互动:​​ ✉️

🔧 ​​参与的工作组:​​

​​工作组1:线索评分工作组 (Crew 1: Lead Qualification Crew)​​ - 负责获取线索、给线索评分、根据特定标准过滤、计数线索,并根据数量决定存储、发送给销售团队跟进或直接发邮件。
​​工作组2:邮件互动工作组 (Crew 2: Email Engagement Crew)​​ - 负责邮件内容创作、定义策略、起草邮件和优化内容!✍️
该流程还使用了​​AND、OR逻辑函数​​和关键的​​路由函数(Router Function)​​!例如,log_leads函数会​​等待​​filter_leads和store_leads完成(使用and_函数)。在count_leads函数之后,使用了​​路由功能​​将线索数量分为高/中/低(high/medium/low)。

# ... 代码展示工作组创建细节 (Crew 1: Lead Qualification Crew 创建代理Agents/任务Tasks/工作组Crew) ...

# ... 代码展示邮件互动工作组创建细节 (Crew 2: Email Engagement Crew) ...

# ⚙️ 核心流程:使用Flows定义销售管道工作流! ⚙️
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

class SalesPipeline(Flow):

@start() # 🔥 流程起点!
def fetch_leads(self): # 从数据库获取线索 (示例用模拟数据)
leads = [{"lead_data": {"name": "John Doe", ... }}] # 示例数据
return leads

@listen(fetch_leads) # 👂 监听fetch_leads
def score_leads(self, leads):
scores = lead_scoring_crew.kickoff_for_each(leads) # 使用工作组1为每个线索评分
self.state["score_crews_results"] = scores # 🧠 保存评分到流程状态(State)!
return scores

@listen(score_leads) # 👂 监听score_leads
def store_leads_score(self, scores): # 模拟将评分存入数据库
return scores

@listen(score_leads) # 👂 监听score_leads
def filter_leads(self, scores): # 🗂️ 过滤出评分>70的线索
return [score for score in scores if score['lead_score'].score > 70]

@listen(and_(filter_leads, store_leads_score)) # 🔀 AND逻辑! 等两者完成
def log_leads(self, leads): # 📝 记录过滤和存储后的线索
print(f"Leads: {leads}")

@router(filter_leads, paths=["high", "medium", "low"]) # 🧭 路由函数! 基于线索数量分支!
def count_leads(self, scores):
count = len(scores)
if count > 10: return 'high' # 🚀 高数量
elif count > 5: return 'medium' # 📈 中数量
else: return 'low' # 📉 低数量

@listen('high') # 🧭 监听路由'high'路径
def store_in_salesforce(self, leads): # 模拟存入Salesforce(CRM)
return leads

@listen('medium') # 🧭 监听路由'medium'路径
def send_to_sales_team(self, leads): # 模拟发送给销售团队
return leads

@listen('low') # 🧭 监听路由'low'路径
def write_email(self, leads): # 利用工作组2起草邮件
scored_leads = [lead.to_dict() for lead in leads]
emails = email_writing_crew.kickoff_for_each(scored_leads) # ✉️ 调用邮件工作组!
return emails

@listen(write_email) # 👂 监听write_email
def send_email(self, emails): # 模拟发送邮件
return emails
🔋 ​​强大的状态访问 & 成本分析!​​
流程执行后,状态对象(flow.state)保存了所有会话相关细节(包括调用消耗)!可以轻松进行成本分析:

import pandas as pd
# 从状态中获取评分结果中的第一个结果的token使用量 (示例)
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])
# 计算总成本 (示例定价模型)
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}") # 💰 输出示例 (本次运行成本: $0.0156)
df_usage_metrics # (图:显示Token消耗的DataFrame表格)

✨​​​​结论:
​​CrewAI Flows​​ 使开发者能够以更高的效率和更强的控制力来创建和管理复杂的AI工作流!这一强大特性为构建智能AI应用开启了自动化和灵活性的新高度!

👉 ​​想尝试创建自己的AI自动化流程吗?​​ 👉 了解更多AI技术与应用,关注 ​​@智能时刻​​ 获取前沿动态!🧠
💬 ​​欢迎评论区交流你的想法或自动化需求!​​ 💬 🔁 ​​转发给需要的开发者和AI探索者吧!​​ 🔁
🤝 ​​加入【智能时刻的铁粉群】智能时刻的铁粉群 与同道中人交流经验!​​ 🤝

#AI创造营 #ai探索计划 #AI学习营 #AI打工人 #热点科普 #CrewAI #人工智能开发 #自动化 #大语言模型应用#

发布于 北京