Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f412f45
Cancel query if task is cancelled
scrappyiron Apr 15, 2021
bc34d99
pass SearchRequest into ReduceContext with SearchTask
scrappyiron Apr 16, 2021
68ddd45
Revert "pass SearchRequest into ReduceContext with SearchTask"
scrappyiron May 16, 2021
ad8408b
Revert "Cancel query if task is cancelled"
scrappyiron May 16, 2021
c339bd7
add IT for cancel in aggregation
scrappyiron May 16, 2021
afddd96
Fix IT Test
imotov May 19, 2021
b20d038
Use task manager to hold a mapping from SearchRequest to CancellableTask
scrappyiron Jun 5, 2021
d862a66
Merge branch 'master' into search_query_cancel
scrappyiron Jun 6, 2021
79d50bf
Remove mutable task assignment
imotov Jun 8, 2021
4e82a2d
Merge branch 'master' into search_query_cancel
elasticmachine Jun 8, 2021
0f202af
Fix AsyncSearchTaskTests
imotov Jun 8, 2021
5ea8071
Merge remote-tracking branch 'elastic/master' into search_query_cancel
imotov Jun 22, 2021
c040704
Address review comments
imotov Jun 23, 2021
19e3806
Address review comments
imotov Jun 23, 2021
7c5287b
Merge remote-tracking branch 'elastic/master' into search_query_cancel
imotov Jun 23, 2021
fbf1b03
Improve failure logging.
imotov Jun 23, 2021
a3f6f0c
Merge branch 'master' into search_query_cancel
elasticmachine Jun 24, 2021
35eb2b1
Merge remote-tracking branch 'elastic/master' into search_query_cancel
imotov Sep 29, 2021
c08e6f7
Make the benchmark more realistic
imotov Sep 30, 2021
12cb412
Remove unnecessary cast
imotov Sep 30, 2021
6294e31
Merge branch 'master' into search_query_cancel
elasticmachine Sep 30, 2021
109b4b0
Fix imports
imotov Sep 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Warmup(iterations = 5)
@Measurement(iterations = 7)
Expand All @@ -64,21 +65,30 @@
@State(Scope.Thread)
@Fork(value = 1)
public class TermsReduceBenchmark {
private final SearchPhaseController controller = new SearchPhaseController(req -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY);
}

@Override
public InternalAggregation.ReduceContext forFinalReduction() {
final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return InternalAggregation.ReduceContext.forFinalReduction(null, null, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY);
private final SearchPhaseController controller = new SearchPhaseController(
(task, req) -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY, task);
}

@Override
public InternalAggregation.ReduceContext forFinalReduction() {
final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return InternalAggregation.ReduceContext.forFinalReduction(
null,
null,
bucketConsumer,
PipelineAggregator.PipelineTree.EMPTY,
task
);
}
}
});
);

