#智能时刻的观察[超话]#
🤖 CrewAI Flows:如何赋能复杂的现实世界自动化? 🤖
🔝 #生成式AI# #大语言模型LLM# #自然语言处理NLP# #机器学习ML# #MLOps# #机器学习#
在我前的文章中,探讨了CrewAI框架的基础知识。希望已经有机会深入了解并尝试了一些令人兴奋的自动化用例!现在,让深入探讨CrewAI框架的一个激动人心的特性:CrewAI Flows!
🧩 什么是CrewAI Flows?
CrewAI Flows 是一个用于简化和管理工作流创建的功能!它让开发者能够高效地组合编码任务和"工作组(Crews)",为复杂的AI自动化构建坚实的框架。在现实世界中,工作流(Workflows)并非总是顺序或层级的,它们可能是混合(Mix)、并行(Parallel)或异步(Asynchronous)的🧬。Flows让您能够轻松管理这些复杂性,并通过集成(Integrations)(例如执行Python代码)与外部世界交互!
🔹 Flows提供结构化的、事件驱动的工作流!
🔹 它们无缝连接任务、管理状态并控制AI应用中的执行流程!
💡 核心特性:
简化的创建工作流: 轻松链接工作组(Crews)和任务,构建复杂的AI工作流!🔗
状态管理: 在任务之间毫不费力地管理和共享状态! 🧩
事件驱动架构: 实现动态且响应式的工作流!⚡
灵活的控制流: 支持条件逻辑(Conditional Logic)、循环(Loops)和分支(Branching)!🔄
灵活的输入: 处理结构化和非结构化的状态管理!📦
🎯 案例理解:销售管道线索工作流
为了更好地理解,让我们看一个涉及多个步骤的线索评分(Lead Qualification)工作流示例(图:销售管道流程图 - 描述:展示从获取线索到最终操作的步骤)。
这个工作流利用CrewAI Flows自动化线索评分和互动,将线索无缝地通过销售管道的各个阶段!
🔹 1-获取线索: 从数据库(本例使用模拟数据)检索线索📥。每条线索包括姓名、职位、公司、邮箱和用例。
🔹 2-线索评分: 使用预配置的评分工作组对获取的线索进行评估,根据特定标准分配分数并存储供后续使用!📊
🔹 3-线索过滤: 过滤掉低质量线索,只保留分数高于70的线索!确保专注于高潜力客户!✅
🔹 4-记录和分类: 📑
🔹 5-线索路由和操作: 🚦
🔹 6-低优先级线索的邮件互动: ✉️
🔧 参与的工作组:
工作组1:线索评分工作组 (Crew 1: Lead Qualification Crew) - 负责获取线索、给线索评分、根据特定标准过滤、计数线索,并根据数量决定存储、发送给销售团队跟进或直接发邮件。
工作组2:邮件互动工作组 (Crew 2: Email Engagement Crew) - 负责邮件内容创作、定义策略、起草邮件和优化内容!✍️
该流程还使用了AND、OR逻辑函数和关键的路由函数(Router Function)!例如,log_leads函数会等待filter_leads和store_leads完成(使用and_函数)。在count_leads函数之后,使用了路由功能将线索数量分为高/中/低(high/medium/low)。
# ... 代码展示工作组创建细节 (Crew 1: Lead Qualification Crew 创建代理Agents/任务Tasks/工作组Crew) ...
# ... 代码展示邮件互动工作组创建细节 (Crew 2: Email Engagement Crew) ...
# ⚙️ 核心流程:使用Flows定义销售管道工作流! ⚙️
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router
class SalesPipeline(Flow):
@start() # 🔥 流程起点!
def fetch_leads(self): # 从数据库获取线索 (示例用模拟数据)
leads = [{"lead_data": {"name": "John Doe", ... }}] # 示例数据
return leads
@listen(fetch_leads) # 👂 监听fetch_leads
def score_leads(self, leads):
scores = lead_scoring_crew.kickoff_for_each(leads) # 使用工作组1为每个线索评分
self.state["score_crews_results"] = scores # 🧠 保存评分到流程状态(State)!
return scores
@listen(score_leads) # 👂 监听score_leads
def store_leads_score(self, scores): # 模拟将评分存入数据库
return scores
@listen(score_leads) # 👂 监听score_leads
def filter_leads(self, scores): # 🗂️ 过滤出评分>70的线索
return [score for score in scores if score['lead_score'].score > 70]
@listen(and_(filter_leads, store_leads_score)) # 🔀 AND逻辑! 等两者完成
def log_leads(self, leads): # 📝 记录过滤和存储后的线索
print(f"Leads: {leads}")
@router(filter_leads, paths=["high", "medium", "low"]) # 🧭 路由函数! 基于线索数量分支!
def count_leads(self, scores):
count = len(scores)
if count > 10: return 'high' # 🚀 高数量
elif count > 5: return 'medium' # 📈 中数量
else: return 'low' # 📉 低数量
@listen('high') # 🧭 监听路由'high'路径
def store_in_salesforce(self, leads): # 模拟存入Salesforce(CRM)
return leads
@listen('medium') # 🧭 监听路由'medium'路径
def send_to_sales_team(self, leads): # 模拟发送给销售团队
return leads
@listen('low') # 🧭 监听路由'low'路径
def write_email(self, leads): # 利用工作组2起草邮件
scored_leads = [lead.to_dict() for lead in leads]
emails = email_writing_crew.kickoff_for_each(scored_leads) # ✉️ 调用邮件工作组!
return emails
@listen(write_email) # 👂 监听write_email
def send_email(self, emails): # 模拟发送邮件
return emails
🔋 强大的状态访问 & 成本分析!
流程执行后,状态对象(flow.state)保存了所有会话相关细节(包括调用消耗)!可以轻松进行成本分析:
import pandas as pd
# 从状态中获取评分结果中的第一个结果的token使用量 (示例)
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])
# 计算总成本 (示例定价模型)
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}") # 💰 输出示例 (本次运行成本: $0.0156)
df_usage_metrics # (图:显示Token消耗的DataFrame表格)
✨结论:
CrewAI Flows 使开发者能够以更高的效率和更强的控制力来创建和管理复杂的AI工作流!这一强大特性为构建智能AI应用开启了自动化和灵活性的新高度!
👉 想尝试创建自己的AI自动化流程吗? 👉 了解更多AI技术与应用,关注 @智能时刻 获取前沿动态!🧠
💬 欢迎评论区交流你的想法或自动化需求! 💬 🔁 转发给需要的开发者和AI探索者吧! 🔁
🤝 加入【智能时刻的铁粉群】智能时刻的铁粉群 与同道中人交流经验! 🤝
#AI创造营 #ai探索计划 #AI学习营 #AI打工人 #热点科普 #CrewAI #人工智能开发 #自动化 #大语言模型应用#
