Class: Karafka::Web::Pro::Commanding::Listener

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/web/pro/commanding/listener.rb

Overview

Wrapper around the Pro Iterator that yields messages with commands when needed for further processing.

This iterator supports error handling, basically on errors it will be reported and ignored as long as they are not critical. Critical errors will cause back-off and reconnection.

Instance Method Summary collapse

Instance Method Details

#each {|Karafka::Messages::Message| ... } ⇒ Object

Runs iterator and keeps it running until not needed.

Yields:

  • (Karafka::Messages::Message)

    command message



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/karafka/web/pro/commanding/listener.rb', line 32

def each
  c_config = ::Karafka::Web.config.commanding
  t_config = Karafka::Web.config.topics

  iterator = Karafka::Pro::Iterator.new(
    { t_config.consumers.commands => true },
    settings: c_config.kafka,
    yield_nil: true,
    max_wait_time: c_config.max_wait_time
  )

  iterator.each do |message|
    iterator.stop if @stop
    next if @stop
    next unless message

    yield(message)
  rescue StandardError => e
    report_error(e)

    sleep(c_config.pause_timeout / 1_000)

    next
  end
rescue StandardError => e
  report_error(e)

  return if done?

  sleep(c_config.pause_timeout / 1_000)

  retry
end

#stopObject

Triggers stop of the listener. Does not stop the listener but requests it to stop.



67
68
69
# File 'lib/karafka/web/pro/commanding/listener.rb', line 67

def stop
  @stop = true
end