nlib
msgpack/jsonrpc/server.cpp

JSON-RPCサンプルのサーバー側です。

/*--------------------------------------------------------------------------------*
Project: CrossRoad
Copyright (C)Nintendo All rights reserved.
These coded instructions, statements, and computer programs contain proprietary
information of Nintendo and/or its licensed developers and are protected by
national and international copyright laws. They may not be disclosed to third
parties or copied or duplicated in any form, in whole or in part, without the
prior written consent of Nintendo.
The content herein is highly confidential and should be handled accordingly.
*--------------------------------------------------------------------------------*/
#include <algorithm>
#include <map>
#include <string>
#include <vector>
#include "jsonrpc.h" // NOLINT
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::MpObjectKv;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
namespace {
#ifdef DONT_USE_SOCKET
class ServerEndByMemory {
public:
errno_t Init() { return 0; }
errno_t Accept() { return 0; }
g_s2c.Send(data, n);
return 0;
}
while (!g_c2s.Receive(data, n)) {
if (g_shutdown_server) {
*n = 0;
data->reset();
return 0;
}
}
return 0;
}
void Close() {}
};
#else
class ServerEndByTcp {
public:
errno_t Init() {
e = nlib_socket(&acceptor_, AF_INET, SOCK_STREAM, 0);
if (nlib_is_error(e)) return e;
int yes = 1;
e = nlib_setsockopt(acceptor_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
if (nlib_is_error(e)) return e;
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_ANY);
e = nlib_bind(acceptor_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr));
if (nlib_is_error(e)) return e;
e = nlib_listen(acceptor_, 1);
return e;
}
errno_t Accept() {
errno_t e = nlib_accept(&socket_, acceptor_, NULL, NULL, 0);
if (nlib_is_error(e)) return e;
stream_.Init(socket_);
return 0;
}
// This sample sends and receives the data in netstring method.
// you can also close a socket every time.
// JSON-RPC does not refer to them.
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
nlib_ns::TcpOutputStream os;
os.Init(socket_);
{
writer.Init();
writer.Open(&os);
writer.WriteFormat("%" PRIuS ":", n);
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
if (!nlib_is_error(os))
return 0;
else
return os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
if (stream_.Peek() < 0) {
if (nlib_is_error(stream_)) return EIO;
*n = 0;
return 0;
}
int c;
size_t nbytes = 0;
for (;;) {
c = stream_.Read();
if (c == ':') break;
if (!nlib_isdigit(c)) return EILSEQ;
size_t new_val = nbytes * 10 + c - '0';
if (new_val < nbytes) return EILSEQ;
nbytes = new_val;
}
data->reset(reinterpret_cast<uint8_t*>(nlib_malloc(nbytes)));
if (!*data) return ENOMEM;
if (nbytes != stream_.Read(data->get(), nbytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nbytes;
return 0;
}
void Close() {
nlib_closesocket(socket_);
nlib_closesocket(acceptor_);
}
private:
nlib_sock acceptor_;
nlib_sock socket_;
nlib_ns::TcpInputStream stream_;
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ServerEndByTcp ServerEnd;
#else
typedef ServerEndByMemory ServerEnd;
#endif
UniquePtr<ServerEnd> g_ServerEnd;
int g_counter = 0;
struct Msg {
std::string uri;
DateTime datetime;
std::string text;
public:
Msg(DateTime datetime_, const char* text_) : datetime(datetime_), text(text_) {
char buf[128];
::nlib_ns::SnPrintf(buf, "http://example.com/board/%d", g_counter);
++g_counter;
uri = buf;
}
bool operator<(const Msg& rhs) const { return datetime < rhs.datetime; }
bool ToMpObject(MpObject* obj) {
// { "datetime": ...., "uri": ..., "text": .... }
char dt[32];
datetime.ToW3cDtf(dt);
std::map<std::string, std::string> m;
m["datetime"] = dt;
m["uri"] = uri;
m["text"] = text;
MpObject tmp;
if (nlib_is_error(tmp.Box(m))) return false;
obj->assign(tmp, nlib_ns::move_tag());
return true;
}
};
class MsgDb {
public:
~MsgDb() {
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
delete *it;
}
}
private:
Msg* FindByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
return *it;
}
}
return NULL;
}
bool DeleteByUri(const char* uri) {
std::string uristr(uri);
std::vector<Msg*>::iterator it = data_.begin();
std::vector<Msg*>::iterator end = data_.end();
for (; it != end; ++it) {
if ((*it)->uri == uristr) {
delete *it;
data_.erase(it);
return true;
}
}
return false;
}
static bool LessFunc(Msg* lhs, Msg* rhs) { return *lhs < *rhs; }
public:
std::string Create(DateTime datetime, const char* text) {
data_.push_back(new Msg(datetime, text));
std::string rval = data_.back()->uri;
std::sort(data_.begin(), data_.end(), LessFunc);
return rval;
}
bool Read(MpObject* obj) const {
uint32_t n = static_cast<uint32_t>(data_.size());
errno_t e = obj->InitArray(n);
if (nlib_is_error(e)) return false;
for (uint32_t i = 0; i < n; ++i) {
data_[i]->ToMpObject(obj->GetArrayItem(i));
}
return true;
}
bool Update(const std::string& uri, DateTime datetime, const std::string& text) {
Msg* msg = FindByUri(uri.c_str());
if (!msg) return false;
msg->uri = uri;
msg->datetime = datetime;
msg->text = text;
std::sort(data_.begin(), data_.end(), LessFunc);
return true;
}
bool Delete(const char* uri) { return DeleteByUri(uri); }
private:
std::vector<Msg*> data_;
} g_msgdb;
bool GetDateTimeField(const MpObject& param, DateTime* dt) {
const MpObject* datetime_obj = param.GetMapItem("datetime");
if (!datetime_obj) return false;
std::string dtstr;
if (nlib_is_error(datetime_obj->Unbox(&dtstr))) return false;
if (nlib_is_error(DateTime::Parse(dtstr.c_str(), dt, &span))) return false;
return true;
}
bool GetTextField(const MpObject& param, std::string* text) {
const MpObject* text_obj = param.GetMapItem("text");
if (!text_obj) return false;
if (nlib_is_error(text_obj->Unbox(text))) return false;
return true;
}
bool GetUriField(const MpObject& param, std::string* uri) {
const MpObject* uri_obj = param.GetMapItem("uri");
if (!uri_obj) return false;
if (nlib_is_error(uri_obj->Unbox(uri))) return false;
return true;
}
JsonRpcServerFuncCallError Create(MpObject& param, JsonRpcResponse& response) { // NOLINT
// STRING create({datetime:STRING, text:STRING});
DateTime dt;
if (!GetDateTimeField(param, &dt)) return kJsonServerInvalidParams;
std::string text;
if (!GetTextField(param, &text)) return kJsonServerInvalidParams;
// Creates a message and receives URI as the key for it.
std::string uri = g_msgdb.Create(dt, text.c_str());
MpObject obj;
if (nlib_is_error(obj.Box(uri))) return kJsonServerInternalError;
response.MoveResultFrom(obj);
// You can make the client timeout by delaying the response.
// ::nlib_ns::threading::this_thread::SleepMilliSeconds(500);
return kJsonServerOk;
}
JsonRpcServerFuncCallError Read(MpObject&, JsonRpcResponse& response) { // NOLINT
// [{uri:STRING, datetime:STRING, text:STRING}, ...] read()
// This function simplly responds all the messages which the server stores.
// You can implement extended version of this function
// by analyzing the parameters in MpObject.
MpObject obj;
if (!g_msgdb.Read(&obj)) return kJsonServerInternalError;
response.MoveResultFrom(obj);
return kJsonServerOk;
}
JsonRpcServerFuncCallError Update(MpObject& param, JsonRpcResponse& response) { // NOLINT
// void update([{uri:STRING, datetime:STRING, text:STRING}, ...])
std::string uri;
if (!GetUriField(param, &uri)) return kJsonServerInvalidParams;
std::string text;
if (!GetTextField(param, &text)) return kJsonServerInvalidParams;
DateTime dt;
if (!GetDateTimeField(param, &dt)) return kJsonServerInvalidParams;
if (!g_msgdb.Update(uri, dt, text)) {
// Returns kJsonServerOk if you set non-predefined error.
response.SetError(-32000, "uri not found");
}
return kJsonServerOk;
}
JsonRpcServerFuncCallError Delete(MpObject& param, JsonRpcResponse& response) { // NOLINT
// void delete({uri:STRING})
std::string uritext;
if (!GetUriField(param, &uritext)) return kJsonServerInvalidParams;
if (!g_msgdb.Delete(uritext.c_str())) {
// Returns kJsonServerOk if you set non-predefined error.
response.SetError(-32000, "uri not found");
}
return kJsonServerOk;
}
void ProcessRequest() {
ReallocOutputStream::UniquePtrType request_bytes;
size_t n = 0;
ReallocOutputStream::UniquePtrType response_bytes;
errno_t e = g_ServerEnd->Receive(&request_bytes, &n);
if (nlib_is_error(e) || n == 0) return;
size_t rn = JsonRpcServerExec(request_bytes.get(), n, &response_bytes);
g_ServerEnd->Send(&response_bytes, rn);
}
} // namespace
errno_t InitServer() {
g_ServerEnd.reset(new ServerEnd());
if (!g_ServerEnd) {
return ENOMEM;
}
e = g_ServerEnd->Init();
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
e = AddJsonRpcServerMethod("create", Create);
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
e = AddJsonRpcServerMethod("update", Update);
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
e = AddJsonRpcServerMethod("delete", Delete);
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return e;
}
return 0;
}
volatile bool g_shutdown_server = false;
void ServerThread(void*) {
// NOTE:
// This sample uses single connection.
e = g_ServerEnd->Accept();
if (nlib_is_error(e)) {
nlib_printf("SERVER ERROR: %d\n", e);
return;
}
for (;;) {
ProcessRequest();
if (g_shutdown_server) break;
}
g_ServerEnd->Close();
g_ServerEnd.reset();
nlib_printf("ServerThread() end\n");
}