Created
January 7, 2022 21:55
-
-
Save kala13x/bad220ca9a31747eebec525b5190e082 to your computer and use it in GitHub Desktop.
High performance event based non-blocking echo server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*! | |
* @file libxutils/examples/events.c | |
* | |
* This source is part of "libxutils" project | |
* 2015-2020 Sun Dro (f4tb0y@protonmail.com) | |
* | |
* @brief Implementation of high performance event based non-blocking echo server. | |
* The xUtils library will use poll() or epoll() depending on the operating system. | |
*/ | |
#include <xutils/xstd.h> | |
#include <xutils/buffer.h> | |
#include <xutils/event.h> | |
#include <xutils/sock.h> | |
#include <xutils/xlog.h> | |
#define XEVENT_RX_SIZE 2048 | |
static int g_nInterrupted = 0; | |
void signal_callback(int sig) | |
{ | |
if (sig == 2) printf("\n"); | |
g_nInterrupted = 1; | |
} | |
void clear_event(xevent_data_t *pEvData) | |
{ | |
if (pEvData != NULL) | |
{ | |
if (pEvData->pContext != NULL) | |
{ | |
XByteBuffer_Clear((xbyte_buffer_t*)pEvData->pContext); | |
pEvData->pContext = NULL; | |
} | |
if (pEvData->nFD >= 0) | |
{ | |
shutdown(pEvData->nFD, SHUT_RDWR); | |
close(pEvData->nFD); | |
pEvData->nFD = -1; | |
} | |
} | |
} | |
int read_event(xevents_t *pEvents, xevent_data_t *pEvData) | |
{ | |
if (pEvents == NULL || pEvData == NULL) return XEVENTS_DISCONNECT; | |
xsock_t *pListener = (xsock_t*)pEvents->pUserSpace; | |
if (pListener->nFD == pEvData->nFD) | |
{ | |
xsock_t newSock; | |
XSock_Accept(pListener, &newSock); | |
XSock_NonBlock(&newSock, 1); | |
if (newSock.eStatus == XSOCK_ERR_NONE) | |
{ | |
xbyte_buffer_t *pBuffer = XByteBuffer_New(XEVENT_RX_SIZE, 0); | |
if (pBuffer == NULL) | |
{ | |
xloge("Can not allocate memory for rx buffer: %d", errno); | |
XSock_Close(&newSock); | |
return XEVENTS_CONTINUE; | |
} | |
xlogn("Accepted connection: fd(%d)", newSock.nFD); | |
XEvents_RegisterEvent(pEvents, pBuffer, newSock.nFD, XPOLLIN, 0); | |
return XEVENTS_ACCEPT; | |
} | |
return XEVENTS_CONTINUE; | |
} | |
else | |
{ | |
xsock_t tcpSock; | |
XSock_Init(&tcpSock, XSOCK_TCP_PEER, pEvData->nFD, 1); | |
char buffer[XEVENT_RX_SIZE]; | |
int nLength = XSock_Read(&tcpSock, buffer, sizeof(buffer)); | |
if (tcpSock.eStatus == XSOCK_FINAL_PACKET) | |
{ | |
xlogn("%s (%d)", XSock_Error(&tcpSock), pEvData->nFD); | |
return XEVENTS_DISCONNECT; | |
} | |
else if (tcpSock.eStatus != XSOCK_ERR_NONE) | |
{ | |
xloge("%s (%s)", XSock_Error(&tcpSock), strerror(errno)); | |
pEvData->nFD = XSOCK_INVALID; | |
return XEVENTS_DISCONNECT; | |
} | |
xbyte_buffer_t *pBuffer = (xbyte_buffer_t*)pEvData->pContext; | |
if (XByteBuffer_Add(pBuffer, (uint8_t*)buffer, nLength) <= 0) | |
{ | |
xloge("Can not append data to rx buffer: %d", errno); | |
return XEVENTS_DISCONNECT; | |
} | |
XEvents_Modify(pEvents, pEvData, XPOLLOUT); | |
xlogn("Finish read: fd(%d), len(%zu), (rx buffer) << %s", | |
pEvData->nFD, pBuffer->nUsed, (char*)pBuffer->pData); | |
} | |
return XEVENTS_CONTINUE; | |
} | |
int write_event(xevents_t *pEvents, xevent_data_t *pEvData) | |
{ | |
if (pEvents == NULL || pEvData == NULL) return XEVENTS_DISCONNECT; | |
xbyte_buffer_t *pBuffer = (xbyte_buffer_t*)pEvData->pContext; | |
xsock_t socket; | |
XSock_Init(&socket, XSOCK_TCP_PEER, pEvData->nFD, 1); | |
int nSent = XSock_Write(&socket, pBuffer->pData, pBuffer->nUsed); | |
if (nSent < 0) | |
{ | |
xloge("%s (%s)", XSock_Error(&socket), strerror(errno)); | |
return XEVENTS_DISCONNECT; | |
} | |
xlogn("Finish write: fd(%d), len(%d), (tx buffer) >> %s", | |
pEvData->nFD, pBuffer->nUsed, (char*)pBuffer->pData); | |
if (!XByteBuffer_Advance(pBuffer, nSent)) | |
XEvents_Modify(pEvents, pEvData, XPOLLIN); | |
return XEVENTS_CONTINUE; | |
} | |
int event_callback(void *events, void* data, int fd, int reason) | |
{ | |
xevent_data_t *pData = (xevent_data_t*)data; | |
xevents_t *pEvents = (xevents_t*)events; | |
switch(reason) | |
{ | |
case XEVENT_READ: | |
xlogn("RX callback: fd(%d)", fd); | |
return read_event(pEvents, pData); | |
case XEVENT_WRITE: | |
xlogn("TX callback: fd(%d)", fd); | |
return write_event(pEvents, pData); | |
case XEVENT_HUNGED: | |
xlogw("Connection hunged: fd(%d)", fd); | |
return XEVENTS_DISCONNECT; | |
case XEVENT_CLOSED: | |
xlogi("Connection closed: fd(%d)", fd); | |
return XEVENTS_DISCONNECT; | |
case XEVENT_INTERRUPT: | |
xlogi("Interrupted by signal"); | |
return XEVENTS_DISCONNECT; | |
case XEVENT_CLEAR: | |
xlogi("Clear callback: fd(%d)", fd); | |
clear_event(pData); | |
break; | |
case XEVENT_DESTROY: | |
xlogi("Service destroyed"); | |
break; | |
default: | |
break; | |
} | |
return XEVENTS_CONTINUE; | |
} | |
int main(int argc, char* argv[]) | |
{ | |
xlog_defaults(); | |
xlog_timing(XLOG_TIME_ONLY); | |
xlog_setfl(XLOG_ALL); | |
/* Register interrupt/termination signals */ | |
signal(SIGTERM, signal_callback); | |
signal(SIGINT, signal_callback); | |
/* Used variables */ | |
xevent_status_t status; | |
xevents_t events; | |
xsock_t socket; | |
/* Check valid args */ | |
if (argc < 2) | |
{ | |
xlog("Usage: %s [address] [port]", argv[0]); | |
xlog("Example: %s 127.0.0.1 6969", argv[0]); | |
return 1; | |
} | |
/* Create server socket */ | |
XSock_Create(&socket, XSOCK_TCP_SERVER, argv[1], atoi(argv[2])); | |
if (socket.nFD < 0) | |
{ | |
xloge("%s", XSock_Error(&socket)); | |
return 1; | |
} | |
xlogi("Socket started listen to port: %d", atoi(argv[2])); | |
/* Create event instance */ | |
status = XEvents_Create(&events, 0, &socket, event_callback, 1); | |
if (status != XEVENT_STATUS_SUCCESS) | |
{ | |
xloge("%s", XEvents_ErrStr(status)); | |
XSock_Close(&socket); | |
return 1; | |
} | |
/* Add listener socket to the event instance */ | |
xevent_data_t *pListener = XEvents_RegisterEvent(&events, NULL, socket.nFD, XPOLLIN, 0); | |
if (pListener == NULL) | |
{ | |
xloge("Failed to register listener event"); | |
XEvents_Destroy(&events); | |
XSock_Close(&socket); | |
return 1; | |
} | |
/* Main service loop */ | |
while (status == XEVENT_STATUS_SUCCESS) | |
status = XEvents_Service(&events, 100); | |
XEvents_Destroy(&events); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment