UDN-企业互联网技术人气社区

板块导航

浏览  : 1488
回复  : 0

[资源分享] LOGSTASH+ELASTICSEARCH处理MYSQL慢查询日志

[复制链接]
htmlman的头像 楼主
发表于 2016-5-25 10:51:20 | 显示全部楼层 |阅读模式
  1. 找到日志的位置

  先确认是否开启了, 然后找到日志文件的位置
  1. > show variables like '%slow%';
  2. +---------------------+-------------------------------------+
  3. | Variable_name       | Value                               |
  4. +---------------------+-------------------------------------+
  5. | log_slow_queries    | ON                                  |
  6. | slow_launch_time    | 2                                   |
  7. | slow_query_log      | ON                                  |
  8. | slow_query_log_file | /data/mysqllog/20000/slow-query.log |
  9. +---------------------+-------------------------------------+
复制代码

  2. 慢查询日志

  格式基本是如下, 当然, 格式如果有差异, 需要根据具体格式进行小的修改
  1. # Time: 160524  5:12:29
  2. # User@Host: user_a[xxxx] @  [10.166.140.109]
  3. # Query_time: 1.711086  Lock_time: 0.000040 Rows_sent: 385489  Rows_examined: 385489
  4. use dbname;
  5. SET timestamp=1464037949;
  6. SELECT 1 from dbname;
复制代码

  3. 使用 logstash 采集

  采集, 无非是用multiline进行多行解析

  但是, 需要处理的

  第一个是, 去除掉没用的信息

  第二个, 慢查询sql, 是会反复出现的, 所以, 执行次数成了一个很重要的指标. 我们要做的, 就是降噪(将参数去掉, 涉及带引号的内容+数字), 将参数类信息过滤掉, 留下核心的sql, 然后计算出一个hash, 这样就可以在查询, 根据这个字段进行聚合. 这里用到了 mutate 以及 checksum
  1.   # calculate unique hash
  2.   mutate {
  3.     add_field => {"sql_for_hash" => "%{sql}"}
  4.   }
  5.   mutate {
  6.     gsub => [
  7.         "sql_for_hash", "'.+?'", "",
  8.         "sql_for_hash", "-?\d*\.{0,1}\d+", ""
  9.     ]
  10.   }
  11.   checksum {
  12.     algorithm => "md5"
  13.     keys => ["sql_for_hash"]
  14.   }
复制代码

  最后算出来的md5, 放入了logstash_checksum

  完整的logstash配置文件(具体使用可能需要根据自身日志格式做些小调整) 注意, 里面的pattern ALLWORD [\s\S]*
  1. input {
  2.   file {
  3.     path => ["/data/mysqllog/20000/slow-query.log"]
  4.     sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
  5.     type => "mysql-slow-log"
  6.     add_field => ["env", "PRODUCT"]
  7.     codec => multiline {
  8.       pattern => "^# User@Host:"
  9.       negate => true
  10.       what => previous
  11.     }
  12.   }
  13. }
  14. filter {
  15.   grok {
  16.     # User@Host: logstash[logstash] @ localhost [127.0.0.1]
  17.     # User@Host: logstash[logstash] @  [127.0.0.1]
  18.     match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
  19.   }
  20.   grok {
  21.     # Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970
  22.     match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}"]
  23.   }

  24.   // remove useless data
  25.   mutate {
  26.     gsub => [
  27.         "sql", "\nSET timestamp=\d+?;\n", "",
  28.         "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
  29.         "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
  30.         "sql", "\n/usr/local/mysql/bin/mysqld.+[        DISCUZ_CODE_3        ]quot;, "",
  31.         "sql", "\nTcp port:.+[        DISCUZ_CODE_3        ]quot;, "",
  32.         "sql", "\nTime .+[        DISCUZ_CODE_3        ]quot;, ""
  33.     ]
  34.   }

  35.   # Capture the time the query happened
  36.   grok {
  37.     match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
  38.   }
  39.   date {
  40.     match => [ "timestamp", "UNIX" ]
  41.   }


  42.   # calculate unique hash
  43.   mutate {
  44.     add_field => {"sql_for_hash" => "%{sql}"}
  45.   }
  46.   mutate {
  47.     gsub => [
  48.         "sql_for_hash", "'.+?'", "",
  49.         "sql_for_hash", "-?\d*\.{0,1}\d+", ""
  50.     ]
  51.   }
  52.   checksum {
  53.     algorithm => "md5"
  54.     keys => ["sql_for_hash"]
  55.   }

  56.   # Drop the captured timestamp field since it has been moved to the time of the event
  57.   mutate {
  58.     # TODO: remove the message field
  59.     remove_field => ["timestamp", "message", "sql_for_hash"]
  60.   }
  61. }
  62. output {
  63.     #stdout{
  64.     #    codec => rubydebug
  65.     #}
  66.     #if ("_grokparsefailure" not in [tags]) {
  67.     #    stdout{
  68.     #        codec => rubydebug
  69.     #    }
  70.     #}
  71.     if ("_grokparsefailure" not in [tags]) {
  72.         elasticsearch {
  73.           hosts => ["192.168.1.1:9200"]
  74.           index => "logstash-slowlog"
  75.         }
  76.     }
  77. }
复制代码

  采集进去的内容
  1. {
  2.            "@timestamp" => "2016-05-23T21:12:59.000Z",
  3.              "@version" => "1",
  4.                  "tags" => [
  5.         [0] "multiline"
  6.     ],
  7.                  "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log",
  8.                  "host" => "Luna-mac-2.local",
  9.                  "type" => "mysql-slow",
  10.                   "env" => "PRODUCT",
  11.                  "user" => "dba_bak_all_sel",
  12.                    "ip" => "10.166.140.109",
  13.              "duration" => 28.812601,
  14.             "lock_wait" => 0.000132,
  15.               "results" => 749414,
  16.               "scanned" => 749414,
  17.                   "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;",
  18.     "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371"
  19. }
复制代码

  4. 写查询

  查询, 我们需要按logstash_checksum进行聚合, 然后按照次数由多到少降序展示, 同时, 每个logstash_checksum需要有一条具体的sql进行展示

  通过 es 的 Top hits Aggregation 可以完美地解决这个查询需求

  查询的query
  1. body = {
  2.     "from": 0,
  3.     "size": 0,
  4.     "query": {
  5.         "filtered": {
  6.             "query": {
  7.                 "match": {
  8.                     "user": "test"
  9.                 }
  10.             },
  11.             "filter": {
  12.                 "range": {
  13.                     "@timestamp": {
  14.                         "gte": "now-1d",
  15.                         "lte": "now"
  16.                     }
  17.                 }
  18.             }
  19.         }
  20.     },
  21.     "aggs": {
  22.         "top_errors": {
  23.             "terms": {
  24.                 "field": "logstash_checksum",
  25.                 "size": 20
  26.             },
  27.             "aggs": {
  28.                 "top_error_hits": {
  29.                     "top_hits": {
  30.                         "sort": [
  31.                             {
  32.                                 "@timestamp":{
  33.                                     "order": "desc"
  34.                                 }
  35.                             }
  36.                         ],
  37.                         "_source": {
  38.                             "include": [
  39.                                "user" , "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned"
  40.                             ]
  41.                         },
  42.                         "size" : 1
  43.                     }
  44.                 }
  45.             }
  46.         }
  47.     }
  48. }
复制代码

  跟这个写法相关的几个参考链接: Terms Aggregation / Elasticsearch filter document group by field

  5. 渲染页面

  python的后台, 使用sqlparse包, 将sql进行格式化(换行/缩进/大小写), 再往前端传. sqlparse
  1. >>> sql = 'select * from foo where id in (select id from bar);'
  2. >>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
  3. SELECT *
  4. FROM foo
  5. WHERE id IN
  6.   (SELECT id
  7.    FROM bar);
复制代码

  然后在页面上, 使用js进行语法高亮 code-prettify

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部