资料库的webrtc文件传输

时间:2022-12-05 07:34:20

一、一个看似简单的事情往往不简单

一个简单的事情往往会倾注你的心血,也许你看到很简单往往其实没那么简单;其实想想今年业余时间的大把代码,真正能成品的好像并不多。

马上年底了,写下这篇文章。每一行程序就像写小说一样,不管好不好;代码都倾注了我的心血。

真正把一个东西做好;做到好用,用的人多,具有很好的利他性,并不是一个容易的事情,需要不断的量变。

webrtc演示网址:

文件下载测试

二、为什么放弃百度网盘

把数据存储到百度网盘上大该有半年时间,后来发现了一些致命的问题

1、发现分享外链到了10几万条之后就不能分享了

2、分享的百度外链经常会被屏蔽掉

3、分享速度不能过快,1小时只能分享100多条外链,如果分享的过快整个网盘的分享功能都会

4、百度账号需要实名认证,很容易被人认出来,作为一个技术人员我认为应该藏在幕后,尽量不要让别人认出来,这回避免很多不必要的问题

三、为什么不用云存储

4个T的存储,每月就要400元,一年就是5600元,而且不算流量费用一年就是5600,这个费用是不高的,最关键的是使用了云存储很

1、可能被黑客共计,如果别人使用curl去恶意刷流量现有的安全策略很可能被人把费用都刷去

2、每年存储费用是递增的

3、需要实名认证,一旦绑定域名,绑定着手机号,很可能被人找到

四、我的WebRtc架构设计

拜托传统的中继服务器,使用p2p打洞,调研了一些方案,本来打算使用 coturn + janus 的架构设计,但是发现janus c 写的,没有文件传输的插件,如果自己再用c去做一个插件真的是很累,于是就打算用golang 实现一个文件传输的网关。

资料库的webrtc文件传输

大体步骤

1、启动网关

2、存储节点 连接到网关

3、用户浏览器请求网关,请求和存储节点交换信令

4、信令交换完成,使用dataChannel进行通信

五、网关设计

网关主要是一个websocket server 采用golang编写

网关设计主要分成了几大模块

1、room

room是一个公共领域,主要是做client和node的鉴权操作,如果鉴权成功,那么会进入Manager进行调度

2、抽象client和node

client和node 有一个共性是他们都具有连接属性,所以应该设计一个公共接口,把他们的共性抽象出来

type Lifecycle interface {
	Stop()
	Start()
}

type Describable interface {
	Name() string
	Type() ConnType
}


type Component interface {
	Lifecycle
	Describable
	AddCallback(ctx EventCallback)
	GetConnNumber() uint64
	SetContext(interface{})
	GetContext() interface{}
	Product(ctx Context)
}

通过添加一个AddCallback回调函数,确保不同模块的业务处理完全玻璃开,node的逻辑就在node里处理,client的逻辑只在client里处理,不能把不同模块的代码交叉处理,至于上下问的传输,统一抽象一个Context,里面抽象存储着我们需要的上下文信息,供给不同的回调函数以及协成之间传输使用。

type Context interface {
	GetData() []byte
	Error() error
	MessageType() int
	GetContext() interface{}
	SetContext(interface{})
	Name() string
}

type EventCallback interface {
	OnReceive(Context, *sync.WaitGroup)
	OnError(Context, *sync.WaitGroup)
	OnClose(Context, *sync.WaitGroup)
	OnWriteComplete(Context, *sync.WaitGroup)
}

type NodeCallback interface {
	OnReceive(Context, *sync.WaitGroup)
	OnError(Context, *sync.WaitGroup)
	OnClose(Context, *sync.WaitGroup)
	OnWriteComplete(Context, *sync.WaitGroup)
}

4、manager

负责整体调度,比如client进入manager后,查找当前可以用的node存储节点,找到后进行信令交换

节点调度,遍历查找拥有client最少的node节点,然后进行通信

// 选择最优线路
func (m *Manager) selectNode() *node.NodeClient {
	if len(m.nodeTree) == 0 {
		return nil
	}

	// 找一个挂载链接最少的节点
	var usableNode *node.NodeClient
	var weight uint64
	for _, conn := range m.nodeTree {
		if uint64(conn.GetConnNumber()) <= weight {
			usableNode = conn.(*node.NodeClient)
		}
	}
	return usableNode
}

六、node节点收到信令后进行应答

1、创建RTCConnection

blockSize := 16384
	//前端页面会对sdp进行base64的encode
	b, err := base64.StdEncoding.DecodeString(sdp)
	if err != nil {
		log.Error("error:%s", err)
		return nil
	}

	str, err := url.QueryUnescape(string(b))
	if err != nil {
		log.Error("error:%s", err)
		return nil
	}

	sdpDes := webrtc.SessionDescription{}
	fmt.Println(str)
	err = json.Unmarshal([]byte(str), &sdpDes)
	if err != nil {
		log.Error("json.Unmarshal err:%s", err)
		return nil
	}

	//创建pc, 并且指定stun服务器
	pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:"},
			},
		},
	})

	stat, err := os.Stat("/home/zhanglei/Downloads/《实践论》(原文)*.pdf")
	if err != nil {
		log.Error("os.Stat %s error:%s", path, err)
		return nil
	}

	if offset > stat.Size() {
		log.Error("offset(%d) > stat.Size(%d)", offset, stat.Size())
		return nil
	}

	chunkSize := int(math.Ceil(float64(stat.Size() / int64(blockSize))))
	currentChunkSize := int(math.Ceil(float64(offset / int64(blockSize))))

	if err != nil {
		log.Error("%s", err)
		return nil
	}

	pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
		fmt.Println("OnConnectionStateChange")
		fmt.Printf("Peer Connection State has changed: %s (answerer)\n", s.String())
		if s == webrtc.PeerConnectionStateFailed {
			fmt.Println("webrtc.PeerConnectionStateFailed")
		}
	})

	// Register data channel creation handling
	pc.OnDataChannel(func(d *webrtc.DataChannel) {
		fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())

		// Register channel opening handling
		d.OnOpen(func() {
			fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID())

			stat, err := os.Stat("/home/zhanglei/Downloads/《实践论》(原文)*.pdf")
			chunkSize = int(math.Ceil(float64(stat.Size() / int64(blockSize))))

			// 握手
			var chunk ChunkMessage
			chunk.Class = HANDSHAKE
			chunk.ChunkSize = uint64(chunkSize)
			handShakeBytes := serialize(&chunk)

			err = d.Send(handShakeBytes.Bytes())
			if err != nil {
				log.Error("%s", err)
				return
			}
		})

		// Register text message handling
		d.OnMessage(func(msg webrtc.DataChannelMessage) {
			fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label(), string(msg.Data))
			data, err := unSerialize(msg.Data)
			if err != nil {
				log.Error("os.Open is : %s", err)
				return
			}

			if data.Class == ACK {
				handle, err := os.Open("/home/zhanglei/Downloads/《实践论》(原文)*.pdf")
				if err != nil {
					log.Error("os.Open is : %s", err)
					return
				}

				defer handle.Close()

				bufferBytes := make([]byte, blockSize)
				read, err := handle.Read(bufferBytes)
				if err != nil {
					log.Error("handle.Read is : %s", err)
					return
				}

				if read < blockSize {
					bufferBytes = bufferBytes[:read]
				}

				var chunk ChunkMessage
				chunk.Class = SEND
				chunk.ChunkSize = uint64(chunkSize)
				chunk.CurrentChunk = uint64(currentChunkSize)
				chunk.Data = bufferBytes

				// 打包发送
				err = d.Send(serialize(&chunk).Bytes())
				if err != nil {
					log.Error("%s", err)
					return
				}
				return
			}

			if data.Class == RECEIVE {
				handle, err := os.Open("/home/zhanglei/Downloads/《实践论》(原文)*.pdf")
				if err != nil {
					log.Error("os.Open is : %s", err)
					return
				}

				if data.CurrentChunk == uint64(chunkSize) {
					log.Info("data transfer finish")
					return
				}

				nextChunk := data.CurrentChunk + 1

				bytes := make([]byte, blockSize)
				read, err := handle.ReadAt(bytes, int64(nextChunk)*int64(blockSize))

				if err != nil {
					if !errors.Is(err, io.EOF) {
						log.Error("handle.Read is : %s", err)
						return
					}

				}

				if read < blockSize {
					bytes = bytes[:read]
				}

				var sendData ChunkMessage
				sendData.Class = SEND
				sendData.CurrentChunk = nextChunk
				sendData.Data = bytes
				sendData.ChunkSize = uint64(chunkSize)
				sendDataBytes := serialize(&sendData)
				log.Info(" read %d", nextChunk)

				err = d.Send(sendDataBytes.Bytes())
				if err != nil {
					log.Error("%s", err)
					return
				}

				// 最后一块
				if nextChunk == uint64(chunkSize) {
					d.Close()
					pc.Close()
				}
				return
			}

		})
	})

	_, err = pc.CreateDataChannel("sendDataChannel", nil)
	if err != nil {
		log.Error("error:%s", err)
		return nil
	}

	//channel.OnOpen(func() {
	//
	//})

	//设置远端的sdp
	if err = pc.SetRemoteDescription(sdpDes); err != nil {
		log.Error("error:%s", err)
		return nil
	}

	//创建协商结果
	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		log.Error("error:%s", err)
		return nil
	}

	pc.OnICECandidate(func(i *webrtc.ICECandidate) {
		fmt.Println("OnICECandidate")
		fmt.Println(i)

	})

	err = pc.SetLocalDescription(answer)
	if err != nil {
		log.Error("error:%s", err)
		return nil
	}

	//等待ice结束
	gatherCmp := webrtc.GatheringCompletePromise(pc)
	<-gatherCmp

	//将协商并且收集完candidate的answer,输出到控制台
	answerBytes, err := json.Marshal(*pc.LocalDescription())
	if err != nil {
		log.Error("error:%s", err)
		return nil
	}

	pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
		fmt.Println("OnICECandidate")
		fmt.Println(candidate)
	})

	t := &Transfer{
		sdp:              sdp,
		pc:               pc,
		offset:           uint64(offset),
		currentChunkSize: currentChunkSize,
		chunkSize:        chunkSize,
		answerSdp:        answerBytes,
		blockSize:        blockSize,
	}
	return t

2、打包传输

传输没有采用protobuf,自己写了个二进制传输

func serialize(data *ChunkMessage) *bytes.Buffer {
	data.Version = CodeVersion
	writeBuffer := bytes.NewBuffer(nil)
	writeBuffer.Write([]byte{data.Version})
	writeBuffer.Write([]byte{data.Class})

	// ChunkSize
	binary.Write(writeBuffer, binary.BigEndian, data.ChunkSize)

	// CurrentChunk
	binary.Write(writeBuffer, binary.BigEndian, data.CurrentChunk)

	// DataLen
	data.DataLen = uint64(len(data.Data))
	binary.Write(writeBuffer, binary.BigEndian, data.DataLen)

	if len(data.Data) > 0 {
		// 添加body
		writeBuffer.Write(data.Data)
	}

	return writeBuffer
}

//判断我们系统中的字节序类型
func systemEdian() binary.ByteOrder {
	var i int = 0x1
	bs := (*[int(unsafe.Sizeof(0))]byte)(unsafe.Pointer(&i))
	if bs[0] == 0 {
		return binary.LittleEndian
	} else {
		return binary.BigEndian
	}
}

func unSerialize(data []byte) (*ChunkMessage, error) {
	buf := bytes.NewBuffer(data)
	fmt.Println(buf)
	var chunk ChunkMessage
	binary.Read(buf, systemEdian(), &chunk.Version)
	binary.Read(buf, systemEdian(), &chunk.Class)
	binary.Read(buf, systemEdian(), &chunk.ChunkSize)
	binary.Read(buf, systemEdian(), &chunk.CurrentChunk)
	binary.Read(buf, systemEdian(), &chunk.DataLen)
	//chunkSize := uint64(unsafe.Pointer(&buf.Bytes()))
	//chunk.ChunkSize = chunkSize
	return &chunk, nil
}

