Skip to the content.

ELK日志服务使用-服务配置

logstash的配置

先来说一下logstash启动的配置文件/opt/logstash/bin/logstash.lib.sh 里面有一个参数LS_HEAP_SIZE设置每一个logstash进程占用多少内存,一般这个参数不用调,如果内存小的话可以改小一些 6 LS_HEAP_SIZE=”${LS_HEAP_SIZE:=500m}” logstash配置一般放在/etc/logstash/conf.d/ 文件夹 比如上一篇文章“ELK日志服务使用-基本安装 ”里面对Apache access日志的处理,配置较简单,下面来分析一下处理java的log4j格式日志,下面做详细说明,日志片段如下:

2015-11-13 11:18:21,036 (ExceptionFilter.java:87) ERROR [DubboServerHandler-1.1.1.1:20886-thread-189] [DUBBO] Got unchecked and undeclared exception which called by. JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)

input {
  file {
    type => "javalog"
    path => [ "/work/Serivce/nohup.out" ]
    sincedb_path => "/opt/logstash/conf/sincedb_serivce"
    start_position => "beginning"
    codec => multiline {
    #pattern => "%d{yyyy-MM-dd HH:mm:ss,SSS} (%F:%L) %p [%t] %m%n (^\s+at .+)|(^\d+\serror)|(^.+Exception: .+)|(^\s+... \d+ more)|(^\s*Caused by:.+)"
    pattern => "%d{yyyy-MM-dd HH:mm:ss,SSS}"
    what => "next"
    negate => true
    }
    }
}
filter {
  mutate {
    gsub => [ "message", "\r", "" ]
    replace => ["host", "web1"]
  }
  grok {
    type => "javalog"
    add_tag => [ "web1", "commonbase" ]
    match => [ "message", "(?m)%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:severity} %{GREEDYDATA:message}" ]
    overwrite => [ "message" ]
  }
  date {
    match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,SSS" ]
  }
}
output {
elasticsearch {
    host => "127.0.0.1"
    index => "web1-commonbase-%{+YYYY.MM.dd}"
}
}

这个配置文件主要分input、filter、output三部分,先说input配置: input里面一般是log file,可以一个或多个日志文件,也可以用通配符匹配文件夹中日志, type:是给日志一个类型,可以随意修改,不过和下面filter中type要对应 path:可以是一个日志,或者多个日志 sincedb_path:logstash读取日志文件的标记,会有一个标记,比如读到日志的100行,把logstash kill掉后再启动,会读取这个文件,继续从100行往下读 start_position:从日志头开始读取日志,end表示用日志尾开始读取 codec => multiline:这是多行匹配,比如一条日志较长,或者分多行打印,用multiline可以不把日志拆为多行,里面的pattern需要根据自己的日志格式来写,不同日志不一样的pattern,上面是java日志的例子 what => “previous” or “next” what有两个参数,next或previous,是匹配到的日志归为上一条还是下一条日志 negate => “true” or “false” true表示不匹配pattern也会建立一个多行匹配,默认是true

multiline多行匹配

对于multiline多行匹配,如果每条日志是以时间为开头的话,直接匹配时间就好。就是说匹配了日志的开头就可以,如果是其他信息,就匹配其他信息,不一定是时间。比如日志开头如下:

[09-May-2016 03:40:50 UTC] XXX...
XXX...
 
logstash的配置如下:
input {
   file {
    path => [ "/path/for/logs/file" ]
    start_position => "beginning"
                codec => multiline {
                        pattern => "\[\d+\-\S+\-\d+ \d+\:\d+\:\d+\ UTC\]"
                        negate => true
                        what => "previous"
                }
  }
}
output {
    stdout {codec => "rubydebug"}
  }

filter配置,过滤、修改日志: mutate:改变日志,有add_field、add_tag、remove_field、remove_tag、gsub、rename、replace、split等功能,比如

filter {
mutate {
add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
}
}

grok:解析任意文本和结构,匹配日志的,可以参考 https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns 官方文档介绍的很详细 https://www.elastic.co/guide/en/logstash/1.5/plugins-filters-grok.html date:主要是我国用东八区的时区,要不kibana界面显示的时间是格林尼治标准时间

output配置: elasticsearch:把日志格式化处理后,交给elasticsearch,index根据自己实际情况写,默认为logstash。当然outpub可以为TCP/UDP协议、redis、rabbitmq、kafka、activatemq、file、json等

grok匹配规则

一个小插曲:许多人不清楚grok匹配规则,以上面日志片段为例,在http://grokdebug.herokuapp.com/网站测试,匹配时间,错误类型,日志内容等

