从0开始学习Redis#

使用的redis版本为7.2.2

gui客户端为 https://goanother.com/

首先看尽量多的文档,了解各种数据类型/存储选择/架构选择,再看代码。
把重要的代码流程记录下来。


1 数据类型#

https://redis.com/redis-enterprise/data-structures/
https://redis.io/docs/data-types/

1.1 string#

最基本的字符/bytes/文本。

redis的key是string类型。

一个string最大512mb。

1.2 list#

list是string的链表。
常用来实现stack和queue。

LPUSH key element [element ...]

1.3 set#

无序的set,存string,元素唯一。

最大容量2^32-1(4,294,967,295)。

1.4 hash#

一个kv的集合。

res1 = r.hset(
    "bike:1",
    mapping={
        "model": "Deimos",
        "brand": "Ergonom",
        "type": "Enduro bikes",
        "price": 4972,
    },
)

1.5 sorted set#

有序的set,依某种score排序,如果score相同,按key的字符串顺序再排。
典型应用为排行榜/计数器。

底层实现为hash table + skip list
zadd复杂度为O(log(N))。

1.6 stream#

可看作一个append-only log。记录顺序的数据。

典型应用

  • 记录用户操作

  • 记录设备事件

  • 在不同stream中分别记录每个用户的提醒

每个stream有一个唯一id。
有一些trimming策略,防止数据过多。

底层数据结构为radix trees。

1.7 Geospatial indexe#

存地理坐标。可快速查询某个坐标指定半径或box内的数据。

1.8 bitmap#

不是一个真正的数据结构。是一个string/bit vector。

可设置bit
setbit key 10 1
setbit key 10 1

获取单个bit
getbit key 10
1

统计总数
bitcount key
2

1.9 bitfield#

可给任意数字a对应一个数字b,对b进行各种操作。

例如设置玩家的状态,金钱对应的a值定为0,杀怪数a值定为1。

初始金钱
BITFIELD player:1:stats SET u32 #0 1000

杀了一个怪,金钱+50,杀怪数+1。
BITFIELD player:1:stats INCRBY u32 #0 50 INCRBY u32 #1 1


2 replication#

https://redis.io/docs/management/replication/

主要机制

  1. 当主从正常连接时,当master发生改变时,给replica发送命令流,在replica上复制master上的操作。保证数据一致。

  2. 发生断连,从发起重连,获取缺失的命令(partial resynchronization)。

  3. 无法进行partial resynchronization。发起full resynchronization。整个重做数据。

数据的同步默认是异步的,例如客户端修改一个数据,在master上成功后立即返回,然后再同步到slave,可能造成客户端成功后,从slave读到老数据,数据出现不一致。
WAIT(https://redis.io/commands/wait/) 可解决,可等待slave完成同步再返回。
类似mysql/mongodb的write concern之类概念。

重点

  • 复制是异步的

  • 可有多个replica

  • replica不是只能连master,可以连其他replica。可以做成层级。

  • 复制在master侧是非阻塞的,就是第一点?复制不会导致master不可用。

  • 复制在replica侧大体是非阻塞的?这里讲的不是很清楚。replica做初始同步时,可以用旧数据。有相应的设置。

  • replica可做scalability,做读写分离/数据安全。

  • 可让master不做持久化,从而提速。让副本去做持久化。

关闭master持久化的危险#

如果master完全关闭持久化,并且打开自动重启。会有危险。

  1. master关闭持久化,副本正常运行。

  2. master挂掉并重启,master数据清空。

  3. 副本复制master,数据也被清空。


3 sentinel#

用于非cluster的高可用

https://redis.io/docs/management/sentinel/

https://www.baeldung.com/redis-sentinel-vs-clustering

https://medium.com/@chaewonkong/redis-sentinel-vs-redis-cluster-a-comparative-overview-8c2561d3168f#:~:text=Redis%20Sentinel%20is%20an%20excellent,high%20throughputs%20across%20multiple%20nodes

  • 监控
    监控master和replica

  • 通知
    把监控产生的事件通知给外部

  • 自动failover
    当master处理问题,启动failover流程,选出一个新的master并通知客户端。

  • 作为配置提供者
    客户端连上sentinel可获取服务信息,例如上述failover,可获取新的master地址。

sentinel作为分布式系统#

sentinel是一个分布式系统。

  1. 多个sentinel节点会监控master,如果他们一致认为master有问题,会启动对应的流程。

  2. 即使少数sentinel节点挂掉也可以保证整个系统正常运行。

重点

  • 原则上需要三个sentinel节点

  • 原则上三个节点需要独立,比如在三地。

  • 即使sentinel也无法保证异常情况下写操作的确认。但可以降低出错概率。

  • 第三方客户端需要支持sentinel。

  • 需要注意网络网络配置


4 cluster#

https://redis.io/docs/management/scaling/

https://redis.io/docs/reference/cluster-spec/

集群用于水平扩展。

待研究

5 持久化#

持久化选项

  • RDB(Redis Database)
    point-in-time快照

  • AOF(Append Only File)
    对每个写操作做log。可在重启时用log做replay,还原数据。

  • RDB+AOF

  • 可关闭持久化

RDB优势

  • rdb是整个数据的一份point-in-time的快照。很适用于备份。
    比如每24备份整个数据

  • 适用于灾后重建数据

  • 效率高,父进程起一个子进程处理快照就行,自己不用处理磁盘io。

  • 对于大量数据,重启比AOF快。

  • 对于replica,rdb支持partial resynchronization

RDB劣势

  • 对于server崩溃的情况,不善于保住数据。
    例如一般可以设置一个快照间隔,比如5分钟做一次快照。那么最多可以能丢5分钟的数据。

  • RDB需要fork一个子进程来走快照流程,如果数据量很大,可造成卡顿。

AOF优势

  • aof更耐用,可以采用不同的fsync(写磁盘)策略,不写/每秒/每个query
    即使是默认的每秒做fsync,仍有很好的表现。默认最多丢失1秒的数据。

  • append-only log,不会有seek操作。断电等情况不会导致数据错乱。
    即使出现问题导致某个命令执行了一半,也能妥善处理。

  • rewrite机制。可在后台重写aof,使之包含最小的能重建整个数据的数据。类似定期压缩一下log。

  • log内容清晰易懂。
    如果不小心FLUSHALL清除了所有数据,可以在rewrite之前关闭服务,把最后的那个FLUSHALL命令从log中删除,重启后能恢复到之前的状态。

AOF劣势

  • 文件比RDB大

  • 一般情况比RDB慢,因为需要频繁fsync,除非关掉fsync。

如何选择?如果数据尽可能不要丢,用aof。如果数据很多,要快,可以承受丢几分钟数据,用rdb。


6 看代码#

暂时只看最简的流程。不看sentinel和cluster等。
挑一些能反映数据结构重要性质的命令来看。

添加defined
__linux__

6.1 dict基本结构#

本质就是一个hash。


typedef struct dictType {
    // 定义一系列函数和变量。形成不同的dict子类型
}

// 具体的一条数据。包含一个key和一个v。next指向hash值相等的下一条数据。
struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;     /* Next entry in the same hash bucket. */
    void *metadata[];           /* An arbitrary number of bytes (starting at a
                                 * pointer-aligned address) of size as returned
                                 * by dictType's dictEntryMetadataBytes(). */
};

struct dict {
    dictType *type; // 上述dictType

    dictEntry **ht_table[2];
    unsigned long ht_used[2];

    long rehashidx; /* rehashing not in progress if rehashidx == -1 */

    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
    signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */

    void *metadata[];           /* An arbitrary number of bytes (starting at a
                                 * pointer-aligned address) of size as defined
                                 * by dictType's dictEntryBytes. */
};


// 关于hash和取模的一些讨论
// hash出来是一个很大的值,好的hash可以认为是均匀分布。
// 大致可以认为取模影响很小,正好可以控制容量,把数据做到一个数组里。
// https://www.quora.com/Why-is-the-modulo-operator-used-in-hashing-What-characteristics-makes-it-ideal-in-calculating-location-of-values-in-a-hash-table

// 有个dictRehash操作待研究

dictAdd(dict *d, void *key, void *val)
    dictEntry *entry = dictAddRaw(d,key,NULL);
        void *position = dictFindPositionForInsert(d, key, existing);
            uint64_t hash = dictHashKey(d, key); // 算hash值
                // 常用dictSdsHash
                dictSdsHash
                    dictGenHashFunction
                        siphash

            // 按需用_dictExpandIfNeeded扩容。

            idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]); // 算index

            // index对应bucket
            dictEntry **bucket = &d->ht_table[dictIsRehashing(d) ? 1 : 0][idx];
            return bucket;

        return dictInsertAtPosition(d, key, position);
            // 有table 0和table 1之分。暂不深入

            // 分配内存。插入到此bucket即可

    dictSetVal



6.2 listpack基本结构#

listpack是最新引入的一种结构,取代ziplist。
基本就是把数据按协议打包为二进制数据,依次存到一块内存。而不是简单的链表。


// 值是long long或string
typedef struct {
    /* When string is used, it is provided with the length (slen). */
    unsigned char *sval;
    uint32_t slen;
    /* When integer is used, 'sval' is NULL, and lval holds the value. */
    long long lval;
} listpackEntry;


// 新建
#define LP_HDR_SIZE 6

unsigned char *lpNew(size_t capacity) {
    unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
        // 总bytes数存在前4字节
        #define lpSetTotalBytes(p,v) do { \
            (p)[0] = (v)&0xff; \
            (p)[1] = ((v)>>8)&0xff; \
            (p)[2] = ((v)>>16)&0xff; \
            (p)[3] = ((v)>>24)&0xff; \
        } while(0)

    lpSetNumElements(lp,0);
        // item数量存在5/6字节
        #define lpSetNumElements(p,v) do { \
            (p)[4] = (v)&0xff; \
            (p)[5] = ((v)>>8)&0xff; \
        } while(0)

    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}

/*初始数据结构

整个就是一串bytes
+------------------+---------------------+-------------+
|     4 bytes      |      2 bytes        |    1 byte   |
|   total_bytes=7  |   entry_count=0     |    LP_EOF   |
+------------------+---------------------+-------------+

*/


lpAppendInteger
    // 取前4字节总bytes数
    uint64_t listpack_bytes = lpGetTotalBytes(lp);

    // eof的位置就是总bytes数-1
    unsigned char *eofptr = lp + listpack_bytes - 1;

    // 在lp中,在eofptr之前,插入lval。
    return lpInsertInteger(lp, lval, eofptr, LP_BEFORE, NULL);
        // 进行编码
        uint64_t enclen; /* The length of the encoded element. */
        unsigned char intenc[LP_MAX_INT_ENCODING_LEN];

        lpEncodeIntegerGetType(lval, intenc, &enclen);
            // 按数字范围进行编码

        return lpInsert(lp, NULL, intenc, enclen, p, where, newp);
            // 针对插入int
            // p是eof

            unsigned long poff = p-lp; // eof的offset

            enctype = LP_ENCODING_INT;
            enclen = size;

            unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;
                // 又是按数字范围编码
                // 对于数字来说很小,最大9。这里算它9。
                // 9编码得1。

            uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
            uint32_t replaced_len  = 0;

            // 计算插入后的大小
            // 例如从初始状态开始
            // new_listpack_bytes = 7 + 9 + 1 - 0 = 17
            uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size - replaced_len;

            unsigned char *dst = lp + poff; // 指向末尾

            // 变大,需要分配内存。
            if (new_listpack_bytes > old_listpack_bytes &&
                new_listpack_bytes > lp_malloc_size(lp)) {
                if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
                dst = lp + poff;
            }

            // move大块数据
            if (where == LP_BEFORE) {
                // 对于添加到末尾,old_listpack_bytes-poff == 0。就是不用move。
                memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
            }

            // 赋值新数据
            memcpy(dst,eleint,enclen);
            dst += enclen;
            memcpy(dst,backlen,backlen_size);
            dst += backlen_size;

            // 更新头数据
            if (where != LP_REPLACE || delete) {
                uint32_t num_elements = lpGetNumElements(lp);
                if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
                    if (!delete)
                        lpSetNumElements(lp,num_elements+1);
                    else
                        lpSetNumElements(lp,num_elements-1);
                }
            }
            lpSetTotalBytes(lp,new_listpack_bytes);


/*添加一个数字后的结构

整个就是一串bytes
+------------------+-------------------+----------------+----------------------+-------------+
|     4 bytes      |      2 bytes      | enclen=9 byte  | backlen_size=1 byte  |    1 byte   |
|   total_bytes=17 |   entry_count=1   |    eleint      |      backlen         |    LP_EOF   |
+------------------+-------------------+----------------+----------------------+-------------+

*/


6.3 redis主框架#

主体大概是

  1. 单进程

  2. 各种初始化

  3. 起tcp server收client命令

  4. 起各种定时任务,包括数据的持久化。

  5. 起epoll事件循环

  6. 由事件驱动程序运行


#define LRU_BITS 24
struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
};
typedef struct redisObject robj;


// redis server总结构
struct redisServer {
    pid_t pid;

    int sentinel_mode;

    int module_pipe[2];         /* Pipe used to awake the event loop by module threads. */

    connListener listeners[CONN_TYPE_MAX]; /* TCP/Unix/TLS even more types */

    aofManifest *aof_manifest;       /* Used to track AOFs. */
}
struct redisServer server;

// 各种shared robj
struct sharedObjectsStruct {
}

struct sharedObjectsStruct shared;


main // 总入口

    // 时间/随机数等等初始化

    // 检查是否sentinel模式
    server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);
        // exe名字为redis-sentinel或者传了--sentinel参数。  
        // 本体就这一份代码,sentinel和其他模式是共存的。

    initServerConfig
        // server各种初始化

        // 初始化命令表
        server.commands = dictCreate(&commandTableDictType);
        server.orig_commands = dictCreate(&commandTableDictType);
        populateCommandTable();
            // 装载所有命令
            retval1 = dictAdd(server.commands, sdsdup(c->fullname), c);
            retval2 = dictAdd(server.orig_commands, sdsdup(c->fullname), c);

    // 初始化各种模块
    moduleInitModulesSystem
        moduleRegisterCoreAPI

        anetPipe

        pthread_mutex_lock(&moduleGIL);

    connTypeInitialize
        // 注册一些连接接口比如socket
        RedisRegisterConnectionTypeSocket
            connTypeRegister(&CT_Socket)
            // 见CT_Socket。包含socket的各种接口。


    // 检查aof和rdb。待续
    if (strstr(exec_name,"redis-check-rdb") != NULL)
        redis_check_rdb_main(argc,argv,NULL);
    else if (strstr(exec_name,"redis-check-aof") != NULL)
        redis_check_aof_main(argc,argv);

    // 处理参数
    if (argc >= 2) {
    }

    linuxMemoryWarnings


    if (background) daemonize();


    serverLog(LL_NOTICE, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");

    initServer
        // 配置信号的处理
        signal(SIGHUP, SIG_IGN);
        signal(SIGPIPE, SIG_IGN);
        setupSignalHandlers();
        makeThreadKillable();

        // server各种初始化

        resetReplicationBuffer

        createSharedObjects
            // 各种createObject和createStringObject
            // 主要创建各种写死的string

        adjustOpenFilesLimit

        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
            
            /* State of an event based program */
            typedef struct aeEventLoop {
                int maxfd;   /* highest file descriptor currently registered */
                int setsize; /* max number of file descriptors tracked */
                long long timeEventNextId;
                aeFileEvent *events; /* Registered events */
                aeFiredEvent *fired; /* Fired events */
                aeTimeEvent *timeEventHead;
                int stop;
                void *apidata; /* This is used for polling API specific data */
                aeBeforeSleepProc *beforesleep;
                aeBeforeSleepProc *aftersleep;
                int flags;
            } aeEventLoop;

            aeEventLoop *eventLoop;

            // 直接按最大client数预先分配好空间。后续直接用。
            eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
            eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

            aeApiCreate(eventLoop)
                typedef struct aeApiState {
                    int epfd;
                    struct epoll_event *events;
                } aeApiState;

                aeApiState *state = zmalloc(sizeof(aeApiState));
                state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
                state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
                eventLoop->apidata = state;



        typedef struct redisDb {
            dict *dict;                 /* The keyspace for this DB */
            dict *expires;              /* Timeout of keys with a timeout set */
            dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
            dict *blocking_keys_unblock_on_nokey;   /* Keys with clients waiting for
                                                     * data, and should be unblocked if key is deleted (XREADEDGROUP).
                                                     * This is a subset of blocking_keys*/
            dict *ready_keys;           /* Blocked keys that received a PUSH */
            dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
            int id;                     /* Database ID */
            long long avg_ttl;          /* Average TTL, just for stats */
            unsigned long expires_cursor; /* Cursor of the active expire cycle. */
            list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
            clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
        } redisDb;

        // 分配db内存
        server.db = zmalloc(sizeof(redisDb)*server.dbnum);

        for (j = 0; j < server.dbnum; j++) {
            // 初始化每个db的数据
        }

        evictionPoolAlloc(); /* Initialize the LRU keys pool. */

        // server各种初始化

        resetServerStats();

        // 起定时任务。执行比较频繁。默认每秒跑10次。CONFIG_DEFAULT_HZ
        aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
            serverCron
                // 跑各种定时流程
                // 比如是否要关闭系统
                // 比如各种检测流程

                // 定时对clients做一些操作。比如检查是否要踢掉。
                clientsCron();

                // aof流程
                run_with_period(1000) {
                    if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
                        server.aof_last_write_status == C_ERR) 
                        {
                            flushAppendOnlyFile(0);
                        }
                }

        // 注册pipe事件到epoll
        aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE, modulePipeReadable,NULL)
            modulePipeReadable
                eventLoopHandleOneShotEvents
                    // 调用对应的func

        aeSetBeforeSleepProc(server.el,beforeSleep);
        aeSetAfterSleepProc(server.el,afterSleep);

        scriptingInit(1);
        functionsInit();
        slowlogInit();
        latencyMonitorInit();

        ACLUpdateDefaultUserPassword(server.requirepass);

        applyWatchdogPeriod();

        // initServer结束

    // 打印logo等
    redisAsciiArt

    checkTcpBacklogSettings


    if (!server.sentinel_mode) {
        moduleInitModulesSystemLast();
        moduleLoadFromQueue();
    }

    initListeners();
        // 对每种连接类型进行listen
        connListen
            // 直接调用最开始注册的接口
            listener->ct->listen(listener);


    InitServerLast(); // 待续

    if (!server.sentinel_mode) {
        // 初始化aof/rdb数据。见后续分析。
        serverLog(LL_NOTICE,"Server initialized");
        aofLoadManifestFromDisk();
        loadDataFromDisk();
        aofOpenIfNeededOnServerStart();
        aofDelHistoryFiles();
    }


    redisSetCpuAffinity(server.server_cpulist);
    setOOMScoreAdj(-1);

    // 进入事件循环。之后就是死循环,由事件驱动各种流程。
    aeMain(server.el);
        eventLoop->stop = 0;
            while (!eventLoop->stop) {
                aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
                    numevents = aeApiPoll(eventLoop, tvp);
                        epoll_wait // 在此wait epoll事件

            }

    aeDeleteEventLoop(server.el);

6.4 一个具体命令的触发#

  1. 初始化阶段起socket并listen

  2. 进入事件循环后client连上来触发accept等

  3. 收数据

  4. 解析命令

  5. 执行命令


// 这里定义了socket连接的各种接口
static ConnectionType CT_Socket = {
    .listen = connSocketListen,

    .accept_handler = connSocketAcceptHandler,

    .connect = connSocketConnect,

    .read = connSocketRead,

    .set_read_handler = connSocketSetReadHandler,
}

// main中的initListeners会调connSocketListen进行listen。起一个tcp server。  

initListeners
    connListen
        listener->ct->listen(listener);
            connSocketListen
                listenToPort
                    sfd->fd[sfd->count] = anetTcpServer
                        _anetTcpServer
                            socket
                            anetListen
                                bind
                                listen
    createSocketAcceptHandler(listener, connAcceptHandler(listener->ct))
        // 对创建的socket进行epoll监听
        aeCreateFileEvent
            aeApiAddEvent
                epoll_ctl

// 当listen有结果
accept_handler
    connSocketAcceptHandler
        anetTcpAccept
            anetGenericAccept
                accept
        acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
            connCreateAcceptedSocket
                connCreateSocket

            // 拿到了对端连接
            // 做一些检查。比如是否超出最大client数

            createClient(conn)
                // 初始化各种client数据

                // 设置socket可读事件
                connSetReadHandler(conn, readQueryFromClient);

            connAccept // 暂时没懂


// 当client可读。比如我们在客户端输入了命令,发送tcp数据到server。
readQueryFromClient
    processInputBuffer
        processCommandAndResetClient
            processCommand
                lookupCommand // 匹配命令
                    lookupCommandLogic(server.commands,argv,argc,0);
                        // 命令表为struct COMMAND_STRUCT redisCommandTable[]
                        // server.commands是所有命令的dict。在initServerConfig里填充。
                        dictFetchValue

                // 进行各种检查。各种rejectCommand

                call(c,flags) // 最终执行一个命令
                    c->cmd->proc(c); // 执行redisCommand的proc
                        lpushCommand // 执行具体的命令比如lpushCommand

                afterCommand
                    postExecutionUnitOperations
                        propagatePendingCommands
                            propagateNow
                                feedAppendOnlyFile
                                    if (server.aof_state == AOF_ON ||
                                        (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
                                    {
                                        // 执行完一个命令。可能把该命令存到server.aof_buf
                                        server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
                                    }

            commandProcessed(c);

6.5 lpush流程#

添加数据到listpack结构


// LPUSH key element [element ...]
// 底层是listpack
lpushCommand
    pushGenericCommand(c,LIST_HEAD,0); 
        // 在db的dict里找key
        robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
            lookupKey
                dictFind

        if (checkType(c,lobj,OBJ_LIST)) return; // 检查是否list类型

        if (!lobj) {

            // 如果key不存在。新建list。
            // listpack是较新的数据结构,取代老的ziplist。
            // OBJ_ENCODING_LISTPACK
            lobj = createListListpackObject();


            dbAdd(c->db,c->argv[1],lobj);
                dbAddInternal
                    // db中添加key,value为新建的list。

        }

        // 后续的参数都是要push的值。循环push。
        for (j = 2; j < c->argc; j++) {
            listTypePush(lobj,c->argv[j],where);
                // 插入int为例

                // head
                lpPrependInteger(subject->ptr, (long)value->ptr)
                    unsigned char *p = lpFirst(lp);
                    return lpInsertInteger(lp, lval, p, LP_BEFORE, NULL);



                // 或tail
                lpAppendInteger(subject->ptr, (long)value->ptr);

            server.dirty++;
        }

        // 返回目前list长度
        addReplyLongLong(c, listTypeLength(lobj));

6.6 sadd流程#

添加数据到hash表


// SADD key member [member ...]
// member如果是int,会转成string。
// 最简的set。和dict基本一样。是一个hash表。
saddCommand
    // 在db的dict里找key
    set = lookupKeyWrite(c->db,c->argv[1]);

    if (checkType(c,set,OBJ_SET)) return; // 检查是否为set类型

    if (set == NULL) { // 如果key不存在
        set = setTypeCreate(c->argv[2]->ptr, c->argc - 2);
            robj *o = createSetObject();
                dict *d = dictCreate(&setDictType);
                robj *o = createObject(OBJ_SET,d);
                o->encoding = OBJ_ENCODING_HT; // 可见底层是一个hash table

        dbAdd(c->db,c->argv[1],set);
            dbAddInternal
                // db中添加key,value为新建的set。
    } else {
        setTypeMaybeConvert(set, c->argc - 2);
    }

    // 添加后续member
    for (j = 2; j < c->argc; j++) {
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
            setTypeAddAux(subject, value, sdslen(value), 0, 1);
                // 和dict操作一样
                void *position = dictFindPositionForInsert(ht, sdsval, NULL);
                dictInsertAtPosition(ht, sdsval, position);


    }

    addReplyLongLong(c,added);

6.7 hset流程#

添加键值对到hash


// HSET key field value [field value ...]
hsetCommand
    // 检查参数
    
    hashTypeLookupWriteOrCreate
        // 检查key存在

        robj *o = lookupKeyWrite(c->db,key);
        if (checkType(c,o,OBJ_HASH)) return NULL;

        if (o == NULL) {
            o = createHashObject();
                 unsigned char *zl = lpNew(0);
                robj *o = createObject(OBJ_HASH, zl);
                o->encoding = OBJ_ENCODING_LISTPACK; // 初始编码是listpack

            dbAdd(c->db,key,o);
        }

    // redis的存储机制。当元素比较小或某些情况下,以listpack等结构来存,它认为memory方面更高效。
    // 当数量变大,按配置来,会转成真正的hash表。
    // 参数见hash_max_listpack_entries,hash_max_listpack_value等
    hashTypeTryConversion(o,c->argv,2,c->argc-1);
        hashTypeConvert(o, OBJ_ENCODING_HT);
            hashTypeConvertListpack(o, enc);
                // OBJ_ENCODING_LISTPACK转为OBJ_ENCODING_HT


    // 添加后续元素
    for (i = 2; i < c->argc; i += 2)
        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
            // 按OBJ_ENCODING_HT来看
            // 更dictAdd相似。就是走基本的算hash并插入流程。


6.8 skiplist和有序set#

https://igoro.com/archive/skip-lists-are-fascinating/
https://blog.reachsumit.com/posts/2020/07/skip-list/
https://en.wikipedia.org/wiki/Skip_list
https://www.geeksforgeeks.org/skip-list/
https://brilliant.org/wiki/skip-lists/
https://opendatastructures.org/newhtml/ods/latex/skiplists.html

skiplist是一个多层链表结构。
每一层都是有序。
最底层包含所有元素。每向上一层,理想状态是包含下一层的一般元素。具体位置有插入时随机决定。
整个结构在一个概率框架下,大致是一个二分的形式。
增删查都是log的复杂度。

从图示看来,各种操作都非常清晰。
插入时要随机向上添加元素。
删除时把整个节点扣掉即可。

最底层level为0。


#define ZSKIPLIST_MAXLEVEL 32

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 前向的下一个节点
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length; // 元素数。初始0。
    int level; // 总level数。初始1。
} zskiplist;

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;




// 有序set
// ZADD key [NX | XX] [GT | LT] [CH] [INCR] score member [score member...]
zaddCommand
    zaddGenericCommand(c,ZADD_IN_NONE);
        // 检查NX之类各种参数。这里暂忽略

        // 检查所有score

        // 检查key的存在
        zobj = lookupKeyWrite(c->db,key);
        if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;

        if (zobj == NULL) {
            if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
            zobj = zsetTypeCreate(elements, sdslen(c->argv[scoreidx+1]->ptr));
                // 同hash,有个转换流程

                robj *zobj = createZsetObject();
                    // 创建zset。


                    // 初始为OBJ_ENCODING_LISTPACK
                    // 转化后为OBJ_ENCODING_SKIPLIST

                    zs->dict = dictCreate(&zsetDictType);
                    zs->zsl = zslCreate();
                        zsl->level = 1; // 默认level数
                        zsl->length = 0;
                        zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
                            // 分配一个zskiplistNode和32个zskiplistLevel的内存
                            // 作为第一列空的节点。
                            zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
                            zn->score = score; // 为0
                            zn->ele = ele;     // 为NULL

                        // header每一层初始化
                        for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
                            zsl->header->level[j].forward = NULL;
                            zsl->header->level[j].span = 0;
                        }

                    o = createObject(OBJ_ZSET,zs);
                    o->encoding = OBJ_ENCODING_SKIPLIST;

            dbAdd(c->db,key,zobj);
        }

        // 添加每一对score/member
        for (j = 0; j < elements; j++) {
            int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
                // 针对OBJ_ENCODING_SKIPLIST

                // 先从dict里找value
                // dict包含所有存在的value
                de = dictFind(zs->dict,ele);

                if (de == NULL) {
                    znode = zslInsert(zs->zsl,score,ele);
                        // skiplist的插入逻辑

                        // 找到底层位置
                        // zslRandomLevel随机计算level需要涨到哪一层
                        // 创建节点,插入到整个结构中。


                    // 添加到普通dict
                    dictAdd(zs->dict,ele,&znode->score)
                }
        }




6.9 超时流程#

添加key到db->expires。在某些必要时间点检查这个expires。

EXPIRE key seconds [NX | XX | GT | LT]


// 设置超时
expireCommand
    expireGenericCommand(c,commandTimeSnapshot(),UNIT_SECONDS);

        if (checkAlreadyExpired(when)) {
            // 已经到期直接删除
            dbGenericDelete
        } else {
            setExpire(c,c->db,key,when);
                // db中找key
                kde = dictFind(db->dict,key->ptr);

                // db中专门用expires存设置了超时的key
                de = dictAddOrFind(db->expires,dictGetKey(kde));

                // 找到并设置
                dictSetSignedIntegerVal(de,when);
        }

// 在某些操作节点,比如lookupKey时,会检查这个key的超时状态。如果超时就删除。
lookupKey
    expireIfNeeded
        if (!keyIsExpired(db,key)) return 0;

        deleteExpiredKeyAndPropagate(db,key);
            dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);




6.10 AOF和RDB的读取#

https://redis.io/docs/management/persistence/

aof文件包含一个base文件,是一个快照,aof发生重写时生成。
还包含一系列增量文件,所有这些文件组成完整的数据。
这些文件由一个manifest文件管理。

rdb的格式见https://rdb.fnordig.de/file_format.html
可以看到大致就是各种类型和数据依次排列。
相当于把整个redis的数据打包成一个文件。解析时依次读取实际数据然后插入内存中的结构,完成数据的加载。

aof本质是命令的合集,加载时依次读出命令,然后执行即可还原所有数据。

https://www.memurai.com/blog/redis-persistence-deep-dive

main

    // 其他初始化流程

    if (!server.sentinel_mode) {
        serverLog(LL_NOTICE,"Server initialized");
        aofLoadManifestFromDisk();
            server.aof_manifest = aofManifestCreate();

            sds am_name = getAofManifestFileName();
            sds am_filepath = makePath(server.aof_dirname, am_name);

            aofManifest *am = aofLoadManifestFromFile(am_filepath);

        loadDataFromDisk();
            if (server.aof_state == AOF_ON) {
                // aof流程
                int ret = loadAppendOnlyFiles(server.aof_manifest);
                    total_num = getBaseAndIncrAppendOnlyFilesNum(am);
                        // 获取base文件和增量文件数
                        if (am->base_aof_info) num++;
                        if (am->incr_aof_list) num += listLength(am->incr_aof_list);

                    total_size = getBaseAndIncrAppendOnlyFilesSize(am, &status);
                        // 获取所有文件大小

                    startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);
                        // 置为loading状态

                    if (am->base_aof_info) {
                        // base文件

                        ret = loadSingleAppendOnlyFile(aof_name);

                            // 检查文件格式。可能为aof或rdb。
                            if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
                                /* Not in RDB format, seek back at 0 offset. */
                                if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
                            } else {
                                // 如果是rdb格式。aof的base文件就是rdb格式。

                                rio rdb;

                                rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL)
                                    int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx);

                                        char buf[1024];

                                        // 读头部的特殊数据。包含'REDIS',rdb版本等。
                                        if (rioRead(rdb,buf,9) == 0) goto eoferr;

                                        while(1) {

                                            // 读一字节类型
                                            if ((type = rdbLoadType(rdb)) == -1) goto eoferr;

                                            // 然后case各种类型
                                            if (type == RDB_OPCODE_EXPIRETIME) {
                                                // 对于这种类型,就是读4字节的int32。存到expiretime
                                                expiretime = rdbLoadTime(rdb);
                                                    int32_t t32;
                                                    if (rioRead(rdb,&t32,4) == 0) return -1;
                                                    return (time_t)t32;

                                                expiretime *= 1000;
                                                if (rioGetReadError(rdb)) goto eoferr;
                                                continue; /* Read next opcode. */
                                            } else if (type == RDB_OPCODE_EOF) {
                                                // 读到eof,结束。
                                                break;

                                            } // ...各种类型


                                            // 不是特殊类型

                                            // 读key
                                            if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
                                                goto eoferr;

                                            // 读value
                                            val = rdbLoadObject(type,rdb,key,db->id,&error);
                                                // 超长的流程,解析所有类型的数据到内存。

                                            // 检查kv。检查过期等等。

                                            // 添加kv到db
                                            int added = dbAddRDBLoad(db,key,val);
                                        }
                                        // 检查checksum
                            
                            } // 读rdb完成

                            // 读aof
                            // aof本质上是一串实际的命令。那么循环读命令并执行即可。
                            while(1) {
                                // 读文件
                                char buf[AOF_ANNOTATION_LINE_MAX_LEN];
                                fgets(buf,sizeof(buf),fp)

                                // 匹配命令
                                cmd = lookupCommand(argv,argc);

                                // 执行
                                cmd->proc()
                            }

                    } // 读base文件结束

                    // 读所有增量aof文件
                    if (listLength(am->incr_aof_list)) {
                        while ((ln = listNext(&li)) != NULL) {
                            ret = loadSingleAppendOnlyFile(aof_name);
                        }
                    }
            } else {
                // rdb流程

                int rdb_load_ret = rdbLoad(server.rdb_filename, &rsi, rdb_flags);
                    retval = rdbLoadRio(&rdb,rdbflags,rsi);
                        // 上面已经看过

            }

        aofOpenIfNeededOnServerStart();
        aofDelHistoryFiles();
    }

6.11 AOF和RDB的写入#

main中会起定时任务,按配置定时存aof或rdb数据。

main
    initServer
        aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
            serverCron
                // 默认每隔1秒存一次aof
                flushAppendOnlyFile

                    // afterCommand中,每个命令都会存到server.aof_buf。
                    // 这里把这个buf写到文件,即完成了aof的保存。
                    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

                // 存rdb流程
                // 如果不存在子进程,且距离上次存rdb至少CONFIG_BGSAVE_RETRY_DELAY=5秒,尝试存rdb。
                if (!hasActiveChildProcess() &&
                    server.rdb_bgsave_scheduled &&
                    (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
                     server.lastbgsave_status == C_OK))
                {
                    rdbSaveInfo rsi, *rsiptr;
                    rsiptr = rdbPopulateSaveInfo(&rsi);
                    rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE)
                        if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
                            // fork出新进程来处理rdb

                            retval = rdbSave(req, filename,rsi,rdbflags);
                                rdbSaveInternal
                                    rdbSaveRio
                                        // 本质就是把当前整个内存数据依次dump出来,存到一个文件。加载时反向操作。

                                        for (j = 0; j < server.dbnum; j++) {
                                            if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
                                                // dump各种类型的数据
                                                // 各种层次的save
                                        }

                                    fflush
                                    fsync

                            exitFromChild((retval == C_OK) ? 0 : 1);
                                _exit // 无论如何结束该子进程

                        } else {
                            // 父进程继续跑
                        }
                }


7 实战#

7.1 基本replication#

https://redis.io/docs/management/replication/

感觉redis的复制/集群相关内容,以及各种模式的启动方式,比较混乱。
不如mongodb/mysql之类清晰。

除去sentinalcluster运行方式。可以起最基本的主从复制结构。

对于常规节点,非sentinel。有一个默认的redis.conf
https://download.redis.io/redis-stable/redis.conf
https://download.redis.io/redis-stable/

redis.conf我只改了一个save 3600 1,每小时存盘,其他默认。

# https://hub.docker.com/r/bitnami/redis

# master
    redis:
        image: bitnami/redis:7.2.3

        volumes:
            # 映射数据和配置
            - "/XXX/redis/data:/bitnami/redis/data"
            - "/XXX/redis/config/:/opt/bitnami/redis/etc"

        environment:
            TZ: Asia/Shanghai
            ALLOW_EMPTY_PASSWORD: 'false'
            REDIS_PASSWORD: passsssssss

            REDIS_AOF_ENABLED: 'no' # 要关闭aof的话必须在这设置。confi中改appendonly为no没用。
            
            REDIS_PORT_NUMBER: 6379

            REDIS_REPLICATION_MODE: master
            REDIS_MASTER_HOST: xxx.xxx.xxx.xxx
            REDIS_MASTER_PORT_NUMBER: 6379
            REDIS_MASTER_PASSWORD: passsssssss

            REDIS_REPLICA_IP: 172.16.6.25


# 从
    redis:
        image: bitnami/redis:7.2.3

        volumes:
            # 映射数据和配置
            - "/XXX/redis/data:/bitnami/redis/data"
            - "/XXX/redis/config/:/opt/bitnami/redis/etc/"

        environment:
            TZ: Asia/Shanghai
            ALLOW_EMPTY_PASSWORD: 'false'
            REDIS_PASSWORD: passsssssss

            REDIS_AOF_ENABLED: 'no' # 要关闭aof的话必须在这设置。confi中改appendonly为no没用。
            
            REDIS_PORT_NUMBER: 6379

            REDIS_REPLICATION_MODE: slave
            REDIS_MASTER_HOST: xxx.xxx.xxx.xxx
            REDIS_MASTER_PORT_NUMBER: 6379
            REDIS_MASTER_PASSWORD: passsssssss

            REDIS_REPLICA_IP: 172.16.6.34
            

启动后从节点默认为只读。会自动从主节点同步数据。
如果此时master挂掉,slave还是只读没有变,整个系统不可用。
slave上能看到一些信息,master_link_status=down之类。
重启master,完全自动恢复正常。

replica-serve-stale-data no这个选项貌似可以再master挂掉时,让slave不可读。

关掉从节点,系统正常运行。从master上可看到一些信息变化,例如slave数量变为0。
此时在master上修改一些数据,再重启slave。slave会迅速同步数据,整体完全恢复正常。

aof持久化大概是每个命令发生时都会进行,量大的话可能非常难受。
如果数据不是非常重要,可关掉aof持久化,可节省磁盘io等。
REDIS_AOF_ENABLED: no关掉aof。
save 3600 1设置每小时如果发生数据改变才持久化。
这样最多每小时存一次磁盘。

从节点持久化也是按自己的配置来,并不是master做了一次存盘,slave立马也要存盘。


7.2 sentinal#

https://redis.io/docs/management/sentinel/
https://redis.io/docs/reference/sentinel-clients/

针对sentinel模式有一个默认的sentinel.conf。
https://download.redis.io/redis-stable/sentinel.conf

redis经常会自动修改你的配置,以反映出当前系统实际的配置。
有一个CONFIG REWRITE命令,可以把启动时用户提供的配置改为当前系统的实际配置。
https://redis.io/commands/config-rewrite/

代码中的rewriteConfigrewriteConfigSentinelOption有不少信息。
sentinel模式的初始化在main函数开始部分server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);

# 官方的最小配置例子

sentinel monitor mymaster 127.0.0.1 6379 2
# sentinel monitor <master-name> <ip> <port> <quorum>
# 监控master-name ip port
# quorum是在当前master失败上,需要达成一致的sentinel数量。
# 3个sentinel可设置为2

sentinel down-after-milliseconds mymaster 60000
# 60秒无响应认为挂了

sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1


# 其他master
# 貌似是用于起多套主从。一般不用。
sentinel monitor resque 192.168.1.3 6380 4
sentinel down-after-milliseconds resque 10000
sentinel failover-timeout resque 180000
sentinel parallel-syncs resque 5

可直接裸起看看,拿出里面的默认配置看看。
客户端可以连上。
看到executable/redis-sentinel

# https://hub.docker.com/r/bitnami/redis-sentinel

redis-stn:
        image: bitnami/redis-sentinel:7.2.3

        volumes:
            - "/XXX/redis-sentinel/config:/bitnami/redis-sentinel/conf"

        ports:
            - "26379:26379"

        environment:
            TZ: Asia/Shanghai
            ALLOW_EMPTY_PASSWORD: 'false'
            REDIS_SENTINEL_PASSWORD: passssssss # 使用conf时貌似无效?得写到conf里。
            REDIS_SENTINEL_PORT_NUMBER: 26379
            REDIS_SENTINEL_QUORUM: 2

            #REDIS_MASTER_SET: mymaster
            #REDIS_MASTER_HOST: xxx.xxx.xxx.xxx
            #REDIS_MASTER_PORT_NUMBER: 6379
            #REDIS_MASTER_PASSWORD: passssssss
# sentinel.conf

# 每个sentinel设置自己的ip
sentinel announce-ip "172.16.6.22"
requirepass "passsssssssssss"

# master信息
sentinel monitor xc_redis_master 172.16.6.34 6379 2
sentinel auth-pass xc_redis_master passsssssssss

所有sentinel都只填一个master就行,从master中能自动获取slave的信息。
sentinel之间会相互发现,可以在log里看到。
sentinel上有些命令是无法执行的,貌似也是不能存实际数据的。
sentinel本质上应该是一个官方的中间件,做在了一套代码里,可作为一个模式独立运行。

此时master,slave和3个sentinel都起来了。用gui客户端都能连上。
5个节点分布在db1_ip/db2_ip/db3_ip三个机器。

ip问题#

docker环境下ip可能比较混乱。
默认配置可能有各种问题。

例如slave连上master后,master获取的slave的ip可能有问题。导致从别处再连这个slave的ip连不上。
从master的info命令可以看到slave的ip,可能是一个docker网络中自定义的ip,例如172.18.0.9。 slave的log中有sdown slave 172.18.0.9,诸如此类,导致从其他机器连不上。

在slave的docker-compose配置中用REDIS_REPLICA_IP指定服务器的私网ip,例如172.16.6.34
会决定slave的replica-announce-ip参数。
此后可以看到master的info中,slave的ip为正常的服务器私网ip。
slave的log中可看到fix-slave-config slave 172.16.6.34:6379
此后slave就不会down了。

可能得深入研究docker网络,有更好的解法。

同理对于sentinel,也是各种sdown sentinel连不上。
配置REDIS_SENTINEL_ANNOUNCE_IPsentinel announce-ip后解决。

测试1#


# 此时master正常可读写,slave只读。  

# 用python客户端测试,同样的结果,写slave报`ReadOnlyError`。  

# 起sentinel客户端,可正常读写。  

# redis-py有多种client。详见文档。  

# https://redis-py.readthedocs.io/en/stable/  

import redis

db1_ip = 'xxx'
db2_ip = 'xxx'
db3_ip = 'xxx'

# 普通client
client = redis.Redis(host = db2_ip, port = 6379, db = 1, decode_responses = False, password = '66666')

print(client)
client.ping()

print(client.exists('test'))
print(client.set('wtf', '666666'))

# setinenl client
sentinel = redis.Sentinel([(db1_ip, 26379), (db2_ip, 26379), (db3_ip, 26379)],
                          db = 1, decode_responses = False,
                          password = '66666',
                          sentinel_kwargs = {'password':'66666'})

master = sentinel.discover_master('db2')
print(master)

client = sentinel.master_for('db2', socket_timeout = 3)
print(client)
print(dir(client))
print(client.exists('wtf1'))
print(client.set('wtf1', '666666'))

sentinel的配置中填的是master是内网地址。
sentinel.discover_master目前返回的是内网ip。
从外网无法连上sentinel,待研究。

测试2#

关掉1个sentinel仍然正常,关掉2个sentinel仍然正常。
关掉3个sentinel时,redis.Sentinel失败,报MasterNotFoundError
普通client仍然正常。

只要存在一个sentinel,sentinel客户端就能正常运作。

测试3#

关掉slave。sentinel会显示+sdown slave 172.16.6.25:6379 172.16.6.25 6379 @ xc_redis_master 172.16.6.34 6379

系统仍正常运行。

实际应用中也需要不断检测slave状态,保证slave可用。

打开slave,sentinel检测到slave,恢复正常。

测试4#

关掉master

sentinel会检测到master挂掉,会进行failover。

看sentinel的log。

redis-stn_1  | 1:X 24 Nov 2023 13:06:20.347 # +sdown master xc_redis_master 172.16.6.34 6379
redis-stn_1  | 1:X 24 Nov 2023 13:06:20.446 * Sentinel new configuration saved on disk
redis-stn_1  | 1:X 24 Nov 2023 13:06:20.447 # +new-epoch 1
redis-stn_1  | 1:X 24 Nov 2023 13:06:20.454 * Sentinel new configuration saved on disk
redis-stn_1  | 1:X 24 Nov 2023 13:06:20.454 # +vote-for-leader 006f59bb45c65186bcc4ff16be7de43a2562ecba 1
redis-stn_1  | 1:X 24 Nov 2023 13:06:21.351 # +config-update-from sentinel 006f59bb45c65186bcc4ff16be7de43a2562ecba 172.16.6.25 26379 @ xc_redis_master 172.16.6.34 6379
redis-stn_1  | 1:X 24 Nov 2023 13:06:21.351 # +switch-master xc_redis_master 172.16.6.34 6379 172.16.6.25 6379
redis-stn_1  | 1:X 24 Nov 2023 13:06:21.351 * +slave slave 172.16.6.34:6379 172.16.6.34 6379 @ xc_redis_master 172.16.6.25 6379
redis-stn_1  | 1:X 24 Nov 2023 13:06:21.358 * Sentinel new configuration saved on disk

可看到原slave切换为master。
挂到到切换完成期间,服务不可用,持续1分钟左右,根据配置来。
切换完成后服务可用。

之后如果打开原master,会变成slave。
此时master和slave完全完成了互换,服务正常运行。

客户端方面

# 起循环模拟实际应用

# 当master挂掉时,立即出现ConnectionRefusedError。

# 此时需等待sentinel进行failover。默认需要30秒判断master挂掉。
# down-after-milliseconds参数可设置

# ConnectionRefusedError会相应持续一段时间
# 等failover完成后客户端会自动恢复。不需要手动干预。
# 恢复后可看到discover_master和discover_slaves结果互换。

while True:
    try:
        print(f'while True ----------')

        print(f"exists {client.exists('wtf1')}")
        print(f"get {client.get('wtf1')}")
        print(f"set {client.set('wtf1', random.randint(0, 1000))}")

        print(f"discover_master = {sentinel.discover_master('xc_redis_master')}")
        print(f"discover_slaves = {sentinel.discover_slaves('xc_redis_master')}")

        time.sleep(10)
    except:
        traceback.print_exc()
        time.sleep(10)

来回模拟挂掉master和slave,倒腾failover,符合正常预期。
不用手动处理应用程序,等切换完成后client = sentinel.master_for这个client会自动连到新的master。做好异常处理就行。

测试5#

上面测试时保证master和slave有一个没挂。
如果master和slave都挂掉。有一些问题。

首先两个redis节点初始配置,一个为master一个为slave。
测试时配置保持不变。
当只有一个节点挂掉时,failover完成后会自动调整master,不受配置影响。
也就是节点配置为master,实际可能是slave。

当两个节点都挂掉,如果挂之前的实际角色和配置不一致,这时起配置为slave的节点是起不来的。会报imeout reached before the port went into state "inuse"
此时起配置为master的节点可以起来,但是会消耗更长一点时间。 等master起来后,之前起不来的slave可以起来。然后正常运行。

期间应用程序同样不需要手动干预,等待一段时间后会自动恢复。

此情况的数据一致问题没研究。估计极端情况有问题。
但总体还行。如果全挂,是重大灾难。也只需重启master然后slave就能开始服务,不需要折腾数据。
对于非重要数据,可以接受。
重要的数据就应该进有一致性保障的数据库。
或者用redis的wait机制保证数据写入了master和slave。


7.3 cluster#

https://redis.io/docs/management/scaling/
https://redis.io/docs/reference/cluster-spec/

待研究

貌似会强行sharding,而且也需要一堆replication节点。
不是像mysql或mongodb简单起一个replication集群就行。