Skip to content

Instantly share code, notes, and snippets.

@egalpin
Created May 24, 2023 21:26
Show Gist options
  • Save egalpin/50edc34c7ca7f1c222fd5c4244a14e59 to your computer and use it in GitHub Desktop.
Save egalpin/50edc34c7ca7f1c222fd5c4244a14e59 to your computer and use it in GitHub Desktop.
Tests for RateLimit.java
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.experimental.categories.Category;
public class RateLimitTest extends BasePipelineTest {
@Test
@Category({NeedsRunner.class})
public void testRateLimitWindowExpiration() throws Exception {
Instant start = Instant.now();
PCollection<String> rawOutputs =
pipeline
.apply(Create.of("bar", "baz", "123"))
.apply(WithKeys.of("foo"))
.apply(RateLimit.<String, String>ofSize(1).per(Duration.standardSeconds(1)))
.apply(Values.create());
PAssert.that(rawOutputs).containsInAnyOrder("bar", "baz", "123");
pipeline.run();
assertTrue(Instant.now().minus(start.getMillis()).getMillis() > 3000);
assertTrue(Instant.now().minus(start.getMillis()).getMillis() < 5000);
}
static MapElements<Instant, String> foo() {
return MapElements.into(TypeDescriptors.strings())
.via((SerializableFunction<Instant, String>) input -> "foo");
}
@Test
@Category({NeedsRunner.class})
public void testRateLimitIntervalExpiration() throws Exception {
Duration rateLimitInterval = Duration.millis(100);
Instant watermark = Instant.now();
PCollection<String> rawOutputs =
pipeline
.apply(
PeriodicImpulse.create()
.withInterval(rateLimitInterval.dividedBy(2))
.startAt(watermark)
.stopAt(watermark.plus(rateLimitInterval.multipliedBy(5))))
.apply(foo())
.apply(WithKeys.of("foo"))
.apply(RateLimit.<String, String>ofSize(1).per(rateLimitInterval))
.apply(Values.create());
PCollection<Long> numElems =
rawOutputs
.apply(
Window.<String>into(new GlobalWindows())
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply(Count.globally());
PAssert.that(rawOutputs)
.containsInAnyOrder(
"foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo");
PAssert.that(numElems).containsInAnyOrder(11L);
pipeline.run();
}
@Test
@Category({NeedsRunner.class})
public void testRateLimitBytesIntervalExpiration() throws Exception {
Duration rateLimitInterval = Duration.millis(100);
Instant watermark = Instant.now();
PCollection<String> rawOutputs =
pipeline
.apply(
PeriodicImpulse.create()
.withInterval(rateLimitInterval.dividedBy(2))
.startAt(watermark)
.stopAt(watermark.plus(rateLimitInterval.multipliedBy(5))))
.apply(foo())
.apply(WithKeys.of("foo"))
.apply(RateLimit.<String, String>ofByteSize(1).per(rateLimitInterval))
.apply(Values.create());
PCollection<Long> numElems =
rawOutputs
.apply(
Window.<String>into(new GlobalWindows())
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply(Count.globally());
PAssert.that(rawOutputs)
.containsInAnyOrder(
"foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo");
PAssert.that(numElems).containsInAnyOrder(11L);
pipeline.run();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment