
Elasticsearch Indexing Data Flow

How docs get into the cluster

Elasticsearch® is a very powerful and flexible distributed data system, accepting and indexing billions of documents and making them available in near-real time for search, aggregation, and analysis. This article is about how that's done, focusing on basic new data inserts and the data flow from request all the way down to the disk.

Indexing is a relatively simple high-level process, consisting of:

  • Data arrival via the API

  • Routing to the right Index, Shard, and Node

  • Mapping, Normalization, and Analysis

  • Storage in memory and on disk

  • Making it available for search

However, the actual process is quite a bit more complicated, especially given the distributed nature of the cluster and its data, the high data rates involved, and the parallel nature of everything going on at once. Plus it all has to be as reliable and scalable as possible. This is the magic of Elasticsearch.

Let’s look at the steps in more detail.

Arrival & Batching

Elasticsearch first learns about incoming data to index when it arrives via the Index APIs. Clients such as Logstash, the Beats, or even cURL send data to the cluster’s nodes for processing. They can send one document at a time, but usually use the bulk API to send data in batches for less overhead and faster processing. Batches are just groups of documents sent in one API call, and don’t need to be related, i.e., they can include data destined for several different indexes.
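To make the batch idea concrete, here is a minimal sketch of building a bulk request body in Python. The bulk API takes newline-delimited JSON: an action line followed by the document source, one pair per document. The index names, document fields, and the helper name build_bulk_body are all made up for this example.

```python
import json

def build_bulk_body(docs):
    """Build an NDJSON body for the _bulk endpoint.

    `docs` is a list of (index_name, document) pairs. Each document becomes
    two lines: an action line naming the target index, then the source.
    """
    lines = []
    for index_name, doc in docs:
        lines.append(json.dumps({"index": {"_index": index_name}}))
        lines.append(json.dumps(doc))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"

# Hypothetical documents bound for two different indexes in one batch.
batch = [
    ("logs-web", {"message": "GET /index.html", "status": 200}),
    ("metrics-cpu", {"host": "web-1", "cpu_pct": 42.5}),
]
body = build_bulk_body(batch)
print(body)
```

A client would POST this body to the _bulk endpoint with the application/x-ndjson content type.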

Ingest data can be sent to any node, though larger clusters often use dedicated coordinating nodes (more for search than ingest), or even dedicated ingest nodes, which can run data pipelines to pre-process the data. Whatever node the data arrives at will be the coordinating node for this batch, and will route the data to the right place, even though the actual ingest work is executed on the data nodes holding the target index data.

Pipelines & Data Streams

Data most commonly arrives for a single normal index, but can also be routed to a data stream and/or an ingest pipeline. Data streams are an X-Pack feature most commonly used to handle time series data such as metrics and logs, and essentially resolve to an actual backing index for this ingest process. Pipelines are collections of processors for manipulating document data before it’s indexed.

If the request or batch includes a pipeline and the coordinating node is not also an ingest node, it appears the request is routed to an ingest node first and then goes on to the primary node. It’s not clear whether the coordinator or the ingest node sends it to the primary node, but it is probably the coordinating node, i.e., the ingest node runs the pipeline(s) and then returns the document to the coordinating node for the next step.

Routing

Once data arrives at a coordinating node, each document must be routed to the right index, shard, and nodes for ingesting. Since a bulk request may contain data for many indexes, and multiple documents for a single index may go to separate shards, the routing step is run per-document and is very important to get each one to the right place. This process starts the “Coordinating Stage”.

The first step for each document is for the coordinating node to use the supplied index, alias, data stream, etc. to determine the actual target index the document will be going to. If the index does not exist, it will be created, and then the process can continue. Note that Elasticsearch attempts to first create all indices needed by the bulk request before it does any indexing.

After the coordinating node knows the target index, it runs a routing process to choose the index’s shard for the document. Routing can get complex, and by default is driven by the document ID, which by default is auto-generated by the coordinating node.

Clients can specify their own IDs, and can also control what value is used for routing, such as a timestamp, user, or source device, as a clustering strategy to co-locate related (and quickly queryable) data in a single shard. In addition, indexes can have custom routing that forces documents to specific shards. But generally, because routing hashes the document ID, each document is in effect randomly distributed among its target index’s shards.

The result of the routing process will be the target shard and its shard ID, but we have to keep in mind that shard may have replicas. If there are replicas, the coordinating node will also include them in the routing list, so the result is a list of all this document’s shards: a primary shard and replicas. The coordinating node then looks up these shards' node IDs so it knows where to route the document for indexing.
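The routing step can be sketched roughly as a hash of the routing value modulo the number of primary shards, followed by a lookup of where that shard's copies live. This is only an illustration: Elasticsearch uses a Murmur3 hash, while the sketch substitutes zlib.crc32, so the shard numbers will not match a real cluster. The routing table, node names, and function names are hypothetical.

```python
import zlib

def pick_shard(routing_value, num_primary_shards):
    """Map a routing value (the document ID by default) to a shard number.

    Stand-in hash: Elasticsearch uses Murmur3, so real shard numbers differ.
    """
    return zlib.crc32(routing_value.encode("utf-8")) % num_primary_shards

# Hypothetical routing table: shard number -> (primary node, replica nodes).
ROUTING_TABLE = {
    0: ("node-a", ["node-b"]),
    1: ("node-b", ["node-c"]),
    2: ("node-c", ["node-a"]),
}

def route(doc_id, custom_routing=None):
    """Return (shard, primary node, replica nodes) for one document."""
    shard = pick_shard(custom_routing or doc_id, len(ROUTING_TABLE))
    primary, replicas = ROUTING_TABLE[shard]
    return shard, primary, replicas

shard, primary, replicas = route("doc-123")
print(f"doc-123 -> shard {shard}, primary on {primary}, replicas on {replicas}")

# Documents sharing a custom routing value always land on the same shard.
assert route("doc-1", custom_routing="user-42")[0] == route("doc-2", custom_routing="user-42")[0]
```

Because the shard is derived from a modulo of the primary shard count, the same routing value always maps to the same shard as long as that count does not change.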

Indexing Stages

Once the coordinating node knows a document’s target primary shard and that shard’s node, the document is sent to that node for primary indexing, as part of the “Primary Stage”. The primary shard validates the request, then indexes it locally, which also validates the mapping, fields, etc. That process is described in more detail below.

If the primary node's indexing succeeds, the primary shard node (not the coordinator node) sends the document to all the “in-sync”, “active” replicas in parallel, which is the “Replica Stage”. The primary shard node waits for all of the replicas to complete indexing, and then returns a result to the waiting coordinating node. Once all docs in the batch have been indexed (or failed), the coordinator then returns the results to the original API caller, the client.
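The primary-then-replicas flow can be sketched as a toy simulation. Nothing here is Elasticsearch code: index_on_shard is a hypothetical stand-in for shard-level indexing, and the shard names are invented. The point is only the ordering, with the primary going first, the replicas running in parallel, and the result returned once all of them finish.

```python
from concurrent.futures import ThreadPoolExecutor

def index_on_shard(shard_name, doc):
    """Stand-in for shard-level indexing; returns True on success."""
    return "id" in doc  # toy validation only

def index_document(doc, primary, replicas):
    """Index on the primary first, then on all replicas in parallel."""
    if not index_on_shard(primary, doc):
        return {"result": "failed", "shards_ok": 0}
    shards_ok = 1
    if replicas:
        with ThreadPoolExecutor(max_workers=len(replicas)) as pool:
            # The primary waits for every replica before answering.
            results = list(pool.map(lambda r: index_on_shard(r, doc), replicas))
        shards_ok += sum(results)
    return {"result": "created", "shards_ok": shards_ok}

outcome = index_document({"id": "1", "msg": "hello"}, "primary-0", ["replica-0a", "replica-0b"])
print(outcome)
```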

Every document is indexed separately by every one of its primary & replica shards.

It’s important to understand that every document is indexed separately by EVERY shard it will live on, and ALL of these must complete before a given document is ‘indexed’. So if an index has a replica count of 3, that means every document will go to, and be separately indexed by, four shards (primary and three replicas), all on separate nodes.

There is no real pre-processing or central indexing in Elasticsearch, and the ‘work’ done by the cluster goes up linearly with the number of replicas for a given index. This is usually where most indexing latency happens, as a document can only complete as fast as its slowest node and shard.

The coordinator node parallelizes documents in a batch as much as it can. It sends documents to their routed primary shards in parallel, but appears to only queue one request per primary shard. So if the batch has 10 documents for a single index that has a single shard, these will all be processed in order, one at a time. But if the batch has 10 documents for two indexes, each with 5 shards, and the routing results in one document per shard, all 10 will be done in parallel. This is one way additional primary shards speed up processing.
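The two cases above can be worked through with a small sketch that groups already-routed documents into per-shard queues. It assumes, as the text suggests, one in-order queue per primary shard, with the queues running in parallel. The function name and index names are hypothetical.

```python
from collections import defaultdict

def group_by_shard(routed_docs):
    """Group (index, shard, doc_id) tuples into per-shard queues."""
    queues = defaultdict(list)
    for index_name, shard, doc_id in routed_docs:
        queues[(index_name, shard)].append(doc_id)
    return dict(queues)

# Case 1: 10 documents, one index, one shard -> one queue, processed in order.
single = group_by_shard([("logs", 0, f"doc-{i}") for i in range(10)])

# Case 2: 10 documents, two indexes with 5 shards each, one document per shard.
spread = group_by_shard(
    [(idx, shard, f"{idx}-{shard}") for idx in ("logs", "metrics") for shard in range(5)]
)

print(len(single), "queue(s), longest:", max(len(q) for q in single.values()))
print(len(spread), "queue(s), longest:", max(len(q) for q in spread.values()))
```

The number of queues is the available parallelism, and the longest queue is the number of sequential steps the batch has to wait for.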

Shard-Level Indexing

Once a document arrives at a given node that owns a shard it will live in, the actual document indexing is done. The first step is to write the document to the translog so it has a durable copy in case of a node crash after this point. The translog is an Elasticsearch feature providing durability above and beyond what Lucene can do on its own, and is key to a reliable system. If the node crashes before the actual indexing is complete, upon restart Elasticsearch will replay the document into the indexing process to make sure it gets processed.
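The translog idea is a classic write-ahead log, which can be sketched in a few lines. This toy follows the order described above (log first, then index) and is not how Elasticsearch implements it. The class name, file name, and record format are all assumptions for illustration.

```python
import json
import os
import tempfile

class ToyShard:
    """Toy shard: append to a log file first, then index in memory."""

    def __init__(self, translog_path):
        self.translog_path = translog_path
        self.indexed = {}  # in-memory state, lost on a crash

    def index(self, doc_id, doc):
        # Record the operation durably, then do the indexing work.
        with open(self.translog_path, "a", encoding="utf-8") as log:
            log.write(json.dumps({"id": doc_id, "doc": doc}) + "\n")
            log.flush()
            os.fsync(log.fileno())
        self.indexed[doc_id] = doc

    def recover(self):
        """Replay every logged operation after a restart."""
        if not os.path.exists(self.translog_path):
            return
        with open(self.translog_path, encoding="utf-8") as log:
            for line in log:
                op = json.loads(line)
                self.indexed[op["id"]] = op["doc"]

path = os.path.join(tempfile.mkdtemp(), "translog.ndjson")
shard = ToyShard(path)
shard.index("1", {"msg": "hello"})

# Simulate a crash: a fresh object has no in-memory state until it replays.
restarted = ToyShard(path)
restarted.recover()
print(restarted.indexed)
```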

The actual indexing process has several steps:

  • Map Document Fields in Elasticsearch

  • Analyze in Lucene

  • Add to the inverted index in Lucene

First, the node maps the document’s fields via the index’s mapping (often supplied by an index template), which specifies how each field is handled, including its type, analyzer, and other options. Since every document can have different fields and data, this mapping step is important and is where errors often occur, for example when field values don’t match the mapped type or are out of bounds. This work is done at the Elasticsearch level, since Lucene has no notion of templates or mappings. A Lucene document is just a set of fields, each of which has a name, a type, and a value.
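As a rough illustration of why the mapping step is where errors surface, the sketch below checks a document against a tiny mapping. The mapping format, field names, and validate function are simplified inventions. Real Elasticsearch is more lenient in places, for example it can coerce numeric strings into numbers, which this strict toy does not do.

```python
# Hypothetical mapping: field name -> expected type, loosely modeled on
# Elasticsearch field types. The real mapping format has many more options.
MAPPING = {"message": "text", "status": "integer", "host": "keyword"}

PYTHON_TYPES = {"text": str, "keyword": str, "integer": int}

def validate(doc, mapping):
    """Return a list of mapping errors for one document (empty if valid)."""
    errors = []
    for field, value in doc.items():
        expected = mapping.get(field)
        if expected is None:
            continue  # unmapped field; a real cluster may map it dynamically
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, PYTHON_TYPES[expected]):
            errors.append(f"field [{field}] expects {expected}, got {type(value).__name__}")
    return errors

good = validate({"message": "GET /", "status": 200}, MAPPING)
bad = validate({"message": "GET /", "status": "two hundred"}, MAPPING)
print(good, bad)
```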

Second, the document is passed to Lucene which will ‘analyze’ it. In practice, this means running the configured analyzer on it, and each analyzer can have a number of steps and components, including tokenizing and filtering that together can do lots of powerful things.

Tokenization splits the data in each field into tokens, such as on whitespace to separate words. Filtering covers a broad range of things beyond simply filtering things out, including lowercasing the text, removing stop words, and normalization via stemming (i.e., changing words to their ‘normal’ version, e.g., dogs becomes dog, watched becomes watch, etc.).
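A toy analyzer shows how these steps chain together. This is not a Lucene analyzer: the stop word list is tiny, and the suffix-stripping "stemmer" is deliberately crude and gets many words wrong. All names here are hypothetical.

```python
import re

STOP_WORDS = {"the", "a", "an", "and", "of"}  # tiny illustrative list

def naive_stem(token):
    """Very crude suffix stripping; real stemmers are far more careful."""
    for suffix in ("ing", "ed", "s"):
        if token.endswith(suffix) and len(token) - len(suffix) >= 3:
            return token[: -len(suffix)]
    return token

def analyze(text):
    """Tokenize on non-alphanumerics, lowercase, drop stop words, stem."""
    tokens = re.findall(r"[A-Za-z0-9]+", text)
    tokens = [t.lower() for t in tokens]
    tokens = [t for t in tokens if t not in STOP_WORDS]
    return [naive_stem(t) for t in tokens]

print(analyze("The dogs watched a Movie"))  # ['dog', 'watch', 'movie']
```

The order matters: lowercasing runs before the stop word filter, so "The" is removed just like "the".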

Finally, Lucene takes the results and builds its storage records for this document. This generally includes every field in the document, plus special fields such as _source (and, in older versions, _all), which can be used for reindexing, etc., along with the all-important inverted index itself.
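For readers new to the term, an inverted index maps each term to the documents that contain it, which is what makes term lookups fast. Here is a minimal sketch with made-up documents and a deliberately simplified analysis step (lowercase and split on whitespace). Lucene's real structures also hold positions, frequencies, and much more.

```python
from collections import defaultdict

def build_inverted_index(docs):
    """Map each term to the sorted list of document IDs containing it."""
    postings = defaultdict(set)
    for doc_id, text in docs.items():
        # Simplified analysis: lowercase and split on whitespace.
        for term in text.lower().split():
            postings[term].add(doc_id)
    return {term: sorted(ids) for term, ids in postings.items()}

docs = {
    1: "quick brown fox",
    2: "lazy brown dog",
    3: "quick dog",
}
index = build_inverted_index(docs)
print(index["brown"], index["quick"], index["dog"])  # [1, 2] [1, 3] [2, 3]
```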

 

Lucene writes all this to an in-memory segment buffer and then returns to the coordinating node as successful. Once this is done on all the replica shards, this document’s indexing is essentially complete from the coordinator node or client’s point-of-view.

Getting Document Data on Disk & Searchable

The just-indexed document is only in a temporary multi-document segment in memory; it is not yet on disk, and also not yet available for searching. Two separate processes run behind the scenes to make those two things happen.

Indexed data is not yet on disk & also not yet searchable.

The first process is “refreshing” to make the data available for searching. The refresh interval is set per-index and defaults to 1s. Many users set this much higher, such as 30-60s, because it's an expensive operation and doing it once per second can reduce overall indexing throughput. Note that, when the interval is left at its default, indices that are not being searched will not auto-refresh until they are searched, to improve bulk indexing speed.

At the refresh interval, the memory buffer's segment(s) are merged and written to a single new segment on the file system, and the data is available for searching. But, while this segment now exists on the file system, it’s mostly in the file cache and may not actually be on disk, which is a problem if there is a crash at this point. The data is available, but not safe, though if there is a crash, the translog still exists and will be played back, and the document will be indexed again.

To make the data safe on disk, there is a separate Elasticsearch flush process that does a Lucene commit, which merges and fsyncs the above segments, ensuring they really are on disk. Once that completes, Elasticsearch truncates the translog, since the data is now safely on disk and won’t be lost in a crash. Elasticsearch schedules these flushes based on translog size (default maximum is 512MB) to help keep recovery times reasonable.
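The difference between refresh and flush is easier to see in a toy state model. This is a simplification under stated assumptions, not Elasticsearch's implementation: the class and attribute names are invented, file cache versus disk is modeled as two plain lists, and flush is modeled as refreshing first and then committing everything.

```python
class ToyShardState:
    """Toy model of where a document lives after index, refresh, and flush."""

    def __init__(self):
        self.buffer = []        # indexed, in memory, not searchable
        self.searchable = []    # refreshed segments, possibly only in file cache
        self.committed = []     # flushed segments, fsync'd to disk
        self.translog = []      # operations not yet covered by a commit

    def index(self, doc):
        self.translog.append(doc)
        self.buffer.append(doc)

    def refresh(self):
        # Buffered documents become one new searchable segment.
        if self.buffer:
            self.searchable.append(list(self.buffer))
            self.buffer.clear()

    def flush(self):
        # Commit: make everything durable, then drop the translog entries.
        self.refresh()
        self.committed.extend(self.searchable)
        self.searchable.clear()
        self.translog.clear()

    def search(self):
        segments = self.committed + self.searchable
        return [doc for segment in segments for doc in segment]

state = ToyShardState()
state.index("doc-1")
before_refresh = state.search()
state.refresh()
after_refresh = state.search()
state.flush()
print(before_refresh, after_refresh, state.translog, state.committed)
```

Between refresh and flush the document is searchable but still depends on the translog for crash safety, which is the window the text describes.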

Essentially, the translog maintains reliability for all new document changes between Elasticsearch flushes/Lucene commits. Note the translog has its own reliability settings: by default it is fsync’d to disk at the end of each request, and when set to asynchronous durability it is fsync’d every 5s by default.
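For reference, the knobs discussed in this section correspond to index settings. The sketch below just builds a settings body as a Python dict and prints it as JSON. The values are examples rather than recommendations, and async durability in particular trades some safety for throughput.

```python
import json

# Index settings related to refresh, flush, and the translog.
# The values are examples, not advice.
settings = {
    "index": {
        "refresh_interval": "30s",
        "translog": {
            "flush_threshold_size": "512mb",
            "durability": "async",
            "sync_interval": "5s",
        },
    }
}
print(json.dumps(settings, indent=2))
```

A body like this could be supplied as the settings when creating an index.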

 

Also separately, Elasticsearch runs background threads to continue merging segments as it can, using a tiered merge policy that tries to minimize segment count (since they are searched in sequence) while not slowing down overall real-time indexing and searching performance. This is separate from all the above processes.

The overall result is that, at any given time, any particular available index consists of a set of permanent segments of varying size on disk and some new segments in the file cache, plus indexed but not-yet-available segments that are in memory only, awaiting the refresh interval.

Problems

Elasticsearch indexing is a very nice, though complex, distributed process that balances high-performance, data reliability, and powerful functionality. And while it works very well, things can and do go wrong. Some problems are in the documents themselves, while others are on the cluster side of things.

Cluster-level issues are usually around shard loss or movement during the process. The normal flow goes from coordinator node to primary node to replica nodes, but what happens if the primary changes during the process, or a replica is lost? There are a variety of complex retry, timeout, and routing processes that try to save the document, though they can of course fail, at which point the client has to try again.

Some of these, like a replica timing out or failing, will cause that shard to be declared out of sync and invalid, changing the index status to yellow and scheduling a replica rebuild. Others, like a network partition, will cause the primary itself to be declared invalid, which it will discover when it tries to talk to replicas. Overall, it’s a complex but robust system.

References

There are several good articles about this process, but most are many years old and the best cover much earlier versions of Elasticsearch, though the processes are still generally the same.

About ELKman

ELKman is our ELK Stack management tool, helping you manage, tune, audit, and optimize your Elasticsearch systems, including seeing the above processes in action. More at ELKMan.io

Copyright 2020 by Wapiti Systems, Inc.
ELK Manager is not affiliated in any way with ElasticSearch BV.
ElasticSearch®, Logstash®, Kibana® and Beats® are registered trademarks of Elasticsearch BV.