nlib
msgpack/jsonrpc/jsonrpc.cpp

JSON-RPCサンプルのクライアント側です。

JSON-RPCのクライアント側とサーバー側を別々のスレッドに記述し、JSON-RPCで通信を行うサンプルです。 nlibのソケットライブラリがサポートしている場合はソケットで通信を行います。

なお、JSON-RPCは必ずしもhttpやTCP上で通信を行う必要はなく、JSON-RPC 2.0ではどのプロトコル上で通信を行うべきかについては記述されていません。 このサンプルではsocketnlib経由で使うことができない場合(NLIB_NO_SOCKET)、メモリ上でJSON-RPCのリクエストとレスポンスをやりとりして動作しています。

/*--------------------------------------------------------------------------------*
Project: CrossRoad
Copyright (C)Nintendo All rights reserved.
These coded instructions, statements, and computer programs contain proprietary
information of Nintendo and/or its licensed developers and are protected by
national and international copyright laws. They may not be disclosed to third
parties or copied or duplicated in any form, in whole or in part, without the
prior written consent of Nintendo.
The content herein is highly confidential and should be handled accordingly.
*--------------------------------------------------------------------------------*/
#include <string>
#include <map>
#include "jsonrpc.h" // NOLINT
#include "nn/nlib/Nlist.h"
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::threading::Thread;
using nlib_ns::threading::Future;
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
using nlib_ns::msgpack::jsonrpc::JsonRpcClient;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequest;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequestWriter;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponseHandler;
Tunnel g_c2s; // data queue from the client to the server
Tunnel g_s2c; // data queue from the server to the client
namespace {
volatile bool g_shutdown_client = false;
#ifdef DONT_USE_SOCKET
class ClientEndByMemory {
public:
errno_t Init() { return 0; }
errno_t Connect() { return 0; }
g_c2s.Send(data, n);
return 0;
}
while (!g_s2c.Receive(data, n)) {
if (g_shutdown_client) {
*n = 0;
return 0;
}
}
return 0;
}
bool IsOpen() { return true; }
errno_t Close() { return 0; }
};
#else
class ClientEndByTcp {
public:
ClientEndByTcp() : socket_(NLIB_SOCKET_INVALID) {}
errno_t Init() { return nlib_socket(&socket_, AF_INET, SOCK_STREAM | NLIB_SOCK_NONBLOCK, 0); }
errno_t Connect() {
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_LOOPBACK);
e = nlib_connect_for(socket_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr),
0, 500 * 10000);
if (nlib_is_error(e)) {
socket_ = NLIB_SOCKET_INVALID;
return e;
}
return stream_.Init(socket_);
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
// transmit by netstring
nlib_ns::TcpOutputStream os;
os.Init(socket_);
{
writer.Init();
writer.Open(&os);
writer.WriteFormat("%" PRIuS ":", n);
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
if (!nlib_is_error(os))
return 0;
else
return os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
if (stream_.Peek() < 0) {
if (nlib_is_error(stream_)) return EIO;
*n = 0;
return 0;
}
int c;
size_t nbytes = 0;
for (;;) {
c = stream_.Read();
if (c == ':') break;
if (!nlib_isdigit(c)) return EILSEQ;
size_t new_val = nbytes * 10 + c - '0';
if (new_val < nbytes) return EILSEQ;
nbytes = new_val;
}
data->reset(reinterpret_cast<uint8_t*>(nlib_malloc(nbytes)));
if (!*data) return ENOMEM;
if (nbytes != stream_.Read(data->get(), nbytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nbytes;
return 0;
}
bool IsOpen() { return socket_ != -1; }
errno_t Close() { return nlib_shutdownsocket(socket_, SHUT_RDWR); }
private:
nlib_sock socket_;
nlib_ns::TcpInputStream stream_;
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ClientEndByTcp ClientEnd;
#else
typedef ClientEndByMemory ClientEnd;
#endif
UniquePtr<ClientEnd> g_client_end;
JsonRpcClient g_client;
errno_t SendRequest(const char* method, uint32_t reqid, MpObject& params) { // NOLINT
Nlist<JsonRpcRequest> req_list;
JsonRpcRequest* req = req_list.push_back();
if (!req) return ENOMEM;
req->SetId(reqid); // Sets the request id.
if (!req->SetMethod(method)) { // Sets the method name.
return ENOMEM;
}
req->MoveParamsFrom(params); // Moves MpObject as parameters.
// NOTE:
// You can write the data in msgpack if the argument of the constructor is 'true'.
// The server responds in JSON if it receives JSON,
// it responds in msgpack if it receives msgpack.
JsonRpcRequestWriter w(false);
// You can write a single request or a batch request in the same manner.
e = w.BeginWriteRequest(static_cast<uint32_t>(req_list.size()));
if (nlib_is_error(e)) return e;
Nlist<JsonRpcRequest>::const_iterator it = req_list.begin();
Nlist<JsonRpcRequest>::const_iterator end = req_list.end();
for (; it != end; ++it) {
e = w.WriteRequest(*it);
if (nlib_is_error(e)) return e;
}
// You can obtain the written request on the memory.
size_t n;
ReallocOutputStream::UniquePtrType req_bytes;
e = w.EndWriteRequest(&req_bytes, &n);
if (nlib_is_error(e)) return e;
// Sends the serialized data to the server via socket or others.
return g_client_end->Send(&req_bytes, n);
}
void ResponseHandlerThread() {
ReallocOutputStream::UniquePtrType data;
for (;;) {
size_t n = 0;
errno_t e = g_client_end->Receive(&data, &n);
if (nlib_is_error(e)) {
if (g_shutdown_client || !g_client_end->IsOpen()) break;
nlib_printf("CLIENT ERROR: %d\n", e);
} else if (n == 0) {
break;
} else {
// Makes g_client to resolve the response, and enables the corresponding future object.
e = g_client.ResolveResponse(data.get(), n);
if (nlib_is_error(e)) {
nlib_printf("CLIENT ERROR: %d\n", e);
}
}
}
nlib_printf("ResponseHandlerThread() end\n");
}
} // namespace
namespace {
std::string CreateMessage(DateTime dt, std::string msg) {
// Sends date, time, and text to the server, receives an URI as a key for them.
MpObject obj;
char dtcstr[32];
{
// obj = { "datetime" : "...", "text": msg }
// NOTE: you can also desribe them without map or string.
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
obj.Box(params);
}
// Generates an unique request id.
NLIB_ASSERT(g_client);
reqid_t reqid = g_client.GenerateId();
// Registers the handler associated with the request id.
JsonRpcClient::FutureType future;
e = g_client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return std::string();
e = SendRequest("create", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_client if SendRequest fails.
g_client.Abort(reqid);
return std::string();
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_client if timeout occurs.
// Otherwise, g_client may wait reqid forever.
g_client.Abort(reqid);
return std::string();
}
JsonRpcResponse* response = future.Get();
std::string uri;
if (!response || nlib_is_error(*response)) {
} else {
e = response->GetMpObject().Unbox(&uri);
NLIB_ASSERT_NOERR(e);
nlib_printf("===create('%s', '%s'): success. uri='%s'\n", dtcstr, msg.c_str(), uri.c_str());
}
return uri;
}
bool ReadMessage(MpObject* obj) {
NLIB_ASSERT(g_client);
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
e = g_client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
MpObject dummy;
e = SendRequest("read", reqid, dummy);
if (nlib_is_error(e)) {
// You have to abort the request of g_client if SendRequest fails.
g_client.Abort(reqid);
return false;
}
e = future.WaitFor(TimeSpan(0, 3));
if (nlib_is_error(e)) {
// You have to abort the request of g_client if timeout occurs.
// Otherwise, g_client may wait reqid forever.
g_client.Abort(reqid);
nlib_printf("===read(): timeout\n");
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===read(): error '%s'\n",
response ? response->GetErrorMessage() : "response null");
return false;
} else {
MpObject& o = response->GetMpObject();
NLIB_ASSERT(o.IsArray());
uint32_t n = o.GetSize();
nlib_printf("===read(): success messages = %" PRIu32 "\n", n);
for (uint32_t i = 0; i < n; ++i) {
MpObject* item = o.GetArrayItem(i);
std::map<std::string, std::string> params;
e = item->Unbox(&params);
NLIB_ASSERT_NOERR(e);
nlib_printf("#%" PRIu32 ": uri='%s', datetime='%s', text='%s'\n", i,
params["uri"].c_str(), params["datetime"].c_str(), params["text"].c_str());
}
nlib_printf("\n");
if (obj) o.swap(*obj);
return true;
}
}
bool UpdateMessage(std::string uri, DateTime dt, std::string msg) {
// Sends URI, date, time, and text to the server,
// and updates the data associated with URI on the server.
MpObject obj;
char dtcstr[32];
{
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_client);
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
e = g_client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
e = SendRequest("update", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_client if SendRequest fails.
g_client.Abort(reqid);
return false;
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_client if timeout occurs.
// Otherwise, g_client may wait reqid forever.
g_client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===update('%s', '%s', '%s'): error '%s'\n", uri.c_str(), dtcstr, msg.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf("===update('%s', '%s', '%s'): success\n", uri.c_str(), dtcstr, msg.c_str());
return true;
}
}
bool DeleteMessage(std::string uri) {
// Sends URI to the server, and deletes the data associated with URI on the server.
MpObject obj;
{
std::map<std::string, std::string> params;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_client);
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
errno_t e = g_client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
e = SendRequest("delete", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_client if SendRequest fails.
g_client.Abort(reqid);
return false;
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_client if timeout occurs.
// Otherwise, g_client may wait reqid forever.
g_client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===delete('%s'): error '%s'\n", uri.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf("===delete('%s'): success\n", uri.c_str());
return true;
}
}
} // namespace
const int kNumConsoleData = 6;
struct ConsoleData {
const char* description;
const char* date;
} consoleData[kNumConsoleData] = {
{"Cassette Vision(13500 yen)", "1981-07-30"},
{"Pyu-ta(59800 yen)", "1982-08-20"},
{"Faminly Computer(14800 yen)", "1983-07-15"},
{"SG-3000(29800 yen)", "1983-07-15"},
{"Mark III(15000 yen)", "1985-10-20"},
{"Disk System", "1976-02-21"} // the date is incorrect
};
bool Listup() {
Future<bool> future;
MpObject* ptr = NULL;
Async(&future, ReadMessage, ptr);
return future.Get();
}
bool JsonRpc() {
// THREADS:
// main thread: this thread(jsonrpc.cpp)
// server_thread: Receives requests, and make responses respectively(server.cpp)
// response_handler_thread: Analyzes responses and enalbes
// the corresponding future objects.(jsonrpc.cpp)
// misc: Makes a request and receives a response(jsonrpc.cpp)
e = InitServer();
if (nlib_is_error(e)) return false;
std::string uri1, uri5;
g_client_end.reset(new ClientEnd());
if (!g_client_end) return false;
e = g_client_end->Init();
if (nlib_is_error(e)) return false;
e = g_client_end->Connect();
if (nlib_is_error(e)) return false;
// START Client(response handler) thread
NLIB_ASSERT(!g_client);
g_client = JsonRpcClient::OpenClient("myclient");
if (!g_client) return false;
Thread response_handler_thread;
e = response_handler_thread.Start(ResponseHandlerThread);
if (nlib_is_error(e)) {
g_shutdown_client = true;
g_client_end->Close();
return false;
}
// START Server thread
Thread server_thread;
e = server_thread.Start(ServerThread);
if (nlib_is_error(e)) {
g_shutdown_client = true;
g_client_end->Close();
response_handler_thread.Join();
return false;
}
// 1) CREATE
// Registers the entries in 'consoleData' onto the server by multiple threads.
nlib_printf("Posting data to the server\n");
Future<std::string> ft[kNumConsoleData];
for (int i = 0; i < kNumConsoleData; ++i) {
const ConsoleData& c = consoleData[i];
DateTime dt;
TimeSpan dummy;
e = DateTime::Parse(c.date, &dt, &dummy);
NLIB_ASSERT_NOERR(e);
NLIB_UNUSED(e);
e = Async(&ft[i], CreateMessage, dt, std::string(c.description));
if (nlib_is_error(e)) goto kInternalError;
}
#if 1
// Comments out this block, and 1) and 2) execute concurrently.
for (int i = 0; i < kNumConsoleData; ++i) {
ft[i].Get();
}
#endif
// 2) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// 3) UPDATE
// Updates the data on the server, by specifying URI.
uri5 = ft[5].Get();
if (!uri5.empty()) {
nlib_printf("Update '%s' to fix data\n", uri5.c_str());
DateTime dt_up1;
dt_up1.Init(1986, 2, 21);
Future<bool> result;
e = Async(&result, UpdateMessage, ft[5].Get(), dt_up1,
std::string("Disk System(15000 yen)"));
if (nlib_is_error(e)) goto kInternalError;
if (!result.Get()) {
nlib_printf("ERROR: UpdateMessage failed\n");
goto GENERAL_ERROR;
}
} else {
nlib_printf("ERROR: No URI for DiskSystem\n");
goto GENERAL_ERROR;
}
// 4) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// 5) DELETE
// Deletes an message from the server, by specifying URI.
// This program tries to remove it from two threads, and one of them fails.
uri1 = ft[1].Get();
if (!uri1.empty()) {
nlib_printf("Remove '%s'\n", uri1.c_str());
Future<bool> ft_del1, ft_del2;
e = Async(&ft_del1, DeleteMessage, uri1);
if (nlib_is_error(e)) goto kInternalError;
e = Async(&ft_del2, DeleteMessage, uri1);
if (nlib_is_error(e)) goto kInternalError;
// Only one of ft_del1 or ft_del2 succeeds.
if (ft_del1.Get()) {
if (ft_del2.Get()) goto kInternalError;
} else if (!ft_del2.Get()) {
goto kInternalError;
}
} else {
nlib_printf("ERROR: No URI for Pyu-ta\n");
goto GENERAL_ERROR;
}
// 6) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// Enables flags and make it possible to terminate the thread.
g_shutdown_client = true;
g_shutdown_server = true;
g_client_end->Close();
response_handler_thread.Join();
server_thread.Join();
return true;
GENERAL_ERROR:
g_shutdown_client = true;
g_shutdown_server = true;
g_client_end->Close();
response_handler_thread.Join();
server_thread.Join();
// Returns true because this error depends on the timing(not bug).
return true;
kInternalError:
g_shutdown_client = true;
g_shutdown_server = true;
g_client_end->Close();
response_handler_thread.Join();
server_thread.Join();
return false;
}
bool SampleMain(int, char**) {
bool result = JsonRpc();
g_client_end.reset();
return result;
}
NLIB_MAINFUNC