Class: Karafka::Pro::Processing::Coordinators::ErrorsTracker

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/karafka/pro/processing/coordinators/errors_tracker.rb

Overview

Object used to track errors in between executions to be able to build error-type based recovery flows.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, partition, limit: STORAGE_LIMIT) ⇒ ErrorsTracker

Note:

limit does not apply to the counts. They will work beyond the number of errors occurring

Returns a new instance of ErrorsTracker.

Parameters:

  • topic (Karafka::Routing::Topic)
  • partition (Integer)
  • limit (Integer) (defaults to: STORAGE_LIMIT)

    max number of errors we want to keep for reference when implementing custom error handling.


41
42
43
44
45
46
47
48
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 41

def initialize(topic, partition, limit: STORAGE_LIMIT)
  @errors = []
  @counts = Hash.new { |hash, key| hash[key] = 0 }
  @topic = topic
  @partition = partition
  @limit = limit
  @trace_id = SecureRandom.uuid
end

Instance Attribute Details

#countsHash (readonly)

Returns:

  • (Hash)

23
24
25
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 23

def counts
  @counts
end

#partitionInteger (readonly)

Returns partition of this error tracker.

Returns:

  • (Integer)

    partition of this error tracker


20
21
22
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 20

def partition
  @partition
end

#topicKarafka::Routing::Topic (readonly)

Returns topic of this error tracker.

Returns:


17
18
19
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 17

def topic
  @topic
end

#trace_idString (readonly)

Returns:

  • (String)

26
27
28
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 26

def trace_id
  @trace_id
end

Instance Method Details

#<<(error) ⇒ Object

Parameters:

  • error (StandardError)

    adds the error to the tracker


57
58
59
60
61
62
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 57

def <<(error)
  @errors.shift if @errors.size >= @limit
  @errors << error
  @counts[error.class] += 1
  @trace_id = SecureRandom.uuid
end

#allArray<StandardError>

Returns array with all the errors that occurred.

Returns:

  • (Array<StandardError>)

    array with all the errors that occurred


88
89
90
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 88

def all
  @errors
end

#clearObject

Clears all the errors


51
52
53
54
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 51

def clear
  @errors.clear
  @counts.clear
end

#each(&block) ⇒ Object

Iterates over errors

Parameters:

  • block (Proc)

    code we want to run on each error


83
84
85
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 83

def each(&block)
  @errors.each(&block)
end

#empty?Boolean

Returns is the error tracker empty.

Returns:

  • (Boolean)

    is the error tracker empty


65
66
67
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 65

def empty?
  @errors.empty?
end

#lastStandardError?

Returns last error that occurred or nil if no errors.

Returns:

  • (StandardError, nil)

    last error that occurred or nil if no errors


77
78
79
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 77

def last
  @errors.last
end

#sizeInteger

Returns number of elements.

Returns:

  • (Integer)

    number of elements


70
71
72
73
74
# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 70

def size
  # We use counts reference of all errors and not the `@errors` array because it allows
  # us to go beyond the whole errors storage limit
  @counts.values.sum
end