Class: Karafka::Processing::Coordinator

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/processing/coordinator.rb

Overview

Note:

This coordinator needs to be thread safe. Some operations are performed only in the listener thread, but we default to thread-safe behavior for all of them so no one has to worry about potential future mistakes.

Basic coordinator that allows us to provide coordination objects into consumers.

This is a wrapping layer to simplify management of work to be handled around consumption.

Direct Known Subclasses

Karafka::Pro::Processing::Coordinator

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(topic, partition, pause_tracker) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • topic (Karafka::Routing::Topic)

    topic this coordinator operates on

  • partition (Integer)

    partition this coordinator operates on

  • pause_tracker (Karafka::TimeTrackers::Pause)

    pause tracker for the given topic partition

# File 'lib/karafka/processing/coordinator.rb', line 30

def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = Hash.new { |h, k| h[k] = 0 }
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
  @eofed = false
  @changed_at = monotonic_now
  @last_polled_at = @changed_at
end

Instance Attribute Details

#eofed ⇒ Object

This can be set directly on the listener because it can be triggered on the first run without any messages



# File 'lib/karafka/processing/coordinator.rb', line 20

def eofed
  @eofed
end

#last_polled_at ⇒ Object

Last polled at time, set based on the incoming last poll time



# File 'lib/karafka/processing/coordinator.rb', line 23

def last_polled_at
  @last_polled_at
end

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



# File 'lib/karafka/processing/coordinator.rb', line 16

def partition
  @partition
end

#pause_tracker ⇒ Object (readonly)

Returns the value of attribute pause_tracker.



# File 'lib/karafka/processing/coordinator.rb', line 16

def pause_tracker
  @pause_tracker
end

#seek_offset ⇒ Object

Returns the value of attribute seek_offset.



# File 'lib/karafka/processing/coordinator.rb', line 16

def seek_offset
  @seek_offset
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/karafka/processing/coordinator.rb', line 16

def topic
  @topic
end

Instance Method Details

#consumption(consumer) ⇒ Karafka::Processing::Result

Returns a result object which we can use to indicate the consumption processing state.

Parameters:

  • consumer (Object)

    karafka consumer (normal or pro)

Returns:

  • (Karafka::Processing::Result)

    result object which we can use to indicate the consumption processing state

# File 'lib/karafka/processing/coordinator.rb', line 195

def consumption(consumer)
  @consumptions[consumer] ||= Processing::Result.new
end
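The memoization above means each consumer instance gets exactly one result object, created lazily on first access. A standalone sketch of that behavior, with plain hashes standing in for `Processing::Result` instances (an illustration, not the actual result API):

```ruby
# Per-consumer memoization sketch: each consumer key gets one lazily
# created entry; repeated lookups return the same object.
consumptions = {}
consumption = ->(consumer) { consumptions[consumer] ||= { success: false } }

consumer_a = Object.new
consumer_b = Object.new

# First access creates the entry; state set on it is preserved
consumption.call(consumer_a)[:success] = true

same  = consumption.call(consumer_a) # same memoized hash, success still true
fresh = consumption.call(consumer_b) # a separate, freshly created entry
```

Because the consumer instance itself is the hash key, results from different consumers never collide, which is why `#start` has to clear the hash when consumer instances change.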

#decrement(job_type) ⇒ Object

Decrements the number of jobs we handle at the moment

Parameters:

  • job_type (Symbol)

    type of job that we want to decrement



# File 'lib/karafka/processing/coordinator.rb', line 94

def decrement(job_type)
  synchronize do
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now

    return @running_jobs[job_type] unless @running_jobs[job_type].negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
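The counter that `#increment` and `#decrement` operate on can be sketched in isolation: a Hash with a per-job-type default of 0, plus the same guard against decrementing below zero. This is a simplified, unsynchronized sketch, not the coordinator itself:

```ruby
# Job counter sketch: defaults each job type to 0 and refuses to go negative.
running_jobs = Hash.new { |h, k| h[k] = 0 }

increment = ->(job_type) { running_jobs[job_type] += 1 }
decrement = lambda do |job_type|
  running_jobs[job_type] -= 1

  return running_jobs[job_type] unless running_jobs[job_type].negative?

  # Mirrors the InvalidCoordinatorStateError guard from the method above
  raise 'Was zero before decrementation'
end

increment.call(:consume)
increment.call(:consume)
decrement.call(:consume) # => 1
decrement.call(:consume) # => 0
```

A further decrement at this point would raise, signaling that increments and decrements fell out of sync.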

#eofed? ⇒ Boolean

Returns did we reach end of partition when polling data.

Returns:

  • (Boolean)

    did we reach end of partition when polling data



# File 'lib/karafka/processing/coordinator.rb', line 159

def eofed?
  @eofed
end

#failure!(consumer, error) ⇒ Object

Mark given consumption on consumer as failed

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error that occurred



# File 'lib/karafka/processing/coordinator.rb', line 128

def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end

#failure? ⇒ Boolean

Returns true if any of work we were running failed.

Returns:

  • (Boolean)

    true if any of work we were running failed



# File 'lib/karafka/processing/coordinator.rb', line 136

def failure?
  @failure
end

#increment(job_type) ⇒ Object

Increments the number of jobs that we handle with this coordinator

Parameters:

  • job_type (Symbol)

    type of job that we want to increment



# File 'lib/karafka/processing/coordinator.rb', line 85

def increment(job_type)
  synchronize do
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end
end

#manual_pause ⇒ Object

Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself



# File 'lib/karafka/processing/coordinator.rb', line 173

def manual_pause
  @manual_pause = true
end

#manual_pause? ⇒ Boolean

Returns are we in a pause that was initiated by the user.

Returns:

  • (Boolean)

    are we in a pause that was initiated by the user



# File 'lib/karafka/processing/coordinator.rb', line 178

def manual_pause?
  paused? && @manual_pause
end

#manual_seek ⇒ Object

Marks seek as manual for coordination purposes



# File 'lib/karafka/processing/coordinator.rb', line 183

def manual_seek
  @manual_seek = true
end

#manual_seek? ⇒ Boolean

Returns did a user invoke seek in the current operations scope.

Returns:

  • (Boolean)

    did a user invoke seek in the current operations scope



# File 'lib/karafka/processing/coordinator.rb', line 188

def manual_seek?
  @manual_seek
end

#marked? ⇒ Boolean

Returns was the new seek offset assigned at least once. This is needed because by default we assign the seek offset of the first message; that is insufficient for the DLQ in a scenario where the first message is broken, as we would never move past it and would end up in an endless loop.

Returns:

  • (Boolean)

was the new seek offset assigned at least once. This is needed because by default we assign the seek offset of the first message; that is insufficient for the DLQ in a scenario where the first message is broken, as we would never move past it and would end up in an endless loop.



# File 'lib/karafka/processing/coordinator.rb', line 167

def marked?
  @marked
end

#revoke ⇒ Object

Marks given coordinator for processing group as revoked

This is invoked in two places:

  • from the main listener loop when we detect revoked partitions

  • from the consumer in case checkpointing fails

This means we can end up with a consumer that is aware it was revoked prior to the listener loop dispatching the revocation job. That is ok, as effectively nothing will be processed until the revocation jobs are done.



# File 'lib/karafka/processing/coordinator.rb', line 149

def revoke
  synchronize { @revoked = true }
end

#revoked? ⇒ Boolean

Returns is the partition we are processing revoked or not.

Returns:

  • (Boolean)

    is the partition we are processing revoked or not



# File 'lib/karafka/processing/coordinator.rb', line 154

def revoked?
  @revoked
end

#start(messages) ⇒ Object

Starts the coordinator for given consumption jobs

Parameters:

  • messages (Array<Karafka::Messages::Message>)

batch of messages for which we are going to coordinate work. Not used with the regular coordinator.



# File 'lib/karafka/processing/coordinator.rb', line 50

def start(messages)
  @failure = false
  @running_jobs[:consume] = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumers logic (our or the end user)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
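The `@seek_offset ||= messages.first.offset` line assigns only once: later batches do not overwrite the reference offset established by the first one. A minimal sketch of that behavior, using plain hashes as stand-ins for message objects:

```ruby
# `||=` memoization sketch: only the first batch sets the seek offset.
seek_offset = nil

first_batch  = [{ offset: 100 }, { offset: 101 }]
second_batch = [{ offset: 102 }, { offset: 103 }]

seek_offset ||= first_batch.first[:offset]
seek_offset ||= second_batch.first[:offset] # no-op, already set

seek_offset # => 100
```

After the first assignment, moving the offset forward is up to the consumer logic (marking messages as consumed), which is why the comment above points to `#marked?` for checking whether that has happened.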

#success!(consumer) ⇒ Object

Mark given consumption on consumer as successful

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that finished successfully

# File 'lib/karafka/processing/coordinator.rb', line 119

def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end

#success? ⇒ Boolean

Note:

This is only used for consume synchronization

Checks whether all the consumption is done and finished successfully for this coordinator. We do not report success until all work is done, because running work may still crash.

Returns:

  • (Boolean)


# File 'lib/karafka/processing/coordinator.rb', line 111

def success?
  synchronize do
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end
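The check combines two conditions: no `:consume` jobs may still be running, and every recorded consumption must have succeeded. A standalone sketch with plain hashes standing in for the job counter and result objects (an illustration, not the actual `Processing::Result` API):

```ruby
# Success check sketch: all consume jobs finished AND every consumption
# reported success.
success = lambda do |running_jobs, consumptions|
  running_jobs[:consume].zero? && consumptions.values.all? { |c| c[:success] }
end

consumptions = { consumer_a: { success: true }, consumer_b: { success: true } }

all_done   = success.call({ consume: 0 }, consumptions) # => true
still_busy = success.call({ consume: 1 }, consumptions) # => false
```

Even when every finished consumption succeeded, a non-zero job count keeps the coordinator in a non-successful state, since in-flight work may still crash.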

#synchronize(&block) ⇒ Object

Note:

We check if the mutex is not already owned by the current thread, so we won't end up with a deadlock in case a user runs coordinated code from inside their own lock

Note:

This is internal and should not be used to synchronize user-facing code. Otherwise a user could indirectly cause deadlocks or prolonged locks by running their own logic. It can and should, however, be used for multi-threaded strategy applications and other internal operation locks.

Allows running synchronized (locked) code that can operate only from a given thread

Parameters:

  • block (Proc)

    code we want to run in the synchronized mode



# File 'lib/karafka/processing/coordinator.rb', line 210

def synchronize(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end
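The `owned?` check is what makes nested calls from the same thread safe: a plain `Mutex#synchronize` would deadlock when re-entered. A runnable sketch of the same pattern:

```ruby
# Reentrancy sketch: yield directly when the current thread already holds
# the lock, otherwise take the lock first.
mutex = Mutex.new

synchronize = lambda do |&block|
  if mutex.owned?
    block.call
  else
    mutex.synchronize(&block)
  end
end

result = synchronize.call do
  # Nested call from the same thread: `owned?` is true, so we just yield
  # instead of attempting (and deadlocking on) a second lock acquisition.
  synchronize.call { :done }
end

result # => :done
```

Note that `Mutex#owned?` is true only for the thread currently holding the lock, so other threads still block on `mutex.synchronize` as usual.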