class Fluent::Plugin::Output
Constants
- CHUNKING_FIELD_WARN_NUM
- CHUNK_KEY_PATTERN
- CHUNK_KEY_PLACEHOLDER_PATTERN
- CHUNK_TAG_PLACEHOLDER_PATTERN
- DequeuedChunkInfo
- FORMAT_COMPRESSED_MSGPACK_STREAM
- FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT
- FORMAT_MSGPACK_STREAM
- FORMAT_MSGPACK_STREAM_TIME_INT
- FlushThreadState
Internal states
- TIMESTAMP_CHECK_BASE_TIME
- TIME_KEY_PLACEHOLDER_THRESHOLDS
Attributes
as_secondary[R]
buffer[R]
for tests
chunk_key_tag[R]
for tests
chunk_key_time[R]
for tests
chunk_keys[R]
for tests
delayed_commit[R]
delayed_commit_timeout[R]
emit_count[R]
emit_records[R]
in_tests[RW]
num_errors[R]
output_enqueue_thread_waiting[RW]
retry[R]
for tests
rollback_count[R]
secondary[R]
for tests
timekey_zone[R]
write_count[R]
Public Class Methods
new()
#output_enqueue_thread_waiting: for tests of output.rb itself
#in_tests: for tests of plugins with test drivers
Calls superclass method
Fluent::PluginLoggerMixin.new
# File lib/fluent/plugin/output.rb, line 158
def initialize
  super
  @counters_monitor = Monitor.new
  @buffering = false
  @delayed_commit = false
  @as_secondary = false
  @in_tests = false
  @primary_instance = nil

  # TODO: well organized counters
  @num_errors = 0
  @emit_count = 0
  @emit_records = 0
  @write_count = 0
  @rollback_count = 0

  # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
  if implement?(:synchronous)
    if implement?(:buffered) || implement?(:delayed_commit)
      @buffering = nil # do #configure or #start to determine this for full-featured plugins
    else
      @buffering = false
    end
  else
    @buffering = true
  end
  @custom_format = implement?(:custom_format)
  @enable_msgpack_streamer = false # decided later

  @buffer = nil
  @secondary = nil
  @retry = nil
  @dequeued_chunks = nil
  @output_enqueue_thread = nil
  @output_flush_threads = nil

  @simple_chunking = nil
  @chunk_keys = @chunk_key_time = @chunk_key_tag = nil
  @flush_mode = nil
  @timekey_zone = nil
end
Public Instance Methods
acts_as_secondary(primary)
# File lib/fluent/plugin/output.rb, line 200
def acts_as_secondary(primary)
  @as_secondary = true
  @primary_instance = primary
  @chunk_keys = @primary_instance.chunk_keys || []
  @chunk_key_tag = @primary_instance.chunk_key_tag || false
  if @primary_instance.chunk_key_time
    @chunk_key_time = @primary_instance.chunk_key_time
    @timekey_zone = @primary_instance.timekey_zone
    @output_time_formatter_cache = {}
  end

  (class << self; self; end).module_eval do
    define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
    define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
  end
end
after_shutdown()
Calls superclass method
Fluent::Plugin::Base#after_shutdown
# File lib/fluent/plugin/output.rb, line 449
def after_shutdown
  try_rollback_all if @buffering && !@as_secondary # rollback regardless with @delayed_commit, because secondary may do it

  @secondary.after_shutdown if @secondary

  if @buffering && @buffer
    @buffer.after_shutdown

    @output_flush_threads_running = false
    if @output_flush_threads && !@output_flush_threads.empty?
      @output_flush_threads.each do |state|
        state.thread.run if state.thread.alive? # to wakeup thread and make it to stop by itself
      end
      @output_flush_threads.each do |state|
        state.thread.join
      end
    end
  end

  super
end
after_start()
Calls superclass method
Fluent::Plugin::Base#after_start
# File lib/fluent/plugin/output.rb, line 411
def after_start
  super
  @secondary.after_start if @secondary
end
before_shutdown()
Calls superclass method
Fluent::Plugin::Base#before_shutdown
# File lib/fluent/plugin/output.rb, line 423
def before_shutdown
  @secondary.before_shutdown if @secondary

  if @buffering && @buffer
    if @flush_at_shutdown
      force_flush
    end
    @buffer.before_shutdown

    # Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data
    @output_enqueue_thread_running = false
    if @output_enqueue_thread && @output_enqueue_thread.alive?
      @output_enqueue_thread.wakeup
      @output_enqueue_thread.join
    end
  end

  super
end
close()
Calls superclass method
Fluent::Plugin::Base#close
# File lib/fluent/plugin/output.rb, line 470
def close
  @buffer.close if @buffering && @buffer
  @secondary.close if @secondary

  super
end
commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
# File lib/fluent/plugin/output.rb, line 890
def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
  log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed

  if delayed
    @dequeued_chunks_mutex.synchronize do
      @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
    end
  end
  @buffer.purge_chunk(chunk_id)

  @retry_mutex.synchronize do
    if @retry # success to flush chunks in retries
      if secondary
        log.warn "retry succeeded by secondary.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id)
      else
        log.warn "retry succeeded.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id)
      end
      @retry = nil
    end
  end
end
configure(conf)
Calls superclass method
Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/output.rb, line 217
def configure(conf)
  unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit)
    raise "BUG: output plugin must implement some methods. see developer documents."
  end

  has_buffer_section = (conf.elements(name: 'buffer').size > 0)

  super

  if has_buffer_section
    unless implement?(:buffered) || implement?(:delayed_commit)
      raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering"
    end
    @buffering = true
  else # no buffer sections
    if implement?(:synchronous)
      if !implement?(:buffered) && !implement?(:delayed_commit)
        if @as_secondary
          raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
        end
        @buffering = false
      else
        if @as_secondary
          # secondary plugin always works as buffered plugin without buffer instance
          @buffering = true
        else
          # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start
          @buffering = nil
        end
      end
    else # buffered or delayed_commit is supported by `unless` of first line in this method
      @buffering = true
    end
  end

  if @as_secondary
    if !@buffering && !@buffering.nil?
      raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't"
    end
  end

  if (@buffering || @buffering.nil?) && !@as_secondary
    # When @buffering.nil?, @buffer_config was initialized with default value for all parameters.
    # If so, this configuration MUST success.
    @chunk_keys = @buffer_config.chunk_keys.dup
    @chunk_key_time = !!@chunk_keys.delete('time')
    @chunk_key_tag = !!@chunk_keys.delete('tag')
    if @chunk_keys.any?{ |key| key !~ CHUNK_KEY_PATTERN }
      raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
    end

    if @chunk_key_time
      raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
      Fluent::Timezone.validate!(@buffer_config.timekey_zone)
      @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
      @output_time_formatter_cache = {}
    end

    if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
      log.warn "many chunk keys specified, and it may cause too many chunks on your system."
    end

    # no chunk keys or only tags (chunking can be done without iterating event stream)
    @simple_chunking = !@chunk_key_time && @chunk_keys.empty?

    @flush_mode = @buffer_config.flush_mode
    if @flush_mode == :default
      @flush_mode = (@chunk_key_time ? :lazy : :interval)
    end

    buffer_type = @buffer_config[:@type]
    buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
    @buffer = Plugin.new_buffer(buffer_type, parent: self)
    @buffer.configure(buffer_conf)

    @flush_at_shutdown = @buffer_config.flush_at_shutdown
    if @flush_at_shutdown.nil?
      @flush_at_shutdown = if @buffer.persistent?
                             false
                           else
                             true # flush_at_shutdown is true in default for on-memory buffer
                           end
    elsif !@flush_at_shutdown && !@buffer.persistent?
      buf_type = Plugin.lookup_type_from_class(@buffer.class)
      log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer."
      log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
    end
  end

  if @secondary_config
    raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering
    raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer
    raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary
    raise Fluent::ConfigError, "<secondary> section and 'retry_forever' are exclusive" if @buffer_config.retry_forever

    secondary_type = @secondary_config[:@type]
    unless secondary_type
      secondary_type = conf['@type'] # primary plugin type
    end
    secondary_conf = conf.elements(name: 'secondary').first
    @secondary = Plugin.new_output(secondary_type)
    @secondary.acts_as_secondary(self)
    @secondary.configure(secondary_conf)
    @secondary.router = router if @secondary.has_router?
    if (self.class != @secondary.class) && (@custom_format || @secondary.implement?(:custom_format))
      log.warn "secondary type should be same with primary one", primary: self.class.to_s, secondary: @secondary.class.to_s
    end
  else
    @secondary = nil
  end

  self
end
emit_buffered(tag, es)
# File lib/fluent/plugin/output.rb, line 704
def emit_buffered(tag, es)
  @counters_monitor.synchronize{ @emit_count += 1 }
  begin
    execute_chunking(tag, es, enqueue: (@flush_mode == :immediate))
    if !@retry && @buffer.queued?
      submit_flush_once
    end
  rescue
    # TODO: separate number of errors into emit errors and write/flush errors
    @counters_monitor.synchronize{ @num_errors += 1 }
    raise
  end
end
emit_events(tag, es)
# File lib/fluent/plugin/output.rb, line 684
def emit_events(tag, es)
  # actually this method will be overwritten by #configure
  if @buffering
    emit_buffered(tag, es)
  else
    emit_sync(tag, es)
  end
end
emit_sync(tag, es)
# File lib/fluent/plugin/output.rb, line 693
def emit_sync(tag, es)
  @counters_monitor.synchronize{ @emit_count += 1 }
  begin
    process(tag, es)
    @counters_monitor.synchronize{ @emit_records += es.size }
  rescue
    @counters_monitor.synchronize{ @num_errors += 1 }
    raise
  end
end
enqueue_thread_run()
# File lib/fluent/plugin/output.rb, line 1104
def enqueue_thread_run
  value_for_interval = nil
  if @flush_mode == :interval
    value_for_interval = @buffer_config.flush_interval
  end
  if @chunk_key_time
    if !value_for_interval || @buffer_config.timekey < value_for_interval
      value_for_interval = @buffer_config.timekey
    end
  end
  unless value_for_interval
    raise "BUG: both of flush_interval and timekey are disabled"
  end
  interval = value_for_interval / 11.0
  if interval < @buffer_config.flush_thread_interval
    interval = @buffer_config.flush_thread_interval
  end

  while !self.after_started? && !self.stopped?
    sleep 0.5
  end
  log.debug "enqueue_thread actually running"

  begin
    while @output_enqueue_thread_running
      now_int = Time.now.to_i
      if @output_flush_interrupted
        sleep interval
        next
      end

      @output_enqueue_thread_mutex.lock
      begin
        if @flush_mode == :interval
          flush_interval = @buffer_config.flush_interval.to_i
          # This block should be done by integer values.
          # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s.
          # If we use integered values for this comparison, expected actual flush timing is 1.0s.
          @buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int }
        end

        if @chunk_key_time
          timekey_unit = @buffer_config.timekey
          timekey_wait = @buffer_config.timekey_wait
          current_timekey = now_int - now_int % timekey_unit
          @buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int }
        end
      rescue => e
        raise if @under_plugin_development
        log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e
        log.error_backtrace
      ensure
        @output_enqueue_thread_waiting = false
        @output_enqueue_thread_mutex.unlock
      end
      sleep interval
    end
  rescue => e
    # normal errors are rescued by inner begin-rescue clause.
    log.error "error on enqueue thread", plugin_id: plugin_id, error: e
    log.error_backtrace
    raise
  end
end
enqueue_thread_wait()
only for tests of output plugin
# File lib/fluent/plugin/output.rb, line 1085
def enqueue_thread_wait
  @output_enqueue_thread_mutex.synchronize do
    @output_flush_interrupted = false
    @output_enqueue_thread_waiting = true
  end
  require 'timeout'
  Timeout.timeout(10) do
    Thread.pass while @output_enqueue_thread_waiting
  end
end
execute_chunking(tag, es, enqueue: false)
# File lib/fluent/plugin/output.rb, line 768
def execute_chunking(tag, es, enqueue: false)
  if @simple_chunking
    handle_stream_simple(tag, es, enqueue: enqueue)
  elsif @custom_format
    handle_stream_with_custom_format(tag, es, enqueue: enqueue)
  else
    handle_stream_with_standard_format(tag, es, enqueue: enqueue)
  end
end
extract_placeholders(str, metadata)
TODO: optimize this code
# File lib/fluent/plugin/output.rb, line 643
def extract_placeholders(str, metadata)
  if metadata.empty?
    str
  else
    rvalue = str.dup
    # strftime formatting
    if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
      @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
      rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
    end
    # ${tag}, ${tag[0]}, ${tag[1]}, ...
    if @chunk_key_tag
      if str.include?('${tag}')
        rvalue = rvalue.gsub('${tag}', metadata.tag)
      end
      if str =~ CHUNK_TAG_PLACEHOLDER_PATTERN
        hash = {}
        metadata.tag.split('.').each_with_index do |part, i|
          hash["${tag[#{i}]}"] = part
        end
        rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash)
      end
      if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN
        log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}"
      end
    end
    # ${a_chunk_key}, ...
    if !@chunk_keys.empty? && metadata.variables
      hash = {'${tag}' => '${tag}'} # not to erase this wrongly
      @chunk_keys.each do |key|
        hash["${#{key}}"] = metadata.variables[key.to_sym]
      end
      rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN, hash)
    end
    if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN
      log.warn "chunk key placeholder '#{$1}' not replaced. templace:#{str}"
    end
    rvalue
  end
end
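This is typically called from a buffered plugin's #write to expand a templated parameter against the chunk's metadata. A minimal sketch, where the `path` parameter and `upload` helper are hypothetical; only extract_placeholders and chunk.metadata come from this class:

  def write(chunk)
    # e.g. @path = "logs/${tag}/%Y/%m/%d/records" with <buffer tag,time>
    destination = extract_placeholders(@path, chunk.metadata)
    upload(destination, chunk.read)  # upload is a hypothetical helper
  end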
flush_thread_run(state)
# File lib/fluent/plugin/output.rb, line 1169
def flush_thread_run(state)
  flush_thread_interval = @buffer_config.flush_thread_interval

  # If the given clock_id is not supported, Errno::EINVAL is raised.
  clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW
  state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval

  while !self.after_started? && !self.stopped?
    sleep 0.5
  end
  log.debug "flush_thread actually running"

  begin
    # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
    while @output_flush_threads_running
      time = Process.clock_gettime(clock_id)
      interval = state.next_time - time

      if state.next_time <= time
        try_flush
        # next_flush_interval uses flush_thread_interval or flush_thread_burst_interval (or retrying)
        interval = next_flush_time.to_f - Time.now.to_f
        # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected (because @retry still exists)
        #       @retry should be cleared if delayed commit is enabled? Or any other solution?
        state.next_time = Process.clock_gettime(clock_id) + interval
      end

      if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
        unless @output_flush_interrupted
          try_rollback_write
        end
      end

      sleep interval if interval > 0
    end
  rescue => e
    # normal errors are rescued by output plugins in #try_flush
    # so this rescue section is for critical & unrecoverable errors
    log.error "error on output thread", plugin_id: plugin_id, error_class: e.class, error: e
    log.error_backtrace
    raise
  end
end
flush_thread_wakeup()
only for tests of output plugin
# File lib/fluent/plugin/output.rb, line 1097
def flush_thread_wakeup
  @output_flush_threads.each do |state|
    state.next_time = 0
    state.thread.run
  end
end
force_flush()
# File lib/fluent/plugin/output.rb, line 1065
def force_flush
  if @buffering
    @buffer.enqueue_all
    submit_flush_all
  end
end
format(tag, time, record)
# File lib/fluent/plugin/output.rb, line 117
def format(tag, time, record)
  # standard msgpack_event_stream chunk will be used if this method is not implemented in plugin subclass
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
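Plugins that need their own chunk representation override #format; the returned string for each event is what gets appended to the buffer chunk (see #handle_stream_with_custom_format). A minimal sketch, assuming the plugin wants tab-separated JSON lines instead of the standard msgpack event stream:

  require 'json'

  def format(tag, time, record)
    "#{tag}\t#{time}\t#{record.to_json}\n"
  end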
formatted_to_msgpack_binary()
# File lib/fluent/plugin/output.rb, line 122
def formatted_to_msgpack_binary
  # To indicate custom format method (#format) returns msgpack binary or not.
  # If #format returns msgpack binary, override this method to return true.
  false
end
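If a custom #format already emits msgpack binary, override this to return true so that #start keeps treating chunks as msgpack event streams (it sets @enable_msgpack_streamer from this value). A minimal sketch:

  require 'msgpack'

  def format(tag, time, record)
    [time, record].to_msgpack
  end

  def formatted_to_msgpack_binary
    true
  end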
generate_format_proc()
# File lib/fluent/plugin/output.rb, line 818
def generate_format_proc
  if @buffer && @buffer.compress == :gzip
    @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
  else
    @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
  end
end
get_placeholders_keys(str)
# File lib/fluent/plugin/output.rb, line 638
def get_placeholders_keys(str)
  str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| s == "tag"}.sort
end
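Given the scan above, this returns the sorted chunk-key placeholder names found in the template, excluding "tag". For example (assuming CHUNK_KEY_PLACEHOLDER_PATTERN matches `${...}` tokens):

  get_placeholders_keys("prefix/${username}/${tag}/${level}/file")
  # => ["level", "username"]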
get_placeholders_tag(str)
-1 means whole tag
# File lib/fluent/plugin/output.rb, line 625
def get_placeholders_tag(str)
  # [["tag"],["tag[0]"]]
  parts = []
  str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).map(&:first).each do |ph|
    if ph == "tag"
      parts << -1
    elsif ph =~ /^tag\[(\d+)\]$/
      parts << $1.to_i
    end
  end
  parts.sort
end
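The scan yields the tag placeholder names, and this method maps them to sorted part indexes, with -1 standing for the whole tag:

  get_placeholders_tag("${tag[1]}/${tag}/file")
  # => [-1, 1]    # -1 for ${tag}, 1 for ${tag[1]}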
get_placeholders_time(str)
using a timekey larger than 1 day is not validated
# File lib/fluent/plugin/output.rb, line 615
def get_placeholders_time(str)
  base_str = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
  TIME_KEY_PLACEHOLDER_THRESHOLDS.each do |triple|
    sec = triple.first
    return triple if (TIMESTAMP_CHECK_BASE_TIME + sec).strftime(str) != base_str
  end
  nil
end
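This walks TIME_KEY_PLACEHOLDER_THRESHOLDS (triples destructured as sec, title, example in #placeholder_validators) and returns the first granularity whose step actually changes the formatted string, i.e. the finest time placeholder used in the template. A rough sketch of the expected behavior (the exact triple contents are defined by the constant, not shown here):

  get_placeholders_time("archive/%Y-%m-%d/data")  # => the day-level triple (sec == 86400)
  get_placeholders_time("archive/static/data")    # => nil, no time placeholders in the template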
handle_stream_simple(tag, es, enqueue: false)
# File lib/fluent/plugin/output.rb, line 868
def handle_stream_simple(tag, es, enqueue: false)
  format_proc = nil
  meta = metadata((@chunk_key_tag ? tag : nil), nil, nil)
  records = es.size
  if @custom_format
    records = 0
    data = []
    es.each do |time, record|
      data << format(tag, time, record)
      records += 1
    end
  else
    format_proc = generate_format_proc
    data = es
  end
  write_guard do
    @buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
  end
  @counters_monitor.synchronize{ @emit_records += records }
  true
end
handle_stream_with_custom_format(tag, es, enqueue: false)
metadata_and_data is a Hash of:
  (standard format) metadata => event stream
  (custom format)   metadata => array of formatted events
For the standard format, formatting should be done for the whole event stream, but the
"whole event stream" may be a split of "es" here when it is bigger than chunk_limit_size; `@buffer.write` does this splitting.
For custom formats, formatting is done here. Custom formatting always requires iteration
of the event stream, and for performance it should be done just once, even if the total event stream size is bigger than chunk_limit_size.
# File lib/fluent/plugin/output.rb, line 835
def handle_stream_with_custom_format(tag, es, enqueue: false)
  meta_and_data = {}
  records = 0
  es.each do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= []
    meta_and_data[meta] << format(tag, time, record)
    records += 1
  end
  write_guard do
    @buffer.write(meta_and_data, enqueue: enqueue)
  end
  @counters_monitor.synchronize{ @emit_records += records }
  true
end
handle_stream_with_standard_format(tag, es, enqueue: false)
# File lib/fluent/plugin/output.rb, line 851
def handle_stream_with_standard_format(tag, es, enqueue: false)
  format_proc = generate_format_proc
  meta_and_data = {}
  records = 0
  es.each do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= MultiEventStream.new
    meta_and_data[meta].add(time, record)
    records += 1
  end
  write_guard do
    @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
  end
  @counters_monitor.synchronize{ @emit_records += records }
  true
end
implement?(feature)
# File lib/fluent/plugin/output.rb, line 496
def implement?(feature)
  methods_of_plugin = self.class.instance_methods(false)
  case feature
  when :synchronous    then methods_of_plugin.include?(:process) || support_in_v12_style?(:synchronous)
  when :buffered       then methods_of_plugin.include?(:write) || support_in_v12_style?(:buffered)
  when :delayed_commit then methods_of_plugin.include?(:try_write)
  when :custom_format  then methods_of_plugin.include?(:format) || support_in_v12_style?(:custom_format)
  else
    raise ArgumentError, "Unknown feature for output plugin: #{feature}"
  end
end
interrupt_flushes()
only for tests of output plugin
# File lib/fluent/plugin/output.rb, line 1080
def interrupt_flushes
  @output_flush_interrupted = true
end
metadata(tag, time, record)
TODO: optimize this code
# File lib/fluent/plugin/output.rb, line 719
def metadata(tag, time, record)
  # this arguments are ordered in output plugin's rule
  # Metadata 's argument order is different from this one (timekey, tag, variables)

  raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String)
  raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer)
  raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash)

  if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil?
    # for tests
    return Struct.new(:timekey, :tag, :variables).new
  end

  # timekey is int from epoch, and `timekey - timekey % 60` is assumed to mach with 0s of each minutes.
  # it's wrong if timezone is configured as one which supports leap second, but it's very rare and
  # we can ignore it (especially in production systems).
  if @chunk_keys.empty?
    if !@chunk_key_time && !@chunk_key_tag
      @buffer.metadata()
    elsif @chunk_key_time && @chunk_key_tag
      time_int = time.to_i
      timekey = (time_int - (time_int % @buffer_config.timekey)).to_i
      @buffer.metadata(timekey: timekey, tag: tag)
    elsif @chunk_key_time
      time_int = time.to_i
      timekey = (time_int - (time_int % @buffer_config.timekey)).to_i
      @buffer.metadata(timekey: timekey)
    else
      @buffer.metadata(tag: tag)
    end
  else
    timekey = if @chunk_key_time
                time_int = time.to_i
                (time_int - (time_int % @buffer_config.timekey)).to_i
              else
                nil
              end
    pairs = Hash[@chunk_keys.map{|k| [k.to_sym, record[k]]}]
    @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs)
  end
end
metadata_for_test(tag, time, record)
# File lib/fluent/plugin/output.rb, line 761
def metadata_for_test(tag, time, record)
  raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty?
  m = metadata(tag, time, record)
  @buffer.metadata_list_clear!
  m
end
next_flush_time()
# File lib/fluent/plugin/output.rb, line 953
def next_flush_time
  if @buffer.queued?
    @retry_mutex.synchronize do
      @retry ? @retry.next_time : Time.now + @buffer_config.flush_thread_burst_interval
    end
  else
    Time.now + @buffer_config.flush_thread_interval
  end
end
placeholder_validate!(name, str)
# File lib/fluent/plugin/output.rb, line 508
def placeholder_validate!(name, str)
  placeholder_validators(name, str).each do |v|
    v.validate!
  end
end
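Plugins typically call this from #configure to fail fast when a templated parameter does not agree with the configured chunk keys. A minimal sketch, assuming a hypothetical `path` config_param:

  def configure(conf)
    super
    # e.g. @path = "/data/${tag}/%Y%m%d.log" must agree with the <buffer tag,time> keys
    placeholder_validate!(:path, @path)
  end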
placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
# File lib/fluent/plugin/output.rb, line 514
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
  validators = []

  sec, title, example = get_placeholders_time(str)
  if sec || time_key
    validators << PlaceholderValidator.new(name, str, :time, {sec: sec, title: title, example: example, timekey: time_key})
  end

  parts = get_placeholders_tag(str)
  if tag_key || !parts.empty?
    validators << PlaceholderValidator.new(name, str, :tag, {parts: parts, tagkey: tag_key})
  end

  keys = get_placeholders_keys(str)
  if chunk_keys && !chunk_keys.empty? || !keys.empty?
    validators << PlaceholderValidator.new(name, str, :keys, {keys: keys, chunkkeys: chunk_keys})
  end

  validators
end
prefer_buffered_processing()
# File lib/fluent/plugin/output.rb, line 128
def prefer_buffered_processing
  # override this method to return false only when all of these are true:
  #  * plugin has both implementation for buffered and non-buffered methods
  #  * plugin is expected to work as non-buffered plugin if no `<buffer>` sections specified
  true
end
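A plugin that implements both #process and #write, but should run non-buffered when no `<buffer>` section is configured, overrides this as the comment describes. A minimal sketch:

  def prefer_buffered_processing
    false
  end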
prefer_delayed_commit()
# File lib/fluent/plugin/output.rb, line 135
def prefer_delayed_commit
  # override this method to decide which is used of `write` or `try_write` if both are implemented
  true
end
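When both #write and #try_write are implemented, this decides which one #start wires up. A minimal sketch, assuming a hypothetical `delayed` config_param that lets users choose:

  def prefer_delayed_commit
    @delayed
  end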
process(tag, es)
# File lib/fluent/plugin/output.rb, line 105
def process(tag, es)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
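Synchronous (non-buffered) plugins implement #process and consume the event stream directly. A minimal sketch that just logs each event:

  def process(tag, es)
    es.each do |time, record|
      log.info "event", tag: tag, time: time, record: record
    end
  end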
retry_state(randomize)
# File lib/fluent/plugin/output.rb, line 1038
def retry_state(randomize)
  if @secondary
    retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times,
      backoff_base: @buffer_config.retry_exponential_backoff_base, max_interval: @buffer_config.retry_max_interval,
      secondary: true, secondary_threshold: @buffer_config.retry_secondary_threshold,
      randomize: randomize
    )
  else
    retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times,
      backoff_base: @buffer_config.retry_exponential_backoff_base, max_interval: @buffer_config.retry_max_interval,
      randomize: randomize
    )
  end
end
rollback_write(chunk_id)
# File lib/fluent/plugin/output.rb, line 911
def rollback_write(chunk_id)
  # This API is to rollback chunks explicitly from plugins.
  # 3rd party plugins can depend it on automatic rollback of #try_rollback_write
  @dequeued_chunks_mutex.synchronize do
    @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
  end
  # returns true if chunk was rollbacked as expected
  #         false if chunk was already flushed and couldn't be rollbacked unexpectedly
  # in many cases, false can be just ignored
  if @buffer.takeback_chunk(chunk_id)
    @counters_monitor.synchronize{ @rollback_count += 1 }
    true
  else
    false
  end
end
shutdown()
Calls superclass method
Fluent::Plugin::Base#shutdown
# File lib/fluent/plugin/output.rb, line 442
def shutdown
  @secondary.shutdown if @secondary
  @buffer.shutdown if @buffering && @buffer

  super
end
start()
Calls superclass method
Fluent::PluginLoggerMixin#start
# File lib/fluent/plugin/output.rb, line 331
def start
  super

  if @buffering.nil?
    @buffering = prefer_buffered_processing
    if !@buffering && @buffer
      @buffer.terminate # it's not started, so terminate will be enough
      # At here, this plugin works as non-buffered plugin.
      # Un-assign @buffer not to show buffering metrics (e.g., in_monitor_agent)
      @buffer = nil
    end
  end

  if @buffering
    m = method(:emit_buffered)
    (class << self; self; end).module_eval do
      define_method(:emit_events, m)
    end

    @custom_format = implement?(:custom_format)
    @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
    @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
                        prefer_delayed_commit
                      else
                        implement?(:delayed_commit)
                      end
    @delayed_commit_timeout = @buffer_config.delayed_commit_timeout
  else # !@buffering
    m = method(:emit_sync)
    (class << self; self; end).module_eval do
      define_method(:emit_events, m)
    end
  end

  if @buffering && !@as_secondary
    @retry = nil
    @retry_mutex = Mutex.new

    @buffer.start

    @output_enqueue_thread = nil
    @output_enqueue_thread_running = true

    @output_flush_threads = []
    @output_flush_threads_mutex = Mutex.new
    @output_flush_threads_running = true

    # mainly for test: detect enqueue works as code below:
    #   @output.interrupt_flushes
    #   # emits
    #   @output.enqueue_thread_wait
    @output_flush_interrupted = false
    @output_enqueue_thread_mutex = Mutex.new
    @output_enqueue_thread_waiting = false

    @dequeued_chunks = []
    @dequeued_chunks_mutex = Mutex.new

    @buffer_config.flush_thread_count.times do |i|
      thread_title = "flush_thread_#{i}".to_sym
      thread_state = FlushThreadState.new(nil, nil)
      thread = thread_create(thread_title) do
        flush_thread_run(thread_state)
      end
      thread_state.thread = thread
      @output_flush_threads_mutex.synchronize do
        @output_flush_threads << thread_state
      end
    end
    @output_flush_thread_current_position = 0

    unless @in_tests
      if @flush_mode == :interval || @chunk_key_time
        @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
      end
    end
  end

  @secondary.start if @secondary
end
stop()
Calls superclass method
Fluent::Plugin::Base#stop
# File lib/fluent/plugin/output.rb, line 416
def stop
  @secondary.stop if @secondary
  @buffer.stop if @buffering && @buffer

  super
end
submit_flush_all()
# File lib/fluent/plugin/output.rb, line 1072
def submit_flush_all
  while !@retry && @buffer.queued?
    submit_flush_once
    sleep @buffer_config.flush_thread_burst_interval
  end
end
submit_flush_once()
# File lib/fluent/plugin/output.rb, line 1057
def submit_flush_once
  # Without locks: it is rough but enough to select "next" writer selection
  @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
  state = @output_flush_threads[@output_flush_thread_current_position]
  state.next_time = 0
  state.thread.run
end
support_in_v12_style?(feature)
# File lib/fluent/plugin/output.rb, line 484
def support_in_v12_style?(feature)
  # for plugins written in v0.12 styles
  case feature
  when :synchronous    then false
  when :buffered       then false
  when :delayed_commit then false
  when :custom_format  then false
  else
    raise ArgumentError, "unknown feature: #{feature}"
  end
end
terminate()
Calls superclass method
Fluent::PluginLoggerMixin#terminate
# File lib/fluent/plugin/output.rb, line 477
def terminate
  @buffer.terminate if @buffering && @buffer
  @secondary.terminate if @secondary

  super
end
try_flush()
# File lib/fluent/plugin/output.rb, line 963
def try_flush
  chunk = @buffer.dequeue_chunk
  return unless chunk

  log.debug "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id)

  output = self
  using_secondary = false
  if @retry_mutex.synchronize{ @retry && @retry.secondary? }
    output = @secondary
    using_secondary = true
  end

  if @enable_msgpack_streamer
    chunk.extend ChunkMessagePackEventStreamer
  end

  begin
    if output.delayed_commit
      log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
      @counters_monitor.synchronize{ @write_count += 1 }
      output.try_write(chunk)
      @dequeued_chunks_mutex.synchronize do
        # delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
        @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
      end
    else # output plugin without delayed purge
      chunk_id = chunk.unique_id
      dump_chunk_id = dump_unique_id_hex(chunk_id)
      log.trace "adding write count", instance: self.object_id
      @counters_monitor.synchronize{ @write_count += 1 }
      log.trace "executing sync write", chunk: dump_chunk_id
      output.write(chunk)
      log.trace "write operation done, committing", chunk: dump_chunk_id
      commit_write(chunk_id, secondary: using_secondary)
      log.trace "done to commit a chunk", chunk: dump_chunk_id
    end
  rescue => e
    log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
    @buffer.takeback_chunk(chunk.unique_id)

    if @under_plugin_development
      raise
    end

    @retry_mutex.synchronize do
      if @retry
        @counters_monitor.synchronize{ @num_errors += 1 }
        if @retry.limit?
          records = @buffer.queued_records
          log.error "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.", plugin_id: plugin_id, retry_times: @retry.steps, records: records, error: e
          log.error_backtrace e.backtrace
          @buffer.clear_queue!
          log.debug "buffer queue cleared", plugin_id: plugin_id
          @retry = nil
        else
          @retry.step
          msg = if using_secondary
                  "failed to flush the buffer with secondary output."
                else
                  "failed to flush the buffer."
                end
          log.warn msg, plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e
          log.warn_backtrace e.backtrace
        end
      else
        @retry = retry_state(@buffer_config.retry_randomize)
        @counters_monitor.synchronize{ @num_errors += 1 }
        log.warn "failed to flush the buffer.", plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e
        log.warn_backtrace e.backtrace
      end
    end
  end
end
try_rollback_all()
# File lib/fluent/plugin/output.rb, line 940
def try_rollback_all
  return unless @dequeued_chunks
  @dequeued_chunks_mutex.synchronize do
    until @dequeued_chunks.empty?
      info = @dequeued_chunks.shift
      if @buffer.takeback_chunk(info.chunk_id)
        @counters_monitor.synchronize{ @rollback_count += 1 }
        log.info "delayed commit for buffer chunks was cancelled in shutdown", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id)
      end
    end
  end
end
try_rollback_write()
# File lib/fluent/plugin/output.rb, line 928
def try_rollback_write
  @dequeued_chunks_mutex.synchronize do
    while @dequeued_chunks.first && @dequeued_chunks.first.expired?
      info = @dequeued_chunks.shift
      if @buffer.takeback_chunk(info.chunk_id)
        @counters_monitor.synchronize{ @rollback_count += 1 }
        log.warn "failed to flush the buffer chunk, timeout to commit.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
      end
    end
  end
end
try_write(chunk)
# File lib/fluent/plugin/output.rb, line 113
def try_write(chunk)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
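Delayed-commit plugins implement #try_write, keep the chunk ID, and call #commit_write only once the destination has acknowledged the data (or #rollback_write on failure); otherwise #try_rollback_write takes the chunk back after delayed_commit_timeout. A minimal sketch, assuming a hypothetical asynchronous `send_async` client:

  def try_write(chunk)
    chunk_id = chunk.unique_id
    send_async(chunk.read) do |acknowledged|
      if acknowledged
        commit_write(chunk_id)
      else
        rollback_write(chunk_id)
      end
    end
  end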
write(chunk)
# File lib/fluent/plugin/output.rb, line 109
def write(chunk)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
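Buffered plugins implement #write. With the standard chunk format, #try_flush extends the chunk with ChunkMessagePackEventStreamer, so events can be iterated with chunk.each. A minimal sketch (the `post_to_endpoint` helper is hypothetical):

  def write(chunk)
    payload = []
    chunk.each do |time, record|
      payload << record.merge('time' => time.to_i)
    end
    post_to_endpoint(payload)
  end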
write_guard(&block)
# File lib/fluent/plugin/output.rb, line 778
def write_guard(&block)
  begin
    block.call
  rescue Fluent::Plugin::Buffer::BufferOverflowError
    log.warn "failed to write data into buffer by buffer overflow"
    case @buffer_config.overflow_action
    when :throw_exception
      raise
    when :block
      log.debug "buffer.write is now blocking"
      until @buffer.storable?
        sleep 1
      end
      log.debug "retrying buffer.write after blocked operation"
      retry
    when :drop_oldest_chunk
      begin
        oldest = @buffer.dequeue_chunk
        if oldest
          log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: oldest.unique_id
          @buffer.purge_chunk(oldest.unique_id)
        else
          log.error "no queued chunks to be dropped for drop_oldest_chunk"
        end
      rescue
        # ignore any errors
      end
      raise unless @buffer.storable?
      retry
    else
      raise "BUG: unknown overflow_action '#{@buffer_config.overflow_action}'"
    end
  end
end