一句话
在使用 Django 开发 web 应用,Haystack 作为搜索接口,Elasticsearch 作为搜索引擎的背景下,因为 haystack 提供的重建索引的命令会导致长时间的数据丢失,所以本文中介绍了我们如何使用 Elasticsearch 提供的 index alias 和 Haystack 多个 connection 的方式,结合蓝绿部署的概念,实现了零宕机的快速重新索引。
背景
承接上篇,在完成可配置化的 Haystack Elasticsearch Backend 后,这篇文章,我们就来介绍如何真正的实现零宕机的重新索引数据。
使用更新后的 settings 创建一个新的 index,然后从旧的 index 中拷贝所有的数据到新的 index
由上篇文章,我们可以知道,haystack 提供的重新索引的方式(重建索引),对于一个线上的应用来说是不可接受的,那我们该如何实现零宕机的重新索引呢?
这里我们借鉴了蓝绿部署的思想,结合 Elasticsearch Index Alias 和 Haystack Multiple Connection 的功能,满足了我们的需求,实现了零宕机的,随时随地的,重新索引我们的数据。
重新索引的原因是在我们修改了索引的设置后,数据也不一定会被正确索引,设置也不一定会即时生效,根据 Index 设置的类型,有如下几种情况:
- 静态的设置 (static settings) 只有在 index 创建时或关闭之后修改才生效,而动态的设置 (dynamic settings) 可通过 update-index-settings 的 API 随时修改,即时生效。
- 我们在新建 analyzer 或是修改了字段设置后,需要重新索引( reindex ) 才能保证数据被正确的索引。
Tips: 我们可以在索引中添加新类型,或是添加新的字段到该类型中,而不需要重新索引数据。
Multiple Haystack Connections
首先,在 Django Settings 中,添加 Haystack 的 Connections
# sepcify the haystack connections alias which are real indices.
ELASTICSEARCH_REAL_INDICES = ('blue', 'green')
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'myapp.backends.ConfigurableElasticSearchEngine',
'URL': env_var('HAYSTACK_URL', 'http://127.0.0.1:9200/'),
'INDEX_NAME': 'haystack',
},
'blue': {
'ENGINE': 'myapp.backends.ConfigurableElasticSearchEngine',
'URL': env_var('HAYSTACK_URL', 'http://127.0.0.1:9200/'),
'INDEX_NAME': 'haystack_blue',
},
'green': {
'ENGINE': 'myapp.backends.ConfigurableElasticSearchEngine',
'URL': env_var('HAYSTACK_URL', 'http://127.0.0.1:9200/'),
'INDEX_NAME': 'haystack_green',
}
}
我们将 blue / green 中的一个 index 的别名设置为 default 的 index 名称,这样对于我们的应用来说,索引的名称是始终固定的 haystack,只是我们在更新或添加新的设置时,交替的将 blue 和 green 的别名指向到 default 上,类似于蓝绿部署一样的功能,实现了灵活的零宕机的重新索引。
Django management command for reindex data
我们使用 Django 的管理命令具体实现了重新索引 (reindex) 这个行为,并将其添加到了我们的 CI 的 pipeline 中,作为一个按需的任务,交由开发人员使用。
我们重新索引的逻辑是这样的:
- 判断用户是否指定
--rollback
参数,决定是否要将当前默认 connection 的 index 别名指定到上一版本的真实 index 名称(在我们重新索引之后,如果出现意外情况,我们可以用其实现回滚) - 获取当前的和新的 connections 的名称,得知哪个应该是 blue,哪个应该是 green
- 在非回滚的情况下,删除新的 connection 的 index,之后使用从代码中获取最新的 index settings 和 field mappings
- 判断用户是否指定
--copy
参数,决定是 reindex 还是新建索引并从数据库中加载数据(在我们没有修改mapping 时可以指定 copy 参数实现快速索引)。 - 判断默认的 connection 中 index 是否为别名 (是否为第一次运行 reindex),来决定是删除默认连接的 index 并为其指定 index 别名,还是重新指定新的 index 别名。
from django.conf import settings
from django.core.management import call_command
from django.core.management.base import BaseCommand
from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex
class Toggle:
def __init__(self, first, second):
self.values = [first, second]
self.index = 2
def next(self):
self.index = 3 - self.index
return self.values[self.index - 1]
class Command(BaseCommand):
DEFAULT_ES_CONNECTION = settings.HAYSTACK_CONNECTIONS['default']
def __init__(self):
super(Command, self).__init__()
self.real_indices = settings.ELASTICSEARCH_REAL_INDICES
self.index_alias = self.DEFAULT_ES_CONNECTION['INDEX_NAME']
self.es_client = Elasticsearch(hosts=self.DEFAULT_ES_CONNECTION['URL'])
def get_indices(self):
connection = Toggle(*settings.ELASTICSEARCH_REAL_INDICES)
next_connection = connection.next()
if not self.es_client.indices.exists_alias(name=self.index_alias):
return self.index_alias, next_connection
current_real_indices = self.es_client.indices.get_alias(name=self.index_alias).keys()
current_real_index = list(current_real_indices)[0]
if next_connection in current_real_index:
next_connection = connection.next()
return current_real_index, next_connection
def add_arguments(self, parser):
parser.add_argument(
'-c', '--copy', action='store_true', default=False,
help='Copy the index data from old one instead update from database '
'NOTE: only using it when no mappings type changed.'
)
parser.add_argument(
'-r', '--rollback', action='store_true', default=False,
help='Rollback to the old index.'
)
def handle(self, *args, **options):
rollback = options.get('rollback', False)
copy = options.get('copy', False)
current_index, new_version = self.get_indices()
new_index = settings.HAYSTACK_CONNECTIONS[new_version]['INDEX_NAME']
if rollback:
self.stdout.write(
f"Rolling back to the index {new_index} from {current_index}.")
else:
self.stdout.write(
f"Reindexing the index from {current_index} to {new_index}.")
self.stdout.write(
f"Creating the new index {new_index} with specified settings and mappings.")
self.es_client.indices.delete(index=new_index, ignore=[404, 400])
if copy:
self.stdout.write(
f"Copying the index from {current_index} to the {new_index}")
reindex(self.es_client, current_index, new_index)
else:
call_command('update_index', using=[new_version], **options)
self.stdout.write(
f"Pointing the index {new_index} to the alias {self.index_alias}.")
if self.es_client.indices.exists_alias(name=self.index_alias):
update_aliases = {
"actions": [
{"remove": {"index": current_index, "alias": self.index_alias}},
{"add": {"index": new_index, "alias": self.index_alias}}
]
}
self.es_client.indices.update_aliases(body=update_aliases)
else:
self.es_client.indices.delete(self.index_alias, ignore=[404, 400])
self.es_client.indices.put_alias(index=new_index, name=self.index_alias)
self.stdout.write(u"Successfully reindex.")
NOTES
本文没有完全采用参考文章中的方法,原因如下(原文有坑):
- 原文中, reindex 时,使用的是从 Elasticsearch 中获取的 Mapping(应该从代码中生成),这导致代码中修改后的 data field 的设置在修改后不会在新的 index 中被使用。
- 原文中,扩展 Search Backend 时,将
setup_complete
属性,设置为了True
,这需要重新覆盖 haystack 所有的 setup 行为 (包括有应用初始化/搜索/更新/删除/清除/重构),都需要人工地使用管理命令操作。
改进
- 为 Reindex 的蓝绿更新画一个动态示意图
- 为 Reindex 的代码逻辑画一个流程图
- 可进一步实现为流水线上一个的通过判断 index settings 的变化,而自动触发的任务,而不需要人为干预,但会将其重新索引的行为发出通知的这样一个流程,实现全自动化的 reindex
参考
- https://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
- https://www.elastic.co/guide/en/elasticsearch/guide/current/index-aliases.html
- https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html
- http://cstrap.blogspot.jp/2015/06/dealing-with-elasticsearch-reindex-and.html