nlib
msgpack/jsonrpc/jsonrpc.cpp

Client side JSON-RPC sample.

This sample writes the JSON-RPC client side and server side in separate threads, and communicates between the two using JSON-RPC. Communicates using sockets when the nlib socket library is supported.

JSON-RPC does not always need to be transmitted over HTTP or TCP, and the JSON-RPC 2.0 specification does not specify which protocol to use for transmission. In this sample, if sockets cannot be used through nlib (NLIB_NO_SOCKET), JSON-RPC requests and responses are exchanged in memory.

#include <string>
#include <map>
#include "jsonrpc.h" // NOLINT
#include "nn/nlib/Nlist.h"
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::threading::Thread;
using nlib_ns::threading::Future;
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
using nlib_ns::msgpack::jsonrpc::JsonRpcClient;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequest;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequestWriter;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponseHandler;
Tunnel g_c2s; // data queue from the client to the server
Tunnel g_s2c; // data queue from the server to the client
namespace {
volatile bool g_ShutdownClient = false;
#ifdef DONT_USE_SOCKET
class ClientEndByMemory {
public:
errno_t Init() { return 0; }
errno_t Connect() { return 0; }
g_c2s.Send(data, n);
return 0;
}
while (!g_s2c.Receive(data, n)) {
if (g_ShutdownClient) {
*n = 0;
return 0;
}
}
return 0;
}
bool IsOpen() { return true; }
errno_t Close() { return 0; }
};
#else
class ClientEndByTcp {
public:
ClientEndByTcp() : socket_(NLIB_SOCKET_INVALID) {}
errno_t Init() { return nlib_socket(&socket_, AF_INET, SOCK_STREAM | NLIB_SOCK_NONBLOCK, 0); }
errno_t Connect() {
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_LOOPBACK);
e = nlib_connect_for(socket_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr),
0, 500 * 10000);
if (nlib_is_error(e)) {
socket_ = NLIB_SOCKET_INVALID;
return e;
}
return stream_.Init(socket_);
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
// transmit by netstring
nlib_ns::TcpOutputStream os;
os.Init(socket_);
{
writer.Init();
writer.Open(&os);
writer.WriteFormat("%" PRIuS ":", n);
}
os.Write(data->get(), n);
os.Write(',');
os.Flush();
if (!nlib_is_error(os))
return 0;
else
return os.GetErrorValue();
}
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
if (stream_.Peek() < 0) {
if (nlib_is_error(stream_)) return EIO;
*n = 0;
return 0;
}
int c;
size_t nbytes = 0;
for (;;) {
c = stream_.Read();
if (c == ':') break;
if (!nlib_isdigit(c)) return EILSEQ;
size_t newVal = nbytes * 10 + c - '0';
if (newVal < nbytes) return EILSEQ;
nbytes = newVal;
}
data->reset(reinterpret_cast<uint8_t*>(nlib_malloc(nbytes)));
if (!*data) return ENOMEM;
if (nbytes != stream_.Read(data->get(), nbytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nbytes;
return 0;
}
bool IsOpen() { return socket_ != -1; }
errno_t Close() { return nlib_shutdownsocket(socket_, SHUT_RDWR); }
private:
nlib_sock socket_;
nlib_ns::TcpInputStream stream_;
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
#ifndef DONT_USE_SOCKET
typedef ClientEndByTcp ClientEnd;
#else
typedef ClientEndByMemory ClientEnd;
#endif
UniquePtr<ClientEnd> g_ClientEnd;
JsonRpcClient g_Client;
errno_t SendRequest(const char* method, uint32_t reqid, MpObject& params) { // NOLINT
Nlist<JsonRpcRequest> reqList;
JsonRpcRequest* req = reqList.push_back();
if (!req) return ENOMEM;
req->SetId(reqid); // Sets the request id.
if (!req->SetMethod(method)) { // Sets the method name.
return ENOMEM;
}
req->MoveParamsFrom(params); // Moves MpObject as parameters.
// NOTE:
// You can write the data in msgpack if the argument of the constructor is 'true'.
// The server responds in JSON if it receives JSON,
// it responds in msgpack if it receives msgpack.
JsonRpcRequestWriter w(false);
// You can write a single request or a batch request in the same manner.
e = w.BeginWriteRequest(static_cast<uint32_t>(reqList.size()));
if (nlib_is_error(e)) return e;
Nlist<JsonRpcRequest>::const_iterator it = reqList.begin();
Nlist<JsonRpcRequest>::const_iterator end = reqList.end();
for (; it != end; ++it) {
e = w.WriteRequest(*it);
if (nlib_is_error(e)) return e;
}
// You can obtain the written request on the memory.
size_t n;
ReallocOutputStream::UniquePtrType reqBytes;
e = w.EndWriteRequest(&reqBytes, &n);
if (nlib_is_error(e)) return e;
// Sends the serialized data to the server via socket or others.
return g_ClientEnd->Send(&reqBytes, n);
}
void ResponseHandlerThread() {
ReallocOutputStream::UniquePtrType data;
for (;;) {
size_t n = 0;
errno_t e = g_ClientEnd->Receive(&data, &n);
if (nlib_is_error(e)) {
if (g_ShutdownClient || !g_ClientEnd->IsOpen()) break;
nlib_printf("CLIENT ERROR: %d\n", e);
} else if (n == 0) {
break;
} else {
// Makes g_Client to resolve the response, and enables the corresponding future object.
e = g_Client.ResolveResponse(data.get(), n);
if (nlib_is_error(e)) {
nlib_printf("CLIENT ERROR: %d\n", e);
}
}
}
nlib_printf("ResponseHandlerThread() end\n");
}
} // namespace
namespace {
std::string CreateMessage(DateTime dt, std::string msg) {
// Sends date, time, and text to the server, receives an URI as a key for them.
MpObject obj;
char dtcstr[32];
{
// obj = { "datetime" : "...", "text": msg }
// NOTE: you can also desribe them without map or string.
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
obj.Box(params);
}
// Generates an unique request id.
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
// Registers the handler associated with the request id.
JsonRpcClient::FutureType future;
e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return std::string();
e = SendRequest("create", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return std::string();
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
return std::string();
}
JsonRpcResponse* response = future.Get();
std::string uri;
if (!response || nlib_is_error(*response)) {
} else {
e = response->GetMpObject().Unbox(&uri);
NLIB_ASSERT_NOERR(e);
nlib_printf("===create('%s', '%s'): success. uri='%s'\n", dtcstr, msg.c_str(), uri.c_str());
}
return uri;
}
bool ReadMessage(MpObject* obj) {
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
JsonRpcClient::FutureType future;
e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
MpObject dummy;
e = SendRequest("read", reqid, dummy);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return false;
}
e = future.WaitFor(TimeSpan(0, 3));
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
nlib_printf("===read(): timeout\n");
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===read(): error '%s'\n",
response ? response->GetErrorMessage() : "response null");
return false;
} else {
MpObject& o = response->GetMpObject();
NLIB_ASSERT(o.IsArray());
uint32_t n = o.GetSize();
nlib_printf("===read(): success messages = %" PRIu32 "\n", n);
for (uint32_t i = 0; i < n; ++i) {
MpObject* item = o.GetArrayItem(i);
std::map<std::string, std::string> params;
e = item->Unbox(&params);
NLIB_ASSERT_NOERR(e);
nlib_printf("#%" PRIu32 ": uri='%s', datetime='%s', text='%s'\n", i,
params["uri"].c_str(), params["datetime"].c_str(), params["text"].c_str());
}
nlib_printf("\n");
if (obj) o.swap(*obj);
return true;
}
}
bool UpdateMessage(std::string uri, DateTime dt, std::string msg) {
// Sends URI, date, time, and text to the server,
// and updates the data associated with URI on the server.
MpObject obj;
char dtcstr[32];
{
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
JsonRpcClient::FutureType future;
e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
e = SendRequest("update", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return false;
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===update('%s', '%s', '%s'): error '%s'\n", uri.c_str(), dtcstr, msg.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf("===update('%s', '%s', '%s'): success\n", uri.c_str(), dtcstr, msg.c_str());
return true;
}
}
bool DeleteMessage(std::string uri) {
// Sends URI to the server, and deletes the data associated with URI on the server.
MpObject obj;
{
std::map<std::string, std::string> params;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_Client);
reqid_t reqid = g_Client.GenerateId();
JsonRpcClient::FutureType future;
errno_t e = g_Client.GetFutureForId(&future, reqid);
if (nlib_is_error(e)) return false;
e = SendRequest("delete", reqid, obj);
if (nlib_is_error(e)) {
// You have to abort the request of g_Client if SendRequest fails.
g_Client.Abort(reqid);
return false;
}
if (nlib_is_error(future.WaitFor(TimeSpan(0, 3)))) {
// You have to abort the request of g_Client if timeout occurs.
// Otherwise, g_Client may wait reqid forever.
g_Client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
if (!response || nlib_is_error(*response)) {
nlib_printf("===delete('%s'): error '%s'\n", uri.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf("===delete('%s'): success\n", uri.c_str());
return true;
}
}
} // namespace
const int numConsoleData = 6;
struct ConsoleData {
const char* description;
const char* date;
} consoleData[numConsoleData] = {
{"Cassette Vision(13500 yen)", "1981-07-30"},
{"Pyu-ta(59800 yen)", "1982-08-20"},
{"Faminly Computer(14800 yen)", "1983-07-15"},
{"SG-3000(29800 yen)", "1983-07-15"},
{"Mark III(15000 yen)", "1985-10-20"},
{"Disk System", "1976-02-21"} // the date is incorrect
};
bool Listup() {
Future<bool> future;
MpObject* ptr = NULL;
Async(&future, ReadMessage, ptr);
return future.Get();
}
bool JsonRpc() {
// THREADS:
// main thread: this thread(jsonrpc.cpp)
// serverThread: Receives requests, and make responses respectively(server.cpp)
// responseHandlerThread: Analyzes responses and enalbes
// the corresponding future objects.(jsonrpc.cpp)
// misc: Makes a request and receives a response(jsonrpc.cpp)
e = InitServer();
if (nlib_is_error(e)) return false;
std::string uri1, uri5;
g_ClientEnd.reset(new ClientEnd());
if (!g_ClientEnd) return false;
e = g_ClientEnd->Init();
if (nlib_is_error(e)) return false;
e = g_ClientEnd->Connect();
if (nlib_is_error(e)) return false;
// START Client(response handler) thread
NLIB_ASSERT(!g_Client);
g_Client = JsonRpcClient::OpenClient("myclient");
if (!g_Client) return false;
Thread responseHandlerThread;
e = responseHandlerThread.Start(ResponseHandlerThread);
if (nlib_is_error(e)) {
g_ShutdownClient = true;
g_ClientEnd->Close();
return false;
}
// START Server thread
Thread serverThread;
e = serverThread.Start(ServerThread);
if (nlib_is_error(e)) {
g_ShutdownClient = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
return false;
}
// 1) CREATE
// Registers the entries in 'consoleData' onto the server by multiple threads.
nlib_printf("Posting data to the server\n");
Future<std::string> ft[numConsoleData];
for (int i = 0; i < numConsoleData; ++i) {
const ConsoleData& c = consoleData[i];
DateTime dt;
TimeSpan dummy;
e = DateTime::Parse(c.date, &dt, &dummy);
NLIB_ASSERT_NOERR(e);
NLIB_UNUSED(e);
e = Async(&ft[i], CreateMessage, dt, std::string(c.description));
if (nlib_is_error(e)) goto INTERNAL_ERROR;
}
#if 1
// Comments out this block, and 1) and 2) execute concurrently.
for (int i = 0; i < numConsoleData; ++i) {
ft[i].Get();
}
#endif
// 2) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// 3) UPDATE
// Updates the data on the server, by specifying URI.
uri5 = ft[5].Get();
if (!uri5.empty()) {
nlib_printf("Update '%s' to fix data\n", uri5.c_str());
DateTime dtUp1;
dtUp1.Init(1986, 2, 21);
Future<bool> result;
e = Async(&result, UpdateMessage, ft[5].Get(), dtUp1,
std::string("Disk System(15000 yen)"));
if (nlib_is_error(e)) goto INTERNAL_ERROR;
if (!result.Get()) {
nlib_printf("ERROR: UpdateMessage failed\n");
goto GENERAL_ERROR;
}
} else {
nlib_printf("ERROR: No URI for DiskSystem\n");
goto GENERAL_ERROR;
}
// 4) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// 5) DELETE
// Deletes an message from the server, by specifying URI.
// This program tries to remove it from two threads, and one of them fails.
uri1 = ft[1].Get();
if (!uri1.empty()) {
nlib_printf("Remove '%s'\n", uri1.c_str());
Future<bool> ftDel1, ftDel2;
e = Async(&ftDel1, DeleteMessage, uri1);
if (nlib_is_error(e)) goto INTERNAL_ERROR;
e = Async(&ftDel2, DeleteMessage, uri1);
if (nlib_is_error(e)) goto INTERNAL_ERROR;
// Only one of ftDel1 or ftDel2 succeeds.
if (ftDel1.Get()) {
if (ftDel2.Get()) goto INTERNAL_ERROR;
} else if (!ftDel2.Get()) {
goto INTERNAL_ERROR;
}
} else {
nlib_printf("ERROR: No URI for Pyu-ta\n");
goto GENERAL_ERROR;
}
// 6) READ
// Lists up the registered data by querying the server.
if (!Listup()) {
nlib_printf("ERROR: ReadMessage failed\n");
}
// Enables flags and make it possible to terminate the thread.
g_ShutdownClient = true;
g_ShutdownServer = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
serverThread.Join();
return true;
GENERAL_ERROR:
g_ShutdownClient = true;
g_ShutdownServer = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
serverThread.Join();
// Returns true because this error depends on the timing(not bug).
return true;
INTERNAL_ERROR:
g_ShutdownClient = true;
g_ShutdownServer = true;
g_ClientEnd->Close();
responseHandlerThread.Join();
serverThread.Join();
return false;
}
bool SampleMain(int, char**) {
bool result = JsonRpc();
g_ClientEnd.reset();
return result;
}
NLIB_MAINFUNC