/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.translog;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TwoPhaseCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * A Translog is a per index shard component that records all non-committed index operations in a durable manner.
 * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. The engine
 * records the current translog generation {@link Translog#getGeneration()} in its commit metadata using {@link #TRANSLOG_GENERATION_KEY}
 * to reference the generation that contains all operations that have not yet successfully been committed to the engine's Lucene index.
 * Additionally, since Elasticsearch 2.0 the engine also records a {@link #TRANSLOG_UUID_KEY} with each commit to ensure a strong
 * association between the Lucene index and the transaction log file.
 * This UUID is used to prevent accidental recovery from a transaction log that belongs to a different engine.
 * <p>
 * Each Translog has only one translog file open at any time, referenced by a translog generation ID. This ID is written to a
 * translog.ckp file that is designed to fit in a single disk block such that a write of the file is atomic. The checkpoint file is
 * written on each fsync operation of the translog and records the number of operations written, the current translog file generation
 * and its fsynced offset in bytes.
 * <p>
 * When a translog is opened, the checkpoint is used to retrieve the latest translog file generation and subsequently to open the last
 * written file in order to recover operations. The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the
 * translog is opened / constructed, is compared against the latest generation, and all consecutive translog files from the given
 * generation up to the last generation in the checkpoint will be recovered and preserved until the next generation is committed using
 * {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the
 * translog on creation are the same. The only situation in which they can differ is when an actual translog commit fails between
 * {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case the translog file that was being committed will not be
 * deleted since its commit was not successful. Yet, a new/current translog file is already opened at that point, such that there is
 * more than one translog file present. Such an uncommitted translog file always has a translog-${gen}.ckp associated with it, which is
 * an fsynced copy of its last translog.ckp, such that in disaster recovery the last fsynced offsets, number of operations, etc. are
 * still preserved.
 */
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {

    /*
     * TODO
     *  - we might need something like a deletion policy to hold on to more than one translog eventually (I think sequence IDs needs
     *    this) but we can refactor as we go
     *  - use a simple BufferedOutputStream to write stuff and fold BufferedTranslogWriter into its superclass... the tricky bit is we
     *    need to be able to do random access reads even from the buffer
     *  - we need random exception on the FileSystem API tests for all this.
     *  - we need to page align the last write before we sync, we can take advantage of ensureSynced for this since we might have
     *    already fsynced far enough
     */
    public static final String TRANSLOG_GENERATION_KEY = "translog_generation";
    public static final String TRANSLOG_UUID_KEY = "translog_uuid";
    public static final String TRANSLOG_FILE_PREFIX = "translog-";
    public static final String TRANSLOG_FILE_SUFFIX = ".tlog";
    public static final String CHECKPOINT_SUFFIX = ".ckp";
    public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;

    static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");

    // the list of translog readers is guaranteed to be in order of translog generation
    private final List<TranslogReader> readers = new ArrayList<>();

    // (remaining fields, constructors, and several methods omitted)

    /**
     * Ensures that the given location has been synced / written to the underlying storage.
     *
     * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
     */
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
ensureOpen();
return current.syncUpTo(location.translogLocation + location.size);
}
} catch (Exception ex) {
try {
closeOnTragicEvent(ex);
} catch (Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
}
return false;
}
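    // Illustrative usage (a sketch, not part of the original class): callers that need
    // request-level durability sync up to the Location returned when the operation was added;
    // Translog#add is among the methods omitted above.
    //
    //   Translog.Location location = translog.add(operation);
    //   boolean performedFsync = translog.ensureSynced(location); // false if a previous sync already covered it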
/**
* Ensures that all locations in the given stream have been synced / written to the underlying storage.
* This method allows for internal optimization to minimize the amount of fsync operations if multiple
* locations must be synced.
*
     * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
*/
    public boolean ensureSynced(Stream<Location> locations) throws IOException {
        final Optional<Location> max = locations.max(Location::compareTo);
        // we only need to sync the max location since syncing it covers all earlier locations implicitly
        if (max.isPresent()) {
            return ensureSynced(max.get());
        } else {
            return false;
        }
    }

    // (snapshot, stats, and view methods omitted)

    /**
     * A snapshot of the transaction log that allows iterating over all the transaction log operations.
     */
    public interface Snapshot {

        /**
         * The total number of operations in the translog.
         */
        int totalOperations();

        /**
         * Returns the next operation in the snapshot or <code>null</code> if we reached the end.
         */
        Translog.Operation next() throws IOException;
    }
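    // Illustrative usage (a sketch, not part of the original class). Several buffered write
    // locations can be covered by a single fsync, since only the maximum location needs to be
    // synced; and a Snapshot is drained by calling next() until it returns null:
    //
    //   List<Translog.Location> locations = ...;   // returned by Translog#add calls (omitted above)
    //   boolean performedFsync = translog.ensureSynced(locations.stream());
    //
    //   Translog.Operation op;
    //   while ((op = snapshot.next()) != null) {
    //       // replay op, e.g. during recovery
    //   }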
/**
* A generic interface representing an operation performed on the transaction log.
 * Each operation is associated with a type.
*/
public interface Operation extends Writeable {
enum Type {
@Deprecated
CREATE((byte) 1),
INDEX((byte) 2),
DELETE((byte) 3),
NO_OP((byte) 4);
private final byte id;
Type(byte id) {
this.id = id;
}
public byte id() {
return this.id;
}
public static Type fromId(byte id) {
switch (id) {
case 1:
return CREATE;
case 2:
return INDEX;
case 3:
return DELETE;
case 4:
return NO_OP;
default:
throw new IllegalArgumentException("No type mapped for [" + id + "]");
}
}
}
Type opType();
long estimateSize();
Source getSource();
long seqNo();
/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
*/
static Operation readType(StreamInput input) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
switch (type) {
case CREATE:
// the deserialization logic in Index was identical to that of Create when create was deprecated
return new Index(input);
case DELETE:
return new Delete(input);
case INDEX:
return new Index(input);
case NO_OP:
return new NoOp(input);
default:
throw new IOException("No type for [" + type + "]");
}
}
/**
* Writes the type and translog operation to the given stream
*/
static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
output.writeByte(operation.opType().id());
operation.writeTo(output);
}
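        // Round-trip sketch (illustrative; assumes org.elasticsearch.common.io.stream.BytesStreamOutput,
        // the in-memory StreamOutput used elsewhere in Elasticsearch):
        //
        //   BytesStreamOutput out = new BytesStreamOutput();
        //   Translog.Operation.writeType(op, out);                      // type byte + operation payload
        //   StreamInput in = out.bytes().streamInput();
        //   Translog.Operation copy = Translog.Operation.readType(in);  // dispatches on the type byte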
}
public static class Source {
public final BytesReference source;
public final String routing;
public final String parent;
public Source(BytesReference source, String routing, String parent) {
this.source = source;
this.routing = routing;
this.parent = parent;
}
}
public static class Index implements Operation {
public static final int FORMAT_2_X = 6; // since 2.0-beta1 and 1.1
public static final int FORMAT_AUTO_GENERATED_IDS = FORMAT_2_X + 1; // since 5.0.0-beta1
public static final int FORMAT_SEQ_NO = FORMAT_AUTO_GENERATED_IDS + 1; // since 6.0.0
public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO;
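        // Backward-compatibility note: Index(StreamInput) below accepts any format >= FORMAT_2_X;
        // fields introduced by newer formats (the auto-generated-id timestamp, seqNo/primaryTerm)
        // are gated on the format value read from the stream, while the legacy timestamp/ttl longs
        // written before FORMAT_SEQ_NO are read and discarded.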
private final String id;
private final long autoGeneratedIdTimestamp;
private final String type;
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
private long primaryTerm = 0;
private final long version;
private final VersionType versionType;
private final BytesReference source;
private final String routing;
private final String parent;
public Index(StreamInput in) throws IOException {
final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format >= FORMAT_2_X : "format was: " + format;
id = in.readString();
type = in.readString();
source = in.readBytesReference();
routing = in.readOptionalString();
parent = in.readOptionalString();
this.version = in.readLong();
if (format < FORMAT_SEQ_NO) {
in.readLong(); // timestamp
in.readLong(); // ttl
}
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version;
if (format >= FORMAT_AUTO_GENERATED_IDS) {
this.autoGeneratedIdTimestamp = in.readLong();
} else {
this.autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
if (format >= FORMAT_SEQ_NO) {
seqNo = in.readLong();
primaryTerm = in.readLong();
}
}
public Index(Engine.Index index, Engine.IndexResult indexResult) {
this.id = index.id();
this.type = index.type();
this.source = index.source();
this.routing = index.routing();
this.parent = index.parent();
this.seqNo = indexResult.getSeqNo();
this.primaryTerm = index.primaryTerm();
this.version = indexResult.getVersion();
this.versionType = index.versionType();
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
}
public Index(String type, String id, byte[] source) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = 0;
version = Versions.MATCH_ANY;
versionType = VersionType.INTERNAL;
routing = null;
parent = null;
autoGeneratedIdTimestamp = -1;
}
@Override
public Type opType() {
return Type.INDEX;
}
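        // estimateSize() below is a rough in-memory estimate: two bytes per UTF-16 char of id and
        // type, plus the source bytes, plus a small fixed overhead (assumed to cover the remaining
        // primitive fields).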
@Override
public long estimateSize() {
return ((id.length() + type.length()) * 2) + source.length() + 12;
}
public String type() {
return this.type;
}
public String id() {
return this.id;
}
public String routing() {
return this.routing;
}
public String parent() {
return this.parent;
}
public BytesReference source() {
return this.source;
}
@Override
public long seqNo() {
return seqNo;
}
public long primaryTerm() {
return primaryTerm;
}
public long version() {
return this.version;
}
public VersionType versionType() {
return versionType;
}
@Override
public Source getSource() {
return new Source(source, routing, parent);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
out.writeBytesReference(source);
out.writeOptionalString(routing);
out.writeOptionalString(parent);
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeLong(autoGeneratedIdTimestamp);
out.writeLong(seqNo);
out.writeLong(primaryTerm);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Index index = (Index) o;
if (version != index.version ||
seqNo != index.seqNo ||
primaryTerm != index.primaryTerm ||
id.equals(index.id) == false ||
type.equals(index.type) == false ||
versionType != index.versionType ||
autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp ||
source.equals(index.source) == false) {
return false;
}
            if (Objects.equals(routing, index.routing) == false) {
                return false;
            }
            return Objects.equals(parent, index.parent);
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + type.hashCode();
result = 31 * result + Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
result = 31 * result + versionType.hashCode();
result = 31 * result + source.hashCode();
result = 31 * result + (routing != null ? routing.hashCode() : 0);
result = 31 * result + (parent != null ? parent.hashCode() : 0);
result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp);
return result;
}
@Override
public String toString() {
return "Index{" +
"id='" + id + '\'' +
", type='" + type + '\'' +
'}';
}
public long getAutoGeneratedIdTimestamp() {
return autoGeneratedIdTimestamp;
}
}
public static class Delete implements Operation {
private static final int FORMAT_5_X = 2;
private static final int FORMAT_SEQ_NO = FORMAT_5_X + 1;
public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO;
private Term uid;
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
private long primaryTerm = 0;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
public Delete(StreamInput in) throws IOException {
            final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format >= FORMAT_5_X : "format was: " + format;
uid = new Term(in.readString(), in.readString());
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
            assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version;
if (format >= FORMAT_SEQ_NO) {
seqNo = in.readLong();
primaryTerm = in.readLong();
}
}
public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
this(delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion(), delete.versionType());
}
/** utility for testing */
public Delete(Term uid) {
this(uid, 0, 0, Versions.MATCH_ANY, VersionType.INTERNAL);
}
public Delete(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
}
@Override
public Type opType() {
return Type.DELETE;
}
@Override
public long estimateSize() {
return ((uid.field().length() + uid.text().length()) * 2) + 20;
}
public Term uid() {
return this.uid;
}
@Override
public long seqNo() {
return seqNo;
}
public long primaryTerm() {
return primaryTerm;
}
public long version() {
return this.version;
}
public VersionType versionType() {
return this.versionType;
}
@Override
public Source getSource() {
throw new IllegalStateException("trying to read doc source from delete operation");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(uid.field());
out.writeString(uid.text());
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeLong(seqNo);
out.writeLong(primaryTerm);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Delete delete = (Delete) o;
return version == delete.version &&
seqNo == delete.seqNo &&
primaryTerm == delete.primaryTerm &&
uid.equals(delete.uid) &&
versionType == delete.versionType;
}
@Override
public int hashCode() {
int result = uid.hashCode();
result = 31 * result + Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
result = 31 * result + versionType.hashCode();
return result;
}
@Override
public String toString() {
return "Delete{" +
"uid=" + uid +
'}';
}
}
public static class NoOp implements Operation {
private final long seqNo;
private final long primaryTerm;
private final String reason;
@Override
public long seqNo() {
return seqNo;
}
public long primaryTerm() {
return primaryTerm;
}
public String reason() {
return reason;
}
NoOp(final StreamInput in) throws IOException {
seqNo = in.readLong();
primaryTerm = in.readLong();
reason = in.readString();
}
public NoOp(final long seqNo, final long primaryTerm, final String reason) {
assert seqNo > SequenceNumbersService.NO_OPS_PERFORMED;
assert primaryTerm >= 0;
assert reason != null;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.reason = reason;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(seqNo);
out.writeLong(primaryTerm);
out.writeString(reason);
}
@Override
public Type opType() {
return Type.NO_OP;
}
@Override
public long estimateSize() {
return 2 * reason.length() + 2 * Long.BYTES;
}
@Override
public Source getSource() {
throw new UnsupportedOperationException("source does not exist for a no-op");
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final NoOp that = (NoOp) obj;
return seqNo == that.seqNo && primaryTerm == that.primaryTerm && reason.equals(that.reason);
}
@Override
public int hashCode() {
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
}
}
public enum Durability {
/**
* Async durability - translogs are synced based on a time interval.
*/
ASYNC,
/**
* Request durability - translogs are synced for each high level request (bulk, index, delete)
*/
REQUEST;
}
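    // Illustrative configuration (sketch): these modes correspond to the index setting
    // "index.translog.durability"; REQUEST is the default, and ASYNC syncs on the timer
    // configured by "index.translog.sync_interval", e.g.:
    //
    //   PUT /my-index/_settings
    //   { "index.translog.durability": "async", "index.translog.sync_interval": "5s" }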
private static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException {
// This absolutely must come first, or else reading the checksum becomes part of the checksum
long expectedChecksum = in.getChecksum();
long readChecksum = in.readInt() & 0xFFFF_FFFFL;
if (readChecksum != expectedChecksum) {
throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" +
Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum));
}
}
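    // Plain-JDK analogue (illustrative sketch) of the ordering constraint above: with
    // java.util.zip.CheckedInputStream the accumulated CRC must be captured before the stored
    // checksum bytes are consumed, since those bytes would otherwise update the checksum too:
    //
    //   CheckedInputStream in = new CheckedInputStream(raw, new CRC32());
    //   in.read(body, 0, bodyLength);                         // body bytes feed the CRC
    //   long expected = in.getChecksum().getValue();          // capture first
    //   long stored = new DataInputStream(in).readInt() & 0xFFFF_FFFFL;
    //   if (stored != expected) throw new IOException("checksum mismatch");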
/**
* Reads a list of operations written with {@link #writeOperations(StreamOutput, List)}
*/
    public static List<Operation> readOperations(StreamInput input) throws IOException {
        final ArrayList<Operation> operations = new ArrayList<>();
        final int numOps = input.readInt();
        final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input);
        for (int i = 0; i < numOps; i++) {
            operations.add(readOperation(checksumStreamInput));
        }
        return operations;
    }

    // (single-operation read/write helpers and the two-phase commit methods omitted)

    /**
     * Returns <code>true</code> iff the given generation is the current generation of this translog
     */
public boolean isCurrent(TranslogGeneration generation) {
try (ReleasableLock lock = writeLock.acquire()) {
if (generation != null) {
if (generation.translogUUID.equals(translogUUID) == false) {
throw new IllegalArgumentException("commit belongs to a different translog: " + generation.translogUUID + " vs. " + translogUUID);
}
return generation.translogFileGeneration == currentFileGeneration();
}
}
return false;
}
long getFirstOperationPosition() { // for testing
return current.getFirstOperationOffset();
}
private void ensureOpen() {
if (closed.get()) {
throw new AlreadyClosedException("translog is already closed", current.getTragicException());
}
}
/**
* The number of currently open views
*/
int getNumOpenViews() {
return outstandingViews.size();
}
ChannelFactory getChannelFactory() {
return FileChannel::open;
}
/**
* If this {@code Translog} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null.
*/
public Exception getTragicException() {
return current.getTragicException();
}
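    // Illustrative handling (sketch): a caller that hits AlreadyClosedException can surface the
    // recorded root cause instead of the secondary failure:
    //
    //   try {
    //       translog.ensureSynced(location);
    //   } catch (AlreadyClosedException e) {
    //       Exception tragedy = translog.getTragicException(); // e.g. an IOException from a full disk
    //       // if non-null, fail the shard with the root cause
    //   }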
/** Reads and returns the current checkpoint */
static final Checkpoint readCheckpoint(final Path location) throws IOException {
return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
}
/**
* Reads the sequence numbers global checkpoint from the translog checkpoint.
*
* @param location the location of the translog
* @return the global checkpoint
* @throws IOException if an I/O exception occurred reading the checkpoint
*/
public static final long readGlobalCheckpoint(final Path location) throws IOException {
return readCheckpoint(location).globalCheckpoint;
}
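    // Example (sketch): recovery code can inspect the persisted global checkpoint without opening
    // the translog itself; translogPath is a hypothetical stand-in for the shard's translog directory:
    //
    //   final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath);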
/**
     * Returns the translog UUID used to associate a Lucene index with a translog.
*/
public String getTranslogUUID() {
return translogUUID;
}
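    // Illustrative sketch of the association described in the class Javadoc: the engine stores this
    // UUID and the current generation in the Lucene commit user data, and recovery cross-checks them
    // before replaying operations. commitUserData is a hypothetical stand-in for the last commit's
    // user data map:
    //
    //   String expectedUUID = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
    //   long generation = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
    //   if (expectedUUID.equals(translog.getTranslogUUID()) == false) {
    //       throw new IllegalStateException("commit belongs to a different translog");
    //   }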
}