nlib
msgpack/jsonrpc/server.cpp

Server-side JSON-RPC sample.

/*---------------------------------------------------------------------------*
Project: CrossRoad
Copyright (C)2012-2016 Nintendo. All rights reserved.
These coded instructions, statements, and computer programs contain
proprietary information of Nintendo of America Inc. and/or Nintendo
Company Ltd., and are protected by Federal copyright law. They may
not be disclosed to third parties or copied or duplicated in any form,
in whole or in part, without the prior written consent of Nintendo.
*---------------------------------------------------------------------------*/
#include <algorithm>
#include <map>
#include <string>
#include <vector>
#include "jsonrpc.h" // NOLINT
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::MpObjectKv;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
namespace {
#ifdef DONT_USE_SOCKET
class ServerEndByMemory {
public:
errno_t Init() { return 0; }
errno_t Accept() { return 0; }
g_s2c.Send(data, n);
return 0;
}
while (!g_c2s.Receive(data, n)) {
if (g_ShutdownServer) {
*n = 0;
data->reset();
return 0;
}
}
return 0;
}
void Close() {}
};
#else
class ServerEndByTcp {
public:
errno_t Init() {
e = nlib_socket(&acceptor_, AF_INET, SOCK_STREAM, 0);
if (nlib_is_error(e)) return e;
int yes = 1;
e = nlib_setsockopt(acceptor_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
if (nlib_is_error(e)) return e;
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_ANY);
e = nlib_bind(acceptor_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr));
if (nlib_is_error(e)) return e;
e = nlib_listen(acceptor_, 1);
return e;
}
errno_t Accept() {
errno_t e = nlib_accept(&socket_, acceptor_, NULL, NULL, 0);
if (nlib_is_error(e)) return e;
stream_.Init(socket_);
return 0;
}
// This sample sends and receives the data in netstring method.
// you can also close a socket every time.
// JSON-RPC does not refer to them.
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
nlib_ns::TcpOutputStream os;
os.Init(socket_);
{
writer.Init();
writer.Open(&os);
writer.WriteFormat("%" PRIuS ":", n);
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
if (!nlib_is_error(os))
return 0;
else
return os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
if (stream_.Peek() < 0) {
if (nlib_is_error(stream_)) return EIO;
*n = 0;
return 0;
}
int c;
size_t nBytes = 0;
for (;;) {
c = stream_.Read();
if (c == ':') break;
if (!nlib_isdigit(c)) return EILSEQ;
size_t newVal = nBytes * 10 + c - '0';
if (newVal < nBytes) return EILSEQ;
nBytes = newVal;
}
data->reset(reinterpret_cast<uint8_t*>(nlib_malloc(nBytes)));
if (!*data) return ENOMEM;
if (nBytes != stream_.Read(data->get(), nBytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nBytes;
return 0;
}
void Close() {
nlib_closesocket(socket_);
nlib_closesocket(acceptor_);
}
private:
nlib_sock acceptor_;
nlib_sock socket_;
nlib_ns::TcpInputStream stream_;
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ServerEndByTcp ServerEnd;
#else
typedef ServerEndByMemory ServerEnd;
#endif
UniquePtr<ServerEnd> g_ServerEnd;
int g_Counter = 0;
struct Msg {
std::string uri;
DateTime datetime;
std::string text;
public:
Msg(DateTime datetime_, const char* text_) : datetime(datetime_), text(text_) {
char buf[128];
::nlib_ns::SnPrintf(buf, "http://example.com/board/%d", g_Counter);
++g_Counter;
uri = buf;
}
bool operator<(const Msg& rhs) const { return datetime < rhs.datetime; }
bool ToMpObject(MpObject* obj) {
// { "datetime": ...., "uri": ..., "text": .... }
char dt[32];
datetime.ToW3cDtf(dt);
std::map<std::string, std::string> m;
m["datetime"] = dt;
m["uri"] = uri;
m["text"] = text;
MpObject tmp;
if (nlib_is_error(tmp.Box(m))) return false;
tmp.swap(*obj);
return true;
}
};
class MsgDb {
public:
~MsgDb() {
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
delete *it;
}
}
private:
Msg* FindByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
return *it;
}
}
return NULL;
}
bool DeleteByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
delete *it;
data_.erase(it);
return true;
}
}
return false;
}
static bool LessFunc(Msg* lhs, Msg* rhs) { return *lhs < *rhs; }
public:
std::string Create(DateTime datetime, const char* text) {
data_.push_back(new Msg(datetime, text));
std::string rval = data_.back()->uri;
std::sort(data_.begin(), data_.end(), LessFunc);
return rval;
}
bool Read(MpObject* obj) const {
uint32_t n = static_cast<uint32_t>(data_.size());
errno_t e = obj->InitArray(n);
if (nlib_is_error(e)) return false;
for (uint32_t i = 0; i < n; ++i) {
data_[i]->ToMpObject(obj->GetArrayItem(i));
}
return true;
}
bool Update(const std::string& uri, DateTime datetime, const std::string& text) {
Msg* msg = FindByUri(uri.c_str());
if (!msg) return false;
msg->uri = uri;
msg->datetime = datetime;
msg->text = text;
std::sort(data_.begin(), data_.end(), LessFunc);
return true;
}
bool Delete(const char* uri) { return DeleteByUri(uri); }
private:
std::vector<Msg*> data_;
} g_MsgDb;
bool GetDateTimeField(const MpObject& param, DateTime* dt) {
const MpObject* datetimeObj = param.GetMapItem("datetime");
if (!datetimeObj) return false;
std::string dtstr;
if (nlib_is_error(datetimeObj->Unbox(&dtstr))) return false;
if (nlib_is_error(DateTime::Parse(dtstr.c_str(), dt, &span))) return false;
return true;
}
bool GetTextField(const MpObject& param, std::string* text) {
const MpObject* txtObj = param.GetMapItem("text");
if (!txtObj) return false;
if (nlib_is_error(txtObj->Unbox(text))) return false;
return true;
}
bool GetUriField(const MpObject& param, std::string* uri) {
const MpObject* uriObj = param.GetMapItem("uri");
if (!uriObj) return false;
if (nlib_is_error(uriObj->Unbox(uri))) return false;
return true;
}
JsonRpcServerFuncCallError Create(MpObject& param, JsonRpcResponse& response) { // NOLINT
// STRING create({datetime:STRING, text:STRING});
DateTime dt;
if (!GetDateTimeField(param, &dt)) return JSONSERVER_INVALID_PARAMS;
std::string text;
if (!GetTextField(param, &text)) return JSONSERVER_INVALID_PARAMS;
// Creates a message and receives URI as the key for it.
std::string uri = g_MsgDb.Create(dt, text.c_str());
MpObject obj;
if (nlib_is_error(obj.Box(uri))) return JSONSERVER_INTERNAL_ERROR;
response.MoveResultFrom(obj);
// You can make the client timeout by delaying the response.
// ::nlib_ns::threading::this_thread::SleepMilliSeconds(500);
return JSONSERVER_OK;
}
JsonRpcServerFuncCallError Read(MpObject&, JsonRpcResponse& response) { // NOLINT
// [{uri:STRING, datetime:STRING, text:STRING}, ...] read()
// This function simplly responds all the messages which the server stores.
// You can implement extended version of this function
// by analyzing the parameters in MpObject.
MpObject obj;
if (!g_MsgDb.Read(&obj)) return JSONSERVER_INTERNAL_ERROR;
response.MoveResultFrom(obj);
return JSONSERVER_OK;
}
JsonRpcServerFuncCallError Update(MpObject& param, JsonRpcResponse& response) { // NOLINT
// void update([{uri:STRING, datetime:STRING, text:STRING}, ...])
std::string uri;
if (!GetUriField(param, &uri)) return JSONSERVER_INVALID_PARAMS;
std::string text;
if (!GetTextField(param, &text)) return JSONSERVER_INVALID_PARAMS;
DateTime dt;
if (!GetDateTimeField(param, &dt)) return JSONSERVER_INVALID_PARAMS;
if (!g_MsgDb.Update(uri, dt, text)) {
// Returns JSONSERVER_OK if you set non-predefined error.
response.SetError(-32000, "uri not found");
}
return JSONSERVER_OK;
}
JsonRpcServerFuncCallError Delete(MpObject& param, JsonRpcResponse& response) { // NOLINT
// void delete({uri:STRING})
std::string uritext;
if (!GetUriField(param, &uritext)) return JSONSERVER_INVALID_PARAMS;
if (!g_MsgDb.Delete(uritext.c_str())) {
// Returns JSONSERVER_OK if you set non-predefined error.
response.SetError(-32000, "uri not found");
}
return JSONSERVER_OK;
}
void ProcessRequest() {
ReallocOutputStream::UniquePtrType reqBytes;
size_t n = 0;
ReallocOutputStream::UniquePtrType responseBytes;
errno_t e = g_ServerEnd->Receive(&reqBytes, &n);
if (nlib_is_error(e) || n == 0) return;
size_t rn = JsonRpcServerExec(reqBytes.get(), n, &responseBytes);
g_ServerEnd->Send(&responseBytes, rn);
}
} // namespace
errno_t InitServer() {
g_ServerEnd.reset(new ServerEnd());
if (!g_ServerEnd) {
return ENOMEM;
}
e = g_ServerEnd->Init();
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
e = AddJsonRpcServerMethod("create", Create);
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
e = AddJsonRpcServerMethod("update", Update);
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
e = AddJsonRpcServerMethod("delete", Delete);
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
return 0;
}
volatile bool g_ShutdownServer = false;
void ServerThread() {
// NOTE:
// This sample uses single connection.
e = g_ServerEnd->Accept();
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return;
}
for (;;) {
ProcessRequest();
if (g_ShutdownServer) break;
}
g_ServerEnd->Close();
g_ServerEnd.reset();
nlib_printf("ServerThread() end\n");
}