60
60
/* The queue is a linked list of these. */
61
61
62
62
static struct dlq_item_s * queue_head = NULL ; /* Head of linked list for queue. */
63
+ static struct dlq_item_s * queue_tail = NULL ; /* Tail of linked list for queue. */
64
+ int queue_length = 0 ; /* Count of items in queue */
63
65
64
66
#if __WIN32__
65
67
@@ -75,8 +77,6 @@ static pthread_mutex_t dlq_mutex; /* Critical section for updating queues. */
75
77
76
78
static pthread_cond_t wake_up_cond ; /* Notify received packet processing thread when queue not empty. */
77
79
78
- static pthread_mutex_t wake_up_mutex ; /* Required by cond_wait. */
79
-
80
80
static volatile int recv_thread_is_waiting = 0 ;
81
81
82
82
#endif
@@ -117,7 +117,8 @@ void dlq_init (void)
117
117
dw_printf ("dlq_init ( )\n" );
118
118
#endif
119
119
120
- queue_head = NULL ;
120
+ queue_head = queue_tail = NULL ;
121
+ queue_length = 0 ;
121
122
122
123
123
124
#if DEBUG
@@ -129,13 +130,6 @@ void dlq_init (void)
129
130
InitializeCriticalSection (& dlq_cs );
130
131
#else
131
132
int err ;
132
- err = pthread_mutex_init (& wake_up_mutex , NULL );
133
- if (err != 0 ) {
134
- text_color_set (DW_COLOR_ERROR );
135
- dw_printf ("dlq_init: pthread_mutex_init err=%d" , err );
136
- perror ("" );
137
- exit (EXIT_FAILURE );
138
- }
139
133
err = pthread_mutex_init (& dlq_mutex , NULL );
140
134
if (err != 0 ) {
141
135
text_color_set (DW_COLOR_ERROR );
@@ -258,7 +252,7 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev
258
252
259
253
/* Allocate a new queue item. */
260
254
261
- pnew = (struct dlq_item_s * ) calloc (sizeof (struct dlq_item_s ), 1 );
255
+ pnew = (struct dlq_item_s * ) malloc (sizeof (struct dlq_item_s ));
262
256
if (pnew == NULL ) {
263
257
text_color_set (DW_COLOR_ERROR );
264
258
dw_printf ("FATAL ERROR: Out of memory.\n" );
@@ -314,9 +308,6 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev
314
308
315
309
static void append_to_queue (struct dlq_item_s * pnew )
316
310
{
317
- struct dlq_item_s * plast ;
318
- int queue_length = 0 ;
319
-
320
311
if ( ! was_init ) {
321
312
dlq_init ();
322
313
}
@@ -341,30 +332,16 @@ static void append_to_queue (struct dlq_item_s *pnew)
341
332
#endif
342
333
343
334
if (queue_head == NULL ) {
344
- queue_head = pnew ;
335
+ queue_head = queue_tail = pnew ;
345
336
queue_length = 1 ;
337
+ } else {
338
+ queue_tail -> nextp = pnew ;
339
+ queue_tail = pnew ;
340
+ queue_length ++ ;
346
341
}
347
- else {
348
- queue_length = 2 ; /* head + new one */
349
- plast = queue_head ;
350
- while (plast -> nextp != NULL ) {
351
- plast = plast -> nextp ;
352
- queue_length ++ ;
353
- }
354
- plast -> nextp = pnew ;
355
- }
356
-
357
342
358
343
#if __WIN32__
359
344
LeaveCriticalSection (& dlq_cs );
360
- #else
361
- err = pthread_mutex_unlock (& dlq_mutex );
362
- if (err != 0 ) {
363
- text_color_set (DW_COLOR_ERROR );
364
- dw_printf ("dlq append_to_queue: pthread_mutex_unlock err=%d" , err );
365
- perror ("" );
366
- exit (1 );
367
- }
368
345
#endif
369
346
#if DEBUG1
370
347
text_color_set (DW_COLOR_DEBUG );
@@ -416,7 +393,7 @@ static void append_to_queue (struct dlq_item_s *pnew)
416
393
* and blocking on a write.
417
394
*/
418
395
419
- if (queue_length > 10 ) {
396
+ if (queue_length > 15 ) {
420
397
text_color_set (DW_COLOR_ERROR );
421
398
dw_printf ("Received frame queue is out of control. Length=%d.\n" , queue_length );
422
399
dw_printf ("Reader thread is probably frozen.\n" );
@@ -431,29 +408,21 @@ static void append_to_queue (struct dlq_item_s *pnew)
431
408
#else
432
409
if (recv_thread_is_waiting ) {
433
410
434
- err = pthread_mutex_lock (& wake_up_mutex );
435
- if (err != 0 ) {
436
- text_color_set (DW_COLOR_ERROR );
437
- dw_printf ("dlq append_to_queue: pthread_mutex_lock wu err=%d" , err );
438
- perror ("" );
439
- exit (1 );
440
- }
441
-
442
411
err = pthread_cond_signal (& wake_up_cond );
443
412
if (err != 0 ) {
444
413
text_color_set (DW_COLOR_ERROR );
445
414
dw_printf ("dlq append_to_queue: pthread_cond_signal err=%d" , err );
446
415
perror ("" );
447
416
exit (1 );
448
417
}
418
+ }
449
419
450
- err = pthread_mutex_unlock (& wake_up_mutex );
451
- if (err != 0 ) {
452
- text_color_set (DW_COLOR_ERROR );
453
- dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d" , err );
454
- perror ("" );
455
- exit (1 );
456
- }
420
+ err = pthread_mutex_unlock (& dlq_mutex );
421
+ if (err != 0 ) {
422
+ text_color_set (DW_COLOR_ERROR );
423
+ dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d" , err );
424
+ perror ("" );
425
+ exit (1 );
457
426
}
458
427
#endif
459
428
@@ -1011,9 +980,29 @@ int dlq_wait_while_empty (double timeout)
1011
980
dlq_init ();
1012
981
}
1013
982
983
+ #if DEBUG1
984
+ text_color_set (DW_COLOR_DEBUG );
985
+ dw_printf ("dlq dlq_wait_while_empty: enter critical section\n" );
986
+ #endif
987
+ #if __WIN32__
988
+ EnterCriticalSection (& dlq_cs );
989
+ #else
990
+ int err ;
991
+ err = pthread_mutex_lock (& dlq_mutex );
992
+ if (err != 0 ) {
993
+ text_color_set (DW_COLOR_ERROR );
994
+ dw_printf ("dlq append_to_queue: pthread_mutex_lock err=%d" , err );
995
+ perror ("" );
996
+ exit (1 );
997
+ }
998
+ #endif
1014
999
1015
1000
if (queue_head == NULL ) {
1016
1001
1002
+ #if __WIN32__
1003
+ LeaveCriticalSection (& dlq_cs );
1004
+ #endif
1005
+
1017
1006
#if DEBUG
1018
1007
text_color_set (DW_COLOR_DEBUG );
1019
1008
dw_printf ("dlq_wait_while_empty (): prepare to SLEEP...\n" );
@@ -1037,45 +1026,41 @@ int dlq_wait_while_empty (double timeout)
1037
1026
else {
1038
1027
WaitForSingleObject (wake_up_event , INFINITE );
1039
1028
}
1029
+ } else {
1030
+ #if __WIN32__
1031
+ LeaveCriticalSection (& dlq_cs );
1032
+ #endif
1033
+ }
1040
1034
1041
1035
#else
1042
1036
int err ;
1043
1037
1044
- err = pthread_mutex_lock (& wake_up_mutex );
1045
- if (err != 0 ) {
1046
- text_color_set (DW_COLOR_ERROR );
1047
- dw_printf ("dlq_wait_while_empty: pthread_mutex_lock wu err=%d" , err );
1048
- perror ("" );
1049
- exit (1 );
1050
- }
1051
-
1052
1038
recv_thread_is_waiting = 1 ;
1053
1039
if (timeout != 0.0 ) {
1054
1040
struct timespec abstime ;
1055
1041
1056
1042
abstime .tv_sec = (time_t )(long )timeout ;
1057
1043
abstime .tv_nsec = (long )((timeout - (long )abstime .tv_sec ) * 1000000000.0 );
1058
1044
1059
- err = pthread_cond_timedwait (& wake_up_cond , & wake_up_mutex , & abstime );
1045
+ err = pthread_cond_timedwait (& wake_up_cond , & dlq_mutex , & abstime );
1060
1046
if (err == ETIMEDOUT ) {
1061
1047
timed_out_result = 1 ;
1062
1048
}
1063
1049
}
1064
1050
else {
1065
- err = pthread_cond_wait (& wake_up_cond , & wake_up_mutex );
1051
+ err = pthread_cond_wait (& wake_up_cond , & dlq_mutex );
1066
1052
}
1067
1053
recv_thread_is_waiting = 0 ;
1068
-
1069
- err = pthread_mutex_unlock (& wake_up_mutex );
1070
- if (err != 0 ) {
1071
- text_color_set (DW_COLOR_ERROR );
1072
- dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d" , err );
1073
- perror ("" );
1074
- exit (1 );
1075
- }
1076
- #endif
1077
1054
}
1078
1055
1056
+ err = pthread_mutex_unlock (& dlq_mutex );
1057
+ if (err != 0 ) {
1058
+ text_color_set (DW_COLOR_ERROR );
1059
+ dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d" , err );
1060
+ perror ("" );
1061
+ exit (1 );
1062
+ }
1063
+ #endif
1079
1064
1080
1065
#if DEBUG
1081
1066
text_color_set (DW_COLOR_DEBUG );
@@ -1105,7 +1090,6 @@ struct dlq_item_s *dlq_remove (void)
1105
1090
{
1106
1091
1107
1092
struct dlq_item_s * result = NULL ;
1108
- //int err;
1109
1093
1110
1094
#if DEBUG1
1111
1095
text_color_set (DW_COLOR_DEBUG );
@@ -1133,6 +1117,11 @@ struct dlq_item_s *dlq_remove (void)
1133
1117
if (queue_head != NULL ) {
1134
1118
result = queue_head ;
1135
1119
queue_head = queue_head -> nextp ;
1120
+ queue_length -- ;
1121
+
1122
+ if (queue_head == NULL ) {
1123
+ queue_tail = NULL ;
1124
+ }
1136
1125
}
1137
1126
1138
1127
#if __WIN32__
0 commit comments