The approach: Filebeat ships the Sidekiq log line by line, Logstash stitches the lines belonging to one job back together with the aggregate plugin, and once a job's lines are fully collected the merged event is sent on to Elasticsearch, where it can be browsed in Kibana.
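
For reference, Sidekiq's default (pre-6.0) logger produces lines like the following; the JID and the ": start" / ": done" markers are what the filter script below keys off (timestamps, thread id, and job id here are made up):

2019-03-01T10:00:00.123Z 7 TID-oumzgfmnd HardWorker JID-6e1d1b4e62f0b9f1c05e5f5a INFO: start
2019-03-01T10:00:00.456Z 7 TID-oumzgfmnd HardWorker JID-6e1d1b4e62f0b9f1c05e5f5a INFO: doing work
2019-03-01T10:00:01.789Z 7 TID-oumzgfmnd HardWorker JID-6e1d1b4e62f0b9f1c05e5f5a INFO: done: 1.666 sec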

filebeat.yml


filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/jiyarong/webs6/jkb_rails2/log/sidekiq.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml


setup.template.settings:
  index.number_of_shards: 1

setup.kibana:
  host: "localhost:5601"

output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

logstash.conf

input {
	beats {
		port => 5044
	}

	tcp {
		port => 5000
	}
}
filter {
	ruby {
		path => "/usr/share/logstash/pipeline/ruby_scripts/job_filter1.rb"
	}

	# First line of a job: open an aggregate map keyed by the job id,
	# start collecting lines, and drop the partial event itself.
	if [job_position] == "job_head" {
		aggregate {
			task_id => "%{jid}"
			code => "
				map['_message'] = [event.get('message')]
				event.cancel()
			"
			map_action => "create"
		}
	} else if [job_position] == "job_content" {
		# Middle lines: append to the collected array and drop the event.
		aggregate {
			task_id => "%{jid}"
			code => "
				map['_message'] << event.get('message')
				event.cancel()
			"
			map_action => "update"
		}
	} else if [job_position] == "job_end" {
		# Last line: append it, emit the whole collected array as this
		# event's message, and close the task.
		aggregate {
			task_id => "%{jid}"
			code => "
				map['_message'] << event.get('message')
				event.set('message', map['_message'])
			"
			map_action => "update"
			end_of_task => true
			timeout => 120
		}
	}
}

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "elastic"
		password => "changeme"
	}
}
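
Once the job_end line arrives, a single merged event reaches Elasticsearch, with the whole job's lines collected into the message field as an array, roughly like this (values illustrative, other event metadata omitted):

{
  "jid": "6e1d1b4e62f0b9f1c05e5f5a",
  "job_position": "job_end",
  "message": [
    "... INFO: start",
    "... INFO: doing work",
    "... INFO: done: 1.666 sec"
  ]
}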

job_filter1.rb

def filter(event)
  message = event.get("message")
  # Sidekiq job ids are 24 lowercase alphanumeric characters, prefixed
  # with "JID-" in the log line.
  match = message.match(/JID-([a-z0-9]{24})/)
  if match
    event.set("jid", match[1])
    if message =~ /:\sstart/
      event.set("job_position", "job_head")
    elsif message =~ /:\sdone/
      event.set("job_position", "job_end")
    else
      event.set("job_position", "job_content")
    end
  else
    # Lines without a JID do not belong to any job; drop them.
    event.cancel
  end

  [event]
end
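
Because the script is plain Ruby, it can be smoke-tested outside Logstash with a minimal stand-in for the event API. MockEvent below is hypothetical and implements only the three methods the filter uses:

# test_job_filter1.rb - hypothetical smoke test, not part of the pipeline
class MockEvent
  attr_reader :cancelled

  def initialize(message)
    @fields = { "message" => message }
    @cancelled = false
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end

  def cancel
    @cancelled = true
  end
end

require_relative "job_filter1"

head = MockEvent.new("2019-03-01T10:00:00.123Z 7 TID-oumzgfmnd HardWorker JID-6e1d1b4e62f0b9f1c05e5f5a INFO: start")
filter(head)
puts head.get("job_position") # => job_head

noise = MockEvent.new("a line with no job id")
filter(noise)
puts noise.cancelled # => true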

One caveat when enabling the aggregate plugin: the number of filter workers must be set to 1 (pipeline.workers: 1, or -w 1 on the command line), otherwise lines belonging to the same job may be handled by different workers and the aggregation will not work correctly.

logstash.yml

---
## Default Logstash configuration from logstash-docker.
## from https://github.com/elastic/logstash-docker/blob/master/build/logstash/config/logstash-full.yml
#
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]

## X-Pack security credentials
#
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: changeme

pipeline.workers: 1