Django 中零宕机重新索引 Elasticsearch 数据

How to Reindex the Elasticsearch Index data with Zero Downtime

March 31, 2018 - 5 minute read -
elasticsearch haystack django reindex zero downtime

一句话

在使用 Django 开发 web 应用,Haystack 作为搜索接口,Elasticsearch 作为搜索引擎的背景下,因为 haystack 提供的重建索引的命令会导致长时间的数据丢失,所以本文中介绍了我们如何使用 Elasticsearch 提供的 index alias 和 Haystack 多个 connection 的方式,结合蓝绿部署的概念,实现了零宕机的快速重新索引。

背景

承接上篇,在完成可配置化的 Haystack Elasticsearch Backend 后,这篇文章,我们就来介绍如何真正的实现零宕机的重新索引数据。

官方对重新索引的说明

使用更新后的 settings 创建一个新的 index,然后从旧的 index 中拷贝所有的数据到新的 index

由上篇文章,我们可以知道,haystack 提供的重新索引的方式(重建索引),对于一个线上的应用来说是不可接受的,那我们该如何实现零宕机的重新索引呢?

这里我们借鉴了蓝绿部署的思想,结合 Elasticsearch Index Alias 和 Haystack Multiple Connection 的功能,满足了我们的需求,实现了零宕机的,随时随地的,重新索引我们的数据。

重新索引的原因是在我们修改了索引的设置后,数据也不一定会被正确索引,设置也不一定会即时生效,根据 Index 设置的类型,有如下几种情况

  • 静态的设置 (static settings) 只有在 index 创建时或关闭之后修改才生效,而动态的设置 (dynamic settings) 可通过 update-index-settings 的 API 随时修改,即时生效。
  • 我们在新建 analyzer 或是修改了字段设置后,需要重新索引( reindex ) 才能保证数据被正确的索引。

Tips: 我们可以在索引中添加新类型,或是添加新的字段到该类型中,而不需要重新索引数据。

Multiple Haystack Connections

首先,在 Django Settings 中,添加 Haystack 的 Connections

# sepcify the haystack connections alias which are real indices.
ELASTICSEARCH_REAL_INDICES = ('blue', 'green')

HAYSTACK_CONNECTIONS = {
    'default': {
        'ENGINE': 'myapp.backends.ConfigurableElasticSearchEngine',
        'URL': env_var('HAYSTACK_URL', 'http://127.0.0.1:9200/'),
        'INDEX_NAME': 'haystack',
    },
    'blue': {
        'ENGINE': 'myapp.backends.ConfigurableElasticSearchEngine',
        'URL': env_var('HAYSTACK_URL', 'http://127.0.0.1:9200/'),
        'INDEX_NAME': 'haystack_blue',
    },
    'green': {
        'ENGINE': 'myapp.backends.ConfigurableElasticSearchEngine',
        'URL': env_var('HAYSTACK_URL', 'http://127.0.0.1:9200/'),
        'INDEX_NAME': 'haystack_green',
    }
}

我们将 blue / green 中的一个 index 的别名设置为 default 的 index 名称,这样对于我们的应用来说,索引的名称是始终固定的 haystack,只是我们在更新或添加新的设置时,交替的将 blue 和 green 的别名指向到 default 上,类似于蓝绿部署一样的功能,实现了灵活的零宕机的重新索引。

Django management command for reindex data

我们使用 Django 的管理命令具体实现了重新索引 (reindex) 这个行为,并将其添加到了我们的 CI 的 pipeline 中,作为一个按需的任务,交由开发人员使用。

我们重新索引的逻辑是这样的:

  1. 判断用户是否指定 --rollback 参数,决定是否要将当前默认 connection 的 index 别名指定到上一版本的真实 index 名称(在我们重新索引之后,如果出现意外情况,我们可以用其实现回滚)
  2. 获取当前的和新的 connections 的名称,得知哪个应该是 blue,哪个应该是 green
  3. 在非回滚的情况下,删除新的 connection 的 index,之后使用从代码中获取最新的 index settings 和 field mappings
  4. 判断用户是否指定 --copy 参数,决定是 reindex 还是新建索引并从数据库中加载数据(在我们没有修改mapping 时可以指定 copy 参数实现快速索引)。
  5. 判断默认的 connection 中 index 是否为别名 (是否为第一次运行 reindex),来决定是删除默认连接的 index 并为其指定 index 别名,还是重新指定新的 index 别名。
from django.conf import settings
from django.core.management import call_command
from django.core.management.base import BaseCommand

from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex


class Toggle:
    def __init__(self, first, second):
        self.values = [first, second]
        self.index = 2

    def next(self):
        self.index = 3 - self.index
        return self.values[self.index - 1]


class Command(BaseCommand):
    DEFAULT_ES_CONNECTION = settings.HAYSTACK_CONNECTIONS['default']

    def __init__(self):
        super(Command, self).__init__()
        self.real_indices = settings.ELASTICSEARCH_REAL_INDICES
        self.index_alias = self.DEFAULT_ES_CONNECTION['INDEX_NAME']
        self.es_client = Elasticsearch(hosts=self.DEFAULT_ES_CONNECTION['URL'])

    def get_indices(self):
        connection = Toggle(*settings.ELASTICSEARCH_REAL_INDICES)
        next_connection = connection.next()

        if not self.es_client.indices.exists_alias(name=self.index_alias):
            return self.index_alias, next_connection

        current_real_indices = self.es_client.indices.get_alias(name=self.index_alias).keys()
        current_real_index = list(current_real_indices)[0]

        if next_connection in current_real_index:
            next_connection = connection.next()

        return current_real_index, next_connection

    def add_arguments(self, parser):
        parser.add_argument(
            '-c', '--copy', action='store_true', default=False,
            help='Copy the index data from old one instead update from database '
                 'NOTE: only using it when no mappings type changed.'
        )

        parser.add_argument(
            '-r', '--rollback', action='store_true', default=False,
            help='Rollback to the old index.'
        )

    def handle(self, *args, **options):

        rollback = options.get('rollback', False)
        copy = options.get('copy', False)
        current_index, new_version = self.get_indices()
        new_index = settings.HAYSTACK_CONNECTIONS[new_version]['INDEX_NAME']

        if rollback:
            self.stdout.write(
                f"Rolling back to the index {new_index} from {current_index}.")
        else:
            self.stdout.write(
                f"Reindexing the index from {current_index} to {new_index}.")

            self.stdout.write(
                f"Creating the new index {new_index} with specified settings and mappings.")
            self.es_client.indices.delete(index=new_index, ignore=[404, 400])

            if copy:
                self.stdout.write(
                    f"Copying the index from {current_index} to the {new_index}")
                reindex(self.es_client, current_index, new_index)
            else:
                call_command('update_index', using=[new_version], **options)

        self.stdout.write(
            f"Pointing the index {new_index} to the alias {self.index_alias}.")

        if self.es_client.indices.exists_alias(name=self.index_alias):
            update_aliases = {
                "actions": [
                    {"remove": {"index": current_index, "alias": self.index_alias}},
                    {"add": {"index": new_index, "alias": self.index_alias}}
                ]
            }
            self.es_client.indices.update_aliases(body=update_aliases)
        else:
            self.es_client.indices.delete(self.index_alias, ignore=[404, 400])
            self.es_client.indices.put_alias(index=new_index, name=self.index_alias)

        self.stdout.write(u"Successfully reindex.")

NOTES

本文没有完全采用参考文章中的方法,原因如下(原文有坑):

  1. 原文中, reindex 时,使用的是从 Elasticsearch 中获取的 Mapping(应该从代码中生成),这导致代码中修改后的 data field 的设置在修改后不会在新的 index 中被使用。
  2. 原文中,扩展 Search Backend 时,将 setup_complete 属性,设置为了 True,这需要重新覆盖 haystack 所有的 setup 行为 (包括有应用初始化/搜索/更新/删除/清除/重构),都需要人工地使用管理命令操作。

改进

  1. 为 Reindex 的蓝绿更新画一个动态示意图
  2. 为 Reindex 的代码逻辑画一个流程图
  3. 可进一步实现为流水线上一个的通过判断 index settings 的变化,而自动触发的任务,而不需要人为干预,但会将其重新索引的行为发出通知的这样一个流程,实现全自动化的 reindex

参考