Subversion Repositories f9daq

Rev

Rev 195 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
195 f9daq 1
/********************************************************************\
2
 
3
  Name:         rb.c
4
  Created by:   Stefan Ritt
5
 
6
  $Id: rb.cpp 21437 2014-07-30 14:13:29Z ritt $
7
 
8
\********************************************************************/
9
 
10
#include <stdio.h>
11
#ifdef OS_DARWIN
12
#include <sys/malloc.h>
13
#else
14
#include <malloc.h>
15
#endif
16
#include <string.h>
17
#include <assert.h>
18
 
19
#include "rb.h"
20
 
21
/********************************************************************\
22
*                                                                    *
23
*                 Ring buffer functions                              *
24
*                                                                    *
25
* Provide an inter-thread buffer scheme for handling front-end       *
26
* events. This code allows concurrent data acquisition, calibration  *
27
* and network transfer on a multi-CPU machine. One thread reads      *
28
* out the data, passes it vis the ring buffer functions              *
29
* to another thread running on the other CPU, which can then         *
30
* calibrate and/or send the data over the network.                   *
31
*                                                                    *
32
\********************************************************************/
33
 
34
typedef struct {
35
   unsigned char *buffer;
36
   unsigned int size;
37
   unsigned int max_event_size;
38
   unsigned char *rp;
39
   unsigned char *wp;
40
   unsigned char *ep;
41
} RING_BUFFER;
42
 
43
#define MAX_RING_BUFFER 100
44
RING_BUFFER rb[MAX_RING_BUFFER];
45
 
46
volatile int _rb_nonblocking = 0;
47
 
48
extern void ss_sleep(int ms);
49
 
50
int rb_set_nonblocking()
51
/********************************************************************\
52
 
53
  Routine: rb_set_nonblocking
54
 
55
  Purpose: Set all rb_get_xx to nonblocking. Needed in multi-thread
56
           environments for stopping all theads without deadlock
57
 
58
  Input:
59
    NONE
60
 
61
  Output:
62
    NONE
63
 
64
  Function value:
65
    RB_SUCCESS       Successful completion
66
 
67
\********************************************************************/
68
{
69
   _rb_nonblocking = 1;
70
 
71
   return RB_SUCCESS;
72
}
73
 
74
int rb_create(int size, int max_event_size, int *handle)
75
/********************************************************************\
76
 
77
  Routine: rb_create
78
 
79
  Purpose: Create a ring buffer with a given size
80
 
81
  Input:
82
    int size             Size of ring buffer, must be larger than
83
                         2*max_event_size
84
    int max_event_size   Maximum event size to be placed into
85
                         ring buffer
86
  Output:
87
    int *handle          Handle to ring buffer
88
 
89
  Function value:
90
    DB_SUCCESS           Successful completion
91
    DB_NO_MEMORY         Maximum number of ring buffers exceeded
92
    DB_INVALID_PARAM     Invalid event size specified
93
 
94
\********************************************************************/
95
{
96
   int i;
97
 
98
   for (i = 0; i < MAX_RING_BUFFER; i++)
99
      if (rb[i].buffer == NULL)
100
         break;
101
 
102
   if (i == MAX_RING_BUFFER)
103
      return RB_NO_MEMORY;
104
 
105
   if (size < max_event_size * 2)
106
      return RB_INVALID_PARAM;
107
 
108
   memset(&rb[i], 0, sizeof(RING_BUFFER));
109
   rb[i].buffer = (unsigned char *) malloc(size);
110
   assert(rb[i].buffer);
111
   rb[i].size = size;
112
   rb[i].max_event_size = max_event_size;
113
   rb[i].rp = rb[i].buffer;
114
   rb[i].wp = rb[i].buffer;
115
   rb[i].ep = rb[i].buffer;
116
 
117
   *handle = i + 1;
118
 
119
   return RB_SUCCESS;
120
}
121
 
122
int rb_delete(int handle)
123
/********************************************************************\
124
 
125
  Routine: rb_delete
126
 
127
  Purpose: Delete a ring buffer
128
 
129
  Input:
130
    none
131
  Output:
132
    int handle       Handle to ring buffer
133
 
134
  Function value:
135
    DB_SUCCESS       Successful completion
136
 
137
\********************************************************************/
138
{
139
   if (handle < 0 || handle >= MAX_RING_BUFFER || rb[handle - 1].buffer == NULL)
140
      return RB_INVALID_HANDLE;
141
 
142
   free(rb[handle - 1].buffer);
143
   memset(&rb[handle - 1], 0, sizeof(RING_BUFFER));
144
 
145
   return RB_SUCCESS;
146
}
147
 
148
int rb_get_wp(int handle, void **p, int millisec)
149
/********************************************************************\
150
 
151
Routine: rb_get_wp
152
 
153
  Purpose: Retrieve write pointer where new data can be written
154
 
155
  Input:
156
     int handle               Ring buffer handle
157
     int millisec             Optional timeout in milliseconds if
158
                              buffer is full. Zero to not wait at
159
                              all (non-blocking)
160
 
161
  Output:
162
    char **p                  Write pointer
163
 
164
  Function value:
165
    DB_SUCCESS       Successful completion
166
 
167
\********************************************************************/
168
{
169
   int h, i;
170
   unsigned char *rp;
171
 
172
   if (handle < 1 || handle > MAX_RING_BUFFER || rb[handle - 1].buffer == NULL)
173
      return RB_INVALID_HANDLE;
174
 
175
   h = handle - 1;
176
 
177
   for (i = 0; i <= millisec / 10; i++) {
178
 
179
      rp = rb[h].rp;            // keep local copy, rb[h].rp might be changed by other thread
180
 
181
      /* check if enough size for wp >= rp without wrap-around */
182
      if (rb[h].wp >= rp
183
          && rb[h].wp + rb[h].max_event_size <= rb[h].buffer + rb[h].size - rb[h].max_event_size) {
184
         *p = rb[h].wp;
185
         return RB_SUCCESS;
186
      }
187
 
188
      /* check if enough size for wp >= rp with wrap-around */
189
      if (rb[h].wp >= rp && rb[h].wp + rb[h].max_event_size > rb[h].buffer + rb[h].size - rb[h].max_event_size && rb[h].rp > rb[h].buffer) {    // next increment of wp wraps around, so need space at beginning
190
         *p = rb[h].wp;
191
         return RB_SUCCESS;
192
      }
193
 
194
      /* check if enough size for wp < rp */
195
      if (rb[h].wp < rp && rb[h].wp + rb[h].max_event_size < rp) {
196
         *p = rb[h].wp;
197
         return RB_SUCCESS;
198
      }
199
 
200
      if (millisec == 0)
201
         return RB_TIMEOUT;
202
 
203
      if (_rb_nonblocking)
204
         return RB_TIMEOUT;
205
 
206
      /* wait one time slice */
207
      ss_sleep(10);
208
   }
209
 
210
   return RB_TIMEOUT;
211
}
212
 
213
int rb_increment_wp(int handle, int size)
214
/********************************************************************\
215
 
216
  Routine: rb_increment_wp
217
 
218
  Purpose: Increment current write pointer, making the data at
219
           the write pointer available to the receiving thread
220
 
221
  Input:
222
     int handle               Ring buffer handle
223
     int size                 Number of bytes placed at the WP
224
 
225
  Output:
226
    NONE
227
 
228
  Function value:
229
    RB_SUCCESS                Successful completion
230
    RB_INVALID_PARAM          Event size too large or invalid handle
231
\********************************************************************/
232
{
233
   int h;
234
   unsigned char *new_wp;
235
 
236
   if (handle < 1 || handle > MAX_RING_BUFFER || rb[handle - 1].buffer == NULL)
237
      return RB_INVALID_HANDLE;
238
 
239
   h = handle - 1;
240
 
241
   if ((unsigned int) size > rb[h].max_event_size)
242
      return RB_INVALID_PARAM;
243
 
244
   new_wp = rb[h].wp + size;
245
 
246
   /* wrap around wp if not enough space */
247
   if (new_wp > rb[h].buffer + rb[h].size - rb[h].max_event_size) {
248
      rb[h].ep = new_wp;
249
      new_wp = rb[h].buffer;
250
      assert(rb[h].rp != rb[h].buffer);
251
   }
252
 
253
   rb[h].wp = new_wp;
254
 
255
   return RB_SUCCESS;
256
}
257
 
