summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSimon Willnauer <simon.willnauer@elasticsearch.com>2016-09-14 15:25:25 +0200
committerGitHub <noreply@github.com>2016-09-14 15:25:25 +0200
commitd402ca0dd721590bb516776aca67a8804c5649be (patch)
treee4af3d249e6ac2710cb232c0190826f4f08ea70b /core
parentc1e84618a674d7a54e52725c6d927343085bb01d (diff)
Remove poor-mans compression in InternalSearchHit and friends (#20472)
We still use some crazy poor mans compression in InternalSearchHit that uses a thread local and an unordered map as a lookup table if requested. Stuff like this should be handled by compression on the transport layer rather than in-line in the serialization code. This code is complex enough.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java15
-rw-r--r--core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java51
-rw-r--r--core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java93
-rw-r--r--core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java6
-rw-r--r--core/src/test/java/org/elasticsearch/search/internal/InternalSearchHitTests.java17
-rw-r--r--core/src/test/java/org/elasticsearch/search/sort/FieldSortIT.java1
6 files changed, 28 insertions, 155 deletions
diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
index ed8c0358db..eac878569e 100644
--- a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
@@ -21,14 +21,13 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
-import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext;
-
/**
*
*/
@@ -70,9 +69,17 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR
}
public void hits(InternalSearchHits hits) {
+ assert assertNoSearchTarget(hits);
this.hits = hits;
}
+ private boolean assertNoSearchTarget(InternalSearchHits hits) {
+ for (SearchHit hit : hits.hits()) {
+ assert hit.getShard() == null : "expected null but got: " + hit.getShard();
+ }
+ return true;
+ }
+
public InternalSearchHits hits() {
return hits;
}
@@ -96,13 +103,13 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
- hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
+ hits = InternalSearchHits.readSearchHits(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
- hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
+ hits.writeTo(out);
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
index e8ba4d88aa..227fe90ee6 100644
--- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
+++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
@@ -39,7 +39,6 @@ import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
-import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
@@ -554,18 +553,14 @@ public class InternalSearchHit implements SearchHit {
return builder;
}
- public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
+ public static InternalSearchHit readSearchHit(StreamInput in) throws IOException {
InternalSearchHit hit = new InternalSearchHit();
- hit.readFrom(in, context);
+ hit.readFrom(in);
return hit;
}
@Override
public void readFrom(StreamInput in) throws IOException {
- readFrom(in, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
- }
-
- public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
score = in.readFloat();
id = in.readOptionalText();
type = in.readOptionalText();
@@ -644,26 +639,13 @@ public class InternalSearchHit implements SearchHit {
matchedQueries[i] = in.readString();
}
}
-
- if (context.streamShardTarget() == ShardTargetType.STREAM) {
- if (in.readBoolean()) {
- shard = new SearchShardTarget(in);
- }
- } else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
- int lookupId = in.readVInt();
- if (lookupId > 0) {
- shard = context.handleShardLookup().get(lookupId);
- }
- }
-
+ shard = in.readOptionalWriteable(SearchShardTarget::new);
size = in.readVInt();
if (size > 0) {
innerHits = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String key = in.readString();
- ShardTargetType shardTarget = context.streamShardTarget();
- InternalSearchHits value = InternalSearchHits.readSearchHits(in, context.streamShardTarget(ShardTargetType.NO_STREAM));
- context.streamShardTarget(shardTarget);
+ InternalSearchHits value = InternalSearchHits.readSearchHits(in);
innerHits.put(key, value);
}
}
@@ -671,10 +653,6 @@ public class InternalSearchHit implements SearchHit {
@Override
public void writeTo(StreamOutput out) throws IOException {
- writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
- }
-
- public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context) throws IOException {
out.writeFloat(score);
out.writeOptionalText(id);
out.writeOptionalText(type);
@@ -752,31 +730,14 @@ public class InternalSearchHit implements SearchHit {
out.writeString(matchedFilter);
}
}
-
- if (context.streamShardTarget() == ShardTargetType.STREAM) {
- if (shard == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- shard.writeTo(out);
- }
- } else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
- if (shard == null) {
- out.writeVInt(0);
- } else {
- out.writeVInt(context.shardHandleLookup().get(shard));
- }
- }
-
+ out.writeOptionalWriteable(shard);
if (innerHits == null) {
out.writeVInt(0);
} else {
out.writeVInt(innerHits.size());
for (Map.Entry<String, InternalSearchHits> entry : innerHits.entrySet()) {
out.writeString(entry.getKey());
- ShardTargetType shardTarget = context.streamShardTarget();
- entry.getValue().writeTo(out, context.streamShardTarget(ShardTargetType.NO_STREAM));
- context.streamShardTarget(shardTarget);
+ entry.getValue().writeTo(out);
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java
index 592d4b0751..ab4f84c2d1 100644
--- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java
+++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java
@@ -40,54 +40,6 @@ import static org.elasticsearch.search.internal.InternalSearchHit.readSearchHit;
*/
public class InternalSearchHits implements SearchHits {
- public static class StreamContext {
-
- public static enum ShardTargetType {
- STREAM,
- LOOKUP,
- NO_STREAM
- }
-
- private IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup = new IdentityHashMap<>();
- private IntObjectHashMap<SearchShardTarget> handleShardLookup = new IntObjectHashMap<>();
- private ShardTargetType streamShardTarget = ShardTargetType.STREAM;
-
- public StreamContext reset() {
- shardHandleLookup.clear();
- handleShardLookup.clear();
- streamShardTarget = ShardTargetType.STREAM;
- return this;
- }
-
- public IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup() {
- return shardHandleLookup;
- }
-
- public IntObjectHashMap<SearchShardTarget> handleShardLookup() {
- return handleShardLookup;
- }
-
- public ShardTargetType streamShardTarget() {
- return streamShardTarget;
- }
-
- public StreamContext streamShardTarget(ShardTargetType streamShardTarget) {
- this.streamShardTarget = streamShardTarget;
- return this;
- }
- }
-
- private static final ThreadLocal<StreamContext> cache = new ThreadLocal<StreamContext>() {
- @Override
- protected StreamContext initialValue() {
- return new StreamContext();
- }
- };
-
- public static StreamContext streamContext() {
- return cache.get().reset();
- }
-
public static InternalSearchHits empty() {
// We shouldn't use static final instance, since that could directly be returned by native transport clients
return new InternalSearchHits(EMPTY, 0, 0);
@@ -186,11 +138,6 @@ public class InternalSearchHits implements SearchHits {
return builder;
}
- public static InternalSearchHits readSearchHits(StreamInput in, StreamContext context) throws IOException {
- InternalSearchHits hits = new InternalSearchHits();
- hits.readFrom(in, context);
- return hits;
- }
public static InternalSearchHits readSearchHits(StreamInput in) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
@@ -200,63 +147,27 @@ public class InternalSearchHits implements SearchHits {
@Override
public void readFrom(StreamInput in) throws IOException {
- readFrom(in, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
- }
-
- public void readFrom(StreamInput in, StreamContext context) throws IOException {
totalHits = in.readVLong();
maxScore = in.readFloat();
int size = in.readVInt();
if (size == 0) {
hits = EMPTY;
} else {
- if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
- // read the lookup table first
- int lookupSize = in.readVInt();
- for (int i = 0; i < lookupSize; i++) {
- context.handleShardLookup().put(in.readVInt(), new SearchShardTarget(in));
- }
- }
-
hits = new InternalSearchHit[size];
for (int i = 0; i < hits.length; i++) {
- hits[i] = readSearchHit(in, context);
+ hits[i] = readSearchHit(in);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
- writeTo(out, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
- }
-
- public void writeTo(StreamOutput out, StreamContext context) throws IOException {
out.writeVLong(totalHits);
out.writeFloat(maxScore);
out.writeVInt(hits.length);
if (hits.length > 0) {
- if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
- // start from 1, 0 is for null!
- int counter = 1;
- for (InternalSearchHit hit : hits) {
- if (hit.shard() != null) {
- Integer handle = context.shardHandleLookup().get(hit.shard());
- if (handle == null) {
- context.shardHandleLookup().put(hit.shard(), counter++);
- }
- }
- }
- out.writeVInt(context.shardHandleLookup().size());
- if (!context.shardHandleLookup().isEmpty()) {
- for (Map.Entry<SearchShardTarget, Integer> entry : context.shardHandleLookup().entrySet()) {
- out.writeVInt(entry.getValue());
- entry.getKey().writeTo(out);
- }
- }
- }
-
for (InternalSearchHit hit : hits) {
- hit.writeTo(out, context);
+ hit.writeTo(out);
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
index 8453371078..c86c056522 100644
--- a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
+++ b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
@@ -27,7 +27,6 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
-import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
@@ -261,8 +260,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
super.readFrom(in);
this.doc = Lucene.readScoreDoc(in);
if (in.readBoolean()) {
- this.hit = InternalSearchHit.readSearchHit(in,
- InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
+ this.hit = InternalSearchHit.readSearchHit(in);
}
int contextSize = in.readInt();
this.contexts = new LinkedHashMap<>(contextSize);
@@ -283,7 +281,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
Lucene.writeScoreDoc(out, doc);
if (hit != null) {
out.writeBoolean(true);
- hit.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
+ hit.writeTo(out);
} else {
out.writeBoolean(false);
}
diff --git a/core/src/test/java/org/elasticsearch/search/internal/InternalSearchHitTests.java b/core/src/test/java/org/elasticsearch/search/internal/InternalSearchHitTests.java
index dedd47d3e4..20216e1059 100644
--- a/core/src/test/java/org/elasticsearch/search/internal/InternalSearchHitTests.java
+++ b/core/src/test/java/org/elasticsearch/search/internal/InternalSearchHitTests.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class InternalSearchHitTests extends ESTestCase {
@@ -63,19 +64,15 @@ public class InternalSearchHitTests extends ESTestCase {
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit1, hit2}, 2, 1f);
- InternalSearchHits.StreamContext context = new InternalSearchHits.StreamContext();
- context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
BytesStreamOutput output = new BytesStreamOutput();
- hits.writeTo(output, context);
+ hits.writeTo(output);
InputStream input = output.bytes().streamInput();
- context = new InternalSearchHits.StreamContext();
- context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
- InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input), context);
+ InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input));
assertThat(results.getAt(0).shard(), equalTo(target));
- assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
- assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
- assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), nullValue());
- assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), nullValue());
+ assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
+ assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
+ assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), notNullValue());
+ assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), notNullValue());
assertThat(results.getAt(1).shard(), equalTo(target));
}
diff --git a/core/src/test/java/org/elasticsearch/search/sort/FieldSortIT.java b/core/src/test/java/org/elasticsearch/search/sort/FieldSortIT.java
index 78c77e15f3..6d1c64437d 100644
--- a/core/src/test/java/org/elasticsearch/search/sort/FieldSortIT.java
+++ b/core/src/test/java/org/elasticsearch/search/sort/FieldSortIT.java
@@ -122,7 +122,6 @@ public class FieldSortIT extends ESIntegTestCase {
}
}
- @LuceneTestCase.BadApple(bugUrl = "simon is working on this")
public void testIssue6614() throws ExecutionException, InterruptedException {
List<IndexRequestBuilder> builders = new ArrayList<>();
boolean strictTimeBasedIndices = randomBoolean();