Last active
April 29, 2019 22:42
-
-
Save bvaradar/d155b813ae9932bef08237badf9faee0 to your computer and use it in GitHub Desktop.
Spark Stage Retry Reproducing Patch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark231.yml b/docker/compose/docker-compose_hadoop284_hive233_spark231.yml | |
index bbb9f10e..015c9e2b 100644 | |
--- a/docker/compose/docker-compose_hadoop284_hive233_spark231.yml | |
+++ b/docker/compose/docker-compose_hadoop284_hive233_spark231.yml | |
@@ -145,6 +145,45 @@ services: | |
- "8081:8081" | |
environment: | |
- "SPARK_MASTER=spark://sparkmaster:7077" | |
+ - "SPARK_WORKER_WEBUI_PORT=8081" | |
+ links: | |
+ - "hivemetastore" | |
+ - "hiveserver" | |
+ - "hive-metastore-postgresql" | |
+ - "namenode" | |
+ | |
+ spark-worker-2: | |
+ image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkworker_2.3.1:latest | |
+ hostname: spark-worker-2 | |
+ container_name: spark-worker-2 | |
+ env_file: | |
+ - ./hadoop.env | |
+ depends_on: | |
+ - sparkmaster | |
+ ports: | |
+ - "8082:8082" | |
+ environment: | |
+ - "SPARK_MASTER=spark://sparkmaster:7077" | |
+ - "SPARK_WORKER_WEBUI_PORT=8082" | |
+ links: | |
+ - "hivemetastore" | |
+ - "hiveserver" | |
+ - "hive-metastore-postgresql" | |
+ - "namenode" | |
+ | |
+ spark-worker-3: | |
+ image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkworker_2.3.1:latest | |
+ hostname: spark-worker-3 | |
+ container_name: spark-worker-3 | |
+ env_file: | |
+ - ./hadoop.env | |
+ depends_on: | |
+ - sparkmaster | |
+ ports: | |
+ - "8083:8083" | |
+ environment: | |
+ - "SPARK_MASTER=spark://sparkmaster:7077" | |
+ - "SPARK_WORKER_WEBUI_PORT=8083" | |
links: | |
- "hivemetastore" | |
- "hiveserver" | |
diff --git a/docker/demo/config/spark-defaults.conf b/docker/demo/config/spark-defaults.conf | |
index e496b46c..f348ed67 100644 | |
--- a/docker/demo/config/spark-defaults.conf | |
+++ b/docker/demo/config/spark-defaults.conf | |
@@ -19,7 +19,7 @@ | |
# This is useful for setting default environmental settings. | |
# Example: | |
-spark.master local[3] | |
+spark.master spark://sparkmaster:7077 | |
spark.eventLog.dir hdfs://namenode:8020/tmp/spark-events | |
spark.serializer org.apache.spark.serializer.KryoSerializer | |
#spark.executor.memory 4g | |
diff --git a/docker/demo/data/batch_1.json b/docker/demo/data/batch_1.json | |
index 3e7b149a..f9c895b3 100644 | |
--- a/docker/demo/data/batch_1.json | |
+++ b/docker/demo/data/batch_1.json | |
@@ -3480,3 +3480,5 @@ | |
{"volume": 10294, "symbol": "CRM", "ts": "2018-08-31 10:27:00", "month": "08", "high": 153.14, "low": 153.0607, "key": "CRM_2018-08-31 10", "year": 2018, "date": "2018/08/31", "close": 153.0607, "open": 153.14, "day": "31"} | |
{"volume": 6031, "symbol": "CRM", "ts": "2018-08-31 10:28:00", "month": "08", "high": 153.0, "low": 152.88, "key": "CRM_2018-08-31 10", "year": 2018, "date": "2018/08/31", "close": 152.88, "open": 153.0, "day": "31"} | |
{"volume": 6057, "symbol": "CRM", "ts": "2018-08-31 10:29:00", "month": "08", "high": 152.88, "low": 152.87, "key": "CRM_2018-08-31 10", "year": 2018, "date": "2018/08/31", "close": 152.87, "open": 152.88, "day": "31"} | |
+{"volume": 483951, "symbol": "MSFT", "ts": "2018-07-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-07-31 09", "year": 2018, "date": "2018/07/31", "close": 111.72, "open": 111.55, "day": "31"} | |
+{"volume": 1533226, "symbol": "AAPL", "ts": "2018-07-31 09:30:00", "month": "08", "high": 227.3101, "low": 226.23, "key": "AAPL_2018-07-31 09", "year": 2018, "date": "2018/07/31", "close": 227.3101, "open": 226.53, "day": "31"} | |
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java | |
index 40572f2c..c8f5b7d2 100644 | |
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java | |
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java | |
@@ -416,6 +416,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo | |
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, | |
String commitTime, HoodieTable<T> hoodieTable, final boolean isUpsert) { | |
+ logger.info("VB - Local code running !!"); | |
// Cache the tagged records, so we don't end up computing both | |
// TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling | |
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java | |
index 49129eaf..6b99b36f 100644 | |
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java | |
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java | |
@@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat; | |
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; | |
import com.uber.hoodie.common.util.FSUtils; | |
import com.uber.hoodie.config.HoodieWriteConfig; | |
+import com.uber.hoodie.exception.HoodieException; | |
import com.uber.hoodie.exception.HoodieInsertException; | |
import com.uber.hoodie.io.storage.HoodieStorageWriter; | |
import com.uber.hoodie.io.storage.HoodieStorageWriterFactory; | |
@@ -50,15 +51,16 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH | |
private long recordsDeleted = 0; | |
private Iterator<HoodieRecord<T>> recordIterator; | |
private boolean useWriterSchema = false; | |
+ private final String partitionPath; | |
+ private boolean createdRetryFile = false; | |
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, | |
String partitionPath, String fileId) { | |
super(config, commitTime, fileId, hoodieTable); | |
writeStatus.setFileId(fileId); | |
writeStatus.setPartitionPath(partitionPath); | |
- | |
this.path = makeNewPath(partitionPath); | |
- | |
+ this.partitionPath = partitionPath; | |
try { | |
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, | |
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); | |
@@ -88,6 +90,38 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH | |
return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath()); | |
} | |
+ public void reproduceRetryFailure(String partitionPath) throws Exception { | |
+ Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime)); | |
+ Path path = new Path(FSUtils.getPartitionPath(markerRootPath, partitionPath), ".retry"); | |
+ if (partitionPath.equals("2017/08/31")) { | |
+ if (!fs.exists(path)) { | |
+ createdRetryFile = true; | |
+ fs.create(path, false); | |
+ logger.warn("Sleeping for 1 min. Will write after that"); | |
+ Thread.sleep(60 * 1000); | |
+ } else if (createdRetryFile) { | |
+ // Same task which created retry file. Let it succeed | |
+ logger.warn("Attempt#" + TaskContext.get().attemptNumber() + " Allowing through"); | |
+ } else { | |
+ // Duplicate run | |
+ logger.warn("Sleeping for 1.5 mins. Will error out after that"); | |
+ Thread.sleep(90 * 1000); | |
+ throw new IOException("Dummy Error"); | |
+ } | |
+ } else { | |
+ if (!fs.exists(path)) { | |
+ createdRetryFile = true; | |
+ fs.create(path, false); | |
+ logger.warn("Sleeping for 10 secs. Will exit after that to force stage retry"); | |
+ Thread.sleep(10 * 1000); | |
+ logger.warn("Exiting !!"); | |
+ System.exit(-1); | |
+ } else { | |
+ logger.warn("Attempt#" + TaskContext.get().attemptNumber() + " Passing through"); | |
+ } | |
+ } | |
+ } | |
+ | |
/** | |
* Perform the actual writing of the given record into the backing file. | |
*/ | |
@@ -116,6 +150,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH | |
writeStatus.markFailure(record, t, recordMetadata); | |
logger.error("Error writing record " + record, t); | |
} | |
+ try { | |
+ reproduceRetryFailure(partitionPath); | |
+ } catch (Exception e) { | |
+ throw new HoodieException(e.getMessage(), e); | |
+ } | |
} | |
/** |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Using the above patch, I was able to reproduce Spark stage retries in which duplicate copies of tasks are re-executed.
Steps:
You will notice two marker files per partition; one of them is a duplicate. After the DeltaStreamer run completes, you should see that the marker folder has been erased and only one parquet file per partition exists — the duplicate is cleaned up during the finalize step.
In the logs, you will also notice:
2019-04-29 22:07:37 WARN TaskSetManager:66 - Lost task 1.2 in stage 21.0 (TID 28, 172.18.0.11, executor 0): FetchFailed(null, shuffleId=6, mapId=-1, reduceId=1, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
......
2019-04-29 22:07:37 INFO TaskSetManager:54 - Task 1.2 in stage 21.0 (TID 28) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).