# Provide input/output streams for transforming Monasca logs. # Filters should be provided in other configuration files. input { kafka { zk_connect => "{{ monasca_zookeeper_servers }}" topic_id => "{{ monasca_raw_logs_topic }}" group_id => "log_transformer" consumer_id => "log_transformer_{{ ansible_hostname }}" consumer_threads => "{{ monasca_log_pipeline_threads }}" } } filter { # Update the timestamp of the event based on the time in the message. date { match => [ "[log][dimensions][timestamp]", "yyyy-MM-dd HH:mm:ss +0000", "ISO8601"] remove_field => [ "[log][dimensions][timestamp]", "[log][dimensions][Timestamp]" ] } # OpenStack log levels are uppercase, and syslog are lowercase. # Furthermore, syslog has more log levels that OpenStack. To avoid # mapping syslog log levels to OpenStack log levels, we standardise # on the syslog style here. if [log][dimensions][log_level] { mutate { lowercase => [ "[log][dimensions][log_level]" ] } } } output { kafka { bootstrap_servers => "{{ monasca_kafka_servers }}" topic_id => "{{ monasca_transformed_logs_topic }}" client_id => "log_transformer_{{ ansible_hostname }}" workers => {{ monasca_log_pipeline_threads|int }} } }