#include <cerrno>
#include <exception>
// For file writing:
#include <fstream>
#include <iostream>
#include <limits>
#include <string>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "StreamingInput.h"
// Builds a streaming input module for the endpoint described by conn_url.
//   x, y, temporal_step: forwarded unchanged to the module base-class constructor.
//   conn_url: stored in connection_url and parsed later by openConnetion().
// No socket is opened and no thread is started here; the object only gets its default
// parameters, its two image buffers and its two mutexes.
StreamingInput::StreamingInput(int x, int y, double temporal_step, string conn_url):module(x,y,temporal_step){
    // Connection state: nothing exists yet, so every handle holds its "not open" value
    connection_url = conn_url;
    socket_fd = -1;
    accept_socket_fd = -1;
    receiver_vars.accept_socket_fh = NULL;
    receiver_vars.exit_reception = false; // Termination of the receiver has not been requested

    // Default input parameters
    SkipNInitFrames = 0;     // Do not discard any initial frame
    RepeatLastFrame = false; // End of input is reported once the stream ends
    InputFramePeriod = 1;    // One new frame per unit of simulation time

    // Internal timing state
    NextFrameTime = 0;       // The first frame is due at time 0
    endOfInput = false;

    // Image buffers: one exposed as module output, one filled by the receiver thread
    outputImage = new CImg<double>(sizeY, sizeX, 1, 1, 0);
    receiver_vars.buffer_img = new CImg<double>(sizeY, sizeX, 1, 1, 0);

    // Both mutexes are created with type PTHREAD_MUTEX_NORMAL (no owner checking).
    // The hand-over of buffer_img between the two threads relies on one thread unlocking a
    // mutex that the other thread locked. That is not a legal use of a mutex; a condition
    // variable (pthread_cond_wait) would be the proper tool to alternate access to the buffer.
    pthread_mutexattr_t normal_attr;
    pthread_mutexattr_init(&normal_attr);
    pthread_mutexattr_settype(&normal_attr, PTHREAD_MUTEX_NORMAL);
    pthread_mutex_init(&receiver_vars.buffer_mutex, &normal_attr);
    pthread_mutex_init(&receiver_vars.reception_mutex, &normal_attr);
    pthread_mutexattr_destroy(&normal_attr); // The attribute object is only needed during init

    // The receiver thread does not exist yet. This class marks "no receiver thread" by
    // storing the ID of the calling thread.
    Receiver_thread_id = pthread_self();
}
// Copy constructor. This method will probably not be used.
// Copies the configuration and connection state of 'copy' and deep-copies both image
// buffers so that the new object never shares pixel storage with the original.
//
// NOTE(review): the socket descriptors, the FILE handle, the receiver thread ID and the raw
// pthread_mutex_t objects are copied as plain values, exactly as before. Both objects
// therefore refer to the same connection and the same thread, and the destructor of each
// one will try to join/close them. Copying a live StreamingInput should be avoided until
// that ownership question is settled.
StreamingInput::StreamingInput(const StreamingInput &copy):module(copy){
    connection_url = copy.connection_url;
    socket_fd = copy.socket_fd;
    accept_socket_fd = copy.accept_socket_fd;
    Receiver_thread_id = copy.Receiver_thread_id;

    // Struct assignment already carries accept_socket_fh, exit_reception and both mutexes;
    // buffer_img is replaced below by a private deep copy.
    receiver_vars = copy.receiver_vars;

    SkipNInitFrames = copy.SkipNInitFrames;
    RepeatLastFrame = copy.RepeatLastFrame;
    InputFramePeriod = copy.InputFramePeriod;
    NextFrameTime = copy.NextFrameTime;
    endOfInput = copy.endOfInput;

    // Deep-copy the images. The receiver thread sets buffer_img to NULL at end of stream,
    // so a NULL source must be propagated instead of dereferenced.
    outputImage = NULL;
    if(copy.outputImage != NULL)
        outputImage = new CImg<double>(*copy.outputImage);

    receiver_vars.buffer_img = NULL;
    if(copy.receiver_vars.buffer_img != NULL)
        receiver_vars.buffer_img = new CImg<double>(*copy.receiver_vars.buffer_img);
}
// Destructor: stops the receiver thread, closes the connection, then releases the mutexes
// and both image buffers.
StreamingInput::~StreamingInput(){
    stopStreamReception();
    closeConnection();

    pthread_mutex_t *owned_mutexes[] = { &receiver_vars.reception_mutex, &receiver_vars.buffer_mutex };

    // Attempting to destroy a locked mutex results in undefined behaviour, so unlock both first
    for(pthread_mutex_t *mtx : owned_mutexes)
        pthread_mutex_unlock(mtx);
    for(pthread_mutex_t *mtx : owned_mutexes)
        pthread_mutex_destroy(mtx);

    // delete on a null pointer is a no-op, so no explicit NULL test is required
    delete receiver_vars.buffer_img;
    delete outputImage;
}
//------------------------------------------------------------------------------//
// Prepares the module for simulation: waits for the streaming connection, discards the
// configured number of initial frames, loads the first valid frame (which fixes the image
// dimensions) and launches the receiver thread.
// Returns true when the connection was opened and the receiver thread was created.
bool StreamingInput::allocateValues(){
    module::allocateValues(); // Base-class allocation comes first

    // Set socket to listen and wait for a connection; nothing else can be done without it
    if(!openConnetion())
        return(false);

    if(SkipNInitFrames>0)
        cout << "Skipping " << SkipNInitFrames << " input frames" << endl;

    // Each discarded frame is simply decoded into outputImage and then overwritten
    int frames_to_skip = SkipNInitFrames;
    while(frames_to_skip > 0) {
        outputImage->load_png(receiver_vars.accept_socket_fh);
        --frames_to_skip;
    }

    // The first valid frame determines the new retina image size
    outputImage->load_png(receiver_vars.accept_socket_fh);
    sizeY = outputImage->width();
    sizeX = outputImage->height();
    NextFrameTime = InputFramePeriod; // Time at which the next frame must be fetched

    // outputImage took its size from the loaded frame; give the buffer image the same size
    receiver_vars.buffer_img->assign(sizeY, sizeX, 1, 1, 0);

    // The first frame is already in the output, but the buffer is still empty: leave the
    // reception mutex unlocked (its initial state) so the receiver can fill the buffer, and
    // lock the buffer mutex so the next get_new_frame() waits for that frame.
    pthread_mutex_lock(&receiver_vars.buffer_mutex);

    return(receiveStream());
}
//------------------------------------------------------------------------------//
// Sets how many initial frames of the stream are discarded.
// Returns false (leaving the current value untouched) when n_frames is negative.
bool StreamingInput::set_SkipNInitFrames(int n_frames){
    if (n_frames < 0)
        return(false);
    SkipNInitFrames = n_frames;
    return(true);
}
// Stores the RepeatLastFrame flag. When it is set, get_new_frame() does not raise
// endOfInput once the stream has ended. Every value is accepted, so this always returns true.
bool StreamingInput::set_RepeatLastFrame(bool repeat_flag){
    RepeatLastFrame = repeat_flag;
    return true;
}
// Sets the simulation time that elapses between two consecutive input frames.
// Returns false (leaving the current value untouched) unless sim_time_period is
// strictly positive.
bool StreamingInput::set_InputFramePeriod(double sim_time_period){
    const bool period_is_valid = (sim_time_period > 0);
    if (period_is_valid)
        InputFramePeriod = sim_time_period;
    return(period_is_valid);
}
//------------------------------------------------------------------------------//
// Applies a list of (name, value) parameters to this module.
//   params:  parameter values; params[i] belongs to the name paramID[i].
//   paramID: parameter names. Recognised names are "SkipNInitFrames", "RepeatLastFrame"
//            and "InputFramePeriod".
// Returns true when every parameter was recognised and accepted by its setter. Processing
// stops at the first failure; parameters applied before that point stay applied.
// Returns false without applying anything when there are fewer names than values.
bool StreamingInput::setParameters(vector<double> params, vector<string> paramID){
    // Every value needs a name; indexing paramID past its end would be undefined behaviour
    if (paramID.size() < params.size())
        return false;

    bool correct = true;
    for (vector<double>::size_type i = 0; i < params.size() && correct; i++){
        const string &id = paramID[i];
        const double value = params[i];

        if (id == "SkipNInitFrames"){
            // Converting a double that does not fit in an int (or NaN) is undefined, so only
            // convert values whose truncation is representable. Anything else is rejected,
            // which is also what the setter does for every in-range negative count.
            const double int_upper_bound = static_cast<double>(std::numeric_limits<int>::max()) + 1.0;
            if (value > -1.0 && value < int_upper_bound)
                correct = set_SkipNInitFrames(static_cast<int>(value));
            else
                correct = false;
        } else if (id == "RepeatLastFrame"){
            correct = set_RepeatLastFrame(value != 0.0); // Any non-zero value enables the flag
        } else if (id == "InputFramePeriod"){
            correct = set_InputFramePeriod(value);
        } else{
            correct = false; // Unknown parameter name
        }
    }
    return correct;
}
//------------------------------------------------------------------------------//
// This method can only be used to set the simulation time
void StreamingInput::feedInput(double sim_time, const CImg<double> &new_input, bool isCurrent, int port){
// Update the current simulation time (although it is currently not used)
simTime = sim_time;
}
//------------------------------------------------------------------------------//
// Moves the next received frame from the receiver buffer into outputImage.
// A NULL receiver_vars.buffer_img means the stream has ended; in that case outputImage is
// left as it is and endOfInput is raised unless RepeatLastFrame is set.
void StreamingInput::get_new_frame(){
    // Only wait for a frame while the stream is alive. The lock blocks until the receiver
    // thread releases buffer_mutex after filling the buffer (or after detecting end of stream).
    if(receiver_vars.buffer_img != NULL)
        pthread_mutex_lock(&receiver_vars.buffer_mutex);

    // The buffer pointer must be read again: the receiver may have ended the stream while
    // this thread was waiting above.
    if(receiver_vars.buffer_img == NULL) {
        if(!RepeatLastFrame)
            endOfInput = true; // Indicate an end of input (and simulation)
        return;
    }

    *outputImage = *receiver_vars.buffer_img; // Take the frame out of the buffer
    // Let the receiver thread overwrite buffer_img with the following frame
    pthread_mutex_unlock(&receiver_vars.reception_mutex);
}
// Brings the output up to date with the current simulation time: one frame is fetched for
// every frame period that has elapsed, i.e. while simTime has reached NextFrameTime.
void StreamingInput::update(){
    for(; simTime >= NextFrameTime; NextFrameTime += InputFramePeriod) // Advance to the start time of the next frame
        get_new_frame();
}
//------------------------------------------------------------------------------//
#define FIRST_IP_PORT 1
#define LAST_IP_PORT 65535
// This method creates a socket and waits for a connection
bool StreamingInput::openConnetion(){
bool ret_correct;
string url_format = "tcp://passive:";
size_t url_format_len = url_format.size();
// Check whether the beginning of the specified connection_url is equal to url_format
if(connection_url.compare(0, url_format_len, url_format) == 0) {
string port_str = connection_url.substr(url_format_len);
int port_number = stoi(port_str);
if(port_number >= FIRST_IP_PORT && port_number <= LAST_IP_PORT) { // Valid port numbers
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if(socket_fd != -1) {
struct sockaddr_in serv_addr;
bzero((void *)&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port_number);
serv_addr.sin_addr.s_addr = INADDR_ANY;
if(bind(socket_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) != -1) {
if(listen(socket_fd, 1) != -1) {
struct sockaddr_in cli_addr;
socklen_t cli_addr_len;
cout << "Waiting for an incoming connection at port " << port_number << "..." << endl;
cli_addr_len = sizeof(cli_addr);
accept_socket_fd = accept(socket_fd, (struct sockaddr *)&cli_addr, &cli_addr_len);
if(accept_socket_fd != -1) {
cout << "Connection from " << inet_ntoa(cli_addr.sin_addr) << " accepted!" << endl;
receiver_vars.accept_socket_fh = fdopen(accept_socket_fd, "rb"); // Open the socket as a file stream
if(receiver_vars.accept_socket_fh != NULL) {
ret_correct=true;
} else {
cout << "Error opening accept socket as a file stream. errno: " << errno << endl;
close(accept_socket_fd);
accept_socket_fd=-1;
close(socket_fd);
socket_fd=-1;
ret_correct = false;
}
}
else{
cout << "Error accepting incomming connection. errno: " << errno << endl;
close(socket_fd);
socket_fd=-1;
ret_correct = false;
}
} else {
cout << "Connection socket could not set to passive socket in port " << port_number << ". errno: " << errno << "." << endl;
close(socket_fd);
socket_fd=-1;
ret_correct = false;
}
} else {
cout << "Connection socket could not be binded to port " << port_number << ". errno: " << errno << "." << endl;
close(socket_fd);
socket_fd=-1;
ret_correct = false;
}
} else {
cout << "Error creating connection socket. errno: " << errno << "." << endl;
ret_correct = false;
}
} else {
cout << "Incorrect connection port number specified: " << port_number << ". Expected a number in [" << FIRST_IP_PORT << "," << LAST_IP_PORT << "]." << endl;
ret_correct = false;
}
} else {
cout << "Incorrect connection URL format specified: " << connection_url << ". Expected: " << url_format << "port_number." << endl;
ret_correct = false;
}
return(ret_correct);
}
// Body of the receiver thread. It repeatedly waits on params->reception_mutex, reads one PNG
// frame from params->accept_socket_fh into params->buffer_img and then unlocks
// params->buffer_mutex to hand the frame over to the consumer (get_new_frame()).
// The loop ends when params->exit_reception is set, when a mutex call fails, or when the
// stream reaches EOF. At EOF buffer_img is deleted and set to NULL, which is how end of
// stream is signalled to the consumer.
// Returns the error code cast to void*: 0 on requested exit, EIO when the stream ended,
// otherwise the failing pthread error code.
void *image_receiver_thread(struct receiver_params *params)
{
    int error_code = 0;
    while(!params->exit_reception && error_code==0) {
        // Waits until reception is allowed (the previous buffer content has been consumed)
        error_code = pthread_mutex_lock(&(params->reception_mutex));
        if(params->exit_reception || error_code != 0)
            break; // Termination was requested while waiting, or the lock failed

        // Peek one character to find out whether another frame is being sent
        const int first_frame_char = fgetc(params->accept_socket_fh);
        if(first_frame_char != EOF) { // Another frame is coming
            // Put the character back so that a complete image can be loaded
            ungetc(first_frame_char, params->accept_socket_fh);
            params->buffer_img->load_png(params->accept_socket_fh); // Receive a complete frame
            error_code = pthread_mutex_unlock(&(params->buffer_mutex)); // New frame available, unblock the consumer
        } else {
            // End of stream. The buffer must be released and the pointer cleared BEFORE the
            // consumer is woken up: get_new_frame() reads buffer_img as soon as it obtains
            // buffer_mutex, and it must find NULL rather than a buffer that is being freed.
            delete params->buffer_img;
            params->buffer_img = NULL; // End of stream is indicated with a NULL frame pointer
            pthread_mutex_unlock(&(params->buffer_mutex)); // Unblock the consumer so it can see the end of stream
            error_code = EIO; // Exit loop
        }
    }
    if(error_code==EIO)
        cout << "\rStreaming connection was closed by the other end." << endl;
    // sizeof(void *) is not known in principle, so go through intptr_t (same size) to avoid a warning
    return((void *)(intptr_t)error_code);
}
bool StreamingInput::receiveStream(){
bool ret_correct;
int thread_error;
// Create joinable thread. The thread fn argument will be the socket fd
thread_error = pthread_create(&Receiver_thread_id, NULL, (void *(*)(void *))&image_receiver_thread, (void *)&receiver_vars);
if(thread_error == 0) { // If success
ret_correct = true;
} else {
cout << "Failed to create image receiver thread. return error code: " << thread_error << "." << endl;
ret_correct = false;
}
return(ret_correct);
}
// Asks the receiver thread to terminate and waits for it.
// Does nothing (and returns true) when no receiver thread exists, which this class encodes
// as Receiver_thread_id being equal to the ID of the calling thread.
// Returns false only when pthread_join fails. A thread exit code other than 0 or EIO is
// reported on cout but still counts as a successful stop.
// NOTE(review): the receiver thread reads from the socket with blocking calls
// (fgetc/load_png), so this join may wait until the peer sends data or closes the connection.
bool StreamingInput::stopStreamReception(){
    // pthread_equal returns non-zero for equal IDs: no receiver thread to stop
    if(pthread_equal(Receiver_thread_id, pthread_self()) != 0)
        return(true);

    receiver_vars.exit_reception = true; // Signal the receiver thread to terminate
    pthread_mutex_unlock(&receiver_vars.reception_mutex); // Unlock reception (just in case the thread was waiting on it)

    void *thread_ret_ptr = NULL;
    const int join_err = pthread_join(Receiver_thread_id, &thread_ret_ptr); // Wait for the thread to terminate
    if(join_err != 0) {
        cout << "Error waiting for the image receiver thread to finish. return error code: " << join_err << endl;
        return(false);
    }

    // The thread has been joined and its ID is no longer valid. Restore the "no receiver
    // thread" marker so that a later call (e.g. from the destructor) does not join it again.
    Receiver_thread_id = pthread_self();

    const int thread_ret = (int)(intptr_t)thread_ret_ptr;
    if(thread_ret != 0 && thread_ret != EIO) // EIO only means that the stream ended
        cout << "Frame reception ended anormally. errno: " << thread_ret << "." << endl;
    return(true);
}
// Closes the accepted connection and the listening socket, if they are open.
// Every handle is reset to its "not open" value (NULL / -1) after being closed, so calling
// this method again is harmless.
// Returns true when nothing needed closing or every close operation succeeded.
bool StreamingInput::closeConnection(){
    bool ret_correct = true;

    if(receiver_vars.accept_socket_fh != NULL) {
        // The stream was opened with fdopen() on accept_socket_fd, so fclose() also closes
        // that descriptor; it must not be closed a second time.
        if(fclose(receiver_vars.accept_socket_fh) != 0)
            ret_correct = false;
        receiver_vars.accept_socket_fh = NULL;
        accept_socket_fd = -1;
    } else if(accept_socket_fd != -1) {
        if(close(accept_socket_fd) != 0)
            ret_correct = false;
        accept_socket_fd = -1;
    }

    if(socket_fd != -1){ // The listening socket was created
        if(close(socket_fd) != 0)
            ret_correct = false;
        socket_fd = -1;
    }
    return(ret_correct);
}
//------------------------------------------------------------------------------//
// Returns the last received image, which is stored in the output buffer, or NULL once the
// end of input has been flagged.
CImg<double>* StreamingInput::getOutput(){
    // outputImage is a pointer which may already have been handed out by an earlier call,
    // so it must not be freed here: its content may still be accessed by the caller.
    if(!endOfInput)
        return outputImage;
    return NULL;
}
//------------------------------------------------------------------------------//
// Reports whether this module is a dummy one; a StreamingInput never is, so this is always false.
bool StreamingInput::isDummy() {
    const bool is_dummy_module = false;
    return is_dummy_module;
}