%{TIMESTAMP_ISO8601:timestamp} \(%{WORD:exp}.%{WORD:jv}:%{NUMBER:\d+}\) %{LOGLEVEL:severity} %{GREEDYDATA:syslog_message}
(?<time>.*?) \((?<except>.*?)\) (?<loglevel>.*?) %{GREEDYDATA:message}

上面两种写法都可以,参考:http://grokconstructor.appspot.com/RegularExpressionSyntax.txt

ELK日志服务使用-服务配置 - 第1张

elasticsearch的配置

/opt/elasticsearch/bin/elasticsearch.in.sh 里面

if [ "x$ES_MIN_MEM" = "x" ]; then
ES_MIN_MEM=256m
fi
if [ "x$ES_MAX_MEM" = "x" ]; then
ES_MAX_MEM=1g
fi

ES_MIN_MEM和ES_MAX_MEM最好相等,根据服务器内存设置, elasticsearch另一个配置文件 /opt/elasticsearch/config/elasticsearch.yml 可以设置多机集群Cluster,数据分片shards,Network And HTTP,节点的恢复recovery机制,集群的自动发现等,按配置里面的说明修改即可,一般的elasticsearch监听我就让他在服务器内部,network.bind_host: 127.0.0.1,也有添加 bootstrap.mlockall: true #防止内存不足,会切换至交换分区 bootstrap.max_open_files: true #防止文件打开过多系统出错 script.disable_dynamic: true #禁止远程代码执行 这几个选项

kibana的配置

kibana/config/kibana.yml 配置中修改如下: host: “127.0.0.1” elasticsearch_url: “http://127.0.0.1:9200” 其他无修改

简单的处理elk的访问安全交由nginx的auth_basic吧,由htpasswd设置密码

server {
 
        listen 12345;
        server_name 1.1.1.1;
        access_log  /var/log/nginx/elk.log default;
        error_page  404 = /404.html;
        error_page  500 502 503 504  /50x.html;
        location ~ (\.*)$ {
        proxy_pass http://127.0.0.1:5601;
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        client_max_body_size 10m;
        client_body_buffer_size 128k;
        proxy_connect_timeout 90;
        proxy_send_timeout 90;
        proxy_read_timeout 90;
        proxy_buffer_size 4k;
        proxy_buffers 4 32k;
        proxy_busy_buffers_size 64k;
        proxy_temp_file_write_size 64k;
        auth_basic           "Restricted";
        auth_basic_user_file /usr/local/nginx/conf/server/htpasswd;
        }
        location ~* \.(sql|bak|svn|old|gz)$ {
                return 403;
        }
        location ~.*\.(gif|jpg|jpeg|png|bmp|swf)$ {
                expires      7d;
        }
        location ~ .*\.(js|css)?$ {
                   expires      12h;
        }
}

在kibana首页,一般会遇到下面的疑问,不清楚index为啥是灰色,不能出来展示日志的页面,是因为你在logstash的配置output中,自定义了index,所以要写你自定义的,如果不写,默认是logstash。如果还是灰色,建议就写个*放这里吧,先能用,再一步一步调试,如下:

ELK日志服务使用-服务配置 - 第3张

ELK日志服务使用-服务配置 - 第4张

logstash检测配置文件正确性:

1,logstash -t -f logstash的配置.conf -t就是–configtest,检查配置文件是否有语法错误

2,logstash -v -f logstash的配置.conf 等logstash启动好了,手动生成一条日志,看这里是否抛出自己定义好的格式日志

如果input中日志文件较多,在input可以用*.log匹配日志。

如果input或output中,日志类型较多,就需要在logstash的配置中用if语句匹配tag或者type区分不同的日志:

output{
if [tag] == "XXX" {
elasticsearch {
hosts => "192.168.1.1"
index => "XXX-%{+YYYY.MM.dd}"
}}
if [tag] == "YYY" {
elasticsearch {
hosts => "192.168.1.2"
index => "YYY-%{+YYYY.MM.dd}"
}}
 
#另外,output一般输出有下面几种方式:
#1,在命令行直接打印输出,主要用于调试
#2,输出到elasticsearch,如果不在本机,加IP,端口等参数
#3,输出到本地文件
    stdout {
        codec=>"rubydebug"
    }
    elasticsearch {
        host => "localhost"
    }
    file {
    path => "/tmp/test.log"
    message_format => "%{message}%"
    }
 
}

同样,input也可以定义日志,加不同的tag或type。

rsyslog debug方式: rsyslogd -dn

grok匹配 http://grokdebug.herokuapp.com/

https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

通过上面叙述,基本的elk服务配置和配置的说明大概熟悉了,下一篇讲解生产环境的elk配置几种方式

https://discuss.elastic.co/ ELK论坛

2016年02月24日 于 linux工匠 发表