// pipeline.c
  #include "pipeline.h"
  #include <assert.h>
  #include <glib-unix.h>
  #include <gtk/gtk.h>
  #include <pthread.h>
  #include <stdbool.h>
  #include <stdlib.h>
  #include <string.h>
  5. struct _MPPipeline {
  6. GMainContext *main_context;
  7. GMainLoop *main_loop;
  8. pthread_t thread;
  9. };
  10. static void *
  11. thread_main_loop(void *arg)
  12. {
  13. MPPipeline *pipeline = arg;
  14. g_main_loop_run(pipeline->main_loop);
  15. return NULL;
  16. }
  17. MPPipeline *
  18. mp_pipeline_new()
  19. {
  20. MPPipeline *pipeline = malloc(sizeof(MPPipeline));
  21. pipeline->main_context = g_main_context_new();
  22. pipeline->main_loop = g_main_loop_new(pipeline->main_context, false);
  23. int res =
  24. pthread_create(&pipeline->thread, NULL, thread_main_loop, pipeline);
  25. assert(res == 0);
  26. return pipeline;
  27. }
  28. struct invoke_args {
  29. MPPipeline *pipeline;
  30. MPPipelineCallback callback;
  31. };
  32. static bool
  33. invoke_impl(struct invoke_args *args)
  34. {
  35. args->callback(args->pipeline, args + 1);
  36. return false;
  37. }
  38. void
  39. mp_pipeline_invoke(MPPipeline *pipeline,
  40. MPPipelineCallback callback,
  41. const void *data,
  42. size_t size)
  43. {
  44. if (pthread_self() != pipeline->thread) {
  45. struct invoke_args *args = malloc(sizeof(struct invoke_args) + size);
  46. args->pipeline = pipeline;
  47. args->callback = callback;
  48. if (size > 0) {
  49. memcpy(args + 1, data, size);
  50. }
  51. g_main_context_invoke_full(pipeline->main_context,
  52. G_PRIORITY_DEFAULT,
  53. (GSourceFunc)invoke_impl,
  54. args,
  55. free);
  56. } else {
  57. callback(pipeline, data);
  58. }
  59. }
  60. static bool
  61. unlock_mutex(GMutex *mutex)
  62. {
  63. g_mutex_unlock(mutex);
  64. return false;
  65. }
  66. void
  67. mp_pipeline_sync(MPPipeline *pipeline)
  68. {
  69. GMutex mutex;
  70. g_mutex_init(&mutex);
  71. g_mutex_lock(&mutex);
  72. g_main_context_invoke_full(pipeline->main_context,
  73. G_PRIORITY_LOW,
  74. (GSourceFunc)unlock_mutex,
  75. &mutex,
  76. NULL);
  77. g_mutex_lock(&mutex);
  78. g_mutex_unlock(&mutex);
  79. g_mutex_clear(&mutex);
  80. }
  81. void
  82. mp_pipeline_free(MPPipeline *pipeline)
  83. {
  84. g_main_loop_quit(pipeline->main_loop);
  85. // Force the main thread loop to wake up, otherwise we might not exit
  86. g_main_context_wakeup(pipeline->main_context);
  87. void *r;
  88. pthread_join(pipeline->thread, &r);
  89. free(pipeline);
  90. }
  91. struct capture_source_args {
  92. MPCamera *camera;
  93. void (*callback)(MPBuffer, void *);
  94. void *user_data;
  95. };
  96. static bool
  97. on_capture(int fd, GIOCondition condition, struct capture_source_args *args)
  98. {
  99. MPBuffer buffer;
  100. if (mp_camera_capture_buffer(args->camera, &buffer)) {
  101. args->callback(buffer, args->user_data);
  102. }
  103. return true;
  104. }
  105. // Not thread safe
  106. GSource *
  107. mp_pipeline_add_capture_source(MPPipeline *pipeline,
  108. MPCamera *camera,
  109. void (*callback)(MPBuffer, void *),
  110. void *user_data)
  111. {
  112. int video_fd = mp_camera_get_video_fd(camera);
  113. GSource *video_source = g_unix_fd_source_new(video_fd, G_IO_IN);
  114. struct capture_source_args *args =
  115. malloc(sizeof(struct capture_source_args));
  116. args->camera = camera;
  117. args->callback = callback;
  118. args->user_data = user_data;
  119. g_source_set_callback(video_source, (GSourceFunc)on_capture, args, free);
  120. g_source_attach(video_source, pipeline->main_context);
  121. return video_source;
  122. }