Elasticsearch 8.X 小技巧:使用存儲腳本優化數據索引與轉換過程
1、引言
在 Elasticsearch 中,可以使用 Painless 腳本來實現一些非標準的處理結果。這些腳本可以直接嵌入到數據處理管道中,但為了使腳本與管道相互獨立,還可以將腳本單獨存儲在 Elasticsearch 中,并在數據攝取管道(Ingest pipeline)中按需調用它們。
這種存儲腳本的方式,咱們之前也有過介紹,Elasticsearch 中有個專有術語名詞與之對應,叫:stored script 存儲腳本。通過 stored script 方式,可以在不同的地方重復使用同一段腳本,而無需復制代碼。
在Elasticsearch中使用 stored script 存儲腳本是一種高效且靈活的方法,特別適用于那些需要在多個數據處理場景中重復使用相同邏輯的場合。通過這種方式,可以構建更加模塊化、易于管理的數據處理管道。
2、Base64 解碼的存儲腳本實現
如下腳本的目的是將源數據中的字段從Base64格式轉換為解碼后的文本。
2.1 創建 Base64 解碼腳本
PUT /_scripts/decodebase64
{
"script": {
"description": "Decode base64",
"lang": "painless",
"source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();"
}
}
腳本解讀如下:
- PUT /_scripts/decodebase64: 這部分指示Elasticsearch創建或更新一個名為decodebase64的腳本。
- "script": 腳本的主體部分。
- "description": 腳本的描述,說明了腳本的作用,即解碼Base64。
- "lang": 腳本的編寫語言,這里使用的是Elasticsearch的Painless腳本語言。
- "source": 腳本的具體內容。這個腳本接受一個字段名作為輸入(params['field']),檢查是否為空,如果不為空,則將其Base64解碼并存儲在指定的目標字段(params['target_field'])。
這個腳本可以在Elasticsearch的攝取管道中使用,用于在數據索引之前動態地對字段進行Base64解碼。
2.2 獲取存儲腳本
如下腳本僅驗證,實戰中可忽略。
GET /_scripts/decodebase64
召回結果如下:
{
"_id": "decodebase64",
"found": true,
"script": {
"lang": "painless",
"source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();"
}
}
注意:之前咱們很少這么用。看細節,上面的召回結果有 "_id": "decodebase64", 你關注一下,一會就能用到!
2.3 創建使用Base64 解碼存儲腳本的管道
PUT /_ingest/pipeline/decodebase64
{
"description": "Decode hash values",
"processors": [
{
"script": {
"id": "decodebase64",
"params": {
"field": "name_base64",
"target_field": "name"
}
}
}
]
}
上述代碼創建了一個名為 decodebase64 的 Elasticsearch 攝取管道,其功能是使用存儲的腳本 decodebase64 將字段 name_base64 中的 Base64 編碼值解碼,并將解碼后的文本存儲到 name 字段中。
和咱們之前講的不同的地方、靈活的地方在于:field 和 target_field 變成了變量了,可以靈活按照項目需求替換之。
2.4 批量寫入數據的時候同時指定 pipeline
POST /fruits/_bulk?pipeline=decodebase64
{"index":{"_id":"1"}}
{"name_base64":"QXBwbGU="}
{"index":{"_id":"2"}}
{"name_base64":"QW5hbmFz"}
{"index":{"_id":"3"}}
{"name_base64":"Q2hlcnJ5"}
如上 bulk 批量寫入的時候指定 pipeline 的方式,咱們之前也少有講解。
GET fruits/_search
結果如下圖所示:
圖片
我們清晰的看到,咱們寫入的 name_base64 字段借助我們創建的管道、基于存儲腳本解碼為 name字段值。
不著急下結論,咱們再看一組例子。
3、16進制解碼的存儲腳本實現
步驟參見第2部分,咱們只講重點。
3.1 創建16進制解碼存儲腳本
如下存儲腳本的目的:在Elasticsearch中創建并存儲一個名為decodehex的腳本,該腳本用于將HEX(十六進制)編碼的字符串轉換為普通文本。
PUT /_scripts/decodehex
{
"script": {
"description": "Decode HEX",
"lang": "painless",
"source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; StringBuilder sb = new StringBuilder(); for (int i = 0; i < src.length(); i += 2) { String byteStr = src.substring(i, i + 2); char byteChar = (char) Integer.parseInt(byteStr, 16); sb.append(byteChar) } ctx[target] = sb.toString();"
}
}
腳本解讀如下:
- PUT /_scripts/decodehex: 這部分指示Elasticsearch創建或更新一個名為decodehex的腳本。
- script: 腳本的主體部分。
- description: 腳本的描述,說明了腳本的作用,即解碼HEX字符串。
- lang: 腳本的編寫語言,這里使用的是Elasticsearch的Painless腳本語言。
- source: 腳本的具體內容。這個腳本接受一個字段名作為輸入(params['field']),檢查是否為空,如果不為空,則將其HEX編碼的內容轉換為普通文本并存儲在指定的目標字段(params['target_field'])。
如上腳本可以在Elasticsearch的攝取管道中使用,用于在數據索引之前動態地對字段進行 HEX 解碼。
3.2 獲取16進制解碼存儲腳本
如下腳本僅驗證,實戰中可忽略。
GET /_scripts/decodehex
召回結果如下:
圖片
3.3 創建使用16進制解碼腳本的管道
PUT /_ingest/pipeline/decodehex
{
"description": "Decode hash values",
"processors": [
{
"script": {
"id": "decodehex",
"params": {
"field": "color_hex",
"target_field": "color"
}
}
}
]
}
該管道的功能是使用存儲的腳本 decodehex 來處理數據:它會取 color_hex 字段中的HEX(十六進制)編碼字符串,將其解碼成普通文本,并將解碼后的結果存儲到 color 字段中。這個過程主要用于在將數據索引到 Elasticsearch 之前自動進行數據轉換和預處理。
同樣,靈活的地方在于:field、target_field 是變量。
3.4 批量寫入數據的時候同時指定 pipeline
POST /fruits_ext/_bulk?pipeline=decodehex
{"index":{"_id":"1"}}
{"color_hex":"477265656e"}
{"index":{"_id":"2"}}
{"color_hex":"59656c6c6f77"}
{"index":{"_id":"3"}}
{"color_hex":"526564"}
如上 bulk 批量寫入的時候指定 pipeline 的方式,咱們之前也少有講解。
GET fruits_ext/_search
結果如下圖所示:
圖片
當然,第2部分、第3部分的存儲腳本使用可以靈活的整合為一部分,如下所示。
PUT /_ingest/pipeline/decodehashes
{
"description": "Decode hash values",
"processors": [
{
"script": {
"id": "decodebase64",
"params": {
"field": "name_base64",
"target_field": "name"
}
}
},
{
"script": {
"id": "decodehex",
"params": {
"field": "color_hex",
"target_field": "color"
}
}
}
]
}
批量構建數據結果:
POST /fruits_all/_bulk?pipeline=decodehashes
{"index":{"_id":"1"}}
{"name_base64":"QXBwbGU=","color_hex":"477265656e"}
{"index":{"_id":"2"}}
{"name_base64":"QW5hbmFz","color_hex":"59656c6c6f77"}
{"index":{"_id":"3"}}
{"name_base64":"Q2hlcnJ5","color_hex":"526564"}
執行檢索效果:
圖片
4、小結
我們一起探索了如何在Elasticsearch中創建并存儲腳本,以及如何檢索這些腳本,以確認它們的 id 和內容。我們還學習了如何在數據處理的攝取管道中調用這些存儲的腳本。
通過這種方法,你可以有效地節省存儲空間,并減少因重復編寫相同腳本而可能出現的錯誤。簡而言之,你只需編寫和存儲一次腳本,就可以在多個地方反復使用,這無疑提高了工作效率,同時也使得數據處理過程更加流暢和可靠。