Class: Karafka::TimeTrackers::PartitionUsage

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/time_trackers/partition_usage.rb

Overview

Note:

We do not track revocation as on revocation we clear given topic partition reference not to have a potential memory leak

Note:

We do not track shutdown jobs as shutdown is finishing the process, so no time sensitive operations remain that would use this

Note:

We consider partition as active if we scheduled any job related to it within the tick interval. This has nothing to do whether a partition is assigned.

Tracker used to keep time reference when we last time dispatched any job related to a given topic partition.

We can use it to know when last time a job was scheduled

Instance Method Summary collapse

Constructor Details

#initializePartitionUsage

Creates new partition usage time tracker



20
21
22
23
24
25
26
27
28
# File 'lib/karafka/time_trackers/partition_usage.rb', line 20

def initialize
  super

  @last_usage = Hash.new do |topics_hash, topic_name|
    topics_hash[topic_name] = Hash.new do |partitions_hash, partition_id|
      partitions_hash[partition_id] = 0
    end
  end
end

Instance Method Details

#active?(topic, partition, interval) ⇒ Boolean

Returns was this topic partition active.

Parameters:

  • topic (String)
  • partition (Integer)
  • interval (Integer)

    minimum interval

Returns:

  • (Boolean)

    was this topic partition active



34
35
36
# File 'lib/karafka/time_trackers/partition_usage.rb', line 34

def active?(topic, partition, interval)
  monotonic_now - @last_usage[topic][partition] < interval
end

#revoke(topic, partition) ⇒ Object

Clears references about given partition. Useful on revocation so we do not store old unassigned partitions data

Parameters:

  • topic (String)
  • partition (Integer)


51
52
53
# File 'lib/karafka/time_trackers/partition_usage.rb', line 51

def revoke(topic, partition)
  @last_usage[topic].delete(partition)
end

#track(topic, partition) ⇒ Object

Marks usage of given partition

Parameters:

  • topic (String)
  • partition (Integer)


42
43
44
# File 'lib/karafka/time_trackers/partition_usage.rb', line 42

def track(topic, partition)
  @last_usage[topic][partition] = monotonic_now
end