// open62541/EPICS read-service performance measurement app
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#ifdef _WIN32
#include <windows.h>
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
// Select between supported communication frameworks:
#ifndef OPC
#define EPICS
// Include communication framework header.
#if defined(OPC)
#include "open62541.h"
#elif defined(EPICS)
#include "cadef.h"
// Framework related globals.
// NOTE(review): the #endif closing this conditional is not visible in this view.
#if defined(OPC)
// Read request sent on every transaction; its node list is filled in by main.
static UA_ReadRequest req;
// Client handle created and connected in main.
static UA_Client* client;
#elif defined(EPICS)
#define MAX_T_DISCONN 10 // total accepted disconnects (checked in handle_disconnect)
#define MAX_CONN_RETRIES 20 // retries after 1 disconnect (a 1 second)
#define BENCH_SIZE 4000 // size of benchmark arrays; upper bound for -size checked in parse_arguments
#define N_BENCH_CHANNELS 4000 // number of benchmark channels; dimension of the 'channels' table
// Kind of EPICS record read by the benchmark. It selects the PV name pattern
// built by makename and whether a scalar or an array get is issued.
typedef enum {
    T_LONG,       // scalar record, read with ca_get
    T_LONG_ARRAY  // array record, read with ca_array_get
} channel_type;
// One EPICS channel: the PV name it was created for and its Channel Access id.
typedef struct {
char * pvname; // heap-allocated PV name; allocated in makename, freed in destroy_channels
chid chid; // channel id filled in by ca_create_channel
} channel_struct;
// the names of the benchmark records are
// - benchlong:0 - benchlong:1999 // 2000 int32s
// - benchlongarray:0 - benchlongarray:1999 // arrays of 2000 int32s
// - benchdouble:0 - benchdouble:1999 // doubles
// - benchdoublearray:0 - benchdoublearray:1999 // arrays of 2000 doubles
// Table of benchmark channels; make_channels fills the first transaction_size entries.
static channel_struct channels [N_BENCH_CHANNELS];
// Record kind used for all channels; set in make_channels from array_size.
static channel_type ch_type;
// Define parameters/defaults.
static char* variable_name = NULL; // -var; when NULL a name is built from array_size in parse_arguments
static int array_size = 1; // -size
static int array_item_size = 4; // int32 assumed. Not configurable yet.
static int transaction_size = 1; // -vars
static int transaction_count = 1000; // -repeat
static float runout_duration = 0.0; // -runout
static int verify = 0; // -verify
static int verbose = 0; // -v
static int sync_begin = 0; // -sync_begin
static int sync_end = 0; // -sync_end
// NOTE(review): the #endif closing this conditional is not visible in this view; measure_trigger
// is also used by run_transactions outside any OPC-only code — confirm it sits outside the conditional.
#if defined(OPC)
static char* protocol = "opc.tcp://"; // prefix used by main to build the server URL
static char* server_address = "localhost"; // -server
static char* port_nr = "48010"; // -port
static int namespace_index = 1; // -ns
// Name of the trigger file whose presence is tested with access() to start/stop the measurement.
const char* measure_trigger = "measure";
// Argument parsing and usage.
static void print_usage(char** argv, char *title)
{ printf ("%s\n", title);
printf("usage: %s\n"
#if defined(OPC)
" [-server server_address ] (default: %s)\n"
" [-port port_nr ] (default: %s)\n"
" [-ns namespace_index ] (default: %d)\n"
" [-var variable_name ] (overrules name constructed from array size)\n"
" [-size array_size ] (default: %d, unit: # int32 items, to construct variable name)\n"
" [-vars transaction_size] (default: %d, unit: # variables)\n"
" [-repeat nr_transactions ] (default: %d)\n"
" [-runout duration ] (default: %g)\n"
" [-verify ] (verify results)\n"
" [-sync_begin ] (wait on '%s' trigger file to sync)\n"
" [-sync_end ] (continue as long as '%s' trigger file present)\n"
" [-v ] (verbose)\n"
" [-h ] (help)\n",
#if defined(OPC)
server_address, port_nr, namespace_index,
array_size, transaction_size, transaction_count, runout_duration,
measure_trigger, measure_trigger
// Parse the command line into the file-level option variables, construct the
// variable name when -var was not given, echo the effective settings, and
// (EPICS build) reject sizes beyond the benchmark limits.
//   argc, argv: program arguments as received by main; argv[0] is skipped.
// NOTE(review): brace-only, #else and #endif lines of this function are not visible in this
// view, so the nesting suggested by the comments below is inferred — confirm against the full file.
static void parse_arguments(int argc, char** argv)
// Parse arguments.
int i;
for(i = 1; i < argc; i++)
// Flags without a value.
{ if(strcmp(argv[i], "-h") == 0)
// The title argument is NULL here.
{ print_usage(argv, NULL);
else if(strcmp(argv[i], "-v") == 0)
{ verbose = 1;
else if(strcmp(argv[i], "-verify") == 0)
{ verify = 1;
else if(strcmp(argv[i], "-sync_begin") == 0)
{ sync_begin = 1;
else if(strcmp(argv[i], "-sync_end") == 0)
{ sync_end = 1;
// Options with a value: only considered when a following argument exists.
else if(i < argc - 1)
#if defined(OPC)
// String options keep a pointer into argv; no copy is made.
if(strcmp(argv[i], "-server") == 0)
{ server_address = argv[i + 1];
else if(strcmp(argv[i], "-port") == 0)
{ port_nr = argv[i + 1];
else if(strcmp(argv[i], "-ns") == 0)
{ namespace_index = atoi(argv[i + 1]);
if(strcmp(argv[i], "-var") == 0)
{ variable_name = argv[i + 1];
// Numeric options are converted with atoi/atof, which give no error indication for bad input.
else if(strcmp(argv[i], "-size") == 0)
{ array_size = atoi(argv[i + 1]);
else if(strcmp(argv[i], "-vars") == 0)
{ transaction_size = atoi(argv[i + 1]);
else if(strcmp(argv[i], "-repeat") == 0)
{ transaction_count = atoi(argv[i + 1]);
else if(strcmp(argv[i], "-runout") == 0)
{ runout_duration = atof(argv[i + 1]);
// No -var given: build the default name from the array size in a heap buffer.
// The malloc results below are used without a NULL check.
if(variable_name == NULL)
{ variable_name = malloc(128);
#if defined(OPC)
sprintf(variable_name, "var%d", array_size);
#elif defined(EPICS)
sprintf(variable_name, "VAR%d.VAL", array_size);
{ // Make a string copy, such that it can be freed during cleanup. Needed to align with the case above.
char* s = malloc(strlen(variable_name) + 1);
strcpy(s, variable_name);
variable_name = s;
// Echo the effective settings in command line form.
// NOTE(review): the leading '{' suggests a condition precedes this block on a line not visible here.
{ printf("%s %s %s %s "
#if defined(OPC)
"-server %s -port %s -ns %d "
"-var %s -size %d -vars %d -repeat %d -runout %g\n",
argv[0], verify ? "-verify" : "", sync_begin ? "-sync_begin" : "", sync_end ? "-sync_end" : "",
#if defined(OPC)
server_address, port_nr, namespace_index,
variable_name, array_size, transaction_size, transaction_count, runout_duration);
// EPICS build: enforce the benchmark limits; each violation prints a message and exits with status 1.
#if defined(EPICS)
if (array_size > BENCH_SIZE) {
printf ("EPICS Array size cannot exceed %d\n", BENCH_SIZE); exit (1);
if (transaction_size > N_BENCH_CHANNELS) {
printf ("EPICS Number of channels cannot exceed %d\n", N_BENCH_CHANNELS); exit (1);
// The product of both sizes is limited to BENCH_SIZE as well.
if (transaction_size * array_size > BENCH_SIZE) {
printf ("EPICS Array size * number of channels cannot exceed %d\n", BENCH_SIZE); exit (1);
#if defined(EPICS)
// reconnect if status indicates disconnect -------------------------------
// When status is ECA_DISCONN or ECA_TIMEOUT, the channel for pch->pvname is created again
// with ca_create_channel and ca_pend_io(5.0) is called, repeating while the status is not
// ECA_NORMAL, bounded by MAX_CONN_RETRIES tries and MAX_T_DISCONN total disconnects.
// If the status is still not ECA_NORMAL an error is printed and the program exits with status 1.
//   status: CA status code that triggered the call.
//   pch:    channel to reconnect; pch->chid is overwritten by ca_create_channel.
// Returns the resulting CA status.
// NOTE(review): closing braces are not visible in this view, so the exact nesting is inferred.
static int handle_disconnect (int status, channel_struct *pch)
// NOTE(review): no statement that modifies total_disconnects is visible in this view — confirm it is counted.
static int total_disconnects = 0;
int tries = 0;
if ((status==ECA_DISCONN) || (status == ECA_TIMEOUT)) {
while ((status != ECA_NORMAL) &&
(total_disconnects < MAX_T_DISCONN) &&
(tries++ < MAX_CONN_RETRIES) ) {
// NOTE(review): the previous pch->chid is overwritten without a visible ca_clear_channel — confirm.
status = ca_create_channel(pch->pvname, NULL, NULL, 10, &pch->chid);
if (status == ECA_NORMAL) {
status = ca_pend_io (5.0);
if (status != ECA_NORMAL) {
printf ("ERROR: Failed to reconnect after disconnect; pv %s retries = %d; total disconnects = %d\n",
pch->pvname, tries, total_disconnects);
exit (1);
return status;
// Portable interval timer used for the latency and bandwidth measurements.
// Clock_t is an opaque timestamp, get_clock() samples it, and
// get_clock_diff(c1, c2) returns c1 - c2 in seconds.
#if defined(_WIN32)
typedef LARGE_INTEGER Clock_t;

static void get_clock(Clock_t* c)
{
    QueryPerformanceCounter(c);
}

static float get_clock_diff(Clock_t* c1, Clock_t* c2)
{
    // Counter ticks per second, needed to convert the tick difference to seconds.
    LARGE_INTEGER f;
    QueryPerformanceFrequency(&f);
    return (float)((double)(c1->QuadPart - c2->QuadPart) / (double)f.QuadPart);
}
#elif defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 199309L
typedef struct timespec Clock_t;

static void get_clock(Clock_t* c)
{
    // Prefer the monotonic clock: intervals taken from the wall clock are
    // distorted when the system time is adjusted during a measurement.
#if defined(CLOCK_MONOTONIC)
    clock_gettime(CLOCK_MONOTONIC, c);
#else
    clock_gettime(CLOCK_REALTIME, c);
#endif
}

static float get_clock_diff(Clock_t* c1, Clock_t* c2)
{
    // Subtract seconds and nanoseconds separately before converting, so the
    // large absolute timestamps never pass through a floating point value.
    double seconds = (double)(c1->tv_sec - c2->tv_sec);
    double nanoseconds = (double)(c1->tv_nsec - c2->tv_nsec);
    return (float)(seconds + nanoseconds / 1e9);
}
#else
// Fallback based on clock(), which reports processor time used by the program
// rather than elapsed wall-clock time.
typedef clock_t Clock_t;

static void get_clock(Clock_t* c)
{
    *c = clock();
}

static float get_clock_diff(Clock_t* c1, Clock_t* c2)
{
    return (*c1 - *c2)/(float)CLOCKS_PER_SEC;
}
#endif
// Execute the read transactions.
// Repeats read transactions and, when requested, prints a summary of count, duration,
// bandwidth and latency.
//   max_transaction_count:    stop after this many counted transactions; 0 means no limit.
//   max_measurement_duration: stop once this many seconds were measured; 0.0 means no limit.
//   sync_begin:   when non-zero, measuring starts only once the measure_trigger file exists.
//   sync_end:     when non-zero, measuring ends once the measure_trigger file no longer exists.
//   print_output: when non-zero (and no read error was flagged) the summary is printed.
// The parameters sync_begin/sync_end and the local transaction_count hide the file-level
// variables of the same name inside this function.
// NOTE(review): brace-only, #else and #endif lines and some statements (for example where
// transaction_begin_clock and transaction_end_clock are sampled) are not visible in this view.
void run_transactions(int max_transaction_count, float max_measurement_duration,
int sync_begin, int sync_end, int print_output)
// State of the measurement window.
Clock_t measurement_begin_clock;
Clock_t measurement_end_clock;
int measurement_begin_clocked = 0;
int measurement_end_clocked = 0;
int measurement_ok = 1;
int measuring = 0;
float measurement_duration = 0.0;
int transaction_count = 0;
// Per-transaction latency statistics.
Clock_t transaction_begin_clock;
Clock_t transaction_end_clock;
float transaction_duration_max = 0.0;
float transaction_duration_min = 1e12;
float transaction_duration_sum = 0.0;
#if defined(OPC)
UA_Int32 value = -1;
#elif defined(EPICS)
// Destinations for the scalar and the array get.
// NOTE(review): DBR_LONG is presumably a 32-bit type in Channel Access while C long may be wider — confirm the element type.
long l;
// The allocation result is not checked and no matching free is visible in this view.
long* pl = (long *)malloc(16384);
// Loop until the measurement was ended by the trigger file, or one of the limits is reached.
while(!measurement_end_clocked &&
(max_transaction_count == 0 || transaction_count < max_transaction_count) &&
(max_measurement_duration == 0.0 || measurement_duration < max_measurement_duration))
// Progress feedback, rewritten in place on one console line ('\r').
{ if(measuring && !verify)
{ printf("%d \r", transaction_count);
else if(!measuring)
{ printf("Waiting ...\r");
#if defined(OPC)
// Do the read transaction.
UA_ReadResponse resp = UA_Client_read(client, &req);
// Validate the response; every failed check clears measurement_ok and prints an error.
// NOTE(review): some conditions and printf arguments of these checks are not visible in this view.
if(resp.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
{ measurement_ok = 0;
printf("ERROR: Read failed. Service result is not good: 0x%X\n", resp.responseHeader.serviceResult);
if(resp.resultsSize != transaction_size)
{ measurement_ok = 0;
printf("ERROR: Read failed. Result size is: %d, instead of: %d\n", resp.resultsSize, transaction_size);
{ measurement_ok = 0;
printf("ERROR: Read failed. Result has no value.\n");
if(resp.results[0].value.type != &UA_TYPES[UA_TYPES_INT32])
{ measurement_ok = 0;
printf("ERROR: Read failed. Result has wrong type: %d\n",
if(resp.resultsSize != transaction_size)
{ measurement_ok = 0;
printf("ERROR: Read failed. Wrong number of results received: %d\n", resp.resultsSize);
// Only the first result is checked for the expected number of elements.
UA_Variant* variant = &resp.results[0].value;
int variant_length = UA_Variant_isScalar(variant) ? 1 : variant->arrayLength;
if(variant_length != array_size)
{ measurement_ok = 0;
printf("ERROR: Read failed. Wrong array size: %d\n", variant_length);
value = 0;
// With -verify, walk all int32 results while measuring and print values.
if(verify && measuring)
{ // Retrieve the individual values.
int j;
for(j = 0; j < resp.resultsSize; j++)
{ if(resp.results[j].hasValue && resp.results[j].value.type == &UA_TYPES[UA_TYPES_INT32])
{ UA_Variant* variant = &resp.results[j].value;
int variant_length = UA_Variant_isScalar(variant) ? 1 : variant->arrayLength;
int k;
for(k = 0; k < variant_length; k++)
{ value = ((UA_Int32*)variant->data)[k];
{ printf("%d ", value);
{ printf("\r");
// Cleanup.
#elif defined(EPICS)
int retries = 0;
int status = ECA_NORMAL;
int j = 0;
// Loop through all channels per transaction
for (j = 0; j < transaction_size && status == ECA_NORMAL; j++) {
channel_struct *pch = &channels [j];
// An element count of 0 is treated as a disconnected channel and triggers a reconnect attempt.
// 'retries' is declared outside the channel loop, so it is shared by all channels of the transaction.
int ec = 0;
while ((ec == 0) && (retries++ < MAX_CONN_RETRIES)) {
ec = ca_element_count (pch -> chid);
if (ec == 0) handle_disconnect (ECA_DISCONN, pch);
// Scalar records use ca_get into 'l', array records use ca_array_get into 'pl'.
if (ch_type == T_LONG) {
status = ca_get(DBR_LONG, pch -> chid, (void *) &l);
else {
status = ca_array_get(DBR_LONG, array_size, pch -> chid, (void *) pl);
SEVCHK (status, "ca_<array>_get failure in loop");
status = ca_pend_io(5.0);
SEVCHK(status, "ca_pend_io failure in loop");
if (status != ECA_NORMAL) {
printf ("Premature exit after %d reads\n", transaction_count);
// Update the latency statistics with the duration of this transaction.
float transaction_duration = get_clock_diff(&transaction_end_clock, &transaction_begin_clock);
transaction_duration_min = MIN(transaction_duration, transaction_duration_min);
transaction_duration_max = MAX(transaction_duration, transaction_duration_max);
transaction_duration_sum += transaction_duration;
// Wait with time measurement until trigger file present, that acts as synchronization between multiple clients.
if(!measurement_begin_clocked && !measuring && (!sync_begin || access(measure_trigger, F_OK) != -1))
{ get_clock(&measurement_begin_clock);
measurement_begin_clocked = 1;
measuring = 1;
// End the measurement when trigger file removed, or when max duration reached (used for run-out).
// Thus determine the duration of the measurement so far.
else if(!measurement_end_clocked && measuring)
{ get_clock(&measurement_end_clock);
measurement_duration = get_clock_diff(&measurement_end_clock, &measurement_begin_clock);
if(sync_end && access(measure_trigger, F_OK) == -1)
{ measurement_end_clocked = 1;
measuring = 0;
// 'measuring' is 0 or 1, so only transactions inside the measurement window are counted.
transaction_count += measuring;
// Summary: payload bytes per transaction and bytes per second over the measured window.
// measurement_duration is still 0.0 here if no duration was ever determined.
int bytes_per_transaction = array_size * array_item_size * transaction_size;
float bandwidth = bytes_per_transaction * transaction_count / measurement_duration;
if(measurement_ok && print_output)
{ printf("%d transactions, %g seconds, %d B/transaction, %g MB/s",
transaction_count, measurement_duration, bytes_per_transaction, bandwidth * 1e-6);
if(transaction_duration_sum > 0.0)
{ printf(", latency min/avg/max: %g/%g/%g seconds",
transaction_duration_min, transaction_duration_sum / transaction_count, transaction_duration_max);
#if defined(EPICS)
// create a variable name from a sequence number and a type, and return it in 'the_name'
//   seqno:    sequence number embedded in the record name.
//   type:     record kind; any value other than T_LONG / T_LONG_ARRAY yields "ILLEGAL".
//   the_name: receives a newly allocated string holding the name; the caller frees it.
// Prints an error and exits with status 1 when the allocation fails.
static void makename (int seqno, channel_type type, char **the_name)
{
    char dest [100];

    // Each case ends with break, so exactly one pattern is formatted per call.
    switch (type) {
    case T_LONG: // long
        snprintf (dest, sizeof dest, "benchlong:%d.RVAL", seqno);
        break;
    case T_LONG_ARRAY: // long array
        snprintf (dest, sizeof dest, "benchlongarray:%d.VAL", seqno);
        break;
    // case T_DOUBLE: // one double var
    //     sprintf (dest, "benchdouble:%d.VAL", seqno);
    //     break;
    // case T_DOUBLE_ARRAY: // double array
    //     sprintf (dest, "benchdoublearray:%d.VAL", seqno);
    //     break;
    default:
        snprintf (dest, sizeof dest, "ILLEGAL");
        break;
    }

    size_t name_size = strlen (dest) + 1; // including the terminator
    *the_name = malloc (name_size);
    if (*the_name == NULL) {
        printf ("ERROR: Failed to allocate %d bytes of memory for variable name %s\n", (int) name_size, dest);
        exit (1);
    }
    memcpy (*the_name, dest, name_size);
}
// Create all EPICS channels ---------------------------------------
// Depending on array size (=1 or >1) create an array of scalar channels or an array of array channels
// Sets ch_type from array_size, then for each of the first transaction_size entries of
// 'channels' builds the PV name with makename and requests the channel with
// ca_create_channel. Failures are reported with printf and SEVCHK.
// NOTE(review): closing braces are not visible in this view, so it cannot be confirmed
// whether ca_pend_io(5.0) is called per channel or once after the loop.
static void make_channels (void)
int i;
int status;
channel_struct *pcs = &channels [0];
// A single element selects scalar records, anything else array records.
if (array_size == 1) {
ch_type = T_LONG;
else {
ch_type = T_LONG_ARRAY;
for (i = 0; i < transaction_size; i++) {
pcs = &channels [i];
// makename allocates the name; it is released in destroy_channels.
makename (i, ch_type, &(pcs -> pvname));
status = ca_create_channel (pcs->pvname, NULL, NULL, 10, &pcs->chid);
if (status != ECA_NORMAL) {
printf ("make_channels: ca_create_channel failure, channel %d\n", i);
SEVCHK(status,"ca_create_channel failure");
status = ca_pend_io(5.0);
if (status != ECA_NORMAL) {
printf ("ca_pend_io failure creating channel %s; offline?\n", pcs -> pvname);
SEVCHK (status, "ca_pend_io failure creating channel");
// Destroy all created EPICS channels ---------------------------------------
// For each of the first transaction_size entries of 'channels': clears the channel with
// ca_clear_channel, calls ca_pend_io(5.0), and frees the PV name allocated by makename.
// Failures are reported with printf and SEVCHK.
// NOTE(review): closing braces are not visible in this view; 'pcs' is declared inside the
// loop, so the statements using it are presumably all part of the loop body — confirm.
static void destroy_channels (void)
int i;
int status;
for (i = 0; i < transaction_size; i++) {
channel_struct *pcs = &channels [i];
status = ca_clear_channel(pcs->chid);
if (status != ECA_NORMAL) {
printf ("ca_clear_channel failure, channel %s\n", pcs -> pvname);
SEVCHK(status,"ca_clear_channel failure");
status = ca_pend_io(5.0);
if (status != ECA_NORMAL) {
printf ("ca_pend_io failure destroying channels, channel %s\n", pcs -> pvname);
SEVCHK (status, "ca_pend_io failure destroying channels");
free (pcs -> pvname);
// Main app.
// Parses the command line, sets up the connection for the selected framework (OPC UA client
// or EPICS channels), runs the measured transactions followed by an optional run-out phase,
// and cleans up. Returns 0, or the connect status code when the OPC UA connect fails.
// NOTE(review): brace-only, #else and #endif lines and possibly some statements are not visible in this view.
int main(int argc, char** argv)
// Handle command line arguments.
parse_arguments(argc, argv);
// Setup the connection.
#if defined(OPC)
// Build "<protocol><server_address>:<port_nr>"; the two '+ 1' terms cover ':' and the terminator.
char* server_url = malloc(strlen(protocol) + strlen(server_address) + 1 + strlen(port_nr) + 1);
server_url[0] = 0;
strcat(server_url, protocol);
strcat(server_url, server_address);
strcat(server_url, ":");
strcat(server_url, port_nr);
// open62541 stack does not support message chunking yet.
// Hence single buffer to be specified that is big enough to contain all transaction data, including overhead.
int buffer_overhead = 1024; // Extra space for non-payload.
int buffer_size = array_size * sizeof(UA_Int32) * transaction_size + buffer_overhead;
// Never go below 64 KiB.
if(buffer_size < 65536)
{ buffer_size = 65536;
UA_ClientConfig config = {
.timeout = 5, // sync response timeout in ms
.secureChannelLifeTime = 1000000, // lifetime in ms (then the channel needs to be renewed)
.timeToRenewSecureChannel = 2000, // time in ms before expiration to renew the secure channel
{.protocolVersion = 0, .sendBufferSize = buffer_size, .recvBufferSize = buffer_size,
.maxMessageSize = buffer_size, .maxChunkCount = 1
client = UA_Client_new(config /*UA_ClientConfig_standard*/, Logger_Stdout_new());
UA_StatusCode retval = UA_Client_connect(client, ClientNetworkLayerTCP_connect, server_url);
if(retval != UA_STATUSCODE_GOOD)
{ printf("Aborted.\n");
return retval;
// Prepare the read request: transaction_size entries, all addressing the value attribute
// of the same node (variable_name in namespace namespace_index).
req.nodesToReadSize = transaction_size;
req.nodesToRead = UA_Array_new(&UA_TYPES[UA_TYPES_READVALUEID], req.nodesToReadSize);
int i;
for(i = 0; i < req.nodesToReadSize; i++)
{ UA_ReadValueId_init(&(req.nodesToRead[i]));
req.nodesToRead[i].nodeId = UA_NODEID_STRING_ALLOC(namespace_index, variable_name); // nodeId string deleted with req
req.nodesToRead[i].attributeId = UA_ATTRIBUTEID_VALUE;
#elif defined(EPICS)
int status;
// NOTE(review): in the visible order the channels are created before ca_context_create is called — confirm this is intended.
make_channels ();
status = ca_context_create(ca_disable_preemptive_callback);
SEVCHK(status,"ca_context_create failure");
// Run the transactions.
// Measured phase: limited by transaction_count, no duration limit, summary printed.
run_transactions(transaction_count, 0.0, sync_begin, sync_end, 1);
// Run-out phase: limited by runout_duration only, no trigger file sync, no summary.
if(runout_duration > 0.0)
{ if(verbose)
{ printf("Run-out ...\n");
run_transactions(0, runout_duration, 0, 0, 0);
// Cleanup.
#if defined(OPC)
#elif defined (EPICS)
destroy_channels ();
// SEVCHK(ca_context_destroy(),"ca_context_destroy");
SEVCHK(ca_task_exit(),"ca_task_exit failure");
return 0;
gcc -O3 -DOPC -D_WIN32 -std=c11 client.c open62541.c -lws2_32 -o client.exe
gcc -O3 -std=c11 server.c open62541.c -lws2_32 -o server.exe
gcc -O3 -DOPC -D_POSIX_C_SOURCE=199309L -std=c11 client.c open62541.c -o client.bin
gcc -O3 -std=c11 server.c open62541.c -o server.bin
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include "open62541.h"
// Application arguments/defaults
static int port_nr = 48010; // -port; port passed to the TCP network layer in main
static int thread_count = 4; // -threads; passed to UA_Server_run in main
static int verbose = 0; // -v
// Argument parsing and usage.
// Print an optional title line followed by the usage text with the current
// defaults of the options filled in.
//   title: heading printed before the usage text. May be NULL (as passed for
//          -h), in which case no heading is printed.
void print_usage(char *title)
{
    // Passing NULL to a %s conversion is undefined behavior, so skip the heading.
    if(title != NULL)
    {   printf("%s\n", title);
    }
    printf(
        "usage: \n"
        " [-port port_nr ] (default: %d)\n"
        " [-threads thread_count] (default: %d)\n"
        " [-v ] (verbose)\n"
        " [-h ] (help)\n",
        port_nr, thread_count
    );
}
void parse_arguments(int argc, char** argv)
for(int i = 1; i < argc; i++)
{ if(strcmp(argv[i], "-h") == 0)
{ print_usage(NULL);
else if(strcmp(argv[i], "-v") == 0)
{ verbose = 1;
else if(i < argc - 1)
{ if(strcmp(argv[1], "-port") == 0)
{ port_nr = atoi(argv[i+1]);
else if(strcmp(argv[i], "-threads") == 0)
{ thread_count = atoi(argv[i+1]);
{ printf("%s -port %d -threads %d\n", argv[0], port_nr, thread_count);
// Define arrays to provide data to the variable nodes.
// One served variable: its element count, node name and backing data.
typedef struct
{   int size;        // number of int32 elements in data
    char* name;      // node name, formatted as "var<size>" in setup_variables
    UA_Int32* data;  // element values, referenced (not copied) by the read callback
} Variable_t;
// Element counts for which a variable is created; one variable per entry.
static int variable_sizes[] =
{1, 2, 4, 5, 8, 10, 13, 16, 20, 32, 50, 64, 100, 128, 200, 256, 500, 512, 1000, 1024,
2000, 2048, 4096, 5000, 8192, 10000, 16384, 20000, 25000, 32768, 50000, 65536,
100000, 131072, 200000, 262144, 500000, 524288, 1000000, 1048576, 2097152};
static int max_variable_size; // largest entry of variable_sizes; computed in setup_variables
static int variable_count; // number of entries in variable_sizes; computed in setup_variables
static Variable_t** variables; // array of pointers to the variables; allocated in setup_variables
void setup_variables()
variable_count = sizeof(variable_sizes)/sizeof(int);
variables = malloc(variable_count * sizeof(Variable_t));
if(variables == NULL)
{ printf("ERROR: Memory allocation of variables data failed.\n");
for(int i = 0; i < variable_count; i++)
{ int size = variable_sizes[i];
max_variable_size = size > max_variable_size ? size : max_variable_size;
variables[i] = malloc(sizeof(Variable_t));
variables[i]->size = size;
variables[i]->data = malloc(size * sizeof(UA_Int32));
for(int j = 0; j < size; j++)
{ variables[i]->data[j] = j;
variables[i]->name = malloc(5 + i);
sprintf(variables[i]->name, "var%d", size);
// Release everything allocated by setup_variables: per variable the data
// array, the name and the struct itself, then the pointer array.
// Safe to call when nothing was allocated and safe to call more than once.
void cleanup_variables()
{
    if(variables == NULL)
    {   return;
    }
    for(int i = 0; i < variable_count; i++)
    {   free(variables[i]->data);
        free(variables[i]->name);
        free(variables[i]);
    }
    free(variables);
    // Reset so a repeated call does not touch freed memory.
    variables = NULL;
}
// DataSource callbacks
// DataSource read callback: exposes the variable's data through dataValue without copying.
//   handle:    the Variable_t registered for the node.
//   includeSourceTimeStamp, range: see NOTE below; 'range' is not used in the visible lines.
//   dataValue: receives the value (scalar when size is 1, array otherwise), a good status,
//              and a source timestamp.
// After filling dataValue the last element of the data is incremented.
// NOTE(review): brace-only lines, several conditions (before the blocks that start with '{')
// and the return statement are not visible in this view; the line 'printf(" %s",;' is
// incomplete as shown.
static UA_StatusCode read(void *handle, UA_Boolean includeSourceTimeStamp, const UA_NumericRange *range, UA_DataValue *dataValue)
Variable_t* variable = handle;
// The variant references variable->data directly; see release().
if(variable->size == 1)
{ UA_Variant_setScalar(&dataValue->value, variable->data, &UA_TYPES[UA_TYPES_INT32]);
{ UA_Variant_setArray(&dataValue->value, variable->data, variable->size, &UA_TYPES[UA_TYPES_INT32]);
dataValue->status = UA_STATUSCODE_GOOD;
dataValue->hasValue = UA_TRUE;
dataValue->hasStatus = UA_TRUE;
{ dataValue->hasSourceTimestamp = UA_TRUE;
dataValue->sourceTimestamp = UA_DateTime_now();
// Console feedback: variable name, its last element and the timestamp, rewritten in place ('\r').
{ printf("%s: %d", variable->name, variable->data[variable->size - 1]);
{ UA_ByteString s;
UA_DateTime_toString(dataValue->sourceTimestamp, &s);
printf(" %s",;
printf(" \r");
// Increase last array element, in order to give feedback to the client that the data is really updated.
variable->data[variable->size - 1]++;
// DataSource release callback: intentionally does nothing, because the variant filled in by
// the read callback only references the variable's data.
//   handle, dataValue: unused.
static void release(void *handle, UA_DataValue *dataValue)
{ // UA_Variant_deleteMembers(&dataValue->value);
// Not to be deleted because data is not copied into the variant. Just referenced.
// DataSource write callback, registered together with read and release in main.
// NOTE(review): only the signature is visible in this view; the body is not shown.
static UA_StatusCode write(void *handle, const UA_Variant *data, const UA_NumericRange *range)
// Callback to handle ctrl-c in order to stop the server with cleanup.
// 'running' is passed by address to UA_Server_run in main; the handler clears it.
// NOTE(review): the flag is written from a signal handler but is not declared volatile — confirm this is safe for the server loop.
static UA_Boolean running = UA_TRUE;
//   sign: signal number (unused).
static void stop_handler(int sign)
{ running = UA_FALSE;
// Main app.
int main(int argc, char** argv)
// Handle command line arguments.main.c
parse_arguments(argc, argv);
// Setup variable node data.
// Initialize server.
int buffer_overhead = 1024; // Extra space for non-payload.
int buffer_size = max_variable_size + buffer_overhead;
UA_ConnectionConfig config =
{.protocolVersion = 0, .sendBufferSize = buffer_size, .recvBufferSize = buffer_size,
.maxMessageSize = buffer_size, .maxChunkCount = 1};
UA_Server *server = UA_Server_new(UA_ServerConfig_standard);
UA_Server_addNetworkLayer(server, ServerNetworkLayerTCP_new(config, port_nr));
// Add the variable nodes.
for(int i = 0; i < variable_count; i++)
{ printf("Variable: %s\n", variables[i]->name);
UA_QualifiedName nodeName = UA_QUALIFIEDNAME(1, variables[i]->name);
UA_NodeId nodeId = UA_NODEID_STRING(1, variables[i]->name);
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
UA_DataSource dataSource = {variables[i], read, release, write};
UA_Server_addDataSourceVariableNode(server, dataSource, nodeName, nodeId, parentNodeId, parentReferenceNodeId);
// Run the server loop.
printf("Running ...\n");
signal(SIGINT, stop_handler); /* Catches ctrl-c */
UA_StatusCode retval = UA_Server_run(server, thread_count, &running);
// Cleanup.
return retval;
