Skip to content

Instantly share code, notes, and snippets.

@ambud
Created October 17, 2017 01:26
Show Gist options
  • Save ambud/641f8fc25f7f8d3923d6fd10f64b7184 to your computer and use it in GitHub Desktop.
Save ambud/641f8fc25f7f8d3923d6fd10f64b7184 to your computer and use it in GitHub Desktop.
Spark Custom Metrics Source
/**
* Copyright 2017 Ambud Sharma
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ambud.sparkmetrics;
import java.io.Serializable;
import org.apache.spark.metrics.source.Source;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
/**
 * Spark metrics {@link Source} exposing counters and latency timers for
 * put / get / delete operations under the source name
 * {@value #KV_METRICS_SOURCE}.
 *
 * <p>The registry and every metric are {@code transient}, so none of the
 * recorded values travel with a serialized instance. A deserialized copy is
 * given a brand-new, empty registry (see {@link #readResolve()}) instead of
 * being left with {@code null} fields.
 */
public class KVMetricsSource implements Source, Serializable {

    /** Name returned by {@link #sourceName()}. */
    public static final String KV_METRICS_SOURCE = "hbasesource";

    private static final long serialVersionUID = 1L;

    private transient MetricRegistry registry;
    private transient Counter putCounter;
    private transient Counter getCounter;
    private transient Counter deleteCounter;
    private transient Timer putLatency;
    private transient Timer getLatency;
    private transient Timer deleteLatency;

    /** Creates a source with a fresh registry and all six metrics registered. */
    public KVMetricsSource() {
        initialize();
    }

    /**
     * Builds a new registry, creates the counters and timers, and registers
     * them under the names "puts", "gets", "deletes", "putLatency",
     * "getLatency" and "deleteLatency".
     */
    private void initialize() {
        registry = new MetricRegistry();

        putCounter = new Counter();
        getCounter = new Counter();
        deleteCounter = new Counter();
        putLatency = new Timer();
        getLatency = new Timer();
        deleteLatency = new Timer();

        registry.register("puts", putCounter);
        registry.register("gets", getCounter);
        registry.register("deletes", deleteCounter);
        registry.register("putLatency", putLatency);
        registry.register("getLatency", getLatency);
        registry.register("deleteLatency", deleteLatency);
    }

    /**
     * Serialization hook: all state is transient and would otherwise be
     * {@code null} after deserialization, making every getter return
     * {@code null}. Rebuild it so the copy is usable.
     *
     * <p>The rebuilt registry is a new object; it is not the registry of the
     * instance that was serialized and starts with zeroed metrics.
     *
     * @return this instance with freshly initialized metrics
     */
    private Object readResolve() {
        initialize();
        return this;
    }

    /** @return the registry holding this source's metrics */
    @Override
    public MetricRegistry metricRegistry() {
        return registry;
    }

    /** @return {@link #KV_METRICS_SOURCE} */
    @Override
    public String sourceName() {
        return KV_METRICS_SOURCE;
    }

    /** @return the same registry as {@link #metricRegistry()} */
    public MetricRegistry getRegistry() {
        return registry;
    }

    /** @return the counter registered as "puts" */
    public Counter getPutCounter() {
        return putCounter;
    }

    /** @return the counter registered as "gets" */
    public Counter getGetCounter() {
        return getCounter;
    }

    /** @return the counter registered as "deletes" */
    public Counter getDeleteCounter() {
        return deleteCounter;
    }

    /** @return the timer registered as "putLatency" */
    public Timer getPutLatency() {
        return putLatency;
    }

    /** @return the timer registered as "getLatency" */
    public Timer getGetLatency() {
        return getLatency;
    }

    /** @return the timer registered as "deleteLatency" */
    public Timer getDeleteLatency() {
        return deleteLatency;
    }
}
/**
* Copyright 2017 Ambud Sharma
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ambud.sparkmetrics;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.metrics.source.Source;
import scala.collection.JavaConversions;
import scala.collection.Seq;
/**
 * Demo driver: registers a {@link KVMetricsSource} with the local Spark
 * metrics system, then increments its put counter once per RDD element from
 * inside {@code foreachPartition}.
 */
public class SparkMetrics {

    /** Artificial per-record delay, in milliseconds. */
    private static final long PER_RECORD_SLEEP_MILLIS = 100L;

    /**
     * Runs the demo against a {@code local[*]} master.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources so the context is closed even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext("local[*]", "hbase")) {
            MetricsSystem system = SparkEnv.get().metricsSystem();
            system.registerSource(new KVMetricsSource());

            JavaRDD<String> rdd = sc.parallelize(Arrays.asList("sdasd", "asdasd", "asdsad", "asdsadasdasd"));
            rdd.foreachPartition(p -> {
                // Look the source up through SparkEnv inside the task rather than
                // capturing the driver-side object in the closure.
                Seq<Source> sourcesByName = SparkEnv.get().metricsSystem()
                        .getSourcesByName(KVMetricsSource.KV_METRICS_SOURCE);
                List<Source> sources = JavaConversions.seqAsJavaList(sourcesByName);
                if (sources.isEmpty()) {
                    throw new IllegalStateException(
                            "No metrics source registered under name '" + KVMetricsSource.KV_METRICS_SOURCE + "'");
                }
                // Resolve and cast once per partition instead of once per record.
                KVMetricsSource src = (KVMetricsSource) sources.get(0);

                p.forEachRemaining(v -> {
                    src.getPutCounter().inc();
                    try {
                        Thread.sleep(PER_RECORD_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the owning thread instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                });
            });
        }
    }
}
@hdattada
Copy link

hdattada commented Oct 9, 2020

I see "Task not serializable: java.io.NotSerializableException" for KVMetricsSource. Is there anything I am missing?

@Raghu290
Copy link

Raghu290 commented Nov 3, 2020

I had the same issue. Even though you have implemented a custom metric source and marked the counters inside it as transient, it won't work. You need to use the metric outside your rdd.foreachPartition() loop.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment