当前位置: 首页 > news >正文

杭州 高端网站建设 推荐网站搭建环境

杭州 高端网站建设 推荐,网站搭建环境,wordpress空白主题,丹阳是哪个省目录 完整代码代码解释1. 消息的数据类:2. 创建代理人(MyAgent):3. 创建和运行代理人的运行时环境:4. 根据发送者路由消息的代理(RoutedBySenderAgent):5. 创建和运行带路由的代理&a…

目录

    • 完整代码
    • 代码解释
      • 1. 消息的数据类:
      • 2. 创建代理人(MyAgent):
      • 3. 创建和运行代理人的运行时环境:
      • 4. 根据发送者路由消息的代理(RoutedBySenderAgent):
      • 5. 创建和运行带路由的代理:
      • 6. 内外代理的通信(InnerAgent 和 OuterAgent)
      • 7. 运行内外代理
      • 8. 消息广播
        • 1. ReceivingAgent:接收消息的代理
        • 2. BroadcastingAgent:发布消息的代理
        • 3. 注册代理并处理消息
        • 4. 默认主题与订阅
        • 5. 注册代理并启动消息发布
    • 类似例子

完整代码

from dataclasses import dataclass@dataclass
class TextMessage:content: strsource: str@dataclass
class ImageMessage:url: strsource: str
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handlerclass MyAgent(RoutedAgent):@message_handlerasync def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you said {message.content}!")@message_handlerasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
Hello, User, you said Hello, World!!
Hello, User, you sent me https://example.com/image.jpg!
class RoutedBySenderAgent(RoutedAgent):@message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignoreasync def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hello from user 1 handler, user1-test, you said Hello, World!!
Hello from user 2 handler, user2-test, you said Hello, World!!
Hello, user2-test, you sent me https://example.com/image.jpg!
from dataclasses import dataclassfrom autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler@dataclass
class Message:content: strclass InnerAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> Message:return Message(content=f"Hello from inner, {message.content}")class OuterAgent(RoutedAgent):def __init__(self, description: str, inner_agent_type: str):super().__init__(description)self.inner_agent_id = AgentId(inner_agent_type, self.id.key)@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received message: {message.content}")# Send a direct message to the inner agent and receves a response.response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)print(f"Received inner response: {response.content}")
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Received message: Hello, World!
Received inner response: Hello from inner, Hello from outer, Hello, World!
from autogen_core import RoutedAgent, message_handler, type_subscription@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received a message: {message.content}")
from autogen_core import TopicIdclass BroadcastingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=TopicId(type="default", source=self.id.key),)
from autogen_core import TypeSubscriptionruntime = SingleThreadedAgentRuntime()# Option 1: with type_subscription decorator
# The type_subscription class decorator automatically adds a TypeSubscription to
# the runtime when the agent is registered.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!
from autogen_core import DefaultTopicId, default_subscription@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:# Publish a message to all agents in the same namespace.await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=DefaultTopicId(),)
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!

代码解释

1. 消息的数据类:

from dataclasses import dataclass@dataclass
class TextMessage:content: strsource: str@dataclass
class ImageMessage:url: strsource: str

这段代码定义了两个数据类:TextMessage 和 ImageMessage。它们分别用于表示文本消息和图片消息。

  • TextMessage 包含两个字段:content(消息内容)和 source(发送者)。
  • ImageMessage 包含 url(图片的链接地址)和 source(发送者)。

2. 创建代理人(MyAgent):

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handlerclass MyAgent(RoutedAgent):@message_handlerasync def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you said {message.content}!")@message_handlerasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")

MyAgent 继承自 RoutedAgent 类,并定义了两个消息处理器:

  • on_text_message:处理 TextMessage 消息,打印出发送者和内容。
  • on_image_message:处理 ImageMessage 消息,打印出发送者和图片的链接地址。

3. 创建和运行代理人的运行时环境:

runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()

这段代码启动了一个单线程的运行时环境 SingleThreadedAgentRuntime

  • 注册了 MyAgent 代理,代理名称为 my_agent
  • 使用 runtime.send_message 向代理发送了一条文本消息和一条图片消息。
  • 最后,运行时环境会在空闲时停止。

