Python connector guide

This guide covers the full lifecycle of building a Python connector, ETL pipeline, or BI backend on top of ClickHouse using the clickhouse-connect library. It is opinionated: it recommends specific patterns and calls out known pitfalls.

For the complete API reference, see the Python client reference. This guide pairs with the ingestion patterns and consumption patterns guides.

Installation

pip install clickhouse-connect

On Windows, zoneinfo (used internally by clickhouse-connect for timezone handling) needs the tzdata package, because Windows does not ship a timezone database that Python can read:

pip install clickhouse-connect tzdata

This is not needed on Linux or macOS, where the system timezone database is available at /usr/share/zoneinfo.

Creating a client

Use clickhouse_connect.get_client() to create a connected client. For ClickHouse Cloud, always set secure=True and use port 8443:

import os
import clickhouse_connect

client = clickhouse_connect.get_client(
    host=os.environ["CH_HOST"],
    port=8443,
    username=os.environ["CH_USER"],
    password=os.environ["CH_PASSWORD"],
    secure=True,
)

Read credentials from environment variables rather than hardcoding them. For self-managed ClickHouse over plain HTTP, omit secure or set it to False and use port 8123.
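
A minimal sketch for a self-managed instance over plain HTTP (the host and credentials here are placeholders, not values from this guide):

client = clickhouse_connect.get_client(
    host="localhost",    # placeholder: your self-managed host
    port=8123,           # default HTTP port
    username="default",  # placeholder credentials
    password="",
)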

The client is thread-safe and intended to be shared across threads. Create one instance per process rather than one per request.

Verifying connectivity

result = client.query("SELECT version()")
print(result.first_row[0])
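
If you only need a liveness check without executing SQL, clickhouse-connect also exposes Client.ping(), which returns True when the server's HTTP interface is reachable:

if not client.ping():
    raise RuntimeError("ClickHouse server is not reachable")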

Schema discovery

Use system.* tables for schema introspection. Do not use INFORMATION_SCHEMA for connector-level metadata — it omits ClickHouse-specific fields like is_in_sorting_key and does not expose type modifiers like LowCardinality.

Listing databases

result = client.query(
    "SELECT name FROM system.databases WHERE engine != 'System' ORDER BY name"
)
databases = [row[0] for row in result.result_rows]

Listing columns

result = client.query(
    """
    SELECT
        table,
        name,
        type,
        is_in_sorting_key,
        is_in_primary_key,
        comment
    FROM system.columns
    WHERE database = {db:String}
      AND table = {tbl:String}
    ORDER BY position
    """,
    parameters={"db": database, "tbl": table},
)

columns = [
    {
        "name": row[1],
        "type": row[2],
        "is_in_sorting_key": bool(row[3]),
        "is_in_primary_key": bool(row[4]),
        "comment": row[5],
    }
    for row in result.result_rows
]

system.columns.type returns the full type string including modifiers — for example Nullable(LowCardinality(String)) or Array(DateTime64(3, 'UTC')). INFORMATION_SCHEMA.COLUMNS.DATA_TYPE strips these wrappers and loses information your connector needs for correct type mapping.

Parsing type modifiers

Strip Nullable and LowCardinality wrappers before mapping to Python types. Both wrappers can be nested in any order:

def unwrap_type(ch_type: str) -> tuple[str, bool]:
    """Return (inner_type, is_nullable) after stripping Nullable and LowCardinality."""
    nullable = False
    t = ch_type.strip()
    changed = True
    while changed:
        changed = False
        if t.startswith("Nullable(") and t.endswith(")"):
            t = t[len("Nullable("):-1].strip()
            nullable = True
            changed = True
        if t.startswith("LowCardinality(") and t.endswith(")"):
            t = t[len("LowCardinality("):-1].strip()
            changed = True
    return t, nullable

inner, is_nullable = unwrap_type("Nullable(LowCardinality(String))")
# inner = "String", is_nullable = True

Type mapping

clickhouse-connect deserializes ClickHouse values into Python types automatically. The table below shows the default mappings:

| ClickHouse type | Python type | Notes |
|---|---|---|
| Int8, Int16, Int32, Int64 | int | |
| UInt8, UInt16, UInt32 | int | |
| UInt64 | int | Values above 2^63 - 1 are valid; handle them as Python int (arbitrary precision) |
| Int128, Int256, UInt128, UInt256 | int | Python int is arbitrary precision; no overflow |
| Float32, Float64 | float | |
| Decimal32/64/128/256 | decimal.Decimal | |
| String | str | |
| FixedString(N) | str | clickhouse-connect strips trailing null bytes automatically |
| Date | datetime.date | |
| Date32 | datetime.date | Extended range |
| DateTime | datetime.datetime | Timezone-aware if column has explicit timezone; naive (UTC) otherwise |
| DateTime64(n) | datetime.datetime | Sub-second precision; timezone-aware if column has explicit timezone |
| UUID | uuid.UUID | |
| IPv4 | ipaddress.IPv4Address | |
| IPv6 | ipaddress.IPv6Address | |
| Bool | bool | |
| Array(T) | list | Nested arrays are nested lists |
| Map(K, V) | dict | |
| Tuple(T1, T2, ...) | tuple | |
| Nullable(T) | T or None | |
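
As a hedged sketch of how a connector might apply these mappings: unwrap modifiers with unwrap_type() from above, take the base type name before any parentheses, and look it up in a dict. The mapping below covers only the scalar types from the table and falls back to str for anything else; extend it as needed.

import decimal
import ipaddress
import uuid
from datetime import date, datetime

# Illustrative mapping; extend for Array, Map, Tuple, and other composite types
BASE_TYPE_TO_PYTHON = {
    "Int8": int, "Int16": int, "Int32": int, "Int64": int,
    "UInt8": int, "UInt16": int, "UInt32": int, "UInt64": int,
    "Int128": int, "Int256": int, "UInt128": int, "UInt256": int,
    "Float32": float, "Float64": float,
    "Decimal": decimal.Decimal,
    "String": str, "FixedString": str,
    "Date": date, "Date32": date,
    "DateTime": datetime, "DateTime64": datetime,
    "UUID": uuid.UUID,
    "IPv4": ipaddress.IPv4Address, "IPv6": ipaddress.IPv6Address,
    "Bool": bool,
}

def python_type_for(ch_type: str) -> tuple[type, bool]:
    """Return (python_type, is_nullable) for a system.columns type string."""
    inner, nullable = unwrap_type(ch_type)   # from "Parsing type modifiers" above
    base = inner.split("(", 1)[0]            # "DateTime64(3, 'UTC')" -> "DateTime64"
    return BASE_TYPE_TO_PYTHON.get(base, str), nullable

For example, python_type_for("Nullable(DateTime64(3, 'UTC'))") returns (datetime, True).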

Timezone-aware datetime parameters

When passing datetime values as query parameters, include timezone information:

from datetime import datetime, timezone

client.query(
    "SELECT * FROM events WHERE ts > {min_ts:DateTime64(3)}",
    parameters={"min_ts": datetime(2024, 1, 1, tzinfo=timezone.utc)},
)

On clickhouse-connect versions before 0.7.x, a bug caused naive datetime parameters to be interpreted as local time and then shifted incorrectly to UTC, producing a ~5-hour offset in common US timezones. Always attach timezone.utc or the correct ZoneInfo timezone when constructing datetime parameters.
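
If your inputs carry a local timezone rather than UTC, attach the matching ZoneInfo instead of converting by hand; a sketch (the zone name is an example):

from datetime import datetime
from zoneinfo import ZoneInfo

client.query(
    "SELECT * FROM events WHERE ts > {min_ts:DateTime64(3)}",
    parameters={"min_ts": datetime(2024, 1, 1, tzinfo=ZoneInfo("America/New_York"))},
)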

Querying

Streaming query results

Use client.query_rows_stream() for any result set that might exceed 10,000 rows. The non-streaming client.query() buffers the entire result in memory before returning.

from datetime import date

with client.query_rows_stream(
    "SELECT user_id, event, ts FROM events WHERE date = {d:Date}",
    parameters={"d": date(2024, 1, 15)},
) as stream:
    for row in stream:
        process(row[0], row[1], row[2])

The context manager ensures the underlying connection is released when iteration ends or if an exception is raised mid-stream. Do not hold the stream open longer than needed.

Parameterized queries

Always use parameterized queries for any value that comes from user input or external data. ClickHouse uses {name:Type} syntax in the SQL string:

from datetime import date

result = client.query(
    """
    SELECT
        user_id,
        count() AS events,
        max(ts) AS last_seen
    FROM events
    WHERE user_id = {user_id:String}
      AND date >= {min_date:Date}
    GROUP BY user_id
    """,
    parameters={
        "user_id": "alice",
        "min_date": date(2024, 1, 1),
    },
)

The {name:Type} syntax is ClickHouse server-side parameter binding. The value is sent separately and never interpolated into the SQL string, so SQL injection is not possible.
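
Typed parameters cover values, not identifiers. When a connector needs to query a user-selected table or database, ClickHouse also accepts the Identifier parameter type; a hedged sketch (the names here are examples):

result = client.query(
    "SELECT count() FROM {db:Identifier}.{tbl:Identifier}",
    parameters={"db": "analytics", "tbl": "events"},
)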

Query tagging

Tag every query with a query_id and a log_comment so queries are attributable in system.query_log. This is essential for debugging customer issues and for identifying which part of your connector generated a slow query:

result = client.query(
    "SELECT count() FROM events",
    query_id="connector:schema-check:job-42",
    settings={"log_comment": "connector:schema-discovery"},
)

Use a deterministic query_id derived from your job or request context, and reuse it on retries so the original attempt and the retry can be correlated in system.query_log. Note that if the original query is still running on the server, ClickHouse rejects a second query submitted with the same query_id rather than attaching to it, so treat that error as a signal that the first attempt is still in flight.

Inserting data

Inserting lists of dicts

Pass a list of dicts (or a list of lists) and specify column names explicitly:

rows = [
    {"user_id": "alice", "event": "login",  "ts": datetime(2024, 1, 15, 12, 0, 0)},
    {"user_id": "bob",   "event": "signup", "ts": datetime(2024, 1, 15, 12, 1, 0)},
]

client.insert(
    "events",
    data=rows,
    column_names=["user_id", "event", "ts"],
)

You can also pass a list of lists or tuples — column order must then match column_names exactly.

Batch sizing

Target 10,000–100,000 rows per insert() call. Each call creates a new data part on disk; ClickHouse merges parts asynchronously. Inserting one row at a time (or even one hundred rows at a time) causes the part count to exceed ClickHouse's default threshold and raises:

DB::Exception: Too many parts (300). Merges are processing significantly slower than inserts.

Never call client.insert() inside a per-row loop. Accumulate rows in memory and flush in batches.
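
A minimal batching sketch, assuming rows arrive one at a time from an upstream iterator (upstream_rows() and the batch size are illustrative):

BATCH_SIZE = 50_000
buffer = []

for row in upstream_rows():  # hypothetical row source
    buffer.append(row)
    if len(buffer) >= BATCH_SIZE:
        client.insert("events", data=buffer, column_names=["user_id", "event", "ts"])
        buffer.clear()

if buffer:  # flush the final partial batch
    client.insert("events", data=buffer, column_names=["user_id", "event", "ts"])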

InsertContext for high-throughput

For pipelines that insert into the same table repeatedly, create an InsertContext once and reuse it across insert calls. This avoids a pre-insert query to look up column types on every call:

context = client.create_insert_context(
    table="events",
    column_names=["user_id", "event", "ts"],
)

for batch in batches:
    context.data = batch
    client.insert(context=context)

The InsertContext holds the column schema and serialization state. Reusing it gives a measurable throughput improvement at high insert rates.

Async insert

When multiple producers are writing small payloads, use server-side async buffering to avoid the "too many parts" problem. The server buffers incoming inserts and flushes them in bulk:

client.insert(
    "events",
    data=rows,
    column_names=["user_id", "event", "ts"],
    settings={
        "async_insert": 1,
        "wait_for_async_insert": 1,
    },
)

wait_for_async_insert=1 blocks until the server confirms the data has been written. With wait_for_async_insert=0, the call returns immediately but a type error in any row silently drops the entire buffered batch with no error returned to the client. Use wait_for_async_insert=1 in all connectors unless you have a specific reason to accept silent drops.

Idempotent inserts

Set insert_deduplication_token to a stable string per logical batch. On retry after a network error, send the same token — if the original insert succeeded, ClickHouse silently skips the retry:

client.insert(
    "events",
    data=rows,
    column_names=["user_id", "event", "ts"],
    settings={"insert_deduplication_token": "pipeline-job-001-batch-042"},
)

Derive the token from your job and batch identifiers, not from the data itself. The deduplication window covers the last 100 inserts by default; tokens older than that window are no longer tracked and will not deduplicate.
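
For example, a token built from stable pipeline identifiers (the names and values are illustrative):

pipeline_name, job_id, batch_index = "orders-sync", "job-001", 42  # illustrative identifiers
token = f"{pipeline_name}-{job_id}-batch-{batch_index:05d}"
# -> "orders-sync-job-001-batch-00042"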

Error handling

Catch clickhouse_connect.driver.exceptions.DatabaseError for server-reported errors. The ClickHouse error code is available as e.code:

from clickhouse_connect.driver.exceptions import DatabaseError
import time

def insert_with_retry(client, table, rows, column_names, job_id, batch_num, max_retries=3):
    # Stable per-batch token so a retried insert deduplicates if the original succeeded
    token = f"job-{job_id}-batch-{batch_num}"
    for attempt in range(max_retries):
        try:
            client.insert(
                table,
                data=rows,
                column_names=column_names,
                settings={"insert_deduplication_token": token},
            )
            return
        except DatabaseError as e:
            if e.code == 60:
                raise RuntimeError(f"Table {table} does not exist") from e
            if e.code == 241:
                raise RuntimeError("ClickHouse memory limit exceeded") from e
            if e.code == 159:
                raise RuntimeError("Query timed out") from e
            raise
        except Exception:
            # Network-level error: back off and retry with the same dedup token
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

Key error codes for connector developers:

| Code | Name | Action |
|---|---|---|
| 60 | UNKNOWN_TABLE | Do not retry; surface to user |
| 81 | UNKNOWN_DATABASE | Do not retry; surface to user |
| 164 | READONLY | Do not retry; check user permissions |
| 241 | MEMORY_LIMIT_EXCEEDED | Do not retry; reduce batch size or query scope |
| 159 | TIMEOUT_EXCEEDED | May retry with a larger max_execution_time setting |

For network-level errors (ConnectionError, TimeoutError, OSError), retry with exponential backoff. Always reuse the same insert_deduplication_token on insert retries.