1.核心概念
- bucket: 一个数据分组(类似于sql group by以后的数据)
- metric: 对bucket执行的某种聚合分析操作,比如求平均值、最大值、最小值等一系列统计方法(类似 select count(1)、MAX、MIN、AVG)
请求参数说明:
- size: 0 —— 只要聚合结果,不要原始数据;不等于0时会返回原始数据
- aggs: 固定语法,对数据进行分组聚合操作(类似于group by操作)
- terms: 根据字段的值进行分组
- field: 根据指定的字段值进行分组
返回参数说明:
- hits.hits: 我们指定的size是0,所以hits.hits为空;否则会返回聚合的原始数据
- aggregations: 聚合结果
- buckets: 根据聚合条件返回的结果集
- key: 每个bucket对应分组条件的值
- doc_count: 每个bucket分组内的数据量
- 默认排序规则: 按照doc_count降序排列
Aggregation 的语法
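例如(按某个字段分组并统计每组的文档数):
GET /<索引名>/_search
{
  "size": 0,
  "aggs": {
    "<聚合名称>": {
      "terms": { "field": "<字段名>" }
    }
  }
}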
2.java代码关于脚本解析(Painless 编程调试)
第一种使用自定义聚合脚本
// Approach 1: a terms aggregation on "cableId" whose sub-aggregation is a custom
// scripted-metric aggregation named "fault_duration".
Map<String, List<TimeDuration>> resMap = new HashMap<>();
String index = this.getIndex();

// Excluded case A: faultStartTime and faultEndTime are both <= start.
BoolQueryBuilder bothBeforeStart = QueryBuilders.boolQuery()
        .must(QueryBuilders.rangeQuery("faultStartTime").lte(start))
        .must(QueryBuilders.rangeQuery("faultEndTime").lte(start));

// Excluded case B: faultStartTime and faultEndTime are both >= end.
BoolQueryBuilder bothAfterEnd = QueryBuilders.boolQuery()
        .must(QueryBuilders.rangeQuery("faultStartTime").gte(end))
        .must(QueryBuilders.rangeQuery("faultEndTime").gte(end));

// Final filter: the bdsId must match and neither excluded case may apply.
BoolQueryBuilder filter = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("bdsId", bdsId))
        .mustNot(bothBeforeStart)
        .mustNot(bothAfterEnd);

// Parameter initialisation: epoch-millisecond values handed to the scripts.
Map<String, Object> scriptParams = new HashMap<>();
scriptParams.put("now", now.getTime());
scriptParams.put("start", start.getTime());
scriptParams.put("end", end.getTime());

// Scripted aggregation starts here; the custom aggregation scripts can interact with one another.
// Script initialisation: the four phase scripts are loaded from CABLEFAULT_SCRIPT_PATH.
ScriptedMetricAggregationBuilder faultDuration = AggregationBuilders.scriptedMetric("fault_duration")
        .params(scriptParams)
        .initScript(ScriptUtil.getScriptBy(CABLEFAULT_SCRIPT_PATH, "init_script.js"))
        .mapScript(ScriptUtil.getScriptBy(CABLEFAULT_SCRIPT_PATH, "map_script.js"))
        .combineScript(ScriptUtil.getScriptBy(CABLEFAULT_SCRIPT_PATH, "combine_script.js"))
        .reduceScript(ScriptUtil.getScriptBy(CABLEFAULT_SCRIPT_PATH, "reduce_script.js"));

// One bucket per cableId; minDocCount(0) is set so buckets with no matching documents are not dropped.
TermsAggregationBuilder byCableId = AggregationBuilders.terms("group_by_cableid")
        .field("cableId")
        .size(ESPage.DEFAULT_ALL_PAGESIZE)
        .minDocCount(0)
        .subAggregation(faultDuration);

// size(0): only the aggregation result is wanted, not the matching documents.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0)
        .query(filter)
        .aggregation(byCableId);

ESPage esPage = new ESPage.Builder(ESPage.DEFAULT_ALL_PAGESIZE).pageNo(0).build();
searchSourceBuilder.from(Long.valueOf(esPage.getFrom()).intValue());
// size is re-applied after from(); it is still 0.
searchSourceBuilder.size(0);

SearchRequest searchRequest = new SearchRequest(index).source(searchSourceBuilder);

// Send the search to Elasticsearch and pick up the aggregation tree from the response.
SearchResponse searchResponse = this.restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggs = searchResponse.getAggregations();
第二种混合调用
// Approach 2 ("mixed" call): nested terms aggregations with a top-hits leaf, where the inner
// terms aggregation takes a script instead of a field.
String index = this.getIndex();

// Filter: all three clauses are required - bdsId, evaluateTime within [start, end], evaluateType.
BoolQueryBuilder filter = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("bdsId", bdsId))
        .must(QueryBuilders.rangeQuery("evaluateTime").gte(start).lte(end))
        .must(QueryBuilders.termQuery("evaluateType", String.valueOf(evaluateType.getValue())));

// Innermost level: the hits of each bucket, sorted by createDate ascending.
TopHitsAggregationBuilder topRecords = AggregationBuilders.topHits("top_record")
        .sort("createDate", SortOrder.ASC)
        .size(CommonConstant.ES_MAX_TOP_HITS);

// Middle level. Note: unlike the first approach, the script is attached directly to the
// terms aggregation through script(...) rather than through a scripted-metric aggregation.
Script filterScript = ScriptUtil.getScriptBy(HEALTH_PVLOSS_SCRIPT_PATH, "pv_filter_script.js");
TermsAggregationBuilder byEvaluateTime = AggregationBuilders.terms("group_by_bds_id_evaluate_time")
        .script(filterScript)
        .size(CommonConstant.ES_MAX_PAGE_SIZE)
        .minDocCount(0)
        .subAggregation(topRecords);

// Outer level: one bucket per pvCode, each containing the script-keyed buckets above.
TermsAggregationBuilder byPvCode = AggregationBuilders.terms("group_by_pvcode")
        .field("pvCode")
        .size(CommonConstant.ES_MAX_PAGE_SIZE)
        .minDocCount(0)
        .subAggregation(byEvaluateTime);

// size(0): only the aggregation result is wanted, not the matching documents.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0)
        .query(filter)
        .aggregation(byPvCode);

// NOTE(review): esPage is built here but not read in the visible part of this snippet.
ESPage esPage = new ESPage.Builder(ESPage.DEFAULT_ALL_PAGESIZE).pageNo(0).build();
searchSourceBuilder.size(0);

SearchRequest searchRequest = new SearchRequest(index);
searchRequest.indices(index);
searchRequest.source(searchSourceBuilder);

// Send the search to Elasticsearch and pick up the aggregation tree from the response.
SearchResponse searchResponse = this.restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggs = searchResponse.getAggregations();