#include <algorithm>
#include <map>
#include <string>
#include <vector>
#include "jsonrpc.h"
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::MpObjectKv;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
namespace {
#ifdef DONT_USE_SOCKET
class ServerEndByMemory {
public:
g_s2c.Send(data, n);
return 0;
}
while (!g_c2s.Receive(data, n)) {
if (g_ShutdownServer) {
*n = 0;
data->reset();
return 0;
}
}
return 0;
}
void Close() {}
};
#else
class ServerEndByTcp {
public:
e = nlib_socket(&m_Acceptor, NLIB_AF_INET, NLIB_SOCK_STREAM, 0);
if (e != 0) return e;
int yes = 1;
e = nlib_setsockopt(m_Acceptor, SOL_SOCKET, NLIB_SO_REUSEADDR, &yes, sizeof(yes));
if (e != 0) return e;
nlib_sockaddr_in addr;
addr.sin_family = NLIB_AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(NLIB_INADDR_ANY);
e = nlib_bind(m_Acceptor, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr));
if (e != 0) return e;
e = nlib_listen(m_Acceptor, 1);
return e;
}
errno_t e = nlib_accept(&m_Socket, m_Acceptor, NULL, NULL);
if (e != 0) return e;
m_Is.Init(m_Socket);
return 0;
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(m_LockS);
nlib_ns::TcpOutputStream os;
os.Init(m_Socket);
{
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
return os.IsOk() ? 0 : os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(m_LockR);
if (m_Is.Peek() < 0) {
if (!m_Is.IsOk()) return EIO;
*n = 0;
return 0;
}
int c;
size_t nBytes = 0;
for (;;) {
c = m_Is.Read();
if (c == ':') break;
size_t newVal = nBytes * 10 + c - '0';
if (newVal < nBytes) return EILSEQ;
nBytes = newVal;
}
data->reset(reinterpret_cast<uint8_t*>(
nlib_malloc(nBytes)));
if (!*data) return ENOMEM;
if (nBytes != m_Is.Read(data->get(), nBytes)) return EILSEQ;
if (m_Is.Read() != ',') return EILSEQ;
*n = nBytes;
return 0;
}
void Close() {
nlib_closesocket(m_Socket);
nlib_closesocket(m_Acceptor);
}
private:
nlib_sock m_Acceptor;
nlib_sock m_Socket;
nlib_ns::TcpInputStream m_Is;
nlib_ns::threading::SimpleCriticalSection m_LockS;
nlib_ns::threading::SimpleCriticalSection m_LockR;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ServerEndByTcp ServerEnd;
#else
typedef ServerEndByMemory ServerEnd;
#endif
UniquePtr<ServerEnd> g_ServerEnd;
int g_Counter = 0;
struct Msg {
std::string uri;
DateTime datetime;
std::string text;
public:
Msg(DateTime datetime_, const char* text_) : datetime(datetime_), text(text_) {
char buf[128];
++g_Counter;
uri = buf;
}
bool operator<(
const Msg& rhs)
const {
return datetime < rhs.datetime; }
bool ToMpObject(MpObject* obj) {
char dt[32];
datetime.ToW3cDtf(dt);
std::map<std::string, std::string> m;
m["datetime"] = dt;
m["uri"] = uri;
m["text"] = text;
MpObject tmp;
if (tmp.Box(m) != 0) return false;
tmp.swap(*obj);
return true;
}
};
class MsgDb {
public:
~MsgDb() {
std::vector<Msg*>::iterator it = m_Data.begin();
std::vector<Msg*>::iterator end = m_Data.end();
for (; it != end; ++it) {
delete *it;
}
}
private:
Msg* FindByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = m_Data.begin();
std::vector<Msg*>::iterator end = m_Data.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
return *it;
}
}
return NULL;
}
bool DeleteByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = m_Data.begin();
std::vector<Msg*>::iterator end = m_Data.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
delete *it;
m_Data.erase(it);
return true;
}
}
return false;
}
static bool LessFunc(Msg* lhs, Msg* rhs) { return *lhs < *rhs; }
public:
std::string Create(DateTime datetime, const char* text) {
m_Data.push_back(new Msg(datetime, text));
std::string rval = m_Data.back()->uri;
std::sort(m_Data.begin(), m_Data.end(), LessFunc);
return rval;
}
bool Read(MpObject* obj)
const {
uint32_t n = static_cast<uint32_t>(m_Data.size());
if (e != 0) return false;
for (uint32_t i = 0; i < n; ++i) {
m_Data[i]->ToMpObject(obj->GetArrayItem(i));
}
return true;
}
bool Update(const std::string& uri, DateTime datetime, const std::string& text) {
Msg* msg = FindByUri(uri.c_str());
if (!msg) return false;
msg->uri = uri;
msg->datetime = datetime;
msg->text = text;
std::sort(m_Data.begin(), m_Data.end(), LessFunc);
return true;
}
bool Delete(const char* uri) { return DeleteByUri(uri); }
private:
std::vector<Msg*> m_Data;
} g_MsgDb;
bool GetDateTimeField(const MpObject& param, DateTime* dt) {
const MpObject* datetimeObj = param.GetMapItem("datetime");
if (!datetimeObj) return false;
std::string dtstr;
if (datetimeObj->Unbox(&dtstr) != 0) return false;
if (DateTime::Parse(dtstr.c_str(), dt, &span) != 0) return false;
return true;
}
bool GetTextField(const MpObject& param, std::string* text) {
const MpObject* txtObj = param.GetMapItem("text");
if (!txtObj) return false;
if (txtObj->Unbox(text) != 0) return false;
return true;
}
bool GetUriField(const MpObject& param, std::string* uri) {
const MpObject* uriObj = param.GetMapItem("uri");
if (!uriObj) return false;
if (uriObj->Unbox(uri) != 0) return false;
return true;
}
DateTime dt;
std::string text;
std::string uri = g_MsgDb.Create(dt, text.c_str());
MpObject obj;
response.MoveResultFrom(obj);
}
MpObject obj;
response.MoveResultFrom(obj);
}
std::string uri;
std::string text;
DateTime dt;
if (!g_MsgDb.Update(uri, dt, text)) {
response.SetError(-32000, "uri not found");
}
}
std::string uritext;
if (!g_MsgDb.Delete(uritext.c_str())) {
response.SetError(-32000, "uri not found");
}
}
void ProcessRequest() {
ReallocOutputStream::UniquePtrType reqBytes;
size_t n = 0;
ReallocOutputStream::UniquePtrType responseBytes;
errno_t e = g_ServerEnd->Receive(&reqBytes, &n);
if (e != 0 || n == 0) return;
g_ServerEnd->Send(&responseBytes, rn);
}
}
g_ServerEnd.reset(new ServerEnd());
if (!g_ServerEnd) {
return ENOMEM;
}
e = g_ServerEnd->Init();
if (e != 0) {
return e;
}
if (e != 0) {
return e;
}
if (e != 0) {
return e;
}
if (e != 0) {
return e;
}
if (e != 0) {
return e;
}
return 0;
}
volatile bool g_ShutdownServer = false;
void ServerThread() {
e = g_ServerEnd->Accept();
if (e != 0) {
return;
}
for (;;) {
ProcessRequest();
if (g_ShutdownServer) break;
}
g_ServerEnd->Close();
g_ServerEnd.reset();
}