pipeline.c 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #include "pipeline.h"
  2. #include <gtk/gtk.h>
  3. #include <glib-unix.h>
  4. #include <assert.h>
  5. struct _MPPipeline {
  6. GMainContext *main_context;
  7. GMainLoop *main_loop;
  8. pthread_t thread;
  9. };
  10. static void *
  11. thread_main_loop(void *arg)
  12. {
  13. MPPipeline *pipeline = arg;
  14. g_main_loop_run(pipeline->main_loop);
  15. return NULL;
  16. }
  17. MPPipeline *
  18. mp_pipeline_new()
  19. {
  20. MPPipeline *pipeline = malloc(sizeof(MPPipeline));
  21. pipeline->main_context = g_main_context_new();
  22. pipeline->main_loop = g_main_loop_new(pipeline->main_context, false);
  23. int res =
  24. pthread_create(&pipeline->thread, NULL, thread_main_loop, pipeline);
  25. assert(res == 0);
  26. return pipeline;
  27. }
  28. struct invoke_args {
  29. MPPipeline *pipeline;
  30. MPPipelineCallback callback;
  31. };
  32. static bool
  33. invoke_impl(struct invoke_args *args)
  34. {
  35. args->callback(args->pipeline, args + 1);
  36. return false;
  37. }
  38. void
  39. mp_pipeline_invoke(MPPipeline *pipeline, MPPipelineCallback callback,
  40. const void *data, size_t size)
  41. {
  42. if (pthread_self() != pipeline->thread) {
  43. struct invoke_args *args = malloc(sizeof(struct invoke_args) + size);
  44. args->pipeline = pipeline;
  45. args->callback = callback;
  46. if (size > 0) {
  47. memcpy(args + 1, data, size);
  48. }
  49. g_main_context_invoke_full(pipeline->main_context,
  50. G_PRIORITY_DEFAULT,
  51. (GSourceFunc)invoke_impl, args, free);
  52. } else {
  53. callback(pipeline, data);
  54. }
  55. }
  56. static bool
  57. unlock_mutex(GMutex *mutex)
  58. {
  59. g_mutex_unlock(mutex);
  60. return false;
  61. }
  62. void
  63. mp_pipeline_sync(MPPipeline *pipeline)
  64. {
  65. GMutex mutex;
  66. g_mutex_init(&mutex);
  67. g_mutex_lock(&mutex);
  68. g_main_context_invoke_full(pipeline->main_context, G_PRIORITY_LOW, (GSourceFunc)unlock_mutex, &mutex, NULL);
  69. g_mutex_lock(&mutex);
  70. g_mutex_unlock(&mutex);
  71. g_mutex_clear(&mutex);
  72. }
  73. void
  74. mp_pipeline_free(MPPipeline *pipeline)
  75. {
  76. g_main_loop_quit(pipeline->main_loop);
  77. // Force the main thread loop to wake up, otherwise we might not exit
  78. g_main_context_wakeup(pipeline->main_context);
  79. void *r;
  80. pthread_join(pipeline->thread, &r);
  81. free(pipeline);
  82. }
  83. struct capture_source_args {
  84. MPCamera *camera;
  85. void (*callback)(MPBuffer, void *);
  86. void *user_data;
  87. };
  88. static bool
  89. on_capture(int fd, GIOCondition condition, struct capture_source_args *args)
  90. {
  91. MPBuffer buffer;
  92. if (mp_camera_capture_buffer(args->camera, &buffer)) {
  93. args->callback(buffer, args->user_data);
  94. }
  95. return true;
  96. }
  97. // Not thread safe
  98. GSource *
  99. mp_pipeline_add_capture_source(MPPipeline *pipeline, MPCamera *camera,
  100. void (*callback)(MPBuffer, void *), void *user_data)
  101. {
  102. int video_fd = mp_camera_get_video_fd(camera);
  103. GSource *video_source = g_unix_fd_source_new(video_fd, G_IO_IN);
  104. struct capture_source_args *args =
  105. malloc(sizeof(struct capture_source_args));
  106. args->camera = camera;
  107. args->callback = callback;
  108. args->user_data = user_data;
  109. g_source_set_callback(video_source, (GSourceFunc)on_capture, args, free);
  110. g_source_attach(video_source, pipeline->main_context);
  111. return video_source;
  112. }