GNU/Linux >> Belajar Linux >  >> Linux

Linux non-blocking fifo (on demand logging)

Ini adalah utas yang (sangat) lama, tetapi akhir-akhir ini saya mengalami masalah serupa. Sebenarnya, yang saya butuhkan adalah kloning stdin ke stdout dengan salinan ke pipa yang tidak menghalangi. ftee yang diusulkan di jawaban pertama benar-benar membantu di sana, tetapi (untuk kasus penggunaan saya) terlalu fluktuatif. Berarti saya kehilangan data yang bisa saya proses jika saya mendapatkannya tepat waktu.

Skenario yang saya hadapi adalah saya memiliki proses (some_process) yang mengumpulkan beberapa data dan menulis hasilnya setiap tiga detik ke stdout. Penyiapan (disederhanakan) terlihat seperti ini (dalam penyiapan sebenarnya saya menggunakan pipa bernama):

some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz

Sekarang, raw_data.gz harus dikompresi dan harus lengkap. ftee melakukan pekerjaan ini dengan sangat baik. Tapi pipa yang saya gunakan di tengah terlalu lambat untuk mengambil data yang keluar - tetapi cukup cepat untuk memproses semuanya jika bisa, yang diuji dengan tee normal. Namun, tee normal memblokir jika terjadi sesuatu pada pipa yang tidak disebutkan namanya, dan karena saya ingin dapat terhubung sesuai permintaan, tee bukanlah pilihan. Kembali ke topik:Menjadi lebih baik ketika saya meletakkan buffer di antaranya, menghasilkan:

some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz

Tapi itu masih kehilangan data yang bisa saya proses. Jadi saya melanjutkan dan memperluas ftee yang diusulkan sebelumnya ke versi buffered (bftee). Itu masih memiliki semua properti yang sama, tetapi menggunakan buffer internal (tidak efisien?) Jika penulisan gagal. Itu masih kehilangan data jika buffer berjalan penuh, tetapi berfungsi dengan baik untuk kasus saya. Seperti biasa, ada banyak ruang untuk perbaikan, tetapi saat saya menyalin kode dari sini, saya ingin membagikannya kembali kepada orang-orang yang mungkin membutuhkannya.

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe 
    (c) [email protected]
    (c) [email protected]
    WTFPL Licence */

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <signal.h>
    #include <unistd.h>

    // the number of sBuffers that are being held at a maximum
    #define BUFFER_SIZE 4096
    #define BLOCK_SIZE 2048

    typedef struct {
      char data[BLOCK_SIZE];
      int bytes;
    } sBuffer;

    typedef struct {
      sBuffer *data;  //array of buffers
      int bufferSize; // number of buffer in data
      int start;      // index of the current start buffer
      int end;        // index of the current end buffer
      int active;     // number of active buffer (currently in use)
      int maxUse;     // maximum number of buffers ever used
      int drops;      // number of discarded buffer due to overflow
      int sWrites;    // number of buffer written to stdout
      int pWrites;    // number of buffers written to pipe
    } sQueue;

    void InitQueue(sQueue*, int);              // initialized the Queue
    void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into Queue at the end 
    sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it or NULL is buffer is empty
    sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
    void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
    void DelFromQueue(sQueue *queue);          // removes the first entry of the queue

    static void sigUSR1(int);                  // signal handled for SUGUSR1 - used for stats output to stderr
    static void sigINT(int);                   // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?

    sQueue queue;                              // Buffer storing the overflow
    volatile int quit;                         // for quiting the main loop

    int main(int argc, char *argv[])
    {   
        int readfd, writefd;
        struct stat status;
        char *fifonam;
        sBuffer buffer;
        ssize_t bytes;
        int bufferSize = BUFFER_SIZE;

        signal(SIGPIPE, SIG_IGN);
        signal(SIGUSR1, sigUSR1);
        signal(SIGTERM, sigINT);
        signal(SIGINT,  sigINT);

        /** Handle commandline args and open the pipe for non blocking writing **/

        if(argc < 2 || argc > 3)
        {   
            printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                   "FIFO - path to a named pipe, required argument\n"
                   "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
            exit(EXIT_FAILURE);
        }

        fifonam = argv[1];
        if (argc == 3) {
          bufferSize = atoi(argv[2]);
          if (bufferSize == 0) bufferSize = BUFFER_SIZE;
        }

        readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
        if(-1==readfd)
        {   
            perror("bftee: readfd: open()");
            exit(EXIT_FAILURE);
        }

        if(-1==fstat(readfd, &status))
        {
            perror("bftee: fstat");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        if(!S_ISFIFO(status.st_mode))
        {
            printf("bftee: %s in not a fifo!\n", fifonam);
            close(readfd);
            exit(EXIT_FAILURE);
        }

        writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
        if(-1==writefd)
        {
            perror("bftee: writefd: open()");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        close(readfd);


        InitQueue(&queue, bufferSize);
        quit = 0;

        while(!quit)
        {
            // read from STDIN
            bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));

            // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) break;

            // save the number if read bytes in the current buffer to be processed
            buffer.bytes = bytes;

            // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
            // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
            bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
            queue.sWrites++;

            if(-1==bytes) {
                perror("ftee: writing to stdout");
                break;
            }

            sBuffer *tmpBuffer = NULL;

            // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
            // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
            // NOTE: bytes cannot be -1  (that would have failed just before) when the loop is entered. 
            while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
               // write the oldest buffer to the pipe
               bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

               // the  written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
               if (bytes == tmpBuffer->bytes) {
                 DelFromQueue(&queue);
                 queue.pWrites++;
               } else if (bytes > 0) {
                 // on a positive bytes value there was a partial write. we shrink the current buffer
                 //  and handle this as a write failure
                 ShrinkInQueue(&queue, bytes);
                 bytes = -1;
               }
            }
            // There are several cases here:
            // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
            // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
            // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
            if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);

            // again, there are several cases what can happen here
            // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
            // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
            // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
            if (bytes != buffer.bytes)
              PushToQueue(&queue, &buffer, bytes);
            else 
              queue.pWrites++;
        }

        // once we are done with STDIN, try to flush the buffer to the named pipe
        if (queue.active > 0) {
           //set output buffer to block - here we wait until we can write everything to the named pipe
           // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. 
           int saved_flags = fcntl(writefd, F_GETFL);
           int new_flags = saved_flags & ~O_NONBLOCK;
           int res = fcntl(writefd, F_SETFL, new_flags);

           sBuffer *tmpBuffer = NULL;
           //TODO: this does not handle partial writes yet
           while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
             int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
             if (bytes != -1) DelFromQueue(&queue);
           }
        }

        close(writefd);

    }


    /** init a given Queue **/
    void InitQueue (sQueue *queue, int bufferSize) {
      queue->data = calloc(bufferSize, sizeof(sBuffer));
      queue->bufferSize = bufferSize;
      queue->start = 0;
      queue->end = 0;
      queue->active = 0;
      queue->maxUse = 0;
      queue->drops = 0;
      queue->sWrites = 0;
      queue->pWrites = 0;
    }

    /** push a buffer into the Queue**/
    void PushToQueue(sQueue *queue, sBuffer *p, int offset)
    {

        if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
        if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

        // this should never happen - offset cannot be bigger than the buffer itself. Panic action
        if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}

        // debug output on a partial write. TODO: remove this line
        // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

        // copy the data from the buffer into the queue and remember its size
        memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
        queue->data[queue->end].bytes = p->bytes - offset;

        // move the buffer forward
        queue->end = (queue->end + 1) % queue->bufferSize;

        // there is still space in the buffer
        if (queue->active < queue->bufferSize)
        {
            queue->active++;
            if (queue->active > queue->maxUse) queue->maxUse = queue->active;
        } else {
            // Overwriting the oldest. Move start to next-oldest
            queue->start = (queue->start + 1) % queue->bufferSize;
            queue->drops++;
        }
    }

    /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
    sBuffer *RetrieveFromQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }

        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->active--;
        return &(queue->data[queue->start]);
    }

    /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
    sBuffer *PeakAtQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }
        return &(queue->data[queue->start]);
    }

    /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
    void ShrinkInQueue(sQueue *queue, int bytes) {

      // cannot remove negative amount of bytes - this is an error case. Ignore it
      if (bytes <= 0) return;

      // remove the entry if the offset is equal to the buffer size
      if (queue->data[queue->start].bytes == bytes) {
        DelFromQueue(queue);
        return;
      };

      // this is a partial delete
      if (queue->data[queue->start].bytes > bytes) {
        //shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
        return;
      }

      // panic is the are to remove more than we have the buffer
      if (queue->data[queue->start].bytes < bytes) {
        perror("we wrote more than we had - this should never happen\n");
        exit(EXIT_FAILURE);
        return;
      }
    }

    /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
    void DelFromQueue(sQueue *queue)
    {
        if (queue->active > 0) {
          queue->start = (queue->start + 1) % queue->bufferSize;
          queue->active--;
        }
    }

    /** Stats output on SIGUSR1 **/
    static void sigUSR1(int signo) {
      fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
    }

    /** handle signal for terminating **/
    static void sigINT(int signo) {
      quit++;
      if (quit > 1) exit(EXIT_FAILURE);
    }

Versi ini membutuhkan satu lagi argumen (opsional) yang menentukan jumlah blok yang akan disangga untuk pipa. Contoh panggilan saya sekarang terlihat seperti ini:

some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz

menghasilkan 16384 blok untuk disangga sebelum pembuangan terjadi. ini menggunakan memori sekitar 32 Mbyte lebih banyak, tapi... siapa peduli ?

Tentu saja, di lingkungan nyata saya menggunakan pipa bernama sehingga saya dapat memasang dan melepas sesuai kebutuhan. Ada tampilan seperti ini:

mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results

Selain itu, proses bereaksi terhadap sinyal sebagai berikut:SIGUSR1 -> cetak penghitung ke STDERRSIGTERM, SIGINT -> pertama keluar dari loop utama dan membuang buffer ke pipa, yang kedua segera menghentikan program.

Mungkin ini membantu seseorang di masa depan...Nikmati


Terinspirasi oleh pertanyaan Anda, saya telah menulis program sederhana yang memungkinkan Anda melakukan ini:

$ myprogram 2>&1 | ftee /tmp/mylog

Berperilaku mirip dengan tee tetapi mengkloning stdin ke stdout dan ke pipa bernama (persyaratan untuk saat ini) tanpa memblokir. Ini berarti bahwa jika Anda ingin masuk dengan cara ini, mungkin saja Anda akan kehilangan data log Anda, tetapi saya kira itu dapat diterima dalam skenario Anda.Caranya adalah dengan memblokir SIGPIPE sinyal dan untuk mengabaikan kesalahan saat menulis ke fifo yang rusak. Contoh ini tentu saja dapat dioptimalkan dengan berbagai cara, tetapi sejauh ini, menurut saya berhasil.

/* ftee - clone stdin to stdout and to a named pipe 
(c) [email protected]
WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;
    
    signal(SIGPIPE, SIG_IGN);

    if(2!=argc)
    {
        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
            " named pipe, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if(-1==readfd)
    {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if(-1==fstat(readfd, &status))
    {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if(!S_ISFIFO(status.st_mode))
    {
        printf("ftee: %s in not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if(-1==writefd)
    {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    while(1)
    {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes <= 0)
            break;

        bytes = write(STDOUT_FILENO, buffer, bytes);
        if(-1==bytes)
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes);
        if(-1==bytes);//Ignoring the errors
    }
    close(writefd); 
    return(0);
}

Anda dapat mengompilasinya dengan perintah standar ini:

$ gcc ftee.c -o ftee

Anda dapat memverifikasinya dengan cepat dengan menjalankan mis.:

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

Perhatikan juga - ini bukan multiplexer. Anda hanya dapat memiliki satu proses yang melakukan $ cat /tmp/mylog sekaligus.


Sepertinya bash <> operator redirection (3.6.10 Membuka Deskriptor File untuk Membaca dan MenulisLihat) membuat penulisan ke file/fifo dibuka dengannya tanpa pemblokiran. Ini seharusnya berfungsi:

$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend

Solusi diberikan oleh gniourf_gniourf di saluran #bash IRC.


Namun, ini akan membuat file log yang terus berkembang bahkan jika tidak digunakan hingga drive kehabisan ruang.

Mengapa tidak memutar log secara berkala? Bahkan ada program untuk melakukannya untuk Anda logrotate .

Ada juga sistem untuk menghasilkan pesan log dan melakukan hal yang berbeda dengannya sesuai dengan jenisnya. Namanya syslog .

Anda bahkan bisa menggabungkan keduanya. Mintalah program Anda membuat pesan syslog, konfigurasikan syslog untuk menempatkannya di file dan gunakan logrotate untuk memastikan pesan tidak memenuhi disk.

Jika ternyata Anda menulis untuk sistem tersemat kecil dan keluaran programnya berat, ada berbagai teknik yang dapat Anda pertimbangkan.

  • Silog jarak jauh:mengirim pesan syslog ke server syslog di jaringan.
  • Gunakan tingkat keparahan yang tersedia di syslog untuk melakukan berbagai hal dengan pesan. Misalnya. buang "INFO" tetapi catat dan teruskan "ERR" atau lebih besar. Misalnya. untuk menghibur
  • Gunakan penangan sinyal dalam program Anda untuk membaca ulang konfigurasi di HUP dan memvariasikan pembuatan log "sesuai permintaan" dengan cara ini.
  • Buatlah program Anda mendengarkan pada soket unix dan menulis pesan ketika terbuka. Anda bahkan dapat mengimplementasikan dan konsol interaktif ke dalam program Anda dengan cara ini.
  • Dengan menggunakan file konfigurasi, berikan kontrol terperinci atas hasil logging.

Linux
  1. Nyalakan editor teks terminal Linux Anda dengan ed

  2. Perintah Linux dmesg - Cetak Kernel Ring Buffer

  3. Linux – Mengerti Login di Linux?

  1. Apakah alokasi memori di linux tidak memblokir?

  2. C++ Dapatkan string dari Clipboard di Linux

  3. Tingkatkan ukuran buffer scrollback terminal linux

  1. Linux – Mengatur Gubernur CPU Sesuai Permintaan Atau Konservatif?

  2. Memeriksa Repositori dan Logging Linux

  3. mencatat memori RAM tanda air tinggi dari proses Linux