#include #include #include #include #include #include #include #include #include "util.h" #include "postprocess.h" static char socket_path[100]; struct Job { pid_t pid; char burstdir[255]; char target[255]; int keep; }; void start_processing(struct Job job) { postprocess_internal(job.burstdir, job.target, 1); } int is_daemon_running() { int sock; struct sockaddr_un addr; struct timeval tv; // Daemon isn't running if the socket doesn't exist if (access(socket_path, F_OK)) { fprintf(stderr, "[fg] daemon socket doesn't exist\n"); return 0; } // Check if the daemon responds on the socket sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (sock < 0) { err("could not make socket fd"); return 0; } // Set a short timeout on the socket tv.tv_sec = 0; tv.tv_usec = 500; setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv, sizeof tv); memset(&addr, 0, sizeof(struct sockaddr_un)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); if (connect(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) { err("[fg] could not open daemon socket"); return 0; } close(sock); fprintf(stderr, "[fg] daemon is already running\n"); return 1; } int listen_on_socket() { int sock; struct sockaddr_un addr; // Clean existing socket if (remove(socket_path) == -1 && errno != ENOENT) { err("could not clean up old socket"); } // Make new unix domain socket to listen on sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (sock < 0) { err("could not make socket fd"); return 0; } memset(&addr, 0, sizeof(struct sockaddr_un)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); unlink(socket_path); if (bind(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) { err("failed to bind socket"); return 0; } if (listen(sock, 20) < 0) { err("failed to listen"); return 0; } return sock; } int queue_job(struct Job job) { int sock; struct sockaddr_un addr; char buffer[1024]; sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (sock < 0) { err("[fg] could not make socket fd"); return 1; } memset(&addr, 0, sizeof(struct sockaddr_un)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); if (connect(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) { err("[fg] failed to open socket"); return 2; } if (write(sock, &job, sizeof(job)) < 0) { err("[fg] failed to write"); return 3; } fprintf(stderr, "[fg] wait until processing is done\n"); // Wait until the background process return the resulting filename if (read(sock, buffer, 1024) < 1) { err("[fg] failed to read"); return 4; } fprintf(stderr, "[fg] processing is done\n"); // Pass the stacked filename to megapixels printf("%s\n", buffer); fprintf(stderr, "[fg] done\n"); exit(0); } int start_background_process() { int sock, fd; struct sockaddr_un cli_addr; unsigned int cli_len; struct Job job; char buffer[272]; int first = 1; const char *name_fg = "postprocessd fg"; const char *name_bg = "postprocessd bg"; // First fork pid_t child_pid = fork(); if (child_pid < 0) { err("fork failed"); } else if (child_pid > 0) { prctl(PR_SET_NAME, (unsigned long) name_fg); // In the parent process waitpid(child_pid, NULL, WNOHANG); // Give the fork a bit of time to create the socket while (access(socket_path, F_OK)) { usleep(100); } usleep(1000); return 1; } // Create new process group setsid(); // Second fork pid_t child2_pid = fork(); if (child2_pid != 0) { // The middle child, exit quickly exit(0); } // We're now in the grandchild prctl(PR_SET_NAME, (unsigned long) name_bg); // Clean up FDs for (fd = sysconf(_SC_OPEN_MAX); fd > 0; --fd) { close(fd); } // Recreate standard pipes #ifdef __GLIBC__ stdin = fopen("/dev/null", "r"); stdout = fopen("/dev/null", "w+"); stderr = fopen("/dev/null", "w+"); #else freopen("/dev/null", "r", stdin); freopen("/dev/null", "w", stdout); freopen("/dev/null", "w", stderr); #endif cli_len = sizeof(cli_addr); fprintf(stderr, "[bg] init postprocessd\n"); postprocess_setup(); sock = listen_on_socket(); fprintf(stderr, "[bg] socket created\n"); fprintf(stderr, "[bg] first accept start\n"); while (1) { fd = accept(sock, (struct sockaddr *) &cli_addr, &cli_len); if (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { if (first) { // When no queued item has been processed yet act like a proper nonblocking accept // Doing a busy loop isn't very efficient but this should last for less than a second usleep(100); continue; } // No client was available after processing the first image. Kill the background process fprintf(stderr, "[bg] shutting down\n"); close(sock); unlink(socket_path); exit(0); } else if (fd < 0) { // Something went wrong with the listening socket and it wasn't the nonblocking errnos err("[bg] failed to accept"); return 0; } fprintf(stderr, "[bg] accepted connection\n"); if (read(fd, &job, sizeof(job)) < 0) { err("[bg] failed to read"); return 0; } fprintf(stderr, "[bg] start processing job\n"); start_processing(job); first = 0; wait(NULL); fprintf(stderr, "[bg] job done\n"); snprintf(buffer, sizeof(buffer), "%s.stacked.jpg", job.target); fprintf(stderr, "[bg] result: '%s'\n", buffer); if (write(fd, buffer, sizeof(buffer)) < 0) { err("[bg] failed to write response"); } close(fd); // Make the listen socket nonblocking fcntl(sock, F_SETFL, O_NONBLOCK); } } void make_socket_path() { char fname[80]; char *xdg_runtime_dir = getenv("XDG_RUNTIME_DIR"); char *user = getenv("USER"); snprintf(fname, sizeof(fname), "postprocessd-%s.sock", user); if (xdg_runtime_dir) { snprintf(socket_path, sizeof(socket_path), "%s/%s", xdg_runtime_dir, fname); } else { snprintf(socket_path, sizeof(socket_path), "/tmp/%s", fname); } fprintf(stderr, "[fg] using socket '%s'\n", socket_path); } int handle_job(struct Job job) { /* * There's two parts to postprocessd, the postprocessd binary is called * by Megapixels and will check if there's already a daemon running. * If there isn't then it will fork() a daemon process that opens a socket. * * The main process will connect to that process over the unix socket and * send the task. Then it'll wait until the background process is completed * so the filename can be returned to Megapixels. This also means that if * multiple pictures are taken while processing there will be a few * postprocessd processes running that only wait on the daemon without using * CPU and the the daemon will process all the tasks SEQUENTIALLY until * done and notify the right waiting processes the job is done. */ if (!is_daemon_running()) { fprintf(stderr, "[fg] starting new daemon\n"); unlink(socket_path); start_background_process(); } fprintf(stderr, "[fg] queueing job\n"); return queue_job(job); } int main(int argc, char *argv[]) { struct Job job; make_socket_path(); if (argc == 4) { // Parse command line arguments into the job struct job.pid = 0; strncpy(job.burstdir, argv[1], sizeof(job.burstdir)); strncpy(job.target, argv[2], sizeof(job.target)); if (strcmp(argv[3], "0") == 0) { job.keep = 0; } else { job.keep = 1; } return handle_job(job); } else { printf("usage: %s burst-dir target-name keep\n", argv[0]); exit(1); } }