nlib
msgpack/jsonrpc/jsonrpc.cpp

JSON-RPCサンプルのクライアント側です。

JSON-RPCのクライアント側とサーバー側を別々のスレッドに記述し、JSON-RPCで通信を行うサンプルです。 nlibのソケットライブラリがサポートしている場合はソケットで通信を行います。

なお、JSON-RPCは必ずしもhttpやTCP上で通信を行う必要はなく、JSON-RPC 2.0ではどのプロトコル上で通信を行うべきかについては記述されていません。 このサンプルではsocketnlib経由で使うことができない場合(NLIB_NO_SOCKET)、メモリ上でJSON-RPCのリクエストとレスポンスをやりとりして動作しています。

/*---------------------------------------------------------------------------*
Project: CrossRoad
Copyright (C)2012-2016 Nintendo. All rights reserved.
These coded instructions, statements, and computer programs contain
proprietary information of Nintendo of America Inc. and/or Nintendo
Company Ltd., and are protected by Federal copyright law. They may
not be disclosed to third parties or copied or duplicated in any form,
in whole or in part, without the prior written consent of Nintendo.
*---------------------------------------------------------------------------*/
#include <string>
#include <map>
#include "jsonrpc.h" // NOLINT
#include "nn/nlib/Nlist.h"
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::threading::Thread;
using nlib_ns::threading::Future;
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
using nlib_ns::msgpack::jsonrpc::JsonRpcClient;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequest;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequestWriter;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponseHandler;
Tunnel g_c2s; // data queue from the client to the server
Tunnel g_s2c; // data queue from the server to the client
namespace {
volatile bool g_ShutdownClient = false;
#ifdef DONT_USE_SOCKET
class ClientEndByMemory {
public:
errno_t Init() { return 0; }
errno_t Connect() { return 0; }
g_c2s.Send(data, n);
return 0;
}
while (!g_s2c.Receive(data, n)) {
if (g_ShutdownClient) {
*n = 0;
return 0;
}
}
return 0;
}
bool IsOpen() { return true; }
errno_t Close() { return 0; }
};
#else
class ClientEndByTcp {
public:
ClientEndByTcp() : socket_(NLIB_SOCKET_INVALID) {}
errno_t Init() { return nlib_socket(&socket_, AF_INET, SOCK_STREAM | NLIB_SOCK_NONBLOCK, 0); }
errno_t Connect() {
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_LOOPBACK);
e = nlib_connect_for(socket_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr),
0, 500 * 10000);
if (nlib_is_error(e)) {
socket_ = NLIB_SOCKET_INVALID;
return e;
}
return stream_.Init(socket_);
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
// transmit by netstring
nlib_ns::TcpOutputStream os;
os.Init(socket_);
{
writer.Init();
writer.Open(&os);
writer.WriteFormat("%" PRIuS ":", n);
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
if (!nlib_is_error(os))
return 0;
else
return os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
if (stream_.Peek() < 0) {
if (nlib_is_error(stream_)) return EIO;
*n = 0;
return 0;
}
int c;
size_t nbytes = 0;
for (;;) {
c = stream_.Read();
if (c == ':') break;
if (!nlib_isdigit(c)) return EILSEQ;
size_t newVal = nbytes * 10 + c - '0';
if (newVal < nbytes) return EILSEQ;
nbytes = newVal;
}
data->reset(reinterpret_cast<uint8_t*>(nlib_malloc(nbytes)));
if (!*data) return ENOMEM;
if (nbytes != stream_.Read(data->get(), nbytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nbytes;
return 0;
}
bool IsOpen() { return socket_ != -1; }
errno_t Close() { return nlib_shutdownsocket(socket_, SHUT_RDWR); }
private:
nlib_sock socket_;
nlib_ns::TcpInputStream stream_;
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ClientEndByTcp ClientEnd;
#else
typedef ClientEndByMemory ClientEnd;
#endif
UniquePtr<ClientEnd> g_ClientEnd;
JsonRpcClient g_Client;
errno_t SendRequest(const char* method, uint32_t reqid, MpObject& params) { // NOLINT
Nlist<JsonRpcRequest> reqList;
JsonRpcRequest* req = reqList.push_back();
if (!req) return ENOMEM;
req->SetId(reqid); // Sets the request id.
if (!req->SetMethod(method)) { // Sets the method name.
return ENOMEM;
}
req->MoveParamsFrom(params); // Moves MpObject as parameters.
// NOTE:
// You can write the data in msgpack if the argument of the constructor is 'true'.
// The server responds in JSON if it receives JSON,
// it responds in msgpack if it receives msgpack.
JsonRpcRequestWriter w(false);
// You can write a single request or a batch request in the same manner.
e = w.BeginWriteRequest(static_cast<uint32_t>(reqList.size()));
if (nlib_is_error(e)) return e;
Nlist<JsonRpcRequest>::const_iterator it = reqList.begin();
Nlist<JsonRpcRequest>::const_iterator end = reqList.end();
for (; it != end; ++it) {
e = w.WriteRequest(*it);
if (nlib_is_error(e)) return e;
}
// You can obtain the written request on the memory.
size_t n;
ReallocOutputStream::UniquePtrType reqBytes;
e = w.EndWriteRequest(&reqBytes, &n);
if (nlib_is_error(e)) return e;
// Sends the serialized data to the server via socket or others.
return g_ClientEnd->Send(&reqBytes, n);
}
void ResponseHandlerThread() {
ReallocOutputStream::UniquePtrType data;
for (;;) {
size_t n = 0;
errno_t e = g_ClientEnd->Receive(&data, &n);
if (nlib_is_error(e)) {
if (g_ShutdownClient || !g_ClientEnd->IsOpen()) break;
nlib_printf("CLIENT ERROR: %d\n", e);
} else if (n == 0) {
break;
} else {
// Makes g_Client to resolve the response, and enables the corresponding future object.
e = g_Client.ResolveResponse(data.get(), n);
if (nlib_is_error(e)) {
nlib_printf("CLIENT ERROR: %d\n", e);
}
}
}
nlib_printf("ResponseHandlerThread() end\n");
}
} // namespace
namespace {
std::string CreateMessage(DateTime dt, std::string msg) {
// Sends date, time, and text to the server, receives an URI as a key for them.
MpObject obj;
char dtcstr[32];
{
// obj = { "datetime" : "...", "text": msg }
// NOTE: you can also desribe them without map or string.
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
obj.Box(params);
}
// Generates an unique request id.
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
// Registers the handler associated with the request id.
JsonRpcClient::FutureType future;
e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return std::string();
e = SendRequest("create", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return std::string();
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
return std::string();
}
JsonRpcResponse* response = future.Get();
std::string uri;
if (!response || nlib_is_error(*response)) {
} else {
e = response->GetMpObject().Unbox(&uri);
NLIB_ASSERT_NOERR(e);
nlib_printf("===create('%s', '%s'): success. uri='%s'\n", dtcstr, msg.c_str(), uri.c_str());
}
return uri;
}
bool ReadMessage(MpObject* obj) {
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
JsonRpcClient::FutureType future;
e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
MpObject dummy;
e = SendRequest("read", reqid, dummy);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return false;
}
e = future.WaitFor(TimeSpan(0, 3));
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
nlib_printf("===read(): timeout\n");
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===read(): error '%s'\n",
response ? response->GetErrorMessage() : "response null");
return false;
} else {
MpObject& o = response->GetMpObject();
NLIB_ASSERT(o.IsArray());
uint32_t n = o.GetSize();
nlib_printf("===read(): success messages = %" PRIu32 "\n", n);
for (uint32_t i = 0; i < n; ++i) {
MpObject* item = o.GetArrayItem(i);
std::map<std::string, std::string> params;
e = item->Unbox(&params);
NLIB_ASSERT_NOERR(e);
nlib_printf("#%" PRIu32 ": uri='%s', datetime='%s', text='%s'\n", i,
params["uri"].c_str(), params["datetime"].c_str(), params["text"].c_str());
}
nlib_printf("\n");
if (obj) o.swap(*obj);
return true;
}
}
bool UpdateMessage(std::string uri, DateTime dt, std::string msg) {
// Sends URI, date, time, and text to the server,
// and updates the data associated with URI on the server.
MpObject obj;
char dtcstr[32];
{
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
JsonRpcClient::FutureType future;
e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
e = SendRequest("update", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return false;
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===update('%s', '%s', '%s'): error '%s'\n", uri.c_str(), dtcstr, msg.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf("===update('%s', '%s', '%s'): success\n", uri.c_str(), dtcstr, msg.c_str());
return true;
}
}
bool DeleteMessage(std::string uri) {
// Sends URI to the server, and deletes the data associated with URI on the server.
MpObject obj;
{
std::map<std::string, std::string> params;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
JsonRpcClient::FutureType future;
errno_t e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
e = SendRequest("delete", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return false;
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===delete('%s'): error '%s'\n", uri.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf("===delete('%s'): success\n", uri.c_str());
return true;
}
}
} // namespace
const int numConsoleData = 6;
struct ConsoleData {
const char* description;
const char* date;
} consoleData[numConsoleData] = {
{"Cassette Vision(13500 yen)", "1981-07-30"},
{"Pyu-ta(59800 yen)", "1982-08-20"},
{"Faminly Computer(14800 yen)", "1983-07-15"},
{"SG-3000(29800 yen)", "1983-07-15"},
{"Mark III(15000 yen)", "1985-10-20"},
{"Disk System", "1976-02-21"} // the date is incorrect
};
bool Listup() {
Future<bool> future;
MpObject* ptr = NULL;
Async(&future, ReadMessage, ptr);
return future.Get();
}
bool JsonRpc() {
// THREADS:
// main thread: this thread(jsonrpc.cpp)
// serverThread: Receives requests, and make responses respectively(server.cpp)
// responseHandlerThread: Analyzes responses and enalbes
// the corresponding future objects.(jsonrpc.cpp)
// misc: Makes a request and receives a response(jsonrpc.cpp)
e = InitServer();
if (nlib_is_error(e)) return false;
std::string uri1, uri5;
g_ClientEnd.reset(new ClientEnd());
if (!g_ClientEnd) return false;
e = g_ClientEnd->Init();
if (nlib_is_error(e)) return false;
e = g_ClientEnd->Connect();
if (nlib_is_error(e)) return false;
// START Client(response handler) thread
NLIB_ASSERT(!g_Client);
g_Client = JsonRpcClient::OpenClient("myclient");
if (!g_Client) return false;
Thread responseHandlerThread;
e = responseHandlerThread.Start(ResponseHandlerThread);
if (nlib_is_error(e)) {
g_ShutdownClient = true;
g_ClientEnd->Close();
return false;
}
// START Server thread
Thread serverThread;
e = serverThread.Start(ServerThread);
if (nlib_is_error(e)) {
g_ShutdownClient = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
return false;
}
// 1) CREATE
// Registers the entries in 'consoleData' onto the server by multiple threads.
nlib_printf("Posting data to the server\n");
Future<std::string> ft[numConsoleData];
for (int i = 0; i < numConsoleData; ++i) {
const ConsoleData& c = consoleData[i];
DateTime dt;
TimeSpan dummy;
e = DateTime::Parse(c.date, &dt, &dummy);
NLIB_ASSERT_NOERR(e);
NLIB_UNUSED(e);
e = Async(&ft[i], CreateMessage, dt, std::string(c.description));
if (nlib_is_error(e)) goto INTERNAL_ERROR;
}
#if 1
// Comments out this block, and 1) and 2) execute concurrently.
for (int i = 0; i < numConsoleData; ++i) {
ft[i].Get();
}
#endif
// 2) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// 3) UPDATE
// Updates the data on the server, by specifying URI.
uri5 = ft[5].Get();
if (!uri5.empty()) {
nlib_printf("Update '%s' to fix data\n", uri5.c_str());
DateTime dtUp1;
dtUp1.Init(1986, 2, 21);
Future<bool> result;
e = Async(&result, UpdateMessage, ft[5].Get(), dtUp1,
std::string("Disk System(15000 yen)"));
if (nlib_is_error(e)) goto INTERNAL_ERROR;
if (!result.Get()) {
nlib_printf("ERROR: UpdateMessage failed\n");
goto GENERAL_ERROR;
}
} else {
nlib_printf("ERROR: No URI for DiskSystem\n");
goto GENERAL_ERROR;
}
// 4) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// 5) DELETE
// Deletes an message from the server, by specifying URI.
// This program tries to remove it from two threads, and one of them fails.
uri1 = ft[1].Get();
if (!uri1.empty()) {
nlib_printf("Remove '%s'\n", uri1.c_str());
Future<bool> ftDel1, ftDel2;
e = Async(&ftDel1, DeleteMessage, uri1);
if (nlib_is_error(e)) goto INTERNAL_ERROR;
e = Async(&ftDel2, DeleteMessage, uri1);
if (nlib_is_error(e)) goto INTERNAL_ERROR;
// Only one of ftDel1 or ftDel2 succeeds.
if (ftDel1.Get()) {
if (ftDel2.Get()) goto INTERNAL_ERROR;
} else if (!ftDel2.Get()) {
goto INTERNAL_ERROR;
}
} else {
nlib_printf("ERROR: No URI for Pyu-ta\n");
goto GENERAL_ERROR;
}
// 6) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// Enables flags and make it possible to terminate the thread.
g_ShutdownClient = true;
g_ShutdownServer = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
serverThread.Join();
return true;
GENERAL_ERROR:
g_ShutdownClient = true;
g_ShutdownServer = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
serverThread.Join();
// Returns true because this error depends on the timing(not bug).
return true;
INTERNAL_ERROR:
g_ShutdownClient = true;
g_ShutdownServer = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
serverThread.Join();
return false;
}
bool SampleMain(int, char**) {
bool result = JsonRpc();
g_ClientEnd.reset();
return result;
}
NLIB_MAINFUNC