4. 根据发送者路由消息的代理(RoutedBySenderAgent):

class RoutedBySenderAgent(RoutedAgent):@message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignoreasync def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")

RoutedBySenderAgent 继承自 RoutedAgent,并通过 message_handler 装饰器根据消息的发送者 (source) 来路由消息。

  • 如果消息来自以 user1 开头的发送者,它会调用 on_user1_message 处理文本消息。
  • 如果消息来自以 user2 开头的发送者,它会调用 on_user2_message 处理文本消息。
  • 如果是来自 user2 的图片消息,则调用 on_image_message 进行处理。

5. 创建和运行带路由的代理:

runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()

在这个部分,运行时环境启动并注册了 RoutedBySenderAgent 代理。
发送了不同来源的文本消息和图片消息。
根据消息的 source 字段,消息会被路由到对应的处理函数。
最终,代理会打印不同的响应信息,取决于消息的发送者。

6. 内外代理的通信(InnerAgent 和 OuterAgent)

@dataclass
class Message:content: strclass InnerAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> Message:return Message(content=f"Hello from inner, {message.content}")class OuterAgent(RoutedAgent):def __init__(self, description: str, inner_agent_type: str):super().__init__(description)self.inner_agent_id = AgentId(inner_agent_type, self.id.key)@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received message: {message.content}")response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)print(f"Received inner response: {response.content}")

InnerAgent 是一个简单的代理,它收到消息后,返回一个修改过的消息。

OuterAgent 也作为代理存在,并包含对 InnerAgent 的引用。它会发送消息给 InnerAgent,并等待回应。

7. 运行内外代理

runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()

这个部分启动了 InnerAgentOuterAgent 代理。

OuterAgent 会发送一条消息给 InnerAgent,并接收到来自 InnerAgent 的响应。

最终,输出显示了 OuterAgent 接收到的消息以及 InnerAgent 的响应。

8. 消息广播

这一部分涉及了通过 topic 和 subscription 实现的消息广播和接收机制。它展示了如何通过 topic 来发布消息以及如何订阅消息。

1. ReceivingAgent:接收消息的代理
from autogen_core import RoutedAgent, message_handler, type_subscription@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received a message: {message.content}")

ReceivingAgent 继承自 RoutedAgent,并使用了 @type_subscription(topic_type="default") 装饰器。这个装饰器将代理与某个类型的 topic(本例中是 default)关联。

当收到来自 default 主题的消息时,代理会调用 on_my_message 方法,并打印出消息内容。

2. BroadcastingAgent:发布消息的代理
from autogen_core import TopicIdclass BroadcastingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=TopicId(type="default", source=self.id.key),)

BroadcastingAgent 也继承自 RoutedAgent,它定义了一个消息处理方法,当收到消息时,它会发布一条新的消息,消息内容为 “Publishing a message from broadcasting agent!”。

这条消息会发布到 default 主题,且 topic_id 包含代理的 source(即 self.id.key)作为源。

3. 注册代理并处理消息
from autogen_core import TypeSubscriptionruntime = SingleThreadedAgentRuntime()# Option 1: with type_subscription decorator
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()

runtime 创建了一个单线程的运行时环境。

选项 1:通过 @type_subscription 装饰器,ReceivingAgent 被注册到运行时,它会订阅 default 主题。

选项 2BroadcastingAgent 注册后,通过 runtime.add_subscription() 将其与 default 主题的消息订阅关联起来。

runtime.start() 启动运行时并发布了一条消息 “Hello, World! From the runtime!”,这条消息会被所有订阅 default 主题的代理接收。

由于 ReceivingAgent 订阅了 default 主题,它会接收到来自 runtime.publish_message 的消息,并输出 “Received a message: Hello, World! From the runtime!”。

之后,BroadcastingAgent 发布了一条消息 “Publishing a message from broadcasting agent!”,并发送到 default 主题。

4. 默认主题与订阅
from autogen_core import DefaultTopicId, default_subscription@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:# Publish a message to all agents in the same namespace.await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=DefaultTopicId(),)

