04-Kafka海量日志收集系统架构设计(watcher)
目录
一、watcher 基础语法
Xpack-Watchs基本使用:
创建一个watcher,比如定义一个trigger 每个10s钟看一下input里的数据
PUT _xpack/watcher/watch/school_watcher
{ "trigger": { "schedule": { "interval": "10s" } }, // 查看任务信息 "input": { "search": { "request": { // 监控具体索引 "indices": ["school*"], // body里面具体些搜索语句 "body": { "size": 0, "query": { "match": { // 比如索引里面name 有 hello 则进行报警 "name": "hello" } } } } } }, // 对于上面的查询结果进行比较: "condition": { // compare进行比较 "compare": { // 上面的query查询的结果会放入到ctx.payload中: // 比如获取 ctx.payload.hits.total ctx.payload._shards.total 等等 "ctx.payload.hits.total": { "gt": 0 } } }, // transform作用:重新查询出文档内容赋值给ctx.payload "transform": { "search": { "request": { "indices": ["school*"], "body": { "size": 10, "query": { "match": { "name": "hello" } } } } } }, // 根据上面的查询、比较结果,执行actions里面定义的动作(定义多种报警类型) "actions": { // 报警名字 "log_hello": { // 防止报警风暴: 设置阈值 15m内曾经报警过, 则不报警 "throttle_period": "15m", // 报警方式:logging、mail、http等 "logging": { // 报警具体内容:使用 {{ 查询参数 }} 进行赋值: "text": "Found {{ctx.payload.hits.total}} hello in the school" } } } }
ctx.payload取值规范:
比如我们进行search搜索school里面name=zhangsan的数据:
GET school/_search
{ "query": { "match": { "name": "zhangsan" } } }
查询结果如下:
{ "took": 14, "timed_out": false, "_shards": { "total": 2, "successful": 2, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 1.5404451, "hits": [ { "_index": "school", "_type": "student", "_id": "1", "_score": 1.5404451, "_source": { "name": "zhangsan", "age": 25, "course": "elasticsearch", "study_date": "2018-06-15T20:30:50", "mark": "today is a good day" } } ] } }
表示查询:ctx.payload结果集:
{{#ctx.payload.hits.hits}} {{_source.name}} {{_source.course}} {{/ctx.payload.hits.hits}}
比如我们进行search搜索school,并采用聚合的方式来查询terms course数据:
GET school/_search
{ "size" : 0, "aggs" : { "myterms" : { "terms" : { "field" : "course", "size" : 10 } } } }
查询结果:
{ "took": 11, "timed_out": false, "_shards": { "total": 2, "successful": 2, "skipped": 0, "failed": 0 }, "hits": { "total": 10, "max_score": 0, "hits": [] }, "aggregations": { "myterms": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "elasticsearch", "doc_count": 7 }, { "key": "good", "doc_count": 1 }, { "key": "spring", "doc_count": 1 }, { "key": "spring elasticsearch", "doc_count": 1 } ] } } }
payload取值使用:现在想取得上面的hits.hits里面的数据内容,就可以使用如下方式:
表示查询:ctx.payload结果集:
{{#ctx.payload.aggregations.aggsname.buckets}} {{key}} {{doc_count}} {{/ctx.payload.aggregations.aggsname.buckets}}
针对这里内容就是:
{{#ctx.payload.aggregations.myterms.buckets}} {{key}} {{doc_count}} {{/ctx.payload.aggregations.myterms.buckets}}
watcher API使用:
查看一个watcher
GET _xpack/watcher/watch/school_watcher
删除一个watcher
DELETE _xpack/watcher/watch/school_watcher
执行watcher
POST _xpack/watcher/watch/school_watcher/_execute
查看执行结果
GET /.watcher-history*/_search?pretty
{ "sort" : [ { "result.execution_time" : "desc" } ], "query": { "match": { "watch_id": "school_watcher" } } }
triggers的几种类型
hourly、daily、weekly、monthly、yearly、cron、interval
hourly:每小时执行
例如:12:00, 12:15, 12:30, 12:45, 1:00, 1:15
{ "trigger" : { "schedule" : { "hourly" : { "minute" : [ 0, 15, 30, 45 ] } } } }
daily:每天执行
每天00:00, 12:00, and 17:00
{ "trigger" : { "schedule" : { "daily" : { "at" : [ "midnight", "noon", "17:00" ] } } } }
每天00:00, 00:30, 12:00, 12:30, 17:00 and 17:30
{ "trigger" : { "schedule" : { "daily" : { "at" { "hour" : [ 0, 12, 17 ], "minute" : [0, 30] } } } } }
weekly:指定星期几
周二12:00,周五17:00
{ "trigger" : { "schedule" : { "weekly" : [ { "on" : "tuesday", "at" : "noon" }, { "on" : "friday", "at" : "17:00" } ] } } }
周二、周五的17:00
{ "trigger" : { "schedule" : { "weekly" : { "on" : [ "tuesday", "friday" ], "at" : [ "noon", "17:00" ] } } } }
monthly:指定每月哪天执行
每月10号中午、每月20号17:00
{ "trigger" : { "schedule" : { "monthly" : [ { "on" : 10, "at" : "noon" }, { "on" : 20, "at" : "17:00" } ] } } }
每月10号、20号的00:00,12:00
{ "trigger" : { "schedule" : { "monthly" : { "on" : [ 10, 20 ], "at" : [ "midnight", "noon" ] } } } }
yearly-指定月、日、时
每年的1月10日12:00,每年的7月20日17:00
{ "trigger" : { "schedule" : { "yearly" : [ { "in" : "january", "on" : 10, "at" : "noon" }, { "in" : "july", "on" : 20, "at" : "17:00" } ] } } }
每年1月10日,1月20日,12月10日,12月20日的12:00,00:00
{ "trigger" : { "schedule" : { "yearly" : { "in" : [ "jan", "dec" ], "on" : [ 10, 20 ], "at" : [ "midnight", "noon" ] } } } }
cron-表达式
[year] 0 5 9 * * ? 0 0-5 9 * * ? 0 0/15 9 * * ?
interval-周期的
间隔单位:s:秒、m:分钟、h:小时、d:天、w:星期
input的几种类型:
Simple、Search、HTTP、Chain
Simple Input-静态数据
每天12点触发
{ "trigger" : { "schedule" : { "daily" : { "at" : "noon" } } }, "input" : { "simple" : { "name" : "John" } }, "actions" : { "reminder_email" : { "email" : { "to" : "to@host.domain", "subject" : "Reminder", "body" : "Dear {{ctx.payload.name}}, by the time you read these lines, I'll be gone" } } } }
Search-搜索
{ "input" : { "search" : { "request" : { "indices" : [ "logs" ], "body" : { "query" : { "match_all" : {} } } } } }, "condition" : { "compare" : { "ctx.payload.hits.total" : { "gt" : 5 }} } ... }
Http-请求
request.host
request.port
request.path
request.headers
request.params
request.url:request.scheme, request.host, request.port and request.params
request.method:head、get、post、put、delete
request.auth
request.body
request.proxy.host
request.proxy.port
request.connection_timeout
request.read_timeout
response_content_type:json, yaml and text
extractget请求
{ "input" : { "http" : { "request" : { "host" : "example.com", "port" : 9200, "path" : "/idx/_search" } } } }
含有body体内容
{ "input" : { "http" : { "request" : { "host" : "host.domain", "port" : 9200, "path" : "/idx/_search", "body" : "{"query" : { "match" : { "category" : "event"}}}" } } } }
含有参数的
{ "input" : { "http" : { "request" : { "host" : "host.domain", "port" : "9200", "path" : "/_cluster/stats", "params" : { "human" : "true" } } } } }
含有用户密码
{ "input" : { "http" : { "request" : { "host" : "host.domain", "port" : "9200", "path" : "/myservice", "auth" : { "basic" : { "username" : "user", "password" : "pass" } } } } } }
直接请求url的
{ "input" : { "http" : { "request" : { "url" : "http://api.openweathermap.org/data/2.5/weather", "params" : { "lat" : "52.374031", "lon" : "4.88969", "appid" : "<your openweathermap appid>" } } } } }
Chain-input-同时设置多个input,串行
{ "input" : { "chain" : { "inputs" : [ // 第一步input { "first" : { "simple" : { "path" : "/_search" } } }, // 第二步input (可以去使用第一步input返回的结果) { "second" : { "http" : { "request" : { "host" : "localhost", "port" : 9200, "path" : "{{ctx.payload.first.path}}" } } } } ] } } }
condition条件设置
如果condition条件返回true,则会触发action。如果返回 false,则就停止,不执行action
Always Condition
"condition" : { "always" : {} }
Never Condition
"condition" : { "never" : {} }
Compare Condition (和查询结果进行比较の语法如下:)
eq、not_eq、gt、gte、lt、lte
比如错误条数超过了5条进行报警、响应长时间超过多少毫秒进行报警等
{ "condition" : { "compare" : { "ctx.payload.hits.total" : { "gte" : 5 } } } }
<{expression}> 正则表达式,使用 <> 中写正则表达式
比如 当前时间 - 5分钟 进行比较,如下:
{ "condition" : { "compare" : { "ctx.execution_time" : { "gte" : "<{now-5m}>" } } } }
imooc/1297-MQ-Kafka/04-12-watcher-base/ 比较,这个就是最开始的示例里面的获取参数方式,如下:
{ "condition" : { "compare" : { "ctx.payload.aggregations.status.buckets.error.doc_count" : { "not_eq" : "{{ctx.payload.aggregations.handled.buckets.true.doc_count}}" } } } }
Array Compare Condition 数组比较
比如当前的doc_count大于25 就进行报警
{ "condition": { "array_compare": { "ctx.payload.aggregations.top_tweeters.buckets" : { "path": "doc_count" , "gte": { "value": 25, } } } } }
Script Condition 脚本方式
{ "input" : { "search" : { "indices" : "log-events", "body" : { "size" : 0, "query" : { "match" : { "status" : "error" } } } } }, "condition" : { "script" : { // 当前返回的条数是否大于阈值,进行报警 "inline" : "return ctx.payload.hits.total > threshold", "params" : { "threshold" : 5 } } }
Action 触发器
Email Action–发送邮件
如果使用发送邮件的报警,则需要在elasticsearch.yml中配置发送邮件服务的信息
xpack.notification.email: default_account: gmail_account account: gmail_account: profile: gmail smtp: auth: true starttls.enable: true host: smtp.gmail.com port: 587 user: <username> password: <password> outlook_account: profile: outlook smtp: auth: true starttls.enable: true host: smtp-mail.outlook.com port: 587 user: <username> password: <password>: exchange_account: profile: outlook email_defaults: from: <email address of service account> smtp: auth: true starttls.enable: true host: <your exchange server> port: 587 user: <email address of service account> password: <password>
发送邮件
"actions" : { // actions名字 "send_email" : { "email" : { "to" : "'Recipient Name <recipient@example.com>'", // "to" : ['Personal Name <user1@host.domain>', 'user2@host.domain'], "subject" : "Watcher Notification", "body" : "{{ctx.payload.hits.total}} error logs found" } } }
发送含有附件信息的邮件
"actions" : { "email_admin" : { "email": { "to": "'John Doe <john.doe@example.com>'", "attachments" : { // 附件方式 "my_image.png" : { "http" : { "content_type" : "image.png", "request" : { "url": "http://example.org/foo/my-image.png" } } }, // xpack reporting插件生成方式: "dashboard.pdf" : { "reporting" : { "url": "http://example.org:5601/api/reporting/generate/dashboard/Error-Monitoring" } }, // 自定义附件 "data.yml" : { "data" : { "format" : "yaml" } } } } } }
Webhook Action,发送一个http请求
发送github的issue
"actions" : { "create_github_issue" : { // 因为发邮件到达率不是特别高,所以可以使用外部的接口调用方式 // 比如这里调用url为外部的手机短信接口进行发送 "webhook" : { // 请求方式 "method" : "POST", // 外部请求地址 "url" : "https://api.github.com/repos/<owner>/<repo>/issues", // 请求报文 "body" : { "title": "Found errors in 'contact.html'", "body": "Found {{ctx.payload.hits.total}} errors in the last 5 minutes", "assignee": "web-admin", "labels": [ "bug", "sev2" ] }, // 用户名密码 "auth" : { "basic" : { "username" : "<username>", "password" : "<password>" } } } } }
带有url参数的请求
"actions" : { "my_webhook" : { "webhook" : { "method" : "POST", "host" : "mylisteningserver", "port" : 9200, "path": ":/alert", "params" : { "watch_id" : "{{ctx.watch_id}}" } } } }
自定义header
"actions" : { "my_webhook" : { "webhook" : { "method" : "POST", "host" : "mylisteningserver", "port" : 9200, "path": ":/alert/{{ctx.watch_id}}", "headers" : { "Content-Type" : "application/yaml" }, "body" : "count: {{ctx.payload.hits.total}}" } } }
Index Action–创建索引文档
"actions" : { "index_payload" : { "index" : { "index" : "my-index", "doc_type" : "my-type", "doc_id": "my-id" } } }
Logging Action–记录日志
level:error, warn, info, debug and trace
日志种类:
- category:xpack.watcher.actions.logging
"actions" : { "log" : { "transform" : { }, // 日志报警 "logging" : { "text" : "executed at {{ctx.execution_time}}", // 日志级别 "level": "info" } } }
Jira Action 与jira集成
HipChat Action
Slack Action
PagerDuty Action
使用接口的形式创建一个watcher, 进行模拟:
执watcher脚本:
查询school
GET school/student/_search
{ "query": { "match_all":{} } }
创建school_watcher
PUT _xpack/watcher/watch/school_watcher
{ "trigger": { "schedule": { "interval": "10s" } }, "input": { "search": { "request": { "indices": ["school*"], "body": { "size": 0, "query": { "match": { "name": "hello" } } } } } }, "condition": { "compare": { "ctx.payload.hits.total": { "gt": 0 } } }, "transform": { "search": { "request": { "indices": ["school*"], "body": { "size": 10, "query": { "match": { "name": "hello" } } } } } }, "actions": { "log_hello": { "throttle_period": "15m", "logging": { "text": "Found {{ctx.payload.hits.total}} hello in the school" } } } }
查看watcher执行结果
GET /.watcher-history*/_search?pretty
{ "sort" : [ { "result.execution_time" : "desc" } ], "query": { "match": { "watch_id": "school_watcher" } } }
进行数据测试:
POST /school/student
{ "name": "hello", "age": 18, "course": "elasticsearch", "study_date": "2018-08-20T20:30:50", "mark": "take care day day" }
可视化操作watcher,可以启用、禁用、添加修改、删除watcher
二、watcher使用:
创建一个watcher,比如定义一个trigger 每个10s钟看一下input里的数据
PUT _xpack/watcher/watch/applog_error_watcher
{
"trigger": {
"schedule": {
"interval": "10s"
}
},
"input": {
"search": {
"request": {
"indices": ["javalog-app-*"],
"body": {
"size": 0,
"query": {
"match": {
"level": "ERROR"
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"transform": {
"search": {
"request": {
"indices": ["javalog-app-*"],
"body": {
"size": 10,
"query": {
"match": {
"name": "hello"
}
}
}
}
}
},
"actions": {
"test_error": {
"throttle_period": "1m",
"webhook" : {
"method" : "POST",
"url" : "http://192.168.11.32:8001/watch",
"body" : "{
"title": "异常错误告警",
"traceId": "{{#ctx.payload.hits.hits}} {{_source.traceId}} {{/ctx.payload.hits.hits}}",
"spanId" : "{{#ctx.payload.hits.hits}} {{_source.spanId}} {{/ctx.payload.hits.hits}}",
"level":"告警级别P1",
"body": "{{#ctx.payload.hits.hits}} {{_source.messageInfo}} {{/ctx.payload.hits.hits}}"
}
}
}
}
{{#ctx.payload.hits.hits}} {{_source.traceId}} {{/ctx.payload.hits.hits}}
{{#ctx.payload.hits.hits}} {{_source.spanId}} {{/ctx.payload.hits.hits}}
{{#ctx.payload.hits.hits}} {{_source.messageInfo}} {{/ctx.payload.hits.hits}}
查询error
GET javalog-app-2019.01.24/_search
{ "query": { "match": { "level.keyword": "ERROR" } } }
查看一个watcher
GET _xpack/watcher/watch/applog_error_watcher
删除一个watcher
DELETE _xpack/watcher/watch/applog_error_watcher
执行watcher
POST _xpack/watcher/watch/applog_error_watcher/_execute
查看执行结果
GET /.watcher-history*/_search?pretty
{ "sort" : [ { "result.execution_time" : "desc" } ], "query": { "match": { "watch_id": "applog_error_watcher" } } }
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 tuyrk@qq.com
文章标题:04-Kafka海量日志收集系统架构设计(watcher)
文章字数:3.5k
本文作者:神秘的小岛岛
发布时间:2020-04-03, 15:54:20
最后更新:2020-04-25, 18:15:29
原始链接:https://www.tuyrk.cn/imooc/1297-MQ-Kafka/04-12-watcher-base/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。