ClickHouse Cluster, Replication, and Keeper in Practice
How ClickHouse clusters work in production: ReplicatedMergeTree, distributed tables, sharding, and ClickHouse Keeper vs ZooKeeper for coordination.
This post was written by an engineer at QueryPlane. QueryPlane is an app builder for your database: bring your own Postgres database and you can create interactive applications to share with other developers, coworkers, or even your customers. If you’re interested in trying it out, get started here.
A single ClickHouse node will take you surprisingly far — billions of rows, sub-second aggregations, no coordination service to babysit. The trouble starts the day that node becomes a single point of failure, or the day one machine can no longer hold the data. At that point you need a cluster, and a ClickHouse cluster is built from two ideas that people constantly conflate: replication (keeping redundant copies of the same data for availability) and sharding (splitting one logical table across machines for scale). They are independent. You can have one without the other, and most production clusters run both at once.
The piece that ties it together is a coordination service. ClickHouse replicas don’t gossip their state directly — they agree on “what data exists and in what order” through an external consensus store. Historically that was Apache ZooKeeper. Today it’s ClickHouse Keeper, a drop-in, ZooKeeper-compatible service written in C++.
This post is about how those pieces actually fit together when you run ClickHouse yourself.
In this post, we’ll cover:
- The two axes — why replication and sharding are separate decisions
- ClickHouse Keeper — what coordination does and why Keeper replaced ZooKeeper
- ReplicatedMergeTree — how replication works, macros, and ON CLUSTER DDL
- Distributed tables and sharding — fan-out reads, sharded writes, and internal_replication
- ClickHouse Cloud — why SharedMergeTree means you may not manage any of this
- Pitfalls — the mistakes that bite real clusters
The two axes: replication vs sharding
Replication and sharding solve different problems, and it helps to hold them apart in your head before you write a single line of config.
Replication is about availability and read throughput. Each shard keeps two or more identical copies on different servers. If a node dies, its replica still has every row, and queries keep working. Reads can be spread across replicas. Replication does nothing for capacity — every replica stores the full dataset of its shard.
Sharding is about capacity and write/scan throughput. You split one logical table into N pieces by a sharding key, and each shard lives on a different set of servers. A 6 TB table becomes three 2 TB shards. Scans run in parallel across shards and the results are merged. Sharding does nothing for availability on its own — lose a shard with no replica and you’ve lost a third of your data.
In practice you combine them. A typical “3 shards × 2 replicas” cluster has six ClickHouse servers: three shards for capacity, each shard mirrored once for fault tolerance. The cluster topology — which servers belong to which shard — lives in a remote_servers section of the config:
<remote_servers>
    <analytics_cluster>
        <shard>
            <internal_replication>true</internal_replication>
            <replica><host>ch-1</host><port>9000</port></replica>
            <replica><host>ch-2</host><port>9000</port></replica>
        </shard>
        <shard>
            <internal_replication>true</internal_replication>
            <replica><host>ch-3</host><port>9000</port></replica>
            <replica><host>ch-4</host><port>9000</port></replica>
        </shard>
        <shard>
            <internal_replication>true</internal_replication>
            <replica><host>ch-5</host><port>9000</port></replica>
            <replica><host>ch-6</host><port>9000</port></replica>
        </shard>
    </analytics_cluster>
</remote_servers>
Hold onto that internal_replication flag — it matters later, and getting it wrong is one of the most common ways to end up with duplicated rows.
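One way to confirm that every node loaded the same topology is to read it back from the system.clusters table. The query below is a minimal sketch that assumes the cluster is named analytics_cluster, as in the config above. Run against that config, it should list six rows: two replicas under each of three shards.

```sql
-- Read back the topology this node loaded from remote_servers.
SELECT
    cluster,
    shard_num,
    replica_num,
    host_name,
    port,
    is_local      -- 1 on the row that describes the node you are connected to
FROM system.clusters
WHERE cluster = 'analytics_cluster'
ORDER BY shard_num, replica_num;
```

Running it on each server and comparing the output is a cheap way to catch a node that was deployed with a stale or partial remote_servers section.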
ClickHouse Keeper: the coordination layer
Replicated tables need somewhere to agree on shared state: which data parts exist, what order inserts happened in, which replica is currently the leader for a merge. That agreement is a classic distributed-consensus problem, and ClickHouse delegates it to an external service that speaks the ZooKeeper protocol.
For years that meant running an Apache ZooKeeper ensemble — a JVM service with its own tuning, GC pauses, and operational quirks that had nothing to do with ClickHouse. ClickHouse Keeper was built to replace it: a C++ implementation of the same coordination API, using the Raft consensus algorithm instead of ZooKeeper’s ZAB protocol. It’s been production-ready since 2022 and is the recommended option for new clusters.
The practical wins are real. Keeper ships inside the ClickHouse binary, so there’s no separate Java runtime to operate. It uses far less memory (ClickHouse’s own benchmarks report up to 46x less than ZooKeeper for the same volume of data) and it compresses its logs and snapshots. Writes go through Raft and are linearizable. Reads are not linearizable by default, which is the same guarantee ZooKeeper gives; the quorum_reads setting routes reads through consensus when you need them to be.
A minimal Keeper config defines the server’s identity and the ensemble it belongs to:
<keeper_server>
    <tcp_port>9181</tcp_port>
    <server_id>1</server_id>
    <raft_configuration>
        <server><id>1</id><hostname>keeper-1</hostname><port>9234</port></server>
        <server><id>2</id><hostname>keeper-2</hostname><port>9234</port></server>
        <server><id>3</id><hostname>keeper-3</hostname><port>9234</port></server>
    </raft_configuration>
</keeper_server>
Run three Keeper nodes for production — that’s the minimum that tolerates one failure while keeping a quorum. Five tolerate two. An even number buys you nothing because Raft needs a majority, so three and four both only tolerate a single loss. Keeper can run co-located on the ClickHouse hosts for small clusters, but on busy clusters give it dedicated nodes so a heavy query can’t starve coordination traffic.
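The majority arithmetic is easy to check for yourself. The query below is a small worked example, not anything Keeper itself runs: it computes the quorum size and the number of failures an ensemble can absorb for one to five nodes, using only built-in functions.

```sql
-- Majority quorum and tolerated failures for ensembles of 1 to 5 nodes.
SELECT
    number                              AS keeper_nodes,
    intDiv(number, 2) + 1               AS quorum,
    number - (intDiv(number, 2) + 1)    AS tolerated_failures
FROM numbers(1, 5);
```

Three and four nodes both come out at one tolerated failure, and five is the first size that reaches two, which is why the even sizes are skipped.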
ClickHouse then learns about Keeper through a zookeeper config section (the name is kept for compatibility):
<zookeeper>
    <node><host>keeper-1</host><port>9181</port></node>
    <node><host>keeper-2</host><port>9181</port></node>
    <node><host>keeper-3</host><port>9181</port></node>
</zookeeper>
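A quick way to check that a ClickHouse server can actually talk to the ensemble is to query the system.zookeeper table, which exposes the Keeper tree as rows. This is a minimal sketch. The table requires a path filter, and which top-level nodes you see depends on what has already been created in your cluster.

```sql
-- List the top-level nodes stored in Keeper.
-- An error here usually means this server cannot reach
-- any host listed in the <zookeeper> section.
SELECT name
FROM system.zookeeper
WHERE path = '/';
```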
If you’re migrating off an existing ZooKeeper ensemble, the snapshot formats are incompatible, but the bundled clickhouse-keeper-converter tool turns ZooKeeper data into Keeper snapshots so you don’t lose your replication state.
ReplicatedMergeTree: how replication actually works
Replication in ClickHouse is a property of the table engine, not the cluster. You opt in by using a Replicated* engine — most commonly ReplicatedMergeTree. Each replica of the table registers itself in Keeper under a shared path, and from then on every insert, merge, and mutation is recorded in a replication log that all replicas consume.
The engine takes two parameters: a Keeper path that identifies the table’s shard, and a replica name that identifies this particular copy.
CREATE TABLE events_local ON CLUSTER analytics_cluster
(
    event_time DateTime,
    user_id UInt64,
    event_type LowCardinality(String),
    payload String
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events_local',
    '{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_type, event_time);
The {shard} and {replica} tokens are macros, resolved per-server from a <macros> block in each node’s config:
<macros>
    <shard>01</shard>
    <replica>ch-1</replica>
</macros>
This is what lets you run the exact same CREATE TABLE statement on every node. Server ch-1 expands the path to /clickhouse/tables/01/events_local with replica ch-1; its sibling ch-2 expands to the same shard path but replica ch-2. Because the Keeper path is identical, the two nodes recognize each other as replicas of the same shard. Get the macros wrong — say, two nodes that should be replicas resolve to different shard paths — and they’ll silently diverge into separate datasets. This is the single most common ClickHouse cluster misconfiguration.
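Because a macro mistake is silent, it is worth comparing the resolved values across the whole cluster rather than trusting the config files. The sketch below assumes the analytics_cluster name from earlier and uses the clusterAllReplicas table function to read system.macros from every node in one query.

```sql
-- Compare macro values across every node in the cluster.
-- Replicas of the same shard should share the shard value
-- and differ only in the replica value.
SELECT
    hostName() AS host,
    macro,
    substitution
FROM clusterAllReplicas('analytics_cluster', system.macros)
ORDER BY host, macro;
```

With the example topology, ch-1 and ch-2 should both report shard 01, with replica values ch-1 and ch-2 respectively.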
Once registered, the mechanics are straightforward. When you insert into a replica, it writes a local data part and appends an entry to the replication log in Keeper. The other replicas see the new log entry and fetch the part directly from the originating replica over the interserver HTTP port. Merges work the same way: a replica acting as leader (several can hold that role at once) assigns the merge and records the decision in the log, and the others either run the same merge locally or fetch the merged part. The data movement is replica-to-replica; Keeper only stores the metadata about what should exist, not the data itself.
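You can watch this machinery from SQL. The queries below are a sketch for the events_local table from this post: the first summarizes replica health on the node you are connected to, and the second lists the log entries that node has not yet applied.

```sql
-- Per-table replication health on the connected node.
SELECT
    database,
    table,
    is_readonly,        -- 1 when the replica is in read-only mode, e.g. after losing its Keeper session
    total_replicas,     -- replicas registered under the shared Keeper path
    active_replicas,    -- replicas that currently hold a live Keeper session
    queue_size,         -- log entries waiting to be applied here
    inserts_in_queue,
    merges_in_queue,
    absolute_delay      -- how far behind this replica is, in seconds
FROM system.replicas
WHERE table = 'events_local';

-- Individual pending entries, oldest first.
SELECT
    type,
    new_part_name,
    source_replica,
    num_tries,
    last_exception
FROM system.replication_queue
WHERE table = 'events_local'
ORDER BY create_time;
```

A queue that keeps growing, or entries with a rising num_tries and a repeated last_exception, usually point at a replica that cannot fetch parts from its peer.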
If you need a write to land on more than one replica before the insert returns, set insert_quorum. The default fire-and-forget behavior is fast, but it acknowledges the insert as soon as a single replica has the part. If that node fails before the other replicas fetch it, the rows are unavailable until the node comes back, and lost if its disk is gone.
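As a minimal sketch, here is what a quorum insert could look like in a session against the events_local table. The row values and the timeout are illustrative placeholders, not recommendations.

```sql
-- Require acknowledgement from two replicas before the INSERT returns.
SET insert_quorum = 2;
-- Give up after 60 seconds instead of waiting longer (value is in milliseconds).
SET insert_quorum_timeout = 60000;

INSERT INTO events_local (event_time, user_id, event_type, payload)
VALUES (now(), 42, 'signup', '{}');

-- Optional: make SELECTs on this session return only quorum-written data.
SET select_sequential_consistency = 1;
SELECT count() FROM events_local WHERE user_id = 42;
```

The tradeoff is latency and availability: with a quorum of two on a two-replica shard, inserts fail while either replica is down.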
Distributed tables and sharding
Replication gives you copies; it does nothing to split data across shards. That’s the job of the Distributed engine. A Distributed table stores no data of its own — it’s a thin routing layer that sits in front of the per-shard local tables and fans queries out across the cluster.
The pattern is to keep two tables: the ReplicatedMergeTree from above (which physically stores one shard’s data on each server), and a Distributed table layered on top that knows about all shards.
CREATE TABLE events ON CLUSTER analytics_cluster
AS events_local
ENGINE = Distributed(
    'analytics_cluster',   -- cluster name from remote_servers
    currentDatabase(),
    'events_local',        -- the underlying local table
    cityHash64(user_id)    -- sharding key
);
When you SELECT from events, ClickHouse sends the query to one replica of every shard, each shard scans its local data in parallel, and the initiator merges the partial results. Aggregations are executed in two stages: each shard computes a partial GROUP BY over its own rows, ships the intermediate aggregate states back, and the initiating node merges them into the final answer. This two-stage execution is why distributed GROUP BY scales — most of the work happens in parallel on the shards, and only compact aggregate states cross the network.
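The two stages can be imitated on a single node with the -State and -Merge aggregate combinators. This is an analogy for illustration, not the literal wire protocol: the inner query plays the role of one shard producing intermediate states, and the outer query plays the initiator merging them.

```sql
-- Outer query: what the initiator does with the states it receives.
SELECT
    event_type,
    countMerge(events_state) AS events,
    uniqMerge(users_state)   AS unique_users
FROM
(
    -- Inner query: what each shard computes over its own rows.
    SELECT
        event_type,
        countState()       AS events_state,
        uniqState(user_id) AS users_state
    FROM events_local
    GROUP BY event_type
)
GROUP BY event_type;
```

The result matches a plain count() and uniq(user_id) over the same rows. The point is that only one small state per group has to travel, however many rows the shard scanned.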
Writes are where the internal_replication flag from the cluster config comes back to bite people. When you insert into the Distributed table, it evaluates the sharding key for each row and routes it to the correct shard. How it writes within a shard depends on the flag:
- internal_replication = true (what you almost always want): the Distributed table writes to just one replica of the target shard, and ReplicatedMergeTree replicates the data to the other replicas. The write happens once.
- internal_replication = false: the Distributed table itself writes to every replica of the shard. Combined with ReplicatedMergeTree, which also replicates, you get the data written twice and end up with duplicates.
So the rule is: if your local tables are Replicated*, set internal_replication = true and let the engine handle copies. Only use false for non-replicated MergeTree local tables where the Distributed table is your only replication mechanism.
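If you suspect a cluster has been running with the wrong flag, a rough check is to look for rows that appear more than once. The query below is a sketch with a caveat: it treats rows with identical values in all four columns as duplicates, so genuinely repeated events will also show up. The one-day window is an arbitrary choice to keep the scan small.

```sql
-- Look for fully identical rows inserted in the last day.
SELECT
    event_time,
    user_id,
    event_type,
    payload,
    count() AS copies
FROM events
WHERE event_time >= now() - INTERVAL 1 DAY
GROUP BY event_time, user_id, event_type, payload
HAVING copies > 1
ORDER BY copies DESC
LIMIT 20;
```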
You can also skip the Distributed table for writes entirely and insert directly into the events_local table on a specific node. That bypasses sharding — the rows stay on whatever node you wrote to — which is sometimes exactly what you want for a load process that shards data itself.
Choosing a sharding key
The sharding key decides which shard each row lands on, and a bad choice is painful to undo. Two properties matter.
First, even distribution. A key like cityHash64(user_id) spreads rows uniformly so no single shard becomes a hotspot. Sharding by something low-cardinality or skewed — country, say, where one country dominates — piles most of your data onto one shard and defeats the purpose.
Second, query locality. If most queries filter or join on a particular column, sharding by that column means a query can sometimes be answered by a single shard instead of fanning out to all of them. The classic example is multi-tenant data sharded by tenant_id: a per-tenant query hits one shard. The tradeoff is that an uneven tenant distribution reintroduces hotspots, so this only works when tenants are roughly balanced.
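Distribution is easy to measure once data is flowing. Distributed tables expose a _shard_num virtual column, so a minimal sketch of a balance check against the events table looks like this:

```sql
-- Row count per shard, read through the Distributed table.
SELECT
    _shard_num AS shard,
    count()    AS row_count
FROM events
GROUP BY shard
ORDER BY shard;
```

Roughly equal counts suggest the key is spreading rows well. One shard far ahead of the others is the hotspot signature described above.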
The uncomfortable truth is that ClickHouse has no built-in online resharding. Adding a shard doesn’t automatically rebalance existing data — new writes spread across the larger cluster, but old data stays put until you manually move partitions or re-insert. Plan shard count with headroom, and prefer a key that stays balanced as you grow.
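Before moving anything by hand, it helps to see how much data each node holds per partition. The query below is a sketch that reads system.parts from every node and assumes the analytics_cluster and events_local names used in this post.

```sql
-- Active data per node and partition, to plan a manual rebalance.
SELECT
    hostName()                             AS host,
    partition,
    sum(rows)                              AS row_count,
    formatReadableSize(sum(bytes_on_disk)) AS size_on_disk
FROM clusterAllReplicas('analytics_cluster', system.parts)
WHERE table = 'events_local' AND active
GROUP BY host, partition
ORDER BY partition, host;
```

After adding a shard, the new hosts will show data only for partitions written since the change, which makes the imbalance concrete.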
ON CLUSTER and the DDL queue
Notice the ON CLUSTER analytics_cluster clause on every CREATE TABLE above. Running DDL on one node only changes that node; ON CLUSTER tells ClickHouse to apply the statement across the whole cluster. It does this through a distributed DDL queue stored in Keeper (under /clickhouse/task_queue/ddl by default). Each node watches the queue, executes the statement locally, and reports back. This is how you keep schemas consistent across dozens of servers without scripting an SSH loop — and it’s another reason a healthy Keeper ensemble is non-negotiable. If Keeper is down, ON CLUSTER DDL can’t make progress.
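The same clause works for schema changes. As a sketch, the statements below add a column to the local tables and then to the Distributed table, and then look at the DDL queue. The session_id column is hypothetical, and the columns returned by system.distributed_ddl_queue vary between ClickHouse versions, which is why the last query selects everything.

```sql
-- session_id is a hypothetical column used only for illustration.
ALTER TABLE events_local ON CLUSTER analytics_cluster
    ADD COLUMN IF NOT EXISTS session_id String DEFAULT '';

-- The Distributed table has its own schema and needs the same change.
ALTER TABLE events ON CLUSTER analytics_cluster
    ADD COLUMN IF NOT EXISTS session_id String DEFAULT '';

-- Inspect the most recent entries in the distributed DDL queue.
SELECT *
FROM system.distributed_ddl_queue
WHERE cluster = 'analytics_cluster'
ORDER BY entry DESC
LIMIT 10;
```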
ClickHouse Cloud changes the calculus
Everything above describes a self-managed cluster. ClickHouse Cloud takes a different architecture: instead of ReplicatedMergeTree copying data between local disks, it uses SharedMergeTree, which stores all durable data in shared object storage (S3, GCS, or Azure Blob) and separates compute from storage entirely.
With SharedMergeTree, replicas don’t copy parts to each other at all — every compute node reads the same data directly from object storage, and coordination still flows through Keeper for metadata. Adding a replica is just adding a stateless compute node; there’s no data to sync, so it’s nearly instant. Sharding in the manual Distributed-table sense largely disappears because storage scales independently of compute.
The takeaway: if you’re on ClickHouse Cloud, you mostly don’t manage replication topology or shard keys by hand. If you’re self-hosting on real disks — on Kubernetes, EC2, or bare metal — the ReplicatedMergeTree + Distributed + Keeper model in this post is exactly what you’re operating, and understanding it is the difference between a cluster that heals itself and one that quietly serves stale or duplicated data.
Common pitfalls
Mismatched macros. Two servers that should be replicas resolving to different {shard} paths is the classic silent failure — they never see each other’s data and diverge. Verify with SELECT * FROM system.macros on each node and check system.replicas for the expected replica count.
internal_replication = false with Replicated tables. This double-writes and produces duplicates. With any Replicated* local engine, the flag must be true.
Inserting into the local table when you meant the Distributed table. Direct inserts into events_local skip sharding and pile rows onto one node. Inserts that should be sharded go through the Distributed table.
Running one or two Keeper nodes. A single Keeper is a single point of failure for the entire cluster’s coordination; two can’t form a quorum if one dies. Always run an odd number, at least three.
Forgetting that sharding isn’t online-rebalanceable. Under-provisioning shard count and expecting to “just add a shard later” leads to a painful manual partition-migration project. Size for growth up front.
Querying the local table when you need the cluster-wide view. SELECT count() FROM events_local returns one shard’s count, not the total. Read through the Distributed table for cluster-wide results, or use clusterAllReplicas() for diagnostics.
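The local versus cluster-wide distinction is easiest to see side by side. The queries below are a sketch that assumes the tables live in the default database; adjust the name if yours differs.

```sql
-- One shard only: the shard of the node you are connected to.
SELECT count() FROM events_local;

-- Cluster-wide total, read through the Distributed table.
SELECT count() FROM events;

-- Per-node breakdown for diagnostics.
-- Replicas of the same shard should report matching counts.
SELECT
    hostName() AS host,
    count()    AS row_count
FROM clusterAllReplicas('analytics_cluster', default.events_local)
GROUP BY host
ORDER BY host;
```

If two replicas of one shard disagree for more than a short time, check the replication queue on the lagging node and the macros on both.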
Frequently asked questions
What is ClickHouse Keeper?
ClickHouse Keeper is a coordination service that provides the consensus and metadata storage that replicated tables depend on — tracking which data parts exist, ordering inserts, and coordinating merges. It’s a C++ reimplementation of Apache ZooKeeper’s API using the Raft algorithm, designed specifically for ClickHouse workloads and shipped inside the ClickHouse binary.
Do I still need ZooKeeper for ClickHouse in 2026?
No. ClickHouse Keeper has been production-ready since 2022 and is the recommended coordination service for new clusters. It speaks the same protocol as ZooKeeper, so replicated tables work unchanged, but it uses far less memory and removes the Java dependency. Existing ZooKeeper ensembles can be migrated with the clickhouse-keeper-converter tool.
What’s the difference between replication and sharding in ClickHouse?
Replication keeps identical copies of a shard’s data on multiple servers for availability and read throughput — every replica holds the full dataset of its shard. Sharding splits one logical table across servers by a sharding key for capacity and parallel scan throughput — each shard holds only part of the data. They’re independent decisions and most production clusters use both together.
How does ReplicatedMergeTree replication work?
Each replica registers under a shared Keeper path. When you insert, the replica writes a local data part and appends an entry to a replication log in Keeper. Other replicas see the log entry and fetch the part directly from the originating replica over the interserver HTTP port. Keeper stores only the metadata about what should exist; the actual data moves replica-to-replica.
What is a Distributed table in ClickHouse?
A Distributed table is a routing layer that stores no data itself. It sits in front of the per-shard local tables and fans queries out: reads run in parallel on every shard and the results are merged, while inserts are routed to the correct shard based on the sharding key. You typically pair one Distributed table with a ReplicatedMergeTree local table on each node.
Why am I getting duplicate rows when inserting into a Distributed table?
Almost always because internal_replication is set to false while your local tables use a Replicated* engine. With that combination, the Distributed table writes to every replica and ReplicatedMergeTree replicates the data, so each row lands twice. Set internal_replication = true for replicated local tables.
How many Keeper nodes should I run?
Three for production. Raft needs a majority quorum, so three nodes tolerate one failure and five tolerate two. An even number gives no extra fault tolerance over the odd number below it, so always run an odd count — never one or two.
How do I choose a ClickHouse sharding key?
Pick a key that distributes rows evenly (a hash of a high-cardinality column like cityHash64(user_id) is a safe default) and, when possible, matches how you query. Sharding by tenant_id, for example, lets per-tenant queries hit a single shard — but only works if tenants are balanced. Avoid low-cardinality or skewed keys that create hotspots, since ClickHouse can’t reshard existing data online.
Wrapping up
A ClickHouse cluster is two independent ideas plus a coordination service:
- Replication (ReplicatedMergeTree) keeps redundant copies for availability, coordinating through Keeper.
- Sharding (the Distributed engine) splits data for capacity and parallel scans.
- ClickHouse Keeper is the consensus layer both rely on — run three nodes, keep it healthy, and prefer it over ZooKeeper.
Get the macros right, set internal_replication = true with replicated tables, and size your shard count for growth, and a self-managed cluster will heal around failures instead of quietly corrupting your data. If you’re on ClickHouse Cloud, SharedMergeTree hides most of this behind object storage — but knowing the model still pays off when you reason about consistency and cost.
If you’re designing the tables that live on these shards, pair this with our guides to ClickHouse PARTITION BY, ORDER BY, and PRIMARY KEY and ReplacingMergeTree, and reach for projections when a single ORDER BY can’t serve every query. For the day-to-day tooling side of inspecting a cluster, see our roundup of the best ClickHouse GUI tools — and if you want to build dashboards and internal apps directly on a ClickHouse cluster, QueryPlane connects to ClickHouse and runs your queries with your own credentials.