Interrupt aggregation reduce phase if the search task is cancelled #71714

Merged
imotov merged 22 commits into elastic:master from scrappyiron:search_query_cancel
Oct 1, 2021

Conversation

@scrappyiron

Contributor

This change raises a TaskCancelledException to stop the search query if it is detected that the SearchTask has been cancelled during the reduce phase.

Issue: #71021

@elasticsearchmachine elasticsearchmachine added the external-contributor Pull request authored by a developer outside the Elasticsearch team label Apr 15, 2021
@scrappyiron scrappyiron force-pushed the search_query_cancel branch from 9af99f3 to f412f45 Compare April 15, 2021 00:49
@elasticmachine elasticmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Apr 15, 2021
@elasticmachine

Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@imotov imotov self-requested a review April 15, 2021 16:35

@imotov imotov left a comment

Contributor

It would be great to add an integration test for it and figure out a more robust way of passing the search task into aggregations. I also renamed the PR to better reflect what it is actually trying to achieve.

private final NamedWriteableRegistry namedWriteableRegistry;
private final Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;

private SearchTask searchTask;

Contributor

SearchPhaseController is a singleton that is created here, so this instance of the searchTask is going to be shared by all searches in the node leading to a race condition.
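
To make the concern concrete, here is a minimal sketch of why a per-search field on a node-wide singleton is unsafe. `SharedController` and `SharedStateDemo` are hypothetical stand-ins, not Elasticsearch classes, and the interleaving is written out sequentially so the effect is deterministic.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a node-wide singleton (not the real SearchPhaseController).
final class SharedController {
    // Problematic: one mutable slot shared by every concurrent search on the node.
    private volatile BooleanSupplier currentTaskCancelled = () -> false;

    void setTask(BooleanSupplier isCancelled) {
        this.currentTaskCancelled = isCancelled;
    }

    // Reads whatever task was stored last, which may belong to another search.
    boolean reduceSeesCancelled() {
        return currentTaskCancelled.getAsBoolean();
    }

    // Safer: the cancellation check travels with the call instead of living on the singleton.
    static boolean reduceSeesCancelled(BooleanSupplier isCancelled) {
        return isCancelled.getAsBoolean();
    }
}

final class SharedStateDemo {
    // Returns {what search A sees via the shared field, what search A sees via an argument}.
    static boolean[] run() {
        SharedController controller = new SharedController();
        controller.setTask(() -> false); // search A starts; its task is not cancelled
        controller.setTask(() -> true);  // search B starts and overwrites the field; B is cancelled
        boolean viaSharedField = controller.reduceSeesCancelled();
        boolean viaArgument = SharedController.reduceSeesCancelled(() -> false);
        return new boolean[] { viaSharedField, viaArgument };
    }
}
```

In this sketch search A observes B's cancellation through the shared field, while the argument-passing variant keeps each search's state separate.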

@imotov imotov changed the title Cancel query if task is cancelled Interrupt aggregation reduce phase if the search task is cancelled Apr 15, 2021
@scrappyiron scrappyiron marked this pull request as draft April 16, 2021 00:39
@scrappyiron scrappyiron force-pushed the search_query_cancel branch from f286295 to bc34d99 Compare April 16, 2021 00:45
@imotov

imotov commented Apr 19, 2021

Contributor

I don't think it is a good idea to rely on a side-effect of somebody calling createTask and I definitely prefer not to stash task on the request. This implementation also doesn't always work since we override createTask when we do async search. I would rather figure out how to make searchTask available in the aggregation layer properly. But before we jump into implementation I think we should start with a proper test for it.

@scrappyiron

Contributor Author

Sounds good, I'll work on the integration test first then.

@imotov

imotov commented May 19, 2021

Contributor

@danielwhsu it looks like you are reusing the same script that is used in search, and it fails on init because it is executed in a completely different context and has no idea where to find the fieldsLookup that it uses for logging. So it throws an exception there and never blocks. When a test times out waiting on a block, it is typically a good idea to try reading the searchResponse instead; it frequently contains useful errors. While I was at it, I fixed a few other issues that you would have run into next. I think we have a failing test now. Time to fix it!

@scrappyiron

Contributor Author

Thanks for the help! I'll look into piping the task into the reduce phase.

@scrappyiron scrappyiron force-pushed the search_query_cancel branch from e35a866 to 8eb554d Compare June 6, 2021 00:57
@scrappyiron scrappyiron force-pushed the search_query_cancel branch from 8eb554d to b20d038 Compare June 6, 2021 01:00
@scrappyiron

scrappyiron commented Jun 6, 2021

Contributor Author

@imotov I've updated this PR with a solution that passes the unit tests.

The main changes are:

  1. Update the TaskManager to keep track of a mapping from SearchRequest id to CancellableTask.
  2. Pass the TaskManager into the SearchService.
  3. Have the SearchService use the TaskManager to figure out which CancellableTask is associated with the SearchRequest, and put that CancellableTask into the ReduceContext it creates in forPartialReduction() and forFinalReduction().
  4. Let the ReduceContext raise an exception if it sees that the CancellableTask is cancelled.

I went with this approach because when the SearchService creates the ReduceContext in aggReduceContextBuilder(), the only information it uses to create the ReduceContext is the SearchRequest. Since the ReduceContext needs access to the CancellableTask to check if the task is cancelled, I needed some way for aggReduceContextBuilder() to figure out which CancellableTask is associated with the SearchRequest argument. That's why I built the SearchRequest to CancellableTask mapping into the TaskManager.
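
The four steps above can be sketched roughly as follows. This is only an illustration of the idea described in this comment: `CancellationRegistry` and `SketchReduceContext` are hypothetical names, the request id is assumed to be a `long`, and `IllegalStateException` stands in for `TaskCancelledException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical registry playing the role of the request-to-task mapping (step 1).
final class CancellationRegistry {
    private final Map<Long, BooleanSupplier> byRequestId = new ConcurrentHashMap<>();

    void register(long requestId, BooleanSupplier isCancelled) {
        byRequestId.put(requestId, isCancelled);
    }

    void unregister(long requestId) {
        byRequestId.remove(requestId);
    }

    // Step 3: look up the cancellation check for a request; unknown requests are never cancelled.
    BooleanSupplier lookup(long requestId) {
        return byRequestId.getOrDefault(requestId, () -> false);
    }
}

// Hypothetical reduce context that refuses to continue once the task is cancelled (step 4).
final class SketchReduceContext {
    private final BooleanSupplier isCancelled;

    SketchReduceContext(BooleanSupplier isCancelled) {
        this.isCancelled = isCancelled;
    }

    void ensureNotCancelled() {
        if (isCancelled.getAsBoolean()) {
            // Stand-in for TaskCancelledException in this sketch.
            throw new IllegalStateException("task cancelled");
        }
    }
}
```

One cost of this design is that the registry has to be kept in sync with the task lifecycle (entries must be removed when a search finishes), which passing the check directly as an argument would avoid.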

@scrappyiron scrappyiron marked this pull request as ready for review June 6, 2021 04:54
@scrappyiron scrappyiron marked this pull request as draft June 6, 2021 04:55
@scrappyiron scrappyiron marked this pull request as ready for review June 6, 2021 04:58
@imotov

imotov commented Jun 23, 2021

Contributor

@elasticmachine test this please

return true;
});
return Map.of(
SCRIPT_NAME, this::searchBlockScript,

Member

Rename to SEARCH_SCRIPT_NAME?

.setQuery(matchAllQuery())
.addAggregation(
new TermsAggregationBuilder("test_agg")
.script(new Script(

Member

Could you use a regular terms agg? If not probably worth a comment why. I spent a little while trying to figure out if the terms script created a second block or something before I went and read it.

* the maximum number of buckets allowed in a response
*/
public void consumeBucketsAndMaybeBreak(int size) {
if (isCanceled.get()) {

Member

It's worth adding a comment that this is a volatile read. Do we want to do that on every consume? We don't check the memory breaker on every consume, so maybe we shouldn't do this on every consume either?

I see that AtomicBoolean has a bunch of fun new ways to fetch the value without memory effects - but only in java 9, so, sadly, probably not for us yet.
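
For reference, a small sketch of the Java 9+ access modes mentioned here, next to the plain volatile `get()`. `FlagReads` is a hypothetical helper; the sketch only shows that the methods exist and return the same value in a single-threaded setting, not how their ordering guarantees differ under contention.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper that reads one flag through each access mode.
final class FlagReads {
    static boolean[] readAll(AtomicBoolean flag) {
        return new boolean[] {
            flag.get(),        // volatile read (available before Java 9)
            flag.getAcquire(), // acquire semantics (Java 9+)
            flag.getOpaque(),  // opaque read (Java 9+)
            flag.getPlain()    // plain read, no ordering guarantees (Java 9+)
        };
    }
}
```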

Contributor

My reasoning here was that this happens in the reduce context and, as you said, it's AtomicBoolean.get(). Do you think it will have noticeable overhead here? I can probably move it inside MultiBucketConsumer and call it every few buckets if you think it makes sense.

Member

I'm not really sure! It's the kind of location that could suffer from it, but I don't know how much without benchmarks. I'd feel quite comfortable without benchmarks if it were in the same 1024 calls thing, but it wouldn't interrupt as immediately. I expect the checks are much lighter than the circuit breaker checks but heavier than the general bucket building stuff.
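
A rough sketch of the "same 1024 calls" alternative discussed in this thread: the cancellation flag is read only on every 1024th call rather than on every consume. `SketchBucketConsumer` is a hypothetical class, and `IllegalStateException` again stands in for `TaskCancelledException`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical consumer that amortizes the cancellation check over 1024 calls.
final class SketchBucketConsumer {
    private static final int CHECK_MASK = 0x3FF; // 1023: low ten bits are zero on every 1024th call

    private final BooleanSupplier isCancelled;
    private int callCount;
    private int checksPerformed;

    SketchBucketConsumer(BooleanSupplier isCancelled) {
        this.isCancelled = isCancelled;
    }

    void accept(int bucketCount) {
        callCount++;
        // Only touch the (potentially volatile) flag once per 1024 calls.
        if ((callCount & CHECK_MASK) == 0) {
            checksPerformed++;
            if (isCancelled.getAsBoolean()) {
                // Stand-in for TaskCancelledException in this sketch.
                throw new IllegalStateException("task cancelled");
            }
        }
    }

    int checksPerformed() {
        return checksPerformed;
    }
}
```

The trade-off is the one noted above: a cancelled reduce can run for up to 1023 more calls before it notices.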

Contributor

Thanks! I will try to run some benchmarks to see what's going on there.

@imotov

imotov commented Jun 23, 2021

Contributor

@elasticmachine test this please

@imotov

imotov commented Jun 23, 2021

Contributor

@elasticmachine test this please

@imotov

imotov commented Jun 24, 2021

Contributor

@elasticmachine update branch

@scrappyiron

Contributor Author

@imotov just checking, is this still something we want to pursue?

@imotov

imotov commented Sep 1, 2021

Contributor

@imotov just checking, is this still something we want to pursue?

Definitely. I just don't have time to address @nik9000's concerns in #71714 (comment) at the moment due to other pressing work.

@imotov

imotov commented Sep 29, 2021

Contributor

@elasticmachine test this please

@imotov

imotov commented Sep 30, 2021

Contributor

I have updated the existing benchmark to mimic the cancellation check and ran it both with the checks in place and with the checks commented out. The results are pretty much identical, within the expected margin of error:

Without Cancellation Check:

Benchmark                        (bufferSize)  (cardinalityFactor)  (numShards)      (seed)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceAggs            32                    1           64  1600172297         100  avgt    7    51.275 ±   1.404  ms/op
TermsReduceBenchmark.reduceAggs            32                    1          128  1600172297         100  avgt    7   102.531 ±   1.225  ms/op
TermsReduceBenchmark.reduceAggs            32                    1          512  1600172297         100  avgt    7   409.667 ±   2.869  ms/op
TermsReduceBenchmark.reduceAggs            32                   10           64  1600172297         100  avgt    7   167.183 ±   0.862  ms/op
TermsReduceBenchmark.reduceAggs            32                   10          128  1600172297         100  avgt    7   458.448 ±   4.404  ms/op
TermsReduceBenchmark.reduceAggs            32                   10          512  1600172297         100  avgt    7  2926.366 ±  24.859  ms/op
TermsReduceBenchmark.reduceAggs            32                  100           64  1600172297         100  avgt    7   176.265 ±   1.751  ms/op
TermsReduceBenchmark.reduceAggs            32                  100          128  1600172297         100  avgt    7   553.663 ±   4.629  ms/op
TermsReduceBenchmark.reduceAggs            32                  100          512  1600172297         100  avgt    7  6847.864 ± 461.645  ms/op
TermsReduceBenchmark.reduceAggs           512                    1           64  1600172297         100  avgt    7    51.425 ±   0.574  ms/op
TermsReduceBenchmark.reduceAggs           512                    1          128  1600172297         100  avgt    7   118.018 ±   1.096  ms/op
TermsReduceBenchmark.reduceAggs           512                    1          512  1600172297         100  avgt    7   626.532 ±   6.566  ms/op
TermsReduceBenchmark.reduceAggs           512                   10           64  1600172297         100  avgt    7    30.153 ±   0.783  ms/op
TermsReduceBenchmark.reduceAggs           512                   10          128  1600172297         100  avgt    7    59.465 ±   0.934  ms/op
TermsReduceBenchmark.reduceAggs           512                   10          512  1600172297         100  avgt    7   236.926 ±   1.939  ms/op
TermsReduceBenchmark.reduceAggs           512                  100           64  1600172297         100  avgt    7    31.553 ±   0.642  ms/op
TermsReduceBenchmark.reduceAggs           512                  100          128  1600172297         100  avgt    7    45.052 ±   0.841  ms/op
TermsReduceBenchmark.reduceAggs           512                  100          512  1600172297         100  avgt    7   168.656 ±   2.558  ms/op

With Cancellation Check:

Benchmark                        (bufferSize)  (cardinalityFactor)  (numShards)      (seed)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceAggs            32                    1           64  1600172297         100  avgt    7    50.337 ±   0.373  ms/op
TermsReduceBenchmark.reduceAggs            32                    1          128  1600172297         100  avgt    7   103.912 ±   1.516  ms/op
TermsReduceBenchmark.reduceAggs            32                    1          512  1600172297         100  avgt    7   407.468 ±   3.463  ms/op
TermsReduceBenchmark.reduceAggs            32                   10           64  1600172297         100  avgt    7   192.436 ±   2.470  ms/op
TermsReduceBenchmark.reduceAggs            32                   10          128  1600172297         100  avgt    7   453.878 ±   6.764  ms/op
TermsReduceBenchmark.reduceAggs            32                   10          512  1600172297         100  avgt    7  2911.667 ±  21.592  ms/op
TermsReduceBenchmark.reduceAggs            32                  100           64  1600172297         100  avgt    7   180.069 ±   0.968  ms/op
TermsReduceBenchmark.reduceAggs            32                  100          128  1600172297         100  avgt    7   541.983 ±   3.675  ms/op
TermsReduceBenchmark.reduceAggs            32                  100          512  1600172297         100  avgt    7  6879.713 ± 711.939  ms/op
TermsReduceBenchmark.reduceAggs           512                    1           64  1600172297         100  avgt    7    51.895 ±   0.436  ms/op
TermsReduceBenchmark.reduceAggs           512                    1          128  1600172297         100  avgt    7   119.713 ±   1.480  ms/op
TermsReduceBenchmark.reduceAggs           512                    1          512  1600172297         100  avgt    7   681.412 ±   6.904  ms/op
TermsReduceBenchmark.reduceAggs           512                   10           64  1600172297         100  avgt    7    38.695 ±   0.649  ms/op
TermsReduceBenchmark.reduceAggs           512                   10          128  1600172297         100  avgt    7    61.097 ±   0.630  ms/op
TermsReduceBenchmark.reduceAggs           512                   10          512  1600172297         100  avgt    7   308.060 ±   4.110  ms/op
TermsReduceBenchmark.reduceAggs           512                  100           64  1600172297         100  avgt    7    22.936 ±   0.408  ms/op
TermsReduceBenchmark.reduceAggs           512                  100          128  1600172297         100  avgt    7    44.244 ±   0.288  ms/op
TermsReduceBenchmark.reduceAggs           512                  100          512  1600172297         100  avgt    7   163.627 ±   2.500  ms/op

final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
ccsRemoteReduce(parentTaskId, rewritten, localIndices, remoteClusterIndices, timeProvider,
- searchService.aggReduceContextBuilder(rewritten),
+ searchService.aggReduceContextBuilder(((CancellableTask) task)::isCancelled, rewritten),

Member

Do you need this cast any more now that you've changed the signature?
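
A small sketch of what the question is getting at, using hypothetical stand-in types (`SketchTask`, `SketchCancellableTask`, `ReduceContextBuilders`) and assuming the check is passed as a `Supplier<Boolean>`: the cast is only needed while the variable is declared with the base task type.

```java
import java.util.function.Supplier;

// Hypothetical base task type without any cancellation API.
class SketchTask {
}

// Hypothetical cancellable subtype exposing isCancelled().
class SketchCancellableTask extends SketchTask {
    private volatile boolean cancelled;

    void cancel() {
        cancelled = true;
    }

    boolean isCancelled() {
        return cancelled;
    }
}

final class ReduceContextBuilders {
    // Parameter typed as the base class: a cast is required to reach isCancelled().
    static Supplier<Boolean> fromBaseType(SketchTask task) {
        return ((SketchCancellableTask) task)::isCancelled;
    }

    // Parameter already typed as the cancellable subtype: no cast needed.
    static Supplier<Boolean> fromCancellableType(SketchCancellableTask task) {
        return task::isCancelled;
    }
}
```

Both suppliers observe the same task state; the second form simply moves the type requirement into the method signature.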

@imotov

imotov commented Sep 30, 2021

Contributor

@elasticmachine update branch

@imotov

imotov commented Sep 30, 2021

Contributor

@elasticmachine test this please

@imotov

imotov commented Sep 30, 2021

Contributor

@elasticmachine test this please

@imotov imotov merged commit ec40e08 into elastic:master Oct 1, 2021
imotov added a commit to imotov/elasticsearch that referenced this pull request Oct 2, 2021
…lastic#71714)

This change raises a TaskCancelledException to stop the search query if it is detected that the SearchTask has been cancelled during the reduce phase.

Issue: elastic#71021

Co-authored-by: Daniel Hsu <daniel.hsu7@gmail.com>
Co-authored-by: Igor Motov <igor@motovs.org>

Labels

:Analytics/Aggregations (Aggregations)
>enhancement
external-contributor (Pull request authored by a developer outside the Elasticsearch team)
Team:Analytics (Meta label for analytical engine team (ESQL/Aggs/Geo))
v7.16.0
v8.0.0-beta1

7 participants