> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-fix-nav-issues.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# 从 Elastic 迁移 agent

> 从 Elastic 迁移 agent

export const Image = ({img, alt, size}) => {
  return <Frame>
      <img src={img} alt={alt} />
    </Frame>;
};

<div id="migrating-agents-from-elastic">
  ## 从 Elastic 迁移agent
</div>

Elastic Stack 提供了多种可观测性数据采集agent。具体包括：

* [Beats 家族](https://www.elastic.co/beats) —— 例如 [Filebeat](https://www.elastic.co/beats/filebeat)、[Metricbeat](https://www.elastic.co/beats/metricbeat) 和 [Packetbeat](https://www.elastic.co/beats/packetbeat) —— 均基于 `libbeat` 库。这些 Beats 支持通过 Lumberjack 协议将[数据发送到 Elasticsearch、Kafka、Redis 或 Logstash](https://www.elastic.co/docs/reference/beats/filebeat/configuring-output)。
* [`Elastic Agent`](https://www.elastic.co/elastic-agent) 提供统一的agent，可采集日志、指标和链路追踪。该agent可通过 [Elastic Fleet Server](https://www.elastic.co/docs/reference/fleet/manage-elastic-agents-in-fleet) 进行集中管理，并支持输出到 Elasticsearch、Logstash、Kafka 或 Redis。
* Elastic 还提供了 [OpenTelemetry Collector - EDOT](https://www.elastic.co/docs/reference/opentelemetry) 的一个发行版。虽然目前它还无法由 Fleet Server 统一编排，但如果你要迁移到 ClickStack，它提供了一条更灵活、更开放的路径。

最佳迁移路径取决于当前使用的agent。在接下来的各节中，我们将介绍每种主要agent类型的迁移方案。我们的目标是尽可能降低迁移成本，并在可能的情况下，让你在过渡期间继续使用现有agent。

<div id="prefered-migration-path">
  ## 首选迁移路径
</div>

在条件允许的情况下，我们建议将所有日志、指标和 trace 采集迁移到 [OpenTelemetry (OTel) Collector](https://opentelemetry.io/docs/collector/)，并将其以[边缘侧 agent 角色](/zh/clickstack/ingesting-data/collector#collector-roles)部署。这是发送数据最高效的方式，也能避免额外的架构复杂性和数据转换。

<Info>
  **为什么选择 OpenTelemetry Collector？**

  OpenTelemetry Collector 为可观测性数据摄取提供了一种可持续、厂商中立的解决方案。我们理解，有些组织运行着由数千个、甚至数万个 Elastic agent 组成的大规模部署。对于这类用户，与现有 agent 基础设施保持兼容可能至关重要。本文档旨在为此提供支持，同时帮助团队逐步过渡到基于 OpenTelemetry 的采集方案。
</Info>

<div id="clickhouse-otel-endpoint">
  ## ClickHouse OpenTelemetry 端点
</div>

所有数据都会经由一个 **OpenTelemetry (OTel) collector** 实例摄取到 ClickStack 中，该实例是日志、指标、链路追踪和会话数据的主要入口。对于此实例，我们建议使用 collector 的官方 [ClickStack 发行版](/zh/clickstack/ingesting-data/opentelemetry#installing-otel-collector)，前提是它尚未[包含在你的 ClickStack 部署模式中](/zh/clickstack/deployment/overview)。

用户可通过[语言 SDKs](/zh/clickstack/ingesting-data/sdks)，或通过采集基础设施指标和日志的数据收集 agent，将数据发送到该 collector (例如承担 [agent](/zh/clickstack/ingesting-data/collector#collector-roles) 角色的 OTel collectors，或其他技术，如 [Fluentd](https://www.fluentd.org/) 或 [Vector](https://vector.dev/)) 。对于希望使用托管 OpenTelemetry 管道的团队，[Bindplane](/zh/clickstack/integration-partners/bindplane) 提供了一种 OpenTelemetry 原生解决方案，并内置原生 ClickStack 目标端，可简化遥测数据的采集、处理和路由。

**我们假定在所有 agent 迁移步骤中，此 collector 都可用。**

<div id="migrating-to-beats">
  ## 从 Beats 迁移
</div>

对于已大规模部署 Beats 的用户，在迁移到 ClickStack 时，可能希望保留这些现有部署。

**目前，此选项仅使用 Filebeat 测试过，因此只适用于日志。**

Beats agent 使用 [Elastic Common Schema (ECS)](https://www.elastic.co/docs/reference/ecs)，而 ClickStack 使用的 OpenTelemetry 规范目前正处于[合并 ECS 的过程中](https://github.com/open-telemetry/opentelemetry-specification/blob/main/oteps/0199-support-elastic-common-schema-in-opentelemetry.md)。不过，这些 [schema 目前仍存在显著差异](https://www.elastic.co/docs/reference/ecs/ecs-otel-alignment-overview)，因此用户目前需要先将 ECS 格式的事件转换为 OpenTelemetry 格式，再摄取到 ClickStack 中。

我们建议使用 [Vector](https://vector.dev) 来执行这一转换。它是一款轻量级、高性能的可观测性数据管道，并支持一种功能强大的转换语言，称为 Vector Remap Language (VRL)。

如果你的 Filebeat agent 已配置为将数据发送到 Kafka (这是 Beats 支持的一种输出方式) ，Vector 可以从 Kafka 中消费这些事件，使用 VRL 进行 schema 转换，然后通过 OTLP 将其转发到 ClickStack 随附的 OpenTelemetry Collector。

或者，Vector 也支持通过 Logstash 使用的 Lumberjack 协议接收事件。这使得 Beats agent 可以直接将数据发送到 Vector，在那里执行相同的转换流程，然后再通过 OTLP 转发到 ClickStack OpenTelemetry collector。

下面我们将展示这两种架构。

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/FZqG0tBuMc0GoOY1/images/use-cases/observability/clickstack-migrating-agents.png?fit=max&auto=format&n=FZqG0tBuMc0GoOY1&q=85&s=2ba057f2f998ac5ed59b3df1603c539c" alt="迁移 agents" size="lg" background width="3097" height="1688" data-path="images/use-cases/observability/clickstack-migrating-agents.png" />

在下面的示例中，我们给出了将 Vector 配置为通过 Lumberjack 协议从 Filebeat 接收日志事件的初始步骤。我们还提供了用于将入站 ECS 事件映射到 OTel 规范的 VRL，然后再通过 OTLP 将其发送到 ClickStack OpenTelemetry collector。从 Kafka 消费事件的用户可以将 Vector 的 Logstash source 替换为 [Kafka source](https://vector.dev/docs/reference/configuration/sources/kafka/)，其余步骤保持不变。

<Steps>
  <Step>
    ### 安装 Vector

    使用[官方安装指南](https://vector.dev/docs/setup/installation/)安装 Vector。

    可将其安装在与你的 Elastic Stack OTel collector 相同的实例上。

    你可以在[将 Vector 迁移到生产环境](https://vector.dev/docs/setup/going-to-prod/)时，遵循架构和安全方面的最佳实践。
  </Step>

  <Step>
    ### 配置 vector

    需要将 Vector 配置为通过 Lumberjack 协议接收事件，以模拟 Logstash 实例。可通过为 Vector 配置 [`logstash` source](https://vector.dev/docs/reference/configuration/sources/logstash/) 来实现：

    ```yaml theme={null}
    sources:
      beats:
        type: logstash
        address: 0.0.0.0:5044
        tls:
          enabled: false  # 如果使用 TLS，请设置为 true
          # 以下文件由 https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs 中的步骤生成
          # crt_file: logstash.crt
          # key_file: logstash.key
          # ca_file: ca.crt
          # verify_certificate: true
    ```

    <Info>
      **TLS 配置**

      如果需要双向 TLS，请参考 Elastic 指南 ["为 Logstash 输出配置 SSL/TLS"](https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#use-ls-output) 生成证书和私钥。然后即可像上文所示那样在配置中指定这些证书和私钥。
    </Info>

    事件将以 ECS 格式接收。可以使用 Vector Remap Language (VRL) 转换器将其转换为 OpenTelemetry schema。该转换器的配置非常简单——脚本文件单独存放在一个文件中：

    ```yaml theme={null}
    transforms:
      remap_filebeat:
        inputs: ["beats"]
        type: "remap"
        file: 'beat_to_otel.vrl'
    ```

    请注意，它从上述 `beats` 数据源接收事件。下方展示了我们的重映射脚本。该脚本仅针对日志事件进行了测试，但可作为其他格式的参考基础。

    <Accordion title="VRL - ECS 转 OTel">
      ```javascript theme={null}
      # 定义根级别需要忽略的键
      ignored_keys = ["@metadata"]

      # 定义资源键前缀
      resource_keys = ["host", "cloud", "agent", "service"]

      # 为资源字段和日志记录字段分别创建对象
      resource_obj = {}
      log_record_obj = {}

      # 将所有未被忽略的根键复制到对应的对象中
      root_keys = keys(.)
      for_each(root_keys) -> |_index, key| {
          if !includes(ignored_keys, key) {
              val, err = get(., [key])
              if err == null {
                  # 检查是否为资源字段
                  is_resource = false
                  if includes(resource_keys, key) {
                      is_resource = true
                  }

                  # 添加到对应的对象中
                  if is_resource {
                      resource_obj = set(resource_obj, [key], val) ?? resource_obj
                  } else {
                      log_record_obj = set(log_record_obj, [key], val) ?? log_record_obj
                  }
              }
          }
      }

      # 分别对两个对象进行扁平化处理
      flattened_resources = flatten(resource_obj, separator: ".")
      flattened_logs = flatten(log_record_obj, separator: ".")

      # 处理资源属性
      resource_attributes = []
      resource_keys_list = keys(flattened_resources)
      for_each(resource_keys_list) -> |_index, field_key| {
          field_value, err = get(flattened_resources, [field_key])
          if err == null && field_value != null {
              attribute, err = {
                  "key": field_key,
                  "value": {
                      "stringValue": to_string(field_value)
                  }
              }
              if (err == null) {
                  resource_attributes = push(resource_attributes, attribute)
              }
          }
      }

      # 处理日志记录属性
      log_attributes = []
      log_keys_list = keys(flattened_logs)
      for_each(log_keys_list) -> |_index, field_key| {
          field_value, err = get(flattened_logs, [field_key])
          if err == null && field_value != null {
              attribute, err = {
                  "key": field_key,
                  "value": {
                      "stringValue": to_string(field_value)
                  }
              }
              if (err == null) {
                  log_attributes = push(log_attributes, attribute)
              }
          }
      }

      # 获取 timeUnixNano 的时间戳（转换为纳秒）
      timestamp_nano = if exists(.@timestamp) {
          to_unix_timestamp!(parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S%.3fZ"), unit: "nanoseconds")
      } else {
          to_unix_timestamp(now(), unit: "nanoseconds")
      }

      # 获取 message/body 字段
      body_value = if exists(.message) {
          to_string!(.message)
      } else if exists(.body) {
          to_string!(.body)
      } else {
          ""
      }

      # 创建 OpenTelemetry 结构
      . = {
          "resourceLogs": [
              {
                  "resource": {
                      "attributes": resource_attributes
                  },
                  "scopeLogs": [
                      {
                          "scope": {},
                          "logRecords": [
                              {
                                  "timeUnixNano": to_string(timestamp_nano),
                                  "severityNumber": 9,
                                  "severityText": "info",
                                  "body": {
                                      "stringValue": body_value
                                  },
                                  "attributes": log_attributes
                              }
                          ]
                      }
                  ]
              }
          ]
      }
      ```
    </Accordion>

    最后，转换后的事件可通过 OpenTelemetry collector 使用 OTLP 协议发送至 ClickStack。这需要在 Vector 中配置一个 OTLP sink，该 sink 将 `remap_filebeat` 转换的输出作为输入：

    ```yaml theme={null}
    sinks:
      otlp:
        type: opentelemetry
        inputs: [remap_filebeat] # 从 remap transform 接收事件 - 见下文
        protocol:
          type: http  # 端口 4317 请使用 "grpc"
          uri: http://localhost:4318/v1/logs # OTel collector 的日志端点 
          method: post
          encoding:
            codec: json
          framing:
            method: newline_delimited
          headers:
            content-type: application/json
            authorization: ${YOUR_INGESTION_API_KEY}
    ```

    此处的 `YOUR_INGESTION_API_KEY` 由 ClickStack 生成。您可以在 ClickStack UI (HyperDX) 的 `Team Settings → API Keys` 中找到该密钥。

    <Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/Wpmp4N2VLv_V8ziJ/images/use-cases/observability/ingestion-keys.png?fit=max&auto=format&n=Wpmp4N2VLv_V8ziJ&q=85&s=6ce62a4957e8f478f3e8047ae4265e7b" alt="摄取密钥" size="lg" width="3600" height="1902" data-path="images/use-cases/observability/ingestion-keys.png" />

    完整的最终配置如下所示：

    ```yaml theme={null}
    sources:
      beats:
        type: logstash
        address: 0.0.0.0:5044
        tls:
          enabled: false  # 如果使用 TLS，请设置为 true
            #crt_file: /data/elasticsearch-9.0.1/logstash/logstash.crt
            #key_file: /data/elasticsearch-9.0.1/logstash/logstash.key
            #ca_file: /data/elasticsearch-9.0.1/ca/ca.crt
            #verify_certificate: true

    transforms:
      remap_filebeat:
        inputs: ["beats"]
        type: "remap"
        file: 'beat_to_otel.vrl'

    sinks:
      otlp:
        type: opentelemetry
        inputs: [remap_filebeat]
        protocol:
          type: http  # 如需使用端口 4317，请改用 "grpc"
          uri: http://localhost:4318/v1/logs
          method: post
          encoding:
            codec: json
          framing:
            method: newline_delimited
          headers:
            content-type: application/json
    ```
  </Step>

  <Step>
    ### 配置 Filebeat

    现有的 Filebeat 安装只需稍作修改，将事件发送到 Vector 即可。这需要配置 Logstash 输出；同样，TLS 也可按需配置：

    ```yaml theme={null}
    # ------------------------------ Logstash 输出 -------------------------------
    output.logstash:
      # Logstash 主机
      hosts: ["localhost:5044"]

      # 可选的 SSL，默认关闭。
      # 用于 HTTPS 服务器验证的根证书列表
      #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

      # 用于 SSL 客户端身份验证的证书
      #ssl.certificate: "/etc/pki/client/cert.pem"

      # 客户端证书密钥
      #ssl.key: "/etc/pki/client/cert.key"
    ```
  </Step>
</Steps>

<div id="migrating-from-elastic-agent">
  ## 从 Elastic Agent 迁移
</div>

Elastic Agent 将不同的 Elastic Beats 整合为一个软件包。它可与 [Elastic Fleet](https://www.elastic.co/docs/reference/fleet/fleet-server) 集成，从而实现集中编排和配置。

对于已部署 Elastic Agent 的用户，有几种迁移路径可选：

* 将 Elastic Agent 配置为通过 Lumberjack 协议发送到 Vector 端点。**目前，这种方式仅针对仅使用 Elastic Agent 收集日志数据的用户进行了测试。** 这可以通过 Kibana 中的 Fleet UI 统一配置。
* [将 Elastic Agent 作为 Elastic OpenTelemetry Collector (EDOT) 运行](https://www.elastic.co/docs/reference/fleet/otel-agent)。Elastic Agent 内嵌了 EDOT Collector，因此您只需对应用程序和基础设施进行一次埋点，即可将数据发送到多个供应商和后端。在这种配置下，您只需将 EDOT collector 配置为通过 OTLP 将事件转发到 ClickStack OTel collector。**这种方式支持所有事件类型。**

下面我们将演示这两种方式。

<div id="sending-data-via-vector">
  ### 通过 Vector 发送数据
</div>

<Steps>
  <Step>
    #### 安装并配置 Vector

    按照从 Filebeat 迁移部分文档中的[相同步骤](#install-vector)安装并配置 Vector。
  </Step>

  <Step>
    #### 配置 Elastic Agent

    需要将 Elastic Agent 配置为通过 Logstash 的 Lumberjack 协议发送数据。这是一种[受支持的部署模式](https://www.elastic.co/docs/manage-data/ingest/ingest-reference-architectures/ls-networkbridge)，既可以集中配置；如果是不使用 Fleet 的部署方式，也可以[通过 agent 配置文件 `elastic-agent.yaml`](https://www.elastic.co/docs/reference/fleet/logstash-output) 进行配置。

    要在 Kibana 中进行集中配置，可以[向 Fleet 添加一个 Output](https://www.elastic.co/docs/reference/fleet/fleet-settings#output-settings)。

    <Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/0q6iTuCC0qup5NC4/images/use-cases/observability/add-logstash-output.png?fit=max&auto=format&n=0q6iTuCC0qup5NC4&q=85&s=e0dc151e101405a7a04cfe92e4b4b8a9" alt="添加 Logstash Output" size="md" width="839" height="1425" data-path="images/use-cases/observability/add-logstash-output.png" />

    然后即可在 [agent policy](https://www.elastic.co/docs/reference/fleet/agent-policy) 中使用此 Output。这意味着，所有使用该策略的 agent 都会自动将其数据发送到 Vector。

    <Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/0q6iTuCC0qup5NC4/images/use-cases/observability/agent-output-settings.png?fit=max&auto=format&n=0q6iTuCC0qup5NC4&q=85&s=51567a0969e45938391f06d54b567b62" alt="Agent 设置" size="md" width="659" height="220" data-path="images/use-cases/observability/agent-output-settings.png" />

    由于这需要配置基于 TLS 的安全通信，我们建议参考指南[“为 Logstash Output 配置 SSL/TLS”](https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#use-ls-output)。按照该指南操作时，可将用户的 Vector 实例视为充当 Logstash 角色。

    请注意，这还要求用户将 Vector 中的 Logstash source 也配置为 mutual TLS。使用[该指南中生成的](https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs) 密钥和证书，正确配置输入端。

    ```yaml theme={null}
    sources:
      beats:
        type: logstash
        address: 0.0.0.0:5044
        tls:
          enabled: true  # 如果使用 TLS，请将其设为 true。
          # 以下文件根据 https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs 中的步骤生成
          crt_file: logstash.crt
          key_file: logstash.key
          ca_file: ca.crt
          verify_certificate: true
    ```
  </Step>
</Steps>

<div id="run-agent-as-otel">
  ### 将 Elastic agent 作为 OpenTelemetry collector 运行
</div>

Elastic agent 内嵌了 EDOT Collector，让您只需为应用程序和基础设施进行一次埋点，即可将数据发送到多个供应商和后端。

<Info>
  **agent 集成与编排**

  使用随 Elastic agent 分发的 EDOT collector 的用户将无法利用 [agent 提供的现有集成](https://www.elastic.co/docs/reference/fleet/manage-integrations)。此外，该 collector 无法由 Fleet 进行集中管理，这意味着用户必须以[独立模式运行 agent](https://www.elastic.co/docs/reference/fleet/configure-standalone-elastic-agents)，并自行管理配置。
</Info>

要使用 EDOT collector 运行 Elastic agent，请参阅 [Elastic 官方指南](https://www.elastic.co/docs/reference/fleet/otel-agent-transform)。不过，与指南中所示配置 Elastic 端点的方式不同，您需要移除现有的 `exporters`，并配置 OTLP 输出，将数据发送到 ClickStack OpenTelemetry collector。例如，exporters 的配置将变为：

```yaml theme={null}
exporters:
  # 将日志和指标发送到 Elasticsearch Managed OTLP Input 的导出器
  otlp:
    endpoint: localhost:4317
    headers:
      authorization: ${YOUR_INGESTION_API_KEY}
    tls:
      insecure: true
```

<Info>
  **托管 ClickStack**

  默认情况下，如果以独立方式为托管 ClickStack 运行 OpenTelemetry Collector，则无需 API 摄取密钥。不过，也可以通过指定 OTLP 身份验证令牌来保护摄取。请参阅[“保护收集器”](/zh/clickstack/ingesting-data/collector#securing-the-collector)。
</Info>

此处的 `YOUR_INGESTION_API_KEY` 由 ClickStack 生成。您可以在 ClickStack UI 的 `Team Settings → API Keys` 中找到该密钥。

<Image img="https://mintcdn.com/private-7c7dfe99-fix-nav-issues/Wpmp4N2VLv_V8ziJ/images/use-cases/observability/ingestion-keys.png?fit=max&auto=format&n=Wpmp4N2VLv_V8ziJ&q=85&s=6ce62a4957e8f478f3e8047ae4265e7b" alt="摄取密钥" size="lg" width="3600" height="1902" data-path="images/use-cases/observability/ingestion-keys.png" />

如果 Vector 已配置为使用双向 TLS，并且证书和私钥是按照指南[“为 Logstash 输出配置 SSL/TLS”](https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#use-ls-output)中的步骤生成的，则还需要相应配置 `otlp` 导出器，例如：

```yaml theme={null}
exporters:
  # 将日志和指标发送到 Elasticsearch Managed OTLP Input 的导出器
  otlp:
    endpoint: localhost:4317
    headers:
      authorization: ${YOUR_INGESTION_API_KEY}
    tls:
      insecure: false
      ca_file: /path/to/ca.crt
      cert_file: /path/to/client.crt
      key_file: /path/to/client.key
```

<div id="migrating-from-elastic-otel-collector">
  ## 从 Elastic OpenTelemetry Collector 迁移
</div>

对于已经在运行 [Elastic OpenTelemetry Collector (EDOT)](https://www.elastic.co/docs/reference/opentelemetry) 的用户，只需重新配置其 agent，通过 OTLP 将数据发送到 ClickStack OpenTelemetry collector 即可。所需步骤与上文介绍的将 [Elastic Agent 作为 OpenTelemetry collector 运行](#run-agent-as-otel) 的步骤完全相同。此方法适用于所有数据类型。
