Stuck queries

Hi,
I am trying to replace my Cassandra DB with YugabyteDB. I'm using the Python Cassandra driver, and there is an issue with execute_concurrent. Since execute_concurrent doesn't apply a timeout to query execution (it's explicitly set to no timeout inside the driver), a query sometimes gets 'stuck' waiting for a response from YugabyteDB. Looking at the logs, I don't see any server-side timeout indications; are they even logged and returned to the client? The only thing I noticed that works in that case is truncating the table I'm writing to; the client then proceeds with query execution. Could you help me with this?
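For context, since execute_concurrent waits on its futures with no deadline, one generic workaround is to drive the statements yourself through a thread pool and enforce a per-call timeout. This is a sketch with stand-in names (`run_with_timeout` and the `fn` callable are my own, not driver API; `fn` would be a thin wrapper that executes one prepared statement):

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as CallTimeout


def run_with_timeout(fn, args_list, timeout, max_workers=32):
    """Run fn(*args) concurrently for each args tuple in args_list.

    fn stands in for a per-statement execute call (e.g. a thin wrapper
    around session.execute for one prepared statement). Instead of
    blocking forever on a lost response, f.result(timeout=...) raises
    CallTimeout once a single call exceeds `timeout` seconds.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn, *args) for args in args_list]
        # Bound how long we wait for each result.
        return [f.result(timeout=timeout) for f in futures]
```

The same 1000-statement chunks could be pushed through a helper like this, so a stuck query surfaces as an exception instead of a hang.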

Hi @Martina

What queries are you sending with execute_concurrent? And what's the table schema? Are there steps we can follow to reproduce this?

CREATE KEYSPACE xx WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE xx.yy (
    field1 blob,
    field2 text,
    field3 blob,
    value blob,
    PRIMARY KEY (field1, field2, field3)
) WITH CLUSTERING ORDER BY (field2 ASC, field3 ASC)
    AND default_time_to_live = 0
    AND transactions = {'enabled': 'false'};

That is the schema, and queries are inserts to this table.
The code:

#!/usr/bin/python

from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent

from test.data import batch


def main():
    session = Cluster(contact_points=['yb-tserver']).connect()
    prepared_statement = session.prepare('INSERT INTO xx.yy (field1, field2, field3, value) VALUES (?, ?, ?, ?)')

    statements = []

    while True:
        for field1, value in batch:
            statements.append((prepared_statement, (field1, 'DUMMY', '\x00' * 16, value)))

            if len(statements) >= 1000:
                execute_concurrent(session, statements, raise_on_first_error=True)
                statements = []


if __name__ == '__main__':
    main()
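As an aside, the inline buffering and flush-at-1000 logic in the loop above can be factored into a small generator (a generic helper of my own, not part of the driver), which makes the chunking condition easier to see:

```python
def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    buf = []
    for item in items:
        buf.append(item)
        if len(buf) >= size:
            yield buf
            buf = []
    if buf:
        yield buf  # flush the final partial chunk
```

Each yielded chunk of (statement, params) pairs can then be handed to execute_concurrent in a single call.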

It gets stuck on a random execute, not always on the same record, after a day or two of going through the same batch of 1000 records. I'm using docker-compose to set up YugabyteDB.

@Martina is the database experiencing high cpu/io usage when the script hangs?

Can you try connecting another client to the database and running inserts/selects to see if they work?

What are the server resources of the container?

No, the database is not experiencing high CPU or IO usage when the script hangs. The containers run on a large server with 128 cores and 500 GB of memory, and currently only yb-tserver, yb-master, and the script container are running on it. Yes, I tried connecting another client; inserts/selects/deletes/updates work with the new client, but the script still hangs. Only a truncate triggers the script to continue.

Hmm, maybe it hung on the "while True" after all the data in batch was consumed in the code above?

What happens if you shut down the database while the script is hanging, do you get an exception? If yes, can you paste it?

Batch contains only 1000 records; after the data is flushed, the same batch is repeated again (hence the while True). I added it as a mechanism to reproduce this in a controlled environment. The original script continuously consumed new data, but here I managed to reproduce the behaviour with the same batch of data: it works N times, then hangs randomly after some time.
I tried shutting down yb-tserver and got:

Error NoHostAvailable
Traceback (most recent call last):
  File "test.py", line 42, in main
    execute_concurrent(session, statements, raise_on_first_error=True)
  File "/devel/cassandra/concurrent.py", line 98, in execute_concurrent
    return executor.execute(concurrency, raise_on_first_error)
  File "/devel/cassandra/concurrent.py", line 206, in execute
    return super(ConcurrentExecutorListResults, self).execute(concurrency, fail_fast)
  File "/devel/cassandra/concurrent.py", line 125, in execute
    return self._results()
  File "/devel/cassandra/concurrent.py", line 224, in _results
    self._raise(self._exception)
  File "/devel/cassandra/concurrent.py", line 172, in _raise
    raise exc
NoHostAvailable: ('Unable to complete the operation against any hosts', {<Host: 172.18.0.3:9042 datacenter1>: ConnectionShutdown('Connection to 172.18.0.3:9042 was closed',)})
Traceback (most recent call last):
  File "test.py", line 52, in <module>
    main()
  File "test.py", line 42, in main
    execute_concurrent(session, statements, raise_on_first_error=True)
  File "/devel/cassandra/concurrent.py", line 98, in execute_concurrent
    return executor.execute(concurrency, raise_on_first_error)
  File "/devel/cassandra/concurrent.py", line 206, in execute
    return super(ConcurrentExecutorListResults, self).execute(concurrency, fail_fast)
  File "/devel/cassandra/concurrent.py", line 125, in execute
    return self._results()
  File "/devel/cassandra/concurrent.py", line 224, in _results
    self._raise(self._exception)
  File "/devel/cassandra/concurrent.py", line 172, in _raise
    raise exc
cassandra.cluster.NoHostAvailable: ('Unable to complete the operation against any hosts', {<Host: 172.18.0.3:9042 datacenter1>: ConnectionShutdown('Connection to 172.18.0.3:9042 was closed',)})

What about running this code? Does it hang again?

def random_string(length):
    import random
    import string
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))


def check_code():
    """
    CREATE KEYSPACE ybdemo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

    CREATE TABLE ybdemo.yy (
        field1 blob,
        field2 text,
        field3 blob,
        value blob,
        PRIMARY KEY (field1, field2, field3)
    ) WITH CLUSTERING ORDER BY (field2 ASC, field3 ASC)
        AND default_time_to_live = 0
        AND transactions = {'enabled': 'false'};

    """
    from cassandra.cluster import Cluster
    from cassandra.concurrent import execute_concurrent

    session = Cluster().connect()
    prepared_statement = session.prepare(
        'INSERT INTO ybdemo.yy (field1, field2, field3, value) VALUES (?, ?, ?, ?)')

    statements = []
    total = 0
    while True:
        statements.append((prepared_statement, (random_string(8).encode(), 'DUMMY', b'\x00' * 16, random_string(5).encode())))
        if len(statements) >= 1000:
            execute_concurrent(session, statements, raise_on_first_error=True)
            total += len(statements)
            statements = []
            print(f"inserted {total}")

    exit()


check_code()

If yes, at which batch?

So, I tried this and got an OperationTimedOut error after 12 hours (count: 182461000), but no hang. Could it be that repeating keys are causing this in my use case?

Maybe? I don't believe so; the database should be able to compact them. I would still need a way to reproduce it, though. Should I make the random key range smaller? How small is the range in your test dataset?
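For what it's worth, a back-of-envelope birthday estimate (my own calculation, assuming keys drawn uniformly from 8 characters over the 36-symbol alphabet used by random_string(8) above) suggests the 12-hour random test already produced thousands of repeated keys, so repeats alone probably don't distinguish the two workloads:

```python
# Expected number of colliding key pairs after n uniform draws from a
# keyspace of size N is roughly n * (n - 1) / (2 * N) (birthday bound).
N = 36 ** 8          # 8 characters from A-Z plus 0-9, as in random_string(8)
n = 182_461_000      # insert count reached in the 12-hour random test
expected_repeats = n * (n - 1) / (2 * N)
print(round(expected_repeats))  # several thousand expected key repeats
```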

No, the keys are pretty random and all the same size, but it's possible to have some repeating keys. I'll try to run the same test with larger values, and then maybe with varying value sizes, to try to reproduce the stall with a randomly generated dataset.

I suspected that changing the value length could be the cause of the issue, so I ran another test and managed to reproduce it with

statements.append((prepared_statement, (random_string(8).encode('hex'), MSG_TYPE, EMPTY_RECORD_ID, random_string(random.choice([8,2000])).encode('hex'))))

It hung after ~24h (count: 134590000).

Can you run it in debug mode to pause the threads and see where it hangs after 24h?
Also, your code is different from mine; can you use the same code?