@State(Scope.Benchmark)
public static class TermsList extends AbstractList<InternalAggregations> {
Expand Down Expand Up @@ -182,11 +192,13 @@ public SearchPhaseController.ReducedQueryPhase reduceAggs(TermsList candidateLis
request.source(new SearchSourceBuilder().size(0).aggregation(AggregationBuilders.terms("test")));
request.setBatchedReduceSize(bufferSize);
ExecutorService executor = Executors.newFixedThreadPool(1);
AtomicBoolean isCanceled = new AtomicBoolean();
QueryPhaseResultConsumer consumer = new QueryPhaseResultConsumer(
request,
executor,
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
controller,
isCanceled::get,
SearchProgressListener.NOOP,
shards.size(),
exc -> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public void testReduceRandom() {
bigArrays,
mockScriptService,
b -> {},
PipelineTree.EMPTY
PipelineTree.EMPTY,
() -> false
);
InternalMatrixStats reduced = (InternalMatrixStats) shardResults.get(0).reduce(shardResults, context);
multiPassStats.assertNearlyEqual(reduced.getResults());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand All @@ -53,8 +55,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
import static org.elasticsearch.search.SearchCancellationIT.ScriptedBlockPlugin.SCRIPT_NAME;
import static org.elasticsearch.search.SearchCancellationIT.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -94,7 +97,7 @@ private void indexTestData() {

private List<ScriptedBlockPlugin> initBlockFactory() {
List<ScriptedBlockPlugin> plugins = new ArrayList<>();
for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) {
for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) {
plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class));
}
for (ScriptedBlockPlugin plugin : plugins) {
Expand Down Expand Up @@ -159,7 +162,7 @@ public void testCancellationDuringQueryPhase() throws Exception {
logger.info("Executing search");
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery(
scriptQuery(new Script(
ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())))
.execute();

awaitForBlock(plugins);
Expand All @@ -177,7 +180,7 @@ public void testCancellationDuringFetchPhase() throws Exception {
logger.info("Executing search");
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.addScriptField("test_field",
new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())
new Script(ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())
).execute();

awaitForBlock(plugins);
Expand All @@ -187,6 +190,71 @@ public void testCancellationDuringFetchPhase() throws Exception {
ensureSearchWasCancelled(searchResponse);
}

public void testCancellationDuringAggregation() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

logger.info("Executing search");
TermsAggregationBuilder termsAggregationBuilder = new TermsAggregationBuilder("test_agg");
if (randomBoolean()) {
termsAggregationBuilder.script(new Script(
ScriptType.INLINE,
"mockscript",
ScriptedBlockPlugin.TERM_SCRIPT_NAME,
Collections.emptyMap()
));
} else {
termsAggregationBuilder.field("field.keyword");
}

ActionFuture<SearchResponse> searchResponse = client()
.prepareSearch("test")
.setQuery(matchAllQuery())
.addAggregation(
termsAggregationBuilder
.subAggregation(
new ScriptedMetricAggregationBuilder("sub_agg")
.initScript(
new Script(
ScriptType.INLINE,
"mockscript",
ScriptedBlockPlugin.INIT_SCRIPT_NAME,
Collections.emptyMap()
)
)
.mapScript(
new Script(
ScriptType.INLINE,
"mockscript",
ScriptedBlockPlugin.MAP_SCRIPT_NAME,
Collections.emptyMap()
)
)
.combineScript(
new Script(
ScriptType.INLINE,
"mockscript",
ScriptedBlockPlugin.COMBINE_SCRIPT_NAME,
Collections.emptyMap()
)
)
.reduceScript(
new Script(
ScriptType.INLINE,
"mockscript",
ScriptedBlockPlugin.REDUCE_SCRIPT_NAME,
Collections.emptyMap()
)
)
)
)
.execute();
awaitForBlock(plugins);
cancelSearch(SearchAction.NAME);
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
}

public void testCancellationOfScrollSearches() throws Exception {

List<ScriptedBlockPlugin> plugins = initBlockFactory();
Expand All @@ -198,7 +266,7 @@ public void testCancellationOfScrollSearches() throws Exception {
.setSize(5)
.setQuery(
scriptQuery(new Script(
ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())))
.execute();

awaitForBlock(plugins);
Expand Down Expand Up @@ -228,7 +296,7 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio
.setSize(2)
.setQuery(
scriptQuery(new Script(
ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())))
.get();

assertNotNull(searchResponse.getScrollId());
Expand Down Expand Up @@ -261,7 +329,7 @@ public void testCancelMultiSearch() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
ActionFuture<MultiSearchResponse> msearchResponse = client().prepareMultiSearch().add(client().prepareSearch("test")
.addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
cancelSearch(MultiSearchAction.NAME);
Expand Down Expand Up @@ -311,7 +379,7 @@ public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
client().prepareSearch("test")
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(false).setSize(1000).get());
assertThat(e.getMessage(), containsString("Partial shards failure"));
});
Expand Down Expand Up @@ -351,7 +419,12 @@ List<SearchTask> getSearchTasks() {
}

public static class ScriptedBlockPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_block";
static final String SEARCH_BLOCK_SCRIPT_NAME = "search_block";
static final String INIT_SCRIPT_NAME = "init";
static final String MAP_SCRIPT_NAME = "map";
static final String COMBINE_SCRIPT_NAME = "combine";
static final String REDUCE_SCRIPT_NAME = "reduce";
static final String TERM_SCRIPT_NAME = "term";

private final AtomicInteger hits = new AtomicInteger();

Expand All @@ -377,21 +450,52 @@ public void setBeforeExecution(Runnable runnable) {

@Override
public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
return Collections.singletonMap(SCRIPT_NAME, params -> {
final Runnable runnable = beforeExecution.get();
if (runnable != null) {
runnable.run();
}
LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}
return true;
});
return Map.of(
SEARCH_BLOCK_SCRIPT_NAME, this::searchBlockScript,
INIT_SCRIPT_NAME, this::nullScript,
MAP_SCRIPT_NAME, this::nullScript,
COMBINE_SCRIPT_NAME, this::nullScript,
REDUCE_SCRIPT_NAME, this::blockScript,
TERM_SCRIPT_NAME, this::termScript);
}

private Object searchBlockScript(Map<String, Object> params) {
final Runnable runnable = beforeExecution.get();
if (runnable != null) {
runnable.run();
}
LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}
return true;
}

private Object nullScript(Map<String, Object> params) {
return null;
}

private Object blockScript(Map<String, Object> params) {
final Runnable runnable = beforeExecution.get();
if (runnable != null) {
runnable.run();
}
LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce");
hits.incrementAndGet();
try {
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}
return 42;
}

private Object termScript(Map<String, Object> params) {
return 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;
import static org.elasticsearch.action.search.SearchPhaseController.mergeTopDocs;
Expand Down Expand Up @@ -73,6 +74,7 @@ public QueryPhaseResultConsumer(SearchRequest request,
Executor executor,
CircuitBreaker circuitBreaker,
SearchPhaseController controller,
Supplier<Boolean> isCanceled,
SearchProgressListener progressListener,
int expectedResultSize,
Consumer<Exception> onPartialMergeFailure) {
Expand All @@ -81,7 +83,7 @@ public QueryPhaseResultConsumer(SearchRequest request,
this.circuitBreaker = circuitBreaker;
this.controller = controller;
this.progressListener = progressListener;
this.aggReduceContextBuilder = controller.getReduceContext(request);
this.aggReduceContextBuilder = controller.getReduceContext(isCanceled, request);
this.topNSize = getTopDocsSize(request);
this.performFinalReduce = request.isFinalReduce();
this.onPartialMergeFailure = onPartialMergeFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectHashMap;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.FieldDoc;
Expand Down Expand Up @@ -55,17 +54,20 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public final class SearchPhaseController {
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

private final Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final BiFunction<Supplier<Boolean>, SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;

public SearchPhaseController(Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder) {
public SearchPhaseController(
BiFunction<Supplier<Boolean>, SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
) {
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
}

Expand Down Expand Up @@ -630,20 +632,22 @@ private SearchProfileResults buildSearchProfileResults(Collection<? extends Sear
}
}

InternalAggregation.ReduceContextBuilder getReduceContext(SearchRequest request) {
return requestToAggReduceContextBuilder.apply(request);
InternalAggregation.ReduceContextBuilder getReduceContext(Supplier<Boolean> isCanceled, SearchRequest request) {
return requestToAggReduceContextBuilder.apply(isCanceled, request);
}

/**
* Returns a new {@link QueryPhaseResultConsumer} instance that reduces search responses incrementally.
*/
QueryPhaseResultConsumer newSearchPhaseResults(Executor executor,
CircuitBreaker circuitBreaker,
Supplier<Boolean> isCanceled,
SearchProgressListener listener,
SearchRequest request,
int numShards,
Consumer<Exception> onPartialMergeFailure) {
return new QueryPhaseResultConsumer(request, executor, circuitBreaker, this, listener, numShards, onPartialMergeFailure);
return new QueryPhaseResultConsumer(request, executor, circuitBreaker,
this, isCanceled, listener, numShards, onPartialMergeFailure);
}

static final class TopDocsStats {
Expand Down
Loading