Last active
June 9, 2019 13:29
-
-
Save polarnik/10dce91e372ed74ba7038250125f26a6 to your computer and use it in GitHub Desktop.
AMQP Gatling Test (Gatling 3.1.2, Scala 2.12.8, com.rabbitmq:amqp-client:5.3.0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package simulations | |
import io.gatling.commons.validation.Validation | |
import io.gatling.core.Predef._ | |
import io.gatling.core.action.{RequestAction} | |
import io.gatling.core.action.builder.ActionBuilder | |
import io.gatling.core.session.{Expression, Session} | |
import io.gatling.core.structure.ScenarioContext | |
import io.gatling.core.action.Action | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
import com.rabbitmq.client.ConnectionFactory | |
import com.typesafe.scalalogging.StrictLogging | |
import io.gatling.commons.stats.OK | |
import io.gatling.commons.util.Clock | |
import io.gatling.core.stats.StatsEngine | |
import io.gatling.core.util.NameGen | |
import io.gatling.commons.validation._ | |
class RabbitTest extends Simulation { | |
def amqpSend(message: String) = new ActionBuilder { | |
def build(ctx: ScenarioContext, next: Action): Action = { | |
new AmqpSend(message, ctx, next) | |
} | |
} | |
val scn = scenario("AMQP protocol test") | |
.exec(amqpSend("Hello, World")) | |
setUp( | |
scn.inject( | |
rampUsersPerSec(1) to (60) during(180 seconds), | |
) | |
) | |
} | |
class Around(before: () => Unit, after: () => Unit) { | |
def apply(f: => Any): Unit = { | |
before() | |
f | |
after() | |
} | |
} | |
trait AmqpLogging extends StrictLogging { | |
def logMessage(text: => String, msg: String): Unit = { | |
logger.debug(text) | |
logger.trace(msg.toString) | |
} | |
} | |
class AmqpSend(val message: String, val ctx: ScenarioContext, val next: Action) extends RequestAction with AmqpLogging with NameGen { | |
override val name: String = genName("amqp publish") | |
override val requestName: Expression[String] = "amqp publish" | |
override def clock: Clock = ctx.coreComponents.clock | |
override def statsEngine: StatsEngine = ctx.coreComponents.statsEngine | |
def send(): Unit = { | |
val factory = new ConnectionFactory() | |
factory.setUsername("guest") | |
factory.setPassword("guest") | |
factory.setHost("localhost") | |
factory.setVirtualHost("/") | |
factory.setPort(5672) | |
val conn = factory.newConnection() | |
val channel = conn.createChannel() | |
channel.exchangeDeclare("amqp.direct.1", "direct", true) | |
val messageBodyBytes = message.getBytes() | |
channel.basicPublish("amqp.direct.1", "routing.key.1", null, messageBodyBytes) | |
channel.close() | |
conn.close() | |
} | |
override def sendRequest(requestName: String, session: Session): Validation[Unit] = | |
for { | |
around <- aroundSend(requestName, session) | |
} yield { | |
around( | |
send() | |
) | |
} | |
protected def aroundSend(requestName: String, session: Session): Validation[Around] = { | |
new Around( | |
before = () => { | |
if (logger.underlying.isDebugEnabled) { | |
logMessage(s"Message sent", message) | |
} | |
}, | |
after = () => { | |
ctx.coreComponents.configuration.resolve( | |
{ | |
val now = clock.nowMillis | |
statsEngine.logResponse(session, requestName, session.startDate, now, OK, None, None) | |
} | |
) | |
next ! session | |
} | |
).success | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Report Example