Elasticsearch 新增字段匹配查询的问题

212 天前
 iian
想使用 es 对上网日志进行分析,流控设备记录的用户访问日志可输出给 kafka ,日志中有用的信息如下:
时间 域名 URI 账号

目前思路是通过 logstash 读 kafka 数据,拆分后进 es 中建索引,但是最终想统计的是账号所在的部门对某个站点的访问量,例如:1 个月内,技术部,访问 www.163.com 的次数。

现在索引里面只有账号信息,账号和部门的对应关系在其他数据库中,应该如何把部门信息与账号匹配后存在 es 中?

现在想到的两种方式(但是不知道是否可以以及如何实现)

1.将账号和部门信息存在文件或 redis 里,logstash 有多个 input ,同时从 kafka 和文件读,从 kafka 读一条日志的时候,用账号去匹配部门,然后一起写到 es 中,如果可以,需要用 logstash 如何来实现?

2.logstash 正常处理日志进 es ,在 es 中新增一个部门字段,然后用账号匹配部门信息(不知道如何实现),写到这个新字段里。这样应该用到 es 的什么功能?

Elasticsearch 新手,望不吝赐教。
978 次点击
所在节点    Elasticsearch
6 条回复
justest123
212 天前
如果决定使用 kafka -> logstash -> elasticsearch 的方案,结合我以前的经验,大概率是可以在 logstash 这一环节补充账号对应的部门信息的(最近几年没怎么实际用过 logstash 了,不敢打保票)。

先回答你的两种方式:

第一种,多个 input 同时读取,这种是不可行的,对多个 input 来说,它们采集到的数据是相互独立的,没有办法结合。

第二种,es 应该要新增部门字段,但这个字段比较难在写入文档的时候从账号关联到部门,印象里 es 有个 script 脚本功能,但好像都是用在更新、查询的时候,能不能用在文档写入阶段就不懂了(→_→ 有没有大佬有实际应用的案例能长长见识。

最后,关于怎么实现账号找部门:logstash 的插件分三类,input filter output ,可以尝试 logstash-filter-ruby 这个 filter 插件来写 ruby 代码。

1. 如果 input 插件读取到的日志信息是 json 格式的,可以用一下 logstash-filter-json 插件,将内容先解析出来。

2. logstash-filter-ruby 插件中拿到账号,如果可以将账号和部门信息存在文件里,就可以写 ruby 代码读取本地文件,找到部门,将部门字段同时写进 logstash 的 event 对象里。

3. filter 结束,output 环节照常,es 中新增一个部门字段,写入即可。
baozhibo
212 天前
这个我们用的 flink 解决,从 kafka 读日志,flink 日志泛化异步查询 redis 部门员工信息,输出到 es 去。这样 es 里展示的日志就是都有部门信息了。
iian
212 天前
@justest123 #1 logstash-filter-ruby 插件的方式我再看看说明如何来实现
iian
212 天前
@baozhibo #2 flink 的方式我查查资料,是否使用 logstash 倒是无所谓,只要能实现从 kafka 读,中间过程匹配出部门信息,最后写到 es 中就行。
justest123
212 天前
@iian 简单写了个,测试了下可以用,但只能读本地文件确实没直接实时读 redis 方便。

```
input {
file {
path => "D:/logstash-test/input.txt"
}
}

filter {
ruby {
init => '
# 引入 json ,方便操作
require "json"

# 从本地文件中读取,解析后初始化一个 hash ,key 为 userId ,value 为部门 id
@@userDepMap = Hash.new
File.open("D:/logstash-test/filter.txt", "r").each_line do |line|
userDepArray = line.split(",")
@@userDepMap[userDepArray[0]] = userDepArray[1]
end
'
code => '
# 从 message 中拿到消息本身,转 json
msg = event.get("message")
msgJson = JSON.parse(msg)

# 从消息中拿到 userId ,从 hash 中找到对应的部门
user = msgJson["user"].to_s
dep = @@userDepMap[user]

# 将部门保存到消息 json 中
msgJson["dep"] = dep

# 将最新的 json 转字符串,重新设置回 event 中
event.set("message", JSON.generate(msgJson))
'
}
}

output {
stdout {

}
file {
path => "D:/logstash-test/output.txt"
}
}

```
iian
211 天前
@justest123 感谢🙏,我测试看看。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/980584

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX