Skip to main content
Version: Next

AsyncLLMEngine

*在线运行 vLLM 入门教程:零基础分步指南

class_ vllm.AsyncLLMEngine(*args, log_requests: bool = True, start_engine_loop: bool = True, **kwargs)

[source]

基类:EngineClient

LLMEngine 的异步封装类。

该类用于封装 LLMEngine 类,使其变为异步。它使用 asyncio 创建一个后台循环,持续处理传入的请求。当等待队列中有请求时,generate 方法会触发 LLMEngine 进行处理。generate 方法将 LLMEngine 的输出结果返回给调用者。

参数:

  • log_requests – 是否记录请求。
  • start_engine_loop – 如果为 True,生成调用时将自动启动运行引擎的后台任务。
  • *args – LLMEngine 的参数。
  • **kwargs – LLMEngine 的参数。

async_ abort(request_id: str) → None

[source]

中止请求。

中止已提交的请求。如果请求已完成或未找到,此方法将不执行任何操作。

参数:

request_id – 请求的唯一标识符。

async_ add_lora(lora_request: LoRARequest) → None

[source]#

将新的 LoRA 适配器加载到引擎中,以供后续请求使用。

async_ check_health() → None

[source]

如果引擎运行状况不佳,则会引发错误。

async_ encode(prompt: str | TextPrompt | TokensPrompt | ExplicitEncoderDecoderPrompt, pooling_params: PoolingParams, request_id: str, lora_request: LoRARequest | None = None, trace_headers: Mapping[str, str] | None = None, priority: int = 0) → AsyncGenerator[PoolingRequestOutput, None]

[source]

为池化模型的请求生成输出。

为请求生成输出。此方法是一个协程。它将请求添加到 LLMEngine 的等待队列中,并将 LLMEngine 的输出流式传输给调用者。

参数:

  • prompt – 输入给 LLM 的提示。有关每种输入格式的更多详细信息,请参阅 PromptType
  • pooling_params – 请求的池化参数。
  • request_id – 请求的唯一标识符。
  • lora_request – 用于生成的 LoRA 请求(如果有)。
  • trace_headers – OpenTelemetry 跟踪头。
  • priority – 请求的优先级。仅适用于优先级调度。

生成:

LLMEngine 为请求生成的 PoolingRequestOutput 对象。

详细信息:

  • 如果引擎未运行,则启动后台循环,该循环会迭代调用 engine_step() 来处理等待的请求。
  • 将请求添加到引擎的 RequestTracker 中。在下一次后台循环中,该请求将被发送到底层引擎。同时,将创建一个相应的 AsyncStream
  • 等待来自 AsyncStream 的请求输出并生成它们。

示例

>>> # 请参考 entrypoints/api_server.py 获取完整示例。
>>>
>>> # 初始化引擎和示例输入
>>> # 注意这里的 engine_args 是 AsyncEngineArgs 实例
>>> engine = AsyncLLMEngine.from_engine_args(engine_args)
>>> example_input = {
>>> "input": "What is LLM?",
>>> "request_id": 0,
>>> }
>>>
>>> # 开始生成
>>> results_generator = engine.encode(
>>> example_input["input"],
>>> PoolingParams(),
>>> example_input["request_id"])
>>>
>>> # 获取结果
>>> final_output = None
>>> async for request_output in results_generator:
>>> if await request.is_disconnected():
>>> # 如果客户端断开连接,则终止请求。
>>> await engine.abort(request_id)
>>> # 返回一个错误,或者抛出一个错误。
>>> ...
>>> final_output = request_output
>>>
>>> # 处理并返回最终输出
>>> ...

async_ engine_step(virtual_engine: int) → bool

[source]

触发引擎处理等待中的请求。

如果有正在处理的请求,则返回 True。

classmethod_ from_engine_args(engine_args: AsyncEngineArgs, start_engine_loop: bool = True, usage_context: UsageContext = UsageContext.ENGINE_CONTEXT, stat_loggers: Dict[str, StatLoggerBase] | None = None) → AsyncLLMEngine

[source]

根据引擎参数创建异步 LLM 引擎。

classmethod_ from_vllm_config(vllm_config: VllmConfig, start_engine_loop: bool = True, usage_context: UsageContext = UsageContext.ENGINE_CONTEXT, stat_loggers: dict[str, vllm.engine.metrics_types.StatLoggerBase] | None = None, disable_log_requests: bool = False, disable_log_stats: bool = False) → AsyncLLMEngine

