Skip to content

Instantly share code, notes, and snippets.

@kala13x
Created January 7, 2022 21:55
Show Gist options
  • Save kala13x/bad220ca9a31747eebec525b5190e082 to your computer and use it in GitHub Desktop.
Save kala13x/bad220ca9a31747eebec525b5190e082 to your computer and use it in GitHub Desktop.
High performance event based non-blocking echo server
/*!
* @file libxutils/examples/events.c
*
* This source is part of "libxutils" project
* 2015-2020 Sun Dro (f4tb0y@protonmail.com)
*
* @brief Implementation of high performance event based non-blocking echo server.
* The xUtils library will use poll() or epoll() depending on the operating system.
*/
#include <xutils/xstd.h>
#include <xutils/buffer.h>
#include <xutils/event.h>
#include <xutils/sock.h>
#include <xutils/xlog.h>
#include <signal.h>
#include <unistd.h>
#define XEVENT_RX_SIZE 2048
/*
 * Termination-request flag written by signal_callback().
 * volatile sig_atomic_t is the only object type the C standard guarantees
 * can be safely stored to from inside a signal handler.
 */
static volatile sig_atomic_t g_nInterrupted = 0;

/*
 * Signal handler installed for SIGINT and SIGTERM.
 *
 * Records that termination was requested. On SIGINT a newline is emitted
 * first. Only async-signal-safe calls are allowed in a handler, so write()
 * is used instead of printf().
 *
 * @param sig number of the delivered signal.
 */
void signal_callback(int sig)
{
    if (sig == SIGINT)
    {
        /* Nothing useful can be done from a handler if this write fails */
        if (write(STDOUT_FILENO, "\n", 1) < 0) { /* ignored on purpose */ }
    }

    g_nInterrupted = 1;
}
/*
 * Tear down the resources attached to one event entry.
 *
 * If the entry carries a context, it is treated as the connection's
 * xbyte_buffer_t, cleared with XByteBuffer_Clear() and detached. If the
 * entry still holds a descriptor (nFD >= 0), it is shut down in both
 * directions, closed and marked as -1.
 *
 * @param pEvData event entry to clean up; NULL is accepted and ignored.
 */
void clear_event(xevent_data_t *pEvData)
{
    if (pEvData == NULL) return;

    xbyte_buffer_t *pRxBuffer = (xbyte_buffer_t*)pEvData->pContext;
    if (pRxBuffer != NULL)
    {
        XByteBuffer_Clear(pRxBuffer);
        pEvData->pContext = NULL;
    }

    /* Descriptor already invalidated: nothing left to close */
    if (pEvData->nFD < 0) return;

    shutdown(pEvData->nFD, SHUT_RDWR);
    close(pEvData->nFD);
    pEvData->nFD = -1;
}
/*
 * Handle a "readable" notification for one descriptor.
 *
 * Listener descriptor (the xsock_t stored in pEvents->pUserSpace):
 *   accepts a connection, calls XSock_NonBlock() on it, allocates an rx
 *   buffer of XEVENT_RX_SIZE bytes and registers the new descriptor for
 *   XPOLLIN with that buffer as its context.
 *
 * Any other descriptor:
 *   reads up to XEVENT_RX_SIZE bytes, appends them to the rx buffer kept in
 *   pEvData->pContext and switches the event to XPOLLOUT so write_event()
 *   can send the data back.
 *
 * @param pEvents event instance that delivered the notification.
 * @param pEvData event entry of the ready descriptor.
 * @return XEVENTS_ACCEPT after a connection was accepted and registered,
 *         XEVENTS_DISCONNECT on invalid arguments, on XSOCK_FINAL_PACKET,
 *         on a read error or when the rx buffer can not be extended,
 *         XEVENTS_CONTINUE otherwise.
 */
