Class: Karafka::TimeTrackers::PartitionUsage
- Defined in:
- lib/karafka/time_trackers/partition_usage.rb
Overview
We do not track revocation as on revocation we clear given topic partition reference not to have a potential memory leak
We do not track shutdown jobs as shutdown is finishing the process, so no time sensitive operations remain that would use this
We consider partition as active if we scheduled any job related to it within the tick interval. This has nothing to do whether a partition is assigned.
Tracker used to keep time reference when we last time dispatched any job related to a given topic partition.
We can use it to know when last time a job was scheduled
Instance Method Summary collapse
-
#active?(topic, partition, interval) ⇒ Boolean
Was this topic partition active.
-
#initialize ⇒ PartitionUsage
constructor
Creates new partition usage time tracker.
-
#revoke(topic, partition) ⇒ Object
Clears references about given partition.
-
#track(topic, partition) ⇒ Object
Marks usage of given partition.
Constructor Details
#initialize ⇒ PartitionUsage
Creates new partition usage time tracker
20 21 22 23 24 25 26 27 28 |
# File 'lib/karafka/time_trackers/partition_usage.rb', line 20 def initialize super @last_usage = Hash.new do |topics_hash, topic_name| topics_hash[topic_name] = Hash.new do |partitions_hash, partition_id| partitions_hash[partition_id] = 0 end end end |
Instance Method Details
#active?(topic, partition, interval) ⇒ Boolean
Returns was this topic partition active.
34 35 36 |
# File 'lib/karafka/time_trackers/partition_usage.rb', line 34 def active?(topic, partition, interval) monotonic_now - @last_usage[topic][partition] < interval end |
#revoke(topic, partition) ⇒ Object
Clears references about given partition. Useful on revocation so we do not store old unassigned partitions data
51 52 53 |
# File 'lib/karafka/time_trackers/partition_usage.rb', line 51 def revoke(topic, partition) @last_usage[topic].delete(partition) end |
#track(topic, partition) ⇒ Object
Marks usage of given partition
42 43 44 |
# File 'lib/karafka/time_trackers/partition_usage.rb', line 42 def track(topic, partition) @last_usage[topic][partition] = monotonic_now end |