在使用 [Vector]( 
https://vector.dev/)( vrl 语言)时,消费 Kafka 数据需要遵循几个步骤。假设你已经通过 Vector 连接 Kafka ,并将 Kafka 数据流转换为 Vector 能处理的日志结构。接下来,你可以使用 VRL (Vector Remap Language) 来处理像 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]` 这样的 JSON 数据。
以下是消费 Kafka 数据的流程:
### 1. Vector 配置 Kafka Source
首先,确保你的 `vector.toml` 文件中已经配置好了 Kafka Source 。简单示例如下:
```toml
[sources.kafka]
  type = "kafka"
  bootstrap_servers = "localhost:9092"
  group_id = "consumer-group-id"
  topics = ["my-topic"]
  key_field = "key"    # 如果 Kafka 数据有 key
  value_field = "message"   # 消息数据字段
  encoding.codec = "json"   # 假设 Kafka 数据是 JSON 格式
```
### 2. 使用 VRL 处理 Kafka 消息
假设 Kafka 消息的 `message` 字段包含了你的数据,例如 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]`,你可以使用 VRL 脚本来提取和处理这些字段。
#### VRL 脚本示例:
```toml
[transforms.kafka_parser]
  type = "remap"
  inputs = ["kafka"]
  source = '''
  # 假设 message 字段包含 JSON 数据数组
  .data = parse_json!(.message) 
  '''
```
### 3. 消费数据
当 `message` 是 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]` 这样的数组时,VRL 处理步骤可以提取每个字段。
你可以直接在 `source` 部分进行各种操作,比如遍历 JSON 数组或处理特定的值:
```toml
source = '''
  .data = parse_json!(.message)  
  # 遍历数组,提取每个元素
  .fields = []
  for item in .data do
    .fields = .fields + [get(item, "a", "undefined")]
  end
  '''
```
### 4. 输出处理后的数据
处理完 Kafka 数据后,你可以将结果传递给某个 Sink ,例如 Console 、Elasticsearch 等。
```toml
[sinks.console]
  type = "console"
  inputs = ["kafka_parser"]
  encoding.codec = "json"
```
### 示例流程总结
1. Kafka source 接收 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]`。
2. 使用 VRL 的 `parse_json!` 函数解析 JSON 数据。
3. 遍历数组、提取每个 JSON 对象中的值。
4. 输出处理后的数据到 sink ,例如 Console 。
你可以根据业务需求修改 VRL 逻辑,处理和过滤特定字段、对值进行转换等。