BroadcastingAgentDefaultTopic 通过 @default_subscription 装饰器,订阅了一个特殊的默认主题(DefaultTopicId())。

当它收到消息时,它会将 “Publishing a message from broadcasting agent!” 消息发布到同一命名空间的所有代理,所有订阅该主题的代理都会收到这条消息。

5. 注册代理并启动消息发布
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
  • 注册了 BroadcastingAgentDefaultTopicReceivingAgent 两个代理。

  • 在运行时启动后,runtime.publish_message 发布了一条消息 “Hello, World! From the runtime!” 到默认主题 DefaultTopicId(),这个消息会被 ReceivingAgent 接收并打印。

  • 随后,BroadcastingAgentDefaultTopic 会发布一条消息 “Publishing a message from broadcasting agent!”,并将其广播到同一个命名空间中的所有代理,ReceivingAgent 收到这条消息并打印。

类似例子

from autogen_core import RoutedAgent, message_handler, type_subscription, TopicId# Agent that will receive status updates
@type_subscription(topic_type="status")
class StatusReceiverAgent(RoutedAgent):@message_handlerasync def on_status_update(self, message: Message, ctx: MessageContext) -> None:print(f"Status received: {message.content}")# Agent that publishes status updates
@type_subscription(topic_type="command")
class StatusPublisherAgent(RoutedAgent):@message_handlerasync def on_command_message(self, message: Message, ctx: MessageContext) -> None:# Publish a status update to the "status" topicawait self.publish_message(Message("System is up and running!"),topic_id=TopicId(type="status", source=self.id.key),)# Agent runtime and agent registration
runtime = SingleThreadedAgentRuntime()# Register the status receiver and publisher agents
await StatusReceiverAgent.register(runtime, "status_receiver", lambda: StatusReceiverAgent("Status Receiver"))
await StatusPublisherAgent.register(runtime, "status_publisher", lambda: StatusPublisherAgent("Status Publisher"))# Start the runtime and simulate sending a command to the publisher
runtime.start()# The publisher publishes a status update when it receives a message (e.g., "send status update")
await runtime.publish_message(Message("send status update"), topic_id=TopicId(type="command", source="status_publisher"))# The receiver will print the status update message
await runtime.stop_when_idle()

输出

Status received: System is up and running!

参考链接:
https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/message-and-communication.html

http://www.yayakq.cn/news/58006/

相关文章:

  • 南宁百度网站公司吗王也天年龄
  • 公司为什么建立网站做海外市场什么网站推广
  • 惠州网站建设方案外包郑州网页制作设计
  • 旅游网站建设的利益温州做网站老师
  • 网站公司动态做不了怎么办清华大学自动化系
  • 沧州做网站优化开发一个公司官网大概多少钱
  • 球球是哪个公司开发的seo北京公司
  • wordpress 获取所有子页面扬州seo
  • 推广网站的方法有搜索支付网站建设费怎么做账
  • 亳州网站开发公司后台管理网站开发
  • 无锡网站设计厂家门户网站建设的书籍
  • 网站建设的用途是什么意思网站城市分站织梦系统
  • 青海wap网站建设比较好专业做网站厂家
  • 企业网站建设工作室网页怎么做出来的
  • 专门做ppt背景的网站有哪些电子商务是坑人专业吗
  • 石家庄建设银行网站wordpress多站点问题
  • 做网站需要买服务器门户网站建设哪专业
  • 网站开发流程的8个步骤徐州市工程建设交易平台
  • 怎么把网站做成手机版的精准客户软件
  • 东营免费网站制作为什么wordpress有广告
  • 昆明网站seo优化试客网站 源码
  • 怎样做海外淘宝网站网站备案幕布照片
  • 做街舞网站的素材广州上宏网站建设
  • 网站如果不备案软件开发前景分析
  • 企业网站建设注意事项建筑网站招聘
  • 美发企业网站建设价格十大编程教育培训机构
  • 电商网站费用搜索引擎推广试题
  • 网站优化的价值信誉好的镇江网站优化
  • 深圳找工作的网站php网站后台密码忘记
  • 全响应网站萧山人才网手机版