int read_event(xevents_t *pEvents, xevent_data_t *pEvData)
{
    if (pEvents == NULL || pEvData == NULL) return XEVENTS_DISCONNECT;
    xsock_t *pListener = (xsock_t*)pEvents->pUserSpace;

    if (pListener->nFD == pEvData->nFD)
    {
        xsock_t newSock;
        XSock_Accept(pListener, &newSock);
        XSock_NonBlock(&newSock, 1);

        if (newSock.eStatus == XSOCK_ERR_NONE)
        {
            xbyte_buffer_t *pBuffer = XByteBuffer_New(XEVENT_RX_SIZE, 0);
            if (pBuffer == NULL)
            {
                xloge("Can not allocate memory for rx buffer: %d", errno);
                XSock_Close(&newSock);
                return XEVENTS_CONTINUE;
            }

            xlogn("Accepted connection: fd(%d)", newSock.nFD);

            /*
             * Registration returns NULL on failure. In that case nothing
             * owns the buffer or the socket yet, so both are released here
             * instead of being leaked. The buffer is released the same way
             * clear_event() releases a context.
             */
            if (XEvents_RegisterEvent(pEvents, pBuffer, newSock.nFD, XPOLLIN, 0) == NULL)
            {
                xloge("Failed to register event for fd(%d)", newSock.nFD);
                XByteBuffer_Clear(pBuffer);
                XSock_Close(&newSock);
                return XEVENTS_CONTINUE;
            }

            return XEVENTS_ACCEPT;
        }

        return XEVENTS_CONTINUE;
    }
    else
    {
        /* Wrap the raw descriptor so the XSock read helper can be used */
        xsock_t tcpSock;
        XSock_Init(&tcpSock, XSOCK_TCP_PEER, pEvData->nFD, 1);

        char buffer[XEVENT_RX_SIZE];
        int nLength = XSock_Read(&tcpSock, buffer, sizeof(buffer));

        if (tcpSock.eStatus == XSOCK_FINAL_PACKET)
        {
            xlogn("%s (%d)", XSock_Error(&tcpSock), pEvData->nFD);
            return XEVENTS_DISCONNECT;
        }
        else if (tcpSock.eStatus != XSOCK_ERR_NONE)
        {
            xloge("%s (%s)", XSock_Error(&tcpSock), strerror(errno));
            /*
             * NOTE(review): the descriptor is invalidated here so that
             * clear_event() skips shutdown()/close(); presumably the failed
             * read already closed it - confirm against XSock_Read().
             */
            pEvData->nFD = XSOCK_INVALID;
            return XEVENTS_DISCONNECT;
        }

        xbyte_buffer_t *pBuffer = (xbyte_buffer_t*)pEvData->pContext;
        if (XByteBuffer_Add(pBuffer, (uint8_t*)buffer, nLength) <= 0)
        {
            xloge("Can not append data to rx buffer: %d", errno);
            return XEVENTS_DISCONNECT;
        }

        /* Data is pending: wait for the descriptor to become writable */
        XEvents_Modify(pEvents, pEvData, XPOLLOUT);

        xlogn("Finish read: fd(%d), len(%zu), (rx buffer) << %s",
            pEvData->nFD, pBuffer->nUsed, (char*)pBuffer->pData);
    }

    return XEVENTS_CONTINUE;
}
/*
 * Handle a "writable" notification: send the buffered data back to the peer.
 *
 * Writes the used part of the connection's buffer (pEvData->pContext) to the
 * descriptor, advances the buffer by the number of bytes sent and, when
 * XByteBuffer_Advance() returns zero, switches the event back to XPOLLIN.
 *
 * @param pEvents event instance that delivered the notification.
 * @param pEvData event entry of the ready descriptor.
 * @return XEVENTS_DISCONNECT on invalid arguments, a missing buffer or a
 *         write error, XEVENTS_CONTINUE otherwise.
 */
