PR-767 Rework the interactiver server to be similar to other services

This commit is contained in:
Garret Fick 2019-11-12 14:08:09 -05:00
parent 9df9fd18cd
commit 45b588acb5
11 changed files with 446 additions and 516 deletions

View File

@ -81,6 +81,9 @@ void bootstrap() {
const char* config_path = "../etc/config.ini";
if (ini_parse(config_path, config_handler, &config) < 0) {
spdlog::info("Config file {} could not be read", config_path);
// If we don't have the config file, then default to always
// starting the interactive server.
config.services.push_back("interactive");
}
//======================================================

View File

@ -18,12 +18,13 @@
#ifdef OPLC_DNP3_OUTSTATION
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <csignal>
#include <algorithm>
#include <cctype>
#include <chrono>
#include <fstream>
#include <functional>
#include <iostream>
@ -271,17 +272,21 @@ void bind_variables(const vector<string>& binding_defs,
/// This is populated with values from the config file.
struct Dnp3Config {
Dnp3Config() :
poll_interval(std::chrono::milliseconds(250)),
port(20000),
link(false, false)
{}
// How fast we send and receive data into the runtime.
std::chrono::milliseconds poll_interval;
uint16_t port;
/// Outstation config
opendnp3::OutstationConfig outstation;
opendnp3::OutstationConfig outstation;
/// Link layer config
opendnp3::LinkConfig link;
/// Link layer config
opendnp3::LinkConfig link;
vector<string> bindings;
};
@ -345,7 +350,8 @@ OutstationStackConfig dnp3_create_config(istream& cfg_stream,
Dnp3IndexedGroup& binary_commands,
Dnp3IndexedGroup& analog_commands,
Dnp3MappedGroup& measurements,
uint16_t& port) {
uint16_t& port,
std::chrono::milliseconds& poll_interval) {
// We need to know the size of the database (number of points) before
// we can do anything. To avoid doing two passes of the stream, read
// everything into a map, then get the database size, and finally
@ -375,6 +381,7 @@ OutstationStackConfig dnp3_create_config(istream& cfg_stream,
config.link = dnp3_config.link;
port = dnp3_config.port;
poll_interval = dnp3_config.poll_interval;
return config;
}
@ -389,9 +396,10 @@ void dnp3s_start_server(unique_ptr<istream, function<void(istream*)>>& cfg_strea
Dnp3IndexedGroup analog_commands = {0};
Dnp3MappedGroup measurements = {0};
uint16_t port;
chrono::milliseconds poll_interval;
auto config(dnp3_create_config(*cfg_stream, glue_variables,
binary_commands, analog_commands,
measurements, port));
measurements, port, poll_interval));
// If we have a config override, then check for the port number
port = strlen(cfg_overrides) > 0 ? atoi(cfg_overrides) : port;
@ -424,10 +432,6 @@ void dnp3s_start_server(unique_ptr<istream, function<void(istream*)>>& cfg_strea
spdlog::info("DNP3 outstation enabled on port {0:d}", port);
// Continuously update
struct timespec timer_start;
clock_gettime(CLOCK_MONOTONIC, &timer_start);
// Run this until we get a signal to stop.
while (run) {
{
@ -439,7 +443,7 @@ void dnp3s_start_server(unique_ptr<istream, function<void(istream*)>>& cfg_strea
spdlog::trace("{} data points written to outstation", num_writes);
}
sleep_until(&timer_start, OPLC_CYCLE);
this_thread::sleep_for(poll_interval);
}
outstation->Disable();

View File

@ -140,7 +140,8 @@ asiodnp3::OutstationStackConfig dnp3_create_config(std::istream& cfg_stream,
Dnp3IndexedGroup& binary_commands,
Dnp3IndexedGroup& analog_commands,
Dnp3MappedGroup& measurements,
uint16_t& port);
uint16_t& port,
std::chrono::milliseconds& poll_interval);
/// @brief Start the DNP3 server running on the specified port and configured
/// using the specified stream.

View File

@ -52,7 +52,7 @@ inline bool ini_matches(const char* section_expected,
/// @param Return the string or null if cannot read more.
static char* istream_fgets(char* str, int num, void* stream) {
auto st = reinterpret_cast<std::istream*>(stream);
if (!st || st->eof()) {
if (!st->good() || st->eof()) {
// We previously reached the end of the file, so return the end signal.
return nullptr;
}

View File

@ -1,4 +1,5 @@
// Copyright 2018 Thiago Alves
// Copyright 2019 Smarter Grid Solutions
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -21,18 +22,18 @@
// Thiago Alves, Jun 2018
//-----------------------------------------------------------------------------
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include <cstdint>
#include <chrono>
#include <fstream>
#include <functional>
#include <istream>
#include <memory>
#include <netdb.h>
#include <string.h>
#include <pthread.h>
#include <mutex>
#include <thread>
#include <stdio.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <time.h>
#include <netinet/in.h>
#include <spdlog/spdlog.h>
@ -46,16 +47,15 @@
* @{
*/
//Global Variables
const uint16_t BUFFER_MAX_SIZE(1024);
std::mutex command_mutex;
// TODO Globals to move into services
bool run_modbus = 0;
uint16_t modbus_port = 502;
bool run_enip = 0;
uint16_t enip_port = 44818;
unsigned char server_command[1024];
int command_index = 0;
bool processing_command = 0;
time_t start_time;
time_t end_time;
//Global Threads
pthread_t modbus_thread;
@ -92,11 +92,11 @@ void *enipThread(void *arg)
/// @brief Read the argument from a command function
/// @param *command
///////////////////////////////////////////////////////////////////////////////
int readCommandArgument(unsigned char *command)
int readCommandArgument(const char *command)
{
int i = 0;
int j = 0;
unsigned char argument[1024];
char argument[BUFFER_MAX_SIZE];
while (command[i] != '(' && command[i] != '\0') i++;
if (command[i] == '(') i++;
@ -118,7 +118,7 @@ int readCommandArgument(unsigned char *command)
/// @return Zero on success. Non-zero on failure. If this function
/// fails, it marks the target as an empty string so it is still safe
/// to read the string but it will be empty.
std::int8_t copy_command_config(unsigned char *source, char target[],
std::int8_t copy_command_config(const char *source, char target[],
size_t target_size)
{
// Search through source until we find the closing ")"
@ -147,66 +147,61 @@ std::int8_t copy_command_config(unsigned char *source, char target[],
return 0;
}
///////////////////////////////////////////////////////////////////////////////
/// @brief Create the socket and bind it.
/// @param port
/// @return the file descriptor for the socket
///////////////////////////////////////////////////////////////////////////////
int createSocket_interactive(int port)
/// @return the file descriptor for the socket, or less than 0 if a socket
/// if an error occurred.
int interactive_open_socket(uint16_t port)
{
int socket_fd;
struct sockaddr_in server_addr;
//Create TCP Socket
socket_fd = socket(AF_INET,SOCK_STREAM,0);
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd < 0)
{
spdlog::error("Interactive Server: error creating stream socket => {}", strerror(errno));
exit(1);
spdlog::error("Interactive Server: error creating stream socket => {}", strerror(errno));
return -1;
}
//Set SO_REUSEADDR
int enable = 1;
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
perror("setsockopt(SO_REUSEADDR) failed");
}
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
perror("setsockopt(SO_REUSEADDR) failed");
}
SetSocketBlockingEnabled(socket_fd, false);
//Initialize Server Struct
bzero((char *) &server_addr, sizeof(server_addr));
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
server_addr.sin_port = htons(port);
//Bind socket
if (bind(socket_fd,(struct sockaddr *)&server_addr,sizeof(server_addr)) < 0)
if (bind(socket_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
spdlog::error("Interactive Server: error binding socket => {}", strerror(errno));
exit(1);
spdlog::error("Interactive Server: error binding socket => {}", strerror(errno));
return -2;
}
// we accept max 5 pending connections
listen(socket_fd,5);
spdlog::info("Interactive Server: Listening on port {}", port);
listen(socket_fd, 5);
spdlog::info("Interactive Server: Listening on port {}", port);
return socket_fd;
}
///////////////////////////////////////////////////////////////////////////////
/// @brief Blocking call. Wait here for the client to connect.
/// @param socket_fd
/// @return the file descriptor descriptor to communicate with the client.
///////////////////////////////////////////////////////////////////////////////
int waitForClient_interactive(int socket_fd)
int interactive_wait_new_client(volatile bool& run, int socket_fd)
{
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len;
spdlog::debug("Interactive Server: waiting for new client...");
spdlog::debug("Interactive Server: waiting for new client...");
client_len = sizeof(client_addr);
while (run_openplc)
while (run)
{
client_fd = accept(socket_fd, (struct sockaddr *)&client_addr, &client_len); //non-blocking call
if (client_fd > 0)
@ -214,149 +209,85 @@ int waitForClient_interactive(int socket_fd)
SetSocketBlockingEnabled(client_fd, true);
break;
}
sleepms(100);
this_thread::sleep_for(chrono::milliseconds(100));
}
return client_fd;
}
/////////////////////////////////////////////////////////////////////////////////
/// @brief Blocking call. Holds here until something is received from the client.
/// Once the message is received, it is stored on the buffer and the function
/// returns the number of bytes received.
/// @param client_fd
/// @param *buffer
/// @return the number of bytes received.
///////////////////////////////////////////////////////////////////////////////
int listenToClient_interactive(int client_fd, unsigned char *buffer)
{
bzero(buffer, 1024);
int n = read(client_fd, buffer, 1024);
return n;
}
/////////////////////////////////////////////////////////////////////////////////
/// @brief Process client's commands for the interactive server
/// @param *buffer
/// @param client_fd
/////////////////////////////////////////////////////////////////////////////////
void processCommand(unsigned char *buffer, int client_fd)
{
spdlog::debug("Process command received {}", buffer);
int count_char = 0;
void interactive_client_command(const char* command, int client_fd) {
// A buffer for the command configuration information.
const size_t COMMAND_CONFIG_SIZE(1024);
char command_config[COMMAND_CONFIG_SIZE];
if (processing_command)
{
count_char = sprintf(buffer, "Processing command...\n");
write(client_fd, buffer, count_char);
char command_buffer[BUFFER_MAX_SIZE];
std::unique_lock<std::mutex> lock(command_mutex, std::defer_lock);
if (!lock.try_lock()) {
spdlog::trace("Process command skipped because already processing {}", command);
int count_char = sprintf(command_buffer, "Another command in progress...\n");
write(client_fd, command_buffer, count_char);
return;
}
if (strncmp(buffer, "quit()", 6) == 0)
spdlog::trace("Process command received {}", command);
if (strncmp(command, "quit()", 6) == 0)
{
processing_command = true;
spdlog::info("Issued quit() command");
spdlog::info("Issued quit() command");
if (run_modbus)
{
run_modbus = 0;
pthread_join(modbus_thread, NULL);
spdlog::info("Modbus server was stopped");
spdlog::info("Modbus server was stopped");
}
services_stop();
run_openplc = 0;
processing_command = false;
}
else if (strncmp(buffer, "start_modbus(", 13) == 0)
else if (strncmp(command, "start_modbus(", 13) == 0)
{
processing_command = true;
modbus_port = readCommandArgument(buffer);
spdlog::info("Issued start_modbus() command to start on port: {}", modbus_port);
modbus_port = readCommandArgument(command);
spdlog::info("Issued start_modbus() command to start on port: {}", modbus_port);
if (run_modbus)
{
spdlog::info("Modbus server already active. Restarting on port: {}", modbus_port);
spdlog::info("Modbus server already active. Restarting on port: {}", modbus_port);
//Stop Modbus server
run_modbus = 0;
pthread_join(modbus_thread, NULL);
spdlog::info("Modbus server was stopped");
spdlog::info("Modbus server was stopped");
}
//Start Modbus server
run_modbus = 1;
pthread_create(&modbus_thread, NULL, modbusThread, NULL);
processing_command = false;
}
else if (strncmp(buffer, "stop_modbus()", 13) == 0)
else if (strncmp(command, "stop_modbus()", 13) == 0)
{
processing_command = true;
spdlog::info("Issued stop_modbus() command");
spdlog::info("Issued stop_modbus() command");
if (run_modbus)
{
run_modbus = 0;
pthread_join(modbus_thread, NULL);
spdlog::info("Modbus server was stopped");
spdlog::info("Modbus server was stopped");
}
processing_command = false;
}
#ifdef OPLC_DNP3_OUTSTATION
else if (strncmp(buffer, "start_dnp3(", 11) == 0)
else if (strncmp(command, "start_dnp3(", 11) == 0)
{
processing_command = true;
ServiceDefinition* def = services_find("dnp3s");
if (def && copy_command_config(buffer + 11, command_config, COMMAND_CONFIG_SIZE) == 0) {
def->start(command_config);
if (def && copy_command_config(command + 11, command_buffer, BUFFER_MAX_SIZE) == 0) {
def->start(command_buffer);
}
processing_command = false;
}
else if (strncmp(buffer, "stop_dnp3()", 11) == 0)
else if (strncmp(command, "stop_dnp3()", 11) == 0)
{
processing_command = true;
ServiceDefinition* def = services_find("dnp3s");
if (def) {
def->stop();
}
processing_command = false;
}
#endif // OPLC_DNP3_OUTSTATION
else if (strncmp(buffer, "start_enip(", 11) == 0)
else if (strncmp(command, "start_enip(", 11) == 0)
{
processing_command = true;
spdlog::info("Issued start_enip() command to start on port: {}", readCommandArgument(buffer));
enip_port = readCommandArgument(buffer);
if (run_enip)
{
spdlog::info("EtherNet/IP server already active. Restarting on port: {}", enip_port);
//Stop Enip server
run_enip = 0;
pthread_join(enip_thread, NULL);
spdlog::info("EtherNet/IP server was stopped");
}
//Start Enip server
run_enip = 1;
pthread_create(&enip_thread, NULL, enipThread, NULL);
processing_command = false;
}
else if (strncmp(buffer, "stop_enip()", 11) == 0)
{
processing_command = true;
spdlog::info("Issued stop_enip() command");
if (run_enip)
{
run_enip = 0;
pthread_join(enip_thread, NULL);
spdlog::info("EtherNet/IP server was stopped");
}
processing_command = false;
}
else if (strncmp(buffer, "start_enip(", 11) == 0)
{
processing_command = true;
enip_port = readCommandArgument(buffer);
spdlog::info("Issued start_enip() command to start on port: {}", enip_port);
spdlog::info("Issued start_enip() command to start on port: {}", readCommandArgument(command));
enip_port = readCommandArgument(command);
if (run_enip)
{
spdlog::info("EtherNet/IP server already active. Restarting on port: {}", enip_port);
@ -368,11 +299,9 @@ void processCommand(unsigned char *buffer, int client_fd)
//Start Enip server
run_enip = 1;
pthread_create(&enip_thread, NULL, enipThread, NULL);
processing_command = false;
}
else if (strncmp(buffer, "stop_enip()", 11) == 0)
else if (strncmp(command, "stop_enip()", 11) == 0)
{
processing_command = true;
spdlog::info("Issued stop_enip() command");
if (run_enip)
{
@ -380,166 +309,136 @@ void processCommand(unsigned char *buffer, int client_fd)
pthread_join(enip_thread, NULL);
spdlog::info("EtherNet/IP server was stopped");
}
processing_command = false;
}
else if (strncmp(buffer, "start_pstorage(", 15) == 0)
else if (strncmp(command, "start_pstorage(", 15) == 0)
{
processing_command = true;
ServiceDefinition* def = services_find("pstorage");
if (def && copy_command_config(buffer + 15, command_config, COMMAND_CONFIG_SIZE) == 0) {
def->start(command_config);
if (def && copy_command_config(command + 15, command_buffer, BUFFER_MAX_SIZE) == 0) {
def->start(command_buffer);
}
processing_command = false;
}
else if (strncmp(buffer, "stop_pstorage()", 15) == 0)
else if (strncmp(command, "stop_pstorage()", 15) == 0)
{
processing_command = true;
ServiceDefinition* def = services_find("pstorage");
if (def) {
def->stop();
}
processing_command = false;
}
else if (strncmp(buffer, "runtime_logs()", 14) == 0)
else if (strncmp(command, "runtime_logs()", 14) == 0)
{
processing_command = true;
spdlog::debug("Issued runtime_logs() command");
std::string data = log_sink->data();
write(client_fd, data.c_str(), data.size());
processing_command = false;
return;
}
else if (strncmp(buffer, "exec_time()", 11) == 0)
else if (strncmp(command, "exec_time()", 11) == 0)
{
processing_command = true;
time_t end_time;
time(&end_time);
count_char = sprintf(buffer, "%llu\n", (unsigned long long)difftime(end_time, start_time));
write(client_fd, buffer, count_char);
processing_command = false;
int count_char = sprintf(command_buffer, "%llu\n", (unsigned long long)difftime(end_time, start_time));
write(client_fd, command_buffer, count_char);
return;
}
else
{
processing_command = true;
count_char = sprintf(buffer, "Error: unrecognized command\n");
write(client_fd, buffer, count_char);
processing_command = false;
int count_char = sprintf(command_buffer, "Error: unrecognized command\n");
write(client_fd, command_buffer, count_char);
return;
}
count_char = sprintf(buffer, "OK\n");
write(client_fd, buffer, count_char);
int count_char = sprintf(command_buffer, "OK\n");
write(client_fd, command_buffer, count_char);
}
/////////////////////////////////////////////////////////////////////////////////////
/// @brief Process client's request
/// @param *buffer
/// @param bufferSize
/// @param client_fd
/////////////////////////////////////////////////////////////////////////////////////
void processMessage_interactive(unsigned char *buffer, int bufferSize, int client_fd)
{
for (int i = 0; i < bufferSize; i++)
{
if (buffer[i] == '\r' || buffer[i] == '\n' || command_index >= 1024)
{
processCommand(server_command, client_fd);
command_index = 0;
break;
}
server_command[command_index] = buffer[i];
command_index++;
server_command[command_index] = '\0';
}
}
struct ClientArgs {
int client_fd;
volatile bool* run;
};
/////////////////////////////////////////////////////////////////////////////////////
/// @brief Thread to handle requests for each connected client
/// @param *arguments
/////////////////////////////////////////////////////////////////////////////////////
void *handleConnections_interactive(void *arguments)
{
int client_fd = *(int *)arguments;
unsigned char buffer[1024];
int messageSize;
void* interactive_client_run(void* arguments) {
auto client_args = reinterpret_cast<ClientArgs*>(arguments);
spdlog::debug("Interactive Server: Thread created for client ID: {}", client_fd);
char buffer[BUFFER_MAX_SIZE];
int message_size;
while(run_openplc)
{
//unsigned char buffer[1024];
//int messageSize;
while (*client_args->run) {
memset(buffer, 0, BUFFER_MAX_SIZE);
message_size = read(client_args->client_fd, buffer, BUFFER_MAX_SIZE);
messageSize = listenToClient_interactive(client_fd, buffer);
if (messageSize <= 0 || messageSize > 1024)
{
// something has gone wrong or the client has closed connection
if (messageSize == 0)
{
spdlog::debug("Interactive Server: client ID: {} has closed the connection", client_fd);
}
else
{
spdlog::error("Interactive Server: Something is wrong with the client ID: {} message Size : {}", client_fd, messageSize);
}
if (message_size <= 0) {
spdlog::trace("Interactive Server: client ID: {} has closed the connection", client_args->client_fd);
break;
}
processMessage_interactive(buffer, messageSize, client_fd);
if (message_size > BUFFER_MAX_SIZE) {
spdlog::error("Interactive Server: Message size is too large for client {}", client_args->client_fd);
break;
}
// Process the received buffer, splitting into commands
char* start = buffer;
uint16_t cur_pos = 0;
while (cur_pos < BUFFER_MAX_SIZE && buffer[cur_pos] != '\0') {
if (buffer[cur_pos] == '\n' || buffer[cur_pos] == '\r') {
// Terminate the command
buffer[cur_pos] = '\0';
interactive_client_command(start, client_args->client_fd);
}
cur_pos += 1;
}
}
spdlog::debug("Closing client socket and calling pthread_exit in interactive_server.cpp");
closeSocket(client_fd);
spdlog::debug("Terminating interactive server connections");
closeSocket(client_args->client_fd);
spdlog::trace("Interactive server connection completed");
delete client_args;
pthread_exit(NULL);
}
/////////////////////////////////////////////////////////////////////////////////////
/// @brief Function to start the server. It receives the port number as argument and
/// creates an infinite loop to listen and parse the messages sent by the
/// clients
/// @param port
/////////////////////////////////////////////////////////////////////////////////////
void startInteractiveServer(int port)
{
int socket_fd, client_fd;
socket_fd = createSocket_interactive(port);
int8_t interactive_run(std::unique_ptr<istream, function<void(istream*)>>& cfg_stream,
const char* cfg_overrides,
const GlueVariablesBinding& bindings,
volatile bool& run) {
const uint16_t port = 43628;
int socket_fd = interactive_open_socket(port);
while(run_openplc)
{
client_fd = waitForClient_interactive(socket_fd); //block until a client connects
if (client_fd < 0)
{
spdlog::error("Interactive Server: Error accepting client!");
// Listen for new connections to our socket. When we have a new
// connection, we spawn a new thread to handle that connection.
while (run) {
int client_fd = interactive_wait_new_client(run, socket_fd);
if (client_fd < 0) {
spdlog::error("Interactive Server: Error accepting client!");
continue;
}
else
{
int arguments[1];
pthread_t thread;
int ret = -1;
spdlog::debug("Interactive Server: Client accepted! Creating thread for the new client ID: {}", client_fd);
arguments[0] = client_fd;
ret = pthread_create(&thread, NULL, handleConnections_interactive, arguments);
if (ret==0)
{
pthread_detach(thread);
}
pthread_t thread;
auto client_args = new ClientArgs { .client_fd=client_fd, .run=&run };
spdlog::trace("Interactive Server: Client accepted! Creating thread for the new client ID: {}", client_fd);
int ret = pthread_create(&thread, NULL, interactive_client_run, client_args);
if (ret == 0) {
pthread_detach(thread);
} else {
delete client_args;
}
}
spdlog::info("Shutting down internal threads\n");
run_modbus = 0;
run_enip = 0;
pthread_join(modbus_thread, NULL);
pthread_join(enip_thread, NULL);
spdlog::info("Closing socket...");
closeSocket(socket_fd);
closeSocket(client_fd);
spdlog::debug("Terminating interactive server thread");
}
void initializeLogging(int argc,char **argv)
void interactive_service_run(const GlueVariablesBinding& binding,
volatile bool& run, const char* config) {
unique_ptr<istream, function<void(istream*)>> cfg_stream(new ifstream("../etc/config.ini"), [](istream* s)
{
reinterpret_cast<ifstream*>(s)->close();
delete s;
});
interactive_run(cfg_stream, config, binding, run);
}
void initialize_logging(int argc,char **argv)
{
log_sink.reset(new buffered_sink(log_buffer, LOG_BUFFER_SIZE));
spdlog::default_logger()->sinks().push_back(log_sink);

View File

@ -0,0 +1,32 @@
// Copyright 2018 Thiago Alves
// Copyright 2019 Smarter Grid Solutions
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http ://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissionsand
// limitations under the License.
/** \addtogroup openplc_runtime
* @{
*/
class GlueVariablesBinding;
/// @brief Start the interactive socket server.
///
/// @param glue_variables The glue variables that may be bound into this
/// server.
/// @param run A signal for running this server. This server terminates when
/// this signal is false.
/// @param config The custom configuration for this service.
void interactive_service_run(const GlueVariablesBinding& binding,
volatile bool& run, const char* config);
/** @}*/

View File

@ -133,12 +133,10 @@ void closeSocket(int fd);
bool SetSocketBlockingEnabled(int fd, bool blocking);
//interactive_server.cpp
void startInteractiveServer(int port);
void initializeLogging(int argc,char **argv);
void initialize_logging(int argc,char **argv);
extern bool run_modbus;
extern bool run_enip;
extern time_t start_time;
extern time_t end_time;
//modbus.cpp
int processModbusMessage(unsigned char *buffer, int bufferSize);

View File

@ -80,16 +80,6 @@ void sleepms(int milliseconds)
nanosleep(&ts, NULL);
}
////////////////////////////////////////////////////////////////////////////////
/// \brief Interactive Server Thread. Creates the server to listen to commands
/// on localhost
////////////////////////////////////////////////////////////////////////////////
void *interactiveServerThread(void *arg)
{
startInteractiveServer(43628);
return nullptr;
}
////////////////////////////////////////////////////////////////////////////////
/// \brief Verify if pin is present in one of the ignored vectors
/// \param
@ -165,15 +155,15 @@ void handleSpecialFunctions()
int main(int argc,char **argv)
{
initializeLogging(argc, argv);
initialize_logging(argc, argv);
spdlog::info("OpenPLC Runtime starting...");
bootstrap();
// Start the thread for the interactive server
time(&start_time);
pthread_t interactive_thread;
pthread_create(&interactive_thread, NULL, interactiveServerThread, NULL);
// Start the thread for the any services we have, including the
// interactive server and anything out that is configured to
// automatically start
bootstrap();
//gets the starting point for the clock
spdlog::debug("Getting current time");
@ -212,8 +202,8 @@ int main(int argc,char **argv)
// SHUTTING DOWN OPENPLC RUNTIME
//======================================================
services_stop();
services_finalize();
pthread_join(interactive_thread, NULL);
spdlog::debug("Disabling outputs...");
disableOutputs();
updateCustomOut();

View File

@ -62,54 +62,54 @@ const size_t FILE_HEADER_SIZE(extent<decltype(FILE_HEADER)>::value);
/// @param size the size type.
/// @return The required storage size in number of bytes.
inline uint8_t get_size_bytes(IecLocationSize size) {
switch (size) {
case IECLST_BIT:
return 1;
case IECLST_BYTE:
return 1;
case IECLST_WORD:
return 2;
case IECLST_DOUBLEWORD:
return 4;
case IECLST_LONGWORD:
return 8;
}
switch (size) {
case IECLST_BIT:
return 1;
case IECLST_BYTE:
return 1;
case IECLST_WORD:
return 2;
case IECLST_DOUBLEWORD:
return 4;
case IECLST_LONGWORD:
return 8;
}
return 0;
return 0;
}
/// Get the total number of bytes required to store the bindings.
/// @param bindings The bindings that we want to store.
/// @return The total number of bytes required.
size_t get_size_bytes(const GlueVariablesBinding& bindings) {
size_t size(0);
size_t size(0);
for (uint16_t index(0); index < bindings.size; ++index) {
const GlueVariable& glue = bindings.glue_variables[index];
for (uint16_t index(0); index < bindings.size; ++index) {
const GlueVariable& glue = bindings.glue_variables[index];
if (glue.dir != IECLDT_MEM) {
// We only care about items that are stored in memory
continue;
}
if (glue.dir != IECLDT_MEM) {
// We only care about items that are stored in memory
continue;
}
size += get_size_bytes(glue.size);
}
size += get_size_bytes(glue.size);
}
return size;
return size;
}
inline uint8_t mask_index(GlueBoolGroup* group, uint8_t i) {
if (!group->values[i]) {
return 0;
}
if (!group->values[i]) {
return 0;
}
return (*group->values) ? (1 << i) : 0;
return (*group->values) ? (1 << i) : 0;
}
inline void set_index(GlueBoolGroup* group, uint8_t i, uint8_t v) {
if (group->values[i]) {
(*group->values[i]) = ((1 << i) & v) ? TRUE: FALSE;
}
if (group->values[i]) {
(*group->values[i]) = ((1 << i) & v) ? TRUE: FALSE;
}
}
/// Copy the glue values into the buffer.
@ -117,284 +117,284 @@ inline void set_index(GlueBoolGroup* group, uint8_t i, uint8_t v) {
/// @param buffer The buffer that we are copying into.
/// @return The number of bytes that were written into the buffer.
size_t pstorage_copy_glue(const GlueVariablesBinding& bindings, char* buffer) {
lock_guard<mutex> guard(*bindings.buffer_lock);
lock_guard<mutex> guard(*bindings.buffer_lock);
size_t num_written(0);
for (uint16_t index(0); index < bindings.size; ++index) {
const GlueVariable& glue = bindings.glue_variables[index];
size_t num_written(0);
for (uint16_t index(0); index < bindings.size; ++index) {
const GlueVariable& glue = bindings.glue_variables[index];
if (glue.dir != IECLDT_MEM) {
// We only care about items that are stored in memory
continue;
}
if (glue.dir != IECLDT_MEM) {
// We only care about items that are stored in memory
continue;
}
uint8_t num_bytes = get_size_bytes(glue.size);
uint8_t num_bytes = get_size_bytes(glue.size);
if (glue.size == IECLST_BIT) {
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
uint8_t bools_as_byte = mask_index(group, 0)
| mask_index(group, 1)
| mask_index(group, 2)
| mask_index(group, 3)
| mask_index(group, 4)
| mask_index(group, 5)
| mask_index(group, 6)
| mask_index(group, 7);
memcpy(buffer, &bools_as_byte, 1);
} else {
// Write the number of bytes to the buffer
memcpy(buffer, glue.value, num_bytes);
}
if (glue.size == IECLST_BIT) {
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
uint8_t bools_as_byte = mask_index(group, 0)
| mask_index(group, 1)
| mask_index(group, 2)
| mask_index(group, 3)
| mask_index(group, 4)
| mask_index(group, 5)
| mask_index(group, 6)
| mask_index(group, 7);
memcpy(buffer, &bools_as_byte, 1);
} else {
// Write the number of bytes to the buffer
memcpy(buffer, glue.value, num_bytes);
}
// Advance the pointer to the next starting position
num_written += num_bytes;
buffer += num_bytes;
}
// Advance the pointer to the next starting position
num_written += num_bytes;
buffer += num_bytes;
}
return num_written;
return num_written;
}
/// Container for reading in configuration from the config.ini
/// This is populated with values from the config file.
struct PstorageConfig {
PstorageConfig() :
poll_interval(std::chrono::seconds(10))
{}
std::chrono::seconds poll_interval;
PstorageConfig() :
poll_interval(std::chrono::seconds(10))
{}
std::chrono::seconds poll_interval;
};
int pstorage_cfg_handler(void* user_data, const char* section,
const char* name, const char* value) {
if (strcmp("pstorage", section) != 0) {
if (strcmp("pstorage", section) != 0) {
return 0;
}
auto config = reinterpret_cast<PstorageConfig*>(user_data);
auto config = reinterpret_cast<PstorageConfig*>(user_data);
if (strcmp(name, "poll_interval") == 0) {
// We do not allow a poll period of less than 1 second as that
// might cause lock contention problems.
config->poll_interval = std::chrono::seconds(max(1, atoi(value)));
} else if (strcmp(name, "enabled") == 0) {
if (strcmp(name, "poll_interval") == 0) {
// We do not allow a poll period of less than 1 second as that
// might cause lock contention problems.
config->poll_interval = std::chrono::seconds(max(1, atoi(value)));
} else if (strcmp(name, "enabled") == 0) {
// Nothing to do here - we already know this is enabled
} else {
} else {
spdlog::warn("Unknown configuration item {}", name);
return -1;
}
return 0;
return 0;
}
int8_t pstorage_run(std::unique_ptr<std::istream, std::function<void(std::istream*)>>& cfg_stream,
const char* cfg_overrides,
const GlueVariablesBinding& bindings,
volatile bool& run,
function<std::ostream*(void)> stream_fn)
const GlueVariablesBinding& bindings,
volatile bool& run,
function<std::ostream*(void)> stream_fn)
{
PstorageConfig config;
ini_parse_stream(istream_fgets, cfg_stream.get(), pstorage_cfg_handler, &config);
PstorageConfig config;
ini_parse_stream(istream_fgets, cfg_stream.get(), pstorage_cfg_handler, &config);
// We are done with the file, so release the unique ptr. Normally this
// We are done with the file, so release the unique ptr. Normally this
// will close the reference to the file
cfg_stream.reset(nullptr);
if (strlen(cfg_overrides) > 0) {
config.poll_interval = std::chrono::seconds(max(1, atoi(cfg_overrides)));
}
if (strlen(cfg_overrides) > 0) {
config.poll_interval = std::chrono::seconds(max(1, atoi(cfg_overrides)));
}
const char endianness_header[2] = { IS_BIG_ENDIAN, '\n'};
const char endianness_header[2] = { IS_BIG_ENDIAN, '\n'};
// This isn't ideal because we really only need enough space for
// the located variables that are memory items, and this is all, but
// it does ensure we have enough space.
char buffer[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
// We keep a second block of memory so that we can detect if the values
// have changed and not write if they are the same.
char buffer_old[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
// This isn't ideal because we really only need enough space for
// the located variables that are memory items, and this is all, but
// it does ensure we have enough space.
char buffer[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
// We keep a second block of memory so that we can detect if the values
// have changed and not write if they are the same.
char buffer_old[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
// If the required size from bindings is greater than the configured
// size, then just exit
if (get_size_bytes(bindings) > extent<decltype(buffer)>::value) {
spdlog::error("Stored variables too large for persistent storage");
return -1;
}
// If the required size from bindings is greater than the configured
// size, then just exit
if (get_size_bytes(bindings) > extent<decltype(buffer)>::value) {
spdlog::error("Stored variables too large for persistent storage");
return -1;
}
while (run) {
size_t num_written = pstorage_copy_glue(bindings, buffer);
while (run) {
size_t num_written = pstorage_copy_glue(bindings, buffer);
if (memcmp(buffer, buffer_old, num_written) != 0) {
// Try to open the file to do the initial write
unique_ptr<ostream> out_stream(stream_fn());
if (!out_stream) {
spdlog::error("Unable to open persistent storage file for writing");
return -2;
}
out_stream->write(FILE_HEADER, FILE_HEADER_SIZE);
out_stream->write(endianness_header, 2);
out_stream->write(bindings.checksum, strlen(bindings.checksum));
out_stream->put('\n');
out_stream->write(buffer, num_written);
if (memcmp(buffer, buffer_old, num_written) != 0) {
// Try to open the file to do the initial write
unique_ptr<ostream> out_stream(stream_fn());
if (!out_stream) {
spdlog::error("Unable to open persistent storage file for writing");
return -2;
}
out_stream->write(FILE_HEADER, FILE_HEADER_SIZE);
out_stream->write(endianness_header, 2);
out_stream->write(bindings.checksum, strlen(bindings.checksum));
out_stream->put('\n');
out_stream->write(buffer, num_written);
spdlog::info("Persistent storage updated");
spdlog::info("Persistent storage updated");
// We should be able to avoid this memory copy entirely
memcpy(buffer_old, buffer, num_written);
} else {
spdlog::debug("Skip persistent write because unchanged values");
}
// We should be able to avoid this memory copy entirely
memcpy(buffer_old, buffer, num_written);
} else {
spdlog::debug("Skip persistent write because unchanged values");
}
// Since we just wrote, we sleep.
// TODO this needs a new mechanism for sleeping because this can
// delay shutdown/stop if polling is long.
this_thread::sleep_for(config.poll_interval);
}
// Since we just wrote, we sleep.
// TODO this needs a new mechanism for sleeping because this can
// delay shutdown/stop if polling is long.
this_thread::sleep_for(config.poll_interval);
}
spdlog::debug("Persistent storage ending normally");
spdlog::debug("Persistent storage ending normally");
return 0;
return 0;
}
inline int8_t read_and_check(istream& input_stream, const char header[],
char buffer[], size_t count) {
if (!input_stream.read(buffer, count)) {
spdlog::warn("Unable to read header from persistence file stream");
return -1;
}
if (!input_stream.read(buffer, count)) {
spdlog::warn("Unable to read header from persistence file stream");
return -1;
}
if (memcmp(header, buffer, count) != 0) {
spdlog::warn("Header does not match expected in persistence file ");
return -2;
}
if (memcmp(header, buffer, count) != 0) {
spdlog::warn("Header does not match expected in persistence file ");
return -2;
}
return 0;
return 0;
}
int8_t pstorage_read(istream& input_stream,
const GlueVariablesBinding& bindings)
{
// Read the file header - we define the file header as a constant that
// must be present as the header. We don't allow UTF BOMs here.
char header_check[FILE_HEADER_SIZE];
if (read_and_check(input_stream, FILE_HEADER, header_check, FILE_HEADER_SIZE) != 0) {
return -1;
}
// Read the file header - we define the file header as a constant that
// must be present as the header. We don't allow UTF BOMs here.
char header_check[FILE_HEADER_SIZE];
if (read_and_check(input_stream, FILE_HEADER, header_check, FILE_HEADER_SIZE) != 0) {
return -1;
}
// Check endianness of the written file
char endianness_expected[2] = { IS_BIG_ENDIAN, '\n'};
char endianness_check[2];
if (read_and_check(input_stream, endianness_expected, header_check, 2) != 0) {
return -2;
}
// Check endianness of the written file
char endianness_expected[2] = { IS_BIG_ENDIAN, '\n'};
char endianness_check[2];
if (read_and_check(input_stream, endianness_expected, header_check, 2) != 0) {
return -2;
}
// We have a digest in the header to try to prevent accidentally using
// the wrong persistence file for a particular runtime.
char checksum_check[32];
if (read_and_check(input_stream, bindings.checksum, checksum_check, 32) != 0) {
return -3;
}
// We have a digest in the header to try to prevent accidentally using
// the wrong persistence file for a particular runtime.
char checksum_check[32];
if (read_and_check(input_stream, bindings.checksum, checksum_check, 32) != 0) {
return -3;
}
// Just add one newline character
char padding_expected[1] = { '\n' };
char padding_check[1];
if (read_and_check(input_stream, padding_expected, padding_check, 1) != 0) {
return -4;
}
// Just add one newline character
char padding_expected[1] = { '\n' };
char padding_check[1];
if (read_and_check(input_stream, padding_expected, padding_check, 1) != 0) {
return -4;
}
// Now we know that the format is right, so read in the rest. We read
// variable by variable so that we can assign into the right value.
for (uint16_t index(0); index < bindings.size; ++index) {
const GlueVariable& glue = bindings.glue_variables[index];
// Now we know that the format is right, so read in the rest. We read
// variable by variable so that we can assign into the right value.
for (uint16_t index(0); index < bindings.size; ++index) {
const GlueVariable& glue = bindings.glue_variables[index];
if (glue.dir != IECLDT_MEM) {
// We only care about items that are stored in memory
continue;
}
if (glue.dir != IECLDT_MEM) {
// We only care about items that are stored in memory
continue;
}
uint8_t num_bytes;
switch (glue.size) {
case IECLST_BIT:
num_bytes = 1;
break;
case IECLST_BYTE:
num_bytes = 1;
break;
case IECLST_WORD:
num_bytes = 2;
break;
case IECLST_DOUBLEWORD:
num_bytes = 4;
break;
case IECLST_LONGWORD:
num_bytes = 8;
break;
default:
spdlog::error("Unexpected glue variable type {}", glue.size);
return -5;
}
uint8_t num_bytes;
switch (glue.size) {
case IECLST_BIT:
num_bytes = 1;
break;
case IECLST_BYTE:
num_bytes = 1;
break;
case IECLST_WORD:
num_bytes = 2;
break;
case IECLST_DOUBLEWORD:
num_bytes = 4;
break;
case IECLST_LONGWORD:
num_bytes = 8;
break;
default:
spdlog::error("Unexpected glue variable type {}", glue.size);
return -5;
}
// Read the required number of bytes from the stream
// 8 here is the maximum buffer size that we need based on
// the types that we support.
char buffer[8];
if (!input_stream.read(buffer, num_bytes)) {
spdlog::error("Persistent storage file too short; partially read");
return -6;
}
// Read the required number of bytes from the stream
// 8 here is the maximum buffer size that we need based on
// the types that we support.
char buffer[8];
if (!input_stream.read(buffer, num_bytes)) {
spdlog::error("Persistent storage file too short; partially read");
return -6;
}
// Assign the value into the glue value. The value is ether a simple
// value or a group of booleans.
// We don't actually care what the contents are - we just populate as
// though they are raw bytes
if (glue.size == IECLST_BIT) {
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
uint8_t value = static_cast<uint8_t>(buffer[0]);
set_index(group, 0, value);
set_index(group, 1, value);
set_index(group, 2, value);
set_index(group, 3, value);
set_index(group, 4, value);
set_index(group, 5, value);
set_index(group, 6, value);
set_index(group, 7, value);
} else {
memcpy(glue.value, buffer, num_bytes);
}
}
// Assign the value into the glue value. The value is ether a simple
// value or a group of booleans.
// We don't actually care what the contents are - we just populate as
// though they are raw bytes
if (glue.size == IECLST_BIT) {
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
uint8_t value = static_cast<uint8_t>(buffer[0]);
set_index(group, 0, value);
set_index(group, 1, value);
set_index(group, 2, value);
set_index(group, 3, value);
set_index(group, 4, value);
set_index(group, 5, value);
set_index(group, 6, value);
set_index(group, 7, value);
} else {
memcpy(glue.value, buffer, num_bytes);
}
}
spdlog::info("Initialized from persistent storage");
spdlog::info("Initialized from persistent storage");
return 0;
return 0;
}
void pstorage_service_init(const GlueVariablesBinding& binding) {
ifstream stream("persistent.file", ios::binary);
if (!stream) {
spdlog::info("Skipped load persistence because file cannot be read.");
return;
}
ifstream stream("persistent.file", ios::binary);
if (!stream) {
spdlog::info("Skipped load persistence because file cannot be read.");
return;
}
auto result = pstorage_read(stream, binding);
spdlog::info("Storage read completed with result {}", result);
auto result = pstorage_read(stream, binding);
spdlog::info("Storage read completed with result {}", result);
}
void pstorage_service_finalize(const GlueVariablesBinding& binding) {
// We don't current do anything on finalize (although we probably should)
// We don't current do anything on finalize (although we probably should)
}
void pstorage_service_run(const GlueVariablesBinding& binding,
volatile bool& run, const char* config) {
// We don't allow a poll duration of less than one second otherwise
// that can have detrimental effects on performance
auto create_stream = []() { return new ofstream("persistent.file", ios::binary); };
// We don't allow a poll duration of less than one second otherwise
// that can have detrimental effects on performance
auto create_stream = []() { return new ofstream("persistent.file", ios::binary); };
unique_ptr<istream, function<void(istream*)>> cfg_stream(new ifstream("../etc/config.ini"), [](istream* s)
unique_ptr<istream, function<void(istream*)>> cfg_stream(new ifstream("../etc/config.ini"), [](istream* s)
{
reinterpret_cast<ifstream*>(s)->close();
delete s;
});
pstorage_run(cfg_stream, config, binding, run, create_stream);
pstorage_run(cfg_stream, config, binding, run, create_stream);
}
/** @}*/

View File

@ -18,13 +18,16 @@
#include "service_definition.h"
#include "service_registry.h"
#include "pstorage.h"
#include "interactive_server.h"
#include "../dnp3s/dnp3.h"
ServiceInitFunction pstorage_init_fn(pstorage_service_init);
ServiceStartFunction pstorage_start_service_fn(pstorage_service_run);
ServiceStartFunction dnp3s_start_service_fn(dnp3s_service_run);
ServiceStartFunction interactive_start_service_fn(interactive_service_run);
ServiceDefinition* services[] = {
new ServiceDefinition("interactive", interactive_start_service_fn),
new ServiceDefinition("pstorage", pstorage_start_service_fn, pstorage_init_fn),
#ifdef OPLC_DNP3_OUTSTATION
new ServiceDefinition("dnp3s", dnp3s_start_service_fn),

View File

@ -22,7 +22,6 @@
#include "catch.hpp"
#include "fakeit.hpp"
void sleep_until(timespec*, int) {}
#include "glue.h"
#include "dnp3s/dnp3.h"
@ -36,6 +35,7 @@ SCENARIO("create_config", "")
Dnp3IndexedGroup binary_commands = {0};
Dnp3IndexedGroup analog_commands = {0};
Dnp3MappedGroup measurements = {0};
chrono::milliseconds poll_interval(1);
std::uint16_t port;
GIVEN("<input stream>")
@ -44,7 +44,7 @@ SCENARIO("create_config", "")
{
GlueVariablesBinding bindings(&glue_mutex, 0, nullptr, nullptr);
std::stringstream input_stream;
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
REQUIRE(config.dbConfig.binary.IsEmpty());
REQUIRE(config.dbConfig.doubleBinary.IsEmpty());
@ -69,7 +69,7 @@ SCENARIO("create_config", "")
};
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
std::stringstream input_stream("[dnp3s]\nbind_location=name:%QX0.0,group:1,index:0,");
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
REQUIRE(config.dbConfig.binary.Size() == 1);
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
@ -97,7 +97,7 @@ SCENARIO("create_config", "")
};
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
std::stringstream input_stream("[dnp3s]\nbind_location=name:%IX0.0,group:1,index:1,");
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
REQUIRE(config.dbConfig.binary.Size() == 1);
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
@ -125,7 +125,7 @@ SCENARIO("create_config", "")
};
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
std::stringstream input_stream("[dnp3s]\nbind_location=name:%IX0.0,group:12,index:1,");
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
REQUIRE(config.dbConfig.binary.Size() == 0);
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
@ -155,7 +155,7 @@ SCENARIO("create_config", "")
};
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
std::stringstream input_stream("[dnp3s]\nbind_location=name:%QD0,group:30,index:1,");
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
REQUIRE(config.dbConfig.binary.Size() == 0);
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);