从 EngineArgs 创建一个 AsyncLLMEngine。

async_ generate(prompt: str | TextPrompt | TokensPrompt | ExplicitEncoderDecoderPrompt, sampling_params: SamplingParams, request_id: str, lora_request: LoRARequest | None = None, trace_headers: Mapping[str, str] | None = None, prompt_adapter_request: PromptAdapterRequest | None = None, priority: int = 0) → AsyncGenerator[RequestOutput, None]

[source]

为请求生成输出。

为请求生成输出。此方法是一个协程。它将请求添加到 LLMEngine 的等待队列中,并将 LLMEngine 的输出流式传输给调用者。

参数:

  • prompt – 输入给 LLM 的提示。有关每种输入格式的更多详细信息,请参阅 PromptType
  • sampling_params – 请求的采样参数。
  • request_id – 请求的唯一标识符。
  • lora_request – 用于生成的 LoRA 请求(如果有)。
  • trace_headers – OpenTelemetry 跟踪头。
  • prompt_adapter_request – 用于生成的提示适配器请求(如果有)。
  • priority – 请求的优先级。仅适用于优先级调度。

生成:

LLMEngine为请求生成的 RequestOutput 对象。

详细信息:

  • 如果引擎未运行,则启动后台循环,该循环会迭代调用 engine_step() 来处理等待的请求。
  • 将请求添加到引擎的 RequestTracker 中。在下一次后台循环中,该请求将被发送到底层引擎。同时,将创建一个相应的 AsyncStream。
  • 等待来自 AsyncStream 的请求输出并生成它们。

示例

>>> # 请参考 entrypoints/api_server.py 获取完整示例。
>>>
>>> # 初始化引擎和示例输入
>>> # 注意这里的 engine_args 是 AsyncEngineArgs 实例
>>> engine = AsyncLLMEngine.from_engine_args(engine_args)
>>> example_input = {
>>> "prompt": "What is LLM?",
>>> "stream": False, # assume the non-streaming case
>>> "temperature": 0.0,
>>> "request_id": 0,
>>> }
>>>
>>> # 开始生成
>>> results_generator = engine.generate(
>>> example_input["prompt"],
>>> SamplingParams(temperature=example_input["temperature"]),
>>> example_input["request_id"])
>>>
>>> # 获取结果
>>> final_output = None
>>> async for request_output in results_generator:
>>> if await request.is_disconnected():
>>> # Abort the request if the client disconnects.
>>> await engine.abort(request_id)
>>> # Return or raise an error
>>> ...
>>> final_output = request_output
>>>
>>> # 处理并返回最终输出
>>> ...

async_ get_decoding_config() → DecodingConfig

[source]

获取 vLLM 引擎的解码配置。

async_ get_input_preprocessor() → InputPreprocessor

[source]

获取 vLLM 引擎的输入预处理器。

async_ get_lora_config() → LoRAConfig

[source]

获取 vLLM 引擎的 LoRA 配置。

async_ get_model_config() → ModelConfig

[source]

获取 vLLM 引擎的模型配置。

async_ get_parallel_config() → ParallelConfig

[source]

获取 vLLM 引擎的并行配置。

async_ get_scheduler_config() → SchedulerConfig

[source]

获取 vLLM 引擎的调度配置。

async_ get_tokenizer(lora_request: LoRARequest | None = None) → transformers.PreTrainedTokenizer | transformers.PreTrainedTokenizerFast | TokenizerBase

[source]

获取请求的适当分词器。

async_ is_sleeping() → bool

[source]  

检查引擎是否处于休眠状态。

async_ reset_prefix_cache(device: Device | None = None) → None

[source]

重置前缀缓存。

async static_ run_engine_loop(engine_ref: ReferenceType)

[source]

我们使用引擎的弱引用,这样正在运行的循环不会阻止引擎被垃圾回收。

shutdown_background_loop() → None

[source]

关闭后台循环。

在清理过程中需要调用此方法,以移除对 self 的引用,并正确释放异步 LLM 引擎持有的资源(例如执行器及其资源)。

async_ sleep(level: int = 1) → None

[source]

让引擎休眠。

start_background_loop() → None

[source]

启动后台循环。

async_ start_profile() → None

[source]

开始分析引擎。

async_ stop_profile() → None

[source]

停止分析引擎。

async_ wake_up(tags: list[str] | None = None) → None

[source]

唤醒引擎。