Merge pull request #97 from smartergridsolutions/feature/PR-767
Make interactive server follow the same service model. Fix multi-threading issues.
This commit is contained in:
commit
193ba2c77a
|
@ -81,6 +81,9 @@ void bootstrap() {
|
|||
const char* config_path = "../etc/config.ini";
|
||||
if (ini_parse(config_path, config_handler, &config) < 0) {
|
||||
spdlog::info("Config file {} could not be read", config_path);
|
||||
// If we don't have the config file, then default to always
|
||||
// starting the interactive server.
|
||||
config.services.push_back("interactive");
|
||||
}
|
||||
|
||||
//======================================================
|
||||
|
|
|
@ -18,12 +18,13 @@
|
|||
|
||||
#ifdef OPLC_DNP3_OUTSTATION
|
||||
|
||||
#include <cctype>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <ctime>
|
||||
#include <csignal>
|
||||
#include <algorithm>
|
||||
#include <cctype>
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
|
@ -271,17 +272,21 @@ void bind_variables(const vector<string>& binding_defs,
|
|||
/// This is populated with values from the config file.
|
||||
struct Dnp3Config {
|
||||
Dnp3Config() :
|
||||
poll_interval(std::chrono::milliseconds(250)),
|
||||
port(20000),
|
||||
link(false, false)
|
||||
{}
|
||||
|
||||
// How fast we send and receive data into the runtime.
|
||||
std::chrono::milliseconds poll_interval;
|
||||
|
||||
uint16_t port;
|
||||
|
||||
/// Outstation config
|
||||
opendnp3::OutstationConfig outstation;
|
||||
|
||||
/// Link layer config
|
||||
opendnp3::LinkConfig link;
|
||||
opendnp3::LinkConfig link;
|
||||
|
||||
/// Descriptions of the bindings we want to create.
|
||||
vector<string> bindings;
|
||||
|
@ -346,7 +351,8 @@ OutstationStackConfig dnp3_create_config(istream& cfg_stream,
|
|||
Dnp3IndexedGroup& binary_commands,
|
||||
Dnp3IndexedGroup& analog_commands,
|
||||
Dnp3MappedGroup& measurements,
|
||||
uint16_t& port) {
|
||||
uint16_t& port,
|
||||
std::chrono::milliseconds& poll_interval) {
|
||||
// We need to know the size of the database (number of points) before
|
||||
// we can do anything. To avoid doing two passes of the stream, read
|
||||
// everything into a map, then get the database size, and finally
|
||||
|
@ -376,6 +382,7 @@ OutstationStackConfig dnp3_create_config(istream& cfg_stream,
|
|||
config.link = dnp3_config.link;
|
||||
|
||||
port = dnp3_config.port;
|
||||
poll_interval = dnp3_config.poll_interval;
|
||||
|
||||
return config;
|
||||
}
|
||||
|
@ -390,9 +397,10 @@ void dnp3s_start_server(unique_ptr<istream, function<void(istream*)>>& cfg_strea
|
|||
Dnp3IndexedGroup analog_commands = {0};
|
||||
Dnp3MappedGroup measurements = {0};
|
||||
uint16_t port;
|
||||
chrono::milliseconds poll_interval;
|
||||
auto config(dnp3_create_config(*cfg_stream, glue_variables,
|
||||
binary_commands, analog_commands,
|
||||
measurements, port));
|
||||
measurements, port, poll_interval));
|
||||
|
||||
// If we have a config override, then check for the port number
|
||||
port = strlen(cfg_overrides) > 0 ? atoi(cfg_overrides) : port;
|
||||
|
@ -425,10 +433,6 @@ void dnp3s_start_server(unique_ptr<istream, function<void(istream*)>>& cfg_strea
|
|||
|
||||
spdlog::info("DNP3 outstation enabled on port {0:d}", port);
|
||||
|
||||
// Continuously update
|
||||
struct timespec timer_start;
|
||||
clock_gettime(CLOCK_MONOTONIC, &timer_start);
|
||||
|
||||
// Run this until we get a signal to stop.
|
||||
while (run) {
|
||||
{
|
||||
|
@ -440,7 +444,7 @@ void dnp3s_start_server(unique_ptr<istream, function<void(istream*)>>& cfg_strea
|
|||
spdlog::trace("{} data points written to outstation", num_writes);
|
||||
}
|
||||
|
||||
sleep_until(&timer_start, OPLC_CYCLE);
|
||||
this_thread::sleep_for(poll_interval);
|
||||
}
|
||||
|
||||
outstation->Disable();
|
||||
|
|
|
@ -140,7 +140,8 @@ asiodnp3::OutstationStackConfig dnp3_create_config(std::istream& cfg_stream,
|
|||
Dnp3IndexedGroup& binary_commands,
|
||||
Dnp3IndexedGroup& analog_commands,
|
||||
Dnp3MappedGroup& measurements,
|
||||
uint16_t& port);
|
||||
uint16_t& port,
|
||||
std::chrono::milliseconds& poll_interval);
|
||||
|
||||
/// @brief Start the DNP3 server running on the specified port and configured
|
||||
/// using the specified stream.
|
||||
|
|
|
@ -51,7 +51,7 @@ inline bool ini_matches(const char* section_expected,
|
|||
/// @return the string or null if cannot read more.
|
||||
static char* istream_fgets(char* str, int num, void* stream) {
|
||||
auto st = reinterpret_cast<std::istream*>(stream);
|
||||
if (!st || st->eof()) {
|
||||
if (!st->good() || st->eof()) {
|
||||
// We previously reached the end of the file, so return the end signal.
|
||||
return nullptr;
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
// Copyright 2018 Thiago Alves
|
||||
// Copyright 2019 Smarter Grid Solutions
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -21,18 +22,18 @@
|
|||
// Thiago Alves, Jun 2018
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <cstring>
|
||||
#include <cstdint>
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <istream>
|
||||
#include <memory>
|
||||
#include <netdb.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include <stdio.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <fcntl.h>
|
||||
#include <time.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
|
||||
|
@ -46,16 +47,15 @@
|
|||
* @{
|
||||
*/
|
||||
|
||||
//Global Variables
|
||||
const uint16_t BUFFER_MAX_SIZE(1024);
|
||||
std::mutex command_mutex;
|
||||
|
||||
// TODO Globals to move into services
|
||||
bool run_modbus = 0;
|
||||
uint16_t modbus_port = 502;
|
||||
bool run_enip = 0;
|
||||
uint16_t enip_port = 44818;
|
||||
unsigned char server_command[1024];
|
||||
int command_index = 0;
|
||||
bool processing_command = 0;
|
||||
time_t start_time;
|
||||
time_t end_time;
|
||||
|
||||
//Global Threads
|
||||
pthread_t modbus_thread;
|
||||
|
@ -92,11 +92,11 @@ void *enipThread(void *arg)
|
|||
/// @brief Read the argument from a command function
|
||||
/// @param *command
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
int readCommandArgument(unsigned char *command)
|
||||
int readCommandArgument(const char *command)
|
||||
{
|
||||
int i = 0;
|
||||
int j = 0;
|
||||
unsigned char argument[1024];
|
||||
char argument[BUFFER_MAX_SIZE];
|
||||
|
||||
while (command[i] != '(' && command[i] != '\0') i++;
|
||||
if (command[i] == '(') i++;
|
||||
|
@ -118,7 +118,7 @@ int readCommandArgument(unsigned char *command)
|
|||
/// @return Zero on success. Non-zero on failure. If this function
|
||||
/// fails, it marks the target as an empty string so it is still safe
|
||||
/// to read the string but it will be empty.
|
||||
std::int8_t copy_command_config(unsigned char *source, char target[],
|
||||
std::int8_t copy_command_config(const char *source, char target[],
|
||||
size_t target_size)
|
||||
{
|
||||
// Search through source until we find the closing ")"
|
||||
|
@ -147,66 +147,68 @@ std::int8_t copy_command_config(unsigned char *source, char target[],
|
|||
return 0;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Create the socket and bind it.
|
||||
/// @param port
|
||||
/// @return the file descriptor for the socket
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
int createSocket_interactive(int port)
|
||||
/// @return the file descriptor for the socket, or less than 0 if a socket
|
||||
/// if an error occurred.
|
||||
int interactive_open_socket(uint16_t port)
|
||||
{
|
||||
int socket_fd;
|
||||
struct sockaddr_in server_addr;
|
||||
|
||||
//Create TCP Socket
|
||||
socket_fd = socket(AF_INET,SOCK_STREAM,0);
|
||||
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (socket_fd < 0)
|
||||
{
|
||||
spdlog::error("Interactive Server: error creating stream socket => {}", strerror(errno));
|
||||
exit(1);
|
||||
spdlog::error("Interactive Server: error creating stream socket => {}", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
//Set SO_REUSEADDR
|
||||
int enable = 1;
|
||||
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
|
||||
perror("setsockopt(SO_REUSEADDR) failed");
|
||||
}
|
||||
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
|
||||
perror("setsockopt(SO_REUSEADDR) failed");
|
||||
}
|
||||
|
||||
SetSocketBlockingEnabled(socket_fd, false);
|
||||
|
||||
//Initialize Server Struct
|
||||
bzero((char *) &server_addr, sizeof(server_addr));
|
||||
memset(&server_addr, 0, sizeof(server_addr));
|
||||
server_addr.sin_family = AF_INET;
|
||||
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
||||
server_addr.sin_port = htons(port);
|
||||
|
||||
//Bind socket
|
||||
if (bind(socket_fd,(struct sockaddr *)&server_addr,sizeof(server_addr)) < 0)
|
||||
if (bind(socket_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
|
||||
{
|
||||
spdlog::error("Interactive Server: error binding socket => {}", strerror(errno));
|
||||
exit(1);
|
||||
spdlog::error("Interactive Server: error binding socket => {}", strerror(errno));
|
||||
return -2;
|
||||
}
|
||||
|
||||
// we accept max 5 pending connections
|
||||
listen(socket_fd,5);
|
||||
spdlog::info("Interactive Server: Listening on port {}", port);
|
||||
listen(socket_fd, 5);
|
||||
spdlog::info("Interactive Server: Listening on port {}", port);
|
||||
|
||||
return socket_fd;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Blocking call. Wait here for the client to connect.
|
||||
/// @param socket_fd
|
||||
/// @return the file descriptor descriptor to communicate with the client.
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
int waitForClient_interactive(int socket_fd)
|
||||
/// Poll for new clients that want to connect to the runtime.
|
||||
///
|
||||
/// This returns a client file desriptor for each client that wants to
|
||||
/// attach.
|
||||
/// @param run A flag that is set to false when we should stop polling.
|
||||
/// @param socket_fd The socket file descriptor we are listening on.
|
||||
/// @return the client file descriptor.
|
||||
int interactive_wait_new_client(volatile bool& run, int socket_fd)
|
||||
{
|
||||
int client_fd;
|
||||
struct sockaddr_in client_addr;
|
||||
socklen_t client_len;
|
||||
|
||||
spdlog::debug("Interactive Server: waiting for new client...");
|
||||
spdlog::trace("Interactive Server: waiting for new client...");
|
||||
|
||||
client_len = sizeof(client_addr);
|
||||
while (run_openplc)
|
||||
while (run)
|
||||
{
|
||||
client_fd = accept(socket_fd, (struct sockaddr *)&client_addr, &client_len); //non-blocking call
|
||||
if (client_fd > 0)
|
||||
|
@ -214,149 +216,88 @@ int waitForClient_interactive(int socket_fd)
|
|||
SetSocketBlockingEnabled(client_fd, true);
|
||||
break;
|
||||
}
|
||||
sleepms(100);
|
||||
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
return client_fd;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Blocking call. Holds here until something is received from the client.
|
||||
/// Once the message is received, it is stored on the buffer and the function
|
||||
/// returns the number of bytes received.
|
||||
/// @param client_fd
|
||||
/// @param *buffer
|
||||
/// @return the number of bytes received.
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
int listenToClient_interactive(int client_fd, unsigned char *buffer)
|
||||
{
|
||||
bzero(buffer, 1024);
|
||||
int n = read(client_fd, buffer, 1024);
|
||||
return n;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Process client's commands for the interactive server
|
||||
/// @param *buffer
|
||||
/// @param client_fd
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
void processCommand(unsigned char *buffer, int client_fd)
|
||||
{
|
||||
spdlog::debug("Process command received {}", buffer);
|
||||
|
||||
int count_char = 0;
|
||||
|
||||
/// Handle a single command from the client.
|
||||
/// @param command The command as a text string.
|
||||
/// @param client_fd The file descriptor to write a response to.
|
||||
void interactive_client_command(const char* command, int client_fd) {
|
||||
// A buffer for the command configuration information.
|
||||
const size_t COMMAND_CONFIG_SIZE(1024);
|
||||
char command_config[COMMAND_CONFIG_SIZE];
|
||||
|
||||
if (processing_command)
|
||||
{
|
||||
count_char = sprintf(buffer, "Processing command...\n");
|
||||
write(client_fd, buffer, count_char);
|
||||
char command_buffer[BUFFER_MAX_SIZE];
|
||||
|
||||
std::unique_lock<std::mutex> lock(command_mutex, std::defer_lock);
|
||||
if (!lock.try_lock()) {
|
||||
spdlog::trace("Process command skipped because already processing {}", command);
|
||||
int count_char = sprintf(command_buffer, "Another command in progress...\n");
|
||||
write(client_fd, command_buffer, count_char);
|
||||
return;
|
||||
}
|
||||
|
||||
if (strncmp(buffer, "quit()", 6) == 0)
|
||||
|
||||
spdlog::trace("Process command received {}", command);
|
||||
|
||||
if (strncmp(command, "quit()", 6) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
spdlog::info("Issued quit() command");
|
||||
spdlog::info("Issued quit() command");
|
||||
if (run_modbus)
|
||||
{
|
||||
run_modbus = 0;
|
||||
pthread_join(modbus_thread, NULL);
|
||||
spdlog::info("Modbus server was stopped");
|
||||
spdlog::info("Modbus server was stopped");
|
||||
}
|
||||
services_stop();
|
||||
run_openplc = 0;
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "start_modbus(", 13) == 0)
|
||||
else if (strncmp(command, "start_modbus(", 13) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
modbus_port = readCommandArgument(buffer);
|
||||
spdlog::info("Issued start_modbus() command to start on port: {}", modbus_port);
|
||||
modbus_port = readCommandArgument(command);
|
||||
spdlog::info("Issued start_modbus() command to start on port: {}", modbus_port);
|
||||
|
||||
if (run_modbus)
|
||||
{
|
||||
spdlog::info("Modbus server already active. Restarting on port: {}", modbus_port);
|
||||
spdlog::info("Modbus server already active. Restarting on port: {}", modbus_port);
|
||||
//Stop Modbus server
|
||||
run_modbus = 0;
|
||||
pthread_join(modbus_thread, NULL);
|
||||
spdlog::info("Modbus server was stopped");
|
||||
spdlog::info("Modbus server was stopped");
|
||||
}
|
||||
//Start Modbus server
|
||||
run_modbus = 1;
|
||||
pthread_create(&modbus_thread, NULL, modbusThread, NULL);
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "stop_modbus()", 13) == 0)
|
||||
else if (strncmp(command, "stop_modbus()", 13) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
spdlog::info("Issued stop_modbus() command");
|
||||
spdlog::info("Issued stop_modbus() command");
|
||||
if (run_modbus)
|
||||
{
|
||||
run_modbus = 0;
|
||||
pthread_join(modbus_thread, NULL);
|
||||
spdlog::info("Modbus server was stopped");
|
||||
spdlog::info("Modbus server was stopped");
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
#ifdef OPLC_DNP3_OUTSTATION
|
||||
else if (strncmp(buffer, "start_dnp3(", 11) == 0)
|
||||
else if (strncmp(command, "start_dnp3(", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
ServiceDefinition* def = services_find("dnp3s");
|
||||
if (def && copy_command_config(buffer + 11, command_config, COMMAND_CONFIG_SIZE) == 0) {
|
||||
def->start(command_config);
|
||||
if (def && copy_command_config(command + 11, command_buffer, BUFFER_MAX_SIZE) == 0) {
|
||||
def->start(command_buffer);
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "stop_dnp3()", 11) == 0)
|
||||
else if (strncmp(command, "stop_dnp3()", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
ServiceDefinition* def = services_find("dnp3s");
|
||||
if (def) {
|
||||
def->stop();
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
#endif // OPLC_DNP3_OUTSTATION
|
||||
else if (strncmp(buffer, "start_enip(", 11) == 0)
|
||||
else if (strncmp(command, "start_enip(", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
spdlog::info("Issued start_enip() command to start on port: {}", readCommandArgument(buffer));
|
||||
enip_port = readCommandArgument(buffer);
|
||||
if (run_enip)
|
||||
{
|
||||
spdlog::info("EtherNet/IP server already active. Restarting on port: {}", enip_port);
|
||||
//Stop Enip server
|
||||
run_enip = 0;
|
||||
pthread_join(enip_thread, NULL);
|
||||
spdlog::info("EtherNet/IP server was stopped");
|
||||
}
|
||||
//Start Enip server
|
||||
run_enip = 1;
|
||||
pthread_create(&enip_thread, NULL, enipThread, NULL);
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "stop_enip()", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
spdlog::info("Issued stop_enip() command");
|
||||
if (run_enip)
|
||||
{
|
||||
run_enip = 0;
|
||||
pthread_join(enip_thread, NULL);
|
||||
spdlog::info("EtherNet/IP server was stopped");
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "start_enip(", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
enip_port = readCommandArgument(buffer);
|
||||
spdlog::info("Issued start_enip() command to start on port: {}", enip_port);
|
||||
spdlog::info("Issued start_enip() command to start on port: {}", readCommandArgument(command));
|
||||
enip_port = readCommandArgument(command);
|
||||
if (run_enip)
|
||||
{
|
||||
spdlog::info("EtherNet/IP server already active. Restarting on port: {}", enip_port);
|
||||
|
@ -368,11 +309,9 @@ void processCommand(unsigned char *buffer, int client_fd)
|
|||
//Start Enip server
|
||||
run_enip = 1;
|
||||
pthread_create(&enip_thread, NULL, enipThread, NULL);
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "stop_enip()", 11) == 0)
|
||||
else if (strncmp(command, "stop_enip()", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
spdlog::info("Issued stop_enip() command");
|
||||
if (run_enip)
|
||||
{
|
||||
|
@ -380,166 +319,148 @@ void processCommand(unsigned char *buffer, int client_fd)
|
|||
pthread_join(enip_thread, NULL);
|
||||
spdlog::info("EtherNet/IP server was stopped");
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "start_pstorage(", 15) == 0)
|
||||
else if (strncmp(command, "start_pstorage(", 15) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
ServiceDefinition* def = services_find("pstorage");
|
||||
if (def && copy_command_config(buffer + 15, command_config, COMMAND_CONFIG_SIZE) == 0) {
|
||||
def->start(command_config);
|
||||
if (def && copy_command_config(command + 15, command_buffer, BUFFER_MAX_SIZE) == 0) {
|
||||
def->start(command_buffer);
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "stop_pstorage()", 15) == 0)
|
||||
else if (strncmp(command, "stop_pstorage()", 15) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
ServiceDefinition* def = services_find("pstorage");
|
||||
if (def) {
|
||||
def->stop();
|
||||
}
|
||||
processing_command = false;
|
||||
}
|
||||
else if (strncmp(buffer, "runtime_logs()", 14) == 0)
|
||||
else if (strncmp(command, "runtime_logs()", 14) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
spdlog::debug("Issued runtime_logs() command");
|
||||
std::string data = log_sink->data();
|
||||
write(client_fd, data.c_str(), data.size());
|
||||
processing_command = false;
|
||||
return;
|
||||
}
|
||||
else if (strncmp(buffer, "exec_time()", 11) == 0)
|
||||
else if (strncmp(command, "exec_time()", 11) == 0)
|
||||
{
|
||||
processing_command = true;
|
||||
time_t end_time;
|
||||
time(&end_time);
|
||||
count_char = sprintf(buffer, "%llu\n", (unsigned long long)difftime(end_time, start_time));
|
||||
write(client_fd, buffer, count_char);
|
||||
processing_command = false;
|
||||
int count_char = sprintf(command_buffer, "%llu\n", (unsigned long long)difftime(end_time, start_time));
|
||||
write(client_fd, command_buffer, count_char);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
processing_command = true;
|
||||
count_char = sprintf(buffer, "Error: unrecognized command\n");
|
||||
write(client_fd, buffer, count_char);
|
||||
processing_command = false;
|
||||
int count_char = sprintf(command_buffer, "Error: unrecognized command\n");
|
||||
write(client_fd, command_buffer, count_char);
|
||||
return;
|
||||
}
|
||||
|
||||
count_char = sprintf(buffer, "OK\n");
|
||||
write(client_fd, buffer, count_char);
|
||||
int count_char = sprintf(command_buffer, "OK\n");
|
||||
write(client_fd, command_buffer, count_char);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Process client's request
|
||||
/// @param *buffer
|
||||
/// @param bufferSize
|
||||
/// @param client_fd
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
void processMessage_interactive(unsigned char *buffer, int bufferSize, int client_fd)
|
||||
{
|
||||
for (int i = 0; i < bufferSize; i++)
|
||||
{
|
||||
if (buffer[i] == '\r' || buffer[i] == '\n' || command_index >= 1024)
|
||||
{
|
||||
processCommand(server_command, client_fd);
|
||||
command_index = 0;
|
||||
break;
|
||||
}
|
||||
server_command[command_index] = buffer[i];
|
||||
command_index++;
|
||||
server_command[command_index] = '\0';
|
||||
}
|
||||
}
|
||||
/// Defines the arguments that client threads receive.
|
||||
struct ClientArgs {
|
||||
/// The client file descriptor for reading and writing.
|
||||
int client_fd;
|
||||
/// A flag that can indicate termination of the thread.
|
||||
volatile bool* run;
|
||||
};
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Thread to handle requests for each connected client
|
||||
/// @param *arguments
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
void *handleConnections_interactive(void *arguments)
|
||||
{
|
||||
int client_fd = *(int *)arguments;
|
||||
unsigned char buffer[1024];
|
||||
int messageSize;
|
||||
/// Response to a client connection in a unique thread.
|
||||
/// @param arguments The arguments to the thread - must be the client arguments
|
||||
/// struture. This thread will be responsible for freeing the arguments.
|
||||
/// @return always nullptr.
|
||||
void* interactive_client_run(void* arguments) {
|
||||
auto client_args = reinterpret_cast<ClientArgs*>(arguments);
|
||||
|
||||
spdlog::debug("Interactive Server: Thread created for client ID: {}", client_fd);
|
||||
char buffer[BUFFER_MAX_SIZE];
|
||||
int message_size;
|
||||
|
||||
while(run_openplc)
|
||||
{
|
||||
//unsigned char buffer[1024];
|
||||
//int messageSize;
|
||||
while (*client_args->run) {
|
||||
memset(buffer, 0, BUFFER_MAX_SIZE);
|
||||
message_size = read(client_args->client_fd, buffer, BUFFER_MAX_SIZE);
|
||||
|
||||
messageSize = listenToClient_interactive(client_fd, buffer);
|
||||
if (messageSize <= 0 || messageSize > 1024)
|
||||
{
|
||||
// something has gone wrong or the client has closed connection
|
||||
if (messageSize == 0)
|
||||
{
|
||||
spdlog::debug("Interactive Server: client ID: {} has closed the connection", client_fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
spdlog::error("Interactive Server: Something is wrong with the client ID: {} message Size : {}", client_fd, messageSize);
|
||||
}
|
||||
if (message_size <= 0) {
|
||||
spdlog::trace("Interactive Server: client ID: {} has closed the connection", client_args->client_fd);
|
||||
break;
|
||||
}
|
||||
|
||||
processMessage_interactive(buffer, messageSize, client_fd);
|
||||
if (message_size > BUFFER_MAX_SIZE) {
|
||||
spdlog::error("Interactive Server: Message size is too large for client {}", client_args->client_fd);
|
||||
break;
|
||||
}
|
||||
|
||||
// Process the received buffer, splitting into commands
|
||||
char* start = buffer;
|
||||
uint16_t cur_pos = 0;
|
||||
|
||||
while (cur_pos < BUFFER_MAX_SIZE && buffer[cur_pos] != '\0') {
|
||||
if (buffer[cur_pos] == '\n' || buffer[cur_pos] == '\r') {
|
||||
// Terminate the command
|
||||
buffer[cur_pos] = '\0';
|
||||
interactive_client_command(start, client_args->client_fd);
|
||||
}
|
||||
cur_pos += 1;
|
||||
}
|
||||
}
|
||||
|
||||
spdlog::debug("Closing client socket and calling pthread_exit in interactive_server.cpp");
|
||||
closeSocket(client_fd);
|
||||
spdlog::debug("Terminating interactive server connections");
|
||||
closeSocket(client_args->client_fd);
|
||||
spdlog::trace("Interactive server connection completed");
|
||||
delete client_args;
|
||||
pthread_exit(NULL);
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Function to start the server. It receives the port number as argument and
|
||||
/// creates an infinite loop to listen and parse the messages sent by the
|
||||
/// clients
|
||||
/// @param port
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
void startInteractiveServer(int port)
|
||||
{
|
||||
int socket_fd, client_fd;
|
||||
socket_fd = createSocket_interactive(port);
|
||||
/// Run the interactive server for responding to control and measurement
|
||||
/// requests through the socket API. This is the primary means for
|
||||
/// controlling the runtime.
|
||||
int8_t interactive_run(std::unique_ptr<istream, function<void(istream*)>>& cfg_stream,
|
||||
const char* cfg_overrides,
|
||||
const GlueVariablesBinding& bindings,
|
||||
volatile bool& run) {
|
||||
const uint16_t port = 43628;
|
||||
int socket_fd = interactive_open_socket(port);
|
||||
|
||||
while(run_openplc)
|
||||
{
|
||||
client_fd = waitForClient_interactive(socket_fd); //block until a client connects
|
||||
if (client_fd < 0)
|
||||
{
|
||||
spdlog::error("Interactive Server: Error accepting client!");
|
||||
// Listen for new connections to our socket. When we have a new
|
||||
// connection, we spawn a new thread to handle that connection.
|
||||
while (run) {
|
||||
int client_fd = interactive_wait_new_client(run, socket_fd);
|
||||
|
||||
if (client_fd < 0) {
|
||||
spdlog::error("Interactive Server: Error accepting client!");
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
int arguments[1];
|
||||
pthread_t thread;
|
||||
int ret = -1;
|
||||
|
||||
spdlog::debug("Interactive Server: Client accepted! Creating thread for the new client ID: {}", client_fd);
|
||||
arguments[0] = client_fd;
|
||||
ret = pthread_create(&thread, NULL, handleConnections_interactive, arguments);
|
||||
if (ret==0)
|
||||
{
|
||||
pthread_detach(thread);
|
||||
}
|
||||
pthread_t thread;
|
||||
auto client_args = new ClientArgs { .client_fd=client_fd, .run=&run };
|
||||
|
||||
spdlog::trace("Interactive Server: Client accepted! Creating thread for the new client ID: {}", client_fd);
|
||||
int ret = pthread_create(&thread, NULL, interactive_client_run, client_args);
|
||||
if (ret == 0) {
|
||||
pthread_detach(thread);
|
||||
} else {
|
||||
delete client_args;
|
||||
}
|
||||
}
|
||||
spdlog::info("Shutting down internal threads\n");
|
||||
run_modbus = 0;
|
||||
run_enip = 0;
|
||||
pthread_join(modbus_thread, NULL);
|
||||
pthread_join(enip_thread, NULL);
|
||||
|
||||
spdlog::info("Closing socket...");
|
||||
closeSocket(socket_fd);
|
||||
closeSocket(client_fd);
|
||||
spdlog::debug("Terminating interactive server thread");
|
||||
}
|
||||
|
||||
void initializeLogging(int argc,char **argv)
|
||||
void interactive_service_run(const GlueVariablesBinding& binding,
|
||||
volatile bool& run, const char* config) {
|
||||
unique_ptr<istream, function<void(istream*)>> cfg_stream(new ifstream("../etc/config.ini"), [](istream* s)
|
||||
{
|
||||
reinterpret_cast<ifstream*>(s)->close();
|
||||
delete s;
|
||||
});
|
||||
|
||||
interactive_run(cfg_stream, config, binding, run);
|
||||
}
|
||||
|
||||
void initialize_logging(int argc,char **argv)
|
||||
{
|
||||
log_sink.reset(new buffered_sink(log_buffer, LOG_BUFFER_SIZE));
|
||||
spdlog::default_logger()->sinks().push_back(log_sink);
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
// Copyright 2018 Thiago Alves
|
||||
// Copyright 2019 Smarter Grid Solutions
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http ://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissionsand
|
||||
// limitations under the License.
|
||||
|
||||
/** \addtogroup openplc_runtime
|
||||
* @{
|
||||
*/
|
||||
|
||||
class GlueVariablesBinding;
|
||||
|
||||
/// @brief Start the interactive socket server.
|
||||
///
|
||||
/// @param glue_variables The glue variables that may be bound into this
|
||||
/// server.
|
||||
/// @param run A signal for running this server. This server terminates when
|
||||
/// this signal is false.
|
||||
/// @param config The custom configuration for this service.
|
||||
void interactive_service_run(const GlueVariablesBinding& binding,
|
||||
volatile bool& run, const char* config);
|
||||
|
||||
/** @}*/
|
|
@ -133,12 +133,10 @@ void closeSocket(int fd);
|
|||
bool SetSocketBlockingEnabled(int fd, bool blocking);
|
||||
|
||||
//interactive_server.cpp
|
||||
void startInteractiveServer(int port);
|
||||
void initializeLogging(int argc,char **argv);
|
||||
void initialize_logging(int argc,char **argv);
|
||||
extern bool run_modbus;
|
||||
extern bool run_enip;
|
||||
extern time_t start_time;
|
||||
extern time_t end_time;
|
||||
|
||||
//modbus.cpp
|
||||
int processModbusMessage(unsigned char *buffer, int bufferSize);
|
||||
|
|
|
@ -80,16 +80,6 @@ void sleepms(int milliseconds)
|
|||
nanosleep(&ts, NULL);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// \brief Interactive Server Thread. Creates the server to listen to commands
|
||||
/// on localhost
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
void *interactiveServerThread(void *arg)
|
||||
{
|
||||
startInteractiveServer(43628);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// \brief Verify if pin is present in one of the ignored vectors
|
||||
/// \param
|
||||
|
@ -165,15 +155,15 @@ void handleSpecialFunctions()
|
|||
|
||||
int main(int argc,char **argv)
|
||||
{
|
||||
initializeLogging(argc, argv);
|
||||
initialize_logging(argc, argv);
|
||||
spdlog::info("OpenPLC Runtime starting...");
|
||||
|
||||
bootstrap();
|
||||
|
||||
// Start the thread for the interactive server
|
||||
time(&start_time);
|
||||
pthread_t interactive_thread;
|
||||
pthread_create(&interactive_thread, NULL, interactiveServerThread, NULL);
|
||||
|
||||
// Start the thread for the any services we have, including the
|
||||
// interactive server and anything out that is configured to
|
||||
// automatically start
|
||||
bootstrap();
|
||||
|
||||
//gets the starting point for the clock
|
||||
spdlog::debug("Getting current time");
|
||||
|
@ -212,8 +202,8 @@ int main(int argc,char **argv)
|
|||
// SHUTTING DOWN OPENPLC RUNTIME
|
||||
//======================================================
|
||||
services_stop();
|
||||
services_finalize();
|
||||
|
||||
pthread_join(interactive_thread, NULL);
|
||||
spdlog::debug("Disabling outputs...");
|
||||
disableOutputs();
|
||||
updateCustomOut();
|
||||
|
|
|
@ -62,54 +62,54 @@ const size_t FILE_HEADER_SIZE(extent<decltype(FILE_HEADER)>::value);
|
|||
/// @param size the size type.
|
||||
/// @return The required storage size in number of bytes.
|
||||
inline uint8_t get_size_bytes(IecLocationSize size) {
|
||||
switch (size) {
|
||||
case IECLST_BIT:
|
||||
return 1;
|
||||
case IECLST_BYTE:
|
||||
return 1;
|
||||
case IECLST_WORD:
|
||||
return 2;
|
||||
case IECLST_DOUBLEWORD:
|
||||
return 4;
|
||||
case IECLST_LONGWORD:
|
||||
return 8;
|
||||
}
|
||||
switch (size) {
|
||||
case IECLST_BIT:
|
||||
return 1;
|
||||
case IECLST_BYTE:
|
||||
return 1;
|
||||
case IECLST_WORD:
|
||||
return 2;
|
||||
case IECLST_DOUBLEWORD:
|
||||
return 4;
|
||||
case IECLST_LONGWORD:
|
||||
return 8;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Get the total number of bytes required to store the bindings.
|
||||
/// @param bindings The bindings that we want to store.
|
||||
/// @return The total number of bytes required.
|
||||
size_t get_size_bytes(const GlueVariablesBinding& bindings) {
|
||||
size_t size(0);
|
||||
size_t size(0);
|
||||
|
||||
for (uint16_t index(0); index < bindings.size; ++index) {
|
||||
const GlueVariable& glue = bindings.glue_variables[index];
|
||||
for (uint16_t index(0); index < bindings.size; ++index) {
|
||||
const GlueVariable& glue = bindings.glue_variables[index];
|
||||
|
||||
if (glue.dir != IECLDT_MEM) {
|
||||
// We only care about items that are stored in memory
|
||||
continue;
|
||||
}
|
||||
if (glue.dir != IECLDT_MEM) {
|
||||
// We only care about items that are stored in memory
|
||||
continue;
|
||||
}
|
||||
|
||||
size += get_size_bytes(glue.size);
|
||||
}
|
||||
size += get_size_bytes(glue.size);
|
||||
}
|
||||
|
||||
return size;
|
||||
return size;
|
||||
}
|
||||
|
||||
inline uint8_t mask_index(GlueBoolGroup* group, uint8_t i) {
|
||||
if (!group->values[i]) {
|
||||
return 0;
|
||||
}
|
||||
if (!group->values[i]) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (*group->values) ? (1 << i) : 0;
|
||||
return (*group->values) ? (1 << i) : 0;
|
||||
}
|
||||
|
||||
inline void set_index(GlueBoolGroup* group, uint8_t i, uint8_t v) {
|
||||
if (group->values[i]) {
|
||||
(*group->values[i]) = ((1 << i) & v) ? TRUE: FALSE;
|
||||
}
|
||||
if (group->values[i]) {
|
||||
(*group->values[i]) = ((1 << i) & v) ? TRUE: FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
/// Copy the glue values into the buffer.
|
||||
|
@ -117,284 +117,284 @@ inline void set_index(GlueBoolGroup* group, uint8_t i, uint8_t v) {
|
|||
/// @param buffer The buffer that we are copying into.
|
||||
/// @return The number of bytes that were written into the buffer.
|
||||
size_t pstorage_copy_glue(const GlueVariablesBinding& bindings, char* buffer) {
|
||||
lock_guard<mutex> guard(*bindings.buffer_lock);
|
||||
lock_guard<mutex> guard(*bindings.buffer_lock);
|
||||
|
||||
size_t num_written(0);
|
||||
for (uint16_t index(0); index < bindings.size; ++index) {
|
||||
const GlueVariable& glue = bindings.glue_variables[index];
|
||||
size_t num_written(0);
|
||||
for (uint16_t index(0); index < bindings.size; ++index) {
|
||||
const GlueVariable& glue = bindings.glue_variables[index];
|
||||
|
||||
if (glue.dir != IECLDT_MEM) {
|
||||
// We only care about items that are stored in memory
|
||||
continue;
|
||||
}
|
||||
if (glue.dir != IECLDT_MEM) {
|
||||
// We only care about items that are stored in memory
|
||||
continue;
|
||||
}
|
||||
|
||||
uint8_t num_bytes = get_size_bytes(glue.size);
|
||||
uint8_t num_bytes = get_size_bytes(glue.size);
|
||||
|
||||
if (glue.size == IECLST_BIT) {
|
||||
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
|
||||
uint8_t bools_as_byte = mask_index(group, 0)
|
||||
| mask_index(group, 1)
|
||||
| mask_index(group, 2)
|
||||
| mask_index(group, 3)
|
||||
| mask_index(group, 4)
|
||||
| mask_index(group, 5)
|
||||
| mask_index(group, 6)
|
||||
| mask_index(group, 7);
|
||||
memcpy(buffer, &bools_as_byte, 1);
|
||||
} else {
|
||||
// Write the number of bytes to the buffer
|
||||
memcpy(buffer, glue.value, num_bytes);
|
||||
}
|
||||
if (glue.size == IECLST_BIT) {
|
||||
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
|
||||
uint8_t bools_as_byte = mask_index(group, 0)
|
||||
| mask_index(group, 1)
|
||||
| mask_index(group, 2)
|
||||
| mask_index(group, 3)
|
||||
| mask_index(group, 4)
|
||||
| mask_index(group, 5)
|
||||
| mask_index(group, 6)
|
||||
| mask_index(group, 7);
|
||||
memcpy(buffer, &bools_as_byte, 1);
|
||||
} else {
|
||||
// Write the number of bytes to the buffer
|
||||
memcpy(buffer, glue.value, num_bytes);
|
||||
}
|
||||
|
||||
// Advance the pointer to the next starting position
|
||||
num_written += num_bytes;
|
||||
buffer += num_bytes;
|
||||
}
|
||||
// Advance the pointer to the next starting position
|
||||
num_written += num_bytes;
|
||||
buffer += num_bytes;
|
||||
}
|
||||
|
||||
return num_written;
|
||||
return num_written;
|
||||
}
|
||||
|
||||
/// Container for reading in configuration from the config.ini
|
||||
/// This is populated with values from the config file.
|
||||
struct PstorageConfig {
|
||||
PstorageConfig() :
|
||||
poll_interval(std::chrono::seconds(10))
|
||||
{}
|
||||
std::chrono::seconds poll_interval;
|
||||
PstorageConfig() :
|
||||
poll_interval(std::chrono::seconds(10))
|
||||
{}
|
||||
std::chrono::seconds poll_interval;
|
||||
};
|
||||
|
||||
int pstorage_cfg_handler(void* user_data, const char* section,
|
||||
const char* name, const char* value) {
|
||||
if (strcmp("pstorage", section) != 0) {
|
||||
if (strcmp("pstorage", section) != 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto config = reinterpret_cast<PstorageConfig*>(user_data);
|
||||
auto config = reinterpret_cast<PstorageConfig*>(user_data);
|
||||
|
||||
if (strcmp(name, "poll_interval") == 0) {
|
||||
// We do not allow a poll period of less than 1 second as that
|
||||
// might cause lock contention problems.
|
||||
config->poll_interval = std::chrono::seconds(max(1, atoi(value)));
|
||||
} else if (strcmp(name, "enabled") == 0) {
|
||||
if (strcmp(name, "poll_interval") == 0) {
|
||||
// We do not allow a poll period of less than 1 second as that
|
||||
// might cause lock contention problems.
|
||||
config->poll_interval = std::chrono::seconds(max(1, atoi(value)));
|
||||
} else if (strcmp(name, "enabled") == 0) {
|
||||
// Nothing to do here - we already know this is enabled
|
||||
} else {
|
||||
} else {
|
||||
spdlog::warn("Unknown configuration item {}", name);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int8_t pstorage_run(std::unique_ptr<std::istream, std::function<void(std::istream*)>>& cfg_stream,
|
||||
const char* cfg_overrides,
|
||||
const GlueVariablesBinding& bindings,
|
||||
volatile bool& run,
|
||||
function<std::ostream*(void)> stream_fn)
|
||||
const GlueVariablesBinding& bindings,
|
||||
volatile bool& run,
|
||||
function<std::ostream*(void)> stream_fn)
|
||||
{
|
||||
PstorageConfig config;
|
||||
ini_parse_stream(istream_fgets, cfg_stream.get(), pstorage_cfg_handler, &config);
|
||||
PstorageConfig config;
|
||||
ini_parse_stream(istream_fgets, cfg_stream.get(), pstorage_cfg_handler, &config);
|
||||
|
||||
// We are done with the file, so release the unique ptr. Normally this
|
||||
// We are done with the file, so release the unique ptr. Normally this
|
||||
// will close the reference to the file
|
||||
cfg_stream.reset(nullptr);
|
||||
|
||||
if (strlen(cfg_overrides) > 0) {
|
||||
config.poll_interval = std::chrono::seconds(max(1, atoi(cfg_overrides)));
|
||||
}
|
||||
if (strlen(cfg_overrides) > 0) {
|
||||
config.poll_interval = std::chrono::seconds(max(1, atoi(cfg_overrides)));
|
||||
}
|
||||
|
||||
const char endianness_header[2] = { IS_BIG_ENDIAN, '\n'};
|
||||
const char endianness_header[2] = { IS_BIG_ENDIAN, '\n'};
|
||||
|
||||
// This isn't ideal because we really only need enough space for
|
||||
// the located variables that are memory items, and this is all, but
|
||||
// it does ensure we have enough space.
|
||||
char buffer[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
|
||||
// We keep a second block of memory so that we can detect if the values
|
||||
// have changed and not write if they are the same.
|
||||
char buffer_old[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
|
||||
// This isn't ideal because we really only need enough space for
|
||||
// the located variables that are memory items, and this is all, but
|
||||
// it does ensure we have enough space.
|
||||
char buffer[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
|
||||
// We keep a second block of memory so that we can detect if the values
|
||||
// have changed and not write if they are the same.
|
||||
char buffer_old[OPLC_PERSISTENT_STORAGE_MAX_SIZE] = {0};
|
||||
|
||||
// If the required size from bindings is greater than the configured
|
||||
// size, then just exit
|
||||
if (get_size_bytes(bindings) > extent<decltype(buffer)>::value) {
|
||||
spdlog::error("Stored variables too large for persistent storage");
|
||||
return -1;
|
||||
}
|
||||
// If the required size from bindings is greater than the configured
|
||||
// size, then just exit
|
||||
if (get_size_bytes(bindings) > extent<decltype(buffer)>::value) {
|
||||
spdlog::error("Stored variables too large for persistent storage");
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (run) {
|
||||
size_t num_written = pstorage_copy_glue(bindings, buffer);
|
||||
while (run) {
|
||||
size_t num_written = pstorage_copy_glue(bindings, buffer);
|
||||
|
||||
if (memcmp(buffer, buffer_old, num_written) != 0) {
|
||||
// Try to open the file to do the initial write
|
||||
unique_ptr<ostream> out_stream(stream_fn());
|
||||
if (!out_stream) {
|
||||
spdlog::error("Unable to open persistent storage file for writing");
|
||||
return -2;
|
||||
}
|
||||
out_stream->write(FILE_HEADER, FILE_HEADER_SIZE);
|
||||
out_stream->write(endianness_header, 2);
|
||||
out_stream->write(bindings.checksum, strlen(bindings.checksum));
|
||||
out_stream->put('\n');
|
||||
out_stream->write(buffer, num_written);
|
||||
if (memcmp(buffer, buffer_old, num_written) != 0) {
|
||||
// Try to open the file to do the initial write
|
||||
unique_ptr<ostream> out_stream(stream_fn());
|
||||
if (!out_stream) {
|
||||
spdlog::error("Unable to open persistent storage file for writing");
|
||||
return -2;
|
||||
}
|
||||
out_stream->write(FILE_HEADER, FILE_HEADER_SIZE);
|
||||
out_stream->write(endianness_header, 2);
|
||||
out_stream->write(bindings.checksum, strlen(bindings.checksum));
|
||||
out_stream->put('\n');
|
||||
out_stream->write(buffer, num_written);
|
||||
|
||||
spdlog::info("Persistent storage updated");
|
||||
spdlog::info("Persistent storage updated");
|
||||
|
||||
// We should be able to avoid this memory copy entirely
|
||||
memcpy(buffer_old, buffer, num_written);
|
||||
} else {
|
||||
spdlog::debug("Skip persistent write because unchanged values");
|
||||
}
|
||||
// We should be able to avoid this memory copy entirely
|
||||
memcpy(buffer_old, buffer, num_written);
|
||||
} else {
|
||||
spdlog::debug("Skip persistent write because unchanged values");
|
||||
}
|
||||
|
||||
// Since we just wrote, we sleep.
|
||||
// TODO this needs a new mechanism for sleeping because this can
|
||||
// delay shutdown/stop if polling is long.
|
||||
this_thread::sleep_for(config.poll_interval);
|
||||
}
|
||||
// Since we just wrote, we sleep.
|
||||
// TODO this needs a new mechanism for sleeping because this can
|
||||
// delay shutdown/stop if polling is long.
|
||||
this_thread::sleep_for(config.poll_interval);
|
||||
}
|
||||
|
||||
spdlog::debug("Persistent storage ending normally");
|
||||
spdlog::debug("Persistent storage ending normally");
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline int8_t read_and_check(istream& input_stream, const char header[],
|
||||
char buffer[], size_t count) {
|
||||
if (!input_stream.read(buffer, count)) {
|
||||
spdlog::warn("Unable to read header from persistence file stream");
|
||||
return -1;
|
||||
}
|
||||
if (!input_stream.read(buffer, count)) {
|
||||
spdlog::warn("Unable to read header from persistence file stream");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (memcmp(header, buffer, count) != 0) {
|
||||
spdlog::warn("Header does not match expected in persistence file ");
|
||||
return -2;
|
||||
}
|
||||
if (memcmp(header, buffer, count) != 0) {
|
||||
spdlog::warn("Header does not match expected in persistence file ");
|
||||
return -2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int8_t pstorage_read(istream& input_stream,
|
||||
const GlueVariablesBinding& bindings)
|
||||
{
|
||||
// Read the file header - we define the file header as a constant that
|
||||
// must be present as the header. We don't allow UTF BOMs here.
|
||||
char header_check[FILE_HEADER_SIZE];
|
||||
if (read_and_check(input_stream, FILE_HEADER, header_check, FILE_HEADER_SIZE) != 0) {
|
||||
return -1;
|
||||
}
|
||||
// Read the file header - we define the file header as a constant that
|
||||
// must be present as the header. We don't allow UTF BOMs here.
|
||||
char header_check[FILE_HEADER_SIZE];
|
||||
if (read_and_check(input_stream, FILE_HEADER, header_check, FILE_HEADER_SIZE) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Check endianness of the written file
|
||||
char endianness_expected[2] = { IS_BIG_ENDIAN, '\n'};
|
||||
char endianness_check[2];
|
||||
if (read_and_check(input_stream, endianness_expected, header_check, 2) != 0) {
|
||||
return -2;
|
||||
}
|
||||
// Check endianness of the written file
|
||||
char endianness_expected[2] = { IS_BIG_ENDIAN, '\n'};
|
||||
char endianness_check[2];
|
||||
if (read_and_check(input_stream, endianness_expected, header_check, 2) != 0) {
|
||||
return -2;
|
||||
}
|
||||
|
||||
// We have a digest in the header to try to prevent accidentally using
|
||||
// the wrong persistence file for a particular runtime.
|
||||
char checksum_check[32];
|
||||
if (read_and_check(input_stream, bindings.checksum, checksum_check, 32) != 0) {
|
||||
return -3;
|
||||
}
|
||||
// We have a digest in the header to try to prevent accidentally using
|
||||
// the wrong persistence file for a particular runtime.
|
||||
char checksum_check[32];
|
||||
if (read_and_check(input_stream, bindings.checksum, checksum_check, 32) != 0) {
|
||||
return -3;
|
||||
}
|
||||
|
||||
// Just add one newline character
|
||||
char padding_expected[1] = { '\n' };
|
||||
char padding_check[1];
|
||||
if (read_and_check(input_stream, padding_expected, padding_check, 1) != 0) {
|
||||
return -4;
|
||||
}
|
||||
// Just add one newline character
|
||||
char padding_expected[1] = { '\n' };
|
||||
char padding_check[1];
|
||||
if (read_and_check(input_stream, padding_expected, padding_check, 1) != 0) {
|
||||
return -4;
|
||||
}
|
||||
|
||||
// Now we know that the format is right, so read in the rest. We read
|
||||
// variable by variable so that we can assign into the right value.
|
||||
for (uint16_t index(0); index < bindings.size; ++index) {
|
||||
const GlueVariable& glue = bindings.glue_variables[index];
|
||||
// Now we know that the format is right, so read in the rest. We read
|
||||
// variable by variable so that we can assign into the right value.
|
||||
for (uint16_t index(0); index < bindings.size; ++index) {
|
||||
const GlueVariable& glue = bindings.glue_variables[index];
|
||||
|
||||
if (glue.dir != IECLDT_MEM) {
|
||||
// We only care about items that are stored in memory
|
||||
continue;
|
||||
}
|
||||
if (glue.dir != IECLDT_MEM) {
|
||||
// We only care about items that are stored in memory
|
||||
continue;
|
||||
}
|
||||
|
||||
uint8_t num_bytes;
|
||||
switch (glue.size) {
|
||||
case IECLST_BIT:
|
||||
num_bytes = 1;
|
||||
break;
|
||||
case IECLST_BYTE:
|
||||
num_bytes = 1;
|
||||
break;
|
||||
case IECLST_WORD:
|
||||
num_bytes = 2;
|
||||
break;
|
||||
case IECLST_DOUBLEWORD:
|
||||
num_bytes = 4;
|
||||
break;
|
||||
case IECLST_LONGWORD:
|
||||
num_bytes = 8;
|
||||
break;
|
||||
default:
|
||||
spdlog::error("Unexpected glue variable type {}", glue.size);
|
||||
return -5;
|
||||
}
|
||||
uint8_t num_bytes;
|
||||
switch (glue.size) {
|
||||
case IECLST_BIT:
|
||||
num_bytes = 1;
|
||||
break;
|
||||
case IECLST_BYTE:
|
||||
num_bytes = 1;
|
||||
break;
|
||||
case IECLST_WORD:
|
||||
num_bytes = 2;
|
||||
break;
|
||||
case IECLST_DOUBLEWORD:
|
||||
num_bytes = 4;
|
||||
break;
|
||||
case IECLST_LONGWORD:
|
||||
num_bytes = 8;
|
||||
break;
|
||||
default:
|
||||
spdlog::error("Unexpected glue variable type {}", glue.size);
|
||||
return -5;
|
||||
}
|
||||
|
||||
// Read the required number of bytes from the stream
|
||||
// 8 here is the maximum buffer size that we need based on
|
||||
// the types that we support.
|
||||
char buffer[8];
|
||||
if (!input_stream.read(buffer, num_bytes)) {
|
||||
spdlog::error("Persistent storage file too short; partially read");
|
||||
return -6;
|
||||
}
|
||||
// Read the required number of bytes from the stream
|
||||
// 8 here is the maximum buffer size that we need based on
|
||||
// the types that we support.
|
||||
char buffer[8];
|
||||
if (!input_stream.read(buffer, num_bytes)) {
|
||||
spdlog::error("Persistent storage file too short; partially read");
|
||||
return -6;
|
||||
}
|
||||
|
||||
// Assign the value into the glue value. The value is ether a simple
|
||||
// value or a group of booleans.
|
||||
// We don't actually care what the contents are - we just populate as
|
||||
// though they are raw bytes
|
||||
if (glue.size == IECLST_BIT) {
|
||||
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
|
||||
uint8_t value = static_cast<uint8_t>(buffer[0]);
|
||||
set_index(group, 0, value);
|
||||
set_index(group, 1, value);
|
||||
set_index(group, 2, value);
|
||||
set_index(group, 3, value);
|
||||
set_index(group, 4, value);
|
||||
set_index(group, 5, value);
|
||||
set_index(group, 6, value);
|
||||
set_index(group, 7, value);
|
||||
} else {
|
||||
memcpy(glue.value, buffer, num_bytes);
|
||||
}
|
||||
}
|
||||
// Assign the value into the glue value. The value is ether a simple
|
||||
// value or a group of booleans.
|
||||
// We don't actually care what the contents are - we just populate as
|
||||
// though they are raw bytes
|
||||
if (glue.size == IECLST_BIT) {
|
||||
GlueBoolGroup* group = reinterpret_cast<GlueBoolGroup*>(glue.value);
|
||||
uint8_t value = static_cast<uint8_t>(buffer[0]);
|
||||
set_index(group, 0, value);
|
||||
set_index(group, 1, value);
|
||||
set_index(group, 2, value);
|
||||
set_index(group, 3, value);
|
||||
set_index(group, 4, value);
|
||||
set_index(group, 5, value);
|
||||
set_index(group, 6, value);
|
||||
set_index(group, 7, value);
|
||||
} else {
|
||||
memcpy(glue.value, buffer, num_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
spdlog::info("Initialized from persistent storage");
|
||||
spdlog::info("Initialized from persistent storage");
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void pstorage_service_init(const GlueVariablesBinding& binding) {
|
||||
ifstream stream("persistent.file", ios::binary);
|
||||
if (!stream) {
|
||||
spdlog::info("Skipped load persistence because file cannot be read.");
|
||||
return;
|
||||
}
|
||||
ifstream stream("persistent.file", ios::binary);
|
||||
if (!stream) {
|
||||
spdlog::info("Skipped load persistence because file cannot be read.");
|
||||
return;
|
||||
}
|
||||
|
||||
auto result = pstorage_read(stream, binding);
|
||||
spdlog::info("Storage read completed with result {}", result);
|
||||
auto result = pstorage_read(stream, binding);
|
||||
spdlog::info("Storage read completed with result {}", result);
|
||||
}
|
||||
|
||||
void pstorage_service_finalize(const GlueVariablesBinding& binding) {
|
||||
// We don't current do anything on finalize (although we probably should)
|
||||
// We don't current do anything on finalize (although we probably should)
|
||||
}
|
||||
|
||||
void pstorage_service_run(const GlueVariablesBinding& binding,
|
||||
volatile bool& run, const char* config) {
|
||||
|
||||
// We don't allow a poll duration of less than one second otherwise
|
||||
// that can have detrimental effects on performance
|
||||
auto create_stream = []() { return new ofstream("persistent.file", ios::binary); };
|
||||
// We don't allow a poll duration of less than one second otherwise
|
||||
// that can have detrimental effects on performance
|
||||
auto create_stream = []() { return new ofstream("persistent.file", ios::binary); };
|
||||
|
||||
unique_ptr<istream, function<void(istream*)>> cfg_stream(new ifstream("../etc/config.ini"), [](istream* s)
|
||||
unique_ptr<istream, function<void(istream*)>> cfg_stream(new ifstream("../etc/config.ini"), [](istream* s)
|
||||
{
|
||||
reinterpret_cast<ifstream*>(s)->close();
|
||||
delete s;
|
||||
});
|
||||
|
||||
pstorage_run(cfg_stream, config, binding, run, create_stream);
|
||||
pstorage_run(cfg_stream, config, binding, run, create_stream);
|
||||
}
|
||||
|
||||
/** @}*/
|
||||
|
|
|
@ -18,13 +18,16 @@
|
|||
#include "service_definition.h"
|
||||
#include "service_registry.h"
|
||||
#include "pstorage.h"
|
||||
#include "interactive_server.h"
|
||||
#include "../dnp3s/dnp3.h"
|
||||
|
||||
ServiceInitFunction pstorage_init_fn(pstorage_service_init);
|
||||
ServiceStartFunction pstorage_start_service_fn(pstorage_service_run);
|
||||
ServiceStartFunction dnp3s_start_service_fn(dnp3s_service_run);
|
||||
ServiceStartFunction interactive_start_service_fn(interactive_service_run);
|
||||
|
||||
ServiceDefinition* services[] = {
|
||||
new ServiceDefinition("interactive", interactive_start_service_fn),
|
||||
new ServiceDefinition("pstorage", pstorage_start_service_fn, pstorage_init_fn),
|
||||
#ifdef OPLC_DNP3_OUTSTATION
|
||||
new ServiceDefinition("dnp3s", dnp3s_start_service_fn),
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
#include "catch.hpp"
|
||||
#include "fakeit.hpp"
|
||||
|
||||
void sleep_until(timespec*, int) {}
|
||||
#include "glue.h"
|
||||
#include "dnp3s/dnp3.h"
|
||||
|
||||
|
@ -36,6 +35,7 @@ SCENARIO("create_config", "")
|
|||
Dnp3IndexedGroup binary_commands = {0};
|
||||
Dnp3IndexedGroup analog_commands = {0};
|
||||
Dnp3MappedGroup measurements = {0};
|
||||
chrono::milliseconds poll_interval(1);
|
||||
std::uint16_t port;
|
||||
|
||||
GIVEN("<input stream>")
|
||||
|
@ -44,7 +44,7 @@ SCENARIO("create_config", "")
|
|||
{
|
||||
GlueVariablesBinding bindings(&glue_mutex, 0, nullptr, nullptr);
|
||||
std::stringstream input_stream;
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
|
||||
|
||||
REQUIRE(config.dbConfig.binary.IsEmpty());
|
||||
REQUIRE(config.dbConfig.doubleBinary.IsEmpty());
|
||||
|
@ -69,7 +69,7 @@ SCENARIO("create_config", "")
|
|||
};
|
||||
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
|
||||
std::stringstream input_stream("[dnp3s]\nbind_location=name:%QX0.0,group:1,index:0,");
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
|
||||
|
||||
REQUIRE(config.dbConfig.binary.Size() == 1);
|
||||
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
|
||||
|
@ -97,7 +97,7 @@ SCENARIO("create_config", "")
|
|||
};
|
||||
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
|
||||
std::stringstream input_stream("[dnp3s]\nbind_location=name:%IX0.0,group:1,index:1,");
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
|
||||
|
||||
REQUIRE(config.dbConfig.binary.Size() == 1);
|
||||
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
|
||||
|
@ -125,7 +125,7 @@ SCENARIO("create_config", "")
|
|||
};
|
||||
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
|
||||
std::stringstream input_stream("[dnp3s]\nbind_location=name:%IX0.0,group:12,index:1,");
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
|
||||
|
||||
REQUIRE(config.dbConfig.binary.Size() == 0);
|
||||
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
|
||||
|
@ -155,7 +155,7 @@ SCENARIO("create_config", "")
|
|||
};
|
||||
GlueVariablesBinding bindings(&glue_mutex, 1, glue_vars, nullptr);
|
||||
std::stringstream input_stream("[dnp3s]\nbind_location=name:%QD0,group:30,index:1,");
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port));
|
||||
const OutstationStackConfig config(dnp3_create_config(input_stream, bindings, binary_commands, analog_commands, measurements, port, poll_interval));
|
||||
|
||||
REQUIRE(config.dbConfig.binary.Size() == 0);
|
||||
REQUIRE(config.dbConfig.doubleBinary.Size() == 0);
|
||||
|
|
Loading…
Reference in New Issue