Class: Karafka::Processing::Coordinator

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/processing/coordinator.rb

Overview

Note:

This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but all operations are thread-safe by default so that we do not have to worry about potential future mistakes.

Basic coordinator that lets us provide coordination objects to consumers.

This is a wrapping layer to simplify management of work to be handled around consumption.

Direct Known Subclasses

Karafka::Pro::Processing::Coordinator

Constructor Details

#initialize(topic, partition, pause_tracker) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • topic

  • partition

  • pause_tracker



# File 'lib/karafka/processing/coordinator.rb', line 27

def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = Hash.new { |h, k| h[k] = 0 }
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
  @eofed = false
  @changed_at = monotonic_now
end

Instance Attribute Details

#eofed ⇒ Object

This can be set directly from the listener, because it can be triggered on the first run without any messages.



# File 'lib/karafka/processing/coordinator.rb', line 20

def eofed
  @eofed
end

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



# File 'lib/karafka/processing/coordinator.rb', line 16

def partition
  @partition
end

#pause_tracker ⇒ Object (readonly)

Returns the value of attribute pause_tracker.



# File 'lib/karafka/processing/coordinator.rb', line 16

def pause_tracker
  @pause_tracker
end

#seek_offset ⇒ Object

Returns the value of attribute seek_offset.



# File 'lib/karafka/processing/coordinator.rb', line 16

def seek_offset
  @seek_offset
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/karafka/processing/coordinator.rb', line 16

def topic
  @topic
end

Instance Method Details

#consumption(consumer) ⇒ Karafka::Processing::Result

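Returns a result object that we can use to indicate the consumption processing state.

Parameters:

  • consumer (Object)

    Karafka consumer (normal or pro)

Returns:

  • (Karafka::Processing::Result)

    result object that we can use to indicate the consumption processing state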



# File 'lib/karafka/processing/coordinator.rb', line 191

def consumption(consumer)
  @consumptions[consumer] ||= Processing::Result.new
end
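
The memoization above can be tried in isolation. This is a minimal sketch, not Karafka code: `ResultRegistry` and `ResultStub` are hypothetical stand-ins, with `ResultStub` taking the place of `Processing::Result`. It illustrates that `||=` creates one result per consumer instance and hands back that same object on later calls.

```ruby
# Hypothetical stand-ins, not part of Karafka
ResultStub = Class.new

class ResultRegistry
  def initialize
    @consumptions = {}
  end

  # Same memoization as Coordinator#consumption above
  def consumption(consumer)
    @consumptions[consumer] ||= ResultStub.new
  end
end

registry = ResultRegistry.new
consumer_a = Object.new
consumer_b = Object.new

first = registry.consumption(consumer_a)
# Same consumer: the stored result object is returned again
puts first.equal?(registry.consumption(consumer_a))
# Different consumer: a separate result object is created
puts first.equal?(registry.consumption(consumer_b))
```

Because the hash is keyed by the consumer object itself, results are tracked per consumer instance, which is why `#start` clears the hash before a new run.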

#decrement(job_type) ⇒ Object

Decrements the number of jobs we handle at the moment.

Parameters:

  • job_type (Symbol)

    type of job that we want to decrement



# File 'lib/karafka/processing/coordinator.rb', line 90

def decrement(job_type)
  synchronize do
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now

    return @running_jobs[job_type] unless @running_jobs[job_type].negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end

#eofed? ⇒ Boolean

Returns whether we reached the end of the partition when polling data.

Returns:

  • (Boolean)

    whether we reached the end of the partition when polling data



# File 'lib/karafka/processing/coordinator.rb', line 155

def eofed?
  @eofed
end

#failure!(consumer, error) ⇒ Object

Marks the given consumer's consumption as failed.

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error that occurred



# File 'lib/karafka/processing/coordinator.rb', line 124

def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end

#failure? ⇒ Boolean

Returns true if any of the work we were running failed.

Returns:

  • (Boolean)

    true if any of the work we were running failed



# File 'lib/karafka/processing/coordinator.rb', line 132

def failure?
  @failure
end

#increment(job_type) ⇒ Object

Increases the number of jobs that we handle with this coordinator.

Parameters:

  • job_type (Symbol)

    type of job that we want to increment



# File 'lib/karafka/processing/coordinator.rb', line 81

def increment(job_type)
  synchronize do
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end
end
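
The counter handling in `#increment` and `#decrement` can be sketched outside of Karafka. `JobCounter` and its `InvalidStateError` are hypothetical stand-ins (the latter for `Karafka::Errors::InvalidCoordinatorStateError`), and a plain `Mutex#synchronize` is assumed in place of the coordinator's own `#synchronize`. The sketch shows that unknown job types start at zero and that decrementing below zero raises.

```ruby
# Hypothetical stand-in, not Karafka code: mirrors the counter logic of
# #increment and #decrement above.
class JobCounter
  # Stand-in for Karafka::Errors::InvalidCoordinatorStateError
  InvalidStateError = Class.new(StandardError)

  def initialize
    # Unknown job types start at zero, as in the coordinator constructor
    @running_jobs = Hash.new { |h, k| h[k] = 0 }
    @mutex = Mutex.new
  end

  def increment(job_type)
    @mutex.synchronize { @running_jobs[job_type] += 1 }
  end

  def decrement(job_type)
    @mutex.synchronize do
      @running_jobs[job_type] -= 1
      return @running_jobs[job_type] unless @running_jobs[job_type].negative?

      raise InvalidStateError, 'Was zero before decrementation'
    end
  end
end

counter = JobCounter.new
counter.increment(:consume)
counter.increment(:consume)
counter.decrement(:consume)
counter.decrement(:consume)

begin
  # No job is left to finish, so this call raises
  counter.decrement(:consume)
rescue JobCounter::InvalidStateError => e
  puts e.message
end
```

Returning from inside the `synchronize` block is safe here, because `Mutex#synchronize` releases the lock when the block exits, whether by return or by exception.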

#manual_pause ⇒ Object

Stores in the coordinator the information that this pause was made manually by the end user and not by the system itself.



# File 'lib/karafka/processing/coordinator.rb', line 169

def manual_pause
  @manual_pause = true
end

#manual_pause? ⇒ Boolean

Returns whether we are in a pause that was initiated by the user.

Returns:

  • (Boolean)

    whether we are in a pause that was initiated by the user



# File 'lib/karafka/processing/coordinator.rb', line 174

def manual_pause?
  paused? && @manual_pause
end
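
A small sketch of how the flag and the pause state combine. This is not Karafka code: `PauseFlags` and `PauseTrackerStub` are hypothetical, and the sketch assumes that `paused?` is answered by the pause tracker, which this page does not show. It illustrates that `#manual_pause?` is true only while a pause is active and the manual flag has been set.

```ruby
# Hypothetical stand-in for the pause tracker, exposing only #paused?
PauseTrackerStub = Struct.new(:paused) do
  def paused?
    paused
  end
end

# Hypothetical stand-in, not Karafka code
class PauseFlags
  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  def manual_pause
    @manual_pause = true
  end

  # Assumption: paused? comes from the pause tracker
  def manual_pause?
    @pause_tracker.paused? && @manual_pause
  end
end

tracker = PauseTrackerStub.new(true)
flags = PauseFlags.new(tracker)

puts flags.manual_pause? # paused, but not marked as manual
flags.manual_pause
puts flags.manual_pause? # paused and marked as manual
tracker.paused = false
puts flags.manual_pause? # flag still set, but the pause is over
```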

#manual_seek ⇒ Object

Marks seek as manual for coordination purposes



# File 'lib/karafka/processing/coordinator.rb', line 179

def manual_seek
  @manual_seek = true
end

#manual_seek? ⇒ Boolean

Returns whether a user invoked seek in the current operations scope.

Returns:

  • (Boolean)

    whether a user invoked seek in the current operations scope



# File 'lib/karafka/processing/coordinator.rb', line 184

def manual_seek?
  @manual_seek
end

#marked? ⇒ Boolean

Returns whether a new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the very first message. That is insufficient for the DLQ in a scenario where the first message is broken: we would never move past it and would end up in an endless loop.

Returns:

  • (Boolean)

    whether a new seek offset was assigned at least once



# File 'lib/karafka/processing/coordinator.rb', line 163

def marked?
  @marked
end

#revoke ⇒ Object

Marks the given coordinator for the processing group as revoked.

This is invoked in two places:

  • from the main listener loop when we detect revoked partitions

  • from the consumer in case checkpointing fails

This means we can end up with a consumer being aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.



# File 'lib/karafka/processing/coordinator.rb', line 145

def revoke
  synchronize { @revoked = true }
end

#revoked? ⇒ Boolean

Returns whether the partition we are processing is revoked.

Returns:

  • (Boolean)

    whether the partition we are processing is revoked



# File 'lib/karafka/processing/coordinator.rb', line 150

def revoked?
  @revoked
end

#start(messages) ⇒ Object

Starts the coordinator for the given consumption jobs.

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    batch of messages for which we are going to coordinate work. Not used with the regular coordinator.



# File 'lib/karafka/processing/coordinator.rb', line 46

def start(messages)
  @failure = false
  @running_jobs[:consume] = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumer's logic (ours or the end user's)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
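
The "set on the first encounter and never again" behavior of the seek offset comes from `||=`. The sketch below is not Karafka code: `SeekOffsetHolder` and `MessageStub` are hypothetical stand-ins that keep only the last line of `#start`. It shows that later batches do not overwrite the offset, while an explicit assignment does.

```ruby
# Hypothetical stand-in for a message, exposing only #offset
MessageStub = Struct.new(:offset)

# Hypothetical stand-in, not Karafka code: keeps only the seek offset handling
class SeekOffsetHolder
  attr_accessor :seek_offset

  def start(messages)
    # Assigned only while @seek_offset is nil
    @seek_offset ||= messages.first.offset
  end
end

holder = SeekOffsetHolder.new

holder.start([MessageStub.new(10), MessageStub.new(11)])
puts holder.seek_offset # offset of the first message of the first batch

holder.start([MessageStub.new(12)])
puts holder.seek_offset # unchanged by the second batch

holder.seek_offset = 12
holder.start([MessageStub.new(13)])
puts holder.seek_offset # the explicitly assigned value is kept
```

Note that an offset of `0` is also kept by `||=`, since `0` is truthy in Ruby and only `nil` or `false` trigger the assignment.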

#success!(consumer) ⇒ Object

Marks the given consumer's consumption as successful.

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that finished successfully



# File 'lib/karafka/processing/coordinator.rb', line 115

def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end

#success? ⇒ Boolean

Note:

This is only used for consume synchronization.

Checks whether all consumption is done and finished successfully for this coordinator. We do not report success until all work is done, because running work may still crash.

Returns:

  • (Boolean)


# File 'lib/karafka/processing/coordinator.rb', line 107

def success?
  synchronize do
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end
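
The condition in `#success?` can be exercised on plain data. This is a hedged sketch rather than Karafka code: `StubResult` is a hypothetical stand-in for `Processing::Result` that exposes only `#success?`, and `all_work_successful?` is a hypothetical helper that repeats the same expression without the locking.

```ruby
# Hypothetical stand-in for Processing::Result, exposing only #success?
StubResult = Struct.new(:ok) do
  def success?
    ok
  end
end

# Same condition as Coordinator#success? above, written as a plain function
def all_work_successful?(running_jobs, consumptions)
  running_jobs[:consume].zero? && consumptions.values.all?(&:success?)
end

ok = StubResult.new(true)
failed = StubResult.new(false)

# A consume job is still running: not successful yet, even with good results
puts all_work_successful?({ consume: 1 }, { a: ok })
# Nothing is running, but one result failed
puts all_work_successful?({ consume: 0 }, { a: ok, b: failed })
# Nothing is running and every result succeeded
puts all_work_successful?({ consume: 0 }, { a: ok, b: ok })
```

With no running jobs and no recorded results the expression is also true, because `all?` returns true for an empty collection.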

#synchronize(&block) ⇒ Object

Note:

We check whether the mutex is already owned by the current thread, so that we do not end up with a deadlock if the user runs coordinated code from inside their own lock.

Note:

This is internal and should not be used to synchronize user-facing code. Otherwise the user could indirectly cause deadlocks or prolonged locks by running their own logic. It can and should, however, be used for multi-threaded strategy applications and other internal operation locks.

Runs synchronized (locked) code so that only one thread at a time can execute it.

Parameters:

  • block (Proc)

    code we want to run in the synchronized mode



# File 'lib/karafka/processing/coordinator.rb', line 206

def synchronize(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end
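
The ownership check can be tried in isolation. The sketch below is not Karafka code: `ReentrantGuard` is a hypothetical stand-in that copies only the `#synchronize` body above. It shows that a nested call from the thread already holding the lock simply runs, whereas the same nesting on a plain `Mutex` raises `ThreadError`.

```ruby
# Hypothetical stand-in, not part of Karafka: reproduces only the #synchronize logic
class ReentrantGuard
  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    if @mutex.owned?
      # The current thread already holds the lock, so just run the block
      yield
    else
      @mutex.synchronize(&block)
    end
  end
end

guard = ReentrantGuard.new

# Nested call from the same thread runs without trying to lock again
nested_result = guard.synchronize { guard.synchronize { :ok } }
puts nested_result

# A plain Mutex raises ThreadError on the same nesting
plain = Mutex.new
begin
  plain.synchronize { plain.synchronize { :never_reached } }
rescue ThreadError => e
  puts e.class
end
```

The check only covers re-entry from the owning thread. Other threads still block on `@mutex.synchronize` until the lock is released.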