ElasticSearch之Java Api聚合分组实战


最近有个日志收集监控的项目采用的技术栈是ELK+JAVA+Spring,客户端语言使用的是Java,以后有机会的话可以试一下JavaScript+Nodejs的方式,非常轻量级的组合,只不过不太适合服务化的工程,Kibana充当可视化层,功能虽然非常强大和灵活,但是需要业务人员懂Lucene的查询语法和Kibana的Dashboard仪表盘自定义功能才能玩的转,所以Kibana面向专业的开发人员和运维人员比较良好,但面向业务人员则稍微有点难度,我们这边就使用Java进行二次开发,然后前端定义几个业务人员关注的图表,然后把后端查询的数据,按照一定的维度放进去即可。

基础环境:
(1)ElasticSearch1.7.2
(2)Logstash2.2.2
(3)Kibana4.1.2
(3)JDK7
(4)Spring4.2


使用到的技术点:
(1)ElasticSearch的查询
(2)ElasticSearch的过滤
(3)ElasticSearch的日期聚合
(4)ElasticSearch的Terms聚合
(5)ElasticSearch的多级分组
(6)ElasticSearch+Logstash的时区问题

直接上代码:

package cn.bizbook.product.elk.dao.impl;

import cn.bizbook.product.elk.config.ESConf;
import cn.bizbook.product.elk.dao.ESDao;
import cn.bizbook.product.elk.utils.TimeTools;
import cn.bizbook.product.elk.vo.count.Condition;
import cn.bizbook.product.elk.vo.count.CountType;
import cn.bizbook.product.elk.vo.count.search.GroupCount;
import cn.bizbook.product.elk.vo.count.search.MonitorCount;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.queryparser.classic.QueryParserBase;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by qindongliang on 2016/4/6.
 */
@Repository("esDaoImpl")
public class ESDaoImpl implements ESDao {

    private static Logger log= LoggerFactory.getLogger(ESDaoImpl.class);
    @Autowired
    private ESConf esConf;

    @Resource(name = "client")
    private  Client client;


    @Override
    public MonitorCount count() {
        MonitorCount count=new MonitorCount();
        //今天的数量
        count.setToday_count(customCount(false,"*:*"));
        //今天的入库量
        count.setToday_store_count(customCount(false,"-save:1"));
        //所有的总量
        count.setTotal_count(customCount(true,"*:*"));
        //所有的入库总量
        count.setTotal_store_count(customCount(true,"-save:1"));
        return count;
    }

