Class: Karafka::Pro::RecurringTasks::Serializer
- Inherits: Object
  - Object
    - Karafka::Pro::RecurringTasks::Serializer
- Defined in: lib/karafka/pro/recurring_tasks/serializer.rb
Overview
Converts schedule, command, and log details into data that can be dispatched to Kafka.
Constant Summary
- SCHEMA_VERSION = '1.0'
  Version of the current recurring tasks schema structure.
Instance Method Summary
- #command(command_name, task_id) ⇒ String
  Serialized and compressed command data.
- #log(event) ⇒ String
  Serialized and compressed event log data.
- #schedule(schedule) ⇒ String
  Serialized and compressed current schedule data with its tasks and their current state.
Instance Method Details
#command(command_name, task_id) ⇒ String
Returns serialized and compressed command data.
    # File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 46
    def command(command_name, task_id)
      data = {
        schema_version: SCHEMA_VERSION,
        schedule_version: ::Karafka::Pro::RecurringTasks.schedule.version,
        dispatched_at: Time.now.to_f,
        type: 'command',
        command: {
          name: command_name
        },
        task: {
          id: task_id
        }
      }

      compress(
        serialize(data)
      )
    end
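The `compress` and `serialize` helpers called above are not shown on this page. The following is a minimal sketch of the same payload shape and a round trip, assuming JSON serialization and Zlib compression purely for illustration. The `sketch_*` names, the `SKETCH_SCHEDULE_VERSION` constant, and the `'trigger'` and `'cleanup'` values are hypothetical and not part of Karafka.

```ruby
require 'json'
require 'zlib'

# Hypothetical stand-in for ::Karafka::Pro::RecurringTasks.schedule.version
SKETCH_SCHEDULE_VERSION = '1.0.0'

# Builds a hash with the same keys as the data built in #command
def sketch_command_payload(command_name, task_id)
  {
    schema_version: '1.0',
    schedule_version: SKETCH_SCHEDULE_VERSION,
    dispatched_at: Time.now.to_f,
    type: 'command',
    command: { name: command_name },
    task: { id: task_id }
  }
end

# Assumption: JSON + Zlib stand in for the private serialize/compress helpers
def sketch_pack(data)
  Zlib::Deflate.deflate(data.to_json)
end

# Reverses sketch_pack so the payload can be inspected
def sketch_unpack(raw)
  JSON.parse(Zlib::Inflate.inflate(raw), symbolize_names: true)
end

packed = sketch_pack(sketch_command_payload('trigger', 'cleanup'))
decoded = sketch_unpack(packed)
```

After the round trip, `decoded` holds the same keys as the original hash, with `dispatched_at` as a float of epoch seconds.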
#log(event) ⇒ String
Returns serialized and compressed event log data.
    # File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 67
    def log(event)
      task = event[:task]

      data = {
        schema_version: SCHEMA_VERSION,
        schedule_version: ::Karafka::Pro::RecurringTasks.schedule.version,
        dispatched_at: Time.now.to_f,
        type: 'log',
        task: {
          id: task.id,
          time_taken: event.payload[:time] || -1,
          result: event.payload.key?(:error) ? 'failure' : 'success'
        }
      }

      compress(
        serialize(data)
      )
    end
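To illustrate how the `time_taken` and `result` fields above are derived from the event payload, here is a small sketch using hypothetical stand-in objects. `SketchTask`, `SketchEvent`, and `sketch_log_task_details` are invented for this example and only mimic the two calls the method relies on (`event[:task]` and `event.payload`).

```ruby
# Hypothetical task stand-in exposing only #id
SketchTask = Struct.new(:id)

# Hypothetical event stand-in exposing #payload and #[] like the event used in #log
class SketchEvent
  attr_reader :payload

  def initialize(payload)
    @payload = payload
  end

  def [](key)
    payload.fetch(key)
  end
end

# Mirrors the task: { ... } part of the data built in #log
def sketch_log_task_details(event)
  task = event[:task]

  {
    id: task.id,
    # Falls back to -1 when no execution time was recorded
    time_taken: event.payload[:time] || -1,
    # Presence of the :error key alone marks the run as failed
    result: event.payload.key?(:error) ? 'failure' : 'success'
  }
end

succeeded = sketch_log_task_details(
  SketchEvent.new({ task: SketchTask.new('cleanup'), time: 12.5 })
)

failed = sketch_log_task_details(
  SketchEvent.new({ task: SketchTask.new('cleanup'), error: StandardError.new('boom') })
)
```

Because the check uses `key?(:error)`, a payload that contains the `:error` key is reported as a failure regardless of the value stored under it.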
#schedule(schedule) ⇒ String
Returns serialized and compressed current schedule data with its tasks and their current state.
    # File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 17
    def schedule(schedule)
      tasks = {}

      schedule.each do |task|
        tasks[task.id] = {
          id: task.id,
          cron: task.cron.original,
          previous_time: task.previous_time.to_i,
          next_time: task.next_time.to_i,
          enabled: task.enabled?
        }
      end

      data = {
        schema_version: SCHEMA_VERSION,
        schedule_version: schedule.version,
        dispatched_at: Time.now.to_f,
        type: 'schedule',
        tasks: tasks
      }

      compress(
        serialize(data)
      )
    end
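The per-task map built above can be sketched in isolation as follows. `SketchCron`, `SketchScheduleTask`, and `sketch_tasks_map` are hypothetical stand-ins that expose only the methods the serializer calls (`id`, `cron.original`, `previous_time`, `next_time`, `enabled?`). The task ids and cron expressions are made-up sample values.

```ruby
# Hypothetical cron stand-in exposing only #original
SketchCron = Struct.new(:original)

# Hypothetical task stand-in with the readers used by #schedule
SketchScheduleTask = Struct.new(:id, :cron, :previous_time, :next_time, :enabled) do
  def enabled?
    enabled
  end
end

# Mirrors the tasks hash built in #schedule, keyed by task id
def sketch_tasks_map(tasks)
  tasks.each_with_object({}) do |task, acc|
    acc[task.id] = {
      id: task.id,
      cron: task.cron.original,
      # Time#to_i yields whole epoch seconds
      previous_time: task.previous_time.to_i,
      next_time: task.next_time.to_i,
      enabled: task.enabled?
    }
  end
end

sample_tasks = [
  SketchScheduleTask.new(
    'cleanup', SketchCron.new('0 * * * *'),
    Time.at(1_700_000_000), Time.at(1_700_003_600), true
  ),
  SketchScheduleTask.new(
    'report', SketchCron.new('0 0 * * *'),
    Time.at(1_700_000_000), Time.at(1_700_086_400), false
  )
]

tasks_map = sketch_tasks_map(sample_tasks)
```

Each entry repeats the task id inside its value, so a consumer can work with the values alone without tracking the hash keys.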