ときどき起きる

だいたい寝ている

Elasticsearchのタイムアウトは何をどうタイムアウトするのか追ってみる

こんにちは、pakioです。

Elasticsearchのsearch apiではQuery Parameter, Query DSLともにサポートされているtimeoutオプションが存在します。

www.elastic.co

(Optional, time units) Specifies the period of time to wait for a response from each shard. If no response is received before the timeout expires, the request fails and returns an error. Defaults to no timeout.

このドキュメントから読み取れる挙動だとタイムアウトした場合エラーが返りそうなものですが、実際にはデフォルトでtrueに設定されているallow_partial_search_resultsによって取得できた結果まで返すような挙動となるようです。

allow_partial_search_results (Optional, Boolean) If true, returns partial results if there are shard request timeouts or shard failures. If false, returns an error with no partial results. Defaults to true.

一方、ひとえに検索リクエストといってもsearch, score, aggregate, ... など様々なフェーズがあるわけですが、どこまでが打ち切られるかについては明記されていません。 実際にある程度の規模のインデックスに対してtimeoutオプションを指定した状態で重めのaggregationなどをかけると、レスポンスを受け取るまでの時間が設定した時間を大幅に上回るケースも観測しました。

そこで今回はElasticsearchのタイムアウトについて少し深堀りしてみたので、その調査結果をまとめます。

なお、本ブログはv8.5.0時点のソースコード及びドキュメントを参考に執筆しています。

TL;DR

timeoutパラメータによって

  • 中断される処理

    • search
    • scoring
    • collapsing
    • post filter
  • 中断されない処理

    • aggregation
    • rescoring
    • suggest

大まかな処理の流れについて

Elasticsearchの大まかな処理の流れについては公式のブログがわかりやすくまとめてくれています。

www.elastic.co

この中でも今回関係するのが、Searching a Shard, … Then Fetchのセクションです。

ブログ内にも書かれている通り、Elasticsearchでは各shard毎にデータを保持しているため、リクエストが振り分けられた各shard毎にfetch, score, aggregateなどの操作を行い、coordinatorに返却、結果の集約を行っています。1

ここからはさらにシャード内でどのようにタイムアウトが取り扱われているかを覗いてみます。

各シャード内でのタイムアウトの取り扱いについて

  • この周辺のコードリーディング自体が不慣れなので、もし認識に誤りがある場合ご指摘いただけると大変ありがたいです。

各シャードにリクエストが振り分けられた後、SearchService内にてcontextが生成されます。
Elasticsearchの検索処理ではこのコンテキストに様々な情報(参考:DefaultQueryContext.java)を詰めて、これをベースに様々な処理を展開していくことになる、キーとなるオブジェクトです。

その初期化処理の一貫で、もしユーザーがタイムアウトを指定していた場合、コンテキストに設定値を保存します。

        if (source.timeout() != null) {
            context.timeout(source.timeout());
        }

リンク

SearchService内から呼ばれるQueryPhase内では、このコンテキスト内にセットされたタイムアウトの値をもとに、管理オブジェクトの生成時から指定した時間経過した際にTimeExceededExceptionを返すような関数を登録します。

            boolean timeoutSet = scrollContext == null
                && searchContext.timeout() != null
                && searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;

            final Runnable timeoutRunnable;
            if (timeoutSet) {
                final long startTime = searchContext.getRelativeTimeInMillis();
                final long timeout = searchContext.timeout().millis();
                final long maxTime = startTime + timeout;
                timeoutRunnable = searcher.addQueryCancellation(() -> {
                    final long time = searchContext.getRelativeTimeInMillis();
                    if (time > maxTime) {
                        throw new TimeExceededException();
                    }
                });
            } else {
                timeoutRunnable = null;
            }

リンク

その後ContextIndexSearcherにて実際の検索フェーズに入りますが、ここで各セグメントを処理するsearchLeafの先頭にて、cancellable.checkCancelled()が呼ばれ、上記の処理で設定した関数を実行しタイムアウトのチェックを行います。

private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
        cancellable.checkCancelled();
...

リンク

また、同様の関数はさらにsearchLeaf内のスコアリング処理 intersectScorerAndBitSetの先頭および末尾でもチェックされます。

    static void intersectScorerAndBitSet(Scorer scorer, BitSet acceptDocs, LeafCollector collector, Runnable checkCancelled)
        throws IOException {
        collector.setScorer(scorer);
        // ConjunctionDISI uses the DocIdSetIterator#cost() to order the iterators, so if roleBits has the lowest cardinality it should
        // be used first:
        DocIdSetIterator iterator = ConjunctionUtils.intersectIterators(
            Arrays.asList(new BitSetIterator(acceptDocs, acceptDocs.approximateCardinality()), scorer.iterator())
        );
        int seen = 0;
        checkCancelled.run();
        for (int docId = iterator.nextDoc(); docId < DocIdSetIterator.NO_MORE_DOCS; docId = iterator.nextDoc()) {
            if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) {
                checkCancelled.run();
            }
            collector.collect(docId);
        }
        checkCancelled.run();
    }

リンク

この2か所にて、実行開始から指定時間が経過していた場合、前述のとおりTimeExceededExceptionが投げられますが、この時点で走査した分のドキュメントに関してはcontextに保持された状態になっています。

また、このままだとExceptionが返る結果となってしまいますが、実際には少し戻ったQueryPhaseにてallow_partial_search_result (デフォルト: true)のチェックが行われます。ここでもし途中までの結果で返却することが許容されていた場合、Exceptionを返す代わりにsearchTimedOutをtrueにセットし、そこまでの結果で残りの処理を続行します。

        } catch (TimeExceededException e) {
            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
            if (searchContext.request().allowPartialSearchResults() == false) {
                // Can't rethrow TimeExceededException because not serializable
                throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
            }
            queryResult.searchTimedOut(true);
        }

リンク


...

ここまでの処理はすべてQueryPhaseのexecute内、executeInternalで行われます。一方、aggregationやsuggest、rescoreに関してはexecuteInternalと同レベルで別処理として実行されるため、タイムアウトの考慮時間に含まれません。= タイムアウトで指定した時間を超過しても、処理が続行されます。

    public static void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        if (searchContext.hasOnlySuggest()) {
            SuggestPhase.execute(searchContext);
            searchContext.queryResult()
                .topDocs(
                    new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
                    new DocValueFormat[0]
                );
            return;
        }

        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
        }

        // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
        // request, preProcess is called on the DFS phase, this is why we pre-process them
        // here to make sure it happens during the QUERY phase
        AggregationPhase.preProcess(searchContext);
        boolean rescore = executeInternal(searchContext);

        if (rescore) { // only if we do a regular search
            RescorePhase.execute(searchContext);
        }
        SuggestPhase.execute(searchContext);
        AggregationPhase.execute(searchContext);

        if (searchContext.getProfilers() != null) {
            searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults());
        }
    }

追記: タイムアウトされたリクエストのキャッシュについて

Elasticsearchではシャードレベルのリクエストキャッシュやセグメント単位のクエリキャッシュが存在しますが、このタイムアウト処理によって前者のリクエストキャッシュが影響を受けるようです。

リクエストキャッシュからの読み込み、保存処理などはIndicesServiceのloadIntoContextで実行されています。その際コンテキスト内のqueryResultのタイムアウトフラグがtrueとなっていた場合、indicesRequestCacheのinvalidateが実行され、その結果のキャッシュが消されることとなります。

        } else if (context.queryResult().searchTimedOut()) {
            // we have to invalidate the cache entry if we cached a query result form a request that timed out.
            // we can't really throw exceptions in the loading part to signal a timed out search to the outside world since if there are
            // multiple requests that wait for the cache entry to be calculated they'd fail all with the same exception.
            // instead we all caching such a result for the time being, return the timed out result for all other searches with that cache
            // key invalidate the result in the thread that caused the timeout. This will end up to be simpler and eventually correct since
            // running a search that times out concurrently will likely timeout again if it's run while we have this `stale` result in the
            // cache. One other option is to not cache requests with a timeout at all...
            indicesRequestCache.invalidate(
                new IndexShardCacheEntity(context.indexShard()),
                context.getSearchExecutionContext().mappingCacheKey(),
                directoryReader,
                cacheKey
            );
            if (logger.isTraceEnabled()) {
                logger.trace(
                    "Query timed out, invalidating cache entry for request on shard [{}]:\n {}",
                    request.shardId(),
                    request.source()
                );
            }
        }

リンク

もしリクエストキャッシュでキャッシュ可能なクエリが多数あるような環境の場合、タイムアウトの設定値には慎重になったほうが良さそうです。


まとめ

本ポストではElasticsearchのタイムアウトについてその実装を追ってみましたが、以下がその結果を簡単にまとめたものになります。

中断される処理 中断されない処理
search aggregation
scoring rescoring
collapsing
post filtering

表からも、実際には他のソフトウェア文脈で利用されるような厳密なタイムアウトではなく、一部の処理にしか適用されないものだとわかります。
極端な例にはなりますがクエリは軽いがヒット件数が多い & aggregation条件が複数ある・重い ような検索だとその恩恵は受けられなさそうです。一方純粋な検索処理だけであれば十分に効果のあるパラメータではあるため、検索条件は単純だがドキュメント数が膨れなおかつ全件取得する必要がないようなケースでは有効かもしれません。


  1. この内容は、すべてsearch_type = query_then_fetchを前提としています。