Concepts
LiveKit core concepts:
- Room
- Participant
- Track (a published media stream)
 
Agent architecture diagram
Subscribing to tracks
Agent interaction flow
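Client-side operations
Joining a room
How rooms are created
Manual
Grant the user permission to create rooms; the client then creates the room and joins it.
Automatic
The client supplies the ws_url and a token and joins the specified room.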
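```kotlin
val room = LiveKit.create(appContext = applicationContext)
lifecycleScope.launch {
    // connect() is a suspend function, so call it from a coroutine
    room.connect(wsUrl, token)
}
```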
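Leaving a room
Call Room.disconnect() to notify LiveKit that the participant is leaving. If the app closes without notifying LiveKit, the participant will continue to appear in the room for 15 seconds.
On Swift, Room.disconnect() is called automatically when the app exits.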
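Sending messages
How to send
The client uses the LocalParticipant.publishData API to publish arbitrary data messages to any participant in the room. Room data is published to the SFU over a WebRTC data channel, and the LiveKit server forwards it to one or more participants in the room.
To send a message to specific users, set destinationIdentities to the identities of the recipients.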
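```kotlin
// Sending messages
coroutineScope.launch {
    val data: ByteArray = //...

    // Send a lossy message to everyone. LOSSY means the data is sent once, with no ordering guarantee.
    // This is ideal for real-time updates where delivery speed is the priority.
    room.localParticipant.publishData(data, DataPublishReliability.LOSSY)

    // Send a reliable message to specific participants. RELIABLE means the data is retried up to 3 times
    // and delivered in order. Use it when guaranteed delivery matters more than low latency, e.g. in-room chat.
    val identities = listOf(
        Participant.Identity("alice"),
        Participant.Identity("bob"),
    )
    room.localParticipant.publishData(data, DataPublishReliability.RELIABLE, identities)
}

// Handling received messages
coroutineScope.launch {
    room.events.collect { event ->
        if (event is RoomEvent.DataReceived) {
            // Process data
        }
    }
}
```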
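The forwarding rule described above (broadcast when no destinationIdentities are given, otherwise deliver only to the listed identities) can be modeled in a few lines of Python. This is a toy sketch: route_packet and the participant names are hypothetical, it is not LiveKit server code, and it assumes the sender never receives its own message.

```python
from typing import Iterable, List, Optional


def route_packet(sender: str, participants: Iterable[str],
                 destination_identities: Optional[List[str]] = None) -> List[str]:
    """Toy model of SFU data forwarding (hypothetical, not LiveKit server code).

    Assumption: the sender never receives its own packet.
    """
    others = [p for p in participants if p != sender]
    if not destination_identities:
        # No destinations given: broadcast to everyone else in the room
        return others
    wanted = set(destination_identities)
    # Deliver only to listed identities that are actually in the room
    return [p for p in others if p in wanted]


room = ["me", "alice", "bob", "carol"]
print(route_packet("me", room))                    # ['alice', 'bob', 'carol']
print(route_packet("me", room, ["alice", "bob"]))  # ['alice', 'bob']
```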
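Message size limits
Because of limitations in the SCTP protocol, data channels are impractical for messages larger than 16 KiB, and that limit includes LiveKit's protocol wrapper. We recommend keeping messages under 15 KiB. See LiveKit's documentation on data channel size limits for details.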
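One way to stay under the 15 KiB recommendation is to split a large payload into chunks before publishing it. The Python sketch below shows only the splitting step; split_payload and MAX_PAYLOAD are hypothetical names, and the size budget covers the payload alone, not LiveKit's wrapper.

```python
from typing import List

MAX_PAYLOAD = 15 * 1024  # stay under the recommended 15 KiB per message


def split_payload(data: bytes, max_size: int = MAX_PAYLOAD) -> List[bytes]:
    """Split a payload into chunks no larger than max_size bytes."""
    if max_size <= 0:
        raise ValueError("max_size must be positive")
    return [data[i:i + max_size] for i in range(0, len(data), max_size)]


payload = b"x" * 40_000
chunks = split_payload(payload)
print(len(chunks), [len(c) for c in chunks])  # 3 [15360, 15360, 9280]
```

The receiver has to reassemble the chunks, so in practice each chunk needs your own framing (for example a message id and a chunk index), and reliable, ordered delivery is the natural choice for this kind of traffic.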
Message topics
A message can carry a topic, which lets the receiver filter out the messages it is interested in.
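Receiver-side topic filtering amounts to a lookup from topic to handler. The sketch below is a hypothetical, self-contained model: Packet stands in for a received data packet with a payload and an optional topic, and TopicRouter is not a LiveKit class.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Packet:
    """Hypothetical stand-in for a received data packet (payload + optional topic)."""
    data: bytes
    topic: Optional[str] = None


class TopicRouter:
    """Dispatch received packets to per-topic handlers; unknown topics are ignored."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Packet], None]] = {}

    def on(self, topic: str, handler: Callable[[Packet], None]) -> None:
        self._handlers[topic] = handler

    def dispatch(self, packet: Packet) -> bool:
        handler = self._handlers.get(packet.topic) if packet.topic else None
        if handler is None:
            return False  # not a topic we care about
        handler(packet)
        return True


chat_log = []
router = TopicRouter()
router.on("chat", lambda p: chat_log.append(p.data.decode("utf-8")))
router.dispatch(Packet(b"hello", topic="chat"))          # handled
router.dispatch(Packet(b"\x01\x02", topic="telemetry"))  # ignored
print(chat_log)  # ['hello']
```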
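Publishing tracks
LiveKit supports three built-in tracks by default (camera, microphone, and screen share) and also lets you configure custom tracks.
Audio and video tracks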
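```kotlin
// Both calls are suspend functions, so run them from a coroutine
lifecycleScope.launch {
    // Turns camera track on
    room.localParticipant.setCameraEnabled(true)
    // Turns microphone track on
    room.localParticipant.setMicrophoneEnabled(true)
}
```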
Screen share track
```kotlin
// Create an intent launcher for screen capture
// This *must* be registered prior to onCreate(), ideally as an instance val
val screenCaptureIntentLauncher = registerForActivityResult(
    ActivityResultContracts.StartActivityForResult()
) { result ->
    val resultCode = result.resultCode
    val data = result.data
    if (resultCode != Activity.RESULT_OK || data == null) {
        return@registerForActivityResult
    }
    lifecycleScope.launch {
        room.localParticipant.setScreenShareEnabled(true, data)
    }
}

// When it's time to enable the screen share, perform the following
val mediaProjectionManager =
    getSystemService(MEDIA_PROJECTION_SERVICE) as MediaProjectionManager
screenCaptureIntentLauncher.launch(mediaProjectionManager.createScreenCaptureIntent())
```
Custom track configuration
```kotlin
// Option 1: set room defaults
val options = RoomOptions(
    audioTrackCaptureDefaults = LocalAudioTrackOptions(
        noiseSuppression = true,
        echoCancellation = true,
        autoGainControl = true,
        highPassFilter = true,
        typingNoiseDetection = true,
    ),
    videoTrackCaptureDefaults = LocalVideoTrackOptions(
        deviceId = "",
        position = CameraPosition.FRONT,
        captureParams = VideoPreset169.H1080.capture,
    ),
    audioTrackPublishDefaults = AudioTrackPublishDefaults(
        audioBitrate = 20_000,
        dtx = true,
    ),
    videoTrackPublishDefaults = VideoTrackPublishDefaults(
        videoEncoding = VideoPreset169.H1080.encoding,
    )
)
var room = LiveKit.create(
    ...
    roomOptions = options,
)

// Option 2: create tracks manually
// (publishAudioTrack/publishVideoTrack are suspend functions; call them from a coroutine)
val localParticipant = room.localParticipant
val audioTrack = localParticipant.createAudioTrack("audio")
localParticipant.publishAudioTrack(audioTrack)

val videoTrack = localParticipant.createVideoTrack(
    "video",
    LocalVideoTrackOptions(
        position = CameraPosition.FRONT,
        captureParams = VideoPreset169.H1080.capture,
    )
)
localParticipant.publishVideoTrack(videoTrack)
```
Subscribing to tracks
By default, a participant who joins a room automatically subscribes to all tracks in it.
```kotlin
coroutineScope.launch {
    room.events.collect { event ->
        when (event) {
            is RoomEvent.TrackSubscribed -> {
                // Audio tracks are automatically played.
                val videoTrack = event.track as? VideoTrack ?: return@collect
                videoTrack.addRenderer(videoRenderer)
            }
            else -> {}
        }
    }
}
```
Listening for events
Events come in two kinds: room events and participant events. The full list:
| Event | Description | Room event | Participant event |
|---|---|---|---|
| ParticipantConnected | A RemoteParticipant joins after the local participant. | ✔️ | |
| ParticipantDisconnected | A RemoteParticipant leaves. | ✔️ | |
| Reconnecting | The connection to the server has been interrupted and it's attempting to reconnect. | ✔️ | |
| Reconnected | Reconnection has been successful. | ✔️ | |
| Disconnected | Disconnected from the room due to the room closing or an unrecoverable failure. | ✔️ | |
| TrackPublished | A new track is published to the room after the local participant has joined. | ✔️ | ✔️ |
| TrackUnpublished | A RemoteParticipant has unpublished a track. | ✔️ | ✔️ |
| TrackSubscribed | The LocalParticipant has subscribed to a track. | ✔️ | ✔️ |
| TrackUnsubscribed | A previously subscribed track has been unsubscribed. | ✔️ | ✔️ |
| TrackMuted | A track was muted; fires for both local and remote tracks. | ✔️ | ✔️ |
| TrackUnmuted | A track was unmuted; fires for both local and remote tracks. | ✔️ | ✔️ |
| LocalTrackPublished | A local track was published successfully. | ✔️ | ✔️ |
| LocalTrackUnpublished | A local track was unpublished. | ✔️ | ✔️ |
| ActiveSpeakersChanged | The current active speakers have changed. | ✔️ | |
| IsSpeakingChanged | The current participant has changed speaking status. | | ✔️ |
| ConnectionQualityChanged | Connection quality was changed for a participant. | ✔️ | ✔️ |
| ParticipantMetadataChanged | A participant's metadata was updated via the server API. | ✔️ | ✔️ |
| RoomMetadataChanged | Metadata associated with the room has changed. | ✔️ | |
| DataReceived | Data was received from another participant or the server. | ✔️ | ✔️ |
| TrackStreamStateChanged | Indicates whether a subscribed track has been paused due to bandwidth. | ✔️ | ✔️ |
| TrackSubscriptionPermissionChanged | One of the subscribed tracks has changed track-level permissions for the current participant. | ✔️ | ✔️ |
| ParticipantPermissionsChanged | The current participant's permissions have changed. | ✔️ | ✔️ |
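Server-side operations
Generating user tokens
Generate a JWT access token with the LiveKit API, using the LiveKit server's API_KEY and API_SECRET.
Take the user's information from the login JWT: identity = user_id + scenario, name = the user's nickname (default value), room name = scenario name (user_id).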
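The field mapping above can be captured in a small helper. The exact string format is not specified here, so the "_" separator, the "<scene>_<user_id>" room name, and the fallback of name to user_id when no nickname is set are all assumptions; build_subject and TokenSubject are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TokenSubject:
    identity: str
    name: str
    room: str


def build_subject(user_id: str, scene: str, nickname: str = "") -> TokenSubject:
    """Derive token fields from login info (hypothetical naming scheme).

    Assumptions: '_' joins the parts, the room is '<scene>_<user_id>',
    and the name falls back to user_id when the nickname is empty.
    """
    return TokenSubject(
        identity=f"{user_id}_{scene}",
        name=nickname or user_id,
        room=f"{scene}_{user_id}",
    )


print(build_subject("u42", "support", "Alice"))
# TokenSubject(identity='u42_support', name='Alice', room='support_u42')
```

The resulting identity, name, and room would then feed with_identity(), with_name(), and VideoGrants(room=...) in the token endpoint.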
```python
# server.py
import os

from livekit import api
from flask import Flask

app = Flask(__name__)


@app.route('/getToken')
def getToken():
    token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \
        .with_identity("identity") \
        .with_name("my name") \
        .with_grants(api.VideoGrants(
            room_join=True,
            room="my-room",
        ))
    return token.to_jwt()
```
In development, you can quickly create a token with the CLI:
```shell
livekit-cli token create --api-key devkey --api-secret secret --join --room test_room --identity test_user --valid-for 24h
```
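Token attributes
The token is a JWT that carries the user's identity, the room name, capabilities, permissions, and so on. Tokens are issued per scenario, that is, per corresponding room.
Room permissions are specified in the video field of the decoded join token. It may contain one or more of the following properties: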
| Field | Type | Description |
|---|---|---|
| roomCreate | bool | Permission to create or delete rooms |
| roomList | bool | Permission to list available rooms |
| roomJoin | bool | Permission to join a room |
| roomAdmin | bool | Permission to moderate a room |
| roomRecord | bool | Permission to use the Egress service |
| ingressAdmin | bool | Permission to use the Ingress service |
| room | string | Name of the room; required if join or admin is set |
| canPublish | bool | Allow the participant to publish tracks |
| canPublishData | bool | Allow the participant to publish data to the room |
| canPublishSources | string[] | When set, only the listed sources can be published (camera, microphone, screen_share, screen_share_audio) |
| canSubscribe | bool | Allow the participant to subscribe to tracks |
| canUpdateOwnMetadata | bool | Allow the participant to update its own metadata |
| hidden | bool | Hide the participant from others in the room |
| kind | string | Type of participant (standard, ingress, egress, sip, or agent); this field is typically set by LiveKit internals |
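Session shutdown
When the user leaves the room, the session ends. Register a callback with add_shutdown_callback to handle any follow-up work, for example sending a chat-ended event.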
```python
from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state...
        ...

    ctx.add_shutdown_callback(my_shutdown_hook)
```
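The shutdown-hook pattern can be illustrated with plain asyncio. ToySession below is a hypothetical stand-in, not LiveKit's JobContext: it only shows registering async callbacks and awaiting them when the session ends. Running them in registration order and isolating failures are choices made for this sketch.

```python
import asyncio
from typing import Awaitable, Callable, List


class ToySession:
    """Hypothetical stand-in for a job context that runs hooks when the session ends."""

    def __init__(self) -> None:
        self._hooks: List[Callable[[], Awaitable[None]]] = []

    def add_shutdown_callback(self, hook: Callable[[], Awaitable[None]]) -> None:
        self._hooks.append(hook)

    async def shutdown(self) -> None:
        # Run every hook in registration order; a failing hook does not stop the rest
        for hook in self._hooks:
            try:
                await hook()
            except Exception as exc:  # toy example: report and continue
                print(f"shutdown hook failed: {exc!r}")


events: List[str] = []


async def main() -> None:
    session = ToySession()

    async def send_chat_ended() -> None:
        events.append("chat_ended")  # e.g. notify a backend that the chat is over

    session.add_shutdown_callback(send_chat_ended)
    await session.shutdown()  # simulates the user leaving the room


asyncio.run(main())
print(events)  # ['chat_ended']
```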
Agent operations
Creating an agent service node
LiveKit's Agents framework currently supports only the Python SDK. Documentation: https://docs.livekit.io/agents/quickstart/
This is the official demo:
```python
import asyncio

from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero


# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
    # Create an initial chat context with a system prompt
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
        ),
    )

    # Connect to the LiveKit room
    # indicating that the agent will only subscribe to audio tracks
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # VoiceAssistant is a class that creates a full conversational AI agent.
    # See https://github.com/livekit/agents/tree/main/livekit-agents/livekit/agents/voice_assistant
    # for details on how it works.
    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    # Start the voice assistant with the LiveKit room
    assistant.start(ctx.room)

    await asyncio.sleep(1)

    # Greets the user with an initial message
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    # Initialize the worker with the entrypoint
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
```
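Agent lifecycle
- When the worker program starts, it connects to the LiveKit server over WebSocket and registers itself as a worker. Each worker runs multiple child processes (agents) to handle requests.
- When a user joins a room, the LiveKit server picks a worker through load balancing to serve that user.
- The child process handles the user's messages and replies.
- When the user leaves the room, the room is closed and its connection to the agent is dropped.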
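The lifecycle can be modeled as register, dispatch on join, release on close. In the toy Python model below, choosing the least-loaded worker is an assumption made for illustration; this section does not describe LiveKit's actual load-balancing policy, and Worker and ToyDispatcher are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Worker:
    name: str
    capacity: int                              # max concurrent jobs (child processes)
    rooms: List[str] = field(default_factory=list)

    @property
    def load(self) -> float:
        return len(self.rooms) / self.capacity


class ToyDispatcher:
    """Toy lifecycle model: register -> dispatch on join -> release on close."""

    def __init__(self) -> None:
        self.workers: List[Worker] = []
        self.assignments: Dict[str, Worker] = {}

    def register(self, worker: Worker) -> None:
        self.workers.append(worker)

    def on_room_joined(self, room: str) -> str:
        candidates = [w for w in self.workers if len(w.rooms) < w.capacity]
        if not candidates:
            raise RuntimeError("no worker available")
        worker = min(candidates, key=lambda w: w.load)  # assumed policy: least loaded
        worker.rooms.append(room)
        self.assignments[room] = worker
        return worker.name

    def on_room_closed(self, room: str) -> None:
        worker = self.assignments.pop(room)
        worker.rooms.remove(room)  # frees a slot on that worker


d = ToyDispatcher()
d.register(Worker("w1", capacity=2))
d.register(Worker("w2", capacity=2))
print(d.on_room_joined("room-a"))  # w1
print(d.on_room_joined("room-b"))  # w2
d.on_room_closed("room-a")
print(d.on_room_joined("room-c"))  # w1
```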
 
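Agent internal execution flow
When handling a request, an agent goes through several stages:
- request handler: decides whether this worker can accept the request; if not, LiveKit hands the job to another worker.
- entrypoint: initialization that runs before the agent joins the room.
- prewarm function: called when the agent process starts; use it for slow operations such as loading models.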
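The order in which the three hooks fire can be shown with a toy model. ToyAgentProcess and the acceptance rule are hypothetical, and the model collapses the worker and its child process into one object; it only illustrates that prewarm runs once at process start, the request handler runs per job offer, and entrypoint runs only for accepted jobs.

```python
from typing import Callable, Dict, List

calls: List[str] = []


class ToyAgentProcess:
    """Hypothetical model of hook ordering (not the LiveKit worker implementation)."""

    def __init__(self, prewarm: Callable[[Dict], None],
                 request_handler: Callable[[str], bool],
                 entrypoint: Callable[[str, Dict], None]) -> None:
        self.userdata: Dict = {}
        self._request_handler = request_handler
        self._entrypoint = entrypoint
        prewarm(self.userdata)  # expensive setup happens once, at process start

    def offer(self, room: str) -> bool:
        if not self._request_handler(room):
            return False  # rejected: the server would offer the job to another worker
        self._entrypoint(room, self.userdata)
        return True


def prewarm(userdata: Dict) -> None:
    calls.append("prewarm")
    userdata["model"] = "loaded-model"  # placeholder for a slow model load


def request_handler(room: str) -> bool:
    calls.append(f"request:{room}")
    return not room.startswith("private-")  # hypothetical acceptance rule


def entrypoint(room: str, userdata: Dict) -> None:
    calls.append(f"entrypoint:{room}:{userdata['model']}")


proc = ToyAgentProcess(prewarm, request_handler, entrypoint)
proc.offer("room-a")
proc.offer("private-b")
print(calls)
# ['prewarm', 'request:room-a', 'entrypoint:room-a:loaded-model', 'request:private-b']
```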
 
Worker types
```python
opts = WorkerOptions(
    ...,
    # when omitted, the default is JobType.JT_ROOM
    worker_type=JobType.JT_ROOM,
)
```
The JobType enum has two options:
- JT_ROOM: a new agent instance is created for each room.
- JT_PUBLISHER: a new agent instance is created for each participant in the room.
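The difference between the two job types shows up in how many agent instances a set of rooms produces. The sketch below uses a local JobType enum that mirrors the two options described above (it is not the SDK's enum), and agent_instances is a hypothetical helper.

```python
from enum import Enum
from typing import Dict, List


class JobType(Enum):
    """Local stand-in mirroring the two options above (not the SDK enum)."""
    JT_ROOM = "room"
    JT_PUBLISHER = "publisher"


def agent_instances(job_type: JobType, rooms: Dict[str, List[str]]) -> int:
    """Count agent instances for a mapping of room -> participant identities."""
    if job_type is JobType.JT_ROOM:
        return len(rooms)  # one agent per room
    return sum(len(participants) for participants in rooms.values())  # one per participant


rooms = {"room-a": ["alice", "bob"], "room-b": ["carol"]}
print(agent_instances(JobType.JT_ROOM, rooms))       # 2
print(agent_instances(JobType.JT_PUBLISHER, rooms))  # 3
```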
 
Agent request handling
Handling audio tracks
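```python
import asyncio

from livekit import rtc


# Registered inside entrypoint(ctx); do_something is your own per-frame handler
async def _process_audio(track: rtc.Track):
    audio_stream = rtc.AudioStream(track)
    async for event in audio_stream:
        do_something(event.frame)


@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    # Listen to audio tracks only
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        # Event callbacks are synchronous, so run the async read loop as a task
        asyncio.create_task(_process_audio(track))
```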
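As an example of what do_something might do with each frame, the sketch below computes the frame's RMS level in dBFS, a common first step for level meters or simple silence detection. It assumes the frame's samples are signed 16-bit PCM that can be loaded into an array("h", ...); frame_rms_dbfs is a hypothetical helper.

```python
import math
from array import array


def frame_rms_dbfs(samples: array) -> float:
    """RMS level of a 16-bit PCM frame in dBFS (0 dBFS = full scale)."""
    if len(samples) == 0:
        return float("-inf")
    mean_square = sum(s * s for s in samples) / len(samples)
    if mean_square == 0:
        return float("-inf")  # digital silence
    return 20 * math.log10(math.sqrt(mean_square) / 32768)


silence = array("h", [0] * 480)
half_scale = array("h", [16384] * 480)
print(frame_rms_dbfs(silence))               # -inf
print(round(frame_rms_dbfs(half_scale), 2))  # -6.02
```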
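Publishing audio tracks
Publishing audio means splitting the stream into fixed-length audio frames. An internal buffer holds a 50 ms queue of audio and sends it in real time. capture_frame, the method that sends a new frame, blocks (it is awaited) until the buffer has accepted the whole frame, which makes interruptions easier to handle.
To publish an audio track, you must fix the sample rate, the number of channels, and the length of each frame (in samples) in advance. The example below transmits a constant 16-bit sine wave at 48 kHz in 10 ms frames: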
```python
import numpy as np
from livekit import rtc
from livekit.agents import JobContext

SAMPLE_RATE = 48000
NUM_CHANNELS = 1  # mono audio
AMPLITUDE = 2 ** 8 - 1
SAMPLES_PER_CHANNEL = 480  # 10ms at 48kHz


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("example-track", source)
    # since the agent is a participant, our audio I/O is its "microphone"
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    # ctx.agent is an alias for ctx.room.local_participant
    publication = await ctx.agent.publish_track(track, options)

    frequency = 440

    async def _sinewave():
        audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)
        audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)

        total_samples = 0
        while True:
            time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
            sinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)
            np.copyto(audio_data, sinewave)

            # send this frame to the track (waits until the buffer accepts it)
            await source.capture_frame(audio_frame)
            total_samples += SAMPLES_PER_CHANNEL

    await _sinewave()
```
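The frame parameters in the example follow from simple arithmetic: samples per channel = sample rate × frame length in ms / 1000, and each 16-bit sample takes 2 bytes. The helper below (a hypothetical name) checks those numbers for the 48 kHz, mono, 10 ms case.

```python
def frame_geometry(sample_rate: int, num_channels: int, frame_ms: int,
                   bytes_per_sample: int = 2) -> dict:
    """Samples and bytes per audio frame (bytes_per_sample=2 for 16-bit PCM)."""
    samples_per_channel, remainder = divmod(sample_rate * frame_ms, 1000)
    if remainder:
        raise ValueError("frame length does not divide the sample rate evenly")
    return {
        "samples_per_channel": samples_per_channel,
        "bytes_per_frame": samples_per_channel * num_channels * bytes_per_sample,
    }


print(frame_geometry(48000, 1, 10))
# {'samples_per_channel': 480, 'bytes_per_frame': 960}
```

By the same arithmetic, the 50 ms internal buffer corresponds to five 10 ms frames (2,400 samples at 48 kHz). Computing the time axis from total_samples, as the example does, keeps the sine wave's phase continuous from one frame to the next.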
Handling text messages
Listen for the data_received event to handle messages from users, and send messages to users with publish_data().
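```python
import logging

from livekit import rtc


# `room` is the connected rtc.Room, e.g. ctx.room inside the entrypoint
@room.on("data_received")
def on_data_received(data: rtc.DataPacket):
    # participant may be None when the data was sent by the server
    sender = data.participant.identity if data.participant else "server"
    logging.info("received data from %s: %s", sender, data.data)


async def send_message(room: rtc.Room):
    # string payload will be encoded to bytes with UTF-8
    await room.local_participant.publish_data(
        "my payload",
        reliable=True,
        destination_identities=["identity1", "identity2"],
        topic="topic1",
    )
```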
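Since a string payload is encoded as UTF-8 before sending, structured messages are often serialized as JSON. The sketch below is one way to do that with a size check against the 15 KiB recommendation from the message size limits section; encode_message, decode_message, and the message shape are hypothetical.

```python
import json
from typing import Any, Dict

MAX_MESSAGE_BYTES = 15 * 1024  # recommended ceiling per data message


def encode_message(obj: Dict[str, Any]) -> bytes:
    """Serialize a structured message to UTF-8 JSON bytes, refusing oversize payloads."""
    raw = json.dumps(obj, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    if len(raw) > MAX_MESSAGE_BYTES:
        raise ValueError(f"message is {len(raw)} bytes; keep it under {MAX_MESSAGE_BYTES}")
    return raw


def decode_message(raw: bytes) -> Dict[str, Any]:
    """Inverse of encode_message, e.g. for the bytes in a received packet's data field."""
    return json.loads(raw.decode("utf-8"))


msg = {"type": "chat", "text": "你好"}
raw = encode_message(msg)
print(decode_message(raw) == msg)  # True
```

The resulting bytes could be passed to publish_data() in place of the string payload, with decode_message() applied on the receiving side.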