int write_event(xevents_t *pEvents, xevent_data_t *pEvData)
{
    if (pEvents == NULL || pEvData == NULL) return XEVENTS_DISCONNECT;

    /* An entry without a buffer has nothing to send and can not be served */
    xbyte_buffer_t *pBuffer = (xbyte_buffer_t*)pEvData->pContext;
    if (pBuffer == NULL) return XEVENTS_DISCONNECT;

    /* Wrap the raw descriptor so the XSock write helper can be used */
    xsock_t socket;
    XSock_Init(&socket, XSOCK_TCP_PEER, pEvData->nFD, 1);

    int nSent = XSock_Write(&socket, pBuffer->pData, pBuffer->nUsed);
    if (nSent < 0)
    {
        xloge("%s (%s)", XSock_Error(&socket), strerror(errno));
        return XEVENTS_DISCONNECT;
    }

    /* nUsed is printed with %zu, matching how read_event() prints the same field */
    xlogn("Finish write: fd(%d), len(%zu), (tx buffer) >> %s",
        pEvData->nFD, pBuffer->nUsed, (char*)pBuffer->pData);

    /*
     * Presumably XByteBuffer_Advance() returns the amount of data still
     * pending (TODO confirm); zero means go back to waiting for input.
     */
    if (!XByteBuffer_Advance(pBuffer, nSent))
        XEvents_Modify(pEvents, pEvData, XPOLLIN);

    return XEVENTS_CONTINUE;
}
/*
 * Central callback handed to XEvents_Create(): dispatches on the reason code.
 *
 * Read and write reasons are forwarded to read_event() / write_event() and
 * their result is returned. Hung, closed and interrupt reasons request a
 * disconnect. The clear reason releases the entry through clear_event().
 * Destroy and unknown reasons only fall through to XEVENTS_CONTINUE.
 *
 * @param events the xevents_t instance, passed as an opaque pointer.
 * @param data   the xevent_data_t entry, passed as an opaque pointer.
 * @param fd     descriptor the notification refers to (used for logging).
 * @param reason one of the XEVENT_* reason codes.
 * @return an XEVENTS_* action code.
 */
int event_callback(void *events, void* data, int fd, int reason)
{
    xevents_t *pInstance = (xevents_t*)events;
    xevent_data_t *pEntry = (xevent_data_t*)data;
    int nAction = XEVENTS_CONTINUE;

    switch (reason)
    {
        case XEVENT_READ:
            xlogn("RX callback: fd(%d)", fd);
            nAction = read_event(pInstance, pEntry);
            break;

        case XEVENT_WRITE:
            xlogn("TX callback: fd(%d)", fd);
            nAction = write_event(pInstance, pEntry);
            break;

        case XEVENT_HUNGED:
            xlogw("Connection hunged: fd(%d)", fd);
            nAction = XEVENTS_DISCONNECT;
            break;

        case XEVENT_CLOSED:
            xlogi("Connection closed: fd(%d)", fd);
            nAction = XEVENTS_DISCONNECT;
            break;

        case XEVENT_INTERRUPT:
            xlogi("Interrupted by signal");
            nAction = XEVENTS_DISCONNECT;
            break;

        case XEVENT_CLEAR:
            xlogi("Clear callback: fd(%d)", fd);
            clear_event(pEntry);
            break;

        case XEVENT_DESTROY:
            xlogi("Service destroyed");
            break;

        default:
            break;
    }

    return nAction;
}
int main(int argc, char* argv[])
{
xlog_defaults();
xlog_timing(XLOG_TIME_ONLY);
xlog_setfl(XLOG_ALL);
/* Register interrupt/termination signals */
signal(SIGTERM, signal_callback);
signal(SIGINT, signal_callback);
/* Used variables */
xevent_status_t status;
xevents_t events;
xsock_t socket;
/* Check valid args */
if (argc < 2)
{
xlog("Usage: %s [address] [port]", argv[0]);
xlog("Example: %s 127.0.0.1 6969", argv[0]);
return 1;
}
/* Create server socket */
XSock_Create(&socket, XSOCK_TCP_SERVER, argv[1], atoi(argv[2]));
if (socket.nFD < 0)
{
xloge("%s", XSock_Error(&socket));
return 1;
}
xlogi("Socket started listen to port: %d", atoi(argv[2]));
/* Create event instance */
status = XEvents_Create(&events, 0, &socket, event_callback, 1);
if (status != XEVENT_STATUS_SUCCESS)
{
xloge("%s", XEvents_ErrStr(status));
XSock_Close(&socket);
return 1;
}
/* Add listener socket to the event instance */
xevent_data_t *pListener = XEvents_RegisterEvent(&events, NULL, socket.nFD, XPOLLIN, 0);
if (pListener == NULL)
{
xloge("Failed to register listener event");
XEvents_Destroy(&events);
XSock_Close(&socket);
return 1;
}
/* Main service loop */
while (status == XEVENT_STATUS_SUCCESS)
status = XEvents_Service(&events, 100);
XEvents_Destroy(&events);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment