class Fluent::Plugin::MonitorAgentInput

Constants

IGNORE_ATTRIBUTES
MONITOR_INFO

Public Instance Methods

all_plugins() click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 258
def all_plugins
  array = []

  # get all input plugins
  array.concat Fluent::Engine.root_agent.inputs

  # get all output plugins
  array.concat Fluent::Engine.root_agent.outputs

  # get all filter plugins
  array.concat Fluent::Engine.root_agent.filters

  Fluent::Engine.root_agent.labels.each { |name, l|
    # TODO: Add label name to outputs / filters for identifing plugins
    array.concat l.outputs
    array.concat l.filters
  }

  array
end
fluentd_opts() click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 374
def fluentd_opts
  @fluentd_opts ||= get_fluentd_opts
end
get_fluentd_opts() click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 378
def get_fluentd_opts
  opts = {}
  ObjectSpace.each_object(Fluent::Supervisor) { |obj|
    opts.merge!(obj.options)
    break
  }
  opts
end
get_monitor_info(pe, opts={}) click to toggle source

get monitor info from the plugin `pe` and return a hash object

# File lib/fluent/plugin/in_monitor_agent.rb, line 325
def get_monitor_info(pe, opts={})
  obj = {}

  # Common plugin information
  obj['plugin_id'] = pe.plugin_id
  obj['plugin_category'] = plugin_category(pe)
  obj['type'] = pe.config['@type']
  obj['config'] = pe.config if !opts.has_key?(:with_config) || opts[:with_config]

  # run MONITOR_INFO in plugins' instance context and store the info to obj
  MONITOR_INFO.each_pair {|key,code|
    begin
      catch(:skip) do
        obj[key] = pe.instance_exec(&code)
      end
    rescue => e
      log.warn "unexpected error in monitoring plugins", key: key, plugin: pe.class, error: e
    end
  }

  # include all instance variables if :with_debug_info is set
  if opts[:with_debug_info]
    iv = {}
    pe.instance_eval do
      instance_variables.each {|sym|
        next if IGNORE_ATTRIBUTES.include?(sym)
        key = sym.to_s[1..-1]  # removes first '@'
        iv[key] = instance_variable_get(sym)
      }
    end
    obj['instance_variables'] = iv
  end

  obj
end
plugin_category(pe) click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 361
def plugin_category(pe)
  case pe
  when Fluent::Plugin::Input
    'input'.freeze
  when Fluent::Plugin::Output, Fluent::Plugin::MultiOutput, Fluent::Plugin::BareOutput
    'output'.freeze
  when Fluent::Plugin::Filter
    'filter'.freeze
  else
    'unknown'.freeze
  end
end
plugin_info_by_id(plugin_id, opts={}) click to toggle source

search a plugin by plugin_id

# File lib/fluent/plugin/in_monitor_agent.rb, line 294
def plugin_info_by_id(plugin_id, opts={})
  found = all_plugins.find {|pe|
    pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id
  }
  if found
    get_monitor_info(found, opts)
  else
    nil
  end
end
plugin_info_by_tag(tag, opts={}) click to toggle source

try to match the tag and get the info from the matched output plugin TODO: Support output in label

# File lib/fluent/plugin/in_monitor_agent.rb, line 281
def plugin_info_by_tag(tag, opts={})
  matches = Fluent::Engine.root_agent.event_router.instance_variable_get(:@match_rules)
  matches.each { |rule|
    if rule.match?(tag)
      if rule.collector.is_a?(Fluent::Plugin::Output) || rule.collector.is_a?(Fluent::Output)
        return get_monitor_info(rule.collector, opts)
      end
    end
  }
  nil
end
plugins_info_all(opts={}) click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 316
def plugins_info_all(opts={})
  all_plugins.map {|pe|
    get_monitor_info(pe, opts)
  }
end
plugins_info_by_type(type, opts={}) click to toggle source

This method returns an array because multiple plugins could have the same type

# File lib/fluent/plugin/in_monitor_agent.rb, line 307
def plugins_info_by_type(type, opts={})
  array = all_plugins.select {|pe|
    (pe.config['@type'] == type) rescue nil
  }
  array.map {|pe|
    get_monitor_info(pe, opts)
  }
end
shutdown() click to toggle source
Calls superclass method Fluent::Compat::Input#shutdown
# File lib/fluent/plugin/in_monitor_agent.rb, line 242
def shutdown
  if @srv
    @srv.shutdown
    @srv = nil
  end

  super
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_monitor_agent.rb, line 210
def start
  super

  log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
  @srv = WEBrick::HTTPServer.new({
      BindAddress: @bind,
      Port: @port,
      Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
      AccessLog: [],
    })
  @srv.mount('/api/plugins', LTSVMonitorServlet, self)
  @srv.mount('/api/plugins.json', JSONMonitorServlet, self)
  @srv.mount('/api/config', LTSVConfigMonitorServlet, self)
  @srv.mount('/api/config.json', JSONConfigMonitorServlet, self)
  thread_create :in_monitor_agent_servlet do
    @srv.start
  end
  if @tag
    log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

    opts = {with_config: false}
    timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
      es = Fluent::MultiEventStream.new
      now = Fluent::Engine.now
      plugins_info_all(opts).each { |record|
        es.add(now, record)
      }
      router.emit_stream(@tag, es)
    }
  end
end