class Fluent::Plugin::ExecInput
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::PluginLoggerMixin.new
# File lib/fluent/plugin/in_exec.rb, line 31 def initialize super require 'fluent/plugin/exec_util' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_exec.rb, line 55 def configure(conf) super if conf['localtime'] @localtime = true elsif conf['utc'] @localtime = false end if conf['timezone'] @timezone = conf['timezone'] Fluent::Timezone.validate!(@timezone) end if !@tag && !@tag_key raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input" end if @time_key if @time_format f = @time_format @time_parse_proc = begin strptime = Strptime.new(f) Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) } rescue Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) } end else @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) } end end @parser = setup_parser(conf) end
run(io)
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 121 def run(io) @parser.call(io) end
setup_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 91 def setup_parser(conf) case @format when 'tsv' if @keys.empty? raise Fluent::ConfigError, "keys option is required on exec input for tsv format" end Fluent::ExecUtil::TSVParser.new(@keys, method(:on_message)) when 'json' Fluent::ExecUtil::JSONParser.new(method(:on_message)) when 'msgpack' Fluent::ExecUtil::MessagePackParser.new(method(:on_message)) else Fluent::ExecUtil::TextParserWrapperParser.new(conf, method(:on_message)) end end
start()
click to toggle source
Calls superclass method
Fluent::PluginLoggerMixin#start
# File lib/fluent/plugin/in_exec.rb, line 107 def start super if @run_interval child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io| run(io) end else child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io| run(io) end end end
Private Instance Methods
on_message(record, parsed_time = nil)
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 127 def on_message(record, parsed_time = nil) if val = record.delete(@tag_key) tag = val else tag = @tag end if parsed_time time = parsed_time else if val = record.delete(@time_key) time = @time_parse_proc.call(val) else time = Fluent::EventTime.now end end router.emit(tag, time, record) rescue => e log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record) end