16 #include <nn/pia/transport/transport_Definitions.h>
18 #include <nn/pia/transport/transport_Protocol.h>
19 #include <nn/pia/common/common_Time.h>
// Forward declarations. In the visible part of this header these types are
// only used through a pointer (ProtocolMessageWriter*) or a const reference
// (const ProtocolMessageReader&) in MessageAccessor::Pack / Unpack.
28 class ProtocolMessageWriter;
29 class ProtocolMessageReader;
50 PIA_PROTOCOL_TYPE_INFO(ProtocolTypeReliableBroadcast);
115 virtual void CleanupWithStationAddress();
125 virtual Result Dispatch();
// Virtual member that receives a transport::ProtocolEvent by const reference
// and returns a Result. Declaration only; the implementation is not part of
// this listing.
140 virtual Result UpdateProtocolEvent(
const transport::ProtocolEvent& event);
// Mode enumerators (the enum header is not part of this listing).
// Every value occupies only the most significant byte (bits 24-31), which is
// the byte the Mode accessor isolates with the mask 0xFF000000.
150 Mode_Invalid = 0x00000000,
151 Mode_Wait = 0x01000000,
152 Mode_Send = 0x02000000,
153 Mode_Receive = 0x03000000
// Condition enumerators (the enum header is not part of this listing).
// Every value occupies only bits 16-23, which is the byte GetCondition()
// isolates with the mask 0x00FF0000.
164 Condition_Wait = 0x00010000,
165 Condition_InProgress = 0x00020000,
166 Condition_Success = 0x00030000,
167 Condition_Failure = 0x00040000
// Body of the Mode accessor (its signature is not part of this listing):
// keeps only the top byte of m_State and casts the result to Mode.
179 return static_cast<Mode
>(m_State & 0xFF000000);
// Returns the Condition part of the state word: masks m_State down to
// bits 16-23 and casts the result to Condition. Does not modify the object.
189 Condition GetCondition()
const
191 return static_cast<Condition
>(m_State & 0x00FF0000);
236 return static_cast<State>(m_State & 0xFFFF0000);
250 return (GetCondition() == Condition_InProgress);
// Const conversion operator to uint16_t; the body is not part of this listing.
// NOTE(review): the operator is not marked explicit, so objects of this type
// convert to uint16_t implicitly - confirm that is intended.
287 operator uint16_t()
const
// Bodies of five flag getters (their signatures are not part of this
// listing). Each one reports, via IsFlagOn(), whether a single Flag bit is set.
// Reports Flag_SendToNewcomer.
302 return IsFlagOn(Flag_SendToNewcomer);
// Reports Flag_SyncronizeCompletion.
313 return IsFlagOn(Flag_SyncronizeCompletion);
// Reports Flag_SendEndless.
330 return IsFlagOn(Flag_SendEndless);
// Reports Flag_FailIfRejected.
341 return IsFlagOn(Flag_FailIfRejected);
// Reports Flag_MediateRequest.
360 return IsFlagOn(Flag_MediateRequest);
// Bodies of five flag setters (their signatures are not part of this
// listing). Each one forwards its isOn argument to SetFlag() for one Flag bit.
// Applies isOn to Flag_SendToNewcomer.
374 SetFlag(Flag_SendToNewcomer, isOn);
// Applies isOn to Flag_SyncronizeCompletion.
387 SetFlag(Flag_SyncronizeCompletion, isOn);
// Applies isOn to Flag_SendEndless.
400 SetFlag(Flag_SendEndless, isOn);
// Applies isOn to Flag_FailIfRejected.
413 SetFlag(Flag_FailIfRejected, isOn);
// Applies isOn to Flag_MediateRequest.
426 SetFlag(Flag_MediateRequest, isOn);
// Flag enumerators (the enum header is not part of this listing). Each value
// is a distinct single bit, bits 0 through 4, so they can be combined in one
// integer and tested individually.
// NOTE(review): "Syncronize" in Flag_SyncronizeCompletion is a misspelling of
// "Synchronize"; the identifier is used by the accessors in this header, so
// it is left as-is.
489 Flag_SendToNewcomer = 1 << 0,
490 Flag_SyncronizeCompletion = 1 << 1,
491 Flag_SendEndless = 1 << 2,
492 Flag_FailIfRejected = 1 << 3,
493 Flag_MediateRequest = 1 << 4
// Returns true when every bit of `flag` is set in m_Value. For the single-bit
// Flag enumerators this is a plain bit test. Does not modify the object.
496 bool IsFlagOn(Flag flag)
const
498 return ((m_Value & flag) == flag);
// Takes a Flag and a bool; the body is not part of this listing.
// Presumably sets `flag` in m_Value when isOn is true and clears it
// otherwise - confirm against the full header.
500 void SetFlag(Flag flag,
bool isOn)
817 return m_ThroughputLimit;
// Virtual, const member taking a 64-bit `flag` argument. Declaration only;
// the meaning of `flag` is not visible in this listing.
826 virtual void Trace(uint64_t flag)
const;
830 static const uint16_t InvalidMessageIdx = 0xffff;
// Members of a progress-tracking type (its class header is not part of this
// listing; presumably the Progress base that ReceiverProgress and
// SenderProgress derive from - confirm).
// Const queries; their bodies are not part of this listing.
836 uint32_t GetCount()
const;
837 bool IsOn(uint32_t idx)
const;
838 bool IsAllOff()
const;
// True once the first "off" index has reached or passed m_UnitNum
// (m_UnitNum's declaration is not part of this listing).
839 bool IsComplete()
const
841 return m_FirstOffIdx >= m_UnitNum;
// Number of 32-bit words in m_Flag: 4 words = 128 flag bits of storage.
845 static const uint32_t FlagBlockNum = 4;
// Compared against m_UnitNum by IsComplete().
847 uint32_t m_FirstOffIdx;
// Flag storage, FlagBlockNum words of 32 bits each.
848 uint32_t m_Flag[FlagBlockNum];
// Publicly derives from Progress. Only the declarations below are visible;
// the bodies are not part of this listing.
852 class ReceiverProgress :
public Progress
// Takes the unit count to track.
855 void Clear(uint32_t unitNum);
// Takes a unit index and returns a bool; the meaning of the result is not
// visible here.
856 bool Raise(uint32_t idx);
// Const query that writes its results through pOffset (32-bit) and pFlag
// (64-bit) output pointers.
857 void Get(uint32_t* pOffset, uint64_t* pFlag)
const;
// Publicly derives from Progress and additionally keeps one uint16_t message
// index per unit slot. Most bodies are not part of this listing.
861 class SenderProgress :
public Progress
// Takes the unit count to track.
864 void Clear(uint32_t unitNum);
// Takes a 32-bit offset and a 64-bit flag word - the same pair of types that
// ReceiverProgress::Get() writes out. Returns a bool whose meaning is not
// visible here.
865 bool Update(uint32_t offset, uint64_t flag);
866 bool IsReceivable(uint32_t offset)
const;
867 void UpdateMessageIdx(uint32_t unitIdx, uint16_t messageIdx);
// Returns the inherited m_FirstOffIdx unchanged.
868 uint32_t GetFirstOffIdx()
const
870 return m_FirstOffIdx;
// Body is not part of this listing.
872 uint32_t GetUnitNum()
const
876 uint16_t GetSendMessageIdx(uint32_t unitIdx)
const;
// One slot per flag bit: FlagBlockNum words * 32 bits per word.
879 static const uint32_t DataSize = FlagBlockNum * 32;
// Per-unit message index table with DataSize entries.
881 uint16_t m_SendMessageIdx[DataSize];
// MessageType enumerators (the enum header is not part of this listing).
// The high nibble encodes the direction tested by MessageAccessor:
//   0x1x - matched by IsSenderMessage()   ((type & 0xf0) == 0x10)
//   0x2x - matched by IsReceiverMessage() ((type & 0xf0) == 0x20)
// MessageType_Invalid (0x00) matches neither test.
888 MessageType_Invalid = 0x00,
889 MessageType_Request = 0x11,
890 MessageType_Data = 0x12,
891 MessageType_Cancel = 0x18,
892 MessageType_Complete = 0x19,
893 MessageType_DataAck = 0x21,
894 MessageType_CommandAck = 0x28,
895 MessageType_Reject = 0x29
// Holds the fields of one protocol message and offers static Pack / Unpack
// entry points. Several members (including the m_Type declaration) are not
// part of this listing.
900 class MessageAccessor
// Constructor initializer: a new accessor starts with MessageType_Invalid.
904 : m_Type(MessageType_Invalid)
// True when the high nibble of m_Type is 0x1.
907 bool IsSenderMessage()
const
909 return (m_Type & 0xf0) == 0x10;
// True when the high nibble of m_Type is 0x2.
911 bool IsReceiverMessage()
const
913 return (m_Type & 0xf0) == 0x20;
// True only for MessageType_Data.
915 bool IsDataMessage()
const
917 return m_Type == MessageType_Data;
// Body is not part of this listing.
920 uint32_t GetMessageSize()
const;
// Reads from `reader` into *pAccessor and reports a message index through
// pMessageIdx; returns a bool (presumably success/failure - confirm).
921 static bool Unpack(MessageAccessor* pAccessor,
const ProtocolMessageReader& reader, uint32_t* pMessageIdx);
// Writes `accessor` together with `messageIdx` through pWriter; returns a
// bool (presumably success/failure - confirm).
922 static bool Pack(ProtocolMessageWriter* pWriter,
const MessageAccessor& accessor, uint32_t messageIdx);
928 uint32_t m_TotalSize;
931 Configuration m_Configuration;
// Payload pointer is const void*: the accessor itself cannot modify the
// pointed-to bytes. Ownership is not visible here.
934 const void* m_cpPayload;
// 16-bit size field, so at most 65535 can be represented.
935 uint16_t m_PayloadSize;
937 uint32_t m_AckOffset;
// Extends TransferSetting (public inheritance) with helper predicates.
943 struct InnerTransferSetting :
public TransferSetting
// Body of a predicate whose signature is not part of this listing: true when
// m_DataSize is greater than zero.
948 return (m_DataSize > 0);
// Const comparison against another TransferSetting; the ordering rule is not
// visible in this listing.
950 bool IsPriorTo(
const TransferSetting& setting)
const;
// Only this SenderState enumerator is visible; the rest of the enum is not
// part of this listing.
964 SenderState_Rejected,
// ReceiverState enumerators (enum header not part of this listing). No
// explicit values are assigned. ReceiverState_Nothing is the value that
// ClearReceiverState() resets to.
971 ReceiverState_Nothing,
972 ReceiverState_Receiving,
973 ReceiverState_Rejecting,
974 ReceiverState_Finished,
975 ReceiverState_Rejected
983 void Initialize(ReliableBroadcastProtocol* pProtocol);
// Takes one unpacked message by const reference. Declaration only; the
// handling logic is not part of this listing.
988 void HandleMessage(
const MessageAccessor& accessor);
// Sets the ReservedAckFlag_Reject bit in m_ReservedAckFlag. The OR leaves any
// other bits that are already set untouched.
991 void ForceReserveReject()
993 m_ReservedAckFlag |= ReservedAckFlag_Reject;
// Resets m_ReceiverState to ReceiverState_Nothing. No other member is touched
// by the visible line.
998 void ClearReceiverState()
1000 m_ReceiverState = ReceiverState_Nothing;
1002 void StartReceive(uint16_t receiveId, uint32_t unitNum);
// Const predicate returning bool. The embedded listing numbers jump from 1004
// to 1010, so the lines in between are missing from this view.
// NOTE(review): the `return MyStationIndex();` line below may belong to a
// different accessor whose signature was dropped from the listing - confirm
// against the full header before relying on it.
1004 bool IsConnected()
const
1010 return MyStationIndex();
// Returns the current value of m_SenderState. Does not modify the object.
1012 SenderState GetSenderState()
const
1014 return m_SenderState;
// Returns the current value of m_ReceiverState. Does not modify the object.
1016 ReceiverState GetReceiverState()
const
1018 return m_ReceiverState;
// Const accessor returning a const reference to an InnerTransferSetting. The
// body is not part of this listing; presumably returns m_Request - confirm.
1020 const InnerTransferSetting&
GetRequest()
const
// Returns the current value of m_ReturnedMessageIdx. Does not modify the
// object.
1029 uint16_t GetReturnedMessageIdx()
const
1031 return m_ReturnedMessageIdx;
// Returns the current value of m_ReceivedMessageIdx. Does not modify the
// object.
1033 uint16_t GetReceivedMessageIdx()
const
1035 return m_ReceivedMessageIdx;
1038 void HandleMessageIdx(uint32_t messageIdx);
// Const query taking a bool filter and a dispatch count and returning a unit
// index. Declaration only; the selection rule and the value returned when no
// unit qualifies are not visible in this listing.
1040 uint32_t GetNextSendUnitIndex(
bool isOnlyPrior, uint32_t dispatchCount)
const;
// True only when both conditions hold: the receiver state is
// ReceiverState_Receiving and the m_IsNeedSendRequest flag is set.
1042 bool IsNeedSendRequest()
const
1044 return (GetReceiverState() == ReceiverState_Receiving) && m_IsNeedSendRequest;
1047 void RecordSendData(uint32_t dispatchCount, uint16_t unitIdx, uint16_t messageIdx);
// Non-const member taking an unpacked message by const reference.
// Declaration only; the body is not part of this listing.
1050 void UpdateRequest(
const MessageAccessor& accessor);
// Const member taking an unpacked message by const reference and returning a
// bool. Declaration only; what is checked is not visible in this listing.
1051 bool CheckRequest(
const MessageAccessor& accessor)
const;
// Bodies of two accessors whose signatures are not part of this listing; both
// return m_Request.m_SourceStationIndex. Presumably a non-const / const
// overload pair like SenderSequenceId() - confirm.
1055 return m_Request.m_SourceStationIndex;
1059 return m_Request.m_SourceStationIndex;
// Non-const overload: returns a reference to m_Request.m_Id, so the caller
// can assign through it.
1061 uint16_t& SenderSequenceId()
1063 return m_Request.m_Id;
// Const overload: returns a copy of m_Request.m_Id.
1065 uint16_t SenderSequenceId()
const
1067 return m_Request.m_Id;
// Per-station data members (the enclosing class header is not part of this
// listing).
// Pointer to the protocol object; same type as Initialize()'s parameter.
1071 ReliableBroadcastProtocol* m_pProtocol;
// Request record; its m_Id and m_SourceStationIndex fields are exposed by the
// accessors above.
1073 InnerTransferSetting m_Request;
// Returned by GetSenderState().
1074 SenderState m_SenderState;
// Bit values for m_ReservedAckFlag. Apart from None (0), each value is a
// distinct single bit, so several can be set at once.
1076 enum ReservedAckFlag
1078 ReservedAckFlag_None = 0x0,
1079 ReservedAckFlag_Request = 0x1,
1080 ReservedAckFlag_Data = 0x2,
1081 ReservedAckFlag_Reject = 0x4,
1082 ReservedAckFlag_Command = 0x8
// Bitwise combination of ReservedAckFlag values (ForceReserveReject() ORs
// ReservedAckFlag_Reject into it).
1084 uint32_t m_ReservedAckFlag;
1086 SenderProgress m_Progress;
1087 uint16_t m_ReceivingId;
// Returned by GetReceiverState(); reset by ClearReceiverState().
1088 ReceiverState m_ReceiverState;
// Returned by GetReturnedMessageIdx().
1090 uint16_t m_ReturnedMessageIdx;
// Returned by GetReceivedMessageIdx().
1091 uint16_t m_ReceivedMessageIdx;
// Second condition tested by IsNeedSendRequest().
1093 bool m_IsNeedSendRequest;
1095 uint32_t m_LastSentDataDispatchCount;
// Body of a station-index validity check (signature not part of this listing;
// presumably the IsValidStation() asserted by GetStation()). An index is
// accepted when all three hold:
//   - it is at least StationIndex_1,
//   - it is below m_MaxConnections (cast to StationIndex for the comparison),
//   - it is not the local station's own index.
1101 return (stationIndex >=
StationIndex_1) && (stationIndex < static_cast<StationIndex>(m_MaxConnections)) && (stationIndex != m_LocalStationIndex);
// Body of the non-const station lookup (signature not part of this listing).
// Asserts that the index is valid, then maps it to a slot in m_paStation:
// indices below the local station's index are used as-is, indices above it
// are shifted down by one. The equal case is rejected by IsValidStation(), so
// the array appears to hold no slot for the local station.
1107 PIA_ASSERT(IsValidStation(stationIndex));
1108 return &m_paStation[stationIndex < m_LocalStationIndex ? stationIndex : stationIndex - 1];
// Const overload of the station lookup; returns a pointer to const Station.
// Asserts that the index is valid, then maps it to a slot in m_paStation:
// indices below the local station's index are used as-is, indices above it
// are shifted down by one.
1112 const Station* GetStation(
StationIndex stationIndex)
const
1114 PIA_ASSERT(IsValidStation(stationIndex));
1115 return &m_paStation[stationIndex < m_LocalStationIndex ? stationIndex : stationIndex - 1];
// Static helper: takes a transfer setting and a Progress by const reference
// and returns a uint32_t. Declaration only; the unit of the result is not
// visible in this listing.
1118 static uint32_t CalcProgress(
const InnerTransferSetting& transferSetting,
const Progress& progress);
// Dispatch helpers. Declarations only; the bodies and the meaning of the bool
// results are not part of this listing (presumably called from Dispatch() -
// confirm).
1121 bool DispatchSendAck();
1122 bool DispatchSendCommand();
1123 bool DispatchSendData();
1124 void DispatchOnReceiveCanceling();
// Handlers that each take an unpacked message plus the Station it came from,
// both by const reference. Their names mirror the MessageType values Request,
// Data, Complete and Cancel. Declarations only; bodies are not part of this
// listing.
1126 void PushRequest(
const MessageAccessor& accessor,
const Station& sender);
// Unlike its siblings, PushData returns a bool; its meaning is not visible
// here.
1127 bool PushData(
const MessageAccessor& accessor,
const Station& sender);
1128 void PushComplete(
const MessageAccessor& accessor,
const Station& sender);
1129 void PushCancel(
const MessageAccessor& accessor,
const Station& sender);
// Takes the message fields by const reference and a non-const pointer to the
// destination Station; returns a bool (presumably success/failure - confirm).
// Declaration only.
1131 bool CreateMessage(
const MessageAccessor& accessor, Station* pDestinationStation);
// Takes a pointer to a const destination Station and returns a 32-bit message
// index. Declaration only; how the index is generated is not visible here.
1132 uint32_t IssueMessageIdx(
const Station* cpDestStation);
// Static predicate comparing a transfer setting with an unpacked message.
// Declaration only; which fields are compared is not visible in this listing.
1134 static bool IsSameSetting(
const InnerTransferSetting& setting,
const MessageAccessor& accessor);
// Protocol-level data members (the enclosing class header is not part of
// this listing).
// Exclusive upper bound for valid station indices in the validity check.
1137 uint32_t m_MaxConnections;
1142 InnerTransferSetting m_TransferSetting;
// Non-const buffer pointer; ownership and size are not visible here.
1144 void* m_pReceiveBuffer;
1145 ReceiverProgress m_ReceiveProgress;
// Pointer to const data: the send buffer cannot be modified through it.
1147 const void* m_cpSendBuffer;
1148 uint16_t m_NextSendId;
1149 uint32_t m_NextSendCommandStationIndex;
1150 uint32_t m_NextSendDataStationIndex;
1151 bool m_IsFinishSendEndless;
1153 uint32_t m_NextSendAckStationIndex;
1155 uint16_t m_SentMessageIdx;
// Returned by the throughput limit accessor.
1157 uint32_t m_ThroughputLimit;
1158 uint32_t m_RestThroughputLimit;
1160 uint32_t m_DispatchCount;
// Station array indexed by GetStation(): station indices above the local
// station's index are shifted down by one.
1162 Station* m_paStation;
1164 common::Time m_StartSendTime;
// Declared here without an initializer; the value is defined elsewhere.
1166 static const uint32_t cDefaultPacketNumPerDispatch;