Skip to content

Clickhouse

ClickhouseAsync

Bases: BaseModel

An async clickhouse client.

Usage
from tradingtoolbox.clickhouse import ClickhouseAsync

async_client = await ClickhouseAsync.create(
    host="localhost",
    port=8123,
    user="default",
    password="",
    database="default",
)

# Or you can create it without passing anything, and it will use the above defaults
async_client = await ClickhouseAsync.create()

create_table

create_table(table_name: str, schema: str, partition_key: str = '', order_by: str = '', primary_key: str = '', drop: bool = False)

Create a table

Parameters:

Name Type Description Default
table_name str

The name of the table

required
schema str

The schema of the table

required
partition_key str

The partition key

''
order_by str

The order by

''
primary_key str

The primary key

''
drop bool

Whether to drop the table if it already exists

False
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
    async def create_table(
        self,
        table_name: str,
        schema: str,
        partition_key: str = "",
        order_by: str = "",
        primary_key: str = "",
        drop: bool = False,
    ):
        """
        Create a table

        Parameters:
            table_name: The name of the table
            schema: The schema of the table
            partition_key: The partition key
            order_by: The order by
            primary_key: The primary key
            drop: Whether to drop the table if it already exists
        """
        if drop:
            await self.drop_table(table_name)

        query_string = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
    {schema}
)
ENGINE = ReplacingMergeTree()
{partition_key}
{order_by}
{primary_key}
"""
        # print(query_string)
        return await self.execute_command(query_string)

drop_table

drop_table(table_name: str)

Drops a table from the database

Parameters:

Name Type Description Default
table_name str

The table's name

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
async def drop_table(self, table_name: str):
    """
    Drops a table from the database

    Parameters:
        table_name: The table's name
    """
    return await self.execute_command(f"DROP TABLE IF EXISTS {table_name}")

execute_command

execute_command(query: str)

Allows you to run any command you want

Parameters:

Name Type Description Default
query str

The query to execute

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
async def execute_command(self, query: str):
    """
    Allows you to run any command you want

    Parameters:
        query: The query to execute
    """
    await self.async_client.command(query)

insert_df

insert_df(df: DataFrame, table_name: str, drop: bool = True)

Parameters:

Name Type Description Default
df DataFrame

The dataframe to insert

required
table_name str

The name of the table to insert into

required
drop bool

Whether to drop the table if it already exists

True
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
async def insert_df(self, df: pd.DataFrame, table_name: str, drop: bool = True):
    """
    Parameters:
        df: The dataframe to insert
        table_name: The name of the table to insert into
        drop: Whether to drop the table if it already exists
    """
    if drop:
        await self.drop_table(table_name)
    schema = generate_table_schema(df, table_name)

    await self.async_client.insert_df(table_name, df)
    await self.async_client.command(schema)

optimize_table

optimize_table(table_name: str)

Optimizes a table

Parameters:

Name Type Description Default
table_name str

The name of the table

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
async def optimize_table(self, table_name: str):
    """
    Optimizes a table

    Parameters:
        table_name: The name of the table
    """
    command = f"OPTIMIZE TABLE {table_name} FINAL"
    return await self.execute_command(command)

query

query(query: str)

Runs a query

Parameters:

Name Type Description Default
query str

The query to run

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
async def query(self, query: str):
    """
    Runs a query

    Parameters:
        query: The query to run
    """
    return await self.async_client.query(query)

query_df

query_df(query: str)

Runs a query and returns the results as a dataframe

Parameters:

Name Type Description Default
query str

The query to run

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_async.py
async def query_df(self, query: str):
    """
    Runs a query and returns the results as a dataframe

    Parameters:
        query: The query to run
    """
    return await self.async_client.query_df(query)

ClickhouseSync

Bases: BaseModel

An sync clickhouse client.

Usage
from tradingtoolbox.clickhouse import ClickhouseSync

client = ClickhouseSync.create(
    host="localhost",
    port=8123,
    user="default",
    password="",
    database="default",
)

# Or you can create it without passing anything, and it will use the above defaults
async_client = ClickhouseSync.create()

create_table

create_table(table_name: str, schema: str, partition_key: str = '', order_by: str = '', primary_key: str = '', drop: bool = False)

Create a table

Parameters:

Name Type Description Default
table_name str

The name of the table

required
schema str

The schema of the table

required
partition_key str

The partition key

''
order_by str

The order by

''
primary_key str

The primary key

''
drop bool

Whether to drop the table if it already exists

False
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
    def create_table(
        self,
        table_name: str,
        schema: str,
        partition_key: str = "",
        order_by: str = "",
        primary_key: str = "",
        drop: bool = False,
    ):
        """
        Create a table

        Parameters:
            table_name: The name of the table
            schema: The schema of the table
            partition_key: The partition key
            order_by: The order by
            primary_key: The primary key
            drop: Whether to drop the table if it already exists
        """
        if drop:
            self.drop_table(table_name)

        query_string = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
    {schema}
)
ENGINE = ReplacingMergeTree()
{partition_key}
{order_by}
{primary_key}
"""
        # print(query_string)
        return self.execute_command(query_string)

drop_table

drop_table(table_name: str)

Drops a table from the database

Parameters:

Name Type Description Default
table_name str

The table's name

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
def drop_table(self, table_name: str):
    """
    Drops a table from the database

    Parameters:
        table_name: The table's name
    """
    return self.execute_command(f"DROP TABLE IF EXISTS {table_name}")

execute_command

execute_command(msg: str)

Allows you to run any command you want

Parameters:

Name Type Description Default
query

The query to execute

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
def execute_command(self, msg: str):
    """
    Allows you to run any command you want

    Parameters:
        query: The query to execute
    """
    self.client.command(msg)

insert_df

insert_df(df: DataFrame, table_name: str, drop=True)

Parameters:

Name Type Description Default
df DataFrame

The dataframe to insert

required
table_name str

The name of the table to insert into

required
drop

Whether to drop the table if it already exists

True
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
def insert_df(self, df: pd.DataFrame, table_name: str, drop=True):
    """
    Parameters:
        df: The dataframe to insert
        table_name: The name of the table to insert into
        drop: Whether to drop the table if it already exists
    """
    if drop:
        self.drop_table(table_name)
    schema = generate_table_schema(df, table_name)

    self.client.command(schema)
    self.client.insert_df(table_name, df)

optimize_table

optimize_table(table_name: str)

Optimizes a table

Parameters:

Name Type Description Default
table_name str

The name of the table

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
def optimize_table(self, table_name: str):
    """
    Optimizes a table

    Parameters:
        table_name: The name of the table
    """
    command = f"OPTIMIZE TABLE {table_name} FINAL"
    return self.execute_command(command)

query

query(query: str)

Runs a query

Parameters:

Name Type Description Default
query str

The query to run

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
def query(self, query: str):
    """
    Runs a query

    Parameters:
        query: The query to run
    """
    return self.client.query(query)

query_df

query_df(query: str)

Runs a query and returns the results as a dataframe

Parameters:

Name Type Description Default
query str

The query to run

required
Source code in src/tradingtoolbox/clickhouse/clickhouse_sync.py
def query_df(self, query: str):
    """
    Runs a query and returns the results as a dataframe

    Parameters:
        query: The query to run
    """
    return self.client.query_df(query)