Разделение на отдельные пакеты байтового потока данных.

DataStreamSeparator

Потребовалось мне тут обеспечить общение двух девайсов (пусть будут, для упрощения, Ардуинки — тестировалось всё равно на них) через последовательный интерфейс, ака com-порт, он же rs232.

Так как оба девайса имеют буферы на приём и отправку, то нет никакой гарантии, что если мы отправим 5 байт и сделаем небольшую задержку, то на другой девайс так же придут эти 5 байт разом. То есть при Serial.readBytes их там может оказаться и 5 и 8 (от предыдущей отправки). То есть нужно как-то гарантированно разделять на отдельные «пакеты» поток байтов.

Сразу напрашивается некоторый «разделитель» пакетов, например три байта подряд 0xFF. НО никто не даст гарантии, что при обмене не окажется этих трёх значений среди самих данных, тем самым нарушив нашу разбивку на «пакеты».
Можно, конечно, увеличить разделитель до 20 байт, но пропускная способность существенно снизиться, к тому же память у девайсов не резиновая (Особенно Ардуинок). Либо самого себя ограничить при отправке, что не будет посылать подряд эти байты, короче, извращаться. Меня это не устраивает, т.к. лень. Хочется отправить, не думая, и получить, не парясь особо.

Для упрощения, пока что, опустим тот факт, что данные могут быть искажены при передаче. Так как контрольные суммы и прочее тема отдельного поста. В общем, считаем, что потерь/искажений нет.

Так же ограничим себя следующими хотелками и условиями:
1. Не должно быть существенного увеличения объёма передаваемых данных для служебных целей.
2. Вероятность ошибки при разбивке и сборке должна быть равной 0.
3. Обеспечить минимальную потерю производительности
4. Не выделять дополнительный объём памяти динамически.
5. Длина одного пакета максимум 254 байта.
6. Без потерь

Естественно, под это подходят разные алгоритмы сжатия данных, например LZ77 , но тогда придётся вставлять дополнительные байты сколько есть повторов/сколько без повторов, что в худшем случае, увеличит объём данных практически в 1.5 раза. И цели, как таковой, «сжать» у нас не было. Разве что бонусом получить.

Давайте для примера, возьмём разделитель пакетов из 3х байт 0x0, как самые неудобные и часто встречающиеся.

Так как нам нужно исключить повторение лишь этой последовательности, то придуман (наверняка не мной и он уже как-то по-умному называется, но я не смог найти)

Алгоритм исключения последовательности из данных с последующим восстановлением:

Допустим есть массив байт (десятичные значения тут и далее):
5 6 7 0 0 0 65 12 14 88 66 0 0 88 99 104 55 19 22 29 0 0 0 18 78 89 0 0 0 252 5

Ищем первое появление 2х и более нулей подряд, запоминаем смещение первого 0 и считаем сколько их. Как посчитали, по следующему индексу после него ставим количество, а перед всеми данными вставляем смещение.
4 5 6 7 0 0 0 65 12 14 88 66 0 0 88 99 104 55 19 22 29 0 0 0 18 78 89 0 0 0 252 5

Ищем следующее включение 2х и более нулей подряд, запоминаем индекс, считаем сколько подряд.
На следующий от текущего индекса ставим сколько повторений нулей подряд.
Выясняем смещение, относительно предыдущего (так сказать получается расстояние до следующего).
По индексу смещения, найденного в предыдущий раз вставляем это расстояние. Остальное можно забить любыми значениями (у меня 255).

4 5 6 7 17 3 255 65 12 14 88 66 0 0 88 99 104 55 19 22 29 0 0 0 18 78 89 0 0 0 254 5

Можно, конечно, и вместо, тем самым бонусом немного «сжать», но это ведёт к усложнению декодирования, так что пока что не паримся.

Повторяем так для всех следующих включений 2х нулей подряд:

4 5 6 7 17 3 255 65 12 14 88 66 0 0 88 99 104 55 19 22 29 6 3 255 18 78 89 0 0 0 252 5

Если достигли конца, и повторы не найдены, то на место предыдущих нулей записать 255 как индикатор, что дальше нет трёх подряд.
Если трёх подряд вообще не было, то вставить 255 перед всеми данными, опять же, как индикатор, что можно не париться.

4 5 6 7 17 3 255 65 12 14 88 66 0 0 88 99 104 55 19 22 29 6 3 255 18 78 89 255 3 255 252 5

Если длина массива больше, чем 254 байта, то, соответственно, начинать заново весь алгоритм, но уже относительно смещения в 255.

В итоге
нужен всего лишь 1 байт дополнительных данных и всё =)
достаточно быстро, т.к. нет сложных мат. вычислений и всего лишь один прогон по всему массиву.

Итоговая реализация (на Qt):

#include <QCoreApplication>

#include <QByteArray>>
#include <QDebug>

/**
* @brief Выводим массив байт в виде десятичных значений
* @param title
* @param ba
*/
void showBA(QString title, const QByteArray ba)
{
    QStringList sl;
    foreach (uchar b, ba) {
        sl.append(QString::number(b, 10));
    }
    qDebug()<<"ByteArray:"<<title<<sl.join(" ");
}

/**
* @brief Кодируем байты
* @param ba
*/
int serialEncode(QByteArray &ba)
{
    int firstOffset = 255; //-- Что бы не сдвигать данные, будем возвращать, с какого места начинается первая последовательность. Пока что считаем, что таких нет.
    int lastIdx = -1; //-- С какого индекса в предыдущий раз началась последовательность
    int fromIdx = -1; //-- С какого индекса сейчас началась последовательность
    int together = 0;

    for (int i=0; i<ba.length(); ++i) {
        uchar b = ba[i];
        if ( b==0 ) { //-- Начинаем считать, сколько подряд
            together++;
            if ( fromIdx<0 ) { fromIdx = i; } //-- Запоминаем, с какого началась последовательность
            if ( together>2 ) { ba[i] = 0xFF; } //-- Всё, что дальше, забиваем сразу 255 что бы лишних нулей не было.
        } else //-- Подряд кончились
        if ( together>0 ) {
            if ( together>=2 ) { //-- Ну, как минимум 2 подряд есть
                ba[fromIdx+1] = together; //-- Укажем, сколько нас
                if (lastIdx==-1) { //-- Если первый раз нашли, то запоминаем отдельнго
                    firstOffset = fromIdx;
                } else {
                    ba[lastIdx] = fromIdx-lastIdx; //-- Расстояние до текущего
                }
                lastIdx = fromIdx;
            }
            together = 0;
            fromIdx = -1;
        }
    }

    if ( lastIdx>-1 ) { ba[lastIdx] = 255; }//-- Установим, что дальше нет подряд идущих

    return firstOffset;
}

/**
* @brief Декодируем обратно
* @param firstOffset - с какого индекса начинается первая последовательность
* @param ba
*/
void serialDecode(int firstOffset, QByteArray &ba)
{
    if ( firstOffset==255 ) { return; } //-- Нет у нас повторений, так что нечего декодировать

    int fromOffset = firstOffset; //-- С какого начинать заполнение
    int fillCount = 0; //-- Сколько заполнять
    for (int i=0; i<ba.length(); ++i) {
        if ( i==fromOffset ) { //-- Дошли до места, откуда нужно начинать заполнять
            fromOffset += ba[i]; //-- Узнаём, откуда в следующий раз начинать заполнять
            fillCount = ba[i+1]; //-- Узнаём, сколько сейчас надо заполнить
        }
        if ( fillCount>0 ) { //-- Если ещё не заполнили, то пора
            fillCount--;
            ba[i] = 0;
            if ( fillCount==0 && fromOffset==255 ) { break; } //-- Дальше идти нет смысла, т.к. повторения кончились.
        }
    }
}

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    QByteArray ba = QByteArray(std::begin<char>({0x5, 0x6, 0x7, 0x0, 0x0, 0x0, 0x41, 0xC, 0xE, 0x58, 0x42, 0x0, 0x0, 0x58, 0x63, 0x68, 0x37, 0x13, 0x16, 0x1D, 0x0, 0x0, 0x0, 0x12, 0x4E, 0x59, 0x0, 0x0, 0x0, 0x4, 0x5}), 31);

    showBA("Source  ", ba);

    int firstOffset = serialEncode(ba);
    showBA("Encoded:", ba);


    serialDecode(firstOffset, ba);
    showBA("Decoded:", ba);

    return a.exec();
}

Вот как-то так.

Related posts

QML Сделать задержку перед началом анимации

QtCreator не открывает диалог выбора файлов, проектов, не открывает проект

Сборка Qt 6.8 (Dev) из исходников в Docker контейнере (Linux)