elasticsearch 数据迁移

默认分类·工作 · 2024-01-15

获取所有索引

# List index names only (h=index), sorted ascending (s=index:asc).
# The "v" flag is omitted so no header row is printed, and no --data is
# passed so that curl issues a GET (with --data curl would send a POST).
curl --location 'http://xx.xx.xx.xx:9200/_cat/indices?h=index&s=index%3Aasc' \
--header 'Authorization: xxx'

将返回的所有索引名写入 index.txt 中（每行一个索引名，不要包含表头或空行）

创建账号密码凭证文件 auth.ini（供 elasticdump 的 --httpAuthFile 参数使用）

user=xxx
password="xxx"

修改代码 es.py（将 from_host 和 to_host 分别改为源集群和目标集群的地址）

# -*- coding=utf-8 -*-
import os
import multiprocessing

# Base URLs of the two clusters; replace the placeholders before running.
# from_host is used as the elasticdump --input side, to_host as --output.
from_host = "xxx"
to_host = "xxx"

# Index names to migrate; filled in by pre_handle() from index.txt.
index_list = []

# Comma-separated sample of index names (not referenced by the code below).
index_names = "index_1,index_2"


def migrate_one_index_child(index_name):
    """Copy one index from ``from_host`` to ``to_host`` using elasticdump.

    elasticdump is run three times, in this order: ``settings``, ``mapping``,
    ``data``. As before, every step is attempted even if an earlier one
    fails, but a non-zero exit status is now reported instead of being
    silently discarded.

    Args:
        index_name: Name of the index to migrate; it is appended to both
            host URLs. Empty or whitespace-only names are skipped.
    """
    # Imported locally because the file-level imports do not include it.
    import subprocess

    if not index_name or not index_name.strip():
        # An empty name would make the URLs below end in "<host>/", i.e.
        # point elasticdump at the host root instead of a single index.
        print(str.format("### skip empty index name, pid: {}", os.getpid()))
        return

    print(str.format("### begin to migrate index, pid: {}, index: {}", os.getpid(), index_name))
    source_url = str.format('{0}/{1}', from_host, index_name)
    target_url = str.format('{0}/{1}', to_host, index_name)
    for dump_type in ('settings', 'mapping', 'data'):
        # Pass an argument list (no shell) so that characters in the index
        # name are never interpreted by a shell.
        cmd = [
            'elasticdump',
            '--limit', '10000',
            '--input=' + source_url,
            '--output=' + target_url,
            '--httpAuthFile=auth.ini',
            '--type=' + dump_type,
        ]
        try:
            exit_code = subprocess.call(cmd)
        except OSError as err:
            # Raised when the elasticdump executable cannot be started;
            # the remaining steps would fail the same way.
            print(str.format("### cannot run elasticdump, pid: {}, index: {}, error: {}",
                             os.getpid(), index_name, err))
            break
        if exit_code != 0:
            print(str.format("### elasticdump failed, pid: {}, index: {}, type: {}, exit code: {}",
                             os.getpid(), index_name, dump_type, exit_code))
    print(str.format("### end to migrate index, pid: {}, index: {}", os.getpid(), index_name))


def pre_handle():
    """Load index names from ``index.txt`` into the module-level ``index_list``.

    Each non-blank line of the file is treated as one index name. Surrounding
    whitespace (including a trailing ``\\r`` from Windows line endings) is
    removed, and blank lines are skipped so that no empty index name is
    queued for migration. Every accepted name is printed.
    """
    with open('index.txt', 'r') as f:
        for raw_line in f:
            name = raw_line.strip()
            if not name:
                # A blank line would otherwise become an empty index name.
                continue
            index_list.append(name)
            print(name)


if __name__ == "__main__":
    # Read the index names, then migrate them with up to 5 worker processes.
    pre_handle()
    pool = multiprocessing.Pool(processes=5)
    # Keep every AsyncResult: apply_async stores an exception raised in the
    # worker inside the result object and never reports it on its own.
    pending = [
        (index, pool.apply_async(migrate_one_index_child, (index,)))
        for index in index_list
    ]
    pool.close()
    pool.join()

    failed = []
    for index, result in pending:
        try:
            # All tasks are finished after join(), so get() returns at once
            # or re-raises the worker's exception.
            result.get()
        except Exception as err:
            failed.append(index)
            print(str.format("### migrate index raised, index: {}, error: {!r}", index, err))
    if failed:
        # Exit non-zero so callers can tell that some migrations crashed.
        raise SystemExit(1)
elasticsearch 数据迁移 脚本
Theme Jasmine by Kent Liao