yuz10/lua-resty-rocketmq

Name

lua-resty-rocketmq - Lua RocketMQ client driver for ngx_lua, based on the cosocket API

Status

Production ready.

Description

This Lua library is a RocketMQ client driver for the ngx_lua nginx module.

This Lua library takes advantage of ngx_lua's cosocket API, which ensures 100% nonblocking behavior.

Quick start

Install OpenResty

For Ubuntu:

wget -O - https://openresty.org/package/pubkey.gpg | sudo apt-key add -
echo "deb http://openresty.org/package/ubuntu $(lsb_release -sc) main" \
    | sudo tee /etc/apt/sources.list.d/openresty.list
sudo apt-get update
sudo apt-get -y install openresty

See https://openresty.org/cn/linux-packages.html for other distributions.

Install and start RocketMQ

wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
unzip rocketmq-all-5.3.0-bin-release.zip
cd rocketmq-5.3.0
nohup bash bin/mqnamesrv &
nohup bash bin/mqbroker -n localhost:9876 -c conf/broker.conf &

Run examples of this project

cd examples
chmod +x producer.lua
./producer.lua

Synopsis

    lua_package_path "/path/to/lua-resty-rocketmq/lib/?.lua;;";

    server {
        location /test {
            content_by_lua_block {
                local cjson = require "cjson"
                local producer = require "resty.rocketmq.producer"
                local consumer = require "resty.rocketmq.consumer"

                local nameservers = { "127.0.0.1:9876" }

                local message = "halo world"

                local p = producer.new(nameservers, "produce_group")
                
                -- set acl
                local aclHook = require("resty.rocketmq.acl_rpchook").new("RocketMQ","12345678")
                p:addRPCHook(aclHook)
                
                -- use tls mode
                p:setUseTLS(true)
                
                local res, err = p:send("TopicTest", message)
                if not res then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success")
                
                -- consume
                local c = consumer.new(nameservers, "group1")
                c:subscribe("TopicTest", "*")
                c:registerMessageListener({
                    consumeMessage = function(self, msgs, context)
                        ngx.say("consume success:", cjson.encode(msgs))
                        return consumer.CONSUME_SUCCESS
                    end
                })
                
                c:start()
                ngx.sleep(5)
                c:stop()
            }
        }
    }

Modules

resty.rocketmq.producer

To load this module:

    local producer = require "resty.rocketmq.producer"

Methods

new

syntax: p = producer.new(nameservers, produce_group, enableMsgTrace, customizedTraceTopic)

nameservers is a list of nameserver addresses.

addRPCHook

syntax: p:addRPCHook(hook)

hook is a table that contains two functions as follows:

  • doBeforeRequest(self, addr, header, body)
  • doAfterResponse(self, addr, header, body, respHeader, respBody)

An ACL hook is provided. Usage:

    local accessKey, secretKey = "RocketMQ", "12345678"
    local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
    p:addRPCHook(aclHook)
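
To illustrate the hook shape described above, here is a minimal sketch of a custom RPC hook that counts requests and responses per address. Only the two function names and their parameter lists come from this document. The `newCountingHook` helper and its `counts` field are hypothetical, and the sketch treats `header` and `body` as opaque values because their layout is not documented here.

```lua
-- Hypothetical helper (not part of the library): builds an RPC hook
-- that counts requests and responses per remote address.
local function newCountingHook()
    local hook = { counts = {} }

    -- Increment one counter for the given address, creating the entry on first use.
    local function bump(addr, field)
        local key = tostring(addr)
        local entry = hook.counts[key]
        if not entry then
            entry = { requests = 0, responses = 0 }
            hook.counts[key] = entry
        end
        entry[field] = entry[field] + 1
    end

    function hook.doBeforeRequest(self, addr, header, body)
        bump(addr, "requests")
    end

    function hook.doAfterResponse(self, addr, header, body, respHeader, respBody)
        bump(addr, "responses")
    end

    return hook
end

local countingHook = newCountingHook()
-- With a real client object: p:addRPCHook(countingHook)
```

The counters live in a closure rather than on `self`, so the sketch does not depend on which object is passed as `self` when the hook is invoked.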

setUseTLS

syntax: p:setUseTLS(useTLS)

useTLS is a boolean

setTimeout

syntax: p:setTimeout(timeout)

timeout is in milliseconds, default 3000
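
Since addRPCHook, setUseTLS and setTimeout are documented with the same syntax for the producer, consumer and admin modules, the three calls can be grouped in one place. The sketch below shows one way to do that; `applyClientOptions` and the field names of `opts` are hypothetical and exist only for this example.

```lua
-- Hypothetical helper (not part of the library): applies the common
-- connection options to a producer, consumer, or admin object.
local function applyClientOptions(client, opts)
    opts = opts or {}
    if opts.hook ~= nil then
        client:addRPCHook(opts.hook)
    end
    if opts.useTLS ~= nil then
        client:setUseTLS(opts.useTLS)      -- boolean
    end
    if opts.timeout ~= nil then
        client:setTimeout(opts.timeout)    -- milliseconds
    end
    return client
end

-- Usage with a real producer `p`:
-- applyClientOptions(p, { useTLS = true, timeout = 5000 })
```

Options left out of `opts` are not touched, so the library defaults (such as the 3000 ms timeout) stay in effect.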

registerSendMessageHook

syntax: p:registerSendMessageHook(hook)

hook is a table that contains two functions as follows:

  • sendMessageBefore(self, context)
  • sendMessageAfter(self, context)

context is a table that contains:

  • producer
  • producerGroup
  • communicationMode
  • bornHost
  • brokerAddr
  • message
  • mq
  • msgType
  • sendResult
  • exception
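
As a rough illustration of the hook and context described above, the following sketch keeps simple send statistics. It assumes that `context.exception` is nil when a send succeeds and that `context.brokerAddr` is already set when `sendMessageBefore` runs; neither is stated in this document. `newSendStatsHook` and its fields are hypothetical names.

```lua
-- Hypothetical helper (not part of the library): a send-message hook
-- that counts successful and failed sends.
local function newSendStatsHook()
    local hook = { sent = 0, failed = 0, lastBroker = nil }

    function hook.sendMessageBefore(self, context)
        -- Remember which broker the most recent send was addressed to.
        hook.lastBroker = context.brokerAddr
    end

    function hook.sendMessageAfter(self, context)
        -- Assumption: a non-nil `exception` means the send failed.
        if context.exception ~= nil then
            hook.failed = hook.failed + 1
        else
            hook.sent = hook.sent + 1
        end
    end

    return hook
end

local statsHook = newSendStatsHook()
-- With a real producer: p:registerSendMessageHook(statsHook)
```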

registerEndTransactionHook

syntax: p:registerEndTransactionHook(hook)

hook is a table that contains a function as follows:

  • endTransaction(self, context)

context is a table that contains:

  • producerGroup
  • brokerAddr
  • message
  • msgId
  • transactionId
  • transactionState
  • fromTransactionCheck

send

syntax: res, err = p:send(topic, message, tags, keys, properties)

properties is a table that contains:

  • WAIT
  • DELAY

On success, returns a table of results. On error, returns nil and a string describing the error.
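
The `res, err` return convention makes it straightforward to wrap `send` in application-level retry logic. The sketch below is one possible wrapper; `sendWithRetry` and `maxAttempts` are hypothetical names, and the library may already retry internally, so treat this as an illustration of the calling convention rather than a recommended setting.

```lua
-- Hypothetical helper (not part of the library): calls p:send until it
-- succeeds or maxAttempts is reached.
-- Returns: result table (or nil), last error (or nil), attempts made.
local function sendWithRetry(p, topic, message, maxAttempts)
    local lastErr
    for attempt = 1, maxAttempts do
        local res, err = p:send(topic, message)
        if res then
            return res, nil, attempt
        end
        lastErr = err
    end
    return nil, lastErr, maxAttempts
end

-- Usage with a real producer `p`:
-- local res, err, attempts = sendWithRetry(p, "TopicTest", "hello", 3)
```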

setTransactionListener

syntax: res, err = p:setTransactionListener(transactionListener)

transactionListener is a table that contains two functions as follows:

  • executeLocalTransaction(self, msg, arg)
  • checkLocalTransaction(self, msg)
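
The listener shape can be sketched as follows. This document does not say which values the two functions should return, so the sketch takes them as a `states` table supplied by the caller; `states.commit`, `states.rollback` and `states.unknown` are placeholders, not constants defined by the library. `newTransactionListener`, `runLocalWork` and `lookupOutcome` are likewise hypothetical.

```lua
-- Hypothetical helper (not part of the library): builds a transaction listener.
--   states        : placeholder return values { commit, rollback, unknown }
--   runLocalWork  : function(msg, arg) -> true to commit, false to roll back
--   lookupOutcome : function(msg) -> true, false, or nil when not yet known
local function newTransactionListener(states, runLocalWork, lookupOutcome)
    local listener = {}

    function listener.executeLocalTransaction(self, msg, arg)
        local ok, committed = pcall(runLocalWork, msg, arg)
        if not ok then
            -- The local work raised an error; leave the decision to a later check.
            return states.unknown
        end
        if committed then
            return states.commit
        end
        return states.rollback
    end

    function listener.checkLocalTransaction(self, msg)
        local outcome = lookupOutcome(msg)
        if outcome == nil then
            return states.unknown
        end
        if outcome then
            return states.commit
        end
        return states.rollback
    end

    return listener
end

-- With a real producer: p:setTransactionListener(newTransactionListener(...))
```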

sendMessageInTransaction

syntax: res, err = p:sendMessageInTransaction(topic, arg, message, tags, keys, properties)

batchSend

syntax: res, err = p:batchSend(msgs)

msgs is a list of messages; each message is a table that contains:

  • topic
  • body
  • tags
  • keys
  • properties
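
A small sketch of building the `msgs` list from plain strings is shown below. It assumes that `keys` and `properties` may be omitted and that all messages in one batch share a topic; neither point is confirmed by this document. `buildBatch` is a hypothetical helper name.

```lua
-- Hypothetical helper (not part of the library): turns a list of bodies
-- into the list-of-tables shape accepted by p:batchSend.
local function buildBatch(topic, bodies, tags)
    local msgs = {}
    for i, body in ipairs(bodies) do
        msgs[i] = { topic = topic, body = body, tags = tags }
    end
    return msgs
end

local batch = buildBatch("TopicTest", { "first", "second", "third" }, "tagA")
-- With a real producer: local res, err = p:batchSend(batch)
```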

start

syntax: p:start()

Note that if you don't call p:start() before sending messages, the messages will still be sent successfully, but the trace will not be sent.

stop

syntax: p:stop()

resty.rocketmq.consumer

To load this module:

    local consumer = require "resty.rocketmq.consumer"

Methods

new

syntax: c = consumer.new(nameservers, consumerGroup)

nameservers is a list of nameserver addresses.

addRPCHook

syntax: c:addRPCHook(hook)

hook is a table that contains two functions as follows:

  • doBeforeRequest(self, addr, header, body)
  • doAfterResponse(self, addr, header, body, respHeader, respBody)

An ACL hook is provided. Usage:

    local accessKey, secretKey = "RocketMQ", "12345678"
    local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
    c:addRPCHook(aclHook)

setUseTLS

syntax: c:setUseTLS(useTLS)

useTLS is a boolean

setTimeout

syntax: c:setTimeout(timeout)

timeout is in milliseconds, default 3000

registerMessageListener

syntax: c:registerMessageListener(messageListener)

messageListener is a table that contains a function as follows:

  • consumeMessage(self, msgs, context)
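
The Synopsis returns `consumer.CONSUME_SUCCESS` from `consumeMessage`. The sketch below wraps a per-message handler so that an error raised by the handler does not propagate out of the listener. The value to return on failure is not documented here, so it is passed in by the caller as `failureStatus`; that parameter, `newSafeListener` and `lastError` are hypothetical.

```lua
-- Hypothetical helper (not part of the library): builds a message listener
-- that runs `handler` on each message and reports a status.
--   successStatus : e.g. consumer.CONSUME_SUCCESS (as used in the Synopsis)
--   failureStatus : placeholder for whatever the library expects on failure
local function newSafeListener(handler, successStatus, failureStatus)
    local listener = { lastError = nil }

    function listener.consumeMessage(self, msgs, context)
        for _, msg in ipairs(msgs) do
            local ok, err = pcall(handler, msg)
            if not ok then
                -- Keep the error for inspection and stop at the first failure.
                listener.lastError = err
                return failureStatus
            end
        end
        return successStatus
    end

    return listener
end

-- With a real consumer `c` and module `consumer`:
-- c:registerMessageListener(newSafeListener(handle, consumer.CONSUME_SUCCESS, failureStatus))
```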

registerConsumeMessageHook

syntax: c:registerConsumeMessageHook(hook)

hook is a table that contains two functions as follows:

  • consumeMessageBefore(self, context)
  • consumeMessageAfter(self, context)

context is a table that contains:

  • consumerGroup
  • mq
  • msgList
  • success
  • status
  • consumeContextType
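
As an illustration, the hook below measures how long each batch takes to consume. It assumes that the same `context` table is passed to both functions for a given batch, that `context.msgList` is an array, and that `context.success` is truthy when consumption succeeded; none of this is spelled out in this document. `newTimingHook` and its fields are hypothetical. The default clock is `os.clock`, which measures CPU time; inside OpenResty, `ngx.now` can be passed instead.

```lua
-- Hypothetical helper (not part of the library): a consume-message hook
-- that accumulates batch counts, message counts, failures and elapsed time.
local function newTimingHook(clock)
    clock = clock or os.clock
    -- Weak keys so start times do not keep context tables alive.
    local startedAt = setmetatable({}, { __mode = "k" })
    local hook = { batches = 0, messages = 0, failures = 0, elapsed = 0 }

    function hook.consumeMessageBefore(self, context)
        startedAt[context] = clock()
    end

    function hook.consumeMessageAfter(self, context)
        local t0 = startedAt[context]
        startedAt[context] = nil
        hook.batches = hook.batches + 1
        hook.messages = hook.messages + #(context.msgList or {})
        if not context.success then
            hook.failures = hook.failures + 1
        end
        if t0 then
            hook.elapsed = hook.elapsed + (clock() - t0)
        end
    end

    return hook
end

-- With a real consumer: c:registerConsumeMessageHook(newTimingHook(ngx.now))
```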

subscribe

syntax: c:subscribe(topic, subExpression)

start

syntax: c:start()

stop

syntax: c:stop()

setAllocateMessageQueueStrategy

syntax: c:setAllocateMessageQueueStrategy(strategy)

strategy is a function with the following signature:

  • function(consumerGroup, currentCID, mqAll, cidAll)

default value is consumer.AllocateMessageQueueAveragely
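
A custom strategy might look like the round-robin sketch below. It assumes the function should return the list of queues assigned to `currentCID`, and that `mqAll` and `cidAll` arrive in the same order on every consumer of the group; this document does not state the return contract, so check it against the built-in strategy before relying on it. `allocateRoundRobin` is a hypothetical name.

```lua
-- Hypothetical strategy (not part of the library): assigns queue i to
-- consumer ((i - 1) mod number_of_consumers) + 1.
local function allocateRoundRobin(consumerGroup, currentCID, mqAll, cidAll)
    -- Find this consumer's position in the list of consumer ids.
    local myIndex
    for i, cid in ipairs(cidAll) do
        if cid == currentCID then
            myIndex = i
            break
        end
    end

    local assigned = {}
    if not myIndex then
        -- Unknown consumer id: assign nothing.
        return assigned
    end

    for i, mq in ipairs(mqAll) do
        if (i - 1) % #cidAll + 1 == myIndex then
            assigned[#assigned + 1] = mq
        end
    end
    return assigned
end

-- With a real consumer: c:setAllocateMessageQueueStrategy(allocateRoundRobin)
```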

getAllocateMessageQueueStrategy

syntax: local strategy = c:getAllocateMessageQueueStrategy()

setEnableMsgTrace

syntax: c:setEnableMsgTrace(enableMsgTrace)

isEnableMsgTrace

syntax: local enableMsgTrace = c:isEnableMsgTrace()

setCustomizedTraceTopic

syntax: c:setCustomizedTraceTopic(customizedTraceTopic)

getCustomizedTraceTopic

syntax: local customizedTraceTopic = c:getCustomizedTraceTopic()

setConsumeFromWhere

syntax: c:setConsumeFromWhere(consumeFromWhere)

getConsumeFromWhere

syntax: local consumeFromWhere = c:getConsumeFromWhere()

setConsumeTimestamp

syntax: c:setConsumeTimestamp(consumeTimestamp)

getConsumeTimestamp

syntax: local consumeTimestamp = c:getConsumeTimestamp()

setPullThresholdForQueue

syntax: c:setPullThresholdForQueue(pullThresholdForQueue)

getPullThresholdForQueue

syntax: local pullThresholdForQueue = c:getPullThresholdForQueue()

setPullThresholdSizeForQueue

syntax: c:setPullThresholdSizeForQueue(pullThresholdSizeForQueue)

getPullThresholdSizeForQueue

syntax: local pullThresholdSizeForQueue = c:getPullThresholdSizeForQueue()

setPullTimeDelayMillsWhenException

syntax: c:setPullTimeDelayMillsWhenException(pullTimeDelayMillsWhenException)

getPullTimeDelayMillsWhenException

syntax: local pullTimeDelayMillsWhenException = c:getPullTimeDelayMillsWhenException()

setPullBatchSize

syntax: c:setPullBatchSize(pullBatchSize)

getPullBatchSize

syntax: local pullBatchSize = c:getPullBatchSize()

setPullInterval

syntax: c:setPullInterval(pullInterval)

getPullInterval

syntax: local pullInterval = c:getPullInterval()

setConsumeMessageBatchMaxSize

syntax: c:setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize)

getConsumeMessageBatchMaxSize

syntax: local consumeMessageBatchMaxSize = c:getConsumeMessageBatchMaxSize()

setMaxReconsumeTimes

syntax: c:setMaxReconsumeTimes(maxReconsumeTimes)

getMaxReconsumeTimes

syntax: local maxReconsumeTimes = c:getMaxReconsumeTimes()

setClientRebalance

syntax: c:setClientRebalance(clientRebalance)

isClientRebalance

syntax: local clientRebalance = c:isClientRebalance()

setPopThresholdForQueue

syntax: c:setPopThresholdForQueue(popThresholdForQueue)

getPopThresholdForQueue

syntax: local popThresholdForQueue = c:getPopThresholdForQueue()

setPopInvisibleTime

syntax: c:setPopInvisibleTime(popInvisibleTime)

getPopInvisibleTime

syntax: local popInvisibleTime = c:getPopInvisibleTime()

setPopBatchNums

syntax: c:setPopBatchNums(popBatchNums)

getPopBatchNums

syntax: local popBatchNums = c:getPopBatchNums()
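
Because the setters above follow a uniform `set<Name>` pattern, consumer settings can be applied from a single table. The sketch below derives the setter name from each key and checks that every setter exists before calling any of them. `applyConsumerSettings` is a hypothetical helper, and the values in the usage comment are arbitrary examples, not recommended settings.

```lua
-- Hypothetical helper (not part of the library): applies a table of
-- settings such as { pullBatchSize = 16 } by calling c:setPullBatchSize(16).
-- Returns the number of settings applied, or nil and an error message.
local function applyConsumerSettings(c, settings)
    local pending = {}
    -- First pass: resolve setter names and validate them.
    for name, value in pairs(settings) do
        local setter = "set" .. name:sub(1, 1):upper() .. name:sub(2)
        if type(c[setter]) ~= "function" then
            return nil, "no setter for setting: " .. tostring(name)
        end
        pending[#pending + 1] = { setter = setter, value = value }
    end
    -- Second pass: apply everything only after validation succeeded.
    for _, item in ipairs(pending) do
        c[item.setter](c, item.value)
    end
    return #pending
end

-- Usage with a real consumer `c` (example values only):
-- applyConsumerSettings(c, { pullBatchSize = 16, maxReconsumeTimes = 3 })
```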

resty.rocketmq.admin

To load this module:

    local admin = require "resty.rocketmq.admin"

Methods

new

syntax: adm = admin.new(nameservers)

nameservers is a list of nameserver addresses.

addRPCHook

syntax: adm:addRPCHook(hook)

hook is a table that contains two functions as follows:

  • doBeforeRequest(self, addr, header, body)
  • doAfterResponse(self, addr, header, body, respHeader, respBody)

An ACL hook is provided. Usage:

    local accessKey, secretKey = "RocketMQ", "12345678"
    local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
    adm:addRPCHook(aclHook)

setUseTLS

syntax: adm:setUseTLS(useTLS)

useTLS is a boolean

setTimeout

syntax: adm:setTimeout(timeout)

timeout is in milliseconds, default 3000

createTopic

syntax: res, err = adm:createTopic(newTopic, queueNum, topicSysFlag)

  • newTopic: the new topic name
  • queueNum: read and write queue numbers
  • topicSysFlag: system flag of the topic

createTopicForBroker

syntax: res, err = adm:createTopicForBroker(addr, topicConfig)

  • addr: broker address
  • topicConfig: a table containing:
    • topic
    • readQueueNums
    • writeQueueNums
    • perm
    • topicFilterType
    • topicSysFlag
    • order

searchOffset

syntax: res, err = adm:searchOffset(mq, timestamp)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId
  • timestamp: search time

maxOffset

syntax: res, err = adm:maxOffset(mq)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId

minOffset

syntax: res, err = adm:minOffset(mq)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId
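
The offset calls can be combined to estimate how many messages a queue currently holds. The sketch below assumes that `minOffset` and `maxOffset` return a number on success and nil plus an error string on failure, following the `res, err` convention used elsewhere in this document; the exact type of `res` is not stated here. `queueRange` is a hypothetical helper.

```lua
-- Hypothetical helper (not part of the library): returns the offset range
-- of one queue as { min, max, count }, or nil and an error message.
local function queueRange(adm, mq)
    local minOff, minErr = adm:minOffset(mq)
    if not minOff then
        return nil, "minOffset failed: " .. tostring(minErr)
    end
    local maxOff, maxErr = adm:maxOffset(mq)
    if not maxOff then
        return nil, "maxOffset failed: " .. tostring(maxErr)
    end
    -- Assumption: both results are numeric offsets.
    return { min = minOff, max = maxOff, count = maxOff - minOff }
end

-- Usage with a real admin object `adm` (field values are examples):
-- local range, err = queueRange(adm, { brokerName = "broker-a", topic = "TopicTest", queueId = 0 })
```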

earliestMsgStoreTime

syntax: res, err = adm:earliestMsgStoreTime(mq)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId

viewMessage

syntax: res, err = adm:viewMessage(offsetMsgId)

queryMessage

syntax: res, err = adm:queryMessage(topic, key, maxNum, beginTime, endTime, isUniqKey)

queryTraceByMsgId

syntax: res, err = adm:queryTraceByMsgId(traceTopic, msgId)

HTTP proxy

Quick start

The HTTP proxy provides an HTTP API for producing and consuming messages. RocketMQ >= 5.0.0 is required.

Install and start RocketMQ

wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
unzip rocketmq-all-5.3.0-bin-release.zip
cd rocketmq-5.3.0
nohup bash bin/mqnamesrv &
nohup bash bin/mqbroker -n localhost:9876 -c conf/broker.conf &

Start proxy

cd examples/
openresty -c server/http_proxy.conf -p .

Send message using http proxy

curl localhost:8080/topics/topic1/messages -d '{"properties": {"a": "3", "KEYS": "key", "TAGS": "tag"}, "body": "hello proxy"}'

Consume message using http proxy

curl 'localhost:8080/topics/topic1/messages?consumer=group1&numOfMessages=16&waitseconds=10'

curl 'localhost:8080/topics/topic1/messages/ack?consumer=group1' -XPUT -d '{"receiptHandles":["32312031363736303232303830303335203630303030203020302062726F6B65722D302030203331"]}'

SQS proxy

Quick start

The SQS proxy provides an HTTP API compatible with AWS SQS (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html).

Install and start RocketMQ

Install and start RocketMQ as in the HTTP proxy quick start, then start the SQS proxy

cd examples/
openresty -c server/sqs.conf -p .

Set up AWS SDK project

https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html

Send message using AWS SQS Java SDK

        SqsClient sqsClient = SqsClient.builder()
                .httpClientBuilder(ApacheHttpClient.builder())
                .endpointOverride(URI.create("http://localhost:8088"))
                .build();
        SendMessageRequest req = SendMessageRequest.builder()
                .queueUrl("TopicTest")
                .messageBody("body")
                .build();
        SendMessageResponse resp = sqsClient.sendMessage(req);

Consume message using AWS SQS Java SDK

        SqsClient sqsClient = SqsClient.builder()
                .httpClientBuilder(ApacheHttpClient.builder())
                .endpointOverride(URI.create("http://localhost:8088"))
                .build();
        while (true) {
            ReceiveMessageRequest req = ReceiveMessageRequest.builder()
                    .queueUrl("TopicTest")
                    .visibilityTimeout(60)
                    .maxNumberOfMessages(1)
                    .waitTimeSeconds(5)
                    .build();
            ReceiveMessageResponse resp = sqsClient.receiveMessage(req);
            System.out.printf("%s\n", resp);

            for (Message message : resp.messages()) {
                DeleteMessageRequest req2 = DeleteMessageRequest.builder()
                        .queueUrl("TopicTest")
                        .receiptHandle(message.receiptHandle())
                        .build();
                sqsClient.deleteMessage(req2);
            }
        }

Installation

You need to configure the lua_package_path directive to add the path of your lua-resty-rocketmq source tree to ngx_lua's LUA_PATH search path, for example:

    # nginx.conf
    http {
        lua_package_path "/path/to/lua-resty-rocketmq/lib/?.lua;;";
        ...
    }

Ensure that the system account running your Nginx worker processes has enough permission to read the .lua files.
