Advanced Aggregation Queries in Elasticsearch

Elasticsearch | 悠扬 | 819 views | 0 comments

1. Scripted aggregation


GET /idx_znyg_datakgzdb/_search
{
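  "size": 0,
   // must be 0; otherwise the matching documents (hits) are returned along with the aggregation
  "query": {
 // query clause: filters the documents first, like WHERE in SQL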
    "bool": {
      "must": [
        {
          "term": {
            "bdsId": {
              "value": "nxzkjy_640303_20200706_000000031"
            }
          }
        }, {
          "range": {
            "fetchTime": {
              "gte": "2020-12-01 00:00:00",
              "lte": "2020-12-01 23:59:59"
            }
          }
        }
      ]
    }
  },
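  "aggs": {
 // aggregation clause, similar to GROUP BY in SQL
    "group_by_dbid": {
  // arbitrary name; it labels this set of buckets in the response
      "terms": {
 // bucketing rule
        "field": "dbId",
  // field to group by
        "size": 65535
  // maximum number of buckets (distinct dbId values) to return
      },
  // "_source" filtering is not a valid key inside an aggregation and makes the request fail to parse, so it is omitted here; with "size": 0 no hits are returned anyway
      "aggs": {
// sub-aggregation applied to the documents in each bucket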
        "db_energy": {
          "scripted_metric": {
            "init_script": {
              "lang": "painless",
              "source": """
                state.dbEnergyMap=new java.util.HashMap();

              """
            },
            "map_script": {
              "lang": "painless",
              "source": """
                String dbId = doc['dbId'].value;
                double pwPositiveTotal = doc['pwPositiveTotal'].value;
                long fetchMillis = doc['fetchTime'].value.getMillis();
                // zero-pad the epoch millis to a fixed 18 characters so string order equals time order
                String fetchKey = '0000000000' + fetchMillis;
                fetchKey = fetchKey.substring(fetchKey.length() - 18, fetchKey.length());
                // one TreeMap per dbId, sorted by the time key
                TreeMap qCombineMap = state.dbEnergyMap.get(dbId);
                if(qCombineMap == null) {
                    qCombineMap = new java.util.TreeMap();
                }
                qCombineMap.put(fetchKey, pwPositiveTotal);
                state.dbEnergyMap.put(dbId, qCombineMap);

              """
            },
            "combine_script": {
              "lang": "painless",
              "source": """
                return state;

              """
            },
            "reduce_script": {
              "lang": "painless",
              "source": """
                Map resMap = new java.util.HashMap();
                Map dbEnergyMap = new java.util.HashMap();
                // merge the per-shard states into one time-sorted map per dbId
                for(state in states) {
                    if(state != null) {
                        for(dbId in state.dbEnergyMap.keySet()) {
                            TreeMap allQCombineMap = dbEnergyMap.get(dbId);
                            Map qCombineMap = state.dbEnergyMap.get(dbId);
                            if(allQCombineMap == null) {
                                allQCombineMap = new java.util.TreeMap();
                            }
                            allQCombineMap.putAll(qCombineMap);
                            dbEnergyMap.put(dbId, allQCombineMap);
                        }
                    }
                }
                for(dbId in dbEnergyMap.keySet()) {
                    TreeMap qCombineMap = dbEnergyMap.get(dbId);
                    // latest reading minus earliest reading within the queried time range
                    double qCombine = qCombineMap.get(qCombineMap.lastKey()) - qCombineMap.get(qCombineMap.firstKey());
                    resMap.put(dbId, qCombine);
                }
                return resMap;

              """
            }
          }
        }
      }
    }
  }
}
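
The four script phases can be easier to follow outside Elasticsearch. The following is a minimal plain-Java sketch of what the map and reduce scripts above do, assuming two shards and made-up readings; the class name `ScriptedMetricSketch`, the ID `db-1`, and the sample values are illustrative assumptions, not part of the original query.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java walk-through of the scripted_metric phases; no Elasticsearch involved. */
class ScriptedMetricSketch {

    /** map phase: store one reading under a zero-padded, fixed-width time key. */
    static void map(Map<String, TreeMap<String, Double>> state,
                    String dbId, long fetchMillis, double pwPositiveTotal) {
        String padded = "0000000000" + fetchMillis;
        // keep the last 18 characters so that string order equals time order
        String fetchKey = padded.substring(padded.length() - 18);
        state.computeIfAbsent(dbId, k -> new TreeMap<>()).put(fetchKey, pwPositiveTotal);
    }

    /** reduce phase: merge the per-shard states, then take last minus first per dbId. */
    static Map<String, Double> reduce(List<Map<String, TreeMap<String, Double>>> states) {
        Map<String, TreeMap<String, Double>> merged = new HashMap<>();
        for (Map<String, TreeMap<String, Double>> state : states) {
            if (state == null) {
                continue; // a shard may contribute nothing
            }
            state.forEach((dbId, readings) ->
                    merged.computeIfAbsent(dbId, k -> new TreeMap<>()).putAll(readings));
        }
        Map<String, Double> result = new HashMap<>();
        merged.forEach((dbId, readings) -> result.put(dbId,
                readings.lastEntry().getValue() - readings.firstEntry().getValue()));
        return result;
    }

    /** Hypothetical data: three readings for one dbId spread over two shards. */
    static Map<String, Double> demo() {
        Map<String, TreeMap<String, Double>> shardA = new HashMap<>(); // init phase
        Map<String, TreeMap<String, Double>> shardB = new HashMap<>(); // init phase
        map(shardA, "db-1", 1606780800000L, 100.0); // earliest reading
        map(shardB, "db-1", 1606867199000L, 130.5); // latest reading, on another shard
        map(shardA, "db-1", 1606824000000L, 112.0);
        List<Map<String, TreeMap<String, Double>>> states = new ArrayList<>();
        states.add(shardA); // combine phase returns the state unchanged
        states.add(shardB);
        return reduce(states);
    }
}
```

Calling `ScriptedMetricSketch.demo()` yields a map with `db-1` mapped to 30.5, the difference between the latest and earliest readings regardless of which shard held them.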

 

Query parameters:

shard_min_doc_count 

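       shard_min_doc_count controls how confident a shard must be that a term can still reach min_doc_count before it adds the term to its candidate list. A term is considered only if its local frequency on the shard is higher than shard_min_doc_count. If your documents contain many low-frequency terms that you are not interested in (misspellings, for example), set shard_min_doc_count to filter out, at the shard level, candidate terms that with reasonable certainty will not reach the required min_doc_count even after the local counts are merged. shard_min_doc_count defaults to 0 and has no effect unless you set it explicitly.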

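Note
      Setting min_doc_count=0 also returns buckets for terms that did not match any hit. However, some of the returned terms with a document count of zero may belong only to deleted documents or to documents of other types, so there is no guarantee that a match_all query would find a positive document count for those terms.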

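Warning
      When the buckets are not sorted by document count in descending order, a high min_doc_count may return fewer buckets than size, because not enough data was gathered from the shards. The missing buckets can be recovered by increasing shard_size. Setting shard_min_doc_count too high causes terms to be filtered out at the shard level; the value should be much lower than min_doc_count/#shards.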
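
To make the interaction between the two thresholds concrete, here is a simplified plain-Java model of a terms aggregation across shards. It is a sketch under stated assumptions: it ignores size, shard_size, and sort order, and it treats both thresholds as inclusive (a term passes when its count is greater than or equal to the threshold). The class `MinDocCountSketch` and the sample terms are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of shard_min_doc_count and min_doc_count; not Elasticsearch code. */
class MinDocCountSketch {

    /** Filters per shard first, merges the surviving counts, then filters the totals. */
    static Map<String, Long> terms(List<Map<String, Long>> shards,
                                   long shardMinDocCount, long minDocCount) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> shard : shards) {
            for (Map.Entry<String, Long> e : shard.entrySet()) {
                // shard-level filter (assumed inclusive in this sketch)
                if (e.getValue() >= shardMinDocCount) {
                    merged.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
        }
        // final filter on the merged counts
        merged.values().removeIf(total -> total < minDocCount);
        return merged;
    }

    /** Three shards: "error" 5 per shard, "warn" 2 per shard, one misspelled "eror". */
    static List<Map<String, Long>> sampleShards() {
        List<Map<String, Long>> shards = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Map<String, Long> shard = new TreeMap<>();
            shard.put("error", 5L);
            shard.put("warn", 2L);
            if (i == 0) {
                shard.put("eror", 1L);
            }
            shards.add(shard);
        }
        return shards;
    }
}
```

With `min_doc_count` 5 and `shard_min_doc_count` 0, both `error` (15) and `warn` (6) survive. Raising `shard_min_doc_count` to 3 drops `warn` on every shard even though its total of 6 would have passed, which is why the value should stay well below min_doc_count/#shards.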

min_doc_count 

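      min_doc_count filters the final result: only buckets containing at least this many documents are returned. Setting it to 0 forces empty buckets to be returned as well; in a date histogram, for example, intervals with no data are filled in with a count of 0. A common use is to let Elasticsearch return the empty buckets so the application does not have to fill the gaps itself.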
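
The gap-filling effect of min_doc_count=0 can be illustrated with a small plain-Java histogram. This is only a sketch of the idea, assuming numeric timestamps and a fixed interval; `HistogramGapSketch` is a hypothetical name and the code does not call Elasticsearch.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrates how min_doc_count = 0 fills empty intervals; not Elasticsearch code. */
class HistogramGapSketch {

    /** Buckets timestamps by interval; interval must be positive. */
    static Map<Long, Long> histogram(long[] timestamps, long interval, long minDocCount) {
        TreeMap<Long, Long> buckets = new TreeMap<>();
        for (long t : timestamps) {
            // bucket key is the start of the interval containing t
            buckets.merge(Math.floorDiv(t, interval) * interval, 1L, Long::sum);
        }
        if (minDocCount == 0 && !buckets.isEmpty()) {
            long first = buckets.firstKey();
            long last = buckets.lastKey();
            // add a zero-count bucket for every empty interval between first and last
            for (long key = first; key < last; key += interval) {
                buckets.putIfAbsent(key, 0L);
            }
        }
        // drop buckets below the threshold (a no-op when minDocCount is 0)
        buckets.values().removeIf(count -> count < minDocCount);
        return buckets;
    }
}
```

For timestamps 1, 2, and 35 with an interval of 10, a threshold of 0 returns the buckets 0, 10, 20, and 30 with counts 2, 0, 0, and 1, while a threshold of 1 returns only the buckets 0 and 30.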

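// Method signature assumed for completeness (the name is hypothetical); the original excerpt begins inside the method body.
public Map<String, BigDecimal> getNbqDailyEnergy(String bdsId, Date currentTime) throws IOException {
   Map<String, BigDecimal> resMap = new HashMap<>();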
   String index = this.getIndex();
   SearchRequest searchRequest = new SearchRequest(index);
   SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);

   Date start = TimeUtil.getStatisticStartTime(currentTime, EnumTimeUnit.DAYS);
   Date end = currentTime;

   BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
   TermQueryBuilder bdsIdQueryBuilder = QueryBuilders.termQuery("bdsId", bdsId);
   RangeQueryBuilder fetchTimeQueryBuilder = QueryBuilders.rangeQuery("fetchTime").gte(start).lte(end);
   RangeQueryBuilder qOutputDailyQueryBuilder = QueryBuilders.rangeQuery("qOutputDaily").gt(0.0)
         .lte(NBQ_SERIES_DAILY_MAX_ENERGY);
   TermQueryBuilder delFlagQueryBuilder = QueryBuilders.termQuery("delFlag", DelFlagEnum.NORMAL.value());

   boolQueryBuilder.must(bdsIdQueryBuilder);
   boolQueryBuilder.must(fetchTimeQueryBuilder);
   boolQueryBuilder.must(qOutputDailyQueryBuilder);
   boolQueryBuilder.must(delFlagQueryBuilder);

   searchSourceBuilder.query(boolQueryBuilder);

   ScriptedMetricAggregationBuilder scriptedMetricAggregationBuilder = AggregationBuilders
         .scriptedMetric("nbq_energy");

   Script initScript = ScriptUtil.getScriptBy(NBQ_DAILY_SCRIPT_PATH, "init_script.js");
   Script mapScript = ScriptUtil.getScriptBy(NBQ_DAILY_SCRIPT_PATH, "map_script.js");
   Script combineScript = ScriptUtil.getScriptBy(NBQ_DAILY_SCRIPT_PATH, "combine_script.js");
   Script reduceScript = ScriptUtil.getScriptBy(NBQ_DAILY_SCRIPT_PATH, "reduce_script.js");
   scriptedMetricAggregationBuilder.initScript(initScript);
   scriptedMetricAggregationBuilder.mapScript(mapScript);
   scriptedMetricAggregationBuilder.combineScript(combineScript);
   scriptedMetricAggregationBuilder.reduceScript(reduceScript);

   // The terms builder generates the defaults min_doc_count: 1 and shard_min_doc_count: 0
   TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_nbq").field("nbqId");
   termsAggregationBuilder.subAggregation(scriptedMetricAggregationBuilder);
   termsAggregationBuilder.size(ESPage.DEFAULT_ALL_PAGESIZE);

   searchSourceBuilder.aggregation(termsAggregationBuilder);
   // from = 0 and size = 0: return no hits from the query, only the aggregation results
   searchSourceBuilder.from(0);
   searchSourceBuilder.size(0);

   searchRequest.indices(index);
   searchRequest.source(searchSourceBuilder);

   SearchResponse searchResponse = this.restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
   Aggregations aggs = searchResponse.getAggregations();
   if (aggs != null) {
      Map<String, Aggregation> aggMap = aggs.getAsMap();
      ParsedMultiBucketAggregation agg = (ParsedMultiBucketAggregation) aggMap.get("group_by_nbq");
      List<MultiBucketsAggregation.Bucket> list = (List<MultiBucketsAggregation.Bucket>) agg.getBuckets();
      for (MultiBucketsAggregation.Bucket bucket : list) {
         String nbqId = bucket.getKeyAsString();
         Map<String, Aggregation> nbqAggsMap = bucket.getAggregations().getAsMap();
         Aggregation aggr = nbqAggsMap.get("nbq_energy");
         if (aggr instanceof ParsedScriptedMetric) {
            ParsedScriptedMetric metric = (ParsedScriptedMetric) aggr;
            Map<String, Object> metaMap = (Map<String, Object>) metric.aggregation();
            Object data = metaMap.get(nbqId);
            if (data instanceof String) {
               if (NumberUtil.isNumber(StringUtils.trimToEmpty((String) data))) {
                  BigDecimal energy = new BigDecimal(StringUtils.trimToEmpty((String) data));
                  resMap.put(nbqId, energy);
               }
            } else if (data instanceof Double) {
               // BigDecimal.valueOf never returns null, so no Optional wrapper is needed
               resMap.put(nbqId, BigDecimal.valueOf((Double) data));
            } else if (data instanceof BigDecimal) {
               // instanceof already guarantees that data is non-null
               resMap.put(nbqId, (BigDecimal) data);
            }
         }
      }
   }
   return resMap;
}
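
The type checks on the scripted metric value could be gathered into one small helper. The sketch below is one possible way to do that with only the JDK; `MetricValueSketch` and `toBigDecimal` are hypothetical names, and treating every `Number` subtype (not just `Double`) as convertible is an assumption that goes slightly beyond the code above.

```java
import java.math.BigDecimal;
import java.util.Optional;

/** Hypothetical helper that converts a loosely typed aggregation value to BigDecimal. */
class MetricValueSketch {

    /** Returns the value as BigDecimal, or empty if it is null or not numeric. */
    static Optional<BigDecimal> toBigDecimal(Object data) {
        if (data instanceof BigDecimal) {
            return Optional.of((BigDecimal) data);
        }
        if (data instanceof Number) {
            double d = ((Number) data).doubleValue();
            // NaN and infinities have no BigDecimal representation
            if (Double.isNaN(d) || Double.isInfinite(d)) {
                return Optional.empty();
            }
            return Optional.of(new BigDecimal(data.toString()));
        }
        if (data instanceof String) {
            try {
                return Optional.of(new BigDecimal(((String) data).trim()));
            } catch (NumberFormatException e) {
                return Optional.empty(); // not a numeric string
            }
        }
        return Optional.empty(); // null or an unsupported type
    }
}
```

With such a helper, the loop body could reduce to something like `MetricValueSketch.toBigDecimal(metaMap.get(nbqId)).ifPresent(v -> resMap.put(nbqId, v));`.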

nbq.json

demo


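Copyright: original content unless otherwise noted. This site is licensed under BY-NC-SA; when reposting, please credit "Advanced Aggregation Queries in Elasticsearch".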