pyflink.datastream.state.StateTtlConfig.Builder.cleanup_incrementally

Builder.cleanup_incrementally(cleanup_size: int, run_cleanup_for_every_record) → Builder

Incrementally clean up expired local state.

On every state access, this cleanup strategy checks a batch of state keys for expiration and removes the expired ones. It keeps a lazy iterator over all keys, with relaxed consistency if the backend supports it. As long as some state is being accessed regularly, every key is eventually checked and cleaned up over time.

In addition to running on state access, the incremental cleanup can also run for every processed record. Caution: if many registered states use this option, all of them are iterated for every record to check whether there is anything to clean up.

If the state is never accessed, or, when run_cleanup_for_every_record is enabled, no records are processed, expired state will persist.

Time spent for the incremental cleanup increases record processing latency.
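The behaviour described above can be illustrated with a small, self-contained toy model. This is only a sketch of the idea, not Flink's implementation: the class IncrementalTtlStore, its methods, and the explicit `now` argument are all hypothetical names introduced for illustration. Each access checks at most cleanup_size keys through a lazy iterator, so expired entries disappear gradually and only while the store is being used.

```python
class IncrementalTtlStore:
    """Toy model of incremental TTL cleanup (hypothetical, not Flink code)."""

    _DONE = object()  # sentinel marking an exhausted key iterator

    def __init__(self, ttl, cleanup_size, run_cleanup_for_every_record=False):
        self.ttl = ttl
        self.cleanup_size = cleanup_size
        self.run_cleanup_for_every_record = run_cleanup_for_every_record
        self._entries = {}       # key -> (value, last_update_time)
        self._cursor = iter(())  # lazy iterator over a copy of the keys

    def _cleanup_step(self, now):
        """Check at most cleanup_size keys and drop the expired ones."""
        for _ in range(self.cleanup_size):
            key = next(self._cursor, self._DONE)
            if key is self._DONE:
                # Pass finished: start a new one over a copy of the keys.
                self._cursor = iter(list(self._entries))
                key = next(self._cursor, self._DONE)
                if key is self._DONE:
                    return  # nothing stored at all
            entry = self._entries.get(key)
            if entry is not None and now - entry[1] >= self.ttl:
                del self._entries[key]

    def put(self, key, value, now):
        self._entries[key] = (value, now)
        self._cleanup_step(now)  # cleanup piggybacks on state access

    def get(self, key, now):
        self._cleanup_step(now)
        entry = self._entries.get(key)
        if entry is None or now - entry[1] >= self.ttl:
            return None  # missing or expired
        return entry[0]

    def on_record(self, now):
        """Called once per processed record, whether or not state is touched."""
        if self.run_cleanup_for_every_record:
            self._cleanup_step(now)

    def size(self):
        return len(self._entries)


store = IncrementalTtlStore(ttl=10, cleanup_size=2)
for k in "abcde":
    store.put(k, k.upper(), now=0)

# All five entries are expired at now=100, but they stay in the store
# until some access triggers cleanup steps of at most two keys each.
print(store.size())
store.get("a", now=100)
print(store.size())
```

The sketch also makes the latency trade-off visible: every call to put or get pays for up to cleanup_size extra checks, which is why a larger cleanup_size cleans faster but slows each access.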

Note:

At the moment, incremental cleanup is implemented only for the heap state backend. Setting it for RocksDB has no effect.

Note:

If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys while iterating, because its implementation does not support concurrent modifications. Enabling this feature therefore increases memory consumption in that setup. Asynchronous snapshotting does not have this problem.

Parameters
  • cleanup_size – maximum number of keys pulled from the queue for cleanup each time state is touched for any key

  • run_cleanup_for_every_record – whether to also run incremental cleanup for each processed record
