class Fluent::Plugin::ExecOutput

Public Instance Methods

compat_parameters_default_chunk_key() click to toggle source
# File lib/fluent/plugin/out_exec.rb, line 51
# Name of the chunk key used by default when compat parameters are converted.
#
# @return [String] always the literal 'time'
def compat_parameters_default_chunk_key
  default_key = 'time'
  default_key
end
configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_exec.rb, line 55
# Configures the plugin: converts compat buffer parameters, delegates to the
# superclass, then prepares the record formatter and (optionally) the proc
# used to render event times.
#
# NOTE(review): @format, @keys, @time_key, @time_format, @localtime and
# @timezone are presumably populated by `super` from the config -- confirm.
#
# @param conf the configuration object handed to the plugin
# @raise [Fluent::ConfigError] when the format is :tsv and @keys is empty
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'time')

  super

  # Choose the formatter that matches the configured output format.
  case @format
  when :tsv
    raise Fluent::ConfigError, "keys option is required on exec output for tsv format" if @keys.empty?

    @formatter = Fluent::ExecUtil::TSVFormatter.new(@keys)
  when :json
    @formatter = Fluent::ExecUtil::JSONFormatter.new
  when :msgpack
    @formatter = Fluent::ExecUtil::MessagePackFormatter.new
  else
    # No branch matched: the formatter is left as nil.
    @formatter = nil
  end

  # A time-rendering proc is only needed when a time key is configured.
  return unless @time_key

  @time_format_proc =
    if @time_format
      Fluent::TimeFormatter.new(@time_format, @localtime, @timezone).method(:format)
    else
      Proc.new { |time| time.to_s }
    end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_exec.rb, line 82
# Serializes one event into a string using the configured formatter.
#
# When @time_key is set, the rendered time is stored in the record under that
# key; when @tag_key is set, the tag is stored under that key. The record is
# modified in place before being passed to the formatter.
#
# @param tag    the event tag
# @param time   the event time, passed to @time_format_proc
# @param record the event record (mutated when @time_key / @tag_key are set)
# @return [String] the buffer the formatter wrote into
def format(tag, time, record)
  record[@time_key] = @time_format_proc.call(time) if @time_key
  record[@tag_key] = tag if @tag_key

  # The formatter appends into the buffer it is given; hand the buffer back.
  ''.tap { |buffer| @formatter.call(record, buffer) }
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_exec.rb, line 94
# Runs the configured command with the path of a file holding the chunk data
# appended as its last argument.
#
# If the chunk exposes a #path, that path is passed directly. Otherwise the
# chunk is written to a temporary file whose path is passed instead. The
# temporary file is always removed, even when writing the chunk or running
# the command raises.
#
# @param chunk an object responding to #path, or to #write_to(io)
# @return [nil]
# @raise [RuntimeError] when the command's status ($?.to_i) is non-zero
def write(chunk)
  tmpfile = nil

  if chunk.respond_to?(:path)
    prog = "#{@command} #{chunk.path}"
  else
    tmpfile = Tempfile.new("fluent-plugin-exec-")
    tmpfile.binmode
    chunk.write_to(tmpfile)
    # Close (flush) before the external command reads the file.
    tmpfile.close
    prog = "#{@command} #{tmpfile.path}"
  end

  system(prog)
  ecode = $?.to_i

  raise "command returns #{ecode}: #{prog}" unless ecode.zero?
ensure
  # Close and unlink the temp file on every exit path so a failure in
  # chunk.write_to or in the command does not leave it behind.
  tmpfile.close! if tmpfile
end