#include <algorithm>
#include <map>
#include <string>
#include <vector>
#include "jsonrpc.h"
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::MpObjectKv;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
namespace {
#ifdef DONT_USE_SOCKET
class ServerEndByMemory {
public:
g_s2c.Send(data, n);
return 0;
}
while (!g_c2s.Receive(data, n)) {
if (g_ShutdownServer) {
*n = 0;
data->reset();
return 0;
}
}
return 0;
}
void Close() {}
};
#else
class ServerEndByTcp {
public:
e = nlib_socket(&acceptor_, AF_INET, SOCK_STREAM, 0);
int yes = 1;
e = nlib_setsockopt(acceptor_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_ANY);
e = nlib_bind(acceptor_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr));
e = nlib_listen(acceptor_, 1);
return e;
}
errno_t e = nlib_accept(&socket_, acceptor_, NULL, NULL, 0);
stream_.Init(socket_);
return 0;
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
nlib_ns::TcpOutputStream os;
os.Init(socket_);
{
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
return 0;
else
return os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
if (stream_.Peek() < 0) {
*n = 0;
return 0;
}
int c;
size_t nBytes = 0;
for (;;) {
c = stream_.Read();
if (c == ':') break;
size_t newVal = nBytes * 10 + c - '0';
if (newVal < nBytes) return EILSEQ;
nBytes = newVal;
}
data->reset(reinterpret_cast<uint8_t*>(
nlib_malloc(nBytes)));
if (!*data) return ENOMEM;
if (nBytes != stream_.Read(data->get(), nBytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nBytes;
return 0;
}
void Close() {
nlib_closesocket(socket_);
nlib_closesocket(acceptor_);
}
private:
nlib_sock acceptor_;
nlib_sock socket_;
nlib_ns::TcpInputStream stream_;
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ServerEndByTcp ServerEnd;
#else
typedef ServerEndByMemory ServerEnd;
#endif
UniquePtr<ServerEnd> g_ServerEnd;
int g_Counter = 0;
struct Msg {
std::string uri;
DateTime datetime;
std::string text;
public:
Msg(DateTime datetime_, const char* text_) : datetime(datetime_), text(text_) {
char buf[128];
++g_Counter;
uri = buf;
}
bool operator<(
const Msg& rhs)
const {
return datetime < rhs.datetime; }
bool ToMpObject(MpObject* obj) {
char dt[32];
datetime.ToW3cDtf(dt);
std::map<std::string, std::string> m;
m["datetime"] = dt;
m["uri"] = uri;
m["text"] = text;
MpObject tmp;
tmp.swap(*obj);
return true;
}
};
class MsgDb {
public:
~MsgDb() {
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
delete *it;
}
}
private:
Msg* FindByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
return *it;
}
}
return NULL;
}
bool DeleteByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
delete *it;
data_.erase(it);
return true;
}
}
return false;
}
static bool LessFunc(Msg* lhs, Msg* rhs) { return *lhs < *rhs; }
public:
std::string Create(DateTime datetime, const char* text) {
data_.push_back(new Msg(datetime, text));
std::string rval = data_.back()->uri;
std::sort(data_.begin(), data_.end(), LessFunc);
return rval;
}
bool Read(MpObject* obj)
const {
uint32_t n = static_cast<uint32_t>(data_.size());
for (uint32_t i = 0; i < n; ++i) {
data_[i]->ToMpObject(obj->GetArrayItem(i));
}
return true;
}
bool Update(const std::string& uri, DateTime datetime, const std::string& text) {
Msg* msg = FindByUri(uri.c_str());
if (!msg) return false;
msg->uri = uri;
msg->datetime = datetime;
msg->text = text;
std::sort(data_.begin(), data_.end(), LessFunc);
return true;
}
bool Delete(const char* uri) { return DeleteByUri(uri); }
private:
std::vector<Msg*> data_;
} g_MsgDb;
bool GetDateTimeField(const MpObject& param, DateTime* dt) {
const MpObject* datetimeObj = param.GetMapItem("datetime");
if (!datetimeObj) return false;
std::string dtstr;
if (
nlib_is_error(DateTime::Parse(dtstr.c_str(), dt, &span)))
return false;
return true;
}
bool GetTextField(const MpObject& param, std::string* text) {
const MpObject* txtObj = param.GetMapItem("text");
if (!txtObj) return false;
return true;
}
bool GetUriField(const MpObject& param, std::string* uri) {
const MpObject* uriObj = param.GetMapItem("uri");
if (!uriObj) return false;
return true;
}
DateTime dt;
std::string text;
std::string uri = g_MsgDb.Create(dt, text.c_str());
MpObject obj;
response.MoveResultFrom(obj);
}
MpObject obj;
response.MoveResultFrom(obj);
}
std::string uri;
std::string text;
DateTime dt;
if (!g_MsgDb.Update(uri, dt, text)) {
response.SetError(-32000, "uri not found");
}
}
std::string uritext;
if (!g_MsgDb.Delete(uritext.c_str())) {
response.SetError(-32000, "uri not found");
}
}
void ProcessRequest() {
ReallocOutputStream::UniquePtrType reqBytes;
size_t n = 0;
ReallocOutputStream::UniquePtrType responseBytes;
errno_t e = g_ServerEnd->Receive(&reqBytes, &n);
g_ServerEnd->Send(&responseBytes, rn);
}
}
g_ServerEnd.reset(new ServerEnd());
if (!g_ServerEnd) {
return ENOMEM;
}
e = g_ServerEnd->Init();
return e;
}
return e;
}
return e;
}
return e;
}
return e;
}
return 0;
}
volatile bool g_ShutdownServer = false;
void ServerThread() {
e = g_ServerEnd->Accept();
return;
}
for (;;) {
ProcessRequest();
if (g_ShutdownServer) break;
}
g_ServerEnd->Close();
g_ServerEnd.reset();
}