日志作为企业数据资产的重要组成部分,为企业带来系统可观测性、网络安全、数据分析三个方面的价值。日志是你排除故障的首选手段、提高系统安全性的参考,也是你可以提取有助于业务增长的信息的数据挖掘。
日志是计算机系统中事件的顺序记录。如果你思考一下日志是如何生成和使用的,你就会知道理想的日志分析系统应该是什么样子:
- 应具有无模式支持。原始日志是非结构化的自由文本,基本上无法进行聚合和计算,因此需要将其转换为结构化表格(该过程称为“ETL”),然后再将其放入数据库或数据仓库进行分析。如果架构发生更改,则需要对 ETL 和结构化表进行大量复杂的调整。因此,出现了半结构化日志,主要是JSON格式。您可以添加或删除这些日志中的字段,日志存储系统将相应地调整架构。
- 应该是低成本的。日志巨大且不断生成。一家大的公司会产生 10~100 TB 的日志数据。出于业务或合规性原因,应将日志保留半年或更长时间。这意味着要存储以PB为单位的日志大小,因此成本相当可观。
- 应能实时处理。日志应该实时写入,否则工程师将无法在故障排除和安全跟踪中捕获最新事件。另外,一个好的日志系统应该提供全文搜索功能并快速响应交互式查询。
基于Elasticsearch的日志分析解决方案
数据行业中流行的日志处理解决方案是ELK 堆栈:Elasticsearch、Logstash 和 Kibana。该管道可以分为五个模块:
- 日志收集:Filebeat收集本地日志文件并将其写入Kafka消息队列。
- 日志传输:Kafka消息队列收集并缓存日志。
- 日志转换:Logstash对Kafka中的日志数据进行过滤和转换。
- 日志存储:Logstash将JSON格式的日志写入Elasticsearch进行存储。
- 日志查询:用户通过Kibana可视化搜索日志或通过Elasticsearch DSL API发送查询请求。
ELK堆栈具有出色的实时处理能力,但仍存在一些不足之处。
无模式支持不充分
在Elasticsearch中,索引映射(Index Mapping)定义了表的结构,包括字段名称、数据类型以及是否启用索引创建等信息。
Elasticsearch确实拥有动态映射(Dynamic Mapping)机制,它可以根据输入的JSON数据自动添加字段到映射中。这提供了某种程度上的无模式支持,但并不足够,原因如下:
- 动态映射在处理脏数据时经常会创建过多的字段,这会导致整个系统受到干扰。
- 字段的数据类型是不可变的。为了确保兼容性,用户通常将字段配置为"text"作为数据类型,但这会导致比如整数等二进制数据类型查询性能明显较慢。
- 字段的索引也是不可变的。用户无法为特定字段添加或删除索引,因此他们通常会为所有字段创建索引,以便在查询中进行数据过滤。然而,过多的索引会占用额外的存储空间,并减慢数据写入速度。
不足的分析能力
Elasticsearch拥有独特的领域特定语言(DSL),与大多数数据工程师和分析师熟悉的技术堆栈非常不同,因此学习曲线较陡。此外,Elasticsearch生态系统相对封闭,因此在与商业智能工具集成方面可能会遇到较大的阻力。最重要的是,Elasticsearch仅支持单表分析,无法满足现代OLAP对多表联接、子查询和视图等需求的要求。
高成本和低稳定性
Elasticsearch的用户一直在抱怨计算和存储成本。这个问题的根本原因在于Elasticsearch的工作方式。
- 计算成本:在数据写入过程中,Elasticsearch执行包括倒排索引创建、标记化和倒排索引排序在内的计算密集型操作。在这种情况下,每个核心的数据写入速度约为2MB/s。当CPU资源紧张时,在高峰期往往无法满足数据写入需求,进一步导致较高的延迟。
- 存储成本:为了加快检索速度,Elasticsearch存储了原始数据的正排索引、倒排索引和文档值,消耗了更多的存储空间。相对于大多数日志解决方案的5:1的压缩比,单个数据副本的压缩比仅为1.5:1。
随着数据和集群规模的增长,保持稳定性可能是另一个问题:
- 在数据写入高峰期:集群容易在数据写入高峰期出现负载过重的情况。
- 在查询过程中:由于所有查询都在内存中处理,大型查询很容易导致JVM OOM(内存溢出)。
- 恢复速度慢:若发生集群故障,Elasticsearch需要重新加载索引,这是一个消耗资源的过程,因此恢复时间可能需要数分钟。这对于服务的可用性保证提出了挑战。
更具成本效益的选择
针对基于Elasticsearch的解决方案的优势和限制,Apache Doris开发者针对日志处理进行了优化。
- 增加写入吞吐量:Elasticsearch的性能受到数据解析和倒排索引创建的瓶颈影响,因此Apache Doris改进了在这些方面的性能:通过SIMD指令和CPU向量指令加速数据解析和索引创建;然后移除了在日志分析场景中不必要的数据结构,如正排索引,以简化索引创建过程。
- 减少存储成本:Apache Doris移除了占据索引数据30%的正排索引。采用了列式存储和ZSTD压缩算法,从而实现了5:1到10:1的压缩比。鉴于大部分历史日志很少被访问,引入了分层存储来区分热数据和冷数据。超过指定时间段的日志将被移动到较便宜的对象存储中。这可以将存储成本降低约70%。
与Elasticsearch的官方测试工具ES Rally进行的基准测试显示,相比Elasticsearch,Apache Doris在数据写入方面快了约5倍,在查询方面快了2.3倍,并且仅消耗了Elasticsearch使用存储空间的1/5。在HTTP日志的测试数据集上,它实现了550 MB/s的写入速度和10:1的压缩比。
下图展示了一个典型的基于Apache Doris的日志处理系统的样貌。它更加全面,并且允许在数据摄取、分析和应用等方面具有更灵活的使用方式:
- 数据摄取:Apache Doris支持多种日志数据的摄取方法。你可以通过Logstash将日志推送到Doris,也可以使用Flink在将日志写入Doris之前进行预处理,还可以通过Routine Load和S3 Load从Flink或对象存储加载日志到Doris中。
- 数据分析:你可以将日志数据放入Doris,并对数据仓库中的日志和其他数据进行联接查询。
- 应用场景:Apache Doris兼容MySQL协议,因此你可以将各种数据分析工具和客户端(如Grafana和Tableau)集成到Doris中。你还可以通过JDBC和ODBC API连接应用程序到Doris。我们计划构建一个类似于Kibana的系统来可视化日志。
此外,Apache Doris具有更好的无模式支持和更用户友好的分析引擎。
对半结构化数据的本地支持
首先,Apache Doris对数据类型进行了优化。通过矢量化优化,Apache Doris改进了"text"的字符串搜索和正则表达式匹配,提高了2~10倍的性能。对于JSON字符串,Apache Doris将其解析并存储为更紧凑和高效的二进制格式,可以加快查询速度4倍。我们还添加了一个适用于复杂数据的新数据类型:Array Map。它可以将拼接的字符串结构化,以实现更高的压缩率和更快的查询速度。
其次,Apache Doris支持模式演化。这意味着你可以根据业务变化来调整模式。可以添加或删除字段和索引,并更改字段的数据类型。
Apache Doris提供轻量级模式变更功能,因此可以在毫秒级内添加或删除字段:
-- 添加一个列,将在毫秒级返回结果。
ALTER TABLE lineitem ADD COLUMN l_new_column INT;
还可以仅为目标字段添加索引,这样可以避免不必要的索引创建带来的开销。在添加索引之后,默认情况下,系统将为所有增量数据生成索引,并且可以指定哪些历史数据分区需要索引。
-- 添加倒排索引。在此之后,Doris将为所有新数据生成倒排索引。
ALTER TABLE table_name ADD INDEX index_name(column_name) USING INVERTED;
-- 构建特定历史数据分区的索引。
BUILD INDEX index_name ON table_name PARTITIONS(partition_name1, partition_name2);
基于SQL的分析引擎
基于SQL的分析引擎确保数据工程师和分析师可以在短时间内轻松掌握Apache Doris,并将他们对SQL的经验应用于这个OLAP引擎。借助SQL丰富的功能,用户可以执行数据检索、聚合、多表联接、子查询、UDF、逻辑视图和物化视图等操作,以满足自己的需求。
由于与MySQL兼容,Apache Doris可以与大数据生态系统中的大多数GUI和BI工具集成,使用户能够实现更复杂和多样化的数据分析。
性能表现
一家游戏公司已从ELK堆栈过渡到Apache Doris解决方案。他们基于Doris的日志系统所需的存储空间仅为之前的1/6。
某网络安全公司利用Apache Doris中的倒排索引构建了其日志分析系统,他们支持每秒30万行的数据写入速度,而只使用之前服务器资源的1/5。
实践指南
现在让我们按照三个步骤来构建一个基于Apache Doris的日志分析系统。
在开始之前,从网站下载Apache Doris 2.0或更新版本,并部署集群。
第一步:创建表
以下是一个表创建的示例。
配置解释:
- 将DATETIMEV2时间字段指定为Key,以加快查询最新N条日志记录的速度。
- 为经常访问的字段创建索引,并使用解析器参数指定需要进行全文搜索的字段。
- "PARTITION BY RANGE"表示按照时间字段的范围对数据进行分区,启用Dynamic Partition实现自动管理。
- "DISTRIBUTED BY RANDOM BUCKETS AUTO"表示将数据随机分布到桶中,并根据集群规模和数据量自动确定桶的数量。
- "log_policy_1day"和"log_s3"表示将超过1天的日志移动到S3存储中。
CREATE DATABASE log_db;
USE log_db;
CREATE RESOURCE "log_s3"
PROPERTIES
(
"type" = "s3",
"s3.endpoint" = "your_endpoint_url",
"s3.region" = "your_region",
"s3.bucket" = "your_bucket",
"s3.root.path" = "your_path",
"s3.access_key" = "your_ak",
"s3.secret_key" = "your_sk"
);
CREATE STORAGE POLICY log_policy_1day
PROPERTIES(
"storage_resource" = "log_s3",
"cooldown_ttl" = "86400"
);
CREATE TABLE log_table
(
`ts` DATETIMEV2,
`clientip` VARCHAR(20),
`request` TEXT,
`status` INT,
`size` INT,
INDEX idx_size (`size`) USING INVERTED,
INDEX idx_status (`status`) USING INVERTED,
INDEX idx_clientip (`clientip`) USING INVERTED,
INDEX idx_request (`request`) USING INVERTED PROPERTIES("parser" = "english")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES (
"replication_num" = "1",
"storage_policy" = "log_policy_1day",
"deprecated_dynamic_schema" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "AUTO",
"dynamic_partition.replication_num" = "1"
);
第二步:采集日志
Apache Doris支持多种数据采集方法。对于实时日志,推荐以下三种方法:
- 从Kafka消息队列中拉取日志:Routine Load
- 使用Logstash:通过HTTP API将日志写入Doris
- 自定义编写程序:通过HTTP API将日志写入Doris
从Kafka采集
对于写入Kafka消息队列的JSON日志,创建Routine Load以便Doris从Kafka中拉取数据。下面是一个示例。property.*的配置是可选的:
-- 准备Kafka集群和主题("log_topic")
-- 创建Routine Load,从Kafka的"log_topic"加载数据到"log_table"
CREATE ROUTINE LOAD load_log_kafka ON log_db.log_table
COLUMNS(ts, clientip, request, status, size)
PROPERTIES (
"max_batch_interval" = "10",
"max_batch_rows" = "1000000",
"max_batch_size" = "109715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA (
"kafka_broker_list" = "host:port",
"kafka_topic" = "log_topic",
"property.group.id" = "your_group_id",
"property.security.protocol"="SASL_PLAINTEXT",
"property.sasl.mechanism"="GSSAPI",
"property.sasl.kerberos.service.name"="kafka",
"property.sasl.kerberos.keytab"="/path/to/xxx.keytab",
"property.sasl.kerberos.principal"="xxx@yyy.com"
);
可以使用SHOW ROUTINE LOAD命令来查看Routine Load的运行情况。
通过Logstash进行采集
配置Logstash的HTTP Output,然后数据将通过HTTP Stream Load发送到Doris。
1. 在logstash.yml中指定批量大小(batch size)和批量延迟(batch delay),以提高数据写入性能。
pipeline.batch.size: 100000
pipeline.batch.delay: 10000
2.在日志收集配置文件testlog.conf中添加HTTP Output,URL为Doris中的Stream Load地址。
- 由于Logstash不支持HTTP重定向,应该使用后端地址而不是前端地址。
- 头部的Authorization是基于http基本身份验证。可以使用echo -n 'username:password' | base64计算。
- 头部的load_to_single_tablet参数可以减少数据摄取过程中的小文件数量。
output {
http {
follow_redirects => true
keepalive => false
http_method => "put"
url => "http://172.21.0.5:8640/api/logdb/logtable/_stream_load"
headers => [
"format", "json",
"strip_outer_array", "true",
"load_to_single_tablet", "true",
"Authorization", "Basic cm9vdDo=",
"Expect", "100-continue"
]
format => "json_batch"
}
}
通过自定义程序进行采集
以下是通过HTTP Stream Load将数据采集到Doris的示例。
注意事项:
- 使用基本身份验证进行HTTP授权,可以使用echo -n 'username:password' | base64进行计算。
- http头部"format:json":指定数据类型为JSON。
- http头部"read_json_by_line:true":每行表示一个JSON记录。
- http头部"load_to_single_tablet:true":每次写入一个Tablet。
- 对于数据写入客户端,建议使用100MB~1GB的批量大小。未来版本将在服务器端启用Group Commit并减少客户端的批量大小。
curl \
--location-trusted \
-u username:password \
-H "format:json" \
-H "read_json_by_line:true" \
-H "load_to_single_tablet:true" \
-T logfile.json \
http://fe_host:fe_http_port/api/log_db/log_table/_stream_load
第三步:执行查询
Apache Doris支持标准SQL,因此可以通过MySQL客户端或JDBC连接到Doris,然后执行SQL查询。
mysql -h fe_host -P fe_mysql_port -u root -Dlog_db
日志分析中的几个常见查询:
- 查看最近10条记录。
SELECT * FROM log_table ORDER BY ts DESC LIMIT 10;
- 查看客户端IP“8.8.8.8”的最新10条记录。
SELECT * FROM log_table WHERE clientip = '8.8.8.8' ORDER BY ts DESC LIMIT 10;
- 检索“请求”字段中包含“错误”或“404”的最新 10 条记录。MATCH_ANY是Doris中用于全文搜索的SQL语法关键字。意思是查找包含任意一个指定关键字的记录。
SELECT * FROM log_table WHERE request MATCH_ANY 'error 404' ORDER BY ts DESC LIMIT 10;
- 检索“请求”字段中包含“图像”和“常见问题解答”的最新 10 条记录。MATCH_ALL也是Doris中用于全文搜索的SQL语法关键字。意思是查找包含所有指定关键字的记录。
SELECT * FROM log_table WHERE request MATCH_ALL 'image faq' ORDER BY ts DESC LIMIT 10;
结论
如果你正在寻找高效的日志分析解决方案,Apache Doris 对任何具备 SQL 知识的人都很友好;如果你发现 ELK 堆栈存在问题,请尝试 Apache Doris 提供更好的无模式支持,实现更快的数据写入和查询,并带来更少的存储负担。