Elasticsearch® is a very powerful and flexible distributed data system, primarily focused on searching and analyzing billions of documents. This article is about how that's done, focusing on query and data flow through the cluster, from the disk up to all the shards, indices, nodes, analyses, filters and more.
This blog is about how searching works at a fairly deep level, and our goal is to walk through the process from search request to results reply, including routing queries to shards, analyzers, mapping, aggregations, and coordination. Some of these items are from the documentation or code, others are from other people’s writing, and some are guesswork, since not everything is clear or well documented.
Basic Search Data Flow
The basic search data flow is as follows:
- Arrival at Coordinator
- Index list & aliases
- Route to Shards
- Actual Search
- Assemble Doc List
- Fetch Documents
- Sort & Aggregation
- Return Results
However, the actual process is quite a bit more complicated, especially given the distributed nature of the cluster and its data, the high search rates involved, and the parallel nature of everything going on at once. Plus it all has to be as reliable and scalable as possible. This is the magic of Elasticsearch.
We'll look at each of these in more depth, below.
Elasticsearch first learns about a search query when it arrives via the various search APIs. Clients such as Kibana, apps, or even cURL send search requests to the cluster’s nodes for processing. There are several APIs and options, but nearly all end up in what is essentially a search, though with more or less complexity and resource requirements.
Search requests can be sent to any node, though larger clusters often use dedicated coordinating nodes with enough CPU and RAM to manage high search volume and limit the impact of coordinating on data or other nodes. Whatever node the query arrives at will be the coordinating node for this query, and will route the data to the right place, even though most of the actual search work is executed on the data nodes holding the source indices' data.
Once a query arrives at a coordinating node, it must be routed to the right indices, shards, and nodes for the search. Since a query request may cover many indices and shards, the routing step is very important to get each one to the right place.
First, the coordinator builds a target index list from the query index pattern or alias. This is often a single index but can be a pattern like “logstash-*” or an alias that points to an index or pattern. The result is an actual index list that the query needs to search against.
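As a rough illustration of this resolution step, here is a minimal Python sketch. It is not how Elasticsearch implements it: the function name resolve_indices, the sample index names, and the alias table are all hypothetical, and shell-style wildcard matching from the standard library stands in for the real index-expression resolver.

```python
from fnmatch import fnmatchcase

def resolve_indices(expression, indices, aliases):
    """Expand a comma-separated index expression into concrete index names.

    indices: list of existing index names (hypothetical sample data)
    aliases: dict mapping an alias name to a list of names or patterns
    """
    resolved = []
    for part in expression.split(","):
        part = part.strip()
        # An alias may point at one or more indices or patterns;
        # anything that is not an alias is treated as a name or pattern.
        targets = aliases.get(part, [part])
        for target in targets:
            for name in indices:
                if fnmatchcase(name, target) and name not in resolved:
                    resolved.append(name)
    return resolved

indices = ["logstash-2024.01.01", "logstash-2024.01.02", "metrics-1"]
aliases = {"logs": ["logstash-*"]}
print(resolve_indices("logs", indices, aliases))
```

Whether the client asks for the alias or the pattern, the coordinator ends up with the same concrete list of indices to search.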
The coordinator then builds a list of all the target indices’ distinct shards. This can be confusing, since in Elasticsearch, a distinct shard (with a Shard ID) is actually a set of a single primary and its optional replica copies.
Thus an index with 5 shards and 2 replicas will have 15 total shards, but only 5 distinct shards, with IDs starting at 0, so 0-4 for this case. Each of those will have 3 copies: a primary and two replicas. A given primary and its replicas share the same shard ID, just with primaryOrReplica set to ‘p’ or ‘r’ in the shard list, so you'll see shard: 0/p, 0/r, and a second 0/r (each of these also has a unique Allocation ID, which is how Elasticsearch tells them apart internally).
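The arithmetic is easy to check with a few lines of Python. This is only a counting sketch; the function name shard_copies is made up for illustration, and the tuples simply mirror the shard ID and p/r marker described above.

```python
def shard_copies(number_of_shards, number_of_replicas):
    """List every shard copy as (shard_id, 'p' or 'r')."""
    copies = []
    for shard_id in range(number_of_shards):
        copies.append((shard_id, "p"))  # one primary per shard ID
        copies.extend((shard_id, "r") for _ in range(number_of_replicas))
    return copies

copies = shard_copies(5, 2)
distinct_ids = sorted({shard_id for shard_id, _ in copies})
print(len(copies), "total shard copies")
print(distinct_ids, "distinct shard IDs")
```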
For each index, and based on the index routing options, the coordinator decides if the query goes to a single distinct shard or to all of them. Most queries go to all distinct shards, but specific routing can ensure all of a query’s documents are in a single distinct shard; if so, the query only goes to that distinct shard.
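The decision can be sketched as below. The documentation describes the shard choice as, roughly, a hash of the routing value modulo the number of primary shards; in this sketch zlib.crc32 is only a stand-in for the real hash function, and target_shards is a hypothetical name, so the shard numbers it produces will not match a real cluster.

```python
import zlib

def target_shards(number_of_shards, routing=None):
    """Return the distinct shard IDs a query must visit."""
    if routing is None:
        # No routing value: the query fans out to every distinct shard.
        return list(range(number_of_shards))
    # Stand-in hash (assumption): any stable hash shows the idea.
    shard_id = zlib.crc32(routing.encode("utf-8")) % number_of_shards
    return [shard_id]

print(target_shards(5))                     # all five distinct shards
print(target_shards(5, routing="user-42"))  # exactly one distinct shard
```

The same routing value always lands on the same shard ID, which is what lets a routed query skip the other shards.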
Balancing & Scattering
Regardless of whether the query is going to one distinct shard or all of them, for each distinct shard involved the coordinator chooses which actual copy will be queried: the primary or one of the replicas.
Thus, if we’re querying an index with 5 shards and 2 replicas, that’s 5 distinct shards, with 15 total shards. Assuming no configured routing, the actual query will be sent to 5 shards, each chosen among the 3 copies of each distinct shard (1 primary, 2 replicas).
This selection or "balancing" algorithm is mostly random by default, though there are some optimizations, including favoring the shard copy that performed best in recent queries. It can also be controlled on a per-query basis with the "preference" option.
The result of the routing process is a list of actual shards to be queried, and from that, the nodes those shards live on, since that’s where the coordinator needs to send the query to be run.
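A minimal sketch of this scatter step follows, assuming purely random selection (the real algorithm has the optimizations mentioned above). The names choose_copies and group_by_node, the node names, and the copy layout are all hypothetical.

```python
import random
from collections import defaultdict

def choose_copies(copies, rng=None):
    """Pick one copy per distinct shard ID.

    copies: list of (shard_id, 'p' or 'r', node_name)
    """
    rng = rng or random.Random()
    by_shard = defaultdict(list)
    for copy in copies:
        by_shard[copy[0]].append(copy)
    return [rng.choice(by_shard[shard_id]) for shard_id in sorted(by_shard)]

def group_by_node(chosen):
    """Group the chosen shard IDs by the node that must run the query."""
    by_node = defaultdict(list)
    for shard_id, _prirep, node in chosen:
        by_node[node].append(shard_id)
    return dict(by_node)

# 5 distinct shards, 3 copies each, spread over 3 hypothetical nodes
copies = [
    (shard_id, "p" if i == 0 else "r", f"node-{(shard_id + i) % 3}")
    for shard_id in range(5)
    for i in range(3)
]
chosen = choose_copies(copies)
print(group_by_node(chosen))
```

Of the 15 copies, only 5 receive the query, and the per-node grouping is what the coordinator actually sends requests to.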
Searching Shards - Query Phase
The shards do the actual searching (and scoring) work. The query phase search is just that, searching for the documents that match the query.
Several things happen for each shard for this search:
- Mapping at Elasticsearch level
- Analysis in Lucene
- Search in Lucene
- Scoring in Lucene
The mapping is similar to the mapping at index time, where Elasticsearch maps the query fields to the underlying Lucene data fields and structures to create a Lucene-compatible query that each shard (actually a Lucene index, made up of segments) can execute. It appears this mapping and conversion to the Lucene query is done by each shard, similar to indexing, where mapping is also done by each shard.
The analysis is normally the same as at index time, where the text parts of the query are run through the same analyzers, for example tokenizing the text, converting to lowercase, stemming, and so on. That way the query text will best match the way the documents were indexed.
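To see why running the same analysis on both sides matters, here is a toy analyzer in Python. It is only an illustration: the analyze function is hypothetical, and its crude suffix stripping is a stand-in for a real stemmer, not what Lucene's analyzers do.

```python
import re

def analyze(text):
    """Toy analyzer: tokenize, lowercase, and strip a trailing 's'."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    # Crude stand-in for stemming (assumption, not a real stemmer)
    return [t[:-1] if t.endswith("s") and len(t) > 3 else t for t in tokens]

indexed_terms = analyze("Quick Brown Dogs")  # what went into the index
query_terms = analyze("QUICK dogs")          # what the query becomes
print(indexed_terms)
print(query_terms)
print(set(query_terms) <= set(indexed_terms))
```

Without the shared analysis, the raw query text "QUICK" and "dogs" would not match the indexed terms at all.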
A shard-level search is actually a series of segment-level searches that are merged together (which is why fewer segments generally perform better). Since the segments are doing the real search work, most of the caches are also at the segment level, which is how you’ll see them in cluster and node stats.
The actual search process details at the segment level depend on the type of query and what’s required. This can vary widely from simple term searches for something like name = “bob” to complex multi-field full text searches in various languages.
The result of any of these searches is generally a list of document IDs, which may optionally be scored and sorted for relevance. This list is kept in a priority queue whose size depends on the query and defaults to 10, but if the query uses normal pagination, it will be 'from + size', which can use lots of RAM when paging deeply.
Scoring is itself a complicated area that takes more resources than non-scored queries, especially if DFS mode (the dfs_query_then_fetch search type) is used to improve global scoring results. We'll leave Lucene scoring for other blogs.
Sorting by any document field (i.e. not the score) is done via doc values, since the inverted index is poorly suited for this. Doc values are Lucene’s serialized column data store that packs all of a field’s data together so lots of values can be read quickly, which is perfect for aggregations, and useful for sorting. They are enabled by default for all fields except analyzed strings.
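The bounded priority queue idea can be sketched with Python's heapq. This is a simplified model, not Lucene's collector: shard_top_hits is a hypothetical name, and breaking ties by document ID is an arbitrary choice made for this sketch.

```python
import heapq

def shard_top_hits(scored_docs, from_=0, size=10):
    """Keep only the best (from_ + size) hits seen on this shard.

    scored_docs: iterable of (doc_id, score)
    """
    k = from_ + size
    heap = []  # min-heap, so heap[0] is the weakest hit currently kept
    for doc_id, score in scored_docs:
        entry = (score, doc_id)
        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif entry > heap[0]:
            heapq.heapreplace(heap, entry)  # evict the weakest hit
    return [(doc_id, score) for score, doc_id in sorted(heap, reverse=True)]

docs = [(i, float(i % 7)) for i in range(100)]
print(len(shard_top_hits(docs)))                     # first page
print(len(shard_top_hits(docs, from_=20, size=10)))  # third page
```

The queue never holds more than from + size entries, which is why memory use grows with page depth rather than with the number of matching documents.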
Aggregations are more complicated as they need a way to access all the matching documents, i.e. they can’t use a short list. They also work from doc values, not the inverted index. The process varies by the aggregation type, and in some cases, such as a terms count, each shard returns a full, aggregation-sized set of buckets for its own documents, and the coordinator merges them together.
For example, for a terms count of size 100, each shard returns 160 terms, and the coordinator will merge & sort them down to a final 100 for the client. This per-shard count can be adjusted via the shard_size parameter, and defaults to (size * 1.5 + 10), or 160 for a size of 100.
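The shard_size formula and the merge step can be sketched as follows. The helper names are hypothetical and the buckets are plain (term, count) pairs; only the (size * 1.5 + 10) default comes from the text above.

```python
from collections import Counter

def default_shard_size(size):
    """Per-shard bucket count when shard_size is not set explicitly."""
    return int(size * 1.5 + 10)

def shard_terms(values, size):
    """Top terms one shard would return for its own documents."""
    return Counter(values).most_common(default_shard_size(size))

def merge_terms(per_shard_buckets, size):
    """Coordinator side: sum counts per term, then keep the top 'size'."""
    merged = Counter()
    for buckets in per_shard_buckets:
        merged.update(dict(buckets))
    return merged.most_common(size)

print(default_shard_size(100))
shard_a = shard_terms(["a", "a", "b"], size=2)
shard_b = shard_terms(["b", "b", "c"], size=2)
print(merge_terms([shard_a, shard_b], size=2))
```

Asking each shard for more buckets than the client requested reduces the chance that a term which is globally popular, but only moderately popular on each shard, gets cut before the merge.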
For metrics aggregations, such as averages, it needs all the matching documents and their field data. It’s not clear how this is done, but presumably each shard provides its own average and count, which the coordinating node can then combine. Min/Max and others are likely processed similarly.
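Under that presumption, a workable merge needs each shard to report a sum and a count (or equivalently an average and a count), since averaging the per-shard averages directly would give the wrong answer when shards hold different numbers of documents. The sketch below uses hypothetical names and is a guess at the shape of the process, not the actual implementation.

```python
def shard_partial(values):
    """What one shard might report for an average: a sum and a count."""
    return {"sum": sum(values), "count": len(values)}

def merge_avg(partials):
    """Coordinator side: combine partial sums and counts into one average."""
    total = sum(p["sum"] for p in partials)
    count = sum(p["count"] for p in partials)
    return total / count if count else None

partials = [shard_partial([1, 2, 3]), shard_partial([10])]
print(merge_avg(partials))   # weighted correctly by document count
print((2.0 + 10.0) / 2)      # naive average of averages, for contrast
```

Min and max are simpler, since the coordinator can just take the min or max of the per-shard values.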
Note that, for the search process, segments have their own caches for several things:
- Filter Cache - Segment cache of document IDs for a given filter. This greatly speeds searching and is why filters are popular.
- Field Cache - Segment cache of field data values. Used mostly later during the fetch phase.
- Page Cache - Outside of Elasticsearch, of course, for the segment data.
The shard also maintains a query cache, so it can return results for the same query in the future. But this cache is invalidated every time the index refreshes (default 1s, more commonly 30-60s) if the index has actually changed, so while less useful for indexing-heavy indexes, it can still help on heavily-searched indexes. Note this cache is shared by all shards on a given node, up to 1% of heap size.
While filters are cached, queries (scored searches) are not, so for queries and any uncached filters or fields, the search has to hit the inverted index to build a list of document IDs. The resulting filter results and field data can be cached.
Note that all searches are done from refreshed or committed Index segments, so data is only searched or found once it’s been refreshed. The only exception is when the client does a GET document by ID, in which case it can be pulled from the translog before the index is refreshed. See the blog on Elasticsearch Indexing Dataflow for more details on refreshing and the translog.
Each shard will return its top hits as document IDs, not the whole document. So if we have 5 shards and the default size of 10, we’ll get 50 results. If there are multiple indices involved, their shards will return their results also. The coordinator node merges these lists to get the actual sorted list and proceeds to get the actual data for them in the collecting phase.
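The coordinator's merge can be sketched as a k-way merge of already-sorted lists. This is a simplified model with hypothetical names and sample scores; it assumes each shard's list arrives sorted by descending score.

```python
import heapq
from itertools import islice

def merge_shard_hits(per_shard_hits, from_=0, size=10):
    """Merge per-shard hit lists into one globally sorted page.

    per_shard_hits: {shard_id: [(score, doc_id), ...]} sorted by score desc
    Returns a list of (score, shard_id, doc_id).
    """
    tagged = [
        [(score, shard_id, doc_id) for score, doc_id in hits]
        for shard_id, hits in per_shard_hits.items()
    ]
    # reverse=True because the inputs are sorted largest-first
    merged = heapq.merge(*tagged, key=lambda hit: hit[0], reverse=True)
    return list(islice(merged, from_, from_ + size))

hits = {
    0: [(9.0, "a"), (5.0, "b")],
    1: [(8.0, "c"), (7.0, "d")],
    2: [(6.0, "e")],
}
print(merge_shard_hits(hits, size=3))
```

Keeping the shard ID next to each document ID matters, because the coordinator needs to know where to go back to for the document data.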
Fetch Phase - Collecting
Once the coordinator node has the final list of document IDs it needs, it will go back to the shards to get the actual data, which until now it has not needed. This is phase 2, the ‘collection’ process, and it uses multi-document GET requests to the various shards to get the document data, usually as the _source field. Note this is skipped if the client only asked for aggregations (size=0).
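Conceptually, the fetch step groups the final hits by shard, asks each shard once for its documents, and then puts the results back in the final sort order. The sketch below models that with plain dictionaries; plan_fetch and assemble are hypothetical names, and the fetched data is faked rather than retrieved from a cluster.

```python
from collections import defaultdict

def plan_fetch(final_hits):
    """Group document IDs by shard so each shard gets a single request.

    final_hits: list of (shard_id, doc_id) in final sorted order
    """
    by_shard = defaultdict(list)
    for shard_id, doc_id in final_hits:
        by_shard[shard_id].append(doc_id)
    return dict(by_shard)

def assemble(final_hits, fetched):
    """Restore the final sort order once the shards have replied.

    fetched: {(shard_id, doc_id): source_document}
    """
    return [fetched[(shard_id, doc_id)] for shard_id, doc_id in final_hits]

final_hits = [(0, "a"), (1, "c"), (1, "d")]
print(plan_fetch(final_hits))
fetched = {hit: {"id": hit[1]} for hit in final_hits}  # stand-in for shard replies
print(assemble(final_hits, fetched))
```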
Note this is where the coordinating node RAM can get out of control, and is one of the main reasons to have coordinating nodes in the first place. These nodes keep the CPU and RAM resources needed to process, merge, and sort results in a few easily-monitored nodes, importantly keeping these resource-intensive processes away from the master, data, and ML nodes doing other important work.
For example, in deep paging, the number of documents coming back from each shard will be the page’s 'from + size', so a deep page from many indices and shards will collect 'number_of_shards * (from + size)' documents, which can get quite large, eating up all the heap. In this case, users generally use a scroll query instead. Large document sizes and lists can likewise cause elevated RAM use.
Aggregations are generally built from the aggregation results returned from the shards, and there appears to be no fetch phase for aggregations, though if the query has size > 0, the coordinator will still go fetch the underlying document data for the client.
Once the coordinating node has all the documents and their data and/or aggregations, it builds the final results, enhances them if needed with metadata and other elements, and returns them to the caller, and the process completes.
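A quick calculation shows how fast this grows. The function name and the sample shard counts are made up for illustration; the formula is the one given above.

```python
def coordinator_entries(number_of_shards, from_, size):
    """Entries the coordinator must hold and sort for one page."""
    return number_of_shards * (from_ + size)

print(coordinator_entries(5, from_=0, size=10))      # first page, 5 shards
print(coordinator_entries(50, from_=9990, size=10))  # page 1000, 50 shards
```

The client still only receives 10 documents in both cases, which is what makes deep paging so deceptively expensive.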
Elasticsearch searching is very fast and powerful, though it’s a complex, distributed process that balances high performance, accuracy, functionality and, more recently, stability, i.e. not letting big queries blow up the cluster. While it works very well, things can and do go wrong.
Of course, any data system can run out of key resources, especially CPU and disk IO bandwidth. Elasticsearch is quite dependent on both, but being distributed, it’s often easy to add more of each, as needed.
The other key resource is RAM, which is where more problems can occur. Lots of work has gone into protecting the system in recent versions, especially with the notion of circuit breakers which limit the RAM individual queries and aggregated operations can consume.
Query-level breakers are also used for various parts of the query, such as field data, to prevent a query from overloading that part of the system (and to provide better reporting on exactly how your query was potentially hurting the cluster).
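The general idea of a memory circuit breaker can be sketched in a few lines. This is a toy model with hypothetical class and method names, not Elasticsearch's breaker implementation: it simply tracks estimated bytes and refuses a reservation that would exceed the limit, instead of letting the operation run the node out of memory.

```python
class CircuitBreakingError(Exception):
    """Raised when a reservation would exceed the breaker limit."""

class MemoryBreaker:
    def __init__(self, limit_bytes):
        self.limit_bytes = limit_bytes
        self.used_bytes = 0

    def reserve(self, n_bytes, label="request"):
        """Account for n_bytes before doing the work, or fail fast."""
        if self.used_bytes + n_bytes > self.limit_bytes:
            raise CircuitBreakingError(
                f"{label}: would use {self.used_bytes + n_bytes} bytes, "
                f"limit is {self.limit_bytes}"
            )
        self.used_bytes += n_bytes

    def release(self, n_bytes):
        """Give the bytes back once the operation has finished."""
        self.used_bytes = max(0, self.used_bytes - n_bytes)

breaker = MemoryBreaker(limit_bytes=100)
breaker.reserve(60, label="aggregation")
try:
    breaker.reserve(50, label="field data")
except CircuitBreakingError as err:
    print("rejected:", err)
```

Failing one query early with a clear message is the trade being made: the query is lost, but the node stays up.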
Query-driven, memory-related issues often come from field combinations, big aggregations, big documents, deep paging, and more. On a related note, having large indexes that don't fit in the page cache will cause I/O pressure, which won’t crash the system, but will slow it down.
Other problems include timeouts and loss of shards or nodes during the search process. Generally Elasticsearch will retry these operations with other shards to try to fully answer the client’s query as best it can. Note that by default Elasticsearch will return partial results if there are internal timeouts or shard failures.
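Because partial results come back as a normal-looking response, clients may want to check for them explicitly. The sketch below inspects the timed_out flag and the _shards section of a search response body that has already been parsed into a dict; the is_partial helper and the sample responses are hypothetical.

```python
def is_partial(response):
    """True if the search timed out or any shard failed to answer."""
    shards = response.get("_shards", {})
    return bool(response.get("timed_out")) or shards.get("failed", 0) > 0

complete = {
    "timed_out": False,
    "_shards": {"total": 5, "successful": 5, "skipped": 0, "failed": 0},
}
degraded = {
    "timed_out": False,
    "_shards": {"total": 5, "successful": 4, "skipped": 0, "failed": 1},
}
print(is_partial(complete))
print(is_partial(degraded))
```

If partial results are not acceptable for an application, the search API also has an allow_partial_search_results request parameter; check the documentation for your version for its exact behavior.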
Elasticsearch is a very nice and powerful system, capable of rapidly and flexibly searching billions of documents via a simple interface. From this blog, you can see how the request and data move around the cluster to get from the disk to the client.
There are several good articles about this process, but most are many years old, and the best cover much earlier versions of Elasticsearch, though the processes are still generally the same: