登峰小蚁 2020-05-11
SparkSession spark = SparkSession
.Builder()
.AppName("Apache User Log Processing")
.GetOrCreate();DataFrame generalDf = spark.Read().Text("<path to input data set>");string s_apacheRx = "^(\S+) (\S+) (\S+) [([\w:/]+\s[+-]\d{4})] \"(\S+) (\S+) (\S+)\" (\d{3}) (\d+)";我们如何对DataFrame的每一行执行计算,比如将每个日志条目与上面的s_apacheRx进行匹配?答案是Spark SQL。
spark.Udf().Register<string, bool>("GeneralReg", log => Regex.IsMatch(log, s_apacheRx));DataFrame generalDf = spark.Sql("SELECT logs.value, GeneralReg(logs.value) FROM Logs");generalDf = generalDf.Filter(generalDf["GeneralReg(value)"]); generalDf.Show();
// Choose valid log entries that start with 10
spark.Udf().Register<string, bool>(
"IPReg",
log => Regex.IsMatch(log, "^(?=10)"));
generalDf.CreateOrReplaceTempView("IPLogs");
// Apply UDF to get valid log entries starting with 10
DataFrame ipDf = spark.Sql(
"SELECT iplogs.value FROM IPLogs WHERE IPReg(iplogs.value)");
ipDf.Show();
// Choose valid log entries that start with 10 and deal with spam
spark.Udf().Register<string, bool>(
"SpamRegEx",
log => Regex.IsMatch(log, "\\b(?=spam)\\b"));
ipDf.CreateOrReplaceTempView("SpamLogs");
// Apply UDF to get valid, start with 10, spam entries
DataFrame spamDF = spark.Sql(
"SELECT spamlogs.value FROM SpamLogs WHERE SpamRegEx(spamlogs.value)");int numGetRequests = spamDF
.Collect()
.Where(r => ContainsGet(r.GetAs<string>("value")))
.Count();// Use regex matching to group data
// Each group matches a column in our log schema
// i.e. first group = first column = IP
public static bool ContainsGet(string logLine)
{
Match match = Regex.Match(logLine, s_apacheRx);
// Determine if valid log entry is a GET request
if (match.Success)
{
Console.WriteLine("Full log entry: ‘{0}‘", match.Groups[0].Value);
// 5th column/group in schema is "method"
if (match.Groups[5].Value == "GET")
{
return true;
}
}
return false;
}spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<version>.jar dotnet /path/to/netcoreapp<version>/LoggingApp.dll
var mySqlQuery = "SELECT * FROM table WHERE id = " + std_name;
每次执行这一条查询的时候返回的结果都可能会不一样,这取决于std_name的值。