Skip to content

Commit

Permalink
TCP - finish plugin skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
BonnyAD9 committed Feb 15, 2024
1 parent 2b502a1 commit 8c7a848
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 128 deletions.
82 changes: 82 additions & 0 deletions src/plugins/input/tcp/src/Acceptor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* \file
* \author Jakub Antonín Štigler <[email protected]>
* \brief Acceptor thread for TCP clients (header file)
* \date 2024
*
* Copyright: (C) 2023 CESNET, z.s.p.o.
* SPDX-License-Identifier: BSD-3-Clause
*/

#pragma once

#include <thread> // std::thread
#include <vector> // std::vector
#include <ipfixcol2.h> // ipx_ctx_t

#include "ClientManager.hpp" // ClientManager
#include "DecoderFactory.hpp" // DecoderFactory
#include "Config.hpp"

namespace tcp_in
{

/**
* @brief Acceptor thread for TCP clients.
*/
class Acceptor {
public:
/**
* @brief Creates the acceptor thread.
*
* @param clients Reference to client manager.
* @param factory Initialized decoder factory.
* @param config File configuration.
* @param ctx The plugin context.
*/
Acceptor(ClientManager &clients, DecoderFactory factory, Config config, ipx_ctx_t *ctx);

/**
* @brief Starts the acceptor thread.
*/
void start();

/**
* @brief Stops the acceptor thread.
*/
void stop();

private:
/**
* @brief The function that runs on the thread.
*/
void mainloop();

/**
* @brief File descriptor of epoll for accepting connections.
*/
int m_epoll_fd;
/**
* @brief Sockets listened to by epoll.
*/
std::vector<int> m_sockets;

/**
* @brief Write to this to gracefully exit the thread.
*/
int m_pipe_in_fd;
/**
* @brief Epoll listens to this, when it activates the acceptor thread will gracefuly exit.
*/
int m_pipe_out_fd;

/**
* @brief Accepted clients.
*/
ClientManager &m_clients;
DecoderFactory m_factory;
std::thread m_thread;
ipx_ctx_t *ctx;
};

} // namespace tcp_in
7 changes: 7 additions & 0 deletions src/plugins/input/tcp/src/ByteVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ class ByteVector {
*/
uint8_t *take();

/**
* @brief Empties this vector and returns the data of this vector as new vector.
*
* @return ByteVector new vector with the data.
*/
ByteVector move();

/**
* @brief Checks whether there is some data in the vector.
* @return true if there is no data, otherwise false.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* \file
* \author Jakub Antonín Štigler <[email protected]>
* \brief Mutexed vector of tcp connections (header file)
* \brief Manages TCP connection (header file)
* \date 2024
*
* Copyright: (C) 2023 CESNET, z.s.p.o.
Expand All @@ -21,10 +21,11 @@
namespace tcp_in
{

class ConnectionVec {
/**
* @brief Manager for TCP connections
*/
class ClientManager {
public:
// TODO: methods for waiting for connection and getting the connections

/**
* @brief Adds connection to the vector and epoll.
* @param fd file descriptor of the new tcp connection.
Expand All @@ -36,7 +37,7 @@ class ConnectionVec {
* main thread (not the acceptor thread).
* @param session session of the connection to remove.
*/
void remove_connection(struct ipx_session *session);
void close_connection(struct ipx_session *session);
private:
/**
* @brief Locks the session for safe adding, removing is safe only for the main thread (not the
Expand Down
37 changes: 25 additions & 12 deletions src/plugins/input/tcp/src/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

#include <ipfixcol2.h> // ipx_ctx_t

#include "Session.hpp" // Session
#include "ByteVector.hpp" // ByteVector
#include "Decoder.hpp" // Decoder
#include "ByteVector.hpp" // ByteVector
#include "Decoder.hpp" // Decoder
#include "DecoderFactory.hpp" // DecoderFactory

namespace tcp_in
{
Expand All @@ -29,28 +29,41 @@ class Connection {
public:
/**
* @brief Creates new connection with this TCP connection file descriptor.
* @param fd file descriptor of the new tcp connection.
* @param fd File descriptor of the new tcp connection.
* @param decoder Decoder to use in this connection.
*/
Connection(int fd);
Connection(int fd, std::unique_ptr<Decoder> decoder);

/**
* @brief Reads from the TCP session while there is data or unil a full message is readed.
* @brief Reads from the TCP session while there is data.
*
* @param ctx Ipx context (used to send the readed message)
* @returns true on success, false if some error occured and the connection should be closed.
*/
void recieve(ipx_ctx_t *ctx);
bool recieve(ipx_ctx_t *ctx);

/**
* @brief Closes this connection and removes it from the epoll. Allocated data is freed in
* destructor.
* @brief Closes this connection. Allocated data is freed in destructor.
*
* @param ctx
* @param epoll_fd Epoll in which this connection is.
*/
void close(ipx_ctx_t *ctx, int epoll_fd);
void close(ipx_ctx_t *ctx);

/**
* @brief Gets the file descriptor of the TCP connection
*
* @return int TCP file descriptor
*/
inline int get_fd() const {
return m_fd;
}

private:
Session session;
struct ipx_session *m_session;
/**
* @brief TCP file descriptor
*/
int m_fd;
/**
* @brief true if this connection didn't recieve any full messages, otherwise false.
*/
Expand Down
105 changes: 105 additions & 0 deletions src/plugins/input/tcp/src/DecodeBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* \file
* \author Jakub Antonín Štigler <[email protected]>
* \brief Buffer for managing decoded IPFIX data (header file)
* \date 2024
*
* Copyright: (C) 2023 CESNET, z.s.p.o.
* SPDX-License-Identifier: BSD-3-Clause
*/

#pragma once

#include <vector> // std::vector
#include <cstdint> // uint8_t

#include "ByteVector.hpp" // ByteVector

namespace tcp_in {

/**
* @brief Buffer for collecting and reconstructing decoded IPFIX messages.
*/
class DecodeBuffer {
public:
/**
* @brief Creates empty decode buffer.
*/
DecodeBuffer() : m_close_requested(false), m_decoded(), m_part_decoded(), m_decoded_size(0) {};

/**
* @brief Gets decoded data. Shouldn't be used by decoders.
*/
inline std::vector<ByteVector> &get_decoded() {
return m_decoded;
}

/**
* @brief Adds new decoded IPFX message to the buffer. The passed buffer is emptied.
*
* @param data IPFIX message to add. This buffer will be emptied.
*/
inline void add(ByteVector &data) {
m_decoded.push_back(data.move());
}

/**
* @brief Copies IPFIX data from buffer.
*
* The data may be any part of message (possibly incomplete or even multiple messages) but
* multiple calls to this metod must be with the message data in correct order so that it can be
* reconstructed.
* @param data data with the message
* @param size size of the data in `data`
*/
void read_from(const uint8_t *data, size_t size);

/**
* @brief Copies IPFIX data from circullar buffer.
*
* The data may be any part of message (possibly incomplete or even multiple messages) but
* multiple calls to this metod must be with the message data in correct order so that it can be
* reconstructed.
* @param data data of the circullar buffer
* @param buffer_size size of the circullar buffer (allocated space)
* @param data_size size of data to copy from the buffer
* @param position start position of the data in the buffer.
*/
void read_from(const uint8_t *data, size_t buffer_size, size_t data_size, size_t position);

/**
* @brief Called by decoders to request closing the connection because there was invalid data.
*/
inline void request_close() {
m_close_requested = true;
}

/**
* @brief Checks whether closing the connection was requested.
* @return true if connection should be closed, otherwise false.
*/
inline bool is_close_requested() const {
return m_close_requested;
}

private:
/**
* @brief True if buffer/decoder encountered broken data.
*/
bool m_close_requested;

/**
* @brief Decoded data waiting to be sent.
*/
std::vector<ByteVector> m_decoded;
/**
* @brief Partially decoded data.
*/
ByteVector m_part_decoded;
/**
* @brief Expected length of fully decoded data.
*/
size_t m_decoded_size;
};

} // namespace tcp_in
42 changes: 4 additions & 38 deletions src/plugins/input/tcp/src/Decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
* SPDX-License-Identifier: BSD-3-Clause
*/

#include <cstddef> // size_t
#include <cstdint> // uint8_t

#include "ByteVector.hpp" // ByteVector
#include "DecodeBuffer.hpp"

namespace tcp_in
{
Expand All @@ -22,41 +19,10 @@ namespace tcp_in
class Decoder {
public:
/**
* @brief Gets the number of bytes that need to be readed so that `identify` may be used.
* @return Number of bytes required for `identify`, this should be always the same value.
*/
virtual size_t first_header_len() const noexcept = 0;

/**
* @brief Checks whether this message starts comunication decoded by this decoder. If yes this
* will also initialize the decoder.
* @param[in] data Start of the first message. This is at least `first_header_len()` bytes.
* @return true if the communication is decoded by this decoder, otherwise false.
*/
virtual bool identify(const uint8_t *data) = 0;

/**
* @brief Gets the number of bytes that need to be readed so that `msg_len` can be used.
* @return Number of bytes required for `msg_len`, this should be always the same value.
*/
virtual size_t header_len() const noexcept = 0;

/**
* @brief Gets length of the message based on its header. This is length INCLUDING the header
* length.
* @param[in] data Start of the message, this is at least `header_len()` bytes long.
* @return Number of bytes needed to decode the message.
*/
virtual size_t msg_len(const uint8_t *data) const = 0;

/**
* @brief Decodes the message in `msg` into `result`.
*
* @param[out] result the resulting message. If it is passed in empty, it can be fully overwritten. If `result` is
* not empty, the decoded message should be appended to it.
* @param[in] msg message to decode, calee may take ownership of it.
* @brief Reads all available data from TCP stream and returns buffer with decoded messages.
* @returns Buffer with decoded messages.
*/
virtual void decode(ByteVector &result, ByteVector &msg) = 0;
virtual DecodeBuffer &decode() = 0;

virtual ~Decoder() {}
};
Expand Down
37 changes: 37 additions & 0 deletions src/plugins/input/tcp/src/DecoderFactory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* \file
* \author Jakub Antonín Štigler <[email protected]>
* \brief Factory for creating decoders (header file)
* \date 2024
*
* Copyright: (C) 2023 CESNET, z.s.p.o.
* SPDX-License-Identifier: BSD-3-Clause
*/

#pragma once

#include <memory> // std::unique_ptr

#include "Decoder.hpp" // Decoder

namespace tcp_in
{

/**
* @brief Factory for TCP decoders.
*/
class DecoderFactory {
public:
DecoderFactory();

/**
* @brief Detects the type of decoder that should be used to decode the given stream and
* constructs it. This function may block if the decoder cannot be determined without
* recieving more data.
* @param fd TCP stream file descriptor
* @return Instance of the correct decoder, nullptr no decoder matches the data.
*/
std::unique_ptr<Decoder> detect_decoder(int fd);
};

} // namespace tcp_in
Loading

0 comments on commit 8c7a848

Please sign in to comment.