module Fluent::PluginHelper
Constants
- ProcessInfo
Public Instance Methods
child_process_execute_once( title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &block )
click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 195
#
# Spawns +command+ once through Open3, registers the resulting process in
# @_child_process_processes and runs +block+ with the requested I/O objects
# on a thread created via +thread_create+.
#
# title::             label used in log records and stored in the ProcessInfo
# command::           program to execute
# arguments::         array of command arguments, or nil
# subprocess_name::   when given, spawned as [command, subprocess_name]
#                     (Process.spawn's "command + argv[0]" form)
# mode::              collection of :read, :write, :stderr, :read_with_stderr;
#                     decides which Open3 variant is used and which I/O
#                     objects are passed to +block+, in the order listed
# stderr::            :discard sends the child's stderr to IO::NULL when
#                     +mode+ does not capture it; any other value leaves
#                     stderr as Open3.popen2 spawns it (the "connect" case)
# env::               environment hash passed as the first spawn argument
# unsetenv::          passed to spawn as the :unsetenv_others option
# chdir::             working directory for the child, or nil
# internal_encoding:: internal encoding applied to the I/O objects in use
# external_encoding:: external encoding applied to the I/O objects in use
# scrub::             when true, invalid/undefined bytes are replaced
# replace_string::    replacement string used when +scrub+ is true, or nil
#
# Returns the pid of the spawned process.
def child_process_execute_once(
  title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
  internal_encoding, external_encoding, scrub, replace_string, &block
)
  spawn_args = if arguments || subprocess_name
                 [env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || [])]
               else
                 [env, command]
               end
  spawn_opts = { unsetenv_others: unsetenv }
  spawn_opts[:chdir] = chdir if chdir

  encoding_options = {}
  if scrub
    encoding_options[:invalid] = encoding_options[:undef] = :replace
    encoding_options[:replace] = replace_string if replace_string
  end

  log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr

  readio = writeio = stderrio = wait_thread = nil
  readio_in_use = writeio_in_use = stderrio_in_use = false

  if mode.include?(:read_with_stderr)
    writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
  elsif mode.include?(:stderr)
    writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
  else
    # stderr is not captured. For :discard, redirect it to the null device at
    # spawn time. Opening a pipe and then reopening the parent's read end
    # would close the only reader and make the child fail with EPIPE/SIGPIPE
    # on its first write to stderr.
    spawn_opts[:err] = IO::NULL if stderr == :discard
    writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
  end

  # The encoding options are passed as keywords: a positional Hash is
  # rejected by IO#set_encoding on Ruby 3.
  if mode.include?(:write)
    writeio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    writeio_in_use = true
  end
  if mode.include?(:read) || mode.include?(:read_with_stderr)
    readio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    readio_in_use = true
  end
  if mode.include?(:stderr)
    stderrio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    stderrio_in_use = true
  end

  pid = wait_thread.pid # wait_thread => Process::Waiter

  # I/O objects are handed to the block in the same order as +mode+.
  io_objects = mode.map do |m|
    case m
    when :read, :read_with_stderr then readio
    when :write then writeio
    when :stderr then stderrio
    else
      raise "BUG: invalid mode must be checked before here: '#{m}'"
    end
  end

  # Start gate: the callback thread blocks on this mutex until the calling
  # thread has stored the thread-local values and registered the ProcessInfo.
  m = Mutex.new
  m.lock
  thread = thread_create :child_process_callback do
    m.lock # run after plugin thread get pid, thread instance and i/o
    m.unlock
    begin
      block.call(*io_objects)
    rescue EOFError
      log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
    rescue IOError => e
      if e.message == 'stream closed'
        log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
      else
        log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
      end
    rescue => e
      log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
    end

    # Unregister the process; Hash#delete returns the removed ProcessInfo (or nil).
    process_info = @_child_process_mutex.synchronize { @_child_process_processes.delete(pid) }
    child_process_kill(process_info, force: true) if process_info && process_info.alive && ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  end
  thread[:_fluentd_plugin_helper_child_process_running] = true
  thread[:_fluentd_plugin_helper_child_process_pid] = pid

  pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil)
  @_child_process_mutex.synchronize do
    @_child_process_processes[pid] = pinfo
  end
  m.unlock
  pid
end
child_process_kill(process_info, force: false)
click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 160
#
# Asks the child process described by +process_info+ to stop.
#
# Does nothing when +process_info+ is nil or already marked as not alive.
# Otherwise it first polls the process without blocking; if it has already
# exited, the exit status is stored on the callback thread and the process is
# marked as not alive. If it is still running, it is sent :KILL when +force+
# is true or Fluent.windows? is true, and :TERM otherwise.
#
# process_info:: ProcessInfo of the target process (may be nil)
# force::        when true, +killed_at+ is left untouched and the process is
#                marked as not alive right after the signal is sent
def child_process_kill(process_info, force: false)
  return unless process_info && process_info.alive

  process_info.killed_at = Time.now unless force

  begin
    # Non-blocking poll: yields nil while the child is still running.
    reaped_pid, exit_status = Process.waitpid2(process_info.pid, Process::WNOHANG)
    if reaped_pid && exit_status
      process_info.thread[:_fluentd_plugin_helper_child_process_exit_status] = exit_status
      process_info.alive = false
    end
  rescue Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    process_info.alive = false
  rescue
    # ignore
  end
  return unless process_info.alive

  begin
    hard_kill = Fluent.windows? || force
    Process.kill(hard_kill ? :KILL : :TERM, process_info.pid)
    process_info.alive = false if force
  rescue Errno::ECHILD, Errno::ESRCH
    process_info.alive = false
  end
end
helpers(*snake_case_symbols)
click to toggle source
# File lib/fluent/plugin_helper.rb, line 38
#
# Includes plugin helper modules into the receiver.
#
# Each name is converted from snake_case to CamelCase (for example
# :event_emitter becomes EventEmitter) and looked up as a constant under
# Fluent::PluginHelper; all resolved modules are then included in one call.
#
# snake_case_symbols:: helper names as symbols (anything responding to to_s)
#
# Returns the receiver. Calling it without names is a no-op.
# Raises NameError when a name does not resolve to a constant.
def helpers(*snake_case_symbols)
  # Module#include requires at least one module, so an empty list is handled here.
  return self if snake_case_symbols.empty?

  helper_modules = snake_case_symbols.map do |name|
    const_name = name.to_s.split('_').map(&:capitalize).join
    begin
      Fluent::PluginHelper.const_get(const_name)
    rescue NameError
      # Same exception class as before, but the message names the requested helper.
      raise NameError, "Unknown plugin helper: #{name.inspect} (no constant Fluent::PluginHelper::#{const_name})"
    end
  end
  include(*helper_modules)
end