Class: Karafka::Pro::Processing::JobsBuilder

Inherits:
Karafka::Processing::JobsBuilder show all
Defined in:
lib/karafka/pro/processing/jobs_builder.rb

Overview

Pro jobs builder that supports lrj

Instance Method Summary collapse

Methods inherited from Karafka::Processing::JobsBuilder

#shutdown

Instance Method Details

#consume(executor, messages) ⇒ Karafka::Processing::Jobs::Consume, Karafka::Pro::Processing::Jobs::ConsumeNonBlocking

Parameters:

Returns:



28
29
30
31
32
33
34
# File 'lib/karafka/pro/processing/jobs_builder.rb', line 28

def consume(executor, messages)
  if executor.topic.long_running_job?
    Jobs::ConsumeNonBlocking.new(executor, messages)
  else
    super
  end
end

#eofed(executor) ⇒ Karafka::Processing::Jobs::Eofed, Karafka::Processing::Jobs::EofedBlocking

Parameters:

Returns:

  • (Karafka::Processing::Jobs::Eofed)

    eofed job for non LRJ

  • (Karafka::Processing::Jobs::EofedBlocking)

    eofed job that is non-blocking, so when revocation job is scheduled for LRJ it also will not block



40
41
42
43
44
45
46
# File 'lib/karafka/pro/processing/jobs_builder.rb', line 40

def eofed(executor)
  if executor.topic.long_running_job?
    Jobs::EofedNonBlocking.new(executor)
  else
    super
  end
end

#idle(executor) ⇒ Object

Parameters:



20
21
22
# File 'lib/karafka/pro/processing/jobs_builder.rb', line 20

def idle(executor)
  Karafka::Processing::Jobs::Idle.new(executor)
end

#periodic(executor) ⇒ Jobs::Periodic, Jobs::PeriodicNonBlocking

Parameters:

Returns:



63
64
65
66
67
68
69
# File 'lib/karafka/pro/processing/jobs_builder.rb', line 63

def periodic(executor)
  if executor.topic.long_running_job?
    Jobs::PeriodicNonBlocking.new(executor)
  else
    Jobs::Periodic.new(executor)
  end
end

#revoked(executor) ⇒ Karafka::Processing::Jobs::Revoked, Karafka::Processing::Jobs::RevokedNonBlocking

Parameters:

Returns:

  • (Karafka::Processing::Jobs::Revoked)

    revocation job for non LRJ

  • (Karafka::Processing::Jobs::RevokedNonBlocking)

    revocation job that is non-blocking, so when revocation job is scheduled for LRJ it also will not block



52
53
54
55
56
57
58
# File 'lib/karafka/pro/processing/jobs_builder.rb', line 52

def revoked(executor)
  if executor.topic.long_running_job?
    Jobs::RevokedNonBlocking.new(executor)
  else
    super
  end
end