258
int rb_get_rp(int handle, void **p, int millisec)
259
/********************************************************************\
260
 
261
  Routine: rb_get_rp
262
 
263
  Purpose: Obtain the current read pointer at which new data is
264
           available with optional timeout
265
 
266
  Input:
267
    int handle               Ring buffer handle
268
    int millisec             Optional timeout in milliseconds if
269
                             buffer is full. Zero to not wait at
270
                             all (non-blocking)
271
 
272
  Output:
273
    char **p                 Address of pointer pointing to newly
274
                             available data. If p == NULL, only
275
                             return status.
276
 
277
  Function value:
278
    RB_SUCCESS       Successful completion
279
 
280
\********************************************************************/
281
{
282
   int i, h;
283
 
284
   if (handle < 1 || handle > MAX_RING_BUFFER || rb[handle - 1].buffer == NULL)
285
      return RB_INVALID_HANDLE;
286
 
287
   h = handle - 1;
288
 
289
   for (i = 0; i <= millisec / 10; i++) {
290
 
291
      if (rb[h].wp != rb[h].rp) {
292
         if (p != NULL)
293
            *p = rb[handle - 1].rp;
294
         return RB_SUCCESS;
295
      }
296
 
297
      if (millisec == 0)
298
         return RB_TIMEOUT;
299
 
300
      if (_rb_nonblocking)
301
         return RB_TIMEOUT;
302
 
303
      /* wait one time slice */
304
      ss_sleep(10);
305
   }
306
 
307
   return RB_TIMEOUT;
308
}
309
 
310
int rb_increment_rp(int handle, int size)
311
/********************************************************************\
312
 
313
  Routine: rb_increment_rp
314
 
315
  Purpose: Increment current read pointer, freeing up space for
316
           the writing thread.
317
 
318
  Input:
319
     int handle               Ring buffer handle
320
     int size                 Number of bytes to free up at current
321
                              read pointer
322
 
323
  Output:
324
    NONE
325
 
326
  Function value:
327
    RB_SUCCESS                Successful completion
328
    RB_INVALID_PARAM          Event size too large or invalid handle
329
 
330
\********************************************************************/
331
{
332
   int h;
333
 
334
   unsigned char *new_rp;
335
 
336
   if (handle < 1 || handle > MAX_RING_BUFFER || rb[handle - 1].buffer == NULL)
337
      return RB_INVALID_HANDLE;
338
 
339
   h = handle - 1;
340
 
341
   if ((unsigned int) size > rb[h].max_event_size)
342
      return RB_INVALID_PARAM;
343
 
344
   new_rp = rb[h].rp + size;
345
 
346
   /* wrap around if not enough space left */
347
   if (new_rp + rb[h].max_event_size > rb[h].buffer + rb[h].size)
348
      new_rp = rb[h].buffer;
349
 
350
   rb[handle - 1].rp = new_rp;
351
 
352
   return RB_SUCCESS;
353
}
354
 
355
int rb_get_buffer_level(int handle, int *n_bytes)
356
/********************************************************************\
357
 
358
  Routine: rb_get_buffer_level
359
 
360
  Purpose: Return number of bytes in a ring buffer
361
 
362
  Input:
363
    int handle              Handle of the buffer to get the info
364
 
365
  Output:
366
    int *n_bytes            Number of bytes in buffer
367
 
368
  Function value:
369
    RB_SUCCESS              Successful completion
370
    RB_INVALID_HANDLE       Buffer handle is invalid
371
 
372
\********************************************************************/
373
{
374
   int h;
375
 
376
   if (handle < 1 || handle > MAX_RING_BUFFER || rb[handle - 1].buffer == NULL)
377
      return RB_INVALID_HANDLE;
378
 
379
   h = handle - 1;
380
 
381
   if (rb[h].wp >= rb[h].rp)
382
      *n_bytes = rb[h].wp - rb[h].rp;
383
   else
384
      *n_bytes = rb[h].ep - rb[h].rp + rb[h].wp - rb[h].buffer;
385
 
386
   return RB_SUCCESS;
387
}