Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
Expand Down Expand Up @@ -312,6 +315,23 @@ public static long getBehind(TransformCheckpoint oldCheckpoint, TransformCheckpo
return newCheckPointOperationsSum - oldCheckPointOperationsSum;
}

public static Collection<String> getChangedIndices(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
if (oldCheckpoint.isEmpty()) {
return newCheckpoint.indicesCheckpoints.keySet();
}

Set<String> indices = new HashSet<>();

for (Entry<String, long[]> entry : newCheckpoint.indicesCheckpoints.entrySet()) {
// compare against the old checkpoint
if (Arrays.equals(entry.getValue(), oldCheckpoint.indicesCheckpoints.get(entry.getKey())) == false) {
indices.add(entry.getKey());
}
}

return indices;
}

private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
Map<String, long[]> checkpoints = new TreeMap<>();
for (Map.Entry<String, Object> e : readMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static org.elasticsearch.test.TestMatchers.matchesPattern;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

public class TransformCheckpointTests extends AbstractSerializingTransformTestCase<TransformCheckpoint> {

Expand Down Expand Up @@ -191,6 +195,74 @@ public void testGetBehind() {
assertEquals((indices - 2) * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
}

public void testGetChangedIndices() {
String baseIndexName = randomAlphaOfLength(8);
String id = randomAlphaOfLengthBetween(1, 10);
long timestamp = randomNonNegativeLong();

TreeMap<String, long[]> checkpointsByIndexOld = new TreeMap<>();
TreeMap<String, long[]> checkpointsByIndexNew = new TreeMap<>();

int indices = randomIntBetween(5, 20);
int shards = randomIntBetween(1, 20);

for (int i = 0; i < indices; ++i) {
List<Long> checkpoints1 = new ArrayList<>();
List<Long> checkpoints2 = new ArrayList<>();

for (int j = 0; j < shards; ++j) {
long shardCheckpoint = randomLongBetween(-1, 1_000_000);
checkpoints1.add(shardCheckpoint);
if (i % 3 == 0) {
checkpoints2.add(shardCheckpoint + 10);
} else {
checkpoints2.add(shardCheckpoint);
}
}

String indexName = baseIndexName + i;

if (i < 15) {
checkpointsByIndexOld.put(indexName, checkpoints1.stream().mapToLong(l -> l).toArray());
}
if (i % 5 != 0) {
checkpointsByIndexNew.put(indexName, checkpoints2.stream().mapToLong(l -> l).toArray());
}
}
long checkpoint = randomLongBetween(10, 100);
TransformCheckpoint checkpointOld = new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
TransformCheckpoint checkpointNew = new TransformCheckpoint(id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);

Set<Integer> changedIndexes = TransformCheckpoint.getChangedIndices(checkpointOld, checkpointNew)
.stream()
.map(x -> Integer.parseInt(x.substring(baseIndexName.length())))
.collect(Collectors.toSet());

assertThat(changedIndexes.size(), lessThan(indices));

for (int i = 0; i < indices; ++i) {
if (i >= 15) {
if (i % 5 == 0) {
assertFalse(changedIndexes.contains(i));
} else {
assertTrue(changedIndexes.contains(i));
}
} else if (i % 5 == 0) {
assertFalse(changedIndexes.contains(i));
} else if (i % 3 == 0) {
assertTrue(changedIndexes.contains(i));
} else {
assertFalse(changedIndexes.contains(i));
}
}

// check against empty
assertThat(
TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, checkpointNew),
equalTo(checkpointNew.getIndicesCheckpoints().keySet())
);
}

private static Map<String, long[]> randomCheckpointsByIndex() {
Map<String, long[]> checkpointsByIndex = new TreeMap<>();
int indices = randomIntBetween(1, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -71,7 +73,7 @@ class ClientTransformIndexer extends TransformIndexer {
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
private volatile PointInTimeBuilder pit;
private final ConcurrentHashMap<String, PointInTimeBuilder> namedPits = new ConcurrentHashMap<>();
private volatile long pitCheckpoint;
private volatile boolean disablePit = false;

Expand Down Expand Up @@ -250,11 +252,7 @@ void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse>

@Override
void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
SchemaUtil.getDestinationFieldMappings(
client,
getConfig().getDestination().getIndex(),
fieldMappingsListener
);
SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
}

/**
Expand Down Expand Up @@ -363,12 +361,20 @@ protected void onStop() {
}

private void closePointInTime() {
for (String name : namedPits.keySet()) {
closePointInTime(name);
}
}

private void closePointInTime(String name) {
PointInTimeBuilder pit = namedPits.remove(name);

if (pit == null) {
return;
}

String oldPit = pit.getEncodedId();
pit = null;

ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
Expand All @@ -383,20 +389,25 @@ private void closePointInTime() {
);
}

private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
private void injectPointInTimeIfNeeded(
Tuple<String, SearchRequest> namedSearchRequest,
ActionListener<Tuple<String, SearchRequest>> listener
) {
if (disablePit) {
listener.onResponse(searchRequest);
listener.onResponse(namedSearchRequest);
return;
}

SearchRequest searchRequest = namedSearchRequest.v2();
PointInTimeBuilder pit = namedPits.get(namedSearchRequest.v1());
if (pit != null) {
searchRequest.source().pointInTimeBuilder(pit);
listener.onResponse(searchRequest);
listener.onResponse(namedSearchRequest);
return;
}

// no pit, create a new one
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE);
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(searchRequest.indices()).keepAlive(PIT_KEEP_ALIVE);

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
Expand All @@ -405,11 +416,17 @@ private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListen
OpenPointInTimeAction.INSTANCE,
pitRequest,
ActionListener.wrap(response -> {
pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
searchRequest.source().pointInTimeBuilder(pit);
PointInTimeBuilder newPit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
namedPits.put(namedSearchRequest.v1(), newPit);
searchRequest.source().pointInTimeBuilder(newPit);
pitCheckpoint = getNextCheckpoint().getCheckpoint();
logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId());
listener.onResponse(searchRequest);
logger.trace(
"[{}] using pit search context with id [{}]; request [{}]",
getJobId(),
newPit.getEncodedId(),
namedSearchRequest.v1()
);
listener.onResponse(namedSearchRequest);
}, e -> {
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
// if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
Expand All @@ -433,25 +450,27 @@ private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListen
e
);
}
listener.onResponse(searchRequest);
listener.onResponse(namedSearchRequest);
})
);
}

private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
logger.trace("searchRequest: {}", searchRequest);
private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", namedSearchRequest.v2()));

PointInTimeBuilder pit = namedSearchRequest.v2().pointInTimeBuilder();

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
namedSearchRequest.v2(),
ActionListener.wrap(response -> {
// did the pit change?
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) {
pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
logger.trace("point in time handle has changed");
namedPits.put(namedSearchRequest.v1(), new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
logger.trace("point in time handle has changed; request [{}]", namedSearchRequest.v1());
}

listener.onResponse(response);
Expand All @@ -461,15 +480,22 @@ private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse
// succeeds a new pit gets created at the next run
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
if (unwrappedException instanceof SearchContextMissingException) {
logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e);
pit = null;
searchRequest.source().pointInTimeBuilder(null);
logger.warn(
new ParameterizedMessage(
"[{}] Search context missing, falling back to normal search; request [{}]",
getJobId(),
namedSearchRequest.v1()
),
e
);
namedPits.remove(namedSearchRequest.v1());
namedSearchRequest.v2().source().pointInTimeBuilder(null);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
namedSearchRequest.v2(),
listener
);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
Expand Down Expand Up @@ -74,13 +76,20 @@ public interface ChangeCollector {
/**
* Build the filter query to narrow the result set given the previously collected changes.
*
* TODO: it might be useful to have the full checkpoint data.
*
* @param lastCheckpointTimestamp the timestamp of the last checkpoint
* @param nextCheckpointTimestamp the timestamp of the next (in progress) checkpoint
* @param lastCheckpoint the last checkpoint
* @param nextCheckpoint the next (in progress) checkpoint
* @return a filter query, null in case of no filter
*/
QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextCheckpointTimestamp);
QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint);

/**
* Filter indices according to the given checkpoints.
*
* @param lastCheckpoint the last checkpoint
* @param nextCheckpoint the next (in progress) checkpoint
* @return set of indices to query
*/
Collection<String> getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint);

/**
* Clear the internal state to free up memory.
Expand Down
Loading