    private long customCount(boolean isQueryAll, String queryString){
        try {
            //今天的开始时间 比如2016-04-01 00:00:00
            long today_start = TimeTools.getDayTimeStamp(0);
            //今天的结束时间 也就是明天的开始时间 比如2016-04-02 00:00:00
            //一闭区间一开区间即得到一天的统计量
            long today_end=TimeTools.getDayTimeStamp(1);
            StringBuffer fq = new StringBuffer();
                     fq.append("@timestamp:")
                    .append(" [ ")
                    .append(today_start)
                    .append(" TO  ")
                    .append(today_end)
                    .append(" } ");
            //构建查询请求,使用Lucene高级查询语法
            QueryBuilder query=QueryBuilders.queryStringQuery(queryString);
            //构建查询请求
            SearchRequestBuilder search = client.prepareSearch("crawl*").setTypes("logs");
            //非所有的情况下,设置日期过滤
            if(isQueryAll){
                search.setQuery(query);//查询所有
            }else {//加上日期过滤
                search.setQuery(QueryBuilders.filteredQuery(query, FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(fq.toString()))));
            }
            SearchResponse r = search.get();//得到查询结果
            long hits = r.getHits().getTotalHits();//读取命中数量
            return hits;
        }catch (Exception e){
            log.error("统计日期数量出错!",e);
        }
        return 0;
    }


    @Override
    public List<GroupCount> query(Condition condition) {
        return grouyQuery(condition);
    }

    /***
     * @param c 查询的条件
     * @return 查询的结果
     */
    private List<GroupCount> grouyQuery(Condition c){
        //封装结果集
        List<GroupCount> datas=new ArrayList<>();
        //组装分组
        DateHistogramBuilder dateAgg = AggregationBuilders.dateHistogram("dateagg");
        //定义分组的日期字段
        dateAgg.field("@timestamp");
        //按天分组
        if(CountType.EACH_DAY==(c.getType())) {
            dateAgg.interval(DateHistogram.Interval.DAY);
            dateAgg.timeZone("+8:00");
            dateAgg.format("yyyy-MM-dd");
        //按小时分组
        }else if(CountType.EACH_HOUR==c.getType()){
            dateAgg.interval(DateHistogram.Interval.HOUR);
            //按小时分组,必须使用这个方法,不然得到的结果不正确
            dateAgg.postZone("+8:00");
            dateAgg.format("yyyy-MM-dd HH");
        //无效分组
        }else{
            throw new NullPointerException("无效的枚举类型");
        }
        //二级分组,统计入库的成功失败量 0 1 2 , 1为不成功
        dateAgg.subAggregation(AggregationBuilders.terms("success").field("save"));

        //查询过滤条件
        StringBuffer fq = new StringBuffer();
        //过滤时间字段
        fq.append(" +@timestamp:")
                .append(" [ ")
                .append(c.getStart().getTime())
                .append(" TO  ")
                .append(c.getEnd().getTime())
                .append(" } ");
        //过滤一级
        if(StringUtils.isNotEmpty(c.getT1())){
            fq.append(" +t1:").append(c.getT1());
        }
        //过滤二级
        if(StringUtils.isNotEmpty(c.getT2())){
            fq.append(" +t2:").append(c.getT2());
        }
        //过滤三级
        if(StringUtils.isNotEmpty(c.getT3())){
            fq.append(" +t3:").append(c.getT3());
        }
        //过滤url
        if(StringUtils.isNotEmpty(c.getSourceUrl())){
            //对url进行转义,防止查询出现错误
            fq.append(" +url:").append(QueryParserBase.escape(c.getSourceUrl()));
        }
        //过滤省份编码
        if(StringUtils.isNotEmpty(c.getProvinceCode())){
            fq.append(" +pcode:").append(c.getProvinceCode());
        }
        //过滤入库状态
        if(c.getSavaState()!=null){
            fq.append(" +save:").append(c.getSavaState().getCode());
        }
        //过滤http状态码
        if(c.getWebsiteState()!=null){
            if(!c.getWebsiteState().getCode().equals("-1")) {
                fq.append(" +httpcode:").append(c.getWebsiteState().getCode());
            }else{
                fq.append(" -httpcode:").append("(0 110 200)");
            }
        }
        //过滤配置configid
        if(StringUtils.isNotEmpty(c.getConfigId())){
            fq.append(" +cid:").append(c.getConfigId());
        }



        //查询索引
        SearchRequestBuilder search=client.prepareSearch("crawl*").setTypes("logs");
        //组装请求
        search.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
                FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(fq.toString())
                        .defaultOperator(QueryStringQueryBuilder.Operator.AND)
                ))).addAggregation(dateAgg);
        //获取查询结果
        SearchResponse r = search.get();//得到查询结果
        //获取一级聚合数据
        Histogram h=r.getAggregations().get("dateagg");
        //得到一级聚合结果里面的分桶集合
        List<DateHistogram.Bucket> buckets = (List<DateHistogram.Bucket>) h.getBuckets();
        //遍历分桶集
        for(DateHistogram.Bucket b:buckets){
            //读取二级聚合数据集引用
            Aggregations sub = b.getAggregations();
            //获取二级聚合集合
            StringTerms count = sub.get("success");
            GroupCount groupCount=new GroupCount();
            //设置x轴分组日期
            groupCount.setGroupKey(b.getKey());
            //设置指定分组条件下入库总量
            groupCount.setTotal_count(b.getDocCount());
            //读取指定分组条件下不成功的数量
            long bad_count=count.getBucketByKey("1")==null?0:count.getBucketByKey("1").getDocCount();
            //设置指定分组条件下成功的入库量
            groupCount.setTotal_store_count(b.getDocCount()-bad_count);
            //计算成功率
            groupCount.setSuccess_rate(groupCount.getTotal_store_count()*1.0/groupCount.getTotal_count());
            //添加到集合里面
            datas.add(groupCount);
        }
        return datas;
    }



}






总结:
(1)关于时区的问题,目前发现在测试按小时,按天分组统计的时候,时区使用的方法不是一致的,而postZone这个方法,在1.5版本已经废弃,说是使用timeZone替代,但经测试发现在按小时分组的时候,使用timeZone加8个时区的并没生效,后续看下最新版本的ElasticSearch是否修复。
(2)使用Terms的聚合分组时,这个字段最好是没有分过词的,否则大量的元数据返回,有可能会发生OOM的异常
(3)在不需要评分排名查询的场景中,尽量使用filter查询,elasticsearch会缓存查询结果,从而能大幅提高检索性能

今天先总结这么多,后续有空再关注下
(1)elasticsearch中的Aggregations和Facet的区别以及对比Solr中的Group和Facet的区别
(2)在不同的聚合渠道中多级分组中是组内有序还是全局有序



有什么问题 可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园



本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

java map顺序 - 2016-04-09 18:04:52

hashmap  瞎排 treemap   首位为字母数字的话 按照首位排 linkedhashmap 按照放入的顺序

插入排序-学习篇(三) - 2016-04-09 18:04:42

插入排序: int array[] = {9,2,5,4,3,6,1,7,8};int temp = 0;int num = 0;int in = 0;for(int i = 1; i array.length ; i++){temp = array[i];in = i;while(in 0 array[in - 1] = temp){array[in] = array[in-1];--in;num++;//记录交换的次数}array[in] = temp;//插入}for(int i = 0; i ar
org/w3c/dom/ElementTraversal 错误解决办法           不记得之前几天把什么maven依赖包删除了,今天利用htmlunit运行代码的时候报了下面的错误:   Exception in thread "main" java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversalat java.lang.ClassLoader.defineClass1(Native Method)at java.lang.Class

JAVA多线程-同步工具类 - 2016-04-09 17:04:44

在讲同步工具类之前,笔者想先介绍一下Runnable,Callable,Future,Executor,FutureTask这几个类或接口。 1 Runnable和Callable是接口,实现两者的任何一个可以开发多线程。但是前者无法返回线程的执行结果,后者可以返回线程的执行结果。 2 Future是一个接口,用于等待或者查看Callable线程的返回结果。 3 Executor是线程的调度容器。 4 FutureTask是一个类,他实现了Runnable接口和Future接口。可以以一个实现了runnab

java基础学习 - 2016-04-09 15:04:06

Java逻辑运算符"","","|","||"解析:   逻辑运算符用于连接布尔型表达式。在Java中不同于数学的逻辑表达 3X5 ,java 中应该写成 x3 x5      "" 和"" 的区别;    单个"",左边无论真假,右边都进行计算。    双个"",左边为假,右边不进行计算。     "|"和"||" 的区别;    单个"|",左边无论真假,右边都进行计算。    双个"|",左边为真,右边不进行计算。       "|"和"" 做位运算     二进制进行|位运算,只有0|0时候为0

java中文分词例子程序 - 2016-04-09 15:04:07

由于项目中搜索时需要用到中文分词,于是今天研究了下java分词方法,在网上找到Ansj的开源Java分词器,自己测试了下将例子程序传上来,步骤如下: 1、先从 Github地址: https://github.com/ansjsun/ansj_seg 这里下载源程序,然后运行下打个jar包放到自己的工程里面。 2、其次在自己的工程中引入 nlp-lang-1.5.jar,其实第一步中已经引入到源程序中了,然后就可以直接使用了,源程序中有测试例子可以参考。 下面特将我下的的源程序和自己的例子程序传上来以便参

Java测试Junit和mockito - 2016-04-09 14:04:16

Mockito是一个开源mock框架,官网:http://mockito.org/,源码:https://github.com/mockito/mockito   Junit是 一个Java语言的单元测试框架,官网:http://junit.org/ 这两个jar包的下载地址是:http://download.csdn.net/detail/bgk083/9043363   单元测试(unit testing) ,是指对软件中的最小可测试单元进行检查和验证。对于单元测试中单元的含义,一般来说,要根据实际情
获取【下载地址】    QQ: 313596790   【免费支持更新】 三大数据库 mysql  oracle  sqlsever    更专业、更强悍、适合不同用户群体 【 新录针对本系统的视频教程,手把手教开发一个模块,快速掌握本系统 】 A 集成代码生成器(开发利器);                                         技术:313596790    增删改查的处理类,service层,mybatis的xml,SQL( mysql   和oracle)脚本,   js

Java的Random实例 - 2016-04-08 17:04:56

这两天出去玩,经常看大有各种**的小游戏,就想到以前朋友说的一个事儿:游侠装备**的时候如何保证玩家抽到的好装备较少。其实这个思路还挺简单的:生成一个随机数,判断若该随机数6或7什么的,就有好装备否则就是不好的。于是就琢磨了下,写个小例子玩玩。   说明:假设总共**10次,好东西3个,随机数大于6的时候才能有好东西。因此先声明静态变量: private static int goosNum = 3;//声明10次抽到好东西的数量private static int boundNum=6;//界限,若是大

Solrcloud部署 - 2016-04-08 15:04:08

Solrcloud部署文档 上传安装包 1.上传solrcloud-imsearch.zip至/usr/local/im目录 2.解压solrcloud-imsearch.zip,得到如下目录结构 配置zookeeper 1.修改zookeeper配置 cd zookeeper/conf/ vim zoo.cfg a.修改dataDir和dataLogDir目录或创建目录: mkdir  -p  /data/im-zookeeper/data mkdir  -p  /data/im-zookeeper/lo