Last active
March 12, 2018 10:38
-
-
Save pulkitsinghal/9806702 to your computer and use it in GitHub Desktop.
Reindex data from one elasticsearch index to another
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.searchbox.Action; | |
import io.searchbox.client.JestClient; | |
import io.searchbox.client.JestClientFactory; | |
import io.searchbox.client.JestResult; | |
import io.searchbox.client.config.ClientConfig; | |
import io.searchbox.core.Bulk; | |
import io.searchbox.core.Bulk.Builder; | |
import io.searchbox.core.Index; | |
import io.searchbox.core.Search; | |
import io.searchbox.core.SearchScroll; | |
import io.searchbox.params.Parameters; | |
import io.searchbox.params.SearchType; | |
import java.util.List; | |
import java.util.Map; | |
import org.elasticsearch.index.query.QueryBuilders; | |
import org.elasticsearch.search.builder.SearchSourceBuilder; | |
import org.junit.Assert; | |
import org.junit.Test; | |
public class ReindexData { | |
public static String FROM_INDEX = System.getProperty("FROM_INDEX"); | |
public static String TO_INDEX = System.getProperty("TO_INDEX"); | |
private static final String INDEX_TYPE = System.getProperty("INDEX_TYPE"); | |
private int PAGE_SIZE = Integer.parseInt(System.getProperty("PAGE_SIZE", "1000")); | |
private String ELASTICSEARCH_URL = | |
System.getProperty( | |
"ELASTICSEARCH_URL", | |
"https://username:password@my.elasticsearch.com:443" | |
); | |
@Test | |
public void reindex() | |
{ | |
Assert.assertNotNull(ELASTICSEARCH_URL); | |
Assert.assertNotNull(FROM_INDEX); | |
Assert.assertNotNull(TO_INDEX); | |
Assert.assertNotNull(INDEX_TYPE); | |
// Configuration | |
ClientConfig clientConfig = | |
new ClientConfig.Builder(ELASTICSEARCH_URL) | |
.multiThreaded(false) | |
.build(); | |
// Construct a new Jest client according to configuration via factory | |
JestClientFactory factory = new JestClientFactory(); | |
factory.setClientConfig(clientConfig); | |
JestClient client = factory.getObject(); | |
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); | |
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); | |
reindexData(client, searchSourceBuilder, INDEX_TYPE); | |
System.out.println("************************"); | |
} | |
private void reindexData( | |
JestClient client, | |
SearchSourceBuilder searchSourceBuilder, | |
String type) { | |
Search search = new Search.Builder(searchSourceBuilder.toString()) | |
.addIndex(FROM_INDEX) | |
.addType(type) | |
.setParameter(Parameters.SEARCH_TYPE, SearchType.SCAN) | |
.setParameter(Parameters.SIZE, PAGE_SIZE) | |
.setParameter(Parameters.SCROLL, "5m") | |
.build(); | |
System.out.println(search.getData(null)); | |
JestResult result = handleResult(client, search); | |
String scrollId = result.getJsonObject().get("_scroll_id").getAsString(); | |
int currentResultSize = 0; | |
int pageNumber = 1; | |
do { | |
SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m").build(); | |
result = handleResult(client, scroll); | |
scrollId = result.getJsonObject().get("_scroll_id").getAsString(); | |
List hits = ((List) ((Map) result.getJsonMap().get("hits")).get("hits")); | |
currentResultSize = hits.size(); | |
System.out.println("finished scrolling page # " + pageNumber++ + " which had " + currentResultSize + " results."); | |
Builder bulkIndexBuilder = new Bulk.Builder() | |
.defaultIndex(TO_INDEX) | |
.defaultType(type); | |
boolean somethingToIndex = false; | |
for (int i = 0; i < currentResultSize; i++) { | |
Map source = ((Map) ((Map) hits.get(i)).get("_source")); | |
String sourceId = ((String) ((Map) hits.get(i)).get("_id")); | |
System.out.println("adding " + sourceId + " for bulk indexing"); | |
// TODO: we could transform the source if we wanted to here, | |
// before adding it to the bulk index queue | |
Index index = new Index.Builder(source) | |
.index(TO_INDEX) | |
.type(type) | |
.id(sourceId) | |
.build(); | |
bulkIndexBuilder = bulkIndexBuilder.addAction(index); | |
somethingToIndex = true; | |
} | |
if (somethingToIndex) { | |
Bulk bulk = bulkIndexBuilder.build(); | |
//System.out.println(bulk.getData(null)); | |
handleResult(client, bulk); | |
} else { | |
System.out.println("there weren't any results to index in this set/page"); | |
} | |
} while (currentResultSize == PAGE_SIZE); | |
} | |
protected JestResult handleResult (JestClient client, Action action) { | |
JestResult result = null; | |
try { | |
result = client.execute(action); | |
if (result.isSucceeded()) { | |
System.out.println(result.getJsonString()); | |
//List hits = ((List) ((Map) result.getJsonMap().get("hits")).get("hits")); | |
//System.out.println("hits.size(): " + hits.size()); | |
} else { | |
System.out.println(result.getErrorMessage()); | |
System.out.println(result.getJsonString()); | |
System.exit(0); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
System.exit(0); | |
} | |
return result; | |
} | |
} |
"} while (currentResultSize == PAGE_SIZE);" looks suspect. One shard can finish its results before the others do, then you'd have a result that's less than the PAGE_SIZE, but still get results on subsequent calls. A exit condition of " currentResultSize > 0 " maybe better in some situations.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Folks that find this snippet might also find the following utility to be more useful: http://tech.taskrabbit.com/blog/2014/01/06/elasticsearch-dump/