# Stream job template for the PROXY-EVENT topic.
#
# Declares one Kafka source, one projection pipeline and two sinks (Kafka and
# ClickHouse). Double-brace placeholders are substituted by a template engine
# before this file is parsed as YAML, so they are intentionally left unquoted.
#
# NOTE(review): the block structure was restored from a copy in which all line
# breaks and indentation had been collapsed. Keys, values and their order are
# unchanged; the nesting follows the key order and should be confirmed against
# the consumer's schema, especially the "application" section at the bottom.

sources:
  kafka_source:
    type: kafka
    properties:
      topic: PROXY-EVENT
      kafka.bootstrap.servers: {{ kafka_source_servers }}
      kafka.client.id: PROXY-EVENT
      kafka.session.timeout.ms: 60000
      kafka.max.poll.records: 3000
      # 31457280 bytes = 30 MiB per partition fetch.
      kafka.max.partition.fetch.bytes: 31457280
      kafka.security.protocol: SASL_PLAINTEXT
      kafka.sasl.mechanism: PLAIN
      # NOTE(review): this value looks like ciphertext that pairs with
      # "shade.identifier: aes" in the application section - confirm that no
      # plaintext secret is committed here.
      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
      kafka.group.id: {{ kafka_source_group_id }}
      kafka.auto.offset.reset: latest
      format: json
      json.ignore.parse.errors: false

processing_pipelines:
  etl_processor:
    type: projection
    # Functions are listed in the order given in the original file.
    functions:
      # Writes a generated id into log_id; the lookup list holds a single
      # empty string, exactly as in the original.
      - function: SNOWFLAKE_ID
        lookup_fields: ['']
        output_fields: [log_id]
        parameters:
          data_center_id_num: {{ data_center_id_num }}

      # Derives recv_time from __timestamp at seconds precision.
      - function: UNIX_TIMESTAMP_CONVERTER
        lookup_fields: [__timestamp]
        output_fields: [recv_time]
        parameters:
          precision: seconds

      # ingestion_time is assigned from the expression "recv_time".
      - function: EVAL
        output_fields: [ingestion_time]
        parameters:
          value_expression: recv_time

      # The two decode steps overwrite the field they read (the output field
      # equals value_field); the charset is taken from a companion field.
      - function: BASE64_DECODE_TO_STRING
        output_fields: [mail_subject]
        parameters:
          value_field: mail_subject
          charset_field: mail_subject_charset

      - function: BASE64_DECODE_TO_STRING
        output_fields: [mail_attachment_name]
        parameters:
          value_field: mail_attachment_name
          charset_field: mail_attachment_name_charset

      # The PATH_COMBINE steps rewrite a file-reference field in place
      # (lookup and output field are identical) from three listed segments:
      # props.hos.path, a props.hos.bucket.name.* entry, and the field itself.
      # The bucket names are defined under "properties" in the application
      # section; props.hos.path is not defined in this file.
      - function: PATH_COMBINE
        lookup_fields: [rtp_pcap_path]
        output_fields: [rtp_pcap_path]
        parameters:
          path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path]

      - function: PATH_COMBINE
        lookup_fields: [http_request_body]
        output_fields: [http_request_body]
        parameters:
          path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body]

      - function: PATH_COMBINE
        lookup_fields: [http_response_body]
        output_fields: [http_response_body]
        parameters:
          path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body]

      - function: PATH_COMBINE
        lookup_fields: [mail_eml_file]
        output_fields: [mail_eml_file]
        parameters:
          path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file]

      - function: PATH_COMBINE
        lookup_fields: [packet_capture_file]
        output_fields: [packet_capture_file]
        parameters:
          path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file]

      # Writes the current time into processing_time at seconds precision.
      - function: CURRENT_UNIX_TIMESTAMP
        output_fields: [processing_time]
        parameters:
          precision: seconds

sinks:
  kafka_sink:
    type: kafka
    properties:
      # NOTE(review): same topic name as the source. Presumably the sink
      # servers point at a different cluster - confirm, otherwise the job
      # would write back into the topic it reads from.
      topic: PROXY-EVENT
      kafka.bootstrap.servers: {{ kafka_sink_servers }}
      kafka.client.id: PROXY-EVENT
      # NOTE(review): retries is 0 and log.failures.only is true below; this
      # looks like a deliberate best-effort sink - confirm.
      kafka.retries: 0
      kafka.linger.ms: 10
      kafka.request.timeout.ms: 30000
      # 262144 bytes = 256 KiB
      kafka.batch.size: 262144
      # 134217728 bytes = 128 MiB
      kafka.buffer.memory: 134217728
      # 10485760 bytes = 10 MiB
      kafka.max.request.size: 10485760
      kafka.compression.type: snappy
      kafka.security.protocol: SASL_PLAINTEXT
      kafka.sasl.mechanism: PLAIN
      # Same value as the source-side kafka.sasl.jaas.config.
      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
      format: json
      json.ignore.parse.errors: false
      log.failures.only: true

  clickhouse_sink:
    type: clickhouse
    properties:
      host: {{ clickhouse_sink_host }}
      table: tsg_galaxy_v3.proxy_event_local
      batch.size: 100000
      batch.interval: 30s
      # NOTE(review): user and password look like ciphertext that pairs with
      # "shade.identifier: aes" - confirm.
      connection.user: e54c9568586180eede1506eecf3574e9
      connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
      # NOTE(review): units of the two timeouts are not stated in this file.
      connection.connect_timeout: 30
      connection.query_timeout: 300

application:
  env:
    name: {{ job_name }}
    shade.identifier: aes
    pipeline:
      object-reuse: true
  # NOTE(review): "properties" is placed at the application level (sibling of
  # "env"). The collapsed original does not show whether it belongs here or
  # under "env" - confirm against the consumer's schema.
  properties:
    hos.bucket.name.rtp_file: traffic_rtp_file_bucket
    hos.bucket.name.http_file: traffic_http_file_bucket
    hos.bucket.name.eml_file: traffic_eml_file_bucket
    hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
  # NOTE(review): the topology placeholder is assumed to expand to a key that
  # belongs directly under "application"; if the substituted text spans
  # several lines, its own indentation must match this level - confirm.
  {{ topology }}