7、js前端webrtc提供报价

创建webrtc连接

var pcConfig = {
            'iceServers': [{
                'urls': 'stun:',
            }]
        };
        localConnection = new RTCPeerConnection(pcConfig);

        receiveDataChannel = localConnection.createDataChannel("receiveDataChannel")

        receiveDataChannel.binaryType = "arraybuffer"

        receiveDataChannel.addEventListener('open', dataChannel.onopen);
        receiveDataChannel.addEventListener('close', dataChannel.onclose);
        receiveDataChannel.addEventListener('message', dataChannel.onmessage);
        receiveDataChannel.addEventListener('error', dataChannel.onError);

        try {
            this.offer = await localConnection.createOffer();
        } catch (e) {
            console.log('Failed to create session description: ', e);
            return
        }

        try {
            await localConnection.setLocalDescription(this.offer)
        } catch (e) {
            console.log('Failed to create session description: ', e);
            return
        }

        //eyJ1c2VyX3V1aWQiOiI1ZmZkNDE0N2JkMTMyNWNmMjYwNDAyMWYwODA5OWUyMyIsImxvZ2luX3RpbWUiOjE2Njg0NzgzOTEsIm5vd190aW1lIjoxNjY5OTgzNzkwLCJyYW5fc3RyIjoiZDA3MTczNzI3NjFjMzY0MGU2NmRlYWI5YmYyODZhNzYiLCJzaWduIjoiZjc3NzI0YjZmMTc3MzczNmVhZWFkMTM2NzllNTE0NTcifQ==

        transfer.ws.send((JSON.stringify(downloadRequest)));

前端对收到的数据进行序列化和反序列化

function serialize(data) {
    var bufLen = protoColMinSize;
    if (!data.Data) {
        bufLen += 0;
    } else {
        bufLen += data.Data.length;
    }
    data.Version = 1;
    var protocolBuf = new ArrayBuffer(bufLen);
    const bufView = new DataView(protocolBuf);
    bufView.setUint8(0, data.Version);

    bufView.setUint8(1, data.Class);

    if (!data.ChunkSize) {
        data.ChunkSize = 0
    }
    bufView.setBigUint64(2, BigInt(data.ChunkSize));


    if (!data.CurrentChunk) {
        data.CurrentChunk = 0
    }
    bufView.setBigUint64(10, BigInt(data.CurrentChunk));

    if (data.Data && data.Data.length > 0) {
        bufView.setBigUint64(18, BigInt(data.Data.length));
    } else {
        bufView.setBigUint64(18, BigInt(0));
    }

    console.log(protocolBuf)
    return protocolBuf;
}

function unSerialize(bytes) {

    var versionView = new DataView(bytes).getUint8(0);
    // 最小长度
    var classByteView = new DataView(bytes).getUint8(1);
    // chunk 长度
    var chunkSizeView = parseInt(new DataView(bytes).getBigUint64(2));
    var currentChunkView = parseInt(new DataView(bytes).getBigUint64(10));
    var bodyLenView = parseInt(new DataView(bytes).getBigUint64(18));
    var returnData = {
        Version: versionView,
        Class: classByteView,
        ChunkSize: (chunkSizeView),
        CurrentChunk: currentChunkView,
        PayloadLength: bodyLenView,
        Payload: [],
    };

    if (bodyLenView > 0) {
        returnData.Payload = new Uint8Array(bytes, protoColMinSize, bodyLenView)
    }

    return returnData;
}

八、webrtc文件传输的优缺

1)优点

1、点对点传输,不经过中继服务器

2、民用带宽比较便宜,最差的情况下是带宽打满,不会出现很高的流量费用

3、可以自建存储,存储节点可以使用群辉,可以买自己服务器

4、不需要固定的ip地址

2)缺点

1、宽带线民用的不知道能申请几根

2、存储维护硬件也是一个麻烦的事情,硬盘很可能出现故障,运维也